/*
 * GStreamer
 * Copyright (C) 2023 Mathieu Duponchelle <mathieu@centricular.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:element-cea608mux
 * @title: cea608mux
 * @short_description: Combine CC1 and CC3 raw 608 streams
 *
 * ```
 * gst-launch-1.0 cea608mux name=mux ! fakesink dump=true \
 * filesrc location=one.scc ! sccparse ! closedcaption/x-cea-608 ! ccconverter ! mux. \
 * filesrc location=two.scc ! sccparse ! ccconverter ! closedcaption/x-cea-608, format=raw, field=0 ! \
 *     capssetter caps="closedcaption/x-cea-608, format=raw, field=1" ! mux.
 * ```
 *
 * Since: 1.24
 */


#ifdef HAVE_CONFIG_H
#  include <config.h>
#endif

#include <gst/gst.h>
#include <gst/base/base.h>
#include <gst/video/video.h>
#include <string.h>

#include "ccutils.h"
#include "gstcea608mux.h"

GST_DEBUG_CATEGORY_STATIC (gst_cea608_mux_debug);
#define GST_CAT_DEFAULT gst_cea608_mux_debug

enum
{
  PROP_0,
  PROP_FORCE_LIVE,
};

#define DEFAULT_FORCE_LIVE              FALSE

/* Always-present source pad: closedcaption/x-cea-608 in s334-1a format,
 * restricted to the listed framerates. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("closedcaption/x-cea-608, format=s334-1a, "
        "framerate=(fraction){60/1, 60000/1001, 50/1, 30/1, 30000/1001, 25/1, 24/1, 24000/1001}"));

/* Request sink pad "cc1": raw CEA-608 with field=0. */
static GstStaticPadTemplate cc1_template = GST_STATIC_PAD_TEMPLATE ("cc1",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("closedcaption/x-cea-608,format=raw,field=0"));

/* Request sink pad "cc3": raw CEA-608 with field=1. */
static GstStaticPadTemplate cc3_template = GST_STATIC_PAD_TEMPLATE ("cc3",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("closedcaption/x-cea-608,format=raw,field=1"));

/* Shorthand used when chaining up to the parent class (see finalize). */
#define parent_class gst_cea608_mux_parent_class
/* Define the GstCea608Mux type as a subclass of GstAggregator. */
G_DEFINE_TYPE (GstCea608Mux, gst_cea608_mux, GST_TYPE_AGGREGATOR);
/* Define the element registration for "cea608mux" with rank NONE. */
GST_ELEMENT_REGISTER_DEFINE (cea608mux, "cea608mux",
    GST_RANK_NONE, GST_TYPE_CEA608MUX);

/* GObject finalize: drop the caption buffer created in init, then let the
 * parent class finish tearing the instance down. */
static void
gst_cea608_mux_finalize (GObject * object)
{
  GstCea608Mux *mux = GST_CEA608MUX (object);

  /* Releases our reference and resets the field to NULL */
  gst_clear_object (&mux->cc_buffer);

  /* Chain up */
  G_OBJECT_CLASS (parent_class)->finalize (object);
}

/* Local alias for the custom-success flow return.
 * NOTE(review): nothing in this file references this macro; the aggregate
 * function returns GST_AGGREGATOR_FLOW_NEED_DATA instead. Candidate for
 * removal. */
#define GST_FLOW_NEED_DATA GST_FLOW_CUSTOM_SUCCESS

/*
 * find_best_pad:
 * @aggregator: the aggregator whose sink pads are inspected
 * @ts: (out) (nullable): receives the running time of the selected buffer;
 *     left untouched when no valid running time was found
 * @timeout: whether the aggregator is running on a timeout
 *
 * Walks all sink pads and selects the one whose queued head buffer has the
 * smallest running time.
 *
 * A pad with nothing queued is skipped when it is EOS or when @timeout is
 * set. Otherwise the search is abandoned and NULL is returned, since that pad
 * may still deliver earlier data.
 *
 * Returns: a new reference to the selected pad, or NULL. The caller has to
 * release it (the aggregate function does so with gst_object_unref()).
 */
