/* GStreamer unix file-descriptor source/sink
 *
 * Copyright (C) 2023 Netflix Inc.
 *  Author: Xavier Claessens <xavier.claessens@collabora.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:element-unixfdsink
 * @title: unixfdsink
 *
 * Send file-descriptor backed buffers (e.g. memfd, dmabuf) over unix socket to
 * matching unixfdsrc. There can be any number of clients, if none are connected
 * buffers are dropped.
 *
 * Buffers can have any number of #GstMemory, but it is an error if any one of
 * them lacks a file-descriptor.
 *
 * #GstShmAllocator is added into the allocation proposition, which makes
 * most sources write their data into shared memory automatically.
 *
 * ## Example launch lines
 * |[
 * gst-launch-1.0 -v videotestsrc ! video/x-raw,format=RGBx,width=1920,height=1080 ! timeoverlay ! unixfdsink socket-path=/tmp/blah
 * gst-launch-1.0 -v unixfdsrc socket-path=/tmp/blah ! videoconvert ! autovideosink
 * ]|
 *
 * Since: 1.24
 */

#include "gstunixfd.h"

#include "gstunixfdallocator.h"

#include <gst/base/base.h>
#include <gst/allocators/allocators.h>

#include <stdint.h>
#include <glib/gstdio.h>
#include <gio/gio.h>
#include <gio/gunixsocketaddress.h>

GST_DEBUG_CATEGORY (unixfdsink_debug);
#define GST_CAT_DEFAULT (unixfdsink_debug)

static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

#define GST_TYPE_UNIX_FD_SINK gst_unix_fd_sink_get_type()
G_DECLARE_FINAL_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST, UNIX_FD_SINK,
    GstBaseSink);

typedef struct
{
  GHashTable *buffers;
  GSource *source;
} Client;

struct _GstUnixFdSink
{
  GstBaseSink parent;

  GThread *thread;
  GMainContext *context;
  GMainLoop *loop;

  gchar *socket_path;
  GUnixSocketAddressType socket_type;
  GSocket *socket;
  GSource *source;

  /* GSocket -> Client */
  GHashTable *clients;
  GstCaps *caps;
  gboolean uses_monotonic_clock;
  GByteArray *payload;

  gboolean wait_for_connection;
  GCond wait_for_connection_cond;
  gboolean unlock;

  GstUnixFdAllocator *allocator;
  gint64 min_memory_size;
};

G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK);
GST_ELEMENT_REGISTER_DEFINE (unixfdsink, "unixfdsink", GST_RANK_NONE,
    GST_TYPE_UNIX_FD_SINK);

#define DEFAULT_SOCKET_TYPE G_UNIX_SOCKET_ADDRESS_PATH
#define DEFAULT_WAIT_FOR_CONNECTION FALSE
#define DEFAULT_MIN_MEMORY_SIZE 0

enum
{
  PROP_0,
  PROP_SOCKET_PATH,
  PROP_SOCKET_TYPE,
  PROP_WAIT_FOR_CONNECTION,
  PROP_MIN_MEMORY_SIZE,
  PROP_NUM_CLIENTS,
  NUM_PROPERTIES
};

static GParamSpec *properties[NUM_PROPERTIES];

static void
client_free (Client * client)
{
  g_hash_table_unref (client->buffers);
  g_source_destroy (client->source);
  g_source_unref (client->source);
  g_free (client);
}

static GstMemory *
copy_to_shm (GstUnixFdSink * self, GstMemory * mem)
{
  GST_OBJECT_LOCK (self);

  if (self->min_memory_size < 0) {
    GST_ERROR_OBJECT (self,
        "Buffer has non-FD memories and copying is disabled. Set min-memory-size to a value >= 0 to allow copying.");
    GST_OBJECT_UNLOCK (self);
    return NULL;
  }

  if (self->allocator == NULL)
    self->allocator = gst_unix_fd_allocator_new ();

  gsize size = gst_memory_get_sizes (mem, NULL, NULL);
  gsize alloc_size = MAX (size, self->min_memory_size);
  GstMemory *fd_mem =
      gst_allocator_alloc (GST_ALLOCATOR_CAST (self->allocator), alloc_size,
      NULL);

  GST_OBJECT_UNLOCK (self);

  if (fd_mem == NULL) {
    GST_ERROR_OBJECT (self, "Shared memory allocation failed.");
    return NULL;
  }

  gst_memory_resize (fd_mem, 0, size);

  GstMapInfo src_map, dst_map;

  if (!gst_memory_map (mem, &src_map, GST_MAP_READ)) {
    GST_ERROR_OBJECT (self, "Mapping of source memory failed.");
    gst_memory_unref (fd_mem);
    return NULL;
  }

  if (!gst_memory_map (fd_mem, &dst_map, GST_MAP_WRITE)) {
    GST_ERROR_OBJECT (self, "Mapping of shared memory failed.");
    gst_memory_unmap (mem, &src_map);
    gst_memory_unref (fd_mem);
    return NULL;
  }

  memcpy (dst_map.data, src_map.data, src_map.size);

  gst_memory_unmap (mem, &src_map);
  gst_memory_unmap (fd_mem, &dst_map);

  return fd_mem;
}

