# File: authentic/src/authentic2/models.py
# (529 lines, 19 KiB, Python)

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import time
import uuid
import django
from django.utils.http import urlquote
from django.conf import settings
from django.db import models, transaction
from django.db.models.query import Q
from django.utils import six, timezone
from django.utils.translation import ugettext_lazy as _
from django.utils.six.moves.urllib import parse as urlparse
from django.core.exceptions import ValidationError
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.fields import jsonb
from model_utils.managers import QueryManager
from authentic2.a2_rbac.models import Role
from authentic2.crypto import base64url_encode, base64url_decode
from django_rbac.utils import get_role_model_name
try:
from django.contrib.contenttypes.fields import GenericForeignKey
except ImportError:
from django.contrib.contenttypes.generic import GenericForeignKey
from . import managers
# install our natural_key implementation
from . import natural_key as unused_natural_key # noqa: F401
from .utils import ServiceAccessDenied
class UserExternalId(models.Model):
    '''Identifier of a user inside an external source.

    A given ``(source, external_id)`` pair can only be stored once, see
    ``Meta.unique_together``.
    '''
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        verbose_name=_('user'),
        on_delete=models.CASCADE)
    source = models.CharField(max_length=256, verbose_name=_('source'))
    external_id = models.CharField(max_length=256, verbose_name=_('external id'))
    # auto_now_add: set when the row is first created
    created = models.DateTimeField(auto_now_add=True, verbose_name=_('creation date'))
    # auto_now: refreshed on every save
    updated = models.DateTimeField(auto_now=True, verbose_name=_('last update date'))

    def __str__(self):
        return u'{0} is {1} on {2}'.format(self.user, self.external_id, self.source)

    def __repr__(self):
        # The raw ``user_id`` column value is shown, not the related user
        # object. The closing '>' balances the opening '<', like the other
        # __repr__ implementations of this module.
        return (
            '<UserExternalId user: {0!r} source: {1!r} '
            'external_id: {2!r} created: {3} updated: {4}>'
        ).format(self.user_id, self.source, self.external_id,
                 self.created, self.updated)

    class Meta:
        verbose_name = _('user external id')
        verbose_name_plural = _('user external ids')
        unique_together = [
            ('source', 'external_id'),
        ]
class AuthenticationEvent(models.Model):
    '''Record authentication events whatever the source'''
    # auto_now: the timestamp is refreshed each time the row is saved
    when = models.DateTimeField(auto_now=True, verbose_name=_('when'))
    who = models.CharField(max_length=80, verbose_name=_('who'))
    how = models.CharField(max_length=32, verbose_name=_('how'))
    nonce = models.CharField(max_length=255, verbose_name=_('nonce'))

    objects = managers.AuthenticationEventManager()

    class Meta:
        verbose_name = _('authentication log')
        verbose_name_plural = _('authentication logs')

    def __str__(self):
        # the instance attributes (who, how, when) feed the named placeholders
        template = _('Authentication of %(who)s by %(how)s at %(when)s')
        return template % vars(self)
class LogoutUrlAbstract(models.Model):
    '''Abstract base holding a logout URL template and its iframe options.'''
    logout_url = models.URLField(
        verbose_name=_('url'),
        help_text=_('you can use a {} to pass the URL of the success icon, '
                    'ex.: http://example.com/logout?next={}'),
        max_length=255,
        blank=True,
        null=True)
    logout_use_iframe = models.BooleanField(
        verbose_name=_('use an iframe instead of an img tag for logout'),
        default=False)
    logout_use_iframe_timeout = models.PositiveIntegerField(
        verbose_name=_('iframe logout timeout (ms)'),
        help_text=_('if iframe logout is used, it\'s the time between the '
                    'onload event for this iframe and the moment we consider its '
                    'loading to be really finished'),
        default=300)

    def get_logout_url(self, request):
        '''Return ``logout_url`` with ``{}`` replaced by the quoted URL of the
        success icon.

        The icon URL is made absolute using ``request`` and carries a
        ``nonce`` query parameter built from the current time.
        '''
        icon_path = urlparse.urljoin(settings.STATIC_URL, 'authentic2/images/ok.png')
        ok_icon_url = '%s?nonce=%s' % (request.build_absolute_uri(icon_path), time.time())
        quoted_icon_url = urlquote(ok_icon_url)
        return self.logout_url.format(quoted_icon_url)

    class Meta:
        abstract = True