static GstAggregatorPad *
find_best_pad (GstAggregator * aggregator, GstClockTime * ts, gboolean timeout)
{
  GstAggregatorPad *winner = NULL;
  GstClockTime winner_ts = GST_CLOCK_TIME_NONE;
  GValue item = { 0, };
  gboolean finished = FALSE;
  GstIterator *it;

  it = gst_element_iterate_sink_pads (GST_ELEMENT (aggregator));

  while (!finished) {
    switch (gst_iterator_next (it, &item)) {
      case GST_ITERATOR_OK:{
        GstAggregatorPad *candidate = g_value_get_object (&item);
        GstBuffer *head = gst_aggregator_pad_peek_buffer (candidate);

        if (head) {
          GstClockTime candidate_ts = GST_CLOCK_TIME_NONE;

          if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (head))) {
            candidate_ts =
                gst_segment_to_running_time (&candidate->segment,
                GST_FORMAT_TIME, GST_BUFFER_PTS (head));
          }

          /* Take this pad if nothing with a valid time was picked so far, or
           * if it is strictly earlier than the current pick */
          if (!GST_CLOCK_TIME_IS_VALID (winner_ts) ||
              (GST_CLOCK_TIME_IS_VALID (candidate_ts)
                  && candidate_ts < winner_ts)) {
            gst_object_replace ((GstObject **) & winner,
                GST_OBJECT (candidate));
            winner_ts = candidate_ts;
          }

          gst_buffer_unref (head);
        } else if (!timeout && !gst_aggregator_pad_is_eos (candidate)) {
          /* This pad can still produce data and we are allowed to wait for
           * it: give up on picking a pad for now */
          gst_object_replace ((GstObject **) & winner, NULL);
          winner_ts = GST_CLOCK_TIME_NONE;
          finished = TRUE;
        }
        break;
      }
      case GST_ITERATOR_RESYNC:
        /* The pad list changed under us: the current pick may be gone, so
         * forget it and restart the walk */
        gst_iterator_resync (it);
        gst_object_replace ((GstObject **) & winner, NULL);
        winner_ts = GST_CLOCK_TIME_NONE;
        break;
      case GST_ITERATOR_DONE:
        finished = TRUE;
        break;
      case GST_ITERATOR_ERROR:
        /* This can't happen if the parameters to gst_iterator_next() are valid */
        g_assert_not_reached ();
        break;
    }
    g_value_reset (&item);
  }
  g_value_unset (&item);
  gst_iterator_free (it);

  if (winner == NULL) {
    GST_LOG_OBJECT (aggregator, "Best pad not found");
  } else {
    GST_LOG_OBJECT (aggregator,
        "Best pad found with TS %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
        GST_TIME_ARGS (winner_ts), winner);
  }

  /* Only report a timestamp when we actually have a valid one */
  if (ts != NULL && GST_CLOCK_TIME_IS_VALID (winner_ts))
    *ts = winner_ts;

  return winner;
}

/*
 * all_pads_eos:
 * @agg: the aggregator to inspect
 *
 * Checks every sink pad of @agg while holding the object lock.
 *
 * Returns: FALSE as soon as one sink pad is not EOS, TRUE otherwise
 * (including when there are no sink pads at all).
 */
static gboolean
all_pads_eos (GstAggregator * agg)
{
  gboolean everyone_done = TRUE;
  GList *walk;

  GST_OBJECT_LOCK (agg);
  walk = GST_ELEMENT_CAST (agg)->sinkpads;
  while (walk != NULL && everyone_done) {
    GstAggregatorPad *sinkpad = GST_AGGREGATOR_PAD (walk->data);

    if (!gst_aggregator_pad_is_eos (sinkpad))
      everyone_done = FALSE;

    walk = walk->next;
  }
  GST_OBJECT_UNLOCK (agg);

  return everyone_done;
}

/*
 * take_s334_both_fields:
 * @self: the muxer
 * @buffer: writable output buffer; its whole mapped size is offered as
 *     scratch space and it is resized to the number of bytes produced
 *
 * Pulls cc_data for the current framerate entry out of the caption buffer
 * into @buffer, runs it through drop_ccp_from_cc_data(), then rewrites the
 * first byte of every 3-byte triplet: 0xfc and 0xf8 become 0x80, any other
 * value becomes 0x00.
 *
 * If drop_ccp_from_cc_data() reports a negative length the buffer is
 * truncated to 0 bytes.
 */