static void
allocator_unref (GstUnixFdAllocator * allocator)
{
  gst_unix_fd_allocator_flush (allocator);
  g_object_unref (allocator);
}

static void
gst_unix_fd_sink_init (GstUnixFdSink * self)
{
  g_return_if_fail (GST_IS_UNIX_FD_SINK (self));

  self->context = g_main_context_new ();
  self->loop = g_main_loop_new (self->context, FALSE);
  self->clients =
      g_hash_table_new_full (NULL, NULL, g_object_unref,
      (GDestroyNotify) client_free);
  g_cond_init (&self->wait_for_connection_cond);
}

static void
gst_unix_fd_sink_finalize (GObject * object)
{
  GstUnixFdSink *self = GST_UNIX_FD_SINK (object);

  g_free (self->socket_path);
  g_main_context_unref (self->context);
  g_main_loop_unref (self->loop);
  g_hash_table_unref (self->clients);
  g_cond_clear (&self->wait_for_connection_cond);

  G_OBJECT_CLASS (gst_unix_fd_sink_parent_class)->finalize (object);
}

static void
gst_unix_fd_sink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstUnixFdSink *self = GST_UNIX_FD_SINK (object);

  GST_OBJECT_LOCK (self);

  switch (prop_id) {
    case PROP_SOCKET_PATH:
      if (self->socket) {
        GST_WARNING_OBJECT (self,
            "Can only change socket path in NULL or READY state");
        break;
      }
      g_free (self->socket_path);
      self->socket_path = g_value_dup_string (value);
      break;
    case PROP_SOCKET_TYPE:
      if (self->socket) {
        GST_WARNING_OBJECT (self,
            "Can only change socket type in NULL or READY state");
        break;
      }
      self->socket_type = g_value_get_enum (value);
      break;
    case PROP_WAIT_FOR_CONNECTION:
      self->wait_for_connection = g_value_get_boolean (value);
      g_cond_signal (&self->wait_for_connection_cond);
      break;
    case PROP_MIN_MEMORY_SIZE:
      self->min_memory_size = g_value_get_int64 (value);
      g_clear_pointer (&self->allocator, allocator_unref);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  GST_OBJECT_UNLOCK (self);
}

