############################################################################
# Copyright(c) Open Law Library. All rights reserved.                      #
# See ThirdPartyNotices.txt in the project root for additional notices.    #
#                                                                          #
# Licensed under the Apache License, Version 2.0 (the "License")           #
# you may not use this file except in compliance with the License.         #
# You may obtain a copy of the License at                                  #
#                                                                          #
#     http://www.apache.org/licenses/LICENSE-2.0                           #
#                                                                          #
# Unless required by applicable law or agreed to in writing, software      #
# distributed under the License is distributed on an "AS IS" BASIS,        #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and      #
# limitations under the License.                                           #
############################################################################
import asyncio
import functools
import inspect
import itertools
import logging
from typing import Any, Callable, Dict, Optional, get_type_hints

from pygls.constants import (
    ATTR_COMMAND_TYPE,
    ATTR_EXECUTE_IN_THREAD,
    ATTR_FEATURE_TYPE,
    ATTR_REGISTERED_NAME,
    ATTR_REGISTERED_TYPE,
    PARAM_LS,
)
from pygls.exceptions import (
    CommandAlreadyRegisteredError,
    FeatureAlreadyRegisteredError,
    ThreadDecoratorError,
    ValidationError,
)
from pygls.lsp import get_method_options_type, is_instance

logger = logging.getLogger(__name__)


def assign_help_attrs(f, reg_name, reg_type):
    """Record on ``f`` the name and type it was registered under.

    The values are stored as attributes of the callable itself, using the
    ``ATTR_REGISTERED_NAME`` and ``ATTR_REGISTERED_TYPE`` attribute names.
    """
    for attr_name, value in (
        (ATTR_REGISTERED_NAME, reg_name),
        (ATTR_REGISTERED_TYPE, reg_type),
    ):
        setattr(f, attr_name, value)


def assign_thread_attr(f):
    """Mark ``f`` as a function that should be executed in a thread.

    Sets the ``ATTR_EXECUTE_IN_THREAD`` attribute of ``f`` to ``True``.
    """
    run_in_thread = True
    setattr(f, ATTR_EXECUTE_IN_THREAD, run_in_thread)


def get_help_attrs(f):
    """Return the ``(registered name, registered type)`` pair stored on ``f``.

    An element is ``None`` when the corresponding attribute is not set.
    """
    reg_name = getattr(f, ATTR_REGISTERED_NAME, None)
    reg_type = getattr(f, ATTR_REGISTERED_TYPE, None)
    return reg_name, reg_type


def has_ls_param_or_annotation(f, annotation):
    """Tell whether ``f`` expects the language server as its first argument.

    ``f`` qualifies when its first parameter is either

    1. named according to the ``PARAM_LS`` convention (``ls``), or
    2. annotated with ``annotation`` itself or with one of its base classes,
       so that a handler typed against a base server class is still
       recognised when the running server is a subclass of it.

    Args:
        f: Callable to inspect.
        annotation: Type the first parameter's annotation is matched against
            (``wrap_with_server`` passes ``type(server)``).

    Returns:
        ``True`` if one of the rules above matches. ``False`` otherwise,
        including when ``f`` takes no parameters, is not annotated, or cannot
        be introspected at all.
    """
    try:
        first_p = next(iter(inspect.signature(f).parameters.values()), None)
        if first_p is None:
            # No parameters at all, so there is nothing to inject into.
            return False

        if first_p.name == PARAM_LS:
            return True

        hints = get_type_hints(f)
        if first_p.name not in hints:
            return False

        hint = hints[first_p.name]
        if hint == annotation:
            return True

        # Accept base classes of ``annotation`` as well. ``object`` is left
        # out on purpose: every class derives from it, so it carries no
        # intent to receive the server.
        return (
            isinstance(hint, type)
            and isinstance(annotation, type)
            and hint is not object
            and issubclass(annotation, hint)
        )
    except Exception:
        # Deliberately best-effort: a callable that cannot be introspected
        # (no signature, unresolvable forward references, ...) is treated as
        # not wanting the server instead of failing registration.
        return False


def is_thread_function(f):
    """Return the thread marker stored on ``f``, or ``False`` if it has none."""
    marker = getattr(f, ATTR_EXECUTE_IN_THREAD, False)
    return marker


def wrap_with_server(f, server):
    """Return a callable that supplies ``server`` as the first argument of ``f``.

    ``f`` is returned untouched when ``has_ls_param_or_annotation`` reports
    that it does not ask for the server. Otherwise:

    * a coroutine function is wrapped in a new coroutine function that awaits
      ``f(server, ...)``;
    * any other callable is bound with ``functools.partial``, and the thread
      marker of ``f`` (if set) is carried over to the partial object.
    """
    wants_server = has_ls_param_or_annotation(f, type(server))
    if not wants_server:
        return f

    if not asyncio.iscoroutinefunction(f):
        bound = functools.partial(f, server)
        # The partial is a new object, so the marker has to be copied by hand.
        if is_thread_function(f):
            assign_thread_attr(bound)
        return bound

    async def wrapped(*args, **kwargs):
        return await f(server, *args, **kwargs)

    return wrapped


