/* GStreamer
 *
 * SPDX-License-Identifier: LGPL-2.1
 *
 * Copyright (C) 2009, 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
 * Copyright (C) 2013, 2022, 2023 Collabora Ltd.
 * Copyright (C) 2013 Orange
 * Copyright (C) 2014, 2015 Sebastian Dröge <sebastian@centricular.com>
 * Copyright (C) 2015, 2016, 2018, 2019, 2020, 2021 Igalia, S.L
 * Copyright (C) 2015, 2016, 2018, 2019, 2020, 2021 Metrological Group B.V.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:gstmsesrc
 * @title: GstMseSrc
 * @short_description: Source Element for Media Source playback
 *
 * #GstMseSrc is a source Element that interacts with a #GstMediaSource to
 * consume #GstSample<!-- -->s processed by the Media Source and supplies them
 * to the containing #GstPipeline. In the perspective of the Media Source API,
 * this element fulfills the basis of the Media Element's role relating to
 * working with a Media Source. The remaining responsibilities are meant to be
 * fulfilled by the application and #GstPlay can be used to satisfy many of
 * them.
 *
 * Once added to a Pipeline, this element should be attached to a Media Source
 * using gst_media_source_attach().
 *
 * Since: 1.24
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gst/gst.h>
#include <gst/base/base.h>

#include <gst/mse/mse-enumtypes.h>
#include "gstmsesrc.h"
#include "gstmsesrc-private.h"

#include "gstmselogging-private.h"

#include "gstmediasource.h"
#include "gstmediasource-private.h"
#include "gstmediasourcetrack-private.h"
#include "gstsourcebuffer.h"
#include "gstsourcebuffer-private.h"

#define DEFAULT_POSITION 0
#define DEFAULT_DURATION GST_CLOCK_TIME_NONE
#define DEFAULT_READY_STATE GST_MSE_SRC_READY_STATE_HAVE_NOTHING
#define DECODE_ERROR "decode error"
#define NETWORK_ERROR "network error"

enum
{
  PROP_0,

  PROP_POSITION,
  PROP_DURATION,
  PROP_READY_STATE,

  PROP_N_AUDIO,
  PROP_N_TEXT,
  PROP_N_VIDEO,

  N_PROPS,
};

enum
{
  THRESHOLD_FUTURE_DATA = GST_SECOND * 5,
  THRESHOLD_ENOUGH_DATA = GST_SECOND * 50,
};

static GParamSpec *properties[N_PROPS];

static GstStaticPadTemplate gst_mse_src_template =
GST_STATIC_PAD_TEMPLATE ("src_%s", GST_PAD_SRC, GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

typedef struct
{
  GWeakRef parent;
  GstTask *task;
  GRecMutex mutex;
} ReadyStateUpdateTask;

/**
 * GstMseSrcPad:
 *
 * Since: 1.24
 */
struct _GstMseSrcPad
{
  GstPad base;

  GstStream *stream;
  GstMediaSourceTrack *track;
  GstCaps *most_recent_caps;
  GstSegment segment;

  gboolean sent_stream_collection;
  gboolean sent_stream_start;
  gboolean sent_initial_caps;
  gboolean does_need_segment;

  GCond linked_or_flushing_cond;
  GMutex linked_or_flushing_lock;
  gboolean flushing;
};

#define MEDIA_SOURCE_LOCK(a) g_mutex_lock (&(a)->media_source_lock)
#define MEDIA_SOURCE_UNLOCK(a) g_mutex_unlock (&(a)->media_source_lock)

#define STREAMS_LOCK(a) (g_mutex_lock (&a->streams_lock))
#define STREAMS_UNLOCK(a) (g_mutex_unlock (&a->streams_lock))

#define LINKED_OR_FLUSHING_LOCK(a) (g_mutex_lock (&a->linked_or_flushing_lock))
#define LINKED_OR_FLUSHING_UNLOCK(a) (g_mutex_unlock (&a->linked_or_flushing_lock))
#define LINKED_OR_FLUSHING_SIGNAL(a) (g_cond_signal (&a->linked_or_flushing_cond))
#define LINKED_OR_FLUSHING_WAIT(a) \
  (g_cond_wait (&a->linked_or_flushing_cond, &a->linked_or_flushing_lock))

#define FLOW_COMBINER_LOCK(a) (g_mutex_lock (&a->flow_combiner_lock))
#define FLOW_COMBINER_UNLOCK(a) (g_mutex_unlock (&a->flow_combiner_lock))

G_DEFINE_TYPE (GstMseSrcPad, gst_mse_src_pad, GST_TYPE_PAD);

static gboolean pad_activate_mode (GstMseSrcPad * pad, GstObject * parent,
    GstPadMode mode, gboolean active);

static GstPadLinkReturn pad_linked (GstMseSrcPad * pad, GstMseSrc * parent,
    GstPad * sink);
static gboolean pad_event (GstMseSrcPad * pad, GstMseSrc * parent,
    GstEvent * event);
static gboolean pad_query (GstMseSrcPad * pad, GstObject * parent,
    GstQuery * query);
static void pad_task (GstMseSrcPad * pad);

static ReadyStateUpdateTask *ready_state_update_task_new (GstMseSrc * parent);
static void ready_state_update_task_free (ReadyStateUpdateTask * task);
static void ready_state_update_task_func (ReadyStateUpdateTask * task);
static void ready_state_update_task_start (ReadyStateUpdateTask * task);
static void ready_state_update_task_stop (ReadyStateUpdateTask * task);
static void ready_state_update_task_join (ReadyStateUpdateTask * task);

static const gchar *
mse_src_ready_state_name (GstMseSrcReadyState state)
{
  return gst_mse_enum_value_nick (gst_mse_src_ready_state_get_type (), state);
}

static GstPad *
gst_mse_src_pad_new (GstMediaSourceTrack * track, GstStream * stream,
    guint id, GstClockTime start, gdouble rate)
{
  gchar *name = g_strdup_printf ("src_%u", id);
  GstMseSrcPad *self = g_object_new (GST_TYPE_MSE_SRC_PAD, "name", name,
      "direction", GST_PAD_SRC, NULL);
  g_free (name);
  self->stream = stream;
  self->track = track;
  self->segment.start = start;
  self->segment.rate = rate;

  return GST_PAD (self);
}