static void
gst_unix_fd_sink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec)
{
  GstUnixFdSink *self = GST_UNIX_FD_SINK (object);

  GST_OBJECT_LOCK (self);

  switch (prop_id) {
    case PROP_SOCKET_PATH:
      g_value_set_string (value, self->socket_path);
      break;
    case PROP_SOCKET_TYPE:
      g_value_set_enum (value, self->socket_type);
      break;
    case PROP_WAIT_FOR_CONNECTION:
      g_value_set_boolean (value, self->wait_for_connection);
      break;
    case PROP_MIN_MEMORY_SIZE:
      g_value_set_int64 (value, self->min_memory_size);
      break;
    case PROP_NUM_CLIENTS:
      g_value_set_uint (value, g_hash_table_size (self->clients));
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  GST_OBJECT_UNLOCK (self);
}

static gboolean
incoming_command_cb (GSocket * socket, GIOCondition cond, gpointer user_data)
{
  GstUnixFdSink *self = user_data;
  Client *client;
  CommandType command;
  guint8 *payload = NULL;
  gsize payload_size;
  GError *error = NULL;

  GST_OBJECT_LOCK (self);

  client = g_hash_table_lookup (self->clients, socket);

  if (client == NULL) {
    GST_ERROR_OBJECT (self, "Received data from unknown client");
    goto on_error;
  }

  if (!gst_unix_fd_receive_command (socket, NULL, &command, NULL, &payload,
          &payload_size, &error)) {
    GST_DEBUG_OBJECT (self, "Failed to receive message from client %p: %s",
        client, error != NULL ? error->message : "Connection closed by peer");
    goto on_error;
  }

  switch (command) {
    case COMMAND_TYPE_NEW_BUFFER:
    case COMMAND_TYPE_CAPS:
      GST_ERROR_OBJECT (self, "Received wrong command %d from client %p",
          command, client);
      goto on_error;
    case COMMAND_TYPE_RELEASE_BUFFER:{
      ReleaseBufferPayload *release_buffer;
      if (!gst_unix_fd_parse_release_buffer (payload, payload_size,
              &release_buffer)) {
        GST_ERROR_OBJECT (self,
            "Received release-buffer with wrong payload size from client %p",
            client);
        goto on_error;
      }
      /* id is actually the GstBuffer pointer casted to guint64.
       * We can now drop its reference kept for this client. */
      if (release_buffer->id > UINTPTR_MAX
          || !g_hash_table_remove (client->buffers,
              (gpointer) (guintptr) release_buffer->id)) {
        GST_ERROR_OBJECT (self,
            "Received wrong id %" G_GUINT64_FORMAT
            " in release-buffer command from client %p", release_buffer->id,
            client);
        goto on_error;
      }
      break;
    }
    default:
      /* Protocol could have been extended with new command */
      GST_DEBUG_OBJECT (self, "Ignoring unknown command %d", command);
      break;
  }

  g_free (payload);
  GST_OBJECT_UNLOCK (self);

  return G_SOURCE_CONTINUE;

on_error:
  g_hash_table_remove (self->clients, socket);
  g_clear_error (&error);
  g_free (payload);
  GST_OBJECT_UNLOCK (self);
  g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_NUM_CLIENTS]);
  return G_SOURCE_REMOVE;
}

static guint8 *
caps_to_payload (GstCaps * caps, gsize * payload_size)
{
  gchar *payload = gst_caps_to_string (caps);
  *payload_size = strlen (payload) + 1;
  return (guint8 *) payload;
}

static gboolean
new_client_cb (GSocket * socket, GIOCondition cond, gpointer user_data)
{
  GstUnixFdSink *self = user_data;
  Client *client;
  GError *error = NULL;

  GSocket *client_socket = g_socket_accept (self->socket, NULL, &error);
  if (client_socket == NULL) {
    GST_ERROR_OBJECT (self, "Failed to accept connection: %s", error->message);
    return G_SOURCE_CONTINUE;
  }

  client = g_new0 (Client, 1);
  client->buffers =
      g_hash_table_new_full (NULL, NULL, (GDestroyNotify) gst_buffer_unref,
      NULL);
  client->source = g_socket_create_source (client_socket, G_IO_IN, NULL);
  g_source_set_callback (client->source, (GSourceFunc) incoming_command_cb,
      self, NULL);
  g_source_attach (client->source, self->context);

  GST_OBJECT_LOCK (self);

  GST_DEBUG_OBJECT (self, "New client %p", client);
  g_hash_table_insert (self->clients, client_socket, client);

  /* Start by sending our current caps. Keep the lock while doing that because
   * we don't want this client to miss a caps event or receive a buffer while we
   * send initial caps. */
  gsize payload_size;
  guint8 *payload = caps_to_payload (self->caps, &payload_size);
  if (!gst_unix_fd_send_command (client_socket, COMMAND_TYPE_CAPS, NULL,
          payload, payload_size, &error)) {
    GST_ERROR_OBJECT (self, "Failed to send caps to new client %p: %s", client,
        error->message);
    g_hash_table_remove (self->clients, client_socket);
    g_clear_error (&error);
  }
  g_free (payload);

  g_cond_signal (&self->wait_for_connection_cond);

  GST_OBJECT_UNLOCK (self);

  g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_NUM_CLIENTS]);

  return G_SOURCE_CONTINUE;
}

static gpointer
thread_cb (gpointer user_data)
{
  GstUnixFdSink *self = user_data;
  g_main_loop_run (self->loop);
  return NULL;
}

