504 lines
17 KiB
Python
504 lines
17 KiB
Python
import logging
|
|
|
|
from django.db.models import (Model, TextField, CharField, EmailField,
|
|
URLField, BooleanField, IntegerField, ForeignKey, SlugField, Manager)
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.core.exceptions import ValidationError
|
|
from django.dispatch import receiver
|
|
from django.db.models.signals import post_save, post_delete
|
|
from django.contrib.auth.models import Group
|
|
|
|
from authentic2 import managers
|
|
from authentic2.models import Service as Authentic2Service
|
|
from authentic2.idp.signals import authorize_service
|
|
from authentic2.custom_user.models import User as BaseUser
|
|
from authentic2.a2_rbac.models import OrganizationalUnit
|
|
from authentic2.constants import AUTHENTICATION_EVENTS_SESSION_KEY
|
|
from authentic2.saml.fields import MultiSelectField
|
|
|
|
from . import constants, managers
|
|
|
|
|
|
class User(BaseUser):
|
|
# givenName -> first_name
|
|
# sn -> last_name
|
|
# userPassword -> password
|
|
# mail -> email
|
|
# cdg59isDisabled -> ! active
|
|
# cdg59lastConnectionTime -> last_login
|
|
# cn -> get_full_name()
|
|
# ou -> collectivity
|
|
# username = uid + '@' + collectivity.slug
|
|
USER_PROFILE = ['uid', 'first_name', 'last_name', 'email', 'collectivity',
|
|
'is_admin', 'sirh_code', 'direction',
|
|
'postal_address', 'fax', 'mobile', 'phone',
|
|
'certificate_issuer_dn', 'certificate_subject_dn']
|
|
|
|
uid = CharField(
|
|
verbose_name=_('identifier'),
|
|
max_length=128)
|
|
collectivity = ForeignKey(
|
|
'Collectivity',
|
|
related_name='collectivities',
|
|
verbose_name=_('collectivity'))
|
|
|
|
# cdg59isAdmin
|
|
is_admin = BooleanField(
|
|
verbose_name=_('is admin'),
|
|
default=False,
|
|
blank=True)
|
|
# cdg59agentSirhCode
|
|
sirh_code = CharField(
|
|
verbose_name=_('SIRH Code'),
|
|
max_length=32,
|
|
blank=True)
|
|
# cdg59direction
|
|
direction = CharField(
|
|
verbose_name=_('direction'),
|
|
max_length=128,
|
|
blank=True)
|
|
# cdg59lastConnectionDuration
|
|
last_login_duration = IntegerField(
|
|
verbose_name=_('last connection duration'),
|
|
default=0,
|
|
blank=True)
|
|
# cdg59serviceAccesses -> convert to ACLs
|
|
# employeeType
|
|
employee_type = CharField(
|
|
verbose_name=_('employee type'),
|
|
max_length=128,
|
|
blank=True)
|
|
# postalAddress
|
|
postal_address = TextField(
|
|
verbose_name=_('postal address'),
|
|
blank=True)
|
|
# facsimileTelephoneNumber
|
|
fax = CharField(
|
|
verbose_name=_('fax'),
|
|
max_length=32,
|
|
blank=True)
|
|
# mobile
|
|
mobile = CharField(
|
|
verbose_name=_('mobile'),
|
|
max_length=16,
|
|
blank=True)
|
|
# telephoneNumber
|
|
phone = CharField(
|
|
verbose_name=_('phone'),
|
|
max_length=32,
|
|
blank=True)
|
|
certificate_issuer_dn = CharField(
|
|
verbose_name=_('certificate issuer DN'),
|
|
max_length=256,
|
|
blank=True,
|
|
null=True)
|
|
certificate_subject_dn = CharField(
|
|
verbose_name=_('certificate subject DN'),
|
|
max_length=256,
|
|
blank=True,
|
|
null=True)
|
|
|
|
objects = managers.UserManager()
|
|
|
|
def __unicode__(self):
|
|
return self.get_full_name().strip() or self.uid
|
|
|
|
def natural_key(self):
|
|
return (self.collectivity.slug, self.uid)
|
|
|
|
class Meta:
|
|
verbose_name = _('agent')
|
|
verbose_name_plural = _('agents')
|
|
# enforce unicity of login by collectivity
|
|
unique_together = (('uid', 'collectivity'),)
|
|
|
|
def clean(self):
|
|
# prevent collisions between users from multiple collectivities
|
|
qs = self.__class__.objects.exclude(pk=self.pk)
|
|
if qs.filter(uid=self.uid, collectivity=self.collectivity).exists():
|
|
raise ValidationError(_('This username is already used'))
|
|
if self.uid and not self.username and self.collectivity:
|
|
self.username = u'%s@%s' % (self.uid, self.collectivity.slug)
|
|
if self.collectivity:
|
|
self.is_superuser = self.collectivity.is_superuser
|
|
self.is_staff = self.collectivity.is_superuser
|
|
self.ou = self.collectivity
|
|
|
|
# Fields to support
|
|
class Collectivity(OrganizationalUnit):
|
|
# Identifiers
|
|
# cn = ou
|
|
is_superuser = BooleanField(
|
|
verbose_name=_('is superuser'),
|
|
default=False,
|
|
blank=True)
|
|
# cdg59collectivityId
|
|
collectivity_id = CharField(
|
|
verbose_name=_('collectivity id'),
|
|
max_length=8,
|
|
blank=True)
|
|
# cdg59collectivitySirhCode
|
|
sirh_code = CharField(
|
|
verbose_name=_('SIRH Code'),
|
|
max_length=8,
|
|
blank=True)
|
|
# cdg59collectivitySirhLabel
|
|
sirh_label = CharField(
|
|
verbose_name=_('SIRH Label'),
|
|
max_length=64,
|
|
blank=True)
|
|
# cdg59inseeCode
|
|
insee_code = CharField(
|
|
verbose_name=_('INSEE Code'),
|
|
max_length=8,
|
|
blank=True)
|
|
# cdg59siretCode
|
|
siret_code = CharField(
|
|
verbose_name=_('SIRET Code'),
|
|
max_length=32,
|
|
blank=True)
|
|
|
|
|
|
# Postal addresse
|
|
# postalAddress
|
|
postal_address = TextField(
|
|
verbose_name=_('postal address'),
|
|
blank=True)
|
|
# cdg59streetNumber
|
|
street_number = CharField(
|
|
verbose_name=_('street number'),
|
|
max_length=8,
|
|
blank=True)
|
|
# street
|
|
street = CharField(
|
|
verbose_name=_('street'),
|
|
max_length=128,
|
|
blank=True)
|
|
# postalCode
|
|
postal_code = CharField(
|
|
verbose_name=_('postal code'),
|
|
max_length=16,
|
|
blank=True)
|
|
# cdg59addressCompl
|
|
address_complementary = CharField(
|
|
verbose_name=_('complementary address'),
|
|
max_length=64,
|
|
blank=True)
|
|
# cdg59addressMention
|
|
address_mention = CharField(
|
|
verbose_name=_('address mention'),
|
|
max_length=64,
|
|
blank=True)
|
|
# cdg59arrondissementCode
|
|
arrondissement_code = CharField(
|
|
verbose_name=_('arrondissement code'),
|
|
max_length=64,
|
|
blank=True)
|
|
# cdg59cantonCode
|
|
canton_code = CharField(
|
|
verbose_name=_('canton code'),
|
|
max_length=4,
|
|
blank=True)
|
|
# cdg59departementCode
|
|
departement_code = CharField(
|
|
verbose_name=_('departement code'),
|
|
max_length=2,
|
|
blank=True)
|
|
# cdg59distOffice
|
|
dist_office = CharField(
|
|
verbose_name=_('distribution office'),
|
|
max_length=64,
|
|
blank=True)
|
|
# cdg59regionCode
|
|
region_code = CharField(
|
|
verbose_name=_('distribution office'),
|
|
max_length=4,
|
|
blank=True)
|
|
# Contact
|
|
# telephoneNumber
|
|
phone = CharField(
|
|
verbose_name=_('phone'),
|
|
max_length=32,
|
|
blank=True)
|
|
# facsimileTelephoneNumber
|
|
fax = CharField(
|
|
verbose_name=_('fax'),
|
|
max_length=32,
|
|
blank=True)
|
|
# mail
|
|
email = EmailField(
|
|
verbose_name=_('email'),
|
|
max_length=64,
|
|
blank=True)
|
|
# cdg59URL
|
|
url = URLField(
|
|
verbose_name=_('URL'),
|
|
max_length=128,
|
|
blank=True)
|
|
certificate_issuer_dn = CharField(
|
|
verbose_name=_('certificate issuer DN'),
|
|
max_length=256,
|
|
blank=True,
|
|
null=True)
|
|
certificate_subject_dn = CharField(
|
|
verbose_name=_('certificate subject DN'),
|
|
max_length=256,
|
|
blank=True,
|
|
null=True)
|
|
|
|
objects = managers.GetByNameManager()
|
|
|
|
def natural_key(self):
|
|
return (self.name,)
|
|
|
|
def __unicode__(self):
|
|
return self.name
|
|
|
|
def save(self, *args, **kwargs):
|
|
# set is_superuser on users
|
|
response = super(Collectivity, self).save(*args, **kwargs)
|
|
User.objects.filter(collectivity=self).update(is_superuser=self.is_superuser)
|
|
return response
|
|
|
|
class Meta:
|
|
verbose_name = _('collectivity')
|
|
verbose_name_plural = _('collectivities')
|
|
ordering = ('name',)
|
|
# do not allow two collectivities to have the same certificate
|
|
unique_together = (('certificate_issuer_dn', 'certificate_subject_dn'),)
|
|
|
|
|
|
class Service(Model):
|
|
# Services without a collectivity are global
|
|
# cn
|
|
name = CharField(
|
|
verbose_name=_('name'),
|
|
max_length=128,
|
|
unique=True)
|
|
# cdg59sid
|
|
slug = SlugField(
|
|
verbose_name=_('slug'),
|
|
max_length=128,
|
|
unique=True)
|
|
is_global = BooleanField(
|
|
verbose_name=_('is global'),
|
|
default=False,
|
|
blank=True)
|
|
service_url = URLField(
|
|
verbose_name=_('service base URL'))
|
|
metadata_url = URLField(
|
|
verbose_name=_('SAML Metadata URL'),
|
|
blank=True)
|
|
cas_service_url = URLField(
|
|
verbose_name=_('CAS service URL'),
|
|
blank=True)
|
|
a2_service = ForeignKey(
|
|
to='authentic2.Service',
|
|
verbose_name=_('related authentic2 service'),
|
|
related_name='pratic_service',
|
|
blank=True,
|
|
null=True)
|
|
|
|
def __unicode__(self):
|
|
return self.name
|
|
|
|
objects = managers.GetByNameManager()
|
|
|
|
def natural_key(self):
|
|
return (self.name,)
|
|
|
|
class Meta:
|
|
verbose_name = _('service')
|
|
verbose_name_plural = _('services')
|
|
ordering = ('name',)
|
|
|
|
class ServiceInstanceManager(Manager):
|
|
def get_by_natural_key(self, slug, service_nk, collectivity_nk):
|
|
try:
|
|
service = Service.objects.get_by_natural_key(*service_nk)
|
|
except Service.DoesNotExist:
|
|
raise ServiceInstance.DoesNotExist
|
|
try:
|
|
collectivity = Collectivity.objects.get_by_natural_key(*collectivity_nk)
|
|
except Collectivity.DoesNotExist:
|
|
raise ServiceInstance.DoesNotExist
|
|
return self.get(slug=slug, service=service, collectivity=collectivity)
|
|
|
|
class ServiceInstance(Model):
|
|
# cdg59sid
|
|
slug = SlugField(
|
|
verbose_name=_('slug'),
|
|
max_length=128)
|
|
service = ForeignKey(
|
|
'Service',
|
|
verbose_name=_('service'),
|
|
related_name='service_instances')
|
|
collectivity = ForeignKey(
|
|
'Collectivity',
|
|
verbose_name=_('collectivity'),
|
|
related_name='service_instances')
|
|
service_url = URLField(
|
|
verbose_name=_('service base URL'),
|
|
blank=True)
|
|
metadata_url = URLField(
|
|
verbose_name=_('SAML Metadata URL'),
|
|
blank=True)
|
|
cas_service_url = URLField(
|
|
verbose_name=_('CAS service URL'),
|
|
blank=True)
|
|
authentication_level = MultiSelectField(
|
|
verbose_name=_('authentification level'),
|
|
max_length=256,
|
|
blank=False,
|
|
default='password-on-https,ssl-collectivity,ssl',
|
|
choices=constants.AUTHENTICATION_LEVELS)
|
|
a2_service = ForeignKey(
|
|
to='authentic2.Service',
|
|
verbose_name=_('related authentic2 service'),
|
|
related_name='pratic_service_instances',
|
|
blank=True,
|
|
null=True)
|
|
|
|
def __unicode__(self):
|
|
return unicode(self.service)
|
|
|
|
objects = ServiceInstanceManager()
|
|
|
|
def natural_key(self):
|
|
return (self.slug, self.service.natural_key(), self.collectivity.natural_key())
|
|
|
|
def clean(self):
|
|
super(ServiceInstance, self).clean()
|
|
if self.service_id:
|
|
if self.collectivity and self.service.is_global:
|
|
qs = ServiceInstance.objects.exclude(id=self.id)
|
|
qs = qs.filter(collectivity=self.collectivity,
|
|
service=self.service)
|
|
if qs.exists():
|
|
raise ValidationError(_('There can be only one instance of a global service by collectivity'))
|
|
if self.service.is_global:
|
|
self.service_url = self.service.service_url
|
|
self.metadata_url = self.service.metadata_url
|
|
self.cas_service_url = self.service.cas_service_url
|
|
if self.service.a2_service_id:
|
|
self.a2_service = self.service.a2_service
|
|
if not self.service.is_global and not self.service_url:
|
|
raise ValidationError(_('Service URL field is required'))
|
|
|
|
class Meta:
|
|
verbose_name = _('service instance')
|
|
verbose_name = _('service instances')
|
|
unique_together = (('slug', 'collectivity'),)
|
|
ordering = ('service__name', 'slug')
|
|
|
|
class AccessManager(Manager):
|
|
def get_by_natural_key(self, user_nk, service_instance_nk):
|
|
try:
|
|
user = User.objects.get_by_natural_key(*user_nk)
|
|
except User.DoesNotExist:
|
|
raise ServiceInstance.DoesNotExist
|
|
try:
|
|
service_instance = ServiceInstance.objects.get_by_natural_key(*service_instance_nk)
|
|
except ServiceInstance.DoesNotExist:
|
|
raise Access.DoesNotExist
|
|
return self.get(user=user, service_instance=service_instance)
|
|
|
|
class Access(Model):
|
|
user = ForeignKey('User',
|
|
verbose_name=_('user'))
|
|
service_instance = ForeignKey('ServiceInstance',
|
|
verbose_name=_('service instance'))
|
|
|
|
objects = AccessManager()
|
|
|
|
def clean(self):
|
|
if self.user and self.service_instance and self.user.collectivity != \
|
|
self.service_instance.collectivity:
|
|
raise ValidationError(_('Access can only be created between users '
|
|
'and service instances of the same collectivity'))
|
|
|
|
def natural_key(self):
|
|
return (self.user.natural_key(), self.service_instance.natural_key())
|
|
|
|
class Meta:
|
|
verbose_name = _('access')
|
|
verbose_name = _('accesses')
|
|
unique_together = (('user', 'service_instance'),)
|
|
ordering = ('user__last_name', 'user__first_name', 'service_instance__service__name')
|
|
|
|
|
|
# helper function to build authorization responses
|
|
def authz(value, message=None):
|
|
d = {'authz': value}
|
|
if message:
|
|
d['message'] = message
|
|
return d
|
|
|
|
@receiver(authorize_service)
|
|
def authorize_service_cb(request, user, audience, attributes, **kwargs):
|
|
logger = logging.getLogger(__name__)
|
|
|
|
if user.is_superuser:
|
|
logger.info('%r is authorized to connect on %r because he is a '
|
|
'superuser', unicode(user), audience)
|
|
return authz(True)
|
|
|
|
if not hasattr(user, 'collectivity'):
|
|
return authz(False, 'not a pr@tic user')
|
|
collectivity = user.collectivity
|
|
try:
|
|
if isinstance(audience, basestring):
|
|
si = ServiceInstance.objects.get(collectivity=collectivity,
|
|
metadata_url=audience)
|
|
elif isinstance(audience, Authentic2Service):
|
|
si = ServiceInstance.objects.get(collectivity=collectivity,
|
|
a2_service=audience)
|
|
except ServiceInstance.DoesNotExist:
|
|
logger.warn('unable to find service for audience %r and user %r in collectivity %r',
|
|
audience, unicode(user), unicode(collectivity))
|
|
return authz(False, _('This service is unknown.'))
|
|
events = request.session.get(AUTHENTICATION_EVENTS_SESSION_KEY, [])
|
|
if not any(event['how'] in si.authentication_level for event in events):
|
|
return authz(False, _('This service requires certificate authentication.'))
|
|
|
|
if Access.objects.filter(service_instance=si, user=user).exists():
|
|
logger.info('%r of collectivity %r is authorized to connect on %r', unicode(user),
|
|
unicode(collectivity), audience)
|
|
return authz(True)
|
|
elif user.is_admin:
|
|
logger.info('%r is authorized to connect on %r because he is a '
|
|
'local admin', unicode(user), audience)
|
|
return authz(True)
|
|
else:
|
|
logger.warn('%r of collectivity %r is forbidden to connect on %r', unicode(user),
|
|
unicode(collectivity), audience)
|
|
return authz(False, _('You are not authorized to access this service. '
|
|
'Please contact your administrator'))
|
|
|
|
@receiver(post_save)
|
|
def service_post_save(sender, instance, created, raw, **kwargs):
|
|
"""Create SAML provider models"""
|
|
from . import utils
|
|
if raw:
|
|
return
|
|
if sender not in (Service, ServiceInstance):
|
|
return
|
|
utils.sync_saml_provider(instance)
|
|
utils.sync_cas_provider(instance)
|
|
if getattr(instance, 'is_global', None):
|
|
instance.service_instances.update(service_url=instance.service_url,
|
|
metadata_url=instance.metadata_url,
|
|
a2_service_id=instance.a2_service_id)
|
|
|
|
|
|
@receiver(post_save, sender=User)
|
|
def user_post_save(sender, instance, **kwargs):
|
|
group, created = Group.objects.get_or_create(name='Agent pr@tic')
|
|
instance.groups.add(group)
|
|
if instance.is_admin:
|
|
instance.collectivity.get_admin_role().members.add(instance)
|
|
|
|
from authentic2.a2_rbac.signal_handlers import update_rbac_on_ou_post_save, update_rbac_on_ou_post_delete
|
|
post_save.connect(update_rbac_on_ou_post_save, sender=Collectivity)
|
|
post_delete.connect(update_rbac_on_ou_post_delete, sender=Collectivity)
|