class LogoutUrl(LogoutUrlAbstract):
    '''Logout URL attached to an arbitrary object through a generic relation.'''
    content_type = models.ForeignKey(
        ContentType,
        verbose_name=_('content type'),
        on_delete=models.CASCADE,
    )
    object_id = models.PositiveIntegerField(
        verbose_name=_('object identifier'),
    )
    # generic relation resolved from (content_type, object_id)
    provider = GenericForeignKey('content_type', 'object_id')

    class Meta:
        verbose_name = _('logout URL')
        verbose_name_plural = _('logout URL')
class Attribute(models.Model):
    '''Definition of an attribute whose values are stored in AttributeValue.

    The ``kind`` field names an entry of ``attribute_kinds``; that entry
    provides the form field, the REST framework field and the
    serialize/deserialize functions used to store values as text.
    '''
    label = models.CharField(verbose_name=_('label'), max_length=63, unique=True)
    description = models.TextField(verbose_name=_('description'), blank=True)
    name = models.SlugField(verbose_name=_('name'), max_length=256, unique=True)
    required = models.BooleanField(verbose_name=_('required'), blank=True, default=False)
    asked_on_registration = models.BooleanField(
        verbose_name=_('asked on registration'), blank=True, default=False)
    user_editable = models.BooleanField(verbose_name=_('user editable'), blank=True, default=False)
    user_visible = models.BooleanField(verbose_name=_('user visible'), blank=True, default=False)
    multiple = models.BooleanField(verbose_name=_('multiple'), blank=True, default=False)
    kind = models.CharField(max_length=16, verbose_name=_('kind'))
    disabled = models.BooleanField(verbose_name=_('disabled'), blank=True, default=False)
    searchable = models.BooleanField(verbose_name=_('searchable'), blank=True, default=False)
    scopes = models.CharField(
        verbose_name=_('scopes'),
        help_text=_('scopes separated by spaces'),
        blank=True,
        default='',
        max_length=256)
    order = models.PositiveIntegerField(
        verbose_name=_('order'),
        default=0)

    # NOTE(review): ``objects`` is built with disabled=False, presumably to
    # hide disabled attributes; ``all_objects`` has no such argument.
    all_objects = managers.AttributeManager()
    objects = managers.AttributeManager(disabled=False)
    registration_attributes = QueryManager(asked_on_registration=True)
    user_attributes = QueryManager(user_editable=True)

    def get_form_field(self, **kwargs):
        '''Build the form field of this attribute's kind.

        ``label`` and ``required`` always come from the attribute;
        ``help_text`` is set only when a description exists. Other keyword
        arguments are forwarded untouched.
        '''
        from . import attribute_kinds

        kwargs.update(label=self.label, required=self.required)
        if self.description:
            kwargs['help_text'] = self.description
        return attribute_kinds.get_form_field(self.kind, **kwargs)

    def get_drf_field(self, **kwargs):
        '''Build the REST framework serializer field for this attribute.

        Keyword arguments given by the caller override the computed ones.
        '''
        from rest_framework import serializers
        from authentic2.attribute_kinds import DateRestField

        kind = self.get_kind()
        field_class = kind['rest_framework_field_class']
        field_kwargs = dict(kind.get('rest_framework_field_kwargs') or {})
        field_kwargs['source'] = 'attributes.%s' % self.name
        field_kwargs['required'] = self.required

        # string alike fields are the ones understanding allow_blank
        blank_capable = issubclass(field_class, (serializers.CharField, DateRestField))

        if self.required:
            if blank_capable:
                field_kwargs['allow_blank'] = False
        else:
            # setting an attribute to null will delete it
            # NullBooleanField and BooleanField do not support allow_null
            if field_class is serializers.BooleanField:
                field_class = serializers.NullBooleanField
            elif field_class is not serializers.NullBooleanField:
                field_kwargs['allow_null'] = True
            # if not stated otherwise by the definition of the kind, string
            # alike fields accept blank values when not required
            if blank_capable:
                field_kwargs.setdefault('allow_blank', True)

        field_kwargs.update(kwargs)
        return field_class(**field_kwargs)

    def get_kind(self):
        '''Return the kind definition registered under ``self.kind``.'''
        from . import attribute_kinds

        return attribute_kinds.get_kind(self.kind)

    def contribute_to_form(self, form, **kwargs):
        '''Add this attribute's form field to ``form.fields`` under its name.'''
        form.fields[self.name] = self.get_form_field(**kwargs)

    def get_value(self, owner, verified=None):
        '''Return the deserialized value(s) of this attribute for ``owner``.

        When ``verified`` is exactly True or False, only values with that
        verified flag are considered. A multiple attribute yields a list; a
        single one yields its value, or the kind's default if none is stored.
        '''
        kind = self.get_kind()
        deserialize = kind['deserialize']
        candidates = AttributeValue.all_objects.with_owner(owner)
        if isinstance(verified, bool):
            candidates = candidates.filter(verified=verified)

        if self.multiple:
            stored = candidates.filter(attribute=self, multiple=True)
            return [deserialize(atv.content) for atv in stored]

        try:
            single = candidates.get(attribute=self, multiple=False)
            return deserialize(single.content)
        except AttributeValue.DoesNotExist:
            return kind['default']

    def set_value(self, owner, value, verified=False, attribute_value=None):
        '''Store ``value`` for ``owner``; a value of None deletes stored values.

        For a multiple attribute ``value`` must be a list, set or tuple: one
        row per element is created or updated, rows whose content is not in
        the new values are deleted, and the list of rows is returned.

        For a single attribute the row is created or updated and returned;
        ``attribute_value`` may provide an already loaded row to update.
        '''
        serialize = self.get_kind()['serialize']

        # setting to None is to delete
        if value is None:
            AttributeValue.objects.with_owner(owner).filter(attribute=self).delete()
            return

        with transaction.atomic():
            if not self.multiple:
                content = serialize(value)
                if attribute_value:
                    stored, created = attribute_value, False
                else:
                    stored, created = AttributeValue.objects.get_or_create(
                        content_type=ContentType.objects.get_for_model(owner),
                        object_id=owner.pk,
                        attribute=self,
                        multiple=False,
                        defaults={'content': content, 'verified': verified})
                # write only when something actually differs
                if not created and (stored.content != content or stored.verified != verified):
                    stored.content = content
                    stored.verified = verified
                    stored.save()
                return stored

            assert isinstance(value, (list, set, tuple))
            stored_values = []
            kept_contents = []

            # evaluating the queryset issues the SELECT ... FOR UPDATE on the owner row
            list(owner.__class__.objects.filter(pk=owner.pk).select_for_update())

            for item in value:
                content = serialize(item)
                stored, created = AttributeValue.objects.get_or_create(
                    content_type=ContentType.objects.get_for_model(owner),
                    object_id=owner.pk,
                    attribute=self,
                    multiple=True,
                    content=content,
                    defaults={'verified': verified})
                if not created:
                    stored.verified = verified
                    stored.save()
                stored_values.append(stored)
                kept_contents.append(content)

            # drop the rows whose content is not part of the new values
            obsolete = AttributeValue.objects.filter(
                attribute=self,
                content_type=ContentType.objects.get_for_model(owner),
                object_id=owner.pk,
                multiple=True,
            ).exclude(content__in=kept_contents)
            obsolete.delete()
            return stored_values

    def natural_key(self):
        return (self.name,)

    def __repr__(self):
        return '<%s %r>' % (self.__class__.__name__, str(self))

    def __str__(self):
        return self.label

    class Meta:
        verbose_name = _('attribute definition')
        verbose_name_plural = _('attribute definitions')
        ordering = ('order', 'id')
        base_manager_name = 'all_objects'