static gboolean
gst_unix_fd_sink_start (GstBaseSink * bsink)
{
  GstUnixFdSink *self = (GstUnixFdSink *) bsink;
  GSocketAddress *addr = NULL;
  GError *error = NULL;
  gboolean ret = TRUE;

  GST_OBJECT_LOCK (self);

  self->socket =
      gst_unix_fd_socket_new (self->socket_path, self->socket_type, &addr,
      &error);
  if (self->socket == NULL) {
    GST_ERROR_OBJECT (self, "Failed to create UNIX socket: %s", error->message);
    ret = FALSE;
    goto out;
  }

  if (!g_socket_bind (self->socket, addr, TRUE, &error)) {
    GST_ERROR_OBJECT (self, "Failed to bind socket: %s", error->message);
    g_clear_object (&self->socket);
    ret = FALSE;
    goto out;
  }

  if (!g_socket_listen (self->socket, &error)) {
    GST_ERROR_OBJECT (self, "Failed to listen socket: %s", error->message);
    g_clear_object (&self->socket);
    ret = FALSE;
    goto out;
  }

  self->source = g_socket_create_source (self->socket, G_IO_IN, NULL);
  g_source_set_callback (self->source, (GSourceFunc) new_client_cb, self, NULL);
  g_source_attach (self->source, self->context);

  self->thread = g_thread_new ("unixfdsink", thread_cb, self);

  /* Preallocate the minimum payload size for a buffer with a single memory and
   * no metas. Chances are that every buffer will require roughly the same
   * payload size, by reusing the same GByteArray we avoid reallocations. */
  self->payload =
      g_byte_array_sized_new (sizeof (NewBufferPayload) +
      sizeof (MemoryPayload));

out:
  GST_OBJECT_UNLOCK (self);
  g_clear_error (&error);
  g_clear_object (&addr);
  return ret;
}

static gboolean
gst_unix_fd_sink_stop (GstBaseSink * bsink)
{
  GstUnixFdSink *self = (GstUnixFdSink *) bsink;

  g_main_loop_quit (self->loop);
  g_thread_join (self->thread);

  g_source_destroy (self->source);
  g_clear_pointer (&self->source, g_source_unref);
  g_clear_object (&self->socket);
  gst_clear_caps (&self->caps);
  g_hash_table_remove_all (self->clients);
  g_clear_pointer (&self->payload, g_byte_array_unref);

  if (self->socket_type == G_UNIX_SOCKET_ADDRESS_PATH)
    g_unlink (self->socket_path);

  return TRUE;
}

static gboolean
send_command_to_all (GstUnixFdSink * self, CommandType type, GUnixFDList * fds,
    const guint8 * payload, gsize payload_size, GstBuffer * buffer)
{
  GHashTableIter iter;
  GSocket *socket;
  Client *client;
  GError *error = NULL;
  gboolean client_removed = FALSE;

  g_hash_table_iter_init (&iter, self->clients);
  while (g_hash_table_iter_next (&iter, (gpointer) & socket,
          (gpointer) & client)) {
    if (!gst_unix_fd_send_command (socket, type, fds, payload, payload_size,
            &error)) {
      GST_ERROR_OBJECT (self, "Failed to send command %d to client %p: %s",
          type, client, error->message);
      g_clear_error (&error);
      g_hash_table_iter_remove (&iter);
      client_removed = TRUE;
      continue;
    }
    /* Keep a ref on this buffer until all clients released it. */
    if (buffer != NULL)
      g_hash_table_add (client->buffers, gst_buffer_ref (buffer));
  }

  return client_removed;
}

static GstClockTime
to_monotonic (GstClockTime timestamp, const GstSegment * segment,
    GstClockTime base_time, GstClockTime latency, GstClockTimeDiff clock_diff)
{
  if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
    /* Convert running time to pipeline clock time */
    gint res =
        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, timestamp,
        &timestamp);
    if (res == 0)
      return GST_CLOCK_TIME_NONE;
    else if (res > 0)
      timestamp += base_time;
    else if (base_time > timestamp)
      timestamp = base_time - timestamp;
    else
      timestamp = 0;
    if (GST_CLOCK_TIME_IS_VALID (latency))
      timestamp += latency;
    /* Convert to system monotonic clock time */
    if (clock_diff < 0 && -clock_diff > timestamp)
      return 0;
    timestamp += clock_diff;
  }
  return timestamp;
}

