import copy
import importlib
import uuid
import warnings

from django.apps import apps
from django.conf import settings
from django.contrib import admin
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.db.models import ManyToManyField, Q
from django.db.models.fields.proxy import OrderWrt
from django.forms.models import model_to_dict
from django.urls import reverse
from django.utils import timezone
from django.utils.encoding import smart_str
from django.utils.text import format_lazy
from django.utils.translation import gettext_lazy as _

from simple_history import utils

from . import exceptions
from .manager import HistoryDescriptor
from .signals import post_create_historical_record, pre_create_historical_record
from .utils import get_change_reason_from_object

# Holder type used by ``HistoricalRecords`` for its ``context`` attribute, from
# which the current request is read. Prefer asgiref's ``Local`` when asgiref
# can be imported; otherwise fall back to the standard ``threading.local``.
try:
    from asgiref.local import Local as LocalContext
except ImportError:
    from threading import local as LocalContext

# Maps the ``db_table`` of each tracked model to the model class itself.
# Entries are added by ``HistoricalRecords.create_history_model``.
registered_models = {}


def _default_get_user(request, **kwargs):
    try:
        return request.user
    except AttributeError:
        return None


def _history_user_getter(historical_instance):
    if historical_instance.history_user_id is None:
        return None
    User = get_user_model()
    try:
        return User.objects.get(pk=historical_instance.history_user_id)
    except User.DoesNotExist:
        return None


def _history_user_setter(historical_instance, user):
    if user is not None:
        historical_instance.history_user_id = user.pk