static void
take_s334_both_fields (GstCea608Mux * self, GstBuffer * buffer)
{
  GstMapInfo map = GST_MAP_INFO_INIT;
  guint cc_data_len;
  gint s334_len;

  gst_buffer_map (buffer, &map, GST_MAP_READWRITE);

  /* in: capacity available, out: bytes actually written */
  cc_data_len = map.size;
  cc_buffer_take_cc_data (self->cc_buffer, self->cdp_fps_entry, map.data,
      &cc_data_len);

  s334_len = drop_ccp_from_cc_data (map.data, cc_data_len);

  if (s334_len >= 0) {
    guint n_triplets = s334_len / 3;
    guint idx;

    for (idx = 0; idx < n_triplets; idx++) {
      guint pos = idx * 3;
      guint header = map.data[pos];

      /* We have to assume a line offset of 0 */
      if (header == 0xfc || header == 0xf8)
        map.data[pos] = 0x80;
      else
        map.data[pos] = 0x00;
    }
  } else {
    /* Nothing usable: emit an empty buffer */
    s334_len = 0;
  }

  gst_buffer_unmap (buffer, &map);
  gst_buffer_set_size (buffer, s334_len);
}

/*
 * finish_s334_both_fields:
 * @self: the muxer
 *
 * Produces one output buffer covering one frame at the negotiated framerate.
 * It is stamped at start_time plus n_output_buffers frame periods, filled by
 * take_s334_both_fields() and pushed through the aggregator. The output
 * counter and the source segment position are advanced accordingly.
 *
 * Returns: the result of gst_aggregator_finish_buffer().
 */
static GstFlowReturn
finish_s334_both_fields (GstCea608Mux * self)
{
  GstAggregator *agg = GST_AGGREGATOR (self);
  GstSegment *src_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
  GstClockTime frame_duration;
  GstClockTime pts;
  GstBuffer *outbuf;

  /* The timestamp is derived from the frame count rather than accumulated;
   * the (guint64) cast makes the fps_d * count product 64 bits wide */
  pts = gst_util_uint64_scale (GST_SECOND,
      (guint64) self->cdp_fps_entry->fps_d * self->n_output_buffers,
      self->cdp_fps_entry->fps_n);
  pts += self->start_time;

  frame_duration = gst_util_uint64_scale_int (GST_SECOND,
      self->cdp_fps_entry->fps_d, self->cdp_fps_entry->fps_n);

  /* Allocate the maximum size, take_s334_both_fields() shrinks it */
  outbuf = gst_buffer_new_allocate (NULL, MAX_CDP_PACKET_LEN, NULL);
  take_s334_both_fields (self, outbuf);

  GST_BUFFER_PTS (outbuf) = pts;
  GST_BUFFER_DURATION (outbuf) = frame_duration;
  GST_DEBUG_OBJECT (self, "Finishing %" GST_PTR_FORMAT, outbuf);

  self->n_output_buffers++;
  src_segment->position = pts + frame_duration;

  return gst_aggregator_finish_buffer (agg, outbuf);
}

/*
 * GstAggregator::aggregate implementation.
 *
 * Each call consumes at most one input buffer (from the pad chosen by
 * find_best_pad()) into the caption buffer, and emits at most one output
 * buffer through finish_s334_both_fields().
 *
 * @aggregator: the muxer
 * @timeout: whether the aggregator called us because of a timeout
 *
 * Returns: GST_FLOW_OK, GST_AGGREGATOR_FLOW_NEED_DATA, GST_FLOW_EOS, or
 * whatever finish_s334_both_fields() returned when a buffer was emitted.
 */