static void
gst_mse_src_pad_init (GstMseSrcPad * self)
{
  gst_segment_init (&self->segment, GST_FORMAT_TIME);
  self->sent_stream_collection = FALSE;
  self->sent_stream_start = FALSE;
  self->sent_initial_caps = FALSE;
  self->does_need_segment = TRUE;
  self->flushing = FALSE;
  g_mutex_init (&self->linked_or_flushing_lock);
  g_cond_init (&self->linked_or_flushing_cond);

  gst_pad_set_activatemode_function (GST_PAD (self),
      (GstPadActivateModeFunction) pad_activate_mode);
  gst_pad_set_link_function (GST_PAD (self), (GstPadLinkFunction) pad_linked);
  gst_pad_set_event_function (GST_PAD (self), (GstPadEventFunction) pad_event);
  gst_pad_set_query_function (GST_PAD (self), (GstPadQueryFunction) pad_query);
}

static void
gst_mse_src_pad_finalize (GObject * object)
{
  GstMseSrcPad *self = GST_MSE_SRC_PAD (object);

  gst_clear_caps (&self->most_recent_caps);
  g_mutex_clear (&self->linked_or_flushing_lock);
  g_cond_clear (&self->linked_or_flushing_cond);

  G_OBJECT_CLASS (gst_mse_src_pad_parent_class)->finalize (object);
}

static void
gst_mse_src_pad_class_init (GstMseSrcPadClass * klass)
{
  GObjectClass *oclass = G_OBJECT_CLASS (klass);
  oclass->finalize = GST_DEBUG_FUNCPTR (gst_mse_src_pad_finalize);
}

// TODO: Check if this struct is even necessary.
// The custom pad should be able to keep track of information for each track.
typedef struct
{
  GstMediaSourceTrack *track;
  GstPad *pad;
  GstStream *info;
} Stream;

/**
 * GstMseSrc:
 *
 * Since: 1.24
 */
struct _GstMseSrc
{
  GstElement base;

  GstMediaSource *media_source;
  GMutex media_source_lock;

  guint group_id;
  GstStreamCollection *collection;
  GHashTable *streams;
  GMutex streams_lock;

  GstClockTime duration;
  GstClockTime start_time;
  gdouble rate;
  GstMseSrcReadyState ready_state;

  ReadyStateUpdateTask *ready_state_update_task;

  GstFlowCombiner *flow_combiner;
  GMutex flow_combiner_lock;

  gchar *uri;
};

static void gst_mse_src_uri_handler_init (gpointer g_iface,
    gpointer iface_data);
static GstStateChangeReturn gst_mse_src_change_state (GstElement * element,
    GstStateChange transition);
static gboolean gst_mse_src_send_event (GstElement * element, GstEvent * event);
static void update_ready_state_for_init_segment (GstMseSrc * self);
static void update_ready_state (GstMseSrc * self);

G_DEFINE_TYPE_WITH_CODE (GstMseSrc, gst_mse_src, GST_TYPE_ELEMENT,
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_mse_src_uri_handler_init));

static void
gst_mse_src_constructed (GObject * object)
{
  GstMseSrc *self = GST_MSE_SRC (object);

  G_OBJECT_CLASS (gst_mse_src_parent_class)->constructed (object);
  GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_SOURCE);
}

static void
gst_mse_src_dispose (GObject * object)
{
  GstMseSrc *self = GST_MSE_SRC (object);
  g_clear_pointer (&self->ready_state_update_task,
      ready_state_update_task_free);
  gst_clear_object (&self->media_source);
  g_mutex_clear (&self->media_source_lock);
  gst_clear_object (&self->collection);
  g_clear_pointer (&self->streams, g_hash_table_unref);
  g_mutex_clear (&self->streams_lock);
  g_clear_pointer (&self->flow_combiner, gst_flow_combiner_free);
  g_mutex_clear (&self->flow_combiner_lock);
  G_OBJECT_CLASS (gst_mse_src_parent_class)->dispose (object);
}

static void
gst_mse_src_finalize (GObject * object)
{
  GstMseSrc *self = GST_MSE_SRC (object);

  g_clear_pointer (&self->uri, g_free);

  G_OBJECT_CLASS (gst_mse_src_parent_class)->finalize (object);
}

