# Copyright 2014-2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License.  You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.

"""Criteria to select some ServerDescriptions from a TopologyDescription."""
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, TypeVar, cast

from pymongo.server_type import SERVER_TYPE

if TYPE_CHECKING:
    from pymongo.server_description import ServerDescription
    from pymongo.topology_description import TopologyDescription


T = TypeVar("T")
TagSet = Mapping[str, Any]
TagSets = Sequence[TagSet]


class Selection:
    """Input or output of a server selector function."""

    @classmethod
    def from_topology_description(cls, topology_description: TopologyDescription) -> Selection:
        known_servers = topology_description.known_servers
        primary = None
        for sd in known_servers:
            if sd.server_type == SERVER_TYPE.RSPrimary:
                primary = sd
                break

        return Selection(
            topology_description,
            topology_description.known_servers,
            topology_description.common_wire_version,
            primary,
        )

    def __init__(
        self,
        topology_description: TopologyDescription,
        server_descriptions: list[ServerDescription],
        common_wire_version: Optional[int],
        primary: Optional[ServerDescription],
    ):
        self.topology_description = topology_description
        self.server_descriptions = server_descriptions
        self.primary = primary
        self.common_wire_version = common_wire_version

    def with_server_descriptions(self, server_descriptions: list[ServerDescription]) -> Selection:
        return Selection(
            self.topology_description, server_descriptions, self.common_wire_version, self.primary
        )

    def secondary_with_max_last_write_date(self) -> Optional[ServerDescription]:
        secondaries = secondary_server_selector(self)
        if secondaries.server_descriptions:
            return max(
                secondaries.server_descriptions, key=lambda sd: cast(float, sd.last_write_date)
            )
        return None

    @property
    def primary_selection(self) -> Selection:
        primaries = [self.primary] if self.primary else []
        return self.with_server_descriptions(primaries)

    @property
    def heartbeat_frequency(self) -> int:
        return self.topology_description.heartbeat_frequency

    @property
    def topology_type(self) -> int:
        return self.topology_description.topology_type

    def __bool__(self) -> bool:
        return bool(self.server_descriptions)

    def __getitem__(self, item: int) -> ServerDescription:
        return self.server_descriptions[item]


def any_server_selector(selection: T) -> T:
    return selection


def readable_server_selector(selection: Selection) -> Selection:
    return selection.with_server_descriptions(
        [s for s in selection.server_descriptions if s.is_readable]
    )


def writable_server_selector(selection: Selection) -> Selection:
    return selection.with_server_descriptions(
        [s for s in selection.server_descriptions if s.is_writable]
    )


def secondary_server_selector(selection: Selection) -> Selection:
    return selection.with_server_descriptions(
        [s for s in selection.server_descriptions if s.server_type == SERVER_TYPE.RSSecondary]
    )


def arbiter_server_selector(selection: Selection) -> Selection:
    return selection.with_server_descriptions(
        [s for s in selection.server_descriptions if s.server_type == SERVER_TYPE.RSArbiter]
    )


def writable_preferred_server_selector(selection: Selection) -> Selection:
    """Like PrimaryPreferred but doesn't use tags or latency."""
    return writable_server_selector(selection) or secondary_server_selector(selection)


def apply_single_tag_set(tag_set: TagSet, selection: Selection) -> Selection:
    """All servers matching one tag set.

    A tag set is a dict. A server matches if its tags are a superset:
    A server tagged {'a': '1', 'b': '2'} matches the tag set {'a': '1'}.

    The empty tag set {} matches any server.
    """

    def tags_match(server_tags: Mapping[str, Any]) -> bool:
        for key, value in tag_set.items():
            if key not in server_tags or server_tags[key] != value:
                return False

        return True

    return selection.with_server_descriptions(
        [s for s in selection.server_descriptions if tags_match(s.tags)]
    )


def apply_tag_sets(tag_sets: TagSets, selection: Selection) -> Selection:
    """All servers match a list of tag sets.

    tag_sets is a list of dicts. The empty tag set {} matches any server,
    and may be provided at the end of the list as a fallback. So
    [{'a': 'value'}, {}] expresses a preference for servers tagged
    {'a': 'value'}, but accepts any server if none matches the first
    preference.
    """
    for tag_set in tag_sets:
        with_tag_set = apply_single_tag_set(tag_set, selection)
        if with_tag_set:
            return with_tag_set

    return selection.with_server_descriptions([])


def secondary_with_tags_server_selector(tag_sets: TagSets, selection: Selection) -> Selection:
    """All near-enough secondaries matching the tag sets."""
    return apply_tag_sets(tag_sets, secondary_server_selector(selection))


def member_with_tags_server_selector(tag_sets: TagSets, selection: Selection) -> Selection:
    """All near-enough members matching the tag sets."""
    return apply_tag_sets(tag_sets, readable_server_selector(selection))