authentic/src/authentic2/custom_user/models.py

278 lines
10 KiB
Python

from django.db import models
from django.utils import timezone
from django.core.mail import send_mail
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError, MultipleObjectsReturned
try:
from django.contrib.contenttypes.fields import GenericRelation
except ImportError:
from django.contrib.contenttypes.generic import GenericRelation
from django.contrib.auth.models import AbstractBaseUser
from django_rbac.models import PermissionMixin
from django_rbac.utils import get_role_parenting_model
from authentic2 import utils, validators, app_settings
from authentic2.decorators import errorcollector, RequestCache
from authentic2.models import Service, AttributeValue, Attribute
from .managers import UserManager, UserQuerySet
@RequestCache
def get_attributes():
    """Return the queryset of every ``Attribute`` definition.

    The function is wrapped by ``RequestCache`` (imported from
    ``authentic2.decorators``).
    """
    all_attributes = Attribute.objects.all()
    return all_attributes
class Attributes(object):
    """
    Attribute-style accessor for the attribute values attached to an owner.

    Reading ``obj.<name>`` returns the loaded value of the attribute called
    ``<name>``; assigning ``obj.<name> = value`` stores it through
    ``Attribute.set_value``.  Names that do not match a known attribute raise
    ``AttributeError`` in both directions.

    Internal state (``owner``, ``verified``, ``attributes``, ``values``) lives
    in the instance ``__dict__`` and is written with ``object.__setattr__``,
    because this class's own ``__setattr__`` is reserved for attribute values.
    """

    def __init__(self, owner, verified=None):
        """
        Load the values of ``owner``.

        :param owner: object exposing an ``attribute_values`` related manager.
        :param verified: when truthy, values whose ``verified`` flag is false
            are skipped, and values written later are flagged as verified.
        """
        set_state = super(Attributes, self).__setattr__
        set_state('owner', owner)
        set_state('verified', verified)

        attributes = get_attributes()
        # Index the definitions by name (for attribute access) and then by
        # primary key (to resolve ``atv.attribute_id`` below).
        at_map = {attribute.name: attribute for attribute in attributes}
        at_map.update((attribute.id, attribute) for attribute in attributes)
        set_state('attributes', at_map)

        values = {}
        set_state('values', values)
        for atv in owner.attribute_values.all():
            if verified and not atv.verified:
                continue
            try:
                attribute = at_map[atv.attribute_id]
            except KeyError:
                # Value pointing to a definition that was not loaded: ignore.
                continue
            value = atv.to_python()
            if attribute.multiple:
                values.setdefault(attribute.name, []).append(value)
            else:
                values[attribute.name] = value

    def __setattr__(self, name, value):
        """
        Store ``value`` for the attribute called ``name``.

        :raises AttributeError: if ``name`` is not a known attribute.
        """
        try:
            attribute = self.attributes[name]
        except KeyError:
            raise AttributeError(name)
        attribute.set_value(self.owner, value, verified=bool(self.verified))
        self.values[name] = value

    def __getattr__(self, name):
        """
        Return the loaded value for ``name``.

        When no value was loaded, returns ``[]`` for a multiple attribute and
        ``None`` otherwise.

        :raises AttributeError: if ``name`` is not a known attribute, or if the
            instance has not been initialised yet.
        """
        # Read the state from __dict__ rather than via ``self.attributes``:
        # ``__getattr__`` is only called when normal lookup fails, so on an
        # instance created without ``__init__`` (as copy and pickle do before
        # probing ``__setstate__``) ``self.attributes`` would re-enter this
        # method forever.
        state = self.__dict__
        try:
            attribute = state['attributes'][name]
            values = state['values']
        except KeyError:
            raise AttributeError(name)
        return values.get(name, [] if attribute.multiple else None)
class AttributesDescriptor(object):
    """
    Descriptor exposing a lazily built, per-instance ``Attributes`` accessor.

    The accessor is created on first access and stored on the instance, in
    ``_attributes_verified_cache`` when ``verified`` is truthy and in
    ``_attributes_cache`` otherwise.
    """

    def __init__(self, verified=None):
        """
        :param verified: forwarded to ``Attributes``; also selects which cache
            slot is used on the owning instance.
        """
        self.verified = verified

    def __get__(self, obj, objtype=None):
        """
        Return the cached ``Attributes`` accessor of ``obj``, building it on
        first access.  On class-level access (``obj is None``) the descriptor
        itself is returned.
        """
        if obj is None:
            # Class-level access: there is no owner to load values for, so
            # follow the usual descriptor convention instead of trying to
            # build an accessor around ``None``.
            return self
        cache_name = '_attributes_verified_cache' if self.verified else '_attributes_cache'
        try:
            return getattr(obj, cache_name)
        except AttributeError:
            accessor = Attributes(obj, verified=self.verified)
            setattr(obj, cache_name, accessor)
            return accessor
