# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import argparse
import io
import os
import platform
import sys

from mach.decorators import Command
from mozbuild.base import BuildEnvironmentNotFoundException, MozbuildObject

here = os.path.abspath(os.path.dirname(__file__))

GVE = "org.mozilla.geckoview_example"


def get(url, timeout=60):
    """Fetch *url* with an HTTP GET and return the response object.

    ``requests`` is imported inside the function, so importing this module
    does not require it to be installed.

    :param url: URL to request.
    :param timeout: Seconds passed to ``requests.get`` as its ``timeout``;
        without one a stalled server would block the command indefinitely.
    :returns: The ``requests`` response for a successful request.
    :raises requests.HTTPError: If the server answers with an error status
        (raised by ``raise_for_status``).
    """
    import requests

    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    return resp


def untar(fileobj, dest):
    """Extract every member of the tar archive in *fileobj* into *dest*.

    :param fileobj: Binary file-like object positioned at the start of a tar
        archive; compression is auto-detected (mode ``"r"``).
    :param dest: Directory to extract into.
    :raises tarfile.TarError: If the archive is invalid or, when extraction
        filters are available, a member would be written outside *dest*.
    """
    import tarfile

    with tarfile.open(fileobj=fileobj, mode="r") as tar_data:
        # The archive comes from the network, so refuse members that would
        # escape *dest* (absolute paths, "..", links pointing outside).
        # Extraction filters only exist on Pythons that define
        # ``tarfile.data_filter``; older ones keep the previous behaviour.
        if hasattr(tarfile, "data_filter"):
            tar_data.extractall(path=dest, filter="data")
        else:
            tar_data.extractall(path=dest)


def unzip(fileobj, dest):
    """Extract every member of the zip archive in *fileobj* into *dest*.

    :param fileobj: Seekable binary file-like object (or path) holding a zip
        archive.
    :param dest: Directory to extract into.
    """
    from zipfile import ZipFile

    with ZipFile(fileobj) as archive:
        archive.extractall(dest)


def writable_dir(path):
    """argparse ``type=`` callback accepting only existing, writable directories.

    :param path: Path given on the command line.
    :returns: *path* unchanged when it names a directory the process may
        write to.
    :raises argparse.ArgumentTypeError: If *path* is not a directory, or is a
        directory that is not writable.
    """
    if os.path.isdir(path):
        if not os.access(path, os.W_OK):
            raise argparse.ArgumentTypeError(f"{path} is not a writable dir")
        return path
    raise argparse.ArgumentTypeError(f"{path} is not a valid dir")


