This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-pratic/src/authentic2_pratic/forms.py

200 lines
7.0 KiB
Python

import re
import uuid
import tempfile
import subprocess
from django import forms
from django.utils.translation import ugettext_lazy as _, ugettext
from django.contrib.auth import authenticate
from authentic2.manager.forms import UserEditForm, UserAddForm
from . import models, fields
class BaseForm(forms.ModelForm):
    """Common base class for the model forms declared in this module.

    It only configures the CSS class names used for required fields and
    for fields in error.
    """

    error_css_class = 'error'
    required_css_class = 'required'
class ServiceForm(BaseForm):
    """Model form exposing every field of ``models.Service``."""

    class Meta:
        # No field is filtered out: the whole model is editable.
        fields = '__all__'
        model = models.Service
class CertificateMixin(forms.ModelForm):
    """Model form mixin adding an optional X509 certificate upload.

    When a certificate is uploaded, its issuer and subject distinguished
    names (DNs) are extracted with the ``openssl`` command line tool and
    stored under the ``certificate_issuer_dn`` and ``certificate_subject_dn``
    keys of ``cleaned_data``.
    """

    certificate = forms.FileField(
        label=_('X509 Certificate'),
        help_text=_('You can set '
                    'the issuer and subject DNs by uploading the certificate'),
        required=False)

    def clean(self):
        """Fill the issuer/subject DNs from the uploaded certificate.

        Returns:
            The form's ``cleaned_data`` dictionary. Both DN entries are set
            to ``None`` unless both of them hold a non-empty value.

        Raises:
            forms.ValidationError: if ``openssl`` cannot be run or rejects
                the uploaded file.
        """
        super(CertificateMixin, self).clean()
        cleaned_data = self.cleaned_data
        certificate = cleaned_data.get('certificate')
        # Truthiness test rather than ``is not None``: a non-required
        # FileField cleans to ``False`` when the input is cleared, and
        # ``False`` has no ``read()`` method.
        if certificate:
            with tempfile.NamedTemporaryFile() as certificate_file:
                certificate_file.write(certificate.read())
                # openssl reads the file by name, so the content must be on
                # disk before it is spawned.
                certificate_file.flush()
                try:
                    description = subprocess.check_output(
                        ['/usr/bin/openssl', 'x509', '-text', '-in',
                         certificate_file.name])
                except (subprocess.CalledProcessError, OSError):
                    # An unparsable upload (or a missing openssl binary) must
                    # surface as a form error, not as an unhandled exception.
                    raise forms.ValidationError(
                        _('The uploaded file is not a readable X509 '
                          'certificate.'),
                        code='invalid_certificate')
            # check_output() returns bytes; decode so that the text pattern
            # below can be applied on Python 3 as well as Python 2.
            description = description.decode('utf-8', 'replace')
            match = re.findall(r'(?:Issuer|Subject): (.*)', description)
            # Expect exactly one "Issuer:" line followed by one "Subject:"
            # line; any other count is ignored and the DNs are left as is.
            if len(match) == 2:
                cleaned_data['certificate_issuer_dn'] = '/' + match[0].replace(', ', '/')
                cleaned_data['certificate_subject_dn'] = '/' + match[1].replace(', ', '/')
        # The two DNs only make sense as a pair: if either is missing or
        # empty, reset both to None.
        if not cleaned_data.get('certificate_issuer_dn') or \
                not cleaned_data.get('certificate_subject_dn'):
            cleaned_data['certificate_issuer_dn'] = None
            cleaned_data['certificate_subject_dn'] = None
        return cleaned_data
class CollectivityForm(CertificateMixin, BaseForm):
    """Model form for ``models.Collectivity`` with certificate upload.

    The ``uuid`` and ``default`` model fields are not exposed.
    """

    class Meta:
        model = models.Collectivity
        exclude = (
            'uuid',
            'default',
        )
class ServiceInstanceForm(BaseForm):
    """Model form for ``models.ServiceInstance``.

    The ``collectivity`` model field is not exposed by this form.
    """

    class Meta:
        model = models.ServiceInstance
        exclude = (
            'collectivity',
        )
class AccessForm(BaseForm):
    """Model form for ``models.Access`` scoped to a single collectivity.

    The ``user`` and ``service_instance`` choices are limited to the objects
    attached to the collectivity given at construction time.
    """

    class Meta:
        model = models.Access
        fields = '__all__'

    def __init__(self, *args, **kwargs):
        """Build the form.

        Args:
            collectivity: mandatory keyword argument; a ``KeyError`` is
                raised when it is absent. It is removed from ``kwargs``
                before they are forwarded to the parent constructor.
        """
        owner = kwargs.pop('collectivity')
        super(AccessForm, self).__init__(*args, **kwargs)
        # Apply the same collectivity restriction to both choice fields.
        for field_name in ('user', 'service_instance'):
            field = self.fields[field_name]
            field.queryset = field.queryset.filter(collectivity=owner)
class UserMixin(object):
    """Form mixin that stores a generated UUID as ``username``.

    It must be combined with a class providing ``cleaned_data`` and a
    ``clean()`` method further along the MRO.
    """

    def clean(self):
        """Set ``cleaned_data['username']`` then defer to the next ``clean()``.

        A new ``uuid.uuid1()`` value is generated on every call.

        Returns:
            Whatever the next ``clean()`` in the MRO returns.
        """
        self.cleaned_data['username'] = str(uuid.uuid1())
        return super(UserMixin, self).clean()
