217 lines
8.0 KiB
Python
217 lines
8.0 KiB
Python
import re
|
|
import uuid
|
|
import tempfile
|
|
import subprocess
|
|
|
|
from django import forms
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.translation import ugettext_lazy as _, ugettext
|
|
|
|
from django.contrib.auth import authenticate
|
|
|
|
from authentic2.manager.forms import UserEditForm, UserAddForm
|
|
|
|
from . import models, fields
|
|
|
|
class BaseForm(forms.ModelForm):
    """Common ModelForm base for this module's forms.

    It only declares the CSS class names attached to the form's
    ``required_css_class`` and ``error_css_class`` attributes.
    """

    error_css_class = 'error'
    required_css_class = 'required'
|
|
|
|
class ServiceForm(BaseForm):
    """Model form for ``models.Service`` exposing an explicit list of fields."""

    class Meta:
        model = models.Service
        fields = (
            'name',
            'slug',
            'is_global',
            'service_url',
            'metadata_url',
            'cas_service_url',
        )
|
|
|
|
class CertificateMixin(forms.ModelForm):
    """Add an optional X509 certificate upload used to fill in the DN fields.

    When a certificate is uploaded, ``clean()`` runs ``openssl x509 -text``
    on a temporary copy of it and derives ``certificate_issuer_dn`` and
    ``certificate_subject_dn`` from the ``Issuer:`` and ``Subject:`` lines.
    """

    certificate = forms.FileField(
        label=_('X509 Certificate'),
        help_text=_('You can set '
                    'the issuer and subject DNs by uploading the certificate'),
        required=False)

    def clean(self):
        """Fill the issuer/subject DNs from the uploaded certificate, if any.

        Returns:
            The form's ``cleaned_data`` dict. Both DN entries are reset to
            ``None`` unless both end up with a non-empty value.

        Raises:
            ValidationError: if openssl rejects the uploaded file.
        """
        super(CertificateMixin, self).clean()
        cleaned_data = self.cleaned_data
        certificate = cleaned_data.get('certificate')
        # Truthiness test instead of ``is not None`` so that any falsy
        # non-file value is skipped rather than crashing on ``.read()``.
        if certificate:
            # openssl reads its input from a path, so spool the upload to a
            # named temporary file that is removed when the block exits.
            with tempfile.NamedTemporaryFile() as certificate_file:
                certificate_file.write(certificate.read())
                certificate_file.flush()
                try:
                    description = subprocess.check_output(
                        ['/usr/bin/openssl', 'x509', '-text',
                         '-in', certificate_file.name])
                except subprocess.CalledProcessError:
                    # A non-zero exit status means the upload could not be
                    # parsed: report it on the form instead of crashing.
                    raise ValidationError(_('Invalid X509 certificate'))
            # check_output() returns bytes on Python 3, while the pattern
            # below is a text pattern.
            if isinstance(description, bytes):
                description = description.decode('utf-8', 'replace')
            match = re.findall(r'(?:Issuer|Subject): (.*)', description)
            # Only trust the output when exactly one Issuer line and one
            # Subject line were found, in that order.
            if len(match) == 2:
                cleaned_data['certificate_issuer_dn'] = '/' + match[0].replace(', ', '/')
                cleaned_data['certificate_subject_dn'] = '/' + match[1].replace(', ', '/')
        # The two DNs only make sense as a pair: drop both if either is missing.
        if not cleaned_data.get('certificate_issuer_dn') or \
                not cleaned_data.get('certificate_subject_dn'):
            cleaned_data['certificate_issuer_dn'] = None
            cleaned_data['certificate_subject_dn'] = None
        return cleaned_data
|
|
|
|
class CollectivityForm(CertificateMixin, BaseForm):
    """Model form for ``models.Collectivity`` with certificate upload support.

    Every model field is exposed except the ones listed in ``Meta.exclude``.
    """

    class Meta:
        model = models.Collectivity
        exclude = (
            'username_is_unique',
            'email_is_unique',
            'uuid',
            'default',
        )