def create_parser_interventions():
    """Build the argument parser used by the ``test-interventions`` command.

    The options are declared as data and registered in order: general
    options first, then the desktop-specific and Android-specific groups,
    and finally the mozlog logging group.

    :returns: The configured ``argparse.ArgumentParser``.
    """
    from mozlog import commandline

    on_off_choices = ["enabled", "disabled", "both", "none"]

    # Each entry is (option strings, keyword arguments for add_argument).
    general_options = [
        (["--webdriver-binary"], dict(help="Path to webdriver binary")),
        (
            ["--webdriver-port"],
            dict(
                action="store",
                default="4444",
                help="Port on which to run WebDriver",
            ),
        ),
        (
            ["--webdriver-ws-port"],
            dict(
                action="store",
                default="9222",
                help="Port on which to run WebDriver BiDi websocket",
            ),
        ),
        (["-b", "--bugs"], dict(nargs="*", help="Bugs to run tests for")),
        (
            ["--do2fa"],
            dict(
                action="store_true",
                default=False,
                help="Do two-factor auth live in supporting tests",
            ),
        ),
        (
            ["--config"],
            dict(help="Path to JSON file containing logins and other settings"),
        ),
        (
            ["--debug"],
            dict(action="store_true", default=False, help="Debug failing tests"),
        ),
        (
            ["-H", "--headless"],
            dict(
                action="store_true",
                default=False,
                help="Run firefox in headless mode",
            ),
        ),
        (
            ["--interventions"],
            dict(
                action="store",
                default="both",
                choices=on_off_choices,
                help="Enable webcompat interventions",
            ),
        ),
        (
            ["--shims"],
            dict(
                action="store",
                default="none",
                choices=on_off_choices,
                help="Enable SmartBlock shims",
            ),
        ),
        (
            ["--platform"],
            dict(
                action="store",
                choices=["android", "desktop"],
                help="Platform to target",
            ),
        ),
        (
            ["--failure-screenshots-dir"],
            dict(
                action="store",
                type=writable_dir,
                help="Path to save failure screenshots",
            ),
        ),
        (
            ["-s", "--no-failure-screenshots"],
            dict(
                action="store_true",
                default=False,
                help="Do not save a screenshot for each test failure",
            ),
        ),
        (
            ["-P", "--platform-override"],
            dict(
                action="store",
                choices=["android", "linux", "mac", "windows"],
                help=(
                    "Override key navigator properties to match the given platform "
                    "and/or use responsive design mode to mimic the given platform"
                ),
            ),
        ),
    ]

    desktop_options = [
        (["--binary"], dict(help="Path to browser binary")),
    ]

    android_options = [
        (
            ["--device-serial"],
            dict(
                action="store",
                help="Running Android instances to connect to, if not emulator-5554",
            ),
        ),
        (
            ["--package-name"],
            dict(action="store", default=GVE, help="Android package name to use"),
        ),
    ]

    parser = argparse.ArgumentParser()
    for flags, options in general_options:
        parser.add_argument(*flags, **options)

    desktop_group = parser.add_argument_group("Desktop-specific arguments")
    for flags, options in desktop_options:
        desktop_group.add_argument(*flags, **options)

    android_group = parser.add_argument_group("Android-specific arguments")
    for flags, options in android_options:
        android_group.add_argument(*flags, **options)

    commandline.add_logging_group(parser)
    return parser