class AttributeValue(models.Model):
    '''Serialized value of an Attribute for an owner reached through a
    generic relation.
    '''
    content_type = models.ForeignKey(
        'contenttypes.ContentType',
        verbose_name=_('content type'),
        on_delete=models.CASCADE)
    object_id = models.PositiveIntegerField(
        verbose_name=_('object identifier'),
        db_index=True)
    # generic relation resolved from (content_type, object_id)
    owner = GenericForeignKey('content_type', 'object_id')

    attribute = models.ForeignKey(
        'Attribute',
        verbose_name=_('attribute'),
        on_delete=models.CASCADE)
    # nullable boolean, declared differently depending on the Django version
    if django.VERSION < (2,):
        multiple = models.NullBooleanField(default=False)
    else:
        multiple = models.BooleanField(default=False, null=True)

    content = models.TextField(verbose_name=_('content'), db_index=True)
    verified = models.BooleanField(default=False)

    # NOTE(review): ``objects`` is built with attribute__disabled=False,
    # presumably to hide values of disabled attributes.
    all_objects = managers.AttributeValueManager()
    objects = managers.AttributeValueManager(attribute__disabled=False)

    def to_python(self):
        '''Return ``content`` deserialized by the attribute's kind.'''
        kind = self.attribute.get_kind()
        return kind['deserialize'](self.content)

    def natural_key(self):
        '''Return the natural keys of content type, owner and attribute, or
        the plain ``id`` when the owner has no ``natural_key``.
        '''
        if hasattr(self.owner, 'natural_key'):
            return (
                self.content_type.natural_key(),
                self.owner.natural_key(),
                self.attribute.natural_key(),
            )
        return self.id

    class Meta:
        verbose_name = _('attribute value')
        ordering = ('attribute__order', 'id')
        verbose_name_plural = _('attribute values')
        unique_together = (
            ('content_type', 'object_id', 'attribute', 'multiple', 'content'),
        )
