814 lines
35 KiB
Python
814 lines
35 KiB
Python
#
|
|
# authentic2_cut - Authentic2 plugin for CUT
|
|
# Copyright (C) 2016 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import logging
|
|
|
|
import django.apps
|
|
from authentic2.constants import AUTHENTICATION_EVENTS_SESSION_KEY
|
|
from django.conf import settings
|
|
from django.db import DEFAULT_DB_ALIAS, router
|
|
from django.urls import reverse_lazy
|
|
from django.utils.timezone import now, utc
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AppConfig(django.apps.AppConfig):
|
|
name = 'authentic2_cut'
|
|
|
|
def post_migrate(self, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, **kwargs):
|
|
# create custom operations
|
|
from django_rbac.models import Operation
|
|
from django_rbac.utils import get_operation
|
|
|
|
if not router.allow_migrate(using, Operation):
|
|
return
|
|
|
|
FC_MANAGE_OP = Operation.register(name='Gérer FranceConnect', slug='fc_manage')
|
|
get_operation(FC_MANAGE_OP)
|
|
|
|
def ready(self):
|
|
from django.db.models.signals import post_migrate, post_save
|
|
from django_rbac.utils import get_ou_model
|
|
|
|
from . import api_views
|
|
|
|
post_migrate.connect(self.post_migrate, sender=self)
|
|
post_save.connect(self.ou_post_save, sender=get_ou_model())
|
|
|
|
def ou_post_save(self, sender, instance, raw, created, **kwargs):
|
|
from .utils import update_roles
|
|
|
|
update_roles()
|
|
|
|
def a2_hook_manager_user_data(self, view, user):
|
|
'''Retourne des objets pour afficher la fédération FranceConnect'''
|
|
from .user_datas import FranceConnectUserData, ValidationUserData
|
|
|
|
user_datas = []
|
|
|
|
if view.__class__.__name__.endswith('UserDetailView'):
|
|
user_datas.append(
|
|
FranceConnectUserData(user, view.request),
|
|
)
|
|
if user.attributes.validated:
|
|
user_datas.append(ValidationUserData(user))
|
|
return user_datas
|
|
|
|
def a2_hook_manager_modify_form(self, view, form):
|
|
from authentic2.passwords import generate_password
|
|
from django.forms.widgets import DateTimeInput, HiddenInput, Textarea
|
|
|
|
from . import models
|
|
|
|
if view.__class__.__name__ == 'UserAddView':
|
|
# Les usagers CUT n'ont pas d'identifiant
|
|
ou = getattr(form, 'ou', None)
|
|
if ou:
|
|
password = generate_password()
|
|
form.fields['password1'].initial = password
|
|
form.fields['password1'].widget = HiddenInput()
|
|
form.fields['password2'].initial = password
|
|
form.fields['password2'].widget = HiddenInput()
|
|
if ou.slug == 'usagers':
|
|
del form.fields['username']
|
|
for field_name in [
|
|
'is_superuser',
|
|
'validated',
|
|
'validation_context',
|
|
'phone',
|
|
'address',
|
|
'validation_date',
|
|
]:
|
|
if field_name in form.fields:
|
|
del form.fields[field_name]
|
|
|
|
form.fields['generate_password'].initial = False
|
|
form.fields['generate_password'].widget = HiddenInput()
|
|
form.fields['reset_password_at_next_login'].initial = False
|
|
form.fields['reset_password_at_next_login'].widget = HiddenInput()
|
|
form.fields['send_mail'].initial = False
|
|
form.fields['send_mail'].widget = HiddenInput()
|
|
form.fields['send_password_reset'].initial = True
|
|
form.fields['send_password_reset'].widget = HiddenInput()
|
|
form.fields['creation_mode'].initial = 'backoffice'
|
|
form.fields['creation_mode'].widget = HiddenInput()
|
|
form.fields['creation_partner'].initial = (
|
|
view.request.user.ou.name if view.request.user.ou else 'super-utilisateur'
|
|
)
|
|
form.fields['creation_partner'].widget = HiddenInput()
|
|
form.fields['comment'].widget = Textarea(attrs={'rows': 4})
|
|
del form.fields['creation_domain']
|
|
del form.fields['validation_partner']
|
|
else:
|
|
for field_name in list(form.fields):
|
|
if field_name not in [
|
|
'username',
|
|
'first_name',
|
|
'last_name',
|
|
'email',
|
|
'send_password_reset',
|
|
'generate_password',
|
|
]:
|
|
del form.fields[field_name]
|
|
form.fields['email'].required = True
|
|
form.fields['generate_password'].initial = False
|
|
form.fields['generate_password'].widget = HiddenInput()
|
|
form.fields['send_password_reset'].initial = True
|
|
form.fields['send_password_reset'].widget = HiddenInput()
|
|
|
|
if view.__class__.__name__ == 'UserEditView':
|
|
# Les usagers CUT n'ont pas d'identifiant
|
|
del form.fields['creation_mode']
|
|
del form.fields['creation_partner']
|
|
del form.fields['creation_domain']
|
|
del form.fields['validation_partner']
|
|
if form.instance.ou and form.instance.ou.slug == 'usagers':
|
|
del form.fields['username']
|
|
for field_name in [
|
|
'password1',
|
|
'password2',
|
|
'is_superuser',
|
|
'validated',
|
|
'validation_context',
|
|
'validation_date',
|
|
'ou',
|
|
]:
|
|
if field_name in form.fields:
|
|
del form.fields[field_name]
|
|
form.fields['comment'].widget = Textarea(attrs={'rows': 4})
|
|
else:
|
|
for field_name in list(form.fields):
|
|
if field_name not in [
|
|
'username',
|
|
'first_name',
|
|
'last_name',
|
|
'email',
|
|
'is_superuser',
|
|
'generate_password',
|
|
'ou',
|
|
]:
|
|
del form.fields[field_name]
|
|
# do not allow to move an agent in the usagers OU
|
|
form.fields['ou'].queryset = form.fields['ou'].queryset.exclude(slug='usagers')
|
|
|
|
def generate_new_edit_save(old_save):
|
|
def edit_save(*args, **kwargs):
|
|
response = old_save(*args, **kwargs)
|
|
user = form.instance
|
|
user.roles.remove(*list(user.roles.exclude(ou=form.instance.ou)))
|
|
return response
|
|
|
|
return edit_save
|
|
|
|
form.save = generate_new_edit_save(form.save)
|
|
|
|
# Si un compte est validé, on interdit la modification des attributs coeurs
|
|
if form.instance.attributes.validated:
|
|
for field_name in list(form.fields):
|
|
if field_name in [
|
|
'first_name',
|
|
'last_name',
|
|
'birthcountry_insee',
|
|
'birthplace_insee',
|
|
'birthcountry',
|
|
'birthplace',
|
|
'gender',
|
|
'title',
|
|
'birthdate',
|
|
]:
|
|
# del form.fields[field_name]
|
|
field = form.fields[field_name]
|
|
field.required = False
|
|
if field_name == 'birthdate':
|
|
field.widget = DateTimeInput(attrs={'readonly': ''})
|
|
attrs = field.widget.attrs or {}
|
|
attrs['disabled'] = ''
|
|
attrs['title'] = 'Champ validé'
|
|
field.widget.attrs = attrs
|
|
|
|
def new_clean(self, field_name):
|
|
def clean():
|
|
self.instance.refresh_from_db()
|
|
if hasattr(self.instance, field_name):
|
|
return getattr(self.instance, field_name)
|
|
else:
|
|
return getattr(self.instance.attributes, field_name)
|
|
|
|
return clean
|
|
|
|
setattr(form, 'clean_' + field_name, new_clean(form, field_name))
|
|
if field_name in ['generate_password']:
|
|
del form.fields[field_name]
|
|
|
|
if view.__class__.__name__.endswith('UserDetailView'):
|
|
if form.instance.ou:
|
|
if form.instance.ou.slug == 'usagers':
|
|
for field_name in [
|
|
'username',
|
|
'is_superuser',
|
|
'validated',
|
|
'validation_context',
|
|
'validation_date',
|
|
'creation_mode',
|
|
'creation_partner',
|
|
'creation_domain',
|
|
]:
|
|
if field_name in form.fields:
|
|
del form.fields[field_name]
|
|
form.fields['comment'].widget = Textarea(attrs={'readonly': '', 'rows': 4})
|
|
else:
|
|
for field_name in list(form.fields):
|
|
if field_name not in ['username', 'first_name', 'last_name', 'email']:
|
|
del form.fields[field_name]
|
|
|
|
if view.__class__.__name__ in [
|
|
'OrganizationalUnitDetailView',
|
|
'OrganizationalUnitEditView',
|
|
'OrganizationalUnitAddView',
|
|
]:
|
|
del form.fields['default']
|
|
del form.fields['email_is_unique']
|
|
del form.fields['username_is_unique']
|
|
|
|
def a2_hook_manager_modify_table(self, view, table):
|
|
import django_tables2 as tables
|
|
|
|
if view.__class__.__name__ == 'UsersView':
|
|
ou = view.search_form.cleaned_data['ou']
|
|
sequence = list(table.sequence)
|
|
base_columns = copy.deepcopy(type(table).base_columns)
|
|
|
|
if ou and ou.slug == 'usagers':
|
|
for column_name in ['username', 'get_full_name']:
|
|
if column_name in base_columns:
|
|
del base_columns[column_name]
|
|
if column_name in sequence:
|
|
sequence.remove(column_name)
|
|
sequence.remove('email')
|
|
sequence.insert(2, 'email')
|
|
base_columns['preferred_username'] = tables.Column(
|
|
accessor='attributes.preferred_username', verbose_name='Nom d\'usage'
|
|
)
|
|
sequence.insert(2, 'preferred_username')
|
|
base_columns['validated'] = tables.BooleanColumn(
|
|
accessor='attributes.validated', verbose_name='Validé'
|
|
)
|
|
sequence += ['validated']
|
|
else:
|
|
del base_columns['get_full_name']
|
|
sequence.remove('get_full_name')
|
|
|
|
table.base_columns = base_columns
|
|
table.sequence = sequence
|
|
table.columns = tables.columns.BoundColumns(table, base_columns)
|
|
return table
|
|
|
|
def a2_hook_api_modify_serializer(self, view, serializer):
|
|
from rest_framework import serializers
|
|
|
|
if view.__class__.__name__ == 'UsersAPI':
|
|
del serializer.fields['id']
|
|
serializer.fields['sub'] = serializer.fields['uuid']
|
|
del serializer.fields['uuid']
|
|
del serializer.fields['is_superuser']
|
|
del serializer.fields['is_staff']
|
|
del serializer.fields['password']
|
|
# forbid modification of email trough POST/PATCH/PUT
|
|
if serializer.instance:
|
|
serializer.fields['email'].read_only = True
|
|
serializer.fields['email'].required = True
|
|
serializer.fields['email_verified'].read_only = True
|
|
serializer.fields['ou'] = serializers.HiddenField(default=serializer.fields['ou'].default)
|
|
del serializer.fields['username']
|
|
del serializer.fields['last_login']
|
|
|
|
def get_gender(obj):
|
|
attributes = obj.attributes
|
|
try:
|
|
title = attributes.title
|
|
except AttributeError:
|
|
return None
|
|
return {'Monsieur': 'male', 'Madame': 'female'}.get(title)
|
|
|
|
serializer.get_gender = get_gender
|
|
|
|
serializer.fields['gender'] = serializers.SerializerMethodField()
|
|
serializer.fields['given_name'] = serializers.CharField(read_only=True, source='first_name')
|
|
serializer.fields['family_name'] = serializers.CharField(read_only=True, source='last_name')
|
|
serializer.fields['email_verified'].read_only = True
|
|
serializer.fields['validation_context'] = serializers.ChoiceField(
|
|
source='attributes.validation_context',
|
|
choices=[
|
|
('online', 'online'),
|
|
('office', 'office'),
|
|
],
|
|
required=False,
|
|
)
|
|
serializer.fields['validation_partner'].read_only = True
|
|
serializer.fields['creation_mode'].read_only = True
|
|
serializer.fields['creation_partner'].read_only = True
|
|
serializer.fields['creation_domain'].read_only = True
|
|
|
|
def get_address_fc(obj):
|
|
if obj.fc_accounts.all():
|
|
return obj.fc_accounts.all()[0].get_user_info().get('address')
|
|
|
|
serializer.get_address_fc = get_address_fc
|
|
serializer.fields['address_fc'] = serializers.SerializerMethodField()
|
|
|
|
def get_phone_number_fc(obj):
|
|
if obj.fc_accounts.all():
|
|
return obj.fc_accounts.all()[0].get_user_info().get('phone_number')
|
|
|
|
serializer.get_phone_number_fc = get_phone_number_fc
|
|
serializer.fields['phone_number_fc'] = serializers.SerializerMethodField()
|
|
|
|
# override serializer.create to set the creation mode
|
|
old_create = serializer.create
|
|
|
|
def new_create(*args, **kwargs):
|
|
instance = old_create(*args, **kwargs)
|
|
request = serializer.context['request']
|
|
instance.attributes.creation_mode = 'API'
|
|
if hasattr(request.user, 'oidc_client'):
|
|
instance.attributes.creation_partner = request.user.oidc_client.slug
|
|
elif hasattr(request.user, 'ou') and request.user.ou:
|
|
instance.attributes.creation_partner = request.user.ou.slug
|
|
else:
|
|
instance.attributes.creation_partner = 'UNKNOWN'
|
|
instance.attributes.creation_mode = 'BO'
|
|
return instance
|
|
|
|
serializer.create = new_create
|
|
|
|
old_update = serializer.update
|
|
|
|
def new_update(instance, validated_data):
|
|
validated = validated_data.get('attributes', {}).get('validated')
|
|
instance = old_update(instance, validated_data)
|
|
if validated:
|
|
logging.info('validated')
|
|
request = serializer.context['request']
|
|
if hasattr(request.user, 'oidc_client'):
|
|
partner = request.user.oidc_client.slug
|
|
else:
|
|
raise NotImplementedError
|
|
instance.attributes.validation_partner = partner
|
|
return instance
|
|
|
|
serializer.update = new_update
|
|
|
|
validation_variables = ['validated', 'validation_context', 'validation_date']
|
|
old_validate = serializer.validate
|
|
|
|
def new_validate(data):
|
|
data = old_validate(data)
|
|
attributes = data.get('attributes', {})
|
|
instance = serializer.instance
|
|
if instance:
|
|
if set(attributes) & set(validation_variables):
|
|
if instance.attributes.validated:
|
|
raise serializers.ValidationError(
|
|
'account already validated you cannot modify the validation status'
|
|
)
|
|
if not set(validation_variables).issubset(set(attributes)):
|
|
raise serializers.ValidationError(
|
|
'validated, validation_context and validation_date are required together'
|
|
)
|
|
if attributes['validated'] is not True:
|
|
raise serializers.ValidationError('validated can only be true')
|
|
request = serializer.context['request']
|
|
if hasattr(request.user, 'oidc_client'):
|
|
pass
|
|
else:
|
|
raise serializers.ValidationError('you are not a partner, you cannot validate')
|
|
else:
|
|
if set(attributes) & set(validation_variables):
|
|
raise serializers.ValidationError('you cannot validate during creation')
|
|
return data
|
|
|
|
serializer.validate = new_validate
|
|
|
|
serializer.fields['modified'].timezone = utc
|
|
serializer.fields['date_joined'].timezone = utc
|
|
|
|
# execute after other modifiers
|
|
a2_hook_api_modify_serializer.order = 999
|
|
|
|
def a2_hook_modify_context_data(self, view, context):
|
|
from .custom_settings import CORE_ATTRIBUTES, CROWN_ATTRIBUTES
|
|
|
|
if view.__class__.__name__ == 'ProfileView':
|
|
context['cut_core_filled'] = all(
|
|
getattr(view.request.user.attributes, a, None) for a in CORE_ATTRIBUTES
|
|
)
|
|
context['cut_crown_filled'] = any(
|
|
getattr(view.request.user.attributes, a, None) for a in CROWN_ATTRIBUTES
|
|
)
|
|
|
|
def a2_hook_manager_modify_other_actions(self, view, other_actions):
|
|
from authentic2.manager.views import Action
|
|
|
|
from .actions import RemoveFranceConnect
|
|
|
|
class CUTValidate(Action):
|
|
name = 'cut-validate'
|
|
title = 'Valider le compte'
|
|
permission = 'custom_user.cut_validate_user'
|
|
url_name = 'cut-manager-user-edit-core'
|
|
popup = False
|
|
|
|
def display(self, user, request):
|
|
if user.ou and user.ou.slug != 'usagers':
|
|
return False
|
|
if user.attributes.validated and user.attributes.validation_context == 'fc':
|
|
return False
|
|
if user.attributes.validated:
|
|
self.title = 'Modifier les données coeur'
|
|
self.user = user
|
|
return super().display(user, request)
|
|
|
|
class CUTJournalActions(Action):
|
|
name = 'cut-journal-actions'
|
|
title = 'Journal des actions'
|
|
permission = 'custom_user.view_user'
|
|
url_name = 'cut-manager-user-actions-journal'
|
|
popup = False
|
|
|
|
class CUTJournalModifications(Action):
|
|
name = 'cut-journal-modifications'
|
|
title = 'Journal des modifications'
|
|
permission = 'custom_user.view_user'
|
|
url_name = 'cut-manager-user-modifications-journal'
|
|
popup = False
|
|
|
|
if view.__class__.__name__.endswith('UserDetailView'):
|
|
other_actions.append(CUTValidate())
|
|
other_actions.append(CUTJournalActions())
|
|
other_actions.append(CUTJournalModifications())
|
|
other_actions.append(RemoveFranceConnect())
|
|
|
|
def a2_hook_front_modify_form(self, view, form):
|
|
from django.forms.widgets import HiddenInput
|
|
|
|
if view.__class__.__name__ == 'EditProfile':
|
|
if form.instance and form.instance.attributes.validated:
|
|
for field in ('first_name', 'last_name', 'birthdate', 'title', 'birthplace', 'birthcountry'):
|
|
form.fields.pop(field, None)
|
|
for field in form.fields.values():
|
|
if hasattr(field, 'max_length'):
|
|
if field.max_length is None:
|
|
field.max_length = 128
|
|
field.widget.attrs['maxlength'] = 128
|
|
elif view.__class__.__name__ == 'RegistrationCompletionView':
|
|
form.initial['preferred_username'] = view.token.get('last_name', '')
|
|
form.fields['preferred_username'].widget = HiddenInput()
|
|
form.initial['preferred_givenname'] = view.token.get('first_name', '')
|
|
form.fields['preferred_givenname'].widget = HiddenInput()
|
|
|
|
for name in list(form.fields):
|
|
if 'password' not in name and name not in ('preferred_username', 'preferred_givenname'):
|
|
form.fields.pop(name)
|
|
|
|
def a2_hook_api_modify_queryset(self, view, queryset):
|
|
from datetime import date
|
|
|
|
from django.utils.timezone import now
|
|
|
|
def majority(today):
|
|
for i in range(5):
|
|
try:
|
|
return date(today.year - 18, today.month, today.day - i)
|
|
except ValueError as e:
|
|
continue
|
|
raise e
|
|
|
|
if view.__class__.__name__ == 'UsersAPI':
|
|
if hasattr(view.request.user, 'oidc_client'):
|
|
oidc_client = view.request.user.oidc_client
|
|
hide_underaged_oidc_client = getattr(settings, 'A2_CUT_HIDE_UNDERAGED_CLIENT_IDS', [])
|
|
# Cache les comptes des mineurs pour les clients OIDC listés
|
|
# dans A2_CUT_HIDE_UNDERAGED_CLIENT_IDS
|
|
if oidc_client.client_id in hide_underaged_oidc_client:
|
|
from authentic2.models import AttributeValue
|
|
|
|
before = majority(now().date()).isoformat()
|
|
excluded = AttributeValue.objects.filter(attribute__name='birthdate', content__gt=before)
|
|
queryset = queryset.exclude(attribute_values__in=excluded)
|
|
queryset = queryset.filter(ou__slug='usagers')
|
|
return queryset
|
|
|
|
def a2_hook_idp_oidc_modify_user_info(self, client, user, scope_set, user_info):
|
|
sub = user_info['sub']
|
|
user_info.clear()
|
|
user_info['sub'] = sub
|
|
if 'email' in scope_set:
|
|
user_info['email'] = user.email
|
|
if 'profile' in scope_set:
|
|
user_info['first_name'] = user.first_name
|
|
user_info['last_name'] = user.last_name
|
|
user_info['given_name'] = user.first_name
|
|
user_info['family_name'] = user.last_name
|
|
user_info['title'] = user.attributes.title
|
|
user_info['gender'] = {'Monsieur': 'male', 'Madame': 'female'}.get(user.attributes.title)
|
|
user_info['birthdate'] = user.attributes.birthdate and user.attributes.birthdate.isoformat()
|
|
user_info['birthplace'] = user.attributes.birthplace
|
|
user_info['birthcountry'] = user.attributes.birthcountry
|
|
user_info['birthplace_insee'] = user.attributes.birthplace_insee
|
|
user_info['birthcountry_insee'] = user.attributes.birthcountry_insee
|
|
user_info['validated'] = user.attributes.validated
|
|
user_info['validation_date'] = (
|
|
user.attributes.validation_date and user.attributes.validation_date.isoformat()
|
|
)
|
|
user_info['validation_context'] = user.attributes.validation_context
|
|
# pass user.ou.slug for agent's users
|
|
if user.ou:
|
|
if user.ou.slug != 'usagers':
|
|
user_info['ou'] = user.ou.slug
|
|
else:
|
|
if user.attributes.validated:
|
|
for name in [
|
|
'first_name',
|
|
'last_name',
|
|
'given_name',
|
|
'family_name',
|
|
'title',
|
|
'gender',
|
|
'birthdate',
|
|
'birthplace_insee',
|
|
'birthplace',
|
|
'birthcountry_insee',
|
|
'birthcountry',
|
|
]:
|
|
user_info['%s_verified' % name] = True
|
|
if 'crown' in scope_set:
|
|
user_info['preferred_username'] = user.attributes.preferred_username
|
|
user_info['preferred_givenname'] = user.attributes.preferred_givenname
|
|
user_info['address_number'] = user.attributes.address_number
|
|
user_info['address_street'] = user.attributes.address_street
|
|
user_info['address_complement'] = user.attributes.address_complement
|
|
user_info['address_zipcode'] = user.attributes.address_zipcode
|
|
user_info['address_city'] = user.attributes.address_city
|
|
user_info['address_country'] = user.attributes.address_country
|
|
user_info['home_mobile_phone'] = user.attributes.home_mobile_phone
|
|
user_info['home_phone'] = user.attributes.home_phone
|
|
user_info['professional_mobile_phone'] = user.attributes.professional_mobile_phone
|
|
user_info['professional_phone'] = user.attributes.professional_phone
|
|
user_info['birthdepartment'] = user.attributes.birthdepartment
|
|
|
|
if user.fc_accounts.exists():
|
|
import json
|
|
|
|
try:
|
|
fc_user_info = json.loads(user.fc_accounts.all()[0].user_info)
|
|
except ValueError:
|
|
fc_user_info = {}
|
|
address = fc_user_info.get('address')
|
|
if isinstance(address, dict):
|
|
for key, value in address.items():
|
|
user_info['address_fc_%s' % key] = value
|
|
else:
|
|
user_info['address_fc_formatted'] = address
|
|
user_info['phone_number_fc'] = fc_user_info.get('phone_number')
|
|
|
|
def a2_hook_event(self, name, **kwargs):
|
|
method_name = 'cut_event_' + name.replace('-', '_')
|
|
if hasattr(self, method_name):
|
|
getattr(self, method_name)(**kwargs)
|
|
|
|
def log_action(self, actor, message):
|
|
from . import models
|
|
|
|
models.Journal.objects.create(actor=actor, message=message)
|
|
|
|
def log_modification(self, actor, subject, message, mail=True, mail_message=None):
|
|
from . import models
|
|
|
|
models.Journal.objects.create(actor=actor, subject=subject, message=message)
|
|
# pour les modifications sur les usagers on envoie un mail à l'usager
|
|
if mail:
|
|
self.mail_notification(actor, subject, mail_message or message)
|
|
|
|
def mail_notification(self, actor, subject, message):
|
|
from authentic2.utils.misc import send_templated_mail
|
|
|
|
if subject.ou and subject.ou.slug == 'usagers':
|
|
send_templated_mail(
|
|
subject,
|
|
'authentic2/cut-notify-usager-modification',
|
|
context={
|
|
'message': message,
|
|
'usager': subject,
|
|
'agent': actor,
|
|
},
|
|
)
|
|
|
|
@property
|
|
def redis_client(self):
|
|
from django.core import cache
|
|
|
|
return cache.cache.client.get_client()
|
|
|
|
def stat(self, *args):
|
|
t = now()
|
|
key = 'stat__%s%02d__' % (t.date(), t.hour)
|
|
key += '__'.join(args)
|
|
self.redis_client.expire(key, 3600 * 24 * 7)
|
|
self.redis_client.incr(key)
|
|
|
|
def cut_event_login(self, user, how, service=None, **kwargs):
|
|
# log login for current user
|
|
if how == 'switch':
|
|
return
|
|
method = self.get_authentication_method(how)
|
|
if not method:
|
|
method = how.upper()
|
|
msg = self.get_authentication_message(how)
|
|
if service:
|
|
msg += ' pour %s' % service
|
|
self.log_action(user, msg)
|
|
self.stat('login', method)
|
|
|
|
def cut_event_registration(self, user, view, form, token, service, **kwargs):
|
|
# log registration for current user
|
|
creation_mode = 'FO'
|
|
msg = 'création du compte en ligne'
|
|
if 'franceconnect' in token:
|
|
creation_mode = 'FC'
|
|
msg = 'création du compte via FranceConnect'
|
|
user.attributes.creation_mode = creation_mode
|
|
self.log_action(user, msg)
|
|
if service:
|
|
user.attributes.creation_partner = service
|
|
|
|
def cut_event_sso_request(self, idp, service, **kwargs):
|
|
self.stat('sso-request', service.slug)
|
|
from authentic2.middleware import StoreRequestMiddleware
|
|
|
|
request = StoreRequestMiddleware.get_request()
|
|
if request:
|
|
request.session['service_slug'] = service.slug
|
|
|
|
def cut_event_sso_success(self, idp, service, user, **kwargs):
|
|
msg = 'connexion à %s' % service.name
|
|
self.log_action(user, msg)
|
|
how = self.get_authentication_how()
|
|
method = self.get_authentication_method(how)
|
|
self.stat('sso-success', service.slug, method or 'inconnu')
|
|
|
|
def cut_event_cut_edit_core(self, user, form, **kwargs):
|
|
if not form.has_changed():
|
|
return
|
|
self.log_action(user, 'édition du profil coeur')
|
|
|
|
def cut_event_cut_edit_crown(self, user, form, **kwargs):
|
|
if not form.has_changed():
|
|
return
|
|
self.log_action(user, 'édition du profil couronne')
|
|
|
|
def cut_event_password_reset_confirm(self, user, **kwargs):
|
|
self.log_action(user, 'ré-initialisation du mot de passe')
|
|
|
|
def cut_event_change_email_confirm(self, user, **kwargs):
|
|
self.log_action(user, 'changement de l\'adresse de courriel')
|
|
|
|
def cut_event_delete_account(self, user, **kwargs):
|
|
self.log_action(user, 'demande de suppression du compte')
|
|
|
|
def cut_event_change_password(self, user, **kwargs):
|
|
self.log_action(user, 'changement du mot de passe')
|
|
|
|
def cut_event_manager_action(self, user, action, instance, **kwargs):
|
|
msgs = {
|
|
'activate': 'ré-activation du compte',
|
|
'deactivate': 'suspension du compte',
|
|
'password_reset': 'envoi d\'un courriel de ré-initialisation du mot de passe',
|
|
'force_password_change': 'force un changement de mot de passe à ' 'la prochaine connexion',
|
|
'delete_password_reset': 'supprime l\'obligation de changement de mot de passe à '
|
|
'la prochaine connexion',
|
|
'remove-franceconnect': 'suppression de la liaison FranceConnect',
|
|
}
|
|
if action.name in msgs:
|
|
self.log_modification(user, instance, msgs[action.name])
|
|
|
|
def cut_event_manager_add_user(self, user, instance, **kwargs):
|
|
self.log_modification(user, instance, 'création d\'un utilisateur', mail=False)
|
|
|
|
def cut_event_manager_edit_user(self, user, instance, form, **kwargs):
|
|
if not form.has_changed():
|
|
return
|
|
if instance.ou and instance.ou.slug == 'usagers':
|
|
# ce hook est appelé par UserEditCoreView et UserEditView pour différencier les deux on
|
|
# vérifie la présence d'un attribut couronne dans le formulaire
|
|
if 'preferred_username' in form.fields:
|
|
if instance.attributes.validated:
|
|
self.log_modification(user, instance, 'modification du profil couronne')
|
|
else:
|
|
self.log_modification(user, instance, 'modification du profil coeur et couronne')
|
|
# si on est dans le formulaire coeur on ne log rien
|
|
else:
|
|
self.log_modification(user, instance, 'modification du profil')
|
|
|
|
def cut_event_manager_delete_user(self, user, instance, **kwargs):
|
|
self.log_action(user, 'suppression de l\'utilisateur %s' % instance)
|
|
self.mail_notification(user, instance, 'suppression de votre compte')
|
|
|
|
def cut_event_manager_add_role_member(self, user, role, member, **kwargs):
|
|
self.log_modification(user, member, 'ajoute le rôle %s' % role)
|
|
|
|
def cut_event_manager_remove_role_member(self, user, role, member, **kwargs):
|
|
self.log_modification(user, member, 'retire le rôle %s' % role)
|
|
|
|
def cut_event_manager_cut_validate(self, user, instance, context, partner, **kwargs):
|
|
if instance.attributes.validated:
|
|
msg = 'modification du profil cœur'
|
|
else:
|
|
msg = 'validation du compte'
|
|
mail_msg = msg
|
|
msg += ', contexte "%s" venant de %s' % (context, partner)
|
|
self.log_modification(user, instance, msg, mail_message=mail_msg)
|
|
|
|
def cut_event_manager_view_user(self, user, instance, **kwargs):
|
|
self.log_modification(user, instance, 'fiche consultée', mail=False)
|
|
|
|
def cut_event_manager_change_password(self, user, instance, form, **kwargs):
|
|
if form.cleaned_data.get('generate_password'):
|
|
msg = 'génération d\'un nouveau mot de passe'
|
|
else:
|
|
msg = 'changement du mot de passe'
|
|
if form.cleaned_data.get('send_mail'):
|
|
msg += ' envoyé par mail'
|
|
self.log_modification(user, instance, msg)
|
|
|
|
def cut_event_fc_link(self, user, request, **kwargs):
|
|
self.log_action(user, 'liaison avec FranceConnect')
|
|
user.attributes.validated = True
|
|
user.attributes.validation_context = 'FC'
|
|
user.attributes.validation_date = now().date()
|
|
partner = request.session.get('service_slug')
|
|
if hasattr(request, 'partner'):
|
|
partner = (request.partner and request.partner.get('ou_slug')) or partner
|
|
if partner:
|
|
user.attributes.validation_partner = partner
|
|
|
|
def cut_event_fc_unlink(self, user, **kwargs):
|
|
self.log_action(user, 'déliaison de FranceConnect')
|
|
|
|
def get_authentication_how(self):
|
|
from authentic2.middleware import StoreRequestMiddleware
|
|
|
|
request = StoreRequestMiddleware.get_request()
|
|
if request:
|
|
for event in request.session.get(AUTHENTICATION_EVENTS_SESSION_KEY, []):
|
|
if 'how' in event:
|
|
return event['how']
|
|
|
|
def get_authentication_method(self, how):
|
|
if how.startswith('password'):
|
|
return 'PWD'
|
|
elif how == 'france-connect':
|
|
return 'FC'
|
|
elif how == 'email':
|
|
return 'PWD'
|
|
elif how == 'oidc':
|
|
return 'AGENT'
|
|
elif how == 'switch':
|
|
return
|
|
else:
|
|
return
|
|
|
|
def get_authentication_message(self, how):
|
|
if how.startswith('password'):
|
|
return 'connexion par mot de passe'
|
|
elif how == 'france-connect':
|
|
return 'connexion par FranceConnect'
|
|
elif how == 'email':
|
|
return 'connexion à l\'enregistrement ou par récupération de mot de passe'
|
|
elif how == 'oidc':
|
|
return 'connexion'
|
|
else:
|
|
return 'connexion how:%s' % how
|
|
|
|
def a2_hook_manager_homepage_entries(self, view):
|
|
return {
|
|
'label': 'Validation des comptes',
|
|
'href': reverse_lazy('cut-manager-user-validation'),
|
|
'class': 'icon-management',
|
|
}
|