class InterventionTest(MozbuildObject):
    """Sets up and runs the webcompat interventions and shims test suites.

    ``run`` is the entry point; the other methods fill in defaults, fetch
    geckodriver when it is missing, and prepare an Android device.
    """

    def set_default_kwargs(self, logger, command_context, kwargs):
        """Resolve the target platform, browser binary and geckodriver path.

        Mutates *kwargs* in place: ``platform`` is always set to either
        ``"android"`` or ``"desktop"``; ``binary`` and ``webdriver_binary``
        are filled in when they were not supplied.

        :param logger: Logger used to report a missing geckodriver.
        :param command_context: mach command context; its ``substs`` are
            consulted to detect an Android build.
        :param kwargs: Parsed command-line options.

        Exits the process with status 1 if geckodriver cannot be found even
        after attempting to download it.
        """
        # Named so as not to shadow the ``platform`` module used elsewhere in
        # this class.
        requested_platform = kwargs["platform"]
        binary = kwargs["binary"]
        device_serial = kwargs["device_serial"]
        try:
            is_gve_build = command_context.substs.get("MOZ_APP_NAME") == "fennec"
        except BuildEnvironmentNotFoundException:
            # If we don't have a build, just use the logic below to choose between
            # desktop and Android
            is_gve_build = False

        # Android is chosen when asked for explicitly, or when nothing
        # desktop-specific was given and either a device serial was supplied
        # or the current build is an Android one.
        if requested_platform == "android" or (
            requested_platform is None
            and binary is None
            and (device_serial or is_gve_build)
        ):
            kwargs["platform"] = "android"
        else:
            kwargs["platform"] = "desktop"

        if kwargs["platform"] == "desktop" and kwargs["binary"] is None:
            kwargs["binary"] = self.get_binary_path()

        if kwargs["webdriver_binary"] is None:
            webdriver_binary = self.get_binary_path(
                "geckodriver", validate_exists=False
            )

            if not os.path.exists(webdriver_binary):
                webdriver_binary = self.install_geckodriver(
                    logger, dest=os.path.dirname(webdriver_binary)
                )

            if not os.path.exists(webdriver_binary):
                logger.error("Can't find geckodriver")
                sys.exit(1)
            kwargs["webdriver_binary"] = webdriver_binary

    def platform_string_geckodriver(self):
        """Return the platform tag used in geckodriver release asset names.

        :returns: ``"linux64"``/``"linux32"``, ``"win64"``/``"win32"`` or
            ``"macos"`` depending on the host reported by ``platform.uname``.
        :raises ValueError: If the host system is not Linux, Windows or macOS.
        """
        uname = platform.uname()
        platform_name = {"Linux": "linux", "Windows": "win", "Darwin": "macos"}.get(
            uname.system
        )

        if platform_name in ("linux", "win"):
            # 64-bit x86 is reported as "x86_64" on Linux but as "AMD64" on
            # Windows; both must select the 64-bit build.
            bits = "64" if uname.machine in ("x86_64", "AMD64") else "32"
        elif platform_name == "macos":
            bits = ""
        else:
            raise ValueError(f"No precompiled geckodriver for platform {uname}")

        return f"{platform_name}{bits}"

    def install_geckodriver(self, logger, dest):
        """Download the latest geckodriver release and unpack it into *dest*.

        :param logger: Logger used to report the download URL.
        :param dest: Directory to unpack into; ``None`` selects a directory
            derived from ``self.distdir``.
        :returns: The path where the geckodriver executable is expected after
            extraction (its existence is not checked here).
        :raises ValueError: If the host platform is unsupported or the release
            has no asset for it.
        """
        if dest is None:
            # NOTE(review): this appends "dist" to self.distdir; if distdir
            # already ends in "dist" the result is ".../dist/dist/bin".
            # No caller in this file passes None - confirm before relying on it.
            dest = os.path.join(self.distdir, "dist", "bin")

        is_windows = platform.uname()[0] == "Windows"

        release = get(
            "https://api.github.com/repos/mozilla/geckodriver/releases/latest"
        ).json()
        ext = "zip" if is_windows else "tar.gz"
        platform_name = self.platform_string_geckodriver()
        name_suffix = f"-{platform_name}.{ext}"
        for item in release["assets"]:
            if item["name"].endswith(name_suffix):
                url = item["browser_download_url"]
                break
        else:
            raise ValueError(f"Failed to find geckodriver for platform {platform_name}")

        logger.info(f"Installing geckodriver from {url}")

        # A freshly constructed BytesIO is already positioned at offset 0.
        data = io.BytesIO(get(url).content)
        decompress = unzip if ext == "zip" else untar
        decompress(data, dest=dest)

        exe_ext = ".exe" if is_windows else ""
        path = os.path.join(dest, f"geckodriver{exe_ext}")

        return path

    def setup_device(self, command_context, kwargs):
        """Prepare an Android device for the run; a no-op on desktop.

        When no device serial was given, ``verify_android_device`` is asked to
        set one up and the chosen serial is stored in
        ``kwargs["device_serial"]``. For the GeckoView example app the
        webcompat add-on is built and pushed to the device, and its on-device
        path is stored in ``kwargs["addon"]``.

        :param command_context: mach command context.
        :param kwargs: Parsed command-line options; mutated in place.
        """
        if kwargs["platform"] != "android":
            return

        app = kwargs["package_name"]
        device_serial = kwargs["device_serial"]

        if not device_serial:
            from mozrunner.devices.android_device import (
                InstallIntent,
                verify_android_device,
            )

            # verify_android_device sets up device/emulator and records selected
            # one to DEVICE_SERIAL environment.
            verify_android_device(
                command_context, app=app, network=True, install=InstallIntent.YES
            )
            # Refresh the local as well, so the add-on push below targets the
            # device that was just selected rather than the empty serial.
            device_serial = os.environ.get("DEVICE_SERIAL")
            kwargs["device_serial"] = device_serial

        # GVE does not have the webcompat addon by default. Add it.
        if app == GVE:
            kwargs["addon"] = "/data/local/tmp/webcompat.xpi"
            push_to_device(
                command_context.substs["ADB"],
                device_serial,
                webcompat_addon(command_context),
                kwargs["addon"],
            )

    @staticmethod
    def _expand_setting(setting):
        """Turn an enabled/disabled/both/none choice into the list of runs."""
        if setting == "none":
            return []
        if setting == "both":
            return ["enabled", "disabled"]
        return [setting]

    def _run_suite(self, runner, logger, suite, setting, log_level, kwargs):
        """Invoke ``runner.run`` once for *suite* with the given *setting*.

        *suite* is both the test directory name next to this file and the
        keyword under which *setting* is passed to the runner.
        """
        runner.run(
            logger,
            os.path.join(here, suite),
            kwargs["webdriver_binary"],
            kwargs["webdriver_port"],
            kwargs["webdriver_ws_port"],
            browser_binary=kwargs.get("binary"),
            device_serial=kwargs.get("device_serial"),
            package_name=kwargs.get("package_name"),
            addon=kwargs.get("addon"),
            bugs=kwargs["bugs"],
            debug=kwargs["debug"],
            config=kwargs["config"],
            headless=kwargs["headless"],
            do2fa=kwargs["do2fa"],
            log_level=log_level,
            failure_screenshots_dir=kwargs.get("failure_screenshots_dir"),
            no_failure_screenshots=kwargs.get("no_failure_screenshots"),
            platform_override=kwargs.get("platform_override"),
            **{suite: setting},
        )

    def run(self, command_context, **kwargs):
        """Run the requested interventions and shims tests.

        :param command_context: mach command context.
        :param kwargs: Parsed command-line options from
            ``create_parser_interventions``.
        :returns: ``True`` when the run produced no unexpected statuses and no
            ERROR or CRITICAL log messages, ``False`` otherwise.
        """
        import mozlog
        import runner

        mozlog.commandline.setup_logging(
            "test-interventions", kwargs, {"mach": sys.stdout}
        )
        logger = mozlog.get_default_logger("test-interventions")

        log_level = "INFO"
        # It's not trivial to get a single log level out of mozlog, because we might have
        # different levels going to different outputs. We look for the maximum (i.e. most
        # verbose) level of any handler with an attached formatter.
        configured_level_number = None
        for handler in logger.handlers:
            if hasattr(handler, "formatter") and hasattr(handler.formatter, "level"):
                formatter_level = handler.formatter.level
                configured_level_number = (
                    formatter_level
                    if configured_level_number is None
                    else max(configured_level_number, formatter_level)
                )
        if configured_level_number is not None:
            # Map the numeric level back to its name.
            for level, number in mozlog.structuredlog.log_levels.items():
                if number == configured_level_number:
                    log_level = level
                    break

        # Collects result counts so the overall outcome can be computed below.
        status_handler = mozlog.handlers.StatusHandler()
        logger.add_handler(status_handler)

        self.set_default_kwargs(logger, command_context, kwargs)

        self.setup_device(command_context, kwargs)

        # Interventions run first, then shims. Both suites receive the same
        # options, including the configured log level.
        for suite in ("interventions", "shims"):
            for setting in self._expand_setting(kwargs[suite]):
                self._run_suite(runner, logger, suite, setting, log_level, kwargs)

        summary = status_handler.summarize()
        passed = (
            summary.unexpected_statuses == 0
            and summary.log_level_counts.get("ERROR", 0) == 0
            and summary.log_level_counts.get("CRITICAL", 0) == 0
        )
        return passed