class HistoricalRecords:
    """
    Model attribute that adds history tracking to a Django model.

    When attached to a model class (see ``contribute_to_class``) it waits for
    the class to be prepared, then builds a companion "historical" model,
    connects ``post_save`` / ``post_delete`` handlers that write rows to that
    model, and installs a ``HistoryDescriptor`` under the attribute name.

    Constructor options:

    - ``verbose_name``: ``Meta.verbose_name`` of the historical model;
      defaults to ``"historical <model verbose_name>"``.
    - ``bases``: list or tuple of base classes for the historical model;
      ``HistoricalChanges`` is always prepended.
    - ``user_related_name``: ``related_name`` of the ``history_user`` FK.
    - ``table_name``: ``db_table`` of the historical model.
    - ``inherit``: also track subclasses of the model this is declared on.
    - ``excluded_fields``: names of fields that are not copied.
    - ``history_id_field``: field instance used as primary key (it is forced
      to ``primary_key=True`` and ``editable=False``).
    - ``history_change_reason_field``: field instance used for
      ``history_change_reason``.
    - ``user_model``: target of the ``history_user`` FK; defaults to
      ``settings.AUTH_USER_MODEL``.
    - ``get_user``: callable invoked as ``get_user(instance=..., request=...)``
      when the instance carries no ``_history_user``.
    - ``cascade_delete_history``: on delete, remove the instance's history
      instead of recording a ``"-"`` row.
    - ``custom_model_name``: string, or callable receiving the tracked
      model's object name, giving the historical model's class name.
    - ``app``: ``Meta.app_label`` of the historical model.
    - ``history_user_id_field``: field instance stored as ``history_user_id``;
      when given, ``history_user`` becomes a property built from
      ``history_user_getter`` / ``history_user_setter`` instead of an FK.
    - ``related_name``: when set, adds a ``history_relation`` FK to the
      tracked model with this ``related_name``.
    - ``use_base_model_db``: save history rows with the ``using`` alias
      received from the signal; otherwise ``using=None`` is used.
    - ``user_db_constraint``: ``db_constraint`` of the ``history_user`` FK.
    """

    # Shared holder from which ``get_history_user`` reads the current request.
    thread = context = LocalContext()  # retain thread for backwards compatibility

    def __init__(
        self,
        verbose_name=None,
        bases=(models.Model,),
        user_related_name="+",
        table_name=None,
        inherit=False,
        excluded_fields=None,
        history_id_field=None,
        history_change_reason_field=None,
        user_model=None,
        get_user=_default_get_user,
        cascade_delete_history=False,
        custom_model_name=None,
        app=None,
        history_user_id_field=None,
        history_user_getter=_history_user_getter,
        history_user_setter=_history_user_setter,
        related_name=None,
        use_base_model_db=False,
        user_db_constraint=True,
    ):
        """
        Store the configuration options (see the class docstring).

        Raises:
            TypeError: if ``bases`` is a string or is not iterable.
        """
        self.user_set_verbose_name = verbose_name
        self.user_related_name = user_related_name
        self.user_db_constraint = user_db_constraint
        self.table_name = table_name
        self.inherit = inherit
        self.history_id_field = history_id_field
        self.history_change_reason_field = history_change_reason_field
        self.user_model = user_model
        self.get_user = get_user
        self.cascade_delete_history = cascade_delete_history
        self.custom_model_name = custom_model_name
        self.app = app
        self.user_id_field = history_user_id_field
        self.user_getter = history_user_getter
        self.user_setter = history_user_setter
        self.related_name = related_name
        self.use_base_model_db = use_base_model_db

        if excluded_fields is None:
            excluded_fields = []
        self.excluded_fields = excluded_fields

        bases_error = "The `bases` option must be a list or a tuple."
        # A string is iterable, so tuple() would accept it; reject explicitly.
        if isinstance(bases, str):
            raise TypeError(bases_error)
        try:
            self.bases = (HistoricalChanges,) + tuple(bases)
        except TypeError:
            raise TypeError(bases_error) from None

    def contribute_to_class(self, cls, name):
        """
        Hook called when this object is assigned as attribute ``name`` of
        model class ``cls``.

        Records the attribute name, module and class, registers ``finalize``
        for the ``class_prepared`` signal and adds the extra model methods.
        Emits a ``UserWarning`` when ``cls`` is abstract and ``inherit`` is
        not set.
        """
        self.manager_name = name
        self.module = cls.__module__
        self.cls = cls
        models.signals.class_prepared.connect(self.finalize, weak=False)
        self.add_extra_methods(cls)

        if cls._meta.abstract and not self.inherit:
            msg = (
                "HistoricalRecords added to abstract model ({}) without "
                "inherit=True".format(self.cls.__name__)
            )
            warnings.warn(msg, UserWarning)

    def add_extra_methods(self, cls):
        """Attach ``save_without_historical_record`` to the model class."""

        def save_without_historical_record(self, *args, **kwargs):
            """
            Save model without saving a historical record

            Make sure you know what you're doing before you use this method.
            """
            # The marker attribute is checked by ``post_save``.
            # NOTE(review): ``post_save`` only honours the marker for updates,
            # so a newly created instance still gets a "+" record -- confirm
            # this is intended.
            self.skip_history_when_saving = True
            try:
                ret = self.save(*args, **kwargs)
            finally:
                del self.skip_history_when_saving
            return ret

        setattr(cls, "save_without_historical_record", save_without_historical_record)

    def finalize(self, sender, **kwargs):
        """
        ``class_prepared`` handler: build and wire up the historical model.

        Ignores senders other than the class this object was declared on,
        unless ``inherit`` is set and ``sender`` is a subclass of it.

        Raises:
            exceptions.MultipleRegistrationsError: if ``sender`` already has
                a history manager registered.
        """
        inherited = False
        if self.cls is not sender:  # set in concrete
            inherited = self.inherit and issubclass(sender, self.cls)
            if not inherited:
                return  # set in abstract

        if hasattr(sender._meta, "simple_history_manager_attribute"):
            raise exceptions.MultipleRegistrationsError(
                "{}.{} registered multiple times for history tracking.".format(
                    sender._meta.app_label, sender._meta.object_name
                )
            )
        history_model = self.create_history_model(sender, inherited)
        if inherited:
            # Make sure history model is in same module as concrete model
            module = importlib.import_module(history_model.__module__)
        else:
            module = importlib.import_module(self.module)
        # Expose the generated class as a module attribute.
        setattr(module, history_model.__name__, history_model)

        # The HistoricalRecords object will be discarded,
        # so the signal handlers can't use weak references.
        models.signals.post_save.connect(self.post_save, sender=sender, weak=False)
        models.signals.post_delete.connect(self.post_delete, sender=sender, weak=False)

        descriptor = HistoryDescriptor(history_model)
        setattr(sender, self.manager_name, descriptor)
        sender._meta.simple_history_manager_attribute = self.manager_name

    def get_history_model_name(self, model):
        """
        Return the class name to use for the historical model of ``model``.

        Defaults to ``"Historical<ObjectName>"``; otherwise uses
        ``custom_model_name`` (called with the object name if callable).

        Raises:
            ValueError: if the custom name equals the tracked model's name
                (case-insensitively) and both live in the same module.
        """
        if not self.custom_model_name:
            return "Historical{}".format(model._meta.object_name)
        # Must be trying to use a custom history model name
        if callable(self.custom_model_name):
            name = self.custom_model_name(model._meta.object_name)
        else:
            #  simple string
            name = self.custom_model_name
        # Desired class name cannot be same as the model it is tracking
        if not (
            name.lower() == model._meta.object_name.lower()
            and model.__module__ == self.module
        ):
            return name
        raise ValueError(
            "The 'custom_model_name' option '{}' evaluates to a name that is the same "
            "as the model it is tracking. This is not permitted.".format(
                self.custom_model_name
            )
        )

    def create_history_model(self, model, inherited):
        """
        Creates a historical model to associate with the model provided.

        Also records ``model`` in ``registered_models`` under its
        ``db_table``. Returns the newly created model class.
        """
        attrs = {
            "__module__": self.module,
            "_history_excluded_fields": self.excluded_fields,
        }

        app_module = "%s.models" % model._meta.app_label

        if inherited:
            # inherited use models module
            attrs["__module__"] = model.__module__
        elif model.__module__ != self.module:
            # registered under different app
            attrs["__module__"] = self.module
        elif app_module != self.module:
            # Abuse an internal API because the app registry is loading.
            app = apps.app_configs[model._meta.app_label]
            models_module = app.name
            attrs["__module__"] = models_module

        fields = self.copy_fields(model)
        attrs.update(fields)
        attrs.update(self.get_extra_fields(model, fields))
        attrs.update(Meta=type("Meta", (), self.get_meta_options(model)))
        if self.table_name is not None:
            attrs["Meta"].db_table = self.table_name

        # Set as the default then check for overrides
        name = self.get_history_model_name(model)

        registered_models[model._meta.db_table] = model
        history_model = type(str(name), self.bases, attrs)
        return history_model

    def fields_included(self, model):
        """Return the fields in ``model._meta.fields`` that are not excluded."""
        return [
            field
            for field in model._meta.fields
            if field.name not in self.excluded_fields
        ]

    def copy_fields(self, model):
        """
        Creates copies of the model's original fields, returning
        a dictionary mapping field name to copied field object.

        Foreign keys are rebuilt from their deconstructed arguments with
        constraints relaxed (one-to-one fields become plain ``ForeignKey``);
        all other fields are shallow-copied and passed to ``transform_field``.
        """
        fields = {}
        for field in self.fields_included(model):
            field = copy.copy(field)
            field.remote_field = copy.copy(field.remote_field)
            if isinstance(field, OrderWrt):
                # OrderWrt is a proxy field, switch to a plain IntegerField
                field.__class__ = models.IntegerField
            if isinstance(field, models.ForeignKey):
                old_field = field
                # Temporarily disable ``swappable`` while deconstructing and
                # restore it afterwards.
                old_swappable = old_field.swappable
                old_field.swappable = False
                try:
                    _name, _path, args, field_args = old_field.deconstruct()
                finally:
                    old_field.swappable = old_swappable
                if getattr(old_field, "one_to_one", False) or isinstance(
                    old_field, models.OneToOneField
                ):
                    FieldType = models.ForeignKey
                else:
                    FieldType = type(old_field)

                # If field_args['to'] is 'self' then we have a case where the object
                # has a foreign key to itself. If we pass the historical record's
                # field to = 'self', the foreign key will point to an historical
                # record rather than the base record. We can use old_field.model here.
                if field_args.get("to", None) == "self":
                    field_args["to"] = old_field.model

                # Override certain arguments passed when creating the field
                # so that they work for the historical field.
                field_args.update(
                    db_constraint=False,
                    related_name="+",
                    null=True,
                    blank=True,
                    primary_key=False,
                    db_index=True,
                    serialize=True,
                    unique=False,
                    on_delete=models.DO_NOTHING,
                )
                field = FieldType(*args, **field_args)
                field.name = old_field.name
            else:
                transform_field(field)
            fields[field.name] = field
        return fields

    def _get_history_change_reason_field(self):
        """
        Return the field used for ``history_change_reason``.

        Precedence: the field passed to ``__init__``; else a nullable
        ``TextField`` when the setting
        ``SIMPLE_HISTORY_HISTORY_CHANGE_REASON_USE_TEXT_FIELD`` is truthy;
        else a nullable ``CharField(max_length=100)``.
        """
        if self.history_change_reason_field:
            # User specific field from init
            history_change_reason_field = self.history_change_reason_field
        elif getattr(
            settings, "SIMPLE_HISTORY_HISTORY_CHANGE_REASON_USE_TEXT_FIELD", False
        ):
            # Use text field with no max length, not enforced by DB anyways
            history_change_reason_field = models.TextField(null=True)
        else:
            # Current default, with max length
            history_change_reason_field = models.CharField(max_length=100, null=True)

        return history_change_reason_field

    def _get_history_id_field(self):
        """
        Return the primary-key field of the historical model.

        Precedence: the field passed to ``__init__`` (mutated in place to be
        a non-editable primary key); else a ``UUIDField`` when the setting
        ``SIMPLE_HISTORY_HISTORY_ID_USE_UUID`` is truthy; else an
        ``AutoField``.
        """
        if self.history_id_field:
            history_id_field = self.history_id_field
            history_id_field.primary_key = True
            history_id_field.editable = False
        elif getattr(settings, "SIMPLE_HISTORY_HISTORY_ID_USE_UUID", False):
            history_id_field = models.UUIDField(
                primary_key=True, default=uuid.uuid4, editable=False
            )
        else:
            history_id_field = models.AutoField(primary_key=True)

        return history_id_field

    def _get_history_user_fields(self):
        """
        Return the attributes that implement ``history_user``.

        With an explicit user-id field: a ``history_user`` property plus the
        ``history_user_id`` field. Otherwise: a nullable ``history_user``
        foreign key to the configured user model.
        """
        if self.user_id_field is not None:
            # Tracking user using explicit id rather than Django ForeignKey
            history_user_fields = {
                "history_user": property(self.user_getter, self.user_setter),
                "history_user_id": self.user_id_field,
            }
        else:
            user_model = self.user_model or getattr(
                settings, "AUTH_USER_MODEL", "auth.User"
            )

            history_user_fields = {
                "history_user": models.ForeignKey(
                    user_model,
                    null=True,
                    related_name=self.user_related_name,
                    on_delete=models.SET_NULL,
                    db_constraint=self.user_db_constraint,
                )
            }

        return history_user_fields

    def _get_history_related_field(self, model):
        """
        Return ``{"history_relation": ForeignKey(model, ...)}`` when
        ``related_name`` is configured, otherwise an empty dict.

        Raises:
            exceptions.RelatedNameConflictError: if ``related_name`` equals
                the history manager's attribute name.
        """
        if self.related_name:
            if self.manager_name == self.related_name:
                raise exceptions.RelatedNameConflictError(
                    "The related name must not be called like the history manager."
                )
            return {
                "history_relation": models.ForeignKey(
                    model,
                    on_delete=models.DO_NOTHING,
                    related_name=self.related_name,
                    db_constraint=False,
                )
            }
        else:
            return {}

    def get_extra_fields(self, model, fields):
        """Return dict of extra fields added to the historical record model"""

        def revert_url(self):
            """URL for this change in the default admin site."""
            opts = model._meta
            app_label, model_name = opts.app_label, opts.model_name
            return reverse(
                "%s:%s_%s_simple_history" % (admin.site.name, app_label, model_name),
                args=[getattr(self, opts.pk.attname), self.history_id],
            )

        def get_instance(self):
            """
            Build an instance of the tracked model from this record's values.

            Values of excluded (non many-to-many) fields are taken from the
            current database row of the tracked object, if it still exists.
            """
            attrs = {
                field.attname: getattr(self, field.attname) for field in fields.values()
            }
            if self._history_excluded_fields:
                # We don't add ManyToManyFields to this list because they may cause
                # the subsequent `.get()` call to fail. See #706 for context.
                excluded_attnames = []
                for excluded_name in self._history_excluded_fields:
                    excluded_field = model._meta.get_field(excluded_name)
                    if not isinstance(excluded_field, ManyToManyField):
                        excluded_attnames.append(excluded_field.attname)
                # Only query when there is something to fetch: ``values()``
                # with no arguments returns every column, which would
                # overwrite the historical values with the current ones.
                if excluded_attnames:
                    try:
                        values = (
                            model.objects.filter(
                                pk=getattr(self, model._meta.pk.attname)
                            )
                            .values(*excluded_attnames)
                            .get()
                        )
                    except ObjectDoesNotExist:
                        pass
                    else:
                        attrs.update(values)
            return model(**attrs)

        def get_next_record(self):
            """
            Get the next history record for the instance. `None` if last.
            """
            history = utils.get_history_manager_for_model(self.instance)
            return (
                history.filter(Q(history_date__gt=self.history_date))
                .order_by("history_date")
                .first()
            )

        def get_prev_record(self):
            """
            Get the previous history record for the instance. `None` if first.
            """
            history = utils.get_history_manager_for_model(self.instance)
            return (
                history.filter(Q(history_date__lt=self.history_date))
                .order_by("history_date")
                .last()
            )

        def get_default_history_user(instance):
            """
            Returns the user specified by `get_user` method for manually creating
            historical objects
            """
            # ``self`` here is the enclosing HistoricalRecords object.
            return self.get_history_user(instance)

        extra_fields = {
            "history_id": self._get_history_id_field(),
            "history_date": models.DateTimeField(),
            "history_change_reason": self._get_history_change_reason_field(),
            "history_type": models.CharField(
                max_length=1,
                choices=(("+", _("Created")), ("~", _("Changed")), ("-", _("Deleted"))),
            ),
            "history_object": HistoricalObjectDescriptor(
                model, self.fields_included(model)
            ),
            "instance": property(get_instance),
            "instance_type": model,
            "next_record": property(get_next_record),
            "prev_record": property(get_prev_record),
            "revert_url": revert_url,
            "__str__": lambda self: "{} as of {}".format(
                self.history_object, self.history_date
            ),
            "get_default_history_user": staticmethod(get_default_history_user),
        }

        extra_fields.update(self._get_history_related_field(model))
        extra_fields.update(self._get_history_user_fields())

        return extra_fields

    def get_meta_options(self, model):
        """
        Returns a dictionary of fields that will be added to
        the Meta inner class of the historical record model.
        """
        meta_fields = {
            "ordering": ("-history_date", "-history_id"),
            "get_latest_by": "history_date",
        }
        if self.user_set_verbose_name:
            name = self.user_set_verbose_name
        else:
            name = format_lazy("historical {}", smart_str(model._meta.verbose_name))
        meta_fields["verbose_name"] = name
        if self.app:
            meta_fields["app_label"] = self.app
        return meta_fields

    def post_save(self, instance, created, using=None, **kwargs):
        """
        ``post_save`` handler: record a "+" (created) or "~" (changed) row.

        Nothing is recorded for raw saves, nor for updates of an instance
        carrying the ``skip_history_when_saving`` marker.
        """
        if not created and hasattr(instance, "skip_history_when_saving"):
            return
        if not kwargs.get("raw", False):
            history_type = "+" if created else "~"
            self.create_historical_record(instance, history_type, using=using)

    def post_delete(self, instance, using=None, **kwargs):
        """
        ``post_delete`` handler.

        With ``cascade_delete_history`` the instance's history rows are
        deleted; otherwise a "-" (deleted) row is recorded.
        """
        if self.cascade_delete_history:
            manager = getattr(instance, self.manager_name)
            manager.using(using).all().delete()
        else:
            self.create_historical_record(instance, "-", using=using)

    def create_historical_record(self, instance, history_type, using=None):
        """
        Create and save one historical row for ``instance``.

        ``history_type`` is one of "+", "~" or "-". The date comes from
        ``instance._history_date`` when present, else the current time.
        ``pre_create_historical_record`` is sent before the row is saved and
        ``post_create_historical_record`` afterwards.
        """
        # The signal's database alias is only honoured when requested.
        using = using if self.use_base_model_db else None
        history_date = getattr(instance, "_history_date", timezone.now())
        history_user = self.get_history_user(instance)
        history_change_reason = get_change_reason_from_object(instance)
        manager = getattr(instance, self.manager_name)

        attrs = {}
        for field in self.fields_included(instance):
            attrs[field.attname] = getattr(instance, field.attname)

        relation_field = getattr(manager.model, "history_relation", None)
        if relation_field is not None:
            attrs["history_relation"] = instance

        history_instance = manager.model(
            history_date=history_date,
            history_type=history_type,
            history_user=history_user,
            history_change_reason=history_change_reason,
            **attrs,
        )

        pre_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            history_instance=history_instance,
            using=using,
        )

        history_instance.save(using=using)

        post_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_instance=history_instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            using=using,
        )

    def get_history_user(self, instance):
        """Get the modifying user from instance or middleware."""
        try:
            return instance._history_user
        except AttributeError:
            # Fall back to the request held in the shared context, but only
            # when it carries an authenticated user.
            request = None
            try:
                if self.context.request.user.is_authenticated:
                    request = self.context.request
            except AttributeError:
                pass

        return self.get_user(instance=instance, request=request)


