from time import time
from re import search as re_search
from asyncio import sleep as asyncio_sleep, Lock
from contextlib import contextmanager, asynccontextmanager

from playwright.sync_api._generated import Page
from playwright.sync_api import (
    Frame,
    BrowserContext,
    Response as SyncPlaywrightResponse,
)
from playwright.async_api._generated import Page as AsyncPage
from playwright.async_api import (
    Frame as AsyncFrame,
    Response as AsyncPlaywrightResponse,
    BrowserContext as AsyncBrowserContext,
)
from playwright._impl._errors import Error as PlaywrightError

from scrapling.parser import Selector
from scrapling.engines._browsers._page import PageInfo, PagePool
from scrapling.engines._browsers._validators import validate, PlaywrightConfig, StealthConfig
from scrapling.engines._browsers._config_tools import __default_chrome_useragent__, __default_useragent__
from scrapling.engines.toolbelt.navigation import (
    construct_proxy_dict,
    create_intercept_handler,
    create_async_intercept_handler,
)
from scrapling.core._types import (
    Any,
    Awaitable,
    Dict,
    List,
    Set,
    Optional,
    Callable,
    TYPE_CHECKING,
    cast,
    overload,
    Tuple,
    ProxyType,
    Generator,
    AsyncGenerator,
)
from scrapling.engines.constants import STEALTH_ARGS, HARMFUL_ARGS, DEFAULT_ARGS


