#!/usr/bin/env python3
#
# Copyright (c) 2013, Thibault Saunier <thibault.saunier@collabora.com>
# Copyright (c) 2020, Igalia S.L
#    Author: Thibault Saunier <tsaunier@igalia.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.

import os
import sys
import tempfile
import urllib.parse
import subprocess
from launcher import utils
from urllib.parse import unquote
import xml.etree.ElementTree as ET
from launcher.baseclasses import GstValidateTest, TestsManager, ScenarioManager, MediaFormatCombination, \
    MediaDescriptor, GstValidateEncodingTestInterface

GES_DURATION_TOLERANCE = utils.GST_SECOND / 2

GES_LAUNCH_COMMAND = "ges-launch-1.0"
if "win32" in sys.platform:
    GES_LAUNCH_COMMAND += ".exe"


GES_ENCODING_TARGET_COMBINATIONS = [
    MediaFormatCombination("ogg", "vorbis", "theora"),
    MediaFormatCombination("ogg", "opus", "theora"),
    MediaFormatCombination("webm", "vorbis", "vp8"),
    MediaFormatCombination("webm", "opus", "vp8"),
    MediaFormatCombination("mp4", "aac", "h264"),
    MediaFormatCombination("mp4", "ac3", "h264"),
    MediaFormatCombination("quicktime", "aac", "jpeg"),
    MediaFormatCombination("mkv", "opus", "h264"),
    MediaFormatCombination("mkv", "vorbis", "h264"),
    MediaFormatCombination("mkv", "opus", "jpeg"),
    MediaFormatCombination("mkv", "vorbis", "jpeg")
]


def quote_uri(uri):
    """
    Encode a URI/path according to RFC 2396, without touching the file:/// part.
    """
    # Split off the "file:///" part, if present.
    parts = urllib.parse.urlsplit(uri, allow_fragments=False)
    # Make absolutely sure the string is unquoted before quoting again!
    raw_path = unquote(parts.path)
    return utils.path2url(raw_path)


class XgesProjectDescriptor(MediaDescriptor):
    def __init__(self, uri):
        super(XgesProjectDescriptor, self).__init__()

        self._uri = uri
        self._xml_path = utils.url2path(uri)
        self._root = ET.parse(self._xml_path)
        self._duration = None

    def get_media_filepath(self):
        return self._xml_path

    def get_path(self):
        return self._xml_path

    def get_caps(self):
        raise NotImplemented

    def get_uri(self):
        return self._uri

    def get_duration(self):
        if self._duration:
            return self._duration

        for l in self._root.iter():
            if l.tag == "timeline":
                self._duration=int(l.attrib['metadatas'].split("duration=(guint64)")[1].split(" ")[0].split(";")[0])
                break

        if not self._duration:
            self.error("%s does not have duration! (setting 2mins)" % self._uri)
            self._duration = 2 * 60

        return self._duration

    def get_protocol(self):
        return Protocols.FILE

    def is_seekable(self):
        return True

    def is_image(self):
        return False

    def get_num_tracks(self, track_type):
        num_tracks = 0
        for l in self._root.iter():
            if l.tag == "track":
                if track_type in l.attrib["caps"]:
                    num_tracks += 1
        return num_tracks