|
|
|
|
class ServiceInstanceForm(BaseForm):
    """Model form for ``models.ServiceInstance``.

    ``clean()`` additionally rejects a slug already used by another instance
    of the same model attached to the same collectivity.
    """

    def clean(self):
        """Validate the form and enforce slug uniqueness per collectivity.

        Returns:
            The form's ``cleaned_data`` dict.

        Raises:
            ValidationError: if another object with the same collectivity
                already uses the submitted slug.
        """
        # Call the parent clean(): ModelForm.clean() is what enables the
        # model's own uniqueness validation, and skipping it silently
        # disables those checks.
        cleaned_data = super(ServiceInstanceForm, self).clean()
        if cleaned_data is None:
            cleaned_data = self.cleaned_data
        # check unicity constraint on slug and collectivity
        if self.instance:
            model = self.instance.__class__
            # Exclude the edited object itself so that saving it unchanged
            # is not reported as a conflict.
            duplicates = model.objects.exclude(pk=self.instance.pk).filter(
                collectivity=self.instance.collectivity,
                slug=cleaned_data.get('slug'))
            if duplicates.exists():
                raise ValidationError(_('This slug is already used'))
        # Returned explicitly: older Django releases replace cleaned_data
        # with whatever clean() returns.
        return cleaned_data

    class Meta:
        model = models.ServiceInstance
        fields = ('slug', 'service', 'service_url', 'metadata_url',
                  'cas_service_url', 'authentication_level')
|
|
|
|
class AccessForm(BaseForm):
    """Model form for ``models.Access`` scoped to a single collectivity."""

    def __init__(self, *args, **kwargs):
        """Build the form and narrow its choices to one collectivity.

        The ``collectivity`` keyword argument is mandatory (a ``KeyError`` is
        raised when it is missing); it is removed from ``kwargs`` before the
        parent constructor runs.
        """
        collectivity = kwargs.pop('collectivity')
        super(AccessForm, self).__init__(*args, **kwargs)
        # Both fields get the same restriction on their queryset.
        for field_name in ('user', 'service_instance'):
            field = self.fields[field_name]
            field.queryset = field.queryset.filter(collectivity=collectivity)

    class Meta:
        model = models.Access
        fields = '__all__'
|
|
|
|
|
|
class UserMixin(object):
    """Form mixin that overwrites ``username`` with a generated UUID."""

    def clean(self):
        """Store a fresh ``uuid.uuid1()`` string as username, then delegate.

        The value is written into ``self.cleaned_data`` before the next
        ``clean()`` in the MRO runs; that method's return value is returned
        unchanged.
        """
        self.cleaned_data['username'] = str(uuid.uuid1())
        return super(UserMixin, self).clean()
|
|
|
|
|
|
class UserEditForm(UserMixin, CertificateMixin, UserEditForm):
    """Edit form for ``models.User`` with UUID usernames and certificate upload.

    The last base class is the ``UserEditForm`` imported from
    ``authentic2.manager.forms``: the base list is evaluated before this
    class statement rebinds the name.
    """

    class Meta:
        model = models.User
        fields = (
            'uid', 'first_name', 'last_name', 'email',
            'is_admin', 'direction', 'employee_type', 'sirh_code',
            'postal_address', 'fax', 'mobile', 'phone',
            'certificate_issuer_dn', 'certificate_subject_dn', 'certificate',
        )
|
|
|
|
class UserAddForm(UserMixin, CertificateMixin, UserAddForm):
    """Creation form for ``models.User`` with UUID usernames and certificate upload.

    The last base class is the ``UserAddForm`` imported from
    ``authentic2.manager.forms``: the base list is evaluated before this
    class statement rebinds the name.
    """

    class Meta:
        model = models.User
        fields = (
            'uid', 'first_name', 'last_name', 'email',
            'is_admin', 'direction', 'employee_type', 'sirh_code',
            'postal_address', 'fax', 'mobile', 'phone',
            'certificate_issuer_dn', 'certificate_subject_dn', 'certificate',
        )