class SyncSession:
    """Synchronous base for Playwright-backed browser sessions.

    Holds the Playwright handle, the browser, the persistent browser context and a
    ``PagePool``, and provides the helpers used to open, track and release pages.
    ``start`` is a no-op in this class; presumably subclasses override it to launch
    the browser and flip ``_is_alive`` (not visible here).
    """

    _config: "PlaywrightConfig | StealthConfig"
    _context_options: Dict[str, Any]
    if TYPE_CHECKING:
        # Implemented outside this class (see `BaseSessionMixin` in this module);
        # declared here only so type checkers know the attribute exists.
        _build_context_with_proxy: Callable[..., Dict[str, Any]]

    def __init__(self, max_pages: int = 1):
        """Set up an empty, not-yet-started session.

        :param max_pages: Capacity handed to the ``PagePool`` and reported by ``get_pool_stats``.
        """
        self.max_pages = max_pages
        self.page_pool = PagePool(max_pages)
        # Not read anywhere in this class; presumably a wait limit in seconds -- confirm with subclasses.
        self._max_wait_for_page = 60
        self.playwright: Any = None
        self.context: Any = None
        self.browser: Any = None
        self._is_alive = False

    def start(self) -> None:
        """Start the session. Does nothing in this base class."""
        pass

    def close(self):  # pragma: no cover
        """Close the context, the browser and Playwright, in that order.

        Does nothing when the session is not alive. Every teardown step runs even if
        an earlier one raises, so a failing context close cannot leave the browser or
        the Playwright driver running. The exception (if any) still propagates after
        all steps were attempted and the session was marked as not alive.
        """
        if not self._is_alive:
            return

        try:
            if self.context:
                self.context.close()
        finally:
            self.context = None
            try:
                if self.browser:
                    self.browser.close()
            finally:
                self.browser = None
                try:
                    if self.playwright:
                        self.playwright.stop()
                finally:
                    self.playwright = None  # pyright: ignore
                    self._is_alive = False

    def __enter__(self):
        """Start the session and return it for use in a ``with`` block."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the session when leaving the ``with`` block; exceptions are not suppressed."""
        self.close()

    def _initialize_context(self, config: PlaywrightConfig | StealthConfig, ctx: BrowserContext) -> BrowserContext:
        """Initialize the browser context.

        :param config: Configuration providing the optional ``init_script`` path and ``cookies``.
        :param ctx: The browser context to prepare.
        :return: The same context object that was passed in.
        """
        if config.init_script:
            ctx.add_init_script(path=config.init_script)

        if config.cookies:  # pragma: no cover
            ctx.add_cookies(config.cookies)

        return ctx

    def _get_page(
        self,
        timeout: int | float,
        extra_headers: Optional[Dict[str, str]],
        disable_resources: bool,
        blocked_domains: Optional[Set[str]] = None,
        context: Optional[BrowserContext] = None,
    ) -> PageInfo[Page]:  # pragma: no cover
        """Get a new page to use

        :param timeout: Value set as both the default navigation timeout and the default timeout of the page.
        :param extra_headers: Extra HTTP headers to set on the page, skipped when empty or None.
        :param disable_resources: Forwarded to the request intercept handler.
        :param blocked_domains: Forwarded to the request intercept handler.
        :param context: Context to open the page in; the session's persistent context is used when None.
        :return: The ``PageInfo`` registered in the page pool for the new page, already marked busy.
        """
        # No need to check if a page is available or not in sync code because the code blocked before reaching here till the page closed, ofc.
        ctx = context if context is not None else self.context
        assert ctx is not None, "Browser context not initialized"
        page = ctx.new_page()
        page.set_default_navigation_timeout(timeout)
        page.set_default_timeout(timeout)
        if extra_headers:
            page.set_extra_http_headers(extra_headers)

        # Only install the catch-all route when there is actually something to block
        if disable_resources or blocked_domains:
            page.route("**/*", create_intercept_handler(disable_resources, blocked_domains))
        page_info = self.page_pool.add_page(page)
        page_info.mark_busy()
        return page_info

    def get_pool_stats(self) -> Dict[str, int]:
        """Get statistics about the current page pool"""
        return {
            "total_pages": self.page_pool.pages_count,
            "busy_pages": self.page_pool.busy_count,
            "max_pages": self.max_pages,
        }

    @staticmethod
    def _wait_for_networkidle(page: Page | Frame, timeout: Optional[int] = None):
        """Wait for the page to become idle (no network activity) even if there are never-ending requests.

        :param page: The page or frame to wait on.
        :param timeout: Timeout forwarded to ``wait_for_load_state``.
        """
        try:
            page.wait_for_load_state("networkidle", timeout=timeout)
        except (PlaywrightError, Exception):
            # Deliberately best-effort: a page that never goes idle must not fail the caller
            pass

    def _wait_for_page_stability(self, page: Page | Frame, load_dom: bool, network_idle: bool):
        """Wait for the "load" state, then optionally for "domcontentloaded" and for network idle.

        :param page: The page or frame to wait on.
        :param load_dom: When true, also wait for the "domcontentloaded" state.
        :param network_idle: When true, also wait (best-effort) for the network to become idle.
        """
        page.wait_for_load_state(state="load")
        if load_dom:
            page.wait_for_load_state(state="domcontentloaded")
        if network_idle:
            self._wait_for_networkidle(page)

    @staticmethod
    def _create_response_handler(
        page_info: PageInfo[Page],
        response_container: List,
        xhr_pattern: Optional[str] = None,
        xhr_container: Optional[List] = None,
    ) -> Callable[[SyncPlaywrightResponse], None]:
        """Create a response handler that captures the final navigation response and optionally XHR/fetch responses.

        :param page_info: The PageInfo object containing the page
        :param response_container: A list to store the final response (mutable container). The handler assigns
            to index 0, so the list must already hold at least one item.
        :param xhr_pattern: Optional regex pattern to match XHR/fetch response URLs
        :param xhr_container: Optional list to store captured XHR/fetch responses
        :return: A callback function for page.on("response", ...)
        """

        def handle_response(finished_response: SyncPlaywrightResponse) -> None:
            if (
                finished_response.request.resource_type == "document"
                and finished_response.request.is_navigation_request()
                and finished_response.request.frame == page_info.page.main_frame
            ):
                # Main-frame document navigation: later ones overwrite earlier ones, so the last one wins
                response_container[0] = finished_response
            elif (
                xhr_pattern
                and xhr_container is not None
                and finished_response.request.resource_type in ("xhr", "fetch")
                and re_search(xhr_pattern, finished_response.url)
            ):
                xhr_container.append(finished_response)

        return handle_response

    @contextmanager
    def _page_generator(
        self,
        timeout: int | float,
        extra_headers: Optional[Dict[str, str]],
        disable_resources: bool,
        proxy: Optional[ProxyType] = None,
        blocked_domains: Optional[Set[str]] = None,
    ) -> Generator["PageInfo[Page]", None, None]:
        """Acquire a page - either from persistent context or fresh context with proxy.

        :param timeout: Forwarded to ``_get_page``.
        :param extra_headers: Forwarded to ``_get_page``.
        :param disable_resources: Forwarded to ``_get_page``.
        :param proxy: When given, a fresh context using this proxy is created for the page and
            closed again when the ``with`` block ends.
        :param blocked_domains: Forwarded to ``_get_page``.
        :raises RuntimeError: If a proxy is given but no browser is available to create a context from.
        :return: Context manager yielding the ``PageInfo`` of the acquired page.
        """
        if proxy:
            # Rotation mode: create fresh context with the provided proxy
            if not self.browser:  # pragma: no cover
                raise RuntimeError("Browser not initialized for proxy rotation mode")
            context_options = self._build_context_with_proxy(proxy)
            context: BrowserContext = self.browser.new_context(**context_options)

            page_info = None
            try:
                context = self._initialize_context(self._config, context)
                page_info = self._get_page(timeout, extra_headers, disable_resources, blocked_domains, context=context)
                yield page_info
            finally:
                if page_info is not None and page_info in self.page_pool.pages:
                    self.page_pool.pages.remove(page_info)
                # Closing the context also disposes of the page opened inside it
                context.close()
        else:
            # Standard mode: use PagePool with persistent context
            page_info = self._get_page(timeout, extra_headers, disable_resources, blocked_domains)
            try:
                yield page_info
            finally:
                try:
                    page_info.page.close()
                finally:
                    # Free the pool slot even when closing the page fails, otherwise the
                    # dead entry would occupy the pool for the rest of the session.
                    if page_info in self.page_pool.pages:
                        self.page_pool.pages.remove(page_info)