class PasswordReset(models.Model):
    '''One-to-one record attached to a user (labelled "password reset").'''
    user = models.OneToOneField(
        settings.AUTH_USER_MODEL,
        verbose_name=_('user'),
        on_delete=models.CASCADE)

    def save(self, *args, **kwargs):
        '''Save the record, first giving the user a random password if the
        current one is not usable.
        '''
        if self.user_id:
            user = self.user
            if not user.has_usable_password():
                # random hex string from a fresh UUID4
                user.set_password(uuid.uuid4().hex)
                user.save()
        return super(PasswordReset, self).save(*args, **kwargs)

    class Meta:
        verbose_name = _('password reset')
        verbose_name_plural = _('password reset')

    def __str__(self):
        return six.text_type(self.user)
class Service(models.Model):
    '''Base model of services, optionally attached to an organizational unit
    and optionally restricted to a set of authorized roles.
    '''
    name = models.CharField(
        verbose_name=_('name'),
        max_length=128)
    slug = models.SlugField(
        verbose_name=_('slug'),
        max_length=128)
    ou = models.ForeignKey(
        verbose_name=_('organizational unit'),
        to='a2_rbac.OrganizationalUnit',
        null=True,
        blank=True,
        swappable=False,
        on_delete=models.CASCADE)
    authorized_roles = models.ManyToManyField(
        get_role_model_name(), verbose_name=_('authorized services'),
        through='AuthorizedRole', through_fields=('service', 'role'),
        related_name='allowed_services', blank=True)
    unauthorized_url = models.URLField(
        verbose_name=_('callback url when unauthorized'),
        max_length=256, null=True, blank=True)

    objects = managers.ServiceManager()

    def clean(self):
        '''Check slug and name uniqueness among services without an ou.

        Raises ValidationError with per-field errors (codes
        ``duplicate-slug`` and ``duplicate-name``).
        '''
        errors = {}
        # NOTE(review): only the ou-less case is checked here, presumably
        # because the database unique constraint on (slug, ou) does not
        # apply to rows whose ou is NULL - confirm.
        if self.ou is None:
            others = self.__class__.objects.exclude(pk=self.pk).filter(ou__isnull=True)
            # exists() asks the database for a yes/no answer instead of
            # loading every matching row just to test truthiness
            if others.filter(slug=self.slug).exists():
                errors['slug'] = ValidationError(
                    _('The slug must be unique for this ou'),
                    code='duplicate-slug')
            if others.filter(name=self.name).exists():
                errors['name'] = ValidationError(
                    _('The name must be unique for this ou'),
                    code='duplicate-name')
        if errors:
            raise ValidationError(errors)

    class Meta:
        verbose_name = _('base service model')
        verbose_name_plural = _('base service models')
        unique_together = (
            ('slug', 'ou'),
        )
        base_manager_name = 'objects'

    def natural_key(self):
        '''Return ``[ou natural key (or the falsy ou), slug]``.'''
        return [self.ou and self.ou.natural_key(), self.slug]

    def __str__(self):
        return self.name

    def __repr__(self):
        return '<%s %r>' % (self.__class__.__name__, six.text_type(self))

    def authorize(self, user):
        '''Return True if ``user`` may access this service.

        Access is granted when the service has no authorized role at all, or
        when one of the roles returned by ``user.roles_and_parents()`` is
        authorized for this service. Raises ServiceAccessDenied otherwise.
        '''
        if not self.authorized_roles.exists():
            return True
        if user.roles_and_parents().filter(allowed_services=self).exists():
            return True
        raise ServiceAccessDenied(service=self)

    def add_authorized_role(self, role):
        '''Authorize ``role`` on this service and return the AuthorizedRole
        row, creating it only if needed.
        '''
        authorization, created = AuthorizedRole.objects.get_or_create(
            service=self, role=role)
        return authorization

    def remove_authorized_role(self, role):
        '''Remove the authorization of ``role``, if any; always return True.'''
        try:
            authorization = AuthorizedRole.objects.get(service=self, role=role)
            authorization.delete()
        except AuthorizedRole.DoesNotExist:
            # nothing to remove
            pass
        return True

    def to_json(self, roles=None):
        '''Return a dict describing the service, its ou and related roles.

        ``roles`` is an optional queryset to start from (all roles by
        default); it is restricted to the roles of this service and the
        roles of its ou not bound to any service.
        '''
        if roles is None:
            roles = Role.objects.all()
        roles = roles.filter(Q(service=self) | Q(ou=self.ou, service__isnull=True))
        return {
            'name': self.name,
            'slug': self.slug,
            'ou': self.ou.name if self.ou else None,
            'ou__uuid': self.ou.uuid if self.ou else None,
            'ou__name': self.ou.name if self.ou else None,
            'ou__slug': self.ou.slug if self.ou else None,
            'roles': [role.to_json() for role in roles],
        }