static void
gst_mse_src_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
{
  GstMseSrc *self = GST_MSE_SRC (object);

  switch (prop_id) {
    case PROP_DURATION:
      g_value_set_uint64 (value, gst_mse_src_get_duration (self));
      break;
    case PROP_POSITION:
      g_value_set_uint64 (value, gst_mse_src_get_position (self));
      break;
    case PROP_READY_STATE:
      g_value_set_enum (value, gst_mse_src_get_ready_state (self));
      break;
    case PROP_N_AUDIO:
      g_value_set_uint (value, gst_mse_src_get_n_audio (self));
      break;
    case PROP_N_TEXT:
      g_value_set_uint (value, gst_mse_src_get_n_text (self));
      break;
    case PROP_N_VIDEO:
      g_value_set_uint (value, gst_mse_src_get_n_video (self));
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static void
gst_mse_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstMseSrc *self = GST_MSE_SRC (object);

  switch (prop_id) {
    case PROP_DURATION:
      gst_mse_src_set_duration (self, g_value_get_uint64 (value));
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static void
gst_mse_src_class_init (GstMseSrcClass * klass)
{
  GObjectClass *oclass = (GObjectClass *) klass;
  GstElementClass *eclass = (GstElementClass *) klass;

  oclass->constructed = GST_DEBUG_FUNCPTR (gst_mse_src_constructed);
  oclass->finalize = GST_DEBUG_FUNCPTR (gst_mse_src_finalize);
  oclass->dispose = GST_DEBUG_FUNCPTR (gst_mse_src_dispose);
  oclass->get_property = GST_DEBUG_FUNCPTR (gst_mse_src_get_property);
  oclass->set_property = GST_DEBUG_FUNCPTR (gst_mse_src_set_property);

  eclass->change_state = GST_DEBUG_FUNCPTR (gst_mse_src_change_state);
  eclass->send_event = GST_DEBUG_FUNCPTR (gst_mse_src_send_event);

  /**
   * GstMseSrc:position:
   *
   * The playback position as a #GstClockTime
   *
   * [Specification](https://html.spec.whatwg.org/multipage/media.html#current-playback-position)
   *
   * Since: 1.24
   */
  properties[PROP_POSITION] = g_param_spec_uint64 ("position",
      "Position",
      "The playback position as a GstClockTime",
      0, G_MAXUINT64, DEFAULT_POSITION, G_PARAM_READABLE |
      G_PARAM_STATIC_STRINGS);

  /**
   * GstMseSrc:duration:
   *
   * The duration of the stream as a #GstClockTime
   *
   * [Specification](https://html.spec.whatwg.org/multipage/media.html#dom-media-duration)
   *
   * Since: 1.24
   */
  properties[PROP_DURATION] = g_param_spec_uint64 ("duration",
      "Duration",
      "The duration of the stream as a GstClockTime",
      0, G_MAXUINT64, DEFAULT_DURATION, G_PARAM_READWRITE |
      G_PARAM_STATIC_STRINGS);

  /**
   * GstMseSrc:ready-state:
   *
   * The Ready State of this element, describing to what level it can supply
   * content for the current #GstMseSrc:position. This is a separate concept
   * from #GstMediaSource:ready-state: and corresponds to the HTML Media
   * Element's Ready State.
   *
   * [Specification](https://html.spec.whatwg.org/multipage/media.html#ready-states)
   *
   * Since: 1.24
   */
  properties[PROP_READY_STATE] = g_param_spec_enum ("ready-state",
      "Ready State",
      "The Ready State of this Element",
      GST_TYPE_MSE_SRC_READY_STATE,
      DEFAULT_READY_STATE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);

  /**
   * GstMseSrc:n-audio:
   *
   * The number of audio tracks in the Media Source
   *
   * Since: 1.24
   */
  properties[PROP_N_AUDIO] = g_param_spec_uint ("n-audio",
      "Number of Audio Tracks",
      "The number of audio tracks in the Media Source",
      0, G_MAXINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);

  /**
   * GstMseSrc:n-text:
   *
   * The number of text tracks in the Media Source
   *
   * Since: 1.24
   */
  properties[PROP_N_TEXT] = g_param_spec_uint ("n-text",
      "Number of Text Tracks",
      "The number of text tracks in the Media Source",
      0, G_MAXINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);

  /**
   * GstMseSrc:n-video:
   *
   * The number of video tracks in the Media Source
   *
   * Since: 1.24
   */
  properties[PROP_N_VIDEO] = g_param_spec_uint ("n-video",
      "Number of Video Tracks",
      "The number of video tracks in the Media Source",
      0, G_MAXINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);

  g_object_class_install_properties (oclass, N_PROPS, properties);

  gst_element_class_set_static_metadata (eclass, "MseSrc",
      "Generic/Source",
      "Implements a GStreamer Source for the gstreamer-mse API", "Collabora");
  gst_element_class_add_static_pad_template (eclass, &gst_mse_src_template);

  gst_mse_init_logging ();
}

static void
clear_stream (Stream * stream)
{
  gst_clear_object (&stream->track);
  gst_clear_object (&stream->info);
  g_free (stream);
}

static GHashTable *
streams_init (const GstMseSrc * self)
{
  return g_hash_table_new_full (g_direct_hash, g_direct_equal,
      NULL, (GDestroyNotify) clear_stream);
}

static GstStreamCollection *
collection_init (const GstMseSrc * self)
{
  return gst_stream_collection_new (G_OBJECT_TYPE_NAME (self));
}

static ReadyStateUpdateTask *
ready_state_update_task_new (GstMseSrc * parent)
{
  ReadyStateUpdateTask *task = g_new0 (ReadyStateUpdateTask, 1);
  g_rec_mutex_init (&task->mutex);
  g_weak_ref_init (&task->parent, parent);
  task->task =
      gst_task_new ((GstTaskFunction) ready_state_update_task_func, task, NULL);
  gst_task_set_lock (task->task, &task->mutex);
  return task;
}

static void
ready_state_update_task_free (ReadyStateUpdateTask * task)
{
  g_weak_ref_set (&task->parent, NULL);
  gst_task_join (task->task);
  gst_clear_object (&task->task);
  g_weak_ref_clear (&task->parent);
  g_rec_mutex_clear (&task->mutex);
  g_free (task);
}

static void
ready_state_update_task_start (ReadyStateUpdateTask * task)
{
  GstMseSrc *parent = g_weak_ref_get (&task->parent);
  if (parent) {
    gchar *name = g_strdup_printf ("%s:ready-state", GST_OBJECT_NAME (parent));
    g_object_set (task->task, "name", name, NULL);
    g_clear_pointer (&name, g_free);
  }
  gst_clear_object (&parent);
  gst_task_start (task->task);
}

static void
ready_state_update_task_stop (ReadyStateUpdateTask * task)
{
  gst_task_stop (task->task);
}

static void
ready_state_update_task_join (ReadyStateUpdateTask * task)
{
  gst_task_join (task->task);
}

static void
ready_state_update_task_func (ReadyStateUpdateTask * task)
{
  GstMseSrc *self = (GstMseSrc *) g_weak_ref_get (&task->parent);

  if (self == NULL) {
    GST_ERROR_OBJECT (task->task, "parent object is gone, stopping");
    gst_task_stop (task->task);
    return;
  }

  update_ready_state (self);
  gst_object_unref (self);
  g_usleep (G_TIME_SPAN_SECOND);
}

static void
gst_mse_src_init (GstMseSrc * self)
{
  self->group_id = gst_util_group_id_next ();
  self->streams = streams_init (self);
  self->collection = collection_init (self);
  self->uri = NULL;
  self->start_time = 0;
  self->rate = 1;
  self->media_source = NULL;
  g_mutex_init (&self->media_source_lock);
  g_mutex_init (&self->streams_lock);
  self->flow_combiner = gst_flow_combiner_new ();
  g_mutex_init (&self->flow_combiner_lock);
  self->ready_state_update_task = ready_state_update_task_new (self);
}

/**
 * gst_mse_src_get_position:
 * @self: #GstMseSrc instance
 *
 * Gets the current playback position of @self.
 *
 * [Specification](https://html.spec.whatwg.org/multipage/media.html#current-playback-position)
 *
 * Returns: The playback position of this Element as a #GstClockTime
 * Since: 1.24
 */
GstClockTime
gst_mse_src_get_position (GstMseSrc * self)
{
  g_return_val_if_fail (GST_IS_MSE_SRC (self), GST_CLOCK_TIME_NONE);
  gint64 position;
  gboolean success = gst_element_query_position (GST_ELEMENT (self),
      GST_FORMAT_TIME, &position);
  if (success)
    return (GstClockTime) position;
  else
    return DEFAULT_POSITION;
}

static void
update_pad_duration (GstMseSrc * self, GstMseSrcPad * pad)
{
  pad->segment.duration = self->duration;
  pad->does_need_segment = TRUE;
}

void
gst_mse_src_set_duration (GstMseSrc * self, GstClockTime duration)
{
  g_return_if_fail (GST_IS_MSE_SRC (self));

  self->duration = duration;

  gst_element_foreach_src_pad (GST_ELEMENT (self),
      (GstElementForeachPadFunc) update_pad_duration, NULL);

  gst_element_post_message (GST_ELEMENT (self),
      gst_message_new_duration_changed (GST_OBJECT (self)));
}

/**
 * gst_mse_src_get_duration:
 * @self: #GstMseSrc instance
 *
 * Gets the duration of @self.
 *
 * [Specification](https://html.spec.whatwg.org/multipage/media.html#dom-media-duration)
 *
 * Returns: The duration of this stream as a #GstClockTime
 * Since: 1.24
 */
GstClockTime
gst_mse_src_get_duration (GstMseSrc * self)
{
  g_return_val_if_fail (GST_IS_MSE_SRC (self), DEFAULT_DURATION);
  return self->duration;
}

/**
 * gst_mse_src_get_ready_state:
 * @self: #GstMseSrc instance
 *
 * The Ready State of @self, describing to what level it can supply content for
 * the current #GstMseSrc:position. This is a separate concept from
 * #GstMediaSource:ready-state: and corresponds to the HTML Media Element's
 * Ready State.
 *
 * [Specification](https://html.spec.whatwg.org/multipage/media.html#ready-states)
 *
 * Returns: the current #GstMseSrcReadyState
 * Since: 1.24
 */
GstMseSrcReadyState
gst_mse_src_get_ready_state (GstMseSrc * self)
{
  g_return_val_if_fail (GST_IS_MSE_SRC (self), DEFAULT_READY_STATE);
  return self->ready_state;
}

static guint
n_streams_by_type (GstMseSrc * self, GstMediaSourceTrackType type)
{
  guint count = 0;
  GHashTableIter iter;
  g_hash_table_iter_init (&iter, self->streams);
  for (gpointer key; g_hash_table_iter_next (&iter, &key, NULL);) {
    GstMediaSourceTrack *track = GST_MEDIA_SOURCE_TRACK (key);
    GstMediaSourceTrackType stream_type =
        gst_media_source_track_get_track_type (track);
    if (type == stream_type) {
      count++;
    }
  }
  return count;
}

/**
 * gst_mse_src_get_n_audio:
 * @self: #GstMseSrc instance
 *
 * Returns: the number of audio tracks available from this source
 * Since: 1.24
 */
guint
gst_mse_src_get_n_audio (GstMseSrc * self)
{
  g_return_val_if_fail (GST_IS_MSE_SRC (self), 0);
  return n_streams_by_type (self, GST_MEDIA_SOURCE_TRACK_TYPE_AUDIO);
}

/**
 * gst_mse_src_get_n_text:
 * @self: #GstMseSrc instance
 *
 * Returns: the number of text tracks available from this source
 * Since: 1.24
 */
guint
gst_mse_src_get_n_text (GstMseSrc * self)
{
  g_return_val_if_fail (GST_IS_MSE_SRC (self), 0);
  return n_streams_by_type (self, GST_MEDIA_SOURCE_TRACK_TYPE_TEXT);
}

/**
 * gst_mse_src_get_n_video:
 * @self: #GstMseSrc instance
 *
 * Returns: the number of video tracks available from this source
 * Since: 1.24
 */
guint
gst_mse_src_get_n_video (GstMseSrc * self)
{
  g_return_val_if_fail (GST_IS_MSE_SRC (self), 0);
  return n_streams_by_type (self, GST_MEDIA_SOURCE_TRACK_TYPE_VIDEO);
}

void
gst_mse_src_decode_error (GstMseSrc * self)
{
  g_return_if_fail (GST_IS_MSE_SRC (self));
  GstMseSrcReadyState ready_state = g_atomic_int_get (&self->ready_state);
  if (ready_state == GST_MSE_SRC_READY_STATE_HAVE_NOTHING) {
    GST_ELEMENT_ERROR (self, STREAM, DECODE, (DECODE_ERROR),
        ("the necessary decoder may be missing from this installation"));
  } else {
    GST_ELEMENT_ERROR (self, STREAM, DECODE, (DECODE_ERROR),
        ("the stream may be corrupt"));
  }
}

void
gst_mse_src_network_error (GstMseSrc * self)
{
  g_return_if_fail (GST_IS_MSE_SRC (self));
  GstMseSrcReadyState ready_state = g_atomic_int_get (&self->ready_state);
  if (ready_state == GST_MSE_SRC_READY_STATE_HAVE_NOTHING) {
    GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, (NETWORK_ERROR),
        ("an error occurred before any media was read"));
  } else {
    GST_ELEMENT_ERROR (self, RESOURCE, READ, (NETWORK_ERROR),
        ("an error occurred while reading media"));
  }
}

static inline gboolean
is_streamable (GstMediaSourceTrack * track)
{
  switch (gst_media_source_track_get_track_type (track)) {
    case GST_MEDIA_SOURCE_TRACK_TYPE_AUDIO:
    case GST_MEDIA_SOURCE_TRACK_TYPE_TEXT:
    case GST_MEDIA_SOURCE_TRACK_TYPE_VIDEO:
      return TRUE;
    default:
      return FALSE;
  }
}

static GstStream *
create_gst_stream (GstMediaSourceTrack * track)
{
  gchar *stream_id = g_strdup_printf ("%s-%s",
      GST_OBJECT_NAME (track), gst_media_source_track_get_id (track));
  GstStream *stream = gst_stream_new (stream_id,
      gst_media_source_track_get_initial_caps (track),
      gst_media_source_track_get_stream_type (track), GST_STREAM_FLAG_SELECT);
  g_free (stream_id);
  return stream;
}

static void
set_flushing_and_signal (GstMseSrcPad * pad)
{
  GST_TRACE_OBJECT (pad, "locking");
  LINKED_OR_FLUSHING_LOCK (pad);
  g_atomic_int_set (&pad->flushing, TRUE);
  LINKED_OR_FLUSHING_SIGNAL (pad);
  LINKED_OR_FLUSHING_UNLOCK (pad);
  GST_TRACE_OBJECT (pad, "done");
}

static void
clear_flushing (GstMseSrcPad * pad)
{
  GST_TRACE_OBJECT (pad, "locking");
  LINKED_OR_FLUSHING_LOCK (pad);
  g_atomic_int_set (&pad->flushing, FALSE);
  LINKED_OR_FLUSHING_UNLOCK (pad);
  GST_TRACE_OBJECT (pad, "done");
}

static void
flush_stream (GstMseSrc * self, Stream * stream, gboolean is_seek)
{
  GstMseSrcPad *pad = GST_MSE_SRC_PAD (stream->pad);
  gst_pad_push_event (GST_PAD (pad), gst_event_new_flush_start ());

  if (is_seek) {
    GST_DEBUG_OBJECT (pad, "flushing for seek to %" GST_TIMEP_FORMAT,
        &self->start_time);
    set_flushing_and_signal (pad);
    gst_media_source_track_flush (stream->track);
    gst_pad_stop_task (GST_PAD (pad));
    GST_DEBUG_OBJECT (pad, "stopped task");
    GstSegment *segment = &(pad->segment);
    segment->base = 0;
    segment->start = self->start_time;
    segment->position = self->start_time;
    segment->time = self->start_time;
    segment->rate = self->rate;
  } else {
    gst_media_source_track_flush (stream->track);
  }

  g_atomic_int_set (&pad->does_need_segment, TRUE);

  gst_pad_push_event (GST_PAD (pad), gst_event_new_flush_stop (is_seek));
}

static void
flush_all_streams (GstMseSrc * self, gboolean is_seek)
{
  GHashTableIter iter;
  g_hash_table_iter_init (&iter, self->streams);
  for (gpointer value; g_hash_table_iter_next (&iter, NULL, &value);) {
    flush_stream (self, (Stream *) value, is_seek);
  }
}

static void
resume_all_streams (GstMseSrc * self)
{
  GstState state;
  gst_element_get_state (GST_ELEMENT (self), &state, NULL, 0);
  gboolean active = state > GST_STATE_READY;

  GHashTableIter iter;
  g_hash_table_iter_init (&iter, self->streams);
  for (gpointer value; g_hash_table_iter_next (&iter, NULL, &value);) {
    Stream *stream = value;
    GstPad *pad = GST_PAD (stream->pad);
    if (active) {
      clear_flushing (GST_MSE_SRC_PAD (pad));
      gst_pad_start_task (pad, (GstTaskFunction) pad_task, pad, NULL);
    }
  }
}

static void
tear_down_stream (GstMseSrc * self, Stream * stream)
{
  GST_DEBUG_OBJECT (self, "tearing down stream %s",
      gst_media_source_track_get_id (stream->track));

  flush_stream (self, stream, FALSE);
  gst_pad_set_active (stream->pad, FALSE);

  if (gst_stream_collection_get_size (self->collection) > 0) {
    gst_element_remove_pad (GST_ELEMENT (self), stream->pad);
    FLOW_COMBINER_LOCK (self);
    gst_flow_combiner_remove_pad (self->flow_combiner, stream->pad);
    FLOW_COMBINER_UNLOCK (self);
  }
}


static void
tear_down_all_streams (GstMseSrc * self)
{
  GHashTableIter iter;
  g_hash_table_iter_init (&iter, self->streams);
  for (gpointer value; g_hash_table_iter_next (&iter, NULL, &value);) {
    tear_down_stream (self, (Stream *) value);
    g_hash_table_iter_remove (&iter);
  }
}

static GstStateChangeReturn
gst_mse_src_change_state (GstElement * element, GstStateChange transition)
{
  GstMseSrc *self = GST_MSE_SRC (element);
  switch (transition) {
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      ready_state_update_task_stop (self->ready_state_update_task);
      tear_down_all_streams (self);
      break;
    case GST_STATE_CHANGE_READY_TO_NULL:
      ready_state_update_task_join (self->ready_state_update_task);
      gst_mse_src_detach (self);
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      ready_state_update_task_start (self->ready_state_update_task);
      break;
    default:
      break;
  }
  return GST_ELEMENT_CLASS (gst_mse_src_parent_class)->change_state (element,
      transition);
}

static void
gst_mse_src_seek (GstMseSrc * self, GstClockTime start_time, gdouble rate)
{
  GST_OBJECT_LOCK (self);
  self->start_time = start_time;
  self->rate = rate;
  GST_OBJECT_UNLOCK (self);

  MEDIA_SOURCE_LOCK (self);
  flush_all_streams (self, TRUE);
  if (self->media_source) {
    GST_DEBUG_OBJECT (self, "seeking on media source %" GST_PTR_FORMAT,
        self->media_source);
    gst_media_source_seek (self->media_source, start_time);
  } else {
    GST_DEBUG_OBJECT (self, "detached, not seeking on media source");
  }
  MEDIA_SOURCE_UNLOCK (self);
  resume_all_streams (self);
}

static gboolean
gst_mse_src_send_event (GstElement * element, GstEvent * event)
{
  if (GST_EVENT_TYPE (event) != GST_EVENT_SEEK) {
    return GST_ELEMENT_CLASS (gst_mse_src_parent_class)->send_event (element,
        event);
  }

  GstMseSrc *self = GST_MSE_SRC (element);

  gdouble rate;
  GstFormat format;
  GstSeekType seek_type;
  gint64 start;
  gst_event_parse_seek (event, &rate, &format, NULL, &seek_type, &start, NULL,
      NULL);

  gst_event_unref (event);

  if (format != GST_FORMAT_TIME || seek_type != GST_SEEK_TYPE_SET || rate < 0) {
    GST_ERROR_OBJECT (self,
        "Rejecting unsupported seek event: %" GST_PTR_FORMAT, event);
    return FALSE;
  }

  GST_DEBUG_OBJECT (self, "handling %" GST_PTR_FORMAT, event);
  gst_mse_src_seek (self, start, rate);
  return TRUE;
}

static inline gboolean
is_flushing (GstMseSrcPad * pad)
{
  return g_atomic_int_get (&pad->flushing) || GST_PAD_IS_FLUSHING (pad);
}

static gboolean
await_pad_linked_or_flushing (GstMseSrcPad * pad)
{
  GST_TRACE_OBJECT (pad, "waiting for link");
  LINKED_OR_FLUSHING_LOCK (pad);
  gboolean flushing = FALSE;
  while (!gst_pad_is_linked (GST_PAD_CAST (pad)) && !is_flushing (pad)) {
    LINKED_OR_FLUSHING_WAIT (pad);
  }
  flushing = is_flushing (pad);
  LINKED_OR_FLUSHING_UNLOCK (pad);
  GST_TRACE_OBJECT (pad, "linked");
  return flushing;
}

static void
pad_task (GstMseSrcPad * pad)
{
  GstMseSrc *self = NULL;
  gboolean flushing = await_pad_linked_or_flushing (pad);
  if (flushing) {
    GST_TRACE_OBJECT (pad, "pad is flushing");
    goto pause;
  }

  self = GST_MSE_SRC (gst_pad_get_parent_element (GST_PAD (pad)));

  GstMediaSourceTrack *track = pad->track;

  GstMiniObject *object = gst_media_source_track_pop (track);

  if (object == NULL) {
    GST_DEBUG_OBJECT (pad, "nothing was popped from track, must be flushing");
    gst_media_source_track_flush (track);
    goto pause;
  }

  if (!g_atomic_int_get (&pad->sent_stream_start)) {
    const gchar *track_id = gst_media_source_track_get_id (track);
    GstEvent *event = gst_event_new_stream_start (track_id);
    gst_event_set_group_id (event, self->group_id);
    gst_event_set_stream (event, pad->stream);
    if (!gst_pad_push_event (GST_PAD (pad), event)) {
      GST_ERROR_OBJECT (pad, "failed to push stream start");
      goto pause;
    }
    GST_TRACE_OBJECT (pad, "stream start");
    g_atomic_int_set (&pad->sent_stream_start, TRUE);
  }

  GstCaps *caps = gst_media_source_track_get_initial_caps (track);
  if (!g_atomic_int_get (&pad->sent_initial_caps) && GST_IS_CAPS (caps)) {
    GST_DEBUG_OBJECT (pad, "sending initial caps");
    gst_caps_replace (&pad->most_recent_caps, caps);
    GstEvent *event = gst_event_new_caps (caps);
    if (!gst_pad_push_event (GST_PAD (pad), event)) {
      GST_ERROR_OBJECT (pad, "failed to push caps update");
      goto pause;
    }
    GST_TRACE_OBJECT (pad, "initial caps %" GST_PTR_FORMAT, caps);
    g_atomic_int_set (&pad->sent_initial_caps, TRUE);
  }

  if (g_atomic_int_get (&pad->does_need_segment)) {
    GST_DEBUG_OBJECT (pad, "sending new segment starting@%" GST_TIMEP_FORMAT,
        &pad->segment.time);
    GstEvent *event = gst_event_new_segment (&pad->segment);
    if (!gst_pad_push_event (GST_PAD (pad), event)) {
      GST_ERROR_OBJECT (pad, "failed to push new segment");
      goto pause;
    }
    GST_TRACE_OBJECT (pad, "segment");
    g_atomic_int_set (&pad->does_need_segment, FALSE);
  }

  if (!g_atomic_int_get (&pad->sent_stream_collection)) {
    GstEvent *event = gst_event_new_stream_collection (self->collection);
    if (!gst_pad_push_event (GST_PAD (pad), event)) {
      GST_ERROR_OBJECT (pad, "failed to push stream collection");
      goto pause;
    }
    GST_TRACE_OBJECT (pad, "stream collection");
    g_atomic_int_set (&pad->sent_stream_collection, TRUE);
  }

  if (GST_IS_SAMPLE (object)) {
    GstSample *sample = GST_SAMPLE (object);
    GstCaps *sample_caps = gst_sample_get_caps (sample);

    if (!gst_caps_is_equal (pad->most_recent_caps, sample_caps)) {
      gst_caps_replace (&pad->most_recent_caps, sample_caps);
      GstEvent *event = gst_event_new_caps (gst_caps_ref (sample_caps));
      if (!gst_pad_push_event (GST_PAD (pad), event)) {
        GST_ERROR_OBJECT (pad, "failed to push new caps");
        goto pause;
      }
      GST_TRACE_OBJECT (pad, "new caps %" GST_PTR_FORMAT, sample_caps);
    }

    GstBuffer *buffer = gst_buffer_copy (gst_sample_get_buffer (sample));
    if (GST_BUFFER_DTS_IS_VALID (buffer)) {
      GstClockTime duration =
          GST_BUFFER_DURATION_IS_VALID (buffer) ? GST_BUFFER_DURATION (buffer) :
          1;
      GstClockTime buffer_end = GST_BUFFER_DTS (buffer) + duration;
      pad->segment.position = MAX (pad->segment.position, buffer_end);
    }

    GstFlowReturn push_result = gst_pad_push (GST_PAD (pad), buffer);

    FLOW_COMBINER_LOCK (self);
    GstFlowReturn combined_result =
        gst_flow_combiner_update_pad_flow (self->flow_combiner,
        GST_PAD_CAST (pad), push_result);
    FLOW_COMBINER_UNLOCK (self);

    switch (combined_result) {
      case GST_FLOW_OK:
        break;
      case GST_FLOW_FLUSHING:
        goto pause;
      default:
        GST_ELEMENT_ERROR (self, CORE, PAD, ("failed to push data downstream"),
            ("pad result: %s, combined result %s",
                gst_flow_get_name (push_result),
                gst_flow_get_name (combined_result)));
        goto pause;
    }
  } else if (GST_IS_EVENT (object)) {
    GstEvent *event = GST_EVENT (g_steal_pointer (&object));
    if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
      GST_LOG_OBJECT (self, "EOS");
    }
    if (!gst_pad_push_event (GST_PAD (pad), event)) {
      GST_ERROR_OBJECT (self, "failed to push enqueued event");
      goto pause;
    }
  } else {
    GST_ERROR_OBJECT (self, "unexpected object on track queue"
        ", only samples and events are supported");
    g_assert_not_reached ();
  }

  gst_clear_object (&self);
  return;

pause:
  if (!g_atomic_int_get (&pad->flushing)) {
    gst_pad_pause_task (GST_PAD (pad));
  }
  gst_clear_object (&self);
}

static gboolean
pad_activate_mode (GstMseSrcPad * pad, GstObject * parent, GstPadMode mode,
    gboolean active)
{
  if (mode != GST_PAD_MODE_PUSH) {
    GST_ERROR_OBJECT (parent, "msesrc only supports push mode");
    return FALSE;
  }

  if (active) {
    gst_pad_start_task (GST_PAD (pad), (GstTaskFunction) pad_task, pad, NULL);
  } else {
    set_flushing_and_signal (pad);
    gst_media_source_track_flush (pad->track);
    gst_pad_stop_task (GST_PAD (pad));
    clear_flushing (pad);
  }

  return TRUE;
}

static GstPadLinkReturn
pad_linked (GstMseSrcPad * pad, GstMseSrc * parent, GstPad * sink)
{
  GST_DEBUG_OBJECT (pad, "pad is linked to %" GST_PTR_FORMAT ", resuming task",
      sink);
  LINKED_OR_FLUSHING_LOCK (pad);
  LINKED_OR_FLUSHING_SIGNAL (pad);
  LINKED_OR_FLUSHING_UNLOCK (pad);
  return GST_PAD_LINK_OK;
}

static gboolean
pad_event (GstMseSrcPad * pad, GstMseSrc * parent, GstEvent * event)
{
  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_SEEK:
      return gst_element_send_event (GST_ELEMENT (parent), event);
    default:
      return gst_pad_event_default (GST_PAD (pad), GST_OBJECT (parent), event);
  }
}