static guint16
serialize_metas (GstBuffer * buffer, GByteArray * payload)
{
  gpointer state = NULL;
  GstMeta *meta;
  guint16 n_meta = 0;

  while ((meta = gst_buffer_iterate_meta (buffer, &state)) != NULL) {
    if (gst_meta_serialize_simple (meta, payload))
      n_meta++;
  }

  return n_meta;
}

static GstFlowReturn
gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
  GstUnixFdSink *self = (GstUnixFdSink *) bsink;
  GstFlowReturn ret = GST_FLOW_OK;
  GError *error = NULL;

  guint n_memory = gst_buffer_n_memory (buffer);
  gsize struct_size =
      sizeof (NewBufferPayload) + sizeof (MemoryPayload) * n_memory;
  g_byte_array_set_size (self->payload, struct_size);
  guint32 n_meta = serialize_metas (buffer, self->payload);

  GstClockTime latency = gst_base_sink_get_latency (GST_BASE_SINK_CAST (self));
  GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT_CAST (self));
  GstClockTimeDiff clock_diff = 0;
  if (!self->uses_monotonic_clock) {
    clock_diff = GST_CLOCK_DIFF (g_get_monotonic_time () * GST_USECOND,
        gst_clock_get_time (GST_ELEMENT_CLOCK (self)));
  }

  NewBufferPayload *new_buffer = (NewBufferPayload *) self->payload->data;
  /* Cast buffer pointer to guint64 identifier. Client will send us back that
   * id so we know which buffer to unref. */
  new_buffer->id = (guint64) (guintptr) buffer;
  new_buffer->pts =
      to_monotonic (GST_BUFFER_PTS (buffer),
      &GST_BASE_SINK_CAST (self)->segment, base_time, latency, clock_diff);
  new_buffer->dts =
      to_monotonic (GST_BUFFER_DTS (buffer),
      &GST_BASE_SINK_CAST (self)->segment, base_time, latency, clock_diff);
  new_buffer->duration = GST_BUFFER_DURATION (buffer);
  new_buffer->offset = GST_BUFFER_OFFSET (buffer);
  new_buffer->offset_end = GST_BUFFER_OFFSET_END (buffer);
  new_buffer->flags = GST_BUFFER_FLAGS (buffer);
  new_buffer->type = MEMORY_TYPE_DEFAULT;
  new_buffer->n_memory = n_memory;
  new_buffer->n_meta = n_meta;

  if ((GST_BUFFER_PTS_IS_VALID (buffer)
          && !GST_CLOCK_TIME_IS_VALID (new_buffer->pts))
      || (GST_BUFFER_DTS_IS_VALID (buffer)
          && !GST_CLOCK_TIME_IS_VALID (new_buffer->dts))) {
    GST_ERROR_OBJECT (self,
        "Could not convert buffer timestamp to running time");
    return GST_FLOW_ERROR;
  }

  /* dst_buffer is used to hold reference on new GstMemory we'll create, if any.
   * ref_original_buffer is set to TRUE if dst_buffer also needs to hold
   * reference on the original buffer. */
  GstBuffer *dst_buffer = NULL;
  gboolean ref_original_buffer = FALSE;

  gint dmabuf_count = 0;
  GUnixFDList *fds = g_unix_fd_list_new ();

  for (int i = 0; i < n_memory; i++) {
    GstMemory *mem = gst_buffer_peek_memory (buffer, i);

    if (!gst_is_fd_memory (mem)) {
      if (dst_buffer == NULL)
        dst_buffer = gst_buffer_new ();
      mem = copy_to_shm (self, mem);
      if (mem == NULL) {
        ret = GST_FLOW_ERROR;
        goto out;
      }
      gst_buffer_append_memory (dst_buffer, mem);
    } else {
      ref_original_buffer = TRUE;
    }

    if (gst_is_dmabuf_memory (mem))
      dmabuf_count++;

    if (g_unix_fd_list_append (fds, gst_fd_memory_get_fd (mem), &error) < 0) {
      GST_ERROR_OBJECT (self, "Failed to append FD: %s", error->message);
      ret = GST_FLOW_ERROR;
      goto out;
    }

    gsize offset;
    new_buffer->memories[i].size = gst_memory_get_sizes (mem, &offset, NULL);
    new_buffer->memories[i].offset = offset;
  }

  if (dmabuf_count > 0 && dmabuf_count != n_memory) {
    GST_ERROR_OBJECT (self, "Some but not all memories are DMABuf");
    ret = GST_FLOW_ERROR;
    goto out;
  }

  if (dmabuf_count > 0)
    new_buffer->type = MEMORY_TYPE_DMABUF;

  if (dst_buffer != NULL) {
    new_buffer->id = (guint64) (guintptr) dst_buffer;
    if (ref_original_buffer)
      gst_buffer_add_parent_buffer_meta (dst_buffer, buffer);
    buffer = dst_buffer;
  }

  GST_OBJECT_LOCK (self);

  while (self->wait_for_connection && g_hash_table_size (self->clients) == 0) {
    g_cond_wait (&self->wait_for_connection_cond, GST_OBJECT_GET_LOCK (self));
    if (self->unlock) {
      GST_OBJECT_UNLOCK (self);
      ret = gst_base_sink_wait_preroll (bsink);
      if (ret != GST_FLOW_OK)
        goto out;
      GST_OBJECT_LOCK (self);
    }
  }

  gboolean client_removed =
      send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds,
      self->payload->data, self->payload->len, buffer);

  GST_OBJECT_UNLOCK (self);

  if (client_removed) {
    g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_NUM_CLIENTS]);
  }