# NOTE: the last base class is the ``UserEditForm`` imported from
# ``authentic2.manager.forms``; this definition then shadows that name in
# this module.
class UserEditForm(UserMixin, CertificateMixin, UserEditForm):
    """Edition form for ``models.User`` with certificate upload support."""

    class Meta:
        model = models.User
        fields = (
            # identity
            'uid', 'first_name', 'last_name', 'email',
            # position
            'is_admin', 'direction', 'employee_type',
            # contact details
            'postal_address', 'fax', 'mobile', 'phone',
            # certificate
            'certificate_issuer_dn', 'certificate_subject_dn', 'certificate',
        )
# NOTE: the last base class is the ``UserAddForm`` imported from
# ``authentic2.manager.forms``; this definition then shadows that name in
# this module.
class UserAddForm(UserMixin, CertificateMixin, UserAddForm):
    """Creation form for ``models.User`` with certificate upload support."""

    class Meta:
        model = models.User
        fields = (
            # identity
            'uid', 'first_name', 'last_name', 'email',
            # position
            'is_admin', 'direction', 'employee_type',
            # contact details
            'postal_address', 'fax', 'mobile', 'phone',
            # certificate
            'certificate_issuer_dn', 'certificate_subject_dn', 'certificate',
        )
class AuthenticationForm(forms.Form):
    """Login form asking for a collectivity, a username and a password.

    After a successful ``clean()`` the authenticated user is available
    through ``get_user()`` and the mechanism that authenticated it
    (``'ssl-collectivity'`` or ``'password'``) through ``get_how()``.
    """

    error_messages = {
        'invalid_login': _("Please enter a correct username and password. "
                           "Note that both fields may be case-sensitive."),
        'inactive': _("This account is inactive."),
    }

    # Name of the mechanism that authenticated the user; set by clean().
    how = None

    collectivity = fields.CollectivityField(
        queryset=models.Collectivity.objects,
        label=_('Collectivity'))
    username = forms.CharField(label=_('Username'))
    password = forms.CharField(label=_('Password'), widget=forms.PasswordInput)

    def __init__(self, request=None, *args, **kwargs):
        """Build the form, optionally bound to the current request.

        Args:
            request: stored on the form as ``self.request``. When it carries
                a truthy ``ssl_info`` attribute whose ``collectivity`` is
                set, that collectivity's primary key becomes the initial
                value of the ``collectivity`` field.

        Remaining positional and keyword arguments (including the usual
        ``data``) are forwarded to ``forms.Form``.
        """
        self.request = request
        self.user_cache = None
        super(AuthenticationForm, self).__init__(*args, **kwargs)
        ssl_info = getattr(request, 'ssl_info', None) if request else None
        if ssl_info and ssl_info.collectivity:
            self.fields['collectivity'].initial = ssl_info.collectivity.pk

    def clean(self):
        """Authenticate the submitted credentials.

        Nothing is attempted unless collectivity, username and password are
        all present. When the request carries SSL information it is passed
        to ``authenticate()`` along with the credentials; otherwise only the
        credentials are used.

        Returns:
            The form's ``cleaned_data``.

        Raises:
            forms.ValidationError: with code ``invalid_login`` when
                ``authenticate()`` returns ``None``, or whatever
                ``confirm_login_allowed()`` raises.
        """
        collectivity = self.cleaned_data.get('collectivity')
        username = self.cleaned_data.get('username')
        password = self.cleaned_data.get('password')

        # Incomplete submission: leave the per-field errors do the talking.
        if not (username and password and collectivity):
            return self.cleaned_data

        credentials = {
            'collectivity': collectivity,
            'username': username,
            'password': password,
        }
        ssl_info = (getattr(self.request, 'ssl_info', None)
                    if self.request else None)
        if ssl_info:
            credentials['ssl_info'] = ssl_info
            mechanism = 'ssl-collectivity'
        else:
            mechanism = 'password'

        self.user_cache = authenticate(**credentials)
        if self.user_cache:
            self.how = mechanism

        if self.user_cache is None:
            raise forms.ValidationError(
                self.error_messages['invalid_login'],
                code='invalid_login',
                params={'username': ugettext('username')},
            )
        self.confirm_login_allowed(self.user_cache)
        return self.cleaned_data

    def confirm_login_allowed(self, user):
        """Decide whether an already authenticated ``user`` may log in.

        This is a policy hook, separate from credential checking: the
        default implementation accepts active users and rejects inactive
        ones. Overriding implementations should raise
        ``forms.ValidationError`` to refuse the login and return ``None``
        to accept it.

        Raises:
            forms.ValidationError: with code ``inactive`` when
                ``user.is_active`` is false.
        """
        if user.is_active:
            return
        raise forms.ValidationError(
            self.error_messages['inactive'],
            code='inactive',
        )

    def get_user_id(self):
        """Return the id of the authenticated user, or ``None`` if there is none."""
        return self.user_cache.id if self.user_cache else None

    def get_user(self):
        """Return the user cached by ``clean()`` (``None`` before a successful login)."""
        return self.user_cache

    def get_how(self):
        """Return the name of the mechanism that authenticated the user, if any."""
        return self.how