static GstFlowReturn
gst_cea608_mux_aggregate (GstAggregator * aggregator, gboolean timeout)
{
  GstCea608Mux *self = GST_CEA608MUX (aggregator);
  GstFlowReturn flow_ret = GST_FLOW_OK;
  GstAggregatorPad *best_pad = NULL;
  /* Duration of one output frame at the negotiated framerate */
  GstClockTime output_duration =
      gst_util_uint64_scale_int (GST_SECOND, self->cdp_fps_entry->fps_d,
      self->cdp_fps_entry->fps_n);
  GstSegment *agg_segment = &GST_AGGREGATOR_PAD (aggregator->srcpad)->segment;
  GstClockTime output_start_time = agg_segment->position;
  GstClockTime output_end_running_time;

  /* No position yet (or one before the segment start): begin at the segment
   * start */
  if (agg_segment->position == -1 || agg_segment->position < agg_segment->start)
    output_start_time = agg_segment->start;

  /* Remember where output began; output timestamps are computed relative to
   * this in finish_s334_both_fields(). Reset by stop() and flush(). */
  if (!GST_CLOCK_TIME_IS_VALID (self->start_time)) {
    self->start_time = output_start_time;
    GST_DEBUG_OBJECT (self, "Start time %" GST_TIME_FORMAT,
        GST_TIME_ARGS (self->start_time));
  }

  /* earliest_input_running_time is only overwritten when a pad with a valid
   * running time was found; otherwise it keeps its previous value */
  best_pad =
      find_best_pad (aggregator, &self->earliest_input_running_time, timeout);

  /* Running time at which the frame we are currently building ends */
  output_end_running_time =
      gst_segment_to_running_time (agg_segment, GST_FORMAT_TIME,
      output_start_time + output_duration);

  GST_LOG_OBJECT (self, "best-pad: %s, timeout: %d, "
      "earliest input running time: %"
      GST_TIME_FORMAT ", output running time: %" GST_TIME_FORMAT,
      best_pad ? GST_OBJECT_NAME (best_pad) : "NULL", timeout,
      GST_TIME_ARGS (self->earliest_input_running_time),
      GST_TIME_ARGS (output_end_running_time));

  if (GST_CLOCK_TIME_IS_VALID (self->earliest_input_running_time)
      && self->earliest_input_running_time > output_end_running_time) {
    /* Nothing to consume, earliest pad is not ready yet: its data belongs to
     * a later output frame. flow_ret stays OK so the current frame can still
     * be finished below. */
    GST_LOG_OBJECT (self, "Nothing to consume");
  } else if (best_pad) {
    GstBuffer *buffer;

    buffer = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best_pad));

    if (buffer) {
      GstMapInfo map;

      gst_buffer_map (buffer, &map, GST_MAP_READ);

      /* The pad name decides which data/size pair of
       * cc_buffer_push_separated() receives the bytes: the first pair for
       * "cc1", the second pair for any other pad (presumably field 1 and
       * field 2 respectively -- confirm against ccutils). */
      if (g_strcmp0 (GST_PAD_NAME (best_pad), "cc1") == 0) {
        GST_DEBUG_OBJECT (self, "Consuming CC1 %" GST_PTR_FORMAT, buffer);
        cc_buffer_push_separated (self->cc_buffer, map.data, map.size, NULL, 0,
            NULL, 0);
      } else {
        GST_DEBUG_OBJECT (self, "Consuming CC3 %" GST_PTR_FORMAT, buffer);
        cc_buffer_push_separated (self->cc_buffer, NULL, 0, map.data, map.size,
            NULL, 0);
      }

      gst_buffer_unmap (buffer, &map);
      gst_buffer_unref (buffer);
    } else if (!timeout) {
      /* We got flushed and still have time to wait before the deadline */
      flow_ret = GST_AGGREGATOR_FLOW_NEED_DATA;
    }
  } else if (!gst_aggregator_get_force_live (aggregator)
      && all_pads_eos (aggregator)) {
    /* No pad has data and none will produce more. In force-live mode this
     * branch is never taken. */
    GST_INFO_OBJECT (self, "EOS!");
    flow_ret = GST_FLOW_EOS;
  } else if (!timeout) {
    GST_LOG_OBJECT (self, "Need more data");
    flow_ret = GST_AGGREGATOR_FLOW_NEED_DATA;
  }

  if (flow_ret == GST_FLOW_OK) {
    /* Emit the current frame on timeout, or once the earliest pending input
     * lies strictly after the end of the frame */
    if (timeout || output_end_running_time < self->earliest_input_running_time) {
      flow_ret = finish_s334_both_fields (self);
    }
  } else if (flow_ret == GST_FLOW_EOS && !cc_buffer_is_empty (self->cc_buffer)) {
    /* Caption data is still queued: emit another frame. flow_ret is replaced
     * by the result of finishing that buffer, so EOS is only returned from a
     * call where the caption buffer is empty. */
    flow_ret = finish_s334_both_fields (self);
  }

  /* Drop the reference returned by find_best_pad() */
  g_clear_pointer (&best_pad, gst_object_unref);

  return flow_ret;
}

/* GstAggregator::stop implementation: forget the output timeline and throw
 * away any caption data that was queued but never output. Always succeeds. */
static gboolean
gst_cea608_mux_stop (GstAggregator * aggregator)
{
  GstCea608Mux *mux = GST_CEA608MUX (aggregator);

  /* Back to the "no output produced yet" state */
  mux->start_time = GST_CLOCK_TIME_NONE;
  mux->earliest_input_running_time = 0;
  mux->n_output_buffers = 0;

  cc_buffer_discard (mux->cc_buffer);

  return TRUE;
}