class FeatureManager:
    """A class for managing server features.

    Attributes:
        _builtin_features(dict): Predefined set of lsp methods
        _feature_options(dict): Registered feature's options
        _features(dict): Registered features
        _commands(dict): Registered commands
        server(LanguageServer): Reference to the language server
                                If passed, server will be passed to registered
                                features/commands with first parameter:
                                    1. ls - parameter naming convention
                                    2. name: LanguageServer - add typings
        converter: Object handed to ``is_instance`` when the options passed
                   to ``feature`` are type-checked
    """

    def __init__(self, server=None, converter=None):
        self._builtin_features = {}
        self._feature_options = {}
        self._features = {}
        self._commands = {}
        self.server = server
        self.converter = converter

    def add_builtin_feature(self, feature_name: str, func: Callable) -> None:
        """Registers builtin (predefined) feature.

        An existing entry with the same name is overwritten.
        """
        self._builtin_features[feature_name] = func
        logger.info("Registered builtin feature %s", feature_name)

    @property
    def builtin_features(self) -> Dict:
        """Returns server builtin features."""
        return self._builtin_features

    def command(self, command_name: str) -> Callable:
        """Decorator used to register custom commands.

        The decorated function is returned unchanged; the callable stored in
        ``commands`` is the result of ``wrap_with_server``.

        Example:
            @ls.command('myCustomCommand')

        Raises (when the decorator is applied):
            ValidationError: ``command_name`` is ``None`` or blank.
            CommandAlreadyRegisteredError: ``command_name`` is already taken.
        """

        def decorator(f):
            # Validate
            if command_name is None or command_name.strip() == "":
                logger.error("Missing command name.")
                raise ValidationError("Command name is required.")

            # Check if not already registered
            if command_name in self._commands:
                logger.error('Command "%s" is already registered.', command_name)
                raise CommandAlreadyRegisteredError(command_name)

            assign_help_attrs(f, command_name, ATTR_COMMAND_TYPE)

            wrapped = wrap_with_server(f, self.server)
            # Assign help attributes for thread decorator
            assign_help_attrs(wrapped, command_name, ATTR_COMMAND_TYPE)

            self._commands[command_name] = wrapped

            logger.info('Command "%s" is successfully registered.', command_name)

            return f

        return decorator

    @property
    def commands(self) -> Dict:
        """Returns registered custom commands."""
        return self._commands

    def feature(
        self,
        feature_name: str,
        options: Optional[Any] = None,
    ) -> Callable:
        """Decorator used to register LSP features.

        The decorated function is returned unchanged; the callable stored in
        ``features`` is the result of ``wrap_with_server``. Nothing is
        registered and ``f`` is left untouched when validation fails.

        Example:
            @ls.feature('textDocument/completion', CompletionItems(trigger_characters=['.']))

        Raises (when the decorator is applied):
            ValidationError: ``feature_name`` is ``None`` or blank.
            FeatureAlreadyRegisteredError: ``feature_name`` is already taken.
            TypeError: ``options`` is not an instance of the options type
                reported by ``get_method_options_type`` for this feature.
        """

        def decorator(f):
            # Validate
            if feature_name is None or feature_name.strip() == "":
                logger.error("Missing feature name.")
                raise ValidationError("Feature name is required.")

            # Add feature if not exists
            if feature_name in self._features:
                logger.error('Feature "%s" is already registered.', feature_name)
                raise FeatureAlreadyRegisteredError(feature_name)

            # Check the options *before* touching any state: a rejected
            # registration must not leave the feature half-registered, which
            # would make a corrected retry fail as "already registered".
            if options:
                options_type = get_method_options_type(feature_name)
                if options_type and not is_instance(
                    self.converter, options, options_type
                ):
                    raise TypeError(
                        (
                            f'Options of method "{feature_name}"'
                            f" should be instance of type {options_type}"
                        )
                    )

            assign_help_attrs(f, feature_name, ATTR_FEATURE_TYPE)

            wrapped = wrap_with_server(f, self.server)
            # Assign help attributes for thread decorator
            assign_help_attrs(wrapped, feature_name, ATTR_FEATURE_TYPE)

            self._features[feature_name] = wrapped
            if options:
                self._feature_options[feature_name] = options

            logger.info('Registered "%s" with options "%s"', feature_name, options)

            return f

        return decorator

    @property
    def feature_options(self) -> Dict:
        """Returns feature options for registered features."""
        return self._feature_options

    @property
    def features(self) -> Dict:
        """Returns registered features"""
        return self._features

    def thread(self) -> Callable:
        """Decorator that marks a function to be executed in a thread.

        It may be applied either before or after ``feature``/``command``.

        Raises (when the decorator is applied):
            ThreadDecoratorError: The decorated function is a coroutine
                function.
        """

        def decorator(f):
            if asyncio.iscoroutinefunction(f):
                raise ThreadDecoratorError(
                    f'Thread decorator cannot be used with async functions "{f.__name__}"'
                )

            # Allow any decorator order
            try:
                reg_name = getattr(f, ATTR_REGISTERED_NAME)
                reg_type = getattr(f, ATTR_REGISTERED_TYPE)
            except AttributeError:
                # Not registered (yet): mark the function itself.
                assign_thread_attr(f)
            else:
                # Already registered: mark the callable that was actually
                # stored, which may be a wrapper around ``f``. The type is
                # compared by value, not by object identity.
                if reg_type == ATTR_FEATURE_TYPE:
                    assign_thread_attr(self.features[reg_name])
                elif reg_type == ATTR_COMMAND_TYPE:
                    assign_thread_attr(self.commands[reg_name])

            return f

        return decorator