class AsyncSession:
    """Asynchronous base for Playwright-backed browser sessions.

    Holds the Playwright handle, the browser, the persistent browser context and a
    ``PagePool``, and provides the coroutine helpers used to open, track and release
    pages. ``start`` is a no-op in this class; presumably subclasses override it to
    launch the browser and flip ``_is_alive`` (not visible here).
    """

    _config: "PlaywrightConfig | StealthConfig"
    _context_options: Dict[str, Any]
    if TYPE_CHECKING:
        # Implemented outside this class (see `BaseSessionMixin` in this module);
        # declared here only so type checkers know the attribute exists.
        _build_context_with_proxy: Callable[..., Dict[str, Any]]

    def __init__(self, max_pages: int = 1):
        """Set up an empty, not-yet-started session.

        :param max_pages: Capacity handed to the ``PagePool`` and used as the limit in ``_get_page``.
        """
        self.max_pages = max_pages
        self.page_pool = PagePool(max_pages)
        # Seconds `_get_page` waits for a free slot in the pool before raising TimeoutError
        self._max_wait_for_page = 60
        self.playwright: Any = None
        self.context: Any = None
        self.browser: Any = None
        self._is_alive = False
        # Serializes page creation in `_get_page`
        self._lock = Lock()

    async def start(self) -> None:
        """Start the session. Does nothing in this base class."""
        pass

    async def close(self):
        """Close the context, the browser and Playwright, in that order.

        Does nothing when the session is not alive. Every teardown step runs even if
        an earlier one raises, so a failing context close cannot leave the browser or
        the Playwright driver running. The exception (if any) still propagates after
        all steps were attempted and the session was marked as not alive.
        """
        if not self._is_alive:  # pragma: no cover
            return

        try:
            if self.context:
                await self.context.close()
        finally:
            self.context = None  # pyright: ignore
            try:
                if self.browser:
                    await self.browser.close()
            finally:
                self.browser = None
                try:
                    if self.playwright:
                        await self.playwright.stop()
                finally:
                    self.playwright = None  # pyright: ignore
                    self._is_alive = False

    async def __aenter__(self):
        """Start the session and return it for use in an ``async with`` block."""
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session when leaving the ``async with`` block; exceptions are not suppressed."""
        await self.close()

    async def _initialize_context(
        self, config: PlaywrightConfig | StealthConfig, ctx: AsyncBrowserContext
    ) -> AsyncBrowserContext:
        """Initialize the browser context.

        :param config: Configuration providing the optional ``init_script`` path and ``cookies``.
        :param ctx: The browser context to prepare.
        :return: The same context object that was passed in.
        """
        if config.init_script:  # pragma: no cover
            await ctx.add_init_script(path=config.init_script)

        if config.cookies:  # pragma: no cover
            await ctx.add_cookies(config.cookies)

        return ctx

    async def _get_page(
        self,
        timeout: int | float,
        extra_headers: Optional[Dict[str, str]],
        disable_resources: bool,
        blocked_domains: Optional[Set[str]] = None,
        context: Optional[AsyncBrowserContext] = None,
    ) -> PageInfo[AsyncPage]:  # pragma: no cover
        """Get a new page to use

        :param timeout: Value set as both the default navigation timeout and the default timeout of the page.
        :param extra_headers: Extra HTTP headers to set on the page, skipped when empty or None.
        :param disable_resources: Forwarded to the request intercept handler.
        :param blocked_domains: Forwarded to the request intercept handler.
        :param context: Context to open the page in; the session's persistent context is used when None.
        :raises TimeoutError: If the persistent context is used and no pool slot frees up within
            ``_max_wait_for_page`` seconds.
        :return: The ``PageInfo`` registered in the page pool for the new page.
        """
        ctx = context if context is not None else self.context
        # Checked at runtime (as in the sync session) so a missing context fails right away
        # with a clear message instead of an AttributeError after the capacity wait below.
        assert ctx is not None, "Browser context not initialized"

        async with self._lock:
            # If we're at max capacity after cleanup, wait for busy pages to finish
            if context is None and self.page_pool.pages_count >= self.max_pages:
                # Only applies when using persistent context
                start_time = time()
                while time() - start_time < self._max_wait_for_page:
                    await asyncio_sleep(0.05)
                    if self.page_pool.pages_count < self.max_pages:
                        break
                else:
                    # The loop ran out of time without ever seeing a free slot
                    raise TimeoutError(
                        f"No pages finished to clear place in the pool within the {self._max_wait_for_page}s timeout period"
                    )

            page = await ctx.new_page()
            page.set_default_navigation_timeout(timeout)
            page.set_default_timeout(timeout)
            if extra_headers:
                await page.set_extra_http_headers(extra_headers)

            # Only install the catch-all route when there is actually something to block
            if disable_resources or blocked_domains:
                await page.route("**/*", create_async_intercept_handler(disable_resources, blocked_domains))

            return self.page_pool.add_page(page)

    def get_pool_stats(self) -> Dict[str, int]:
        """Get statistics about the current page pool"""
        return {
            "total_pages": self.page_pool.pages_count,
            "busy_pages": self.page_pool.busy_count,
            "max_pages": self.max_pages,
        }

    @staticmethod
    async def _wait_for_networkidle(page: AsyncPage | AsyncFrame, timeout: Optional[int] = None):
        """Wait for the page to become idle (no network activity) even if there are never-ending requests.

        :param page: The page or frame to wait on.
        :param timeout: Timeout forwarded to ``wait_for_load_state``.
        """
        try:
            await page.wait_for_load_state("networkidle", timeout=timeout)
        except (PlaywrightError, Exception):
            # Deliberately best-effort: a page that never goes idle must not fail the caller
            pass

    async def _wait_for_page_stability(self, page: AsyncPage | AsyncFrame, load_dom: bool, network_idle: bool):
        """Wait for the "load" state, then optionally for "domcontentloaded" and for network idle.

        :param page: The page or frame to wait on.
        :param load_dom: When true, also wait for the "domcontentloaded" state.
        :param network_idle: When true, also wait (best-effort) for the network to become idle.
        """
        await page.wait_for_load_state(state="load")
        if load_dom:
            await page.wait_for_load_state(state="domcontentloaded")
        if network_idle:
            await self._wait_for_networkidle(page)

    @staticmethod
    def _create_response_handler(
        page_info: PageInfo[AsyncPage],
        response_container: List,
        xhr_pattern: Optional[str] = None,
        xhr_container: Optional[List] = None,
    ) -> Callable[[AsyncPlaywrightResponse], Awaitable[None]]:
        """Create an async response handler that captures the final navigation response and optionally XHR/fetch responses.

        :param page_info: The PageInfo object containing the page
        :param response_container: A list to store the final response (mutable container). The handler assigns
            to index 0, so the list must already hold at least one item.
        :param xhr_pattern: Optional regex pattern to match XHR/fetch response URLs
        :param xhr_container: Optional list to store captured XHR/fetch responses
        :return: A callback function for page.on("response", ...)
        """

        async def handle_response(finished_response: AsyncPlaywrightResponse) -> None:
            if (
                finished_response.request.resource_type == "document"
                and finished_response.request.is_navigation_request()
                and finished_response.request.frame == page_info.page.main_frame
            ):
                # Main-frame document navigation: later ones overwrite earlier ones, so the last one wins
                response_container[0] = finished_response
            elif (
                xhr_pattern
                and xhr_container is not None
                and finished_response.request.resource_type in ("xhr", "fetch")
                and re_search(xhr_pattern, finished_response.url)
            ):
                xhr_container.append(finished_response)

        return handle_response

    @asynccontextmanager
    async def _page_generator(
        self,
        timeout: int | float,
        extra_headers: Optional[Dict[str, str]],
        disable_resources: bool,
        proxy: Optional[ProxyType] = None,
        blocked_domains: Optional[Set[str]] = None,
    ) -> AsyncGenerator["PageInfo[AsyncPage]", None]:
        """Acquire a page - either from persistent context or fresh context with proxy.

        :param timeout: Forwarded to ``_get_page``.
        :param extra_headers: Forwarded to ``_get_page``.
        :param disable_resources: Forwarded to ``_get_page``.
        :param proxy: When given, a fresh context using this proxy is created for the page and
            closed again when the ``async with`` block ends.
        :param blocked_domains: Forwarded to ``_get_page``.
        :raises RuntimeError: If a proxy is given but no browser is available to create a context from.
        :return: Async context manager yielding the ``PageInfo`` of the acquired page.
        """
        if proxy:
            # Rotation mode: create fresh context with the provided proxy
            if not self.browser:  # pragma: no cover
                raise RuntimeError("Browser not initialized for proxy rotation mode")
            context_options = self._build_context_with_proxy(proxy)
            context: AsyncBrowserContext = await self.browser.new_context(**context_options)

            page_info = None
            try:
                context = await self._initialize_context(self._config, context)
                page_info = await self._get_page(
                    timeout, extra_headers, disable_resources, blocked_domains, context=context
                )
                yield page_info
            finally:
                if page_info is not None and page_info in self.page_pool.pages:
                    self.page_pool.pages.remove(page_info)
                # Closing the context also disposes of the page opened inside it
                await context.close()
        else:
            # Standard mode: use PagePool with persistent context
            page_info = await self._get_page(timeout, extra_headers, disable_resources, blocked_domains)
            try:
                yield page_info
            finally:
                try:
                    await page_info.page.close()
                finally:
                    # Free the pool slot even when closing the page fails, otherwise the dead
                    # entry keeps counting against `max_pages` and later `_get_page` calls
                    # can end up waiting until they time out.
                    if page_info in self.page_pool.pages:
                        self.page_pool.pages.remove(page_info)


class BaseSessionMixin:
    """Shared option-building logic for the browser session classes.

    Validates user parameters into a config object and turns that config into the
    keyword dictionaries stored on ``_context_options`` and ``_browser_options``.
    """

    _config: "PlaywrightConfig | StealthConfig"

    @overload
    def __validate_routine__(self, params: Dict, model: type[StealthConfig]) -> StealthConfig: ...

    @overload
    def __validate_routine__(self, params: Dict, model: type[PlaywrightConfig]) -> PlaywrightConfig: ...

    def __validate_routine__(
        self, params: Dict, model: type[PlaywrightConfig] | type[StealthConfig]
    ) -> PlaywrightConfig | StealthConfig:
        """Reset the option dictionaries to their defaults and validate ``params`` against ``model``.

        Also stores the lower-cased names of the configured extra headers in ``_headers_keys``.

        :param params: Raw session parameters. Modified in place: a ``"__max_pages"`` key, when present,
            is renamed to ``"max_pages"`` before validation.
        :param model: The config class to validate against.
        :return: The validated config object.
        """
        # Dark color scheme bypasses the 'prefersLightColor' check in creepjs
        self._context_options: Dict[str, Any] = {"color_scheme": "dark", "device_scale_factor": 2}
        self._browser_options: Dict[str, Any] = {
            "args": DEFAULT_ARGS,
            "ignore_default_args": HARMFUL_ARGS,
        }
        if "__max_pages" in params:
            params["max_pages"] = params.pop("__max_pages")

        config = validate(params, model=model)
        self._headers_keys = (
            {header.lower() for header in config.extra_headers.keys()} if config.extra_headers else set()
        )

        return config

    def __generate_options__(self, extra_flags: Tuple | None = None) -> None:
        """Fill ``_context_options`` and ``_browser_options`` from the validated config.

        :param extra_flags: Additional browser launch flags supplied by the calling mixin. They are
            combined with the default flags and with the user's ``extra_flags`` from the config;
            duplicates are dropped and the order is kept (defaults, then these flags, then the user's).
            Ignored when ``cdp_url`` is set.
        """
        config: PlaywrightConfig | StealthConfig = self._config
        self._context_options.update(
            {
                "proxy": config.proxy,
                "locale": config.locale,
                "timezone_id": config.timezone_id,
                "extra_http_headers": config.extra_headers,
            }
        )
        # The default useragent in the headful is always correct now in the current versions of Playwright
        if config.useragent:
            self._context_options["user_agent"] = config.useragent
        elif config.headless:
            self._context_options["user_agent"] = (
                __default_chrome_useragent__ if config.real_chrome else __default_useragent__
            )

        if not config.cdp_url:
            flags = self._browser_options["args"]
            # Merge BOTH sources of extra flags. Picking only one of them (user flags winning)
            # would silently discard the flags passed in by the calling mixin.
            additional_flags = tuple(extra_flags or ()) + tuple(config.extra_flags or ())
            if additional_flags:
                # dict.fromkeys removes duplicates while keeping a deterministic order
                flags = list(dict.fromkeys(tuple(flags) + additional_flags))

            if config.dns_over_https:
                # Always build a new list so the shared default flags object is never mutated
                flags = [*flags, "--dns-over-https-templates=https://cloudflare-dns.com/dns-query"]

            self._browser_options.update(
                {
                    "args": flags,
                    "headless": config.headless,
                    "channel": "chrome" if config.real_chrome else "chromium",
                }
            )
            if config.executable_path:
                self._browser_options["executable_path"] = config.executable_path

            self._user_data_dir = config.user_data_dir
        else:
            # With a CDP URL no launch options are kept (presumably because an already
            # running browser is attached to instead of launching one -- confirm with callers).
            self._browser_options = {}

        # User-supplied context arguments are applied last, so they override everything set above
        if config.additional_args:
            self._context_options.update(config.additional_args)

    def _build_context_with_proxy(self, proxy: Optional[ProxyType] = None) -> Dict[str, Any]:
        """
        Build context options with a specific proxy for rotation mode.

        :param proxy: Proxy URL string or Playwright-style proxy dict to use for this context.
        :return: Dictionary of context options for browser.new_context(). It is a shallow copy of
            ``_context_options``, so replacing the proxy here does not touch the session's own options.
        """

        context_options = self._context_options.copy()

        # Override proxy if provided
        if proxy:
            context_options["proxy"] = construct_proxy_dict(proxy)

        return context_options


class DynamicSessionMixin(BaseSessionMixin):
    """Session mixin that validates its parameters against ``PlaywrightConfig``."""

    def __validate__(self, **params):
        """Validate the given keyword parameters, store the config and build the option dictionaries."""
        validated_config = self.__validate_routine__(params, model=PlaywrightConfig)
        self._config = validated_config
        # No mixin-specific launch flags here; only the defaults and the user's own flags apply
        self.__generate_options__()


class StealthySessionMixin(BaseSessionMixin):
    """Session mixin that validates against ``StealthConfig`` and adds stealth-related options."""

    def __validate__(self, **params):
        """Validate the given keyword parameters, then apply the stealth context options and launch flags."""
        self._config = self.__validate_routine__(params, model=StealthConfig)
        full_hd = {"width": 1920, "height": 1080}
        stealth_context_options = {
            "is_mobile": False,
            "has_touch": False,
            # Service workers stay allowed for now, although disabling them would avoid a lot of trouble
            "service_workers": "allow",
            "ignore_https_errors": True,
            # Separate copies, so "screen" and "viewport" never share one dict object
            "screen": dict(full_hd),
            "viewport": dict(full_hd),
            "permissions": ["geolocation", "notifications"],
        }
        self._context_options.update(stealth_context_options)
        self.__generate_stealth_options()

    def __generate_stealth_options(self) -> None:
        """Collect the stealth launch flags and hand them to the base option generator.

        No flags are collected when the config has a ``cdp_url``; an empty tuple is passed on instead.
        """
        config = cast(StealthConfig, self._config)
        flags: Tuple[str, ...]
        if config.cdp_url:
            flags = ()
        else:
            collected = [*DEFAULT_ARGS, *STEALTH_ARGS]

            if config.block_webrtc:
                collected.append("--webrtc-ip-handling-policy=disable_non_proxied_udp")
                # Ensures the policy is enforced
                collected.append("--force-webrtc-ip-handling-policy")
            if not config.allow_webgl:
                collected.extend(
                    [
                        "--disable-webgl",
                        "--disable-webgl-image-chromium",
                        "--disable-webgl2",
                    ]
                )
            if config.hide_canvas:
                collected.append("--fingerprinting-canvas-image-data-noise")

            flags = tuple(collected)

        super().__generate_options__(flags)

    @staticmethod
    def _detect_cloudflare(page_content: str) -> str | None:
        """
        Identify which kind of Cloudflare challenge, if any, the given page content contains.

        The content is first searched for the marker of one of the challenge types
        "non-interactive", "managed" and "interactive", in that order. If none of the
        markers is present, the content is parsed and checked for an embedded Turnstile
        captcha script, which is reported as "embedded".

        Args:
            page_content (str): The content of the page to analyze for Cloudflare
                challenge types.

        Returns:
            str: The detected challenge type ("non-interactive", "managed",
                "interactive" or "embedded"). Returns None if nothing matches.
        """
        known_types = ("non-interactive", "managed", "interactive")
        detected = next((ctype for ctype in known_types if f"cType: '{ctype}'" in page_content), None)
        if detected is not None:
            return detected

        # Check if turnstile captcha is embedded inside the page (Usually inside a closed Shadow iframe)
        if Selector(content=page_content).css('script[src*="challenges.cloudflare.com/turnstile/v"]'):
            return "embedded"

        return None