/* GstAggregator::flush implementation: same reset as stop(), and the source
 * segment position is also invalidated so the next aggregate call starts
 * again from the segment start. Always returns GST_FLOW_OK. */
static GstFlowReturn
gst_cea608_mux_flush (GstAggregator * aggregator)
{
  GstCea608Mux *mux = GST_CEA608MUX (aggregator);
  GstSegment *src_segment = &GST_AGGREGATOR_PAD (aggregator->srcpad)->segment;

  GST_DEBUG_OBJECT (mux, "Flush");

  /* Back to the "no output produced yet" state */
  mux->start_time = GST_CLOCK_TIME_NONE;
  mux->earliest_input_running_time = 0;
  mux->n_output_buffers = 0;
  src_segment->position = -1;

  /* Pending caption data belongs to the flushed timeline */
  cc_buffer_discard (mux->cc_buffer);

  return GST_FLOW_OK;
}

/*
 * GstAggregator::negotiated_src_caps implementation.
 *
 * Reads the framerate from the first structure of @caps, looks up the
 * matching framerate entry and configures the aggregator latency to one
 * frame period.
 *
 * Invalid caps are rejected with an error return instead of g_assert(), so
 * the checks still apply when assertions are compiled out. This avoids
 * reading uninitialised fps values and dividing by a zero fps_n below.
 *
 * @agg: the muxer
 * @caps: the caps negotiated on the source pad
 *
 * Returns: TRUE when the framerate is usable, FALSE otherwise.
 */
static gboolean
gst_cea608_mux_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
{
  GstStructure *s = gst_caps_get_structure (caps, 0);
  gint fps_n, fps_d;
  GstCea608Mux *self = GST_CEA608MUX (agg);
  GstClockTime latency;

  GST_INFO_OBJECT (agg->srcpad, "set src caps: %" GST_PTR_FORMAT, caps);

  if (!gst_structure_get_fraction (s, "framerate", &fps_n, &fps_d)) {
    GST_ERROR_OBJECT (agg->srcpad, "No framerate in caps %" GST_PTR_FORMAT,
        caps);
    return FALSE;
  }

  self->cdp_fps_entry = cdp_fps_entry_from_fps (fps_n, fps_d);
  if (self->cdp_fps_entry == NULL || self->cdp_fps_entry->fps_n == 0) {
    GST_ERROR_OBJECT (agg->srcpad, "Unsupported framerate %d/%d", fps_n,
        fps_d);
    /* Fall back to the same placeholder entry that init() installs, so the
     * pointer is never left NULL */
    self->cdp_fps_entry = &null_fps_entry;
    return FALSE;
  }

  /* One frame period, used as both minimum and maximum latency */
  latency =
      gst_util_uint64_scale (GST_SECOND, self->cdp_fps_entry->fps_d,
      self->cdp_fps_entry->fps_n);
  gst_aggregator_set_latency (agg, latency, latency);

  return TRUE;
}

/*
 * GstAggregator::clip implementation.
 *
 * Buffers without a PTS are passed through untouched. A buffer whose PTS
 * cannot be converted to a running time in @pad's segment is unreffed and
 * dropped.
 *
 * Returns: @buffer, or NULL when it was dropped.
 */
static GstBuffer *
gst_cea608_mux_clip (GstAggregator * aggregator, GstAggregatorPad * pad,
    GstBuffer * buffer)
{
  GstClockTime pts = GST_BUFFER_PTS (buffer);

  if (GST_CLOCK_TIME_IS_VALID (pts)) {
    GstClockTime running_time =
        gst_segment_to_running_time (&pad->segment, GST_FORMAT_TIME, pts);

    if (!GST_CLOCK_TIME_IS_VALID (running_time)) {
      GST_DEBUG_OBJECT (pad, "Dropping buffer on pad outside segment %"
          GST_TIME_FORMAT, GST_TIME_ARGS (pts));
      gst_buffer_unref (buffer);
      buffer = NULL;
    }
  }

  return buffer;
}

/* GObject property getter. "force-live" is read back from the aggregator
 * base class; any other id triggers the standard invalid-property warning. */