static gboolean
pad_query (GstMseSrcPad * pad, GstObject * parent, GstQuery * query)
{
  GstMseSrc *self = GST_MSE_SRC (parent);
  switch (GST_QUERY_TYPE (query)) {
    case GST_QUERY_POSITION:{
      GstClockTime position = pad->segment.position;
      GstFormat fmt;
      gst_query_parse_position (query, &fmt, NULL);
      if (fmt == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (position)) {
        GST_TRACE_OBJECT (pad, "position query returning %" GST_TIMEP_FORMAT,
            &position);
        gst_query_set_position (query, GST_FORMAT_TIME, position);
        return TRUE;
      }
      break;
    }
    case GST_QUERY_DURATION:{
      GstFormat fmt;
      gst_query_parse_duration (query, &fmt, NULL);
      if (fmt == GST_FORMAT_TIME) {
        gst_query_set_duration (query, GST_FORMAT_TIME, self->duration);
        return TRUE;
      } else {
        return FALSE;
      }
    }
    case GST_QUERY_SEEKING:{
      GstFormat fmt;
      gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
      if (fmt != GST_FORMAT_TIME) {
        return FALSE;
      }
      gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0, self->duration);
      return TRUE;
    }
    default:
      break;
  }
  return gst_pad_query_default (GST_PAD (pad), parent, query);
}