def transform_field(field):
    """
    Adapt a copied model field, in place, for use on a historical model.

    The field is renamed to its ``attname``; auto and file fields are
    switched to plain value fields; automatic timestamps and the collation
    are cleared; and uniqueness is replaced by a plain index.
    """
    field.name = field.attname

    # Choose a plain replacement class, if the field needs one.
    replacement = None
    if isinstance(field, models.BigAutoField):
        replacement = models.BigIntegerField
    elif isinstance(field, models.AutoField):
        replacement = models.IntegerField
    elif isinstance(field, models.FileField):
        # Don't copy file, just path.
        as_char = getattr(settings, "SIMPLE_HISTORY_FILEFIELD_TO_CHARFIELD", False)
        replacement = models.CharField if as_char else models.TextField
    if replacement is not None:
        field.__class__ = replacement

    # Historical instance shouldn't change create/update timestamps
    field.auto_now = field.auto_now_add = False
    # Just setting db_collation explicitly since we're not using
    # field.deconstruct() here
    field.db_collation = None

    if not (field.primary_key or field.unique):
        return

    # Unique fields can no longer be guaranteed unique,
    # but they should still be indexed for faster lookups.
    field.primary_key = False
    field._unique = False
    field.db_index = True
    field.serialize = True


class HistoricalObjectDescriptor:
    """
    Descriptor that builds an instance of ``model`` from the values stored
    on the object it is accessed through.
    """

    def __init__(self, model, fields_included):
        # ``model`` is called with keyword arguments in ``__get__``;
        # ``fields_included`` supplies the ``attname`` of each value to copy.
        self.model = model
        self.fields_included = fields_included

    def __get__(self, instance, owner):
        """Return ``model(**values)`` with values read from ``instance``."""
        kwargs = {}
        for tracked in self.fields_included:
            kwargs[tracked.attname] = getattr(instance, tracked.attname)
        return self.model(**kwargs)