|
|
|
|
class AuthenticationForm(forms.Form):
    """Login form asking for a collectivity, a username and a password.

    After a successful ``clean()``, the authenticated user is available
    through ``get_user()`` and the method used through ``get_how()``.
    """

    # Set by clean() to 'ssl-collectivity' or 'password' once a user has
    # been authenticated; stays None otherwise.
    how = None

    collectivity = fields.CollectivityField(
        queryset=models.Collectivity.objects,
        label=_('Collectivity'))
    username = forms.CharField(label=_('Username'))
    password = forms.CharField(label=_('Password'), widget=forms.PasswordInput)

    error_messages = {
        'invalid_login': _("Please enter a correct username and password. "
                           "Note that both fields may be case-sensitive."),
        'inactive': _("This account is inactive."),
    }

    def __init__(self, request=None, *args, **kwargs):
        """Keep the request and preselect the collectivity from its SSL info.

        ``request`` is stored for later use by ``clean()``; the form data
        comes in through the usual ``data`` argument. When the request has
        a truthy ``ssl_info`` with a non-empty ``collectivity``, that value
        replaces the field's queryset and its first element becomes the
        initial choice.
        """
        self.request = request
        self.user_cache = None
        super(AuthenticationForm, self).__init__(*args, **kwargs)

        ssl_info = getattr(request, 'ssl_info', None) if request else None
        collectivities = ssl_info.collectivity if ssl_info else None
        if not collectivities:
            return

        field = self.fields['collectivity']
        field.queryset = collectivities
        field.initial = collectivities[0].pk
        # With a single candidate the widget is flagged read-only.
        if len(collectivities) < 2:
            field.widget.attrs['readonly'] = 'readonly'

    def clean(self):
        """Authenticate the submitted credentials.

        Nothing is attempted unless collectivity, username and password are
        all present. Otherwise ``authenticate()`` is called, with the
        request's ``ssl_info`` added when there is one, and ``self.how``
        records which of the two variants produced a user.

        Raises:
            forms.ValidationError: with code ``invalid_login`` when
                ``authenticate()`` returns ``None``, or whatever
                ``confirm_login_allowed()`` raises.
        """
        collectivity = self.cleaned_data.get('collectivity')
        username = self.cleaned_data.get('username')
        password = self.cleaned_data.get('password')

        if not (username and password and collectivity):
            return self.cleaned_data

        credentials = {
            'collectivity': collectivity,
            'username': username,
            'password': password,
        }
        # NOTE(review): the indentation of the original was lost; this
        # assumes the password-only call is the `else` branch of the
        # ssl_info test - confirm against the upstream file.
        ssl_info = getattr(self.request, 'ssl_info', None) if self.request else None
        if ssl_info:
            credentials['ssl_info'] = ssl_info
            method = 'ssl-collectivity'
        else:
            method = 'password'

        self.user_cache = authenticate(**credentials)
        if self.user_cache:
            self.how = method

        if self.user_cache is None:
            raise forms.ValidationError(
                self.error_messages['invalid_login'],
                code='invalid_login',
                params={'username': ugettext('username')},
            )
        self.confirm_login_allowed(self.user_cache)

        return self.cleaned_data

    def confirm_login_allowed(self, user):
        """Decide whether an authenticated ``user`` is allowed to log in.

        This is a policy hook, separate from checking credentials. The
        implementation here returns ``None`` for users whose ``is_active``
        is true and raises ``forms.ValidationError`` (code ``inactive``)
        for the others.
        """
        if user.is_active:
            return
        raise forms.ValidationError(
            self.error_messages['inactive'],
            code='inactive',
        )

    def get_user_id(self):
        """Return the id of the authenticated user, or ``None`` without one."""
        return self.user_cache.id if self.user_cache else None

    def get_user(self):
        """Return the user cached by ``clean()`` (``None`` if there is none)."""
        return self.user_cache

    def get_how(self):
        """Return the authentication method recorded by ``clean()``."""
        return self.how
|