static void
gst_cea608_mux_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec)
{
  if (prop_id == PROP_FORCE_LIVE) {
    gboolean force_live =
        gst_aggregator_get_force_live (GST_AGGREGATOR (object));

    g_value_set_boolean (value, force_live);
    return;
  }

  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
}

/* GObject property setter. "force-live" is forwarded to the aggregator base
 * class; any other id triggers the standard invalid-property warning. */
static void
gst_cea608_mux_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec)
{
  if (prop_id == PROP_FORCE_LIVE) {
    gboolean force_live = g_value_get_boolean (value);

    gst_aggregator_set_force_live (GST_AGGREGATOR (object), force_live);
    return;
  }

  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
}

/* Class initialiser: sets up the debug category, hooks up the GObject and
 * GstAggregator virtual methods, registers element metadata and pad
 * templates, and installs the "force-live" property. */
static void
gst_cea608_mux_class_init (GstCea608MuxClass * klass)
{
  GObjectClass *object_klass = (GObjectClass *) klass;
  GstElementClass *element_klass = (GstElementClass *) klass;
  GstAggregatorClass *agg_klass = (GstAggregatorClass *) klass;

  GST_DEBUG_CATEGORY_INIT (gst_cea608_mux_debug, "cea608mux",
      0, "Closed Caption muxer");

  /* GObject virtual methods; the property accessors have to be in place
   * before the property is installed further down */
  object_klass->set_property = gst_cea608_mux_set_property;
  object_klass->get_property = gst_cea608_mux_get_property;
  object_klass->finalize = gst_cea608_mux_finalize;

  /* Element description */
  gst_element_class_set_static_metadata (element_klass,
      "Closed Caption Muxer",
      "Aggregator",
      "Combines raw 608 streams",
      "Mathieu Duponchelle <mathieu@centricular.com>");

  /* Pads: one source pad plus the "cc1" and "cc3" request sink pads, all
   * plain GstAggregatorPad instances */
  gst_element_class_add_static_pad_template_with_gtype (element_klass,
      &srctemplate, GST_TYPE_AGGREGATOR_PAD);
  gst_element_class_add_static_pad_template_with_gtype (element_klass,
      &cc1_template, GST_TYPE_AGGREGATOR_PAD);
  gst_element_class_add_static_pad_template_with_gtype (element_klass,
      &cc3_template, GST_TYPE_AGGREGATOR_PAD);

  /* GstAggregator virtual methods */
  agg_klass->negotiated_src_caps = gst_cea608_mux_negotiated_src_caps;
  agg_klass->get_next_time = gst_aggregator_simple_get_next_time;
  agg_klass->clip = gst_cea608_mux_clip;
  agg_klass->aggregate = gst_cea608_mux_aggregate;
  agg_klass->flush = gst_cea608_mux_flush;
  agg_klass->stop = gst_cea608_mux_stop;

  /**
   * cea608mux:force-live:
   *
   * Causes the element to aggregate on a timeout even when no live source is
   * connected to its sinks. See #GstAggregator:min-upstream-latency for a
   * companion property: in the vast majority of cases where you plan to plug in
   * live sources with a non-zero latency, you should set it to a non-zero value.
   *
   * Since: 1.26
   */
  g_object_class_install_property (object_klass, PROP_FORCE_LIVE,
      g_param_spec_boolean ("force-live", "Force live",
          "Always operate in live mode and aggregate on timeout regardless of "
          "whether any live sources are linked upstream",
          DEFAULT_FORCE_LIVE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY));
}

/* Instance initialiser: marks the framerate and start time as not yet known
 * and creates the caption buffer that sits between the inputs and the
 * output. */
static void
gst_cea608_mux_init (GstCea608Mux * self)
{
  /* Nothing negotiated and nothing output yet */
  self->start_time = GST_CLOCK_TIME_NONE;
  self->cdp_fps_entry = &null_fps_entry;

  self->cc_buffer = cc_buffer_new ();

  /* GST_CLOCK_TIME_NONE as maximum buffer time: presumably "no limit" --
   * confirm against ccutils */
  cc_buffer_set_max_buffer_time (self->cc_buffer, GST_CLOCK_TIME_NONE);
  cc_buffer_set_output_padding (self->cc_buffer, TRUE, FALSE);
  cc_buffer_set_cea608_padding_strategy (self->cc_buffer,
      CC_BUFFER_CEA608_PADDING_STRATEGY_VALID |
      CC_BUFFER_CEA608_PADDING_STRATEGY_INPUT_REMOVE);
}