class HistoricalChanges:
    """Mixin providing field-level comparison between two history records."""

    def diff_against(self, old_history, excluded_fields=None):
        """
        Compare this record with ``old_history`` and return a ``ModelDelta``.

        Both records are converted with ``model_to_dict`` on their
        ``instance``. A field is reported when it is present in both dicts,
        is not listed in ``excluded_fields``, and its values differ.

        Raises:
            TypeError: if ``old_history`` is not an instance of this
                record's type.
        """
        if not isinstance(old_history, type(self)):
            message = "unsupported type(s) for diffing: '{}' and '{}'".format(
                type(self), type(old_history)
            )
            raise TypeError(message)

        skipped = [] if excluded_fields is None else excluded_fields
        previous = model_to_dict(old_history.instance)
        current = model_to_dict(self.instance)

        changes = []
        changed_fields = []
        for name, new_value in current.items():
            if name in skipped or name not in previous:
                continue
            old_value = previous[name]
            if old_value != new_value:
                changes.append(ModelChange(name, old_value, new_value))
                changed_fields.append(name)

        return ModelDelta(changes, changed_fields, old_history, self)


class ModelChange:
    """A single field's change: the field name with its old and new value."""

    def __init__(self, field_name, old_value, new_value):
        self.field, self.old, self.new = field_name, old_value, new_value


class ModelDelta:
    """
    Result of comparing two history records: the list of changes, the names
    of the changed fields, and the two records that were compared.
    """

    def __init__(self, changes, changed_fields, old_record, new_record):
        self.changes, self.changed_fields = changes, changed_fields
        self.old_record, self.new_record = old_record, new_record