out:
  gst_clear_buffer (&dst_buffer);
  g_clear_object (&fds);
  g_clear_error (&error);
  return ret;
}

static gboolean
gst_unix_fd_sink_unlock (GstBaseSink * bsink)
{
  GstUnixFdSink *self = (GstUnixFdSink *) bsink;

  GST_OBJECT_LOCK (self);
  self->unlock = TRUE;
  g_cond_signal (&self->wait_for_connection_cond);
  GST_OBJECT_UNLOCK (self);

  return TRUE;
}

static gboolean
gst_unix_fd_sink_unlock_stop (GstBaseSink * bsink)
{
  GstUnixFdSink *self = (GstUnixFdSink *) bsink;

  GST_OBJECT_LOCK (self);
  self->unlock = FALSE;
  GST_OBJECT_UNLOCK (self);

  return TRUE;
}

static gboolean
gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event)
{
  GstUnixFdSink *self = (GstUnixFdSink *) bsink;

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_CAPS:{
      GST_OBJECT_LOCK (self);
      gst_clear_caps (&self->caps);
      gst_event_parse_caps (event, &self->caps);
      gst_caps_ref (self->caps);
      GST_DEBUG_OBJECT (self, "Send new caps to all clients: %" GST_PTR_FORMAT,
          self->caps);
      gsize payload_size;
      guint8 *payload = caps_to_payload (self->caps, &payload_size);
      gboolean client_removed =
          send_command_to_all (self, COMMAND_TYPE_CAPS, NULL, payload,
          payload_size,
          NULL);
      g_free (payload);
      /* New caps could mean new buffer size, or even no copies needed anymore.
       * We'll create a new pool if still needed. */
      g_clear_pointer (&self->allocator, allocator_unref);
      GST_OBJECT_UNLOCK (self);
      if (client_removed) {
        g_object_notify_by_pspec (G_OBJECT (self),
            properties[PROP_NUM_CLIENTS]);
      }
      break;
    }
    case GST_EVENT_EOS:{
      GST_OBJECT_LOCK (self);
      gboolean client_removed =
          send_command_to_all (self, COMMAND_TYPE_EOS, NULL, NULL, 0, NULL);
      GST_OBJECT_UNLOCK (self);
      if (client_removed) {
        g_object_notify_by_pspec (G_OBJECT (self),
            properties[PROP_NUM_CLIENTS]);
      }
      break;
    }
    default:
      break;
  }

  return GST_BASE_SINK_CLASS (gst_unix_fd_sink_parent_class)->event (bsink,
      event);
}

static gboolean
gst_unix_fd_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
{
  GstAllocator *allocator = gst_shm_allocator_get ();
  gst_query_add_allocation_param (query, allocator, NULL);
  gst_object_unref (allocator);

  return TRUE;
}