static void
append_stream (GstMseSrc * self, GstMediaSourceTrack * track)
{
  if (g_hash_table_contains (self->streams, track)) {
    GST_DEBUG_OBJECT (self, "skipping processed %" GST_PTR_FORMAT, track);
    return;
  }
  GST_DEBUG_OBJECT (self, "creating stream for %" GST_PTR_FORMAT, track);
  guint pad_index = g_hash_table_size (self->streams);
  GstStream *info = create_gst_stream (track);
  Stream stream = {
    .info = gst_object_ref (info),
    .track = gst_object_ref (track),
    .pad = gst_mse_src_pad_new (track, info, pad_index,
        self->start_time, self->rate),
  };
  g_hash_table_insert (self->streams, track,
      g_memdup2 (&stream, sizeof (Stream)));
  gst_stream_collection_add_stream (self->collection, stream.info);
}

void
gst_mse_src_emit_streams (GstMseSrc * self, GstMediaSourceTrack ** tracks,
    gsize n_tracks)
{
  g_return_if_fail (GST_IS_MSE_SRC (self));

  GstElement *element = GST_ELEMENT (self);
  GstObject *object = GST_OBJECT (self);

  update_ready_state_for_init_segment (self);

  STREAMS_LOCK (self);

  for (gsize i = 0; i < n_tracks; i++) {
    GstMediaSourceTrack *track = tracks[i];
    if (!is_streamable (track)) {
      continue;
    }
    append_stream (self, track);
  }

  GstState state;
  gst_element_get_state (element, &state, NULL, 0);
  gboolean active = state > GST_STATE_READY;

  GHashTableIter iter;
  g_hash_table_iter_init (&iter, self->streams);
  for (gpointer value; g_hash_table_iter_next (&iter, NULL, &value);) {
    Stream *stream = value;
    GstPad *pad = stream->pad;
    if (active) {
      gst_pad_set_active (pad, TRUE);
    }
    GstElement *parent = gst_pad_get_parent_element (pad);
    if (parent) {
      GST_DEBUG_OBJECT (self, "skipping parented pad %" GST_PTR_FORMAT, pad);
      gst_object_unref (parent);
      continue;
    }
    gst_element_add_pad (element, pad);
    FLOW_COMBINER_LOCK (self);
    gst_flow_combiner_add_pad (self->flow_combiner, pad);
    FLOW_COMBINER_UNLOCK (self);
  }
  STREAMS_UNLOCK (self);

  gst_element_no_more_pads (element);
  gst_element_post_message (element, gst_message_new_stream_collection (object,
          self->collection));
}