# presumably read by the natural_key implementation imported above - TODO confirm
Service._meta.natural_key = [['slug', 'ou']]
class AuthorizedRole(models.Model):
    '''Through model linking a Service to a role authorized on it.'''
    service = models.ForeignKey(
        Service,
        on_delete=models.CASCADE,
    )
    role = models.ForeignKey(
        get_role_model_name(),
        on_delete=models.CASCADE,
    )
class Token(models.Model):
    '''Expiring token carrying a JSON content, identified by a UUID.'''
    uuid = models.UUIDField(
        verbose_name=_('Identifier'),
        primary_key=True,
        default=uuid.uuid4,
        editable=False)
    kind = models.CharField(
        verbose_name=_('Kind'),
        max_length=32)
    content = jsonb.JSONField(
        verbose_name=_('Content'),
        blank=True)
    created = models.DateTimeField(
        verbose_name=_('Creation date'),
        auto_now_add=True)
    expires = models.DateTimeField(
        verbose_name=_('Expires'))

    class Meta:
        ordering = ('-expires', 'kind', 'uuid')

    @property
    def uuid_b64url(self):
        '''The 16 bytes of the UUID, base64url-encoded, as an ASCII string.'''
        encoded = base64url_encode(self.uuid.bytes)
        return encoded.decode('ascii')

    @classmethod
    def create(cls, kind, content, expires=None, duration=60):
        '''Create and return a token.

        Without ``expires`` the token expires ``duration`` seconds from now.
        '''
        if not expires:
            expires = timezone.now() + datetime.timedelta(seconds=duration)
        return cls.objects.create(kind=kind, content=content, expires=expires)

    @classmethod
    def _decode_uuid(cls, _uuid):
        '''Parse ``_uuid`` as a textual UUID, else as a base64url-encoded one.'''
        try:
            return uuid.UUID(_uuid)
        except (TypeError, ValueError):
            # not a textual UUID, try the base64url form below
            pass
        raw = _uuid.encode('ascii') if isinstance(_uuid, six.text_type) else _uuid
        return uuid.UUID(bytes=base64url_decode(raw))

    @classmethod
    def use(cls, kind, _uuid, now=None, delete=True):
        '''Can raise TypeError, ValueError if uuid is invalid, DoesNotExist if uuid is unknown or expired.'''
        if not now:
            now = timezone.now()
        token_uuid = _uuid if isinstance(_uuid, uuid.UUID) else cls._decode_uuid(_uuid)
        with transaction.atomic():
            token = cls.objects.get(kind=kind, uuid=token_uuid, expires__gt=now)
            if delete:
                token.delete()
            return token

    @classmethod
    def cleanup(cls, now=None):
        '''Delete the tokens whose expiration date is ``now`` or earlier.'''
        if not now:
            now = timezone.now()
        expired = cls.objects.filter(expires__lte=now)
        expired.delete()