static gboolean
gst_unix_fd_sink_set_clock (GstElement * element, GstClock * clock)
{
  GstUnixFdSink *self = (GstUnixFdSink *) element;

  self->uses_monotonic_clock = clock != NULL
      && gst_clock_is_system_monotonic (clock);

  return GST_ELEMENT_CLASS (gst_unix_fd_sink_parent_class)->set_clock (element,
      clock);
}

static GstStateChangeReturn
gst_unix_fd_sink_change_state (GstElement * element, GstStateChange transition)
{
  GstUnixFdSink *self = (GstUnixFdSink *) element;

  GstStateChangeReturn ret =
      GST_ELEMENT_CLASS (gst_unix_fd_sink_parent_class)->change_state (element,
      transition);

  switch (transition) {
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      g_clear_pointer (&self->allocator, allocator_unref);
      break;
    default:
      break;
  }

  return ret;
}

static void
gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
{
  GObjectClass *gobject_class = (GObjectClass *) klass;
  GstElementClass *gstelement_class = (GstElementClass *) klass;
  GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;

  GST_DEBUG_CATEGORY_INIT (unixfdsink_debug, "unixfdsink", 0,
      "Unix file descriptor sink");
  gst_element_class_set_static_metadata (gstelement_class,
      "Unix file descriptor sink", "Sink", "Unix file descriptor sink",
      "Xavier Claessens <xavier.claessens@collabora.com>");
  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);

  gst_shm_allocator_init_once ();

  gobject_class->finalize = gst_unix_fd_sink_finalize;
  gobject_class->set_property = gst_unix_fd_sink_set_property;
  gobject_class->get_property = gst_unix_fd_sink_get_property;

  gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_set_clock);
  gstelement_class->change_state =
      GST_DEBUG_FUNCPTR (gst_unix_fd_sink_change_state);

  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_start);
  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_stop);
  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_render);
  gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_event);
  gstbasesink_class->propose_allocation =
      GST_DEBUG_FUNCPTR (gst_unix_fd_sink_propose_allocation);
  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_unlock);
  gstbasesink_class->unlock_stop =
      GST_DEBUG_FUNCPTR (gst_unix_fd_sink_unlock_stop);

  properties[PROP_SOCKET_PATH] =
      g_param_spec_string ("socket-path",
      "Path to the control socket",
      "The path to the control socket used to control the shared memory "
      "transport. This may be modified during the NULL->READY transition",
      NULL,
      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_READY);

  properties[PROP_SOCKET_TYPE] =
      g_param_spec_enum ("socket-type", "Socket type",
      "The type of underlying socket",
      G_TYPE_UNIX_SOCKET_ADDRESS_TYPE, DEFAULT_SOCKET_TYPE,
      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT |
      GST_PARAM_MUTABLE_READY);

  /**
   * GstUnixFdSink:wait-for-connection:
   *
   * Block the stream until a least one client is connected.
   *
   * Since: 1.26
   */
  properties[PROP_WAIT_FOR_CONNECTION] =
      g_param_spec_boolean ("wait-for-connection",
      "Wait for a connection until rendering",
      "Block the stream until a least one client is connected",
      DEFAULT_WAIT_FOR_CONNECTION,
      G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS);

  /**
   * GstUnixFdSink:min-memory-size:
   *
   * Minimum size to allocate in the case a copy into shared memory is needed.
   * Memories are kept in a pool and reused when possible.
   *
   * A value of 0 (the default) means only the needed size is allocated which
   * reduces the possibility of reusing the memory in the case not all buffers
   * need the same size.
   *
   * A negative value disables copying and the pipeline will stop with an error
   * in the case a copy into shared memory is needed.
   *
   * Since: 1.28
   */
  properties[PROP_MIN_MEMORY_SIZE] =
      g_param_spec_int64 ("min-memory-size", "Minimum memory size",
      "Minimum size to allocate in the case a copy into shared memory is needed.",
      -1, G_MAXINT64, DEFAULT_MIN_MEMORY_SIZE,
      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT);

  /**
   * GstUnixFdSink:num-clients:
   *
   * The number of clients that are currently connected to the sink.
   * This property is read-only and reflects the current connection count.
   *
   * Since: 1.28
   */
  properties[PROP_NUM_CLIENTS] =
      g_param_spec_uint ("num-clients", "Number of clients",
      "The number of clients that are connected currently",
      0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);

  g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
}