static GstURIType
gst_mse_src_uri_get_type (GType type)
{
  return GST_URI_SRC;
}

static const gchar *const *
gst_mse_src_uri_get_protocols (GType type)
{
  static const gchar *protocols[] = { "mse", NULL };
  return protocols;
}

static gchar *
gst_mse_src_uri_get_uri (GstURIHandler * handler)
{
  GstMseSrc *self = GST_MSE_SRC (handler);
  GST_OBJECT_LOCK (self);
  gchar *uri = g_strdup (self->uri);
  GST_OBJECT_UNLOCK (self);
  return uri;
}

static gboolean
gst_mse_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
    GError ** error)
{
  GstMseSrc *self = GST_MSE_SRC (handler);
  GST_OBJECT_LOCK (self);
  g_free (self->uri);
  self->uri = g_strdup (uri);
  GST_OBJECT_UNLOCK (self);
  return TRUE;
}

static void
gst_mse_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
  GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
  iface->get_type = gst_mse_src_uri_get_type;
  iface->get_protocols = gst_mse_src_uri_get_protocols;
  iface->get_uri = gst_mse_src_uri_get_uri;
  iface->set_uri = gst_mse_src_uri_set_uri;
}

void
gst_mse_src_attach (GstMseSrc * self, GstMediaSource * media_source)
{
  g_return_if_fail (GST_IS_MSE_SRC (self));
  g_return_if_fail (GST_IS_MEDIA_SOURCE (media_source));

  MEDIA_SOURCE_LOCK (self);
  g_set_object (&self->media_source, media_source);
  MEDIA_SOURCE_UNLOCK (self);
}

