This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-pratic/src/authentic2_pratic/models.py

501 lines
17 KiB
Python

import logging
from django.db.models import (Model, TextField, CharField, EmailField,
URLField, BooleanField, IntegerField, ForeignKey, SlugField, Manager)
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
from django.dispatch import receiver
from django.db.models.signals import post_save
from django.contrib.auth.models import Group
from authentic2 import managers
from authentic2.models import Service as Authentic2Service
from authentic2.custom_user.managers import UserManager
from authentic2.idp.signals import authorize_service
from authentic2.custom_user.models import User as BaseUser
from authentic2.a2_rbac.models import OrganizationalUnit
from authentic2.constants import AUTHENTICATION_EVENTS_SESSION_KEY
from authentic2.saml.fields import MultiSelectField
from . import constants
class User(BaseUser):
# givenName -> first_name
# sn -> last_name
# userPassword -> password
# mail -> email
# cdg59isDisabled -> ! active
# cdg59lastConnectionTime -> last_login
# cn -> get_full_name()
# ou -> collectivity
# username = uid + '@' + collectivity.slug
USER_PROFILE = ['uid', 'first_name', 'last_name', 'email', 'collectivity',
'is_admin', 'sirh_code', 'direction',
'postal_address', 'fax', 'mobile', 'phone',
'certificate_issuer_dn', 'certificate_subject_dn']
uid = CharField(
verbose_name=_('identifier'),
max_length=128)
collectivity = ForeignKey(
'Collectivity',
related_name='collectivities',
verbose_name=_('collectivity'))
# cdg59isAdmin
is_admin = BooleanField(
verbose_name=_('is admin'),
default=False,
blank=True)
# cdg59agentSirhCode
sirh_code = CharField(
verbose_name=_('SIRH Code'),
max_length=32,
blank=True)
# cdg59direction
direction = CharField(
verbose_name=_('direction'),
max_length=128,
blank=True)
# cdg59lastConnectionDuration
last_login_duration = IntegerField(
verbose_name=_('last connection duration'),
default=0,
blank=True)
# cdg59serviceAccesses -> convert to ACLs
# employeeType
employee_type = CharField(
verbose_name=_('employee type'),
max_length=128,
blank=True)
# postalAddress
postal_address = TextField(
verbose_name=_('postal address'),
blank=True)
# facsimileTelephoneNumber
fax = CharField(
verbose_name=_('fax'),
max_length=32,
blank=True)
# mobile
mobile = CharField(
verbose_name=_('mobile'),
max_length=16,
blank=True)
# telephoneNumber
phone = CharField(
verbose_name=_('phone'),
max_length=32,
blank=True)
certificate_issuer_dn = CharField(
verbose_name=_('certificate issuer DN'),
max_length=256,
blank=True,
null=True)
certificate_subject_dn = CharField(
verbose_name=_('certificate subject DN'),
max_length=256,
blank=True,
null=True)
objects = UserManager()
def __unicode__(self):
return self.get_full_name().strip() or self.uid
class Meta:
verbose_name = _('agent')
verbose_name_plural = _('agents')
# enforce unicity of login by collectivity
unique_together = (('uid', 'collectivity'),)
def clean(self):
# prevent collisions between users from multiple collectivities
qs = self.__class__.objects.exclude(pk=self.pk)
if qs.filter(uid=self.uid, collectivity=self.collectivity).exists():
raise ValidationError(_('This username is already used'))
if self.uid and not self.username and self.collectivity:
self.username = u'%s@%s' % (self.uid, self.collectivity.slug)
if self.collectivity:
self.is_superuser = self.collectivity.is_superuser
self.is_staff = self.collectivity.is_superuser
self.ou = self.collectivity
# Fields to support
class Collectivity(OrganizationalUnit):
# Identifiers
# cn = ou
is_superuser = BooleanField(
verbose_name=_('is superuser'),
default=False,
blank=True)
# cdg59collectivityId
collectivity_id = CharField(
verbose_name=_('collectivity id'),
max_length=8,
blank=True)
# cdg59collectivitySirhCode
sirh_code = CharField(
verbose_name=_('SIRH Code'),
max_length=8,
blank=True)
# cdg59collectivitySirhLabel
sirh_label = CharField(
verbose_name=_('SIRH Label'),
max_length=64,
blank=True)
# cdg59inseeCode
insee_code = CharField(
verbose_name=_('INSEE Code'),
max_length=8,
blank=True)
# cdg59siretCode
siret_code = CharField(
verbose_name=_('SIRET Code'),
max_length=32,
blank=True)
# Postal addresse
# postalAddress
postal_address = TextField(
verbose_name=_('postal address'),
blank=True)
# cdg59streetNumber
street_number = CharField(
verbose_name=_('street number'),
max_length=8,
blank=True)
# street
street = CharField(
verbose_name=_('street'),
max_length=128,
blank=True)
# postalCode
postal_code = CharField(
verbose_name=_('postal code'),
max_length=16,
blank=True)
# cdg59addressCompl
address_complementary = CharField(
verbose_name=_('complementary address'),
max_length=64,
blank=True)
# cdg59addressMention
address_mention = CharField(
verbose_name=_('address mention'),
max_length=64,
blank=True)
# cdg59arrondissementCode
arrondissement_code = CharField(
verbose_name=_('arrondissement code'),
max_length=64,
blank=True)
# cdg59cantonCode
canton_code = CharField(
verbose_name=_('canton code'),
max_length=4,
blank=True)
# cdg59departementCode
departement_code = CharField(
verbose_name=_('departement code'),
max_length=2,
blank=True)
# cdg59distOffice
dist_office = CharField(
verbose_name=_('distribution office'),
max_length=64,
blank=True)
# cdg59regionCode
region_code = CharField(
verbose_name=_('distribution office'),
max_length=4,
blank=True)
# Contact
# telephoneNumber
phone = CharField(
verbose_name=_('phone'),
max_length=32,
blank=True)
# facsimileTelephoneNumber
fax = CharField(
verbose_name=_('fax'),
max_length=32,
blank=True)
# mail
email = EmailField(
verbose_name=_('email'),
max_length=64,
blank=True)
# cdg59URL
url = URLField(
verbose_name=_('URL'),
max_length=128,
blank=True)
certificate_issuer_dn = CharField(
verbose_name=_('certificate issuer DN'),
max_length=256,
blank=True,
null=True)
certificate_subject_dn = CharField(
verbose_name=_('certificate subject DN'),
max_length=256,
blank=True,
null=True)
objects = managers.GetByNameManager()
def natural_key(self):
return (self.name,)
def __unicode__(self):
return self.name
def save(self, *args, **kwargs):
# set is_superuser on users
response = super(Collectivity, self).save(*args, **kwargs)
User.objects.filter(collectivity=self).update(is_superuser=self.is_superuser)
return response
class Meta:
verbose_name = _('collectivity')
verbose_name_plural = _('collectivities')
ordering = ('name',)
# do not allow two collectivities to have the same certificate
unique_together = (('certificate_issuer_dn', 'certificate_subject_dn'),)
class Service(Model):
# Services without a collectivity are global
# cn
name = CharField(
verbose_name=_('name'),
max_length=128,
unique=True)
# cdg59sid
slug = SlugField(
verbose_name=_('slug'),
max_length=128,
unique=True)
is_global = BooleanField(
verbose_name=_('is global'),
default=False,
blank=True)
service_url = URLField(
verbose_name=_('service base URL'))
metadata_url = URLField(
verbose_name=_('SAML Metadata URL'),
blank=True)
cas_service_url = URLField(
verbose_name=_('CAS service URL'),
blank=True)
a2_service = ForeignKey(
to='authentic2.Service',
verbose_name=_('related authentic2 service'),
related_name='pratic_service',
blank=True,
null=True)
def __unicode__(self):
return self.name
objects = managers.GetByNameManager()
def natural_key(self):
return (self.name,)
class Meta:
verbose_name = _('service')
verbose_name_plural = _('services')
ordering = ('name',)
class ServiceInstanceManager(Manager):
def get_by_natural_key(self, slug, service_nk, collectivity_nk):
try:
service = Service.objects.get_by_natural_key(*service_nk)
except Service.DoesNotExist:
raise ServiceInstance.DoesNotExist
try:
collectivity = Collectivity.objects.get_by_natural_key(*collectivity_nk)
except Collectivity.DoesNotExist:
raise ServiceInstance.DoesNotExist
return self.get(slug=slug, service=service, collectivity=collectivity)
class ServiceInstance(Model):
# cdg59sid
slug = SlugField(
verbose_name=_('slug'),
max_length=128)
service = ForeignKey(
'Service',
verbose_name=_('service'),
related_name='service_instances')
collectivity = ForeignKey(
'Collectivity',
verbose_name=_('collectivity'),
related_name='service_instances')
service_url = URLField(
verbose_name=_('service base URL'),
blank=True)
metadata_url = URLField(
verbose_name=_('SAML Metadata URL'),
blank=True)
cas_service_url = URLField(
verbose_name=_('CAS service URL'),
blank=True)
authentication_level = MultiSelectField(
verbose_name=_('authentification level'),
max_length=256,
blank=False,
default='password-on-https,ssl-collectivity,ssl',
choices=constants.AUTHENTICATION_LEVELS)
a2_service = ForeignKey(
to='authentic2.Service',
verbose_name=_('related authentic2 service'),
related_name='pratic_service_instances',
blank=True,
null=True)
def __unicode__(self):
return unicode(self.service)
objects = ServiceInstanceManager()
def natural_key(self):
return (self.slug, self.service.natural_key(), self.collectivity.natural_key())
def clean(self):
super(ServiceInstance, self).clean()
if self.service_id:
if self.collectivity and self.service.is_global:
qs = ServiceInstance.objects.exclude(id=self.id)
qs = qs.filter(collectivity=self.collectivity,
service=self.service)
if qs.exists():
raise ValidationError(_('There can be only one instance of a global service by collectivity'))
if self.service.is_global:
self.service_url = self.service.service_url
self.metadata_url = self.service.metadata_url
self.cas_service_url = self.service.cas_service_url
if self.service.a2_service_id:
self.a2_service = self.service.a2_service
if not self.service.is_global and not self.service_url:
raise ValidationError(_('Service URL field is required'))
class Meta:
verbose_name = _('service instance')
verbose_name = _('service instances')
unique_together = (('slug', 'collectivity'),)
ordering = ('service__name', 'slug')
class AccessManager(Manager):
def get_by_natural_key(self, user_nk, service_instance_nk):
try:
user = User.objects.get_by_natural_key(*user_nk)
except User.DoesNotExist:
raise ServiceInstance.DoesNotExist
try:
service_instance = ServiceInstance.objects.get_by_natural_key(*service_instance_nk)
except ServiceInstance.DoesNotExist:
raise Access.DoesNotExist
return self.get(user=user, service_instance=service_instance)
class Access(Model):
user = ForeignKey('User',
verbose_name=_('user'))
service_instance = ForeignKey('ServiceInstance',
verbose_name=_('service instance'))
objects = AccessManager()
def clean(self):
if self.user and self.service_instance and self.user.collectivity != \
self.service_instance.collectivity:
raise ValidationError(_('Access can only be created between users '
'and service instances of the same collectivity'))
def natural_key(self):
return (self.user.natural_key(), self.service_instance.natural_key())
class Meta:
verbose_name = _('access')
verbose_name = _('accesses')
unique_together = (('user', 'service_instance'),)
ordering = ('user__last_name', 'user__first_name', 'service_instance__service__name')
# helper function to build authorization responses
def authz(value, message=None):
d = {'authz': value}
if message:
d['message'] = message
return d
@receiver(authorize_service)
def authorize_service_cb(request, user, audience, attributes, **kwargs):
logger = logging.getLogger(__name__)
if user.is_superuser:
logger.info('%r is authorized to connect on %r because he is a '
'superuser', unicode(user), audience)
return authz(True)
if not hasattr(user, 'collectivity'):
return authz(False, 'not a pr@tic user')
collectivity = user.collectivity
try:
if isinstance(audience, basestring):
si = ServiceInstance.objects.get(collectivity=collectivity,
metadata_url=audience)
elif isinstance(audience, Authentic2Service):
si = ServiceInstance.objects.get(collectivity=collectivity,
a2_service=audience)
except ServiceInstance.DoesNotExist:
logger.warn('unable to find service for audience %r and user %r in collectivity %r',
audience, unicode(user), unicode(collectivity))
return authz(False, _('This service is unknown.'))
events = request.session.get(AUTHENTICATION_EVENTS_SESSION_KEY, [])
if not any(event['how'] in si.authentication_level for event in events):
return authz(False, _('This service requires certificate authentication.'))
if Access.objects.filter(service_instance=si, user=user).exists():
logger.info('%r of collectivity %r is authorized to connect on %r', unicode(user),
unicode(collectivity), audience)
return authz(True)
elif user.is_admin:
logger.info('%r is authorized to connect on %r because he is a '
'local admin', unicode(user), audience)
return authz(True)
else:
logger.warn('%r of collectivity %r is forbidden to connect on %r', unicode(user),
unicode(collectivity), audience)
return authz(False, _('You are not authorized to access this service. '
'Please contact your administrator'))
@receiver(post_save)
def service_post_save(sender, instance, created, raw, **kwargs):
"""Create SAML provider models"""
from . import utils
if raw:
return
if sender not in (Service, ServiceInstance):
return
utils.sync_saml_provider(instance)
utils.sync_cas_provider(instance)
if getattr(instance, 'is_global', None):
instance.service_instances.update(service_url=instance.service_url,
metadata_url=instance.metadata_url,
a2_service_id=instance.a2_service_id)
@receiver(post_save, sender=User)
def user_post_save(sender, instance, **kwargs):
group, created = Group.objects.get_or_create(name='Agent pr@tic')
instance.groups.add(group)
if instance.is_admin:
instance.collectivity.get_admin_role().members.add(instance)
from authentic2.a2_rbac.signal_handlers import update_rbac_on_ou_save
post_save.connect(update_rbac_on_ou_save, sender=Collectivity)