class GESTest(GstValidateTest):
    def __init__(self, classname, options, reporter, project, scenario=None,
                 combination=None, expected_failures=None, nest=False, testfile=None):

        super(GESTest, self).__init__(GES_LAUNCH_COMMAND, classname, options, reporter,
                                      scenario=scenario)

        self.project = project
        self.nested = nest
        self.testfile = testfile

    def set_sample_paths(self):
        if self.testfile:
            # testfile should be self contained
            return

        if not self.options.paths:
            if self.options.disable_recurse:
                return
            if self.project:
                paths = [os.path.dirname(self.project.get_media_filepath())]
            else:
                paths = []
        else:
            paths = self.options.paths

        if not isinstance(paths, list):
            paths = [paths]

        for path in paths:
            # We always want paths separator to be cut with '/' for ges-launch
            path = path.replace("\\", "/")
            if not self.options.disable_recurse:
                self.add_arguments("--ges-sample-path-recurse", quote_uri(path))
                self.add_arguments("--ges-sample-path-recurse", quote_uri(self.options.projects_paths))
            else:
                self.add_arguments("--ges-sample-paths", quote_uri(path))

    def set_sink_args(self):
        if self.testfile:
            # testfile should be self contained and --mute should give required infos.
            if self.options.mute:
                self.add_arguments("--mute")
            return

        if self.options.mute:
            needs_clock = self.scenario.needs_clock_sync() \
                if self.scenario else False
            audiosink = utils.get_fakesink_for_media_type("audio", needs_clock)
            videosink = utils.get_fakesink_for_media_type("video", needs_clock)
        else:
            audiosink = 'autoaudiosink'
            videosink = 'autovideosink'
        self.add_arguments("--videosink", videosink + " name=videosink")
        self.add_arguments("--audiosink", audiosink + " name=audiosink")

    def build_arguments(self):
        GstValidateTest.build_arguments(self)

        self.set_sink_args()
        self.set_sample_paths()

        if self.project:
            assert self.testfile is None
            if self.nested:
                self.add_arguments("+clip", self.project.get_uri())
            else:
                self.add_arguments("-l", self.project.get_uri())
        elif self.testfile:
            self.add_arguments("--set-test-file", self.testfile)

class GESPlaybackTest(GESTest):
    def __init__(self, classname, options, reporter, project, scenario,nest):
        super(GESPlaybackTest, self).__init__(classname, options, reporter,
                                      project, scenario=scenario, nest=nest)

    def get_current_value(self):
        return self.get_current_position()

class GESScenarioTest(GESTest):
    def __init__(self, classname, options, reporter, scenario):
        super().__init__(classname, options, reporter, None, scenario=scenario)

    def build_arguments(self):
        super().build_arguments()
        self.add_arguments("--set-scenario", self.scenario.path)

    def get_subproc_env(self):
        scenario = self.scenario
        self.scenario = None
        res = super().get_subproc_env()
        self.scenario = scenario

        return res

    def get_current_value(self):
        return self.get_current_position()


class GESRenderTest(GESTest, GstValidateEncodingTestInterface):
    def __init__(self, classname, options, reporter, project, combination):
        GESTest.__init__(self, classname, options, reporter, project)

        GstValidateEncodingTestInterface.__init__(self, combination, self.project)

    def build_arguments(self):
        GESTest.build_arguments(self)
        self._set_rendering_info()

    def run_external_checks(self):
        reference_file_path = urllib.parse.urlsplit(self.media_descriptor.get_uri()).path + ".expected_result"
        if os.path.exists(reference_file_path):
            self.run_iqa_test(utils.path2url(reference_file_path))

    def _set_rendering_info(self):
        self.dest_file = path = os.path.join(self.options.dest,
                                             self.classname.replace(".render.", os.sep).
                                             replace(".", os.sep))
        utils.mkdir(os.path.dirname(urllib.parse.urlsplit(self.dest_file).path))
        if not utils.isuri(self.dest_file):
            self.dest_file = utils.path2url(self.dest_file)

        profile = self.get_profile()
        self.add_arguments("-f", profile, "-o", self.dest_file)

    def check_results(self):
        self.check_encoded_file()
        return GstValidateTest.check_results(self)

    def get_current_value(self):
        size = self.get_current_size()
        if size is None:
            return self.get_current_position()

        return size