void
gst_mse_src_detach (GstMseSrc * self)
{
  g_return_if_fail (GST_IS_MSE_SRC (self));

  MEDIA_SOURCE_LOCK (self);
  gst_clear_object (&self->media_source);
  MEDIA_SOURCE_UNLOCK (self);
}

static void
set_ready_state (GstMseSrc * self, GstMseSrcReadyState ready_state)
{
  if (ready_state == self->ready_state) {
    return;
  }
  GST_DEBUG_OBJECT (self, "ready state %s=>%s",
      mse_src_ready_state_name (self->ready_state),
      mse_src_ready_state_name (ready_state));
  self->ready_state = ready_state;
  g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_READY_STATE]);
}

static void
update_ready_state_for_init_segment (GstMseSrc * self)
{
  MEDIA_SOURCE_LOCK (self);

  if (self->media_source == NULL) {
    goto done;
  }
  if (self->ready_state != GST_MSE_SRC_READY_STATE_HAVE_NOTHING) {
    goto done;
  }
  GstSourceBufferList *buffers = gst_media_source_get_source_buffers
      (self->media_source);
  gboolean all_received_init_segment = TRUE;
  for (guint i = 0; all_received_init_segment; i++) {
    GstSourceBuffer *buf = gst_source_buffer_list_index (buffers, i);
    if (buf == NULL) {
      break;
    }
    all_received_init_segment &= gst_source_buffer_has_init_segment (buf);
    gst_object_unref (buf);
  }
  if (!all_received_init_segment) {
    return;
  }
  set_ready_state (self, MAX (self->ready_state,
          GST_MSE_SRC_READY_STATE_HAVE_METADATA));

done:
  MEDIA_SOURCE_UNLOCK (self);
}