class IsVerified(object):
    """
    Attribute-style view telling whether a user's attribute value is verified.

    ``IsVerified(user).<name>`` compares the value found on
    ``user.attributes`` with the one found on ``user.verified_attributes``.
    """

    def __init__(self, user):
        self.user = user

    def __getattr__(self, name):
        """
        Return whether ``name`` has a non-``None`` value on
        ``user.attributes`` that equals the value on
        ``user.verified_attributes``.
        """
        current = getattr(self.user.attributes, name, None)
        if current is None:
            # No value (or unknown attribute): nothing to be verified.
            return False
        verified_value = getattr(self.user.verified_attributes, name, None)
        return current == verified_value
class IsVerifiedDescriptor(object):
    """Descriptor handing out an ``IsVerified`` view of the accessed object."""

    def __get__(self, obj, objtype):
        # A new view is built on every access; nothing is cached.
        view = IsVerified(obj)
        return view
class User(AbstractBaseUser, PermissionMixin):
    """
    User model with admin-compliant permissions and dynamic attributes.

    At the field level ``username`` may be null and ``email`` may be blank.
    When ``A2_USERNAME_IS_UNIQUE`` / ``A2_EMAIL_IS_UNIQUE`` are enabled,
    ``clean_fields`` rejects a username / email already used by another user
    of the same organizational unit.
    """
    uuid = models.CharField(
        _('uuid'), max_length=32,
        default=utils.get_hex_uuid, editable=False, unique=True)
    username = models.CharField(_('username'), max_length=256, null=True, blank=True)
    first_name = models.CharField(_('first name'), max_length=128, blank=True)
    last_name = models.CharField(_('last name'), max_length=128, blank=True)
    # NOTE(review): ``validators.EmailValidator`` is passed as-is, not called.
    # Confirm it is a ready-to-use callable validator and not a class that
    # needs instantiating.
    email = models.EmailField(
        _('email address'), blank=True,
        validators=[validators.EmailValidator], max_length=254)
    email_verified = models.BooleanField(
        default=False,
        verbose_name=_('email verified'))
    is_staff = models.BooleanField(
        _('staff status'), default=False,
        help_text=_('Designates whether the user can log into this admin '
                    'site.'))
    is_active = models.BooleanField(
        _('active'), default=True,
        help_text=_('Designates whether this user should be treated as '
                    'active. Unselect this instead of deleting accounts.'))
    date_joined = models.DateTimeField(_('date joined'), default=timezone.now)
    ou = models.ForeignKey(
        verbose_name=_('organizational unit'),
        to='a2_rbac.OrganizationalUnit',
        blank=True,
        null=True,
        swappable=False)
    modified = models.DateTimeField(
        verbose_name=_('Last modification time'),
        db_index=True,
        auto_now=True)

    objects = UserManager.from_queryset(UserQuerySet)()
    # Lazily built accessors over the user's attribute values.
    attributes = AttributesDescriptor()
    verified_attributes = AttributesDescriptor(verified=True)
    is_verified = IsVerifiedDescriptor()

    attribute_values = GenericRelation('authentic2.AttributeValue')

    USERNAME_FIELD = 'username'
    REQUIRED_FIELDS = ['email']
    USER_PROFILE = ('first_name', 'last_name', 'email')

    class Meta:
        verbose_name = _('user')
        verbose_name_plural = _('users')
        permissions = (
            ('view_user', 'can see available users'),
        )
        ordering = ('last_name', 'first_name', 'email', 'username')

    def get_full_name(self):
        """
        Return the first_name plus the last_name, with a space in between.

        Falls back to the username, then to the email, when both names are
        empty.
        """
        full_name = '%s %s' % (self.first_name, self.last_name)
        return full_name.strip() or self.username or self.email

    def get_short_name(self):
        """
        Return the first name, falling back to the username, the email and
        finally the first six characters of the uuid.
        """
        return self.first_name or self.username or self.email or self.uuid[:6]

    def email_user(self, subject, message, from_email=None):
        """
        Send an email to this user's ``email`` address.
        """
        send_mail(subject, message, from_email, [self.email])

    def get_username(self):
        """
        Return an identifying name for this user: the username, else the
        email, else the full name, else the uuid.
        """
        return self.username or self.email or self.get_full_name() or self.uuid

    def roles_and_parents(self):
        """
        Return a queryset of the user's roles together with the roles matched
        through ``child_relation__child`` on those roles, ordered by name and
        without duplicates.

        ``child_relation`` is prefetched restricted to relations whose child
        is one of the user's own roles, and ``members`` is prefetched into a
        ``member`` attribute restricted to this user.
        """
        qs1 = self.roles.all()
        qs2 = qs1.model.objects.filter(child_relation__child__in=qs1)
        qs = (qs1 | qs2).order_by('name').distinct()
        RoleParenting = get_role_parenting_model()
        rp_qs = RoleParenting.objects.filter(child__in=qs1)
        qs = qs.prefetch_related(
            models.Prefetch('child_relation', queryset=rp_qs),
            'child_relation__parent')
        qs = qs.prefetch_related(models.Prefetch(
            'members',
            queryset=self.__class__.objects.filter(pk=self.pk),
            to_attr='member'))
        return qs

    def __unicode__(self):
        human_name = self.username or self.email or self.get_full_name()
        short_id = self.uuid[:6]
        return u'%s (%s)' % (human_name, short_id)

    def __repr__(self):
        return '<User: %r>' % unicode(self)

    def clean(self):
        """No model-wide validation is performed."""
        pass

    def clean_fields(self, exclude=None):
        """
        Run the standard field validation, then check username and email
        uniqueness among the other users of the same organizational unit.

        Each check only runs when the field is not excluded, has a value and
        the matching ``A2_*_IS_UNIQUE`` setting is enabled.

        :raises ValidationError: with every collected error, keyed by field.
        """
        errors = {}
        with errorcollector(errors):
            super(User, self).clean_fields(exclude=exclude)

        exclude = exclude or []

        # Other users of the same organizational unit (or without one).
        others = self.__class__.objects
        if self.pk:
            others = others.exclude(pk=self.pk)
        if self.ou_id:
            others = others.filter(ou_id=self.ou_id)
        else:
            others = others.filter(ou__isnull=True)

        if 'username' not in exclude and self.username and app_settings.A2_USERNAME_IS_UNIQUE:
            # exists() covers both "exactly one" and "several" matches, which
            # a get() call would report through two different exceptions.
            if others.filter(username=self.username).exists():
                errors.setdefault('username', []).append(
                    _('This username is already in use. Please supply a different username.'))

        if 'email' not in exclude and self.email and app_settings.A2_EMAIL_IS_UNIQUE:
            if others.filter(email__iexact=self.email).exists():
                errors.setdefault('email', []).append(
                    _('This email address is already in use. Please supply a different email '
                      'address.'))

        if errors:
            raise ValidationError(errors)

    def natural_key(self):
        """Return the natural key of the user: a 1-tuple holding its uuid."""
        return (self.uuid,)

    def has_verified_attributes(self):
        """Return whether at least one attribute value of this user is verified."""
        return AttributeValue.objects.with_owner(self).filter(verified=True).exists()

    def to_json(self):
        """
        Return a dict describing the user.

        It holds the attribute values keyed by attribute name, then the core
        fields, organizational unit details, roles and services.  The core
        keys are applied last, so they win over an attribute of the same name.
        """
        data = {}
        for av in AttributeValue.objects.with_owner(self):
            data[str(av.attribute.name)] = av.to_python()

        ou = self.ou
        # Build the roles queryset once and share it between the role list
        # and every service, instead of rebuilding it for each service.
        roles = self.roles_and_parents()
        data.update({
            'uuid': self.uuid,
            'username': self.username,
            'email': self.email,
            'ou': ou.name if ou else None,
            'ou__uuid': ou.uuid if ou else None,
            'ou__slug': ou.slug if ou else None,
            'ou__name': ou.name if ou else None,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'is_superuser': self.is_superuser,
            'roles': [role.to_json() for role in roles],
            'services': [service.to_json(roles=roles) for service in Service.objects.all()],
        })
        return data

    def save(self, *args, **kwargs):
        """
        Save the user, then mirror ``first_name`` and ``last_name`` into the
        attributes of the same name when such attributes exist.

        :param nosync: optional keyword; when true the mirroring is skipped.
            It is removed from ``kwargs`` before calling the parent ``save``.
        :returns: whatever the parent ``save`` returns.
        """
        sync = not kwargs.pop('nosync', False)
        rc = super(User, self).save(*args, **kwargs)
        if sync:
            for attr_name in ('first_name', 'last_name'):
                try:
                    attribute = Attribute.objects.get(name=attr_name)
                except Attribute.DoesNotExist:
                    # No attribute defined under that name: nothing to mirror.
                    continue
                field_value = getattr(self, attr_name, None)
                # Only write when the stored attribute value differs.
                if attribute.get_value(self) != field_value:
                    attribute.set_value(self, field_value)
        return rc

    def can_change_password(self):
        """Always return True."""
        return True