class GESTestsManager(TestsManager):
    name = "ges"

    _scenarios = ScenarioManager()

    def __init__(self):
        super(GESTestsManager, self).__init__()

    def init(self):
        try:
            with tempfile.NamedTemporaryFile() as f:
                if "--set-scenario=" in subprocess.check_output([GES_LAUNCH_COMMAND, "--help"], stderr=f).decode():
                    return True
                else:
                    self.warning("Can not use ges-launch, it seems not to be compiled against"
                                " gst-validate")
        except subprocess.CalledProcessError as e:
            self.warning("Can not use ges-launch: %s" % e)
        except OSError as e:
            self.warning("Can not use ges-launch: %s" % e)

    def add_options(self, parser):
        group = parser.add_argument_group("GStreamer Editing Services specific option"
                            " and behaviours",
                            description="""
The GStreamer Editing Services launcher will be usable only if GES has been compiled against GstValidate
You can simply run scenarios specifying project as args. For example the following will run all available
and activated scenarios on project.xges:

    $gst-validate-launcher ges /some/ges/project.xges


Available options:""")
        group.add_argument("-P", "--projects-paths", dest="projects_paths",
                         default=os.path.join(utils.DEFAULT_GST_QA_ASSETS,
                                              "ges",
                                              "ges-projects"),
                         help="Paths in which to look for moved medias")
        group.add_argument("--ges-scenario-paths", dest="scenarios_path",
                         default=None,
                         help="Paths in which to look for moved medias")
        group.add_argument("-r", "--disable-recurse-paths", dest="disable_recurse",
                         default=False, action="store_true",
                         help="Whether to recurse into paths to find medias")

    def set_settings(self, options, args, reporter):
        TestsManager.set_settings(self, options, args, reporter)
        self._scenarios.config = self.options

        try:
            os.makedirs(utils.url2path(options.dest)[0])
        except OSError:
            pass

    def list_tests(self):
        return self.tests

    def register_defaults(self, project_paths=None, scenarios_path=None):
        projects = list()
        all_scenarios = {}
        if not self.args:
            if project_paths == None:
                path = self.options.projects_paths
            else:
                path = project_paths

            for root, dirs, files in os.walk(path):
                for f in files:
                    if not f.endswith(".xges"):
                        continue
                    projects.append(utils.path2url(os.path.join(path, root, f)))

            if self.options.scenarios_path:
                scenarios_path = self.options.scenarios_path

            if scenarios_path:
                for root, dirs, files in os.walk(scenarios_path):
                    for f in files:
                        name, ext = os.path.splitext(f)
                        f = os.path.join(root, f)
                        if ext == ".validatetest":
                            fpath = os.path.abspath(os.path.join(root, f))
                            pathname = os.path.abspath(os.path.join(root, name))
                            name = pathname.replace(os.path.commonpath([scenarios_path, root]), '').replace('/', '.')
                            self.add_test(GESTest('test' + name,
                                                  self.options,
                                                  self.reporter,
                                                  None,
                                                  testfile=fpath))
                            continue
                        elif ext != ".scenario":
                            continue
                        config = f + ".config"
                        if not os.path.exists(config):
                            config = None
                        all_scenarios[f] = config
        else:
            for proj_uri in self.args:
                if not utils.isuri(proj_uri):
                    proj_uri = utils.path2url(proj_uri)

                if os.path.exists(proj_uri):
                    projects.append(proj_uri)

        if self.options.long_limit != 0:
            scenarios = ["none",
                         "scrub_forward_seeking",
                         "scrub_backward_seeking"]
        else:
            scenarios = ["play_15s",
                         "scrub_forward_seeking_full",
                         "scrub_backward_seeking_full"]
        for proj_uri in projects:
            # First playback casses
            project = XgesProjectDescriptor(proj_uri)
            for scenario_name in scenarios:
                scenario = self._scenarios.get_scenario(scenario_name)
                if scenario is None:
                    continue

                if scenario.get_min_media_duration() >= (project.get_duration() / utils.GST_SECOND):
                    continue

                classname = "playback.%s.%s" % (scenario.name,
                                                    os.path.basename(proj_uri).replace(".xges", ""))
                self.add_test(GESPlaybackTest(classname,
                                              self.options,
                                              self.reporter,
                                              project,
                                              scenario=scenario,
                                              nest=False))
                #For nested timelines
                classname = "playback.nested.%s.%s" % (scenario.name,
                                                    os.path.basename(proj_uri).replace(".xges", ""))
                self.add_test(GESPlaybackTest(classname,
                                              self.options,
                                              self.reporter,
                                              project,
                                              scenario=scenario,
                                              nest=True))

            # And now rendering casses
            for comb in GES_ENCODING_TARGET_COMBINATIONS:
                classname = "render.%s.%s" % (str(comb).replace(' ', '_'),
                                                  os.path.splitext(os.path.basename(proj_uri))[0])
                self.add_test(GESRenderTest(classname, self.options,
                                            self.reporter, project,
                                            combination=comb)
                                  )
        if all_scenarios:
            for scenario in self._scenarios.discover_scenarios(list(all_scenarios.keys())):
                config = all_scenarios[scenario.path]
                classname = "scenario.%s" % scenario.name
                test = GESScenarioTest(classname, self.options, self.reporter, scenario=scenario)
                if config:
                    test.add_validate_config(config)
                self.add_test(test)