def webcompat_addon(command_context):
    """Package the in-tree webcompat extension as an XPI and return its path.

    The extension sources are copied to a temporary directory, ``#include``
    lines in ``run.js`` are replaced by the contents of the referenced files,
    and the result is zipped to ``webcompat.xpi`` in the command's virtualenv
    root.

    :param command_context: Object providing ``topsrcdir`` and
        ``virtualenv_manager.virtualenv_root``.
    :returns: Path of the generated ``webcompat.xpi``.
    """
    import shutil
    import tempfile

    src = os.path.join(command_context.topsrcdir, "browser", "extensions", "webcompat")

    # We use #include directives in the system addon's moz.build (to inject our JSON config
    # into interventions.js), so we must do that here to make a working XPI.
    tmpdir_kwargs = {}
    # ignore_cleanup_errors exists from Python 3.10 on. Compare the version as
    # a tuple; testing major and minor separately would reject e.g. 4.0.
    if sys.version_info >= (3, 10):
        tmpdir_kwargs["ignore_cleanup_errors"] = True
    with tempfile.TemporaryDirectory(**tmpdir_kwargs) as src_copy:

        def process_includes(path):
            """Expand ``#include <file>`` lines of *path* in place.

            Include paths are resolved relative to the including file. Files
            are read and written as UTF-8 so the result does not depend on
            the platform's default encoding.
            """
            fullpath = os.path.join(src_copy, path)
            with open(fullpath, encoding="utf-8") as f:
                in_lines = f.readlines()
            with open(fullpath, "w", encoding="utf-8") as f:
                for line in in_lines:
                    if not line.startswith("#include"):
                        f.write(line)
                        continue
                    include_path = line.split()[1]
                    include_fullpath = os.path.join(
                        os.path.dirname(fullpath), include_path
                    )
                    with open(include_fullpath, encoding="utf-8") as inc:
                        f.write(inc.read())
                    f.write("\n")

        shutil.copytree(src, src_copy, dirs_exist_ok=True)
        process_includes("run.js")

        dst = os.path.join(
            command_context.virtualenv_manager.virtualenv_root, "webcompat.xpi"
        )
        # make_archive appends ".zip" to the base name, so rename afterwards.
        shutil.make_archive(dst, "zip", src_copy)
        shutil.move(f"{dst}.zip", dst)
        return dst