static gboolean
has_current_data (GstMseSrc * self, GstClockTime position,
    GstClockTime duration)
{
  if (!GST_CLOCK_TIME_IS_VALID (position)) {
    return FALSE;
  }
  GstSourceBufferList *active =
      gst_media_source_get_active_source_buffers (self->media_source);
  gboolean has_data = TRUE;
  for (guint i = 0; has_data; i++) {
    GstSourceBuffer *buf = gst_source_buffer_list_index (active, i);
    if (buf == NULL) {
      if (i == 0) {
        has_data = FALSE;
        GST_DEBUG_OBJECT (self,
            "no active source buffers, nothing is buffered");
      }
      break;
    }
    has_data = gst_source_buffer_is_buffered (buf, position);
    gst_object_unref (buf);
  }
  g_object_unref (active);
  return has_data;
}

static gboolean
has_future_data (GstMseSrc * self, GstClockTime position, GstClockTime duration)
{
  if (!GST_CLOCK_TIME_IS_VALID (position)
      || !GST_CLOCK_TIME_IS_VALID (duration)) {
    return FALSE;
  }
  GstClockTime target_position = MIN (position + THRESHOLD_FUTURE_DATA,
      duration);
  GstSourceBufferList *active =
      gst_media_source_get_active_source_buffers (self->media_source);
  gboolean has_data = TRUE;
  for (guint i = 0; has_data; i++) {
    GstSourceBuffer *buf = gst_source_buffer_list_index (active, i);
    if (buf == NULL) {
      if (i == 0) {
        has_data = FALSE;
        GST_DEBUG_OBJECT (self,
            "no active source buffers, nothing is buffered");
      }
      break;
    }
    has_data = gst_source_buffer_is_range_buffered (buf, position,
        target_position);
    gst_object_unref (buf);
  }
  g_object_unref (active);
  return has_data;
}

static gboolean
has_enough_data (GstMseSrc * self, GstClockTime position, GstClockTime duration)
{
  if (!GST_CLOCK_TIME_IS_VALID (position)
      || !GST_CLOCK_TIME_IS_VALID (duration)) {
    return FALSE;
  }
  GstClockTime target_position = MIN (position + THRESHOLD_ENOUGH_DATA,
      duration);
  GstSourceBufferList *active =
      gst_media_source_get_active_source_buffers (self->media_source);
  gboolean has_data = TRUE;
  for (guint i = 0; has_data; i++) {
    GstSourceBuffer *buf = gst_source_buffer_list_index (active, i);
    if (buf == NULL) {
      if (i == 0) {
        has_data = FALSE;
        GST_DEBUG_OBJECT (self,
            "no active source buffers, nothing is buffered");
      }
      break;
    }
    has_data = gst_source_buffer_is_range_buffered (buf, position,
        target_position);
    gst_object_unref (buf);
  }
  g_object_unref (active);
  return has_data;
}

static void
update_ready_state (GstMseSrc * self)
{
  MEDIA_SOURCE_LOCK (self);

  if (self->media_source == NULL) {
    goto done;
  }

  if (self->ready_state < GST_MSE_SRC_READY_STATE_HAVE_METADATA) {
    goto done;
  }

  GstClockTime position = gst_mse_src_get_position (self);
  GstClockTime duration = self->duration;

  if (has_enough_data (self, position, duration)) {
    set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_ENOUGH_DATA);
  } else if (has_future_data (self, position, duration)) {
    set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_FUTURE_DATA);
  } else if (has_current_data (self, position, duration)) {
    set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_CURRENT_DATA);
  } else {
    set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_METADATA);
  }

done:
  MEDIA_SOURCE_UNLOCK (self);
}