def push_to_device(adb_path, device_serial, local_path, remote_path):
    """Copy *local_path* to *remote_path* on an Android device via adb.

    After the push, ``chmod`` is called on the remote path with its default
    arguments.

    :param adb_path: Path to the ``adb`` executable.
    :param device_serial: Serial of the target device, passed to
        ``ADBDeviceFactory`` as ``device``.
    :param local_path: File on the host to upload.
    :param remote_path: Destination path on the device.
    """
    import mozdevice

    adb_device = mozdevice.ADBDeviceFactory(adb=adb_path, device=device_serial)
    adb_device.push(local_path, remote_path)
    adb_device.chmod(remote_path)


@Command(
    "test-interventions",
    category="testing",
    description="Test the webcompat interventions",
    parser=create_parser_interventions,
    virtualenv_name="webcompat",
)
def test_interventions(command_context, **params):
    """mach entry point: run the webcompat interventions/shims tests.

    Activates the command's virtualenv, installs the ``requirements.txt``
    found next to this file, then delegates to ``InterventionTest.run``.

    :param command_context: mach command context.
    :param params: Parsed command-line options.
    :returns: ``0`` if the run passed, ``1`` otherwise.
    """
    module_dir = os.path.abspath(os.path.dirname(__file__))
    requirements_path = os.path.join(module_dir, "requirements.txt")

    command_context.virtualenv_manager.activate()
    command_context.virtualenv_manager.install_pip_requirements(
        requirements_path,
        require_hashes=False,
    )

    test_runner = command_context._spawn(InterventionTest)
    if test_runner.run(command_context, **params):
        return 0
    return 1
