diff --git a/src/authentic2/authenticators.py b/src/authentic2/authenticators.py
index 0f9ba781c..f5958dfb2 100644
--- a/src/authentic2/authenticators.py
+++ b/src/authentic2/authenticators.py
@@ -133,6 +133,10 @@ class LoginPasswordAuthenticator(BaseAuthenticator):
utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk)
return response
+ else:
+ username = form.cleaned_data.get('username', '').strip()
+ if username:
+ request.journal.record('user.login.failure', username=username)
context['form'] = form
return render(request, 'authentic2/login_password_form.html', context)
diff --git a/src/authentic2/journal.py b/src/authentic2/journal.py
new file mode 100644
index 000000000..dfd7b9a14
--- /dev/null
+++ b/src/authentic2/journal.py
@@ -0,0 +1,37 @@
+# authentic2 - versatile identity manager
+# Copyright (C) 2010-2020 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from authentic2.utils.service import get_service_from_request
+
+from authentic2.apps.journal.journal import Journal
+
+
+class Journal(Journal):
+ def __init__(self, **kwargs):
+ self._service = kwargs.pop('service', None)
+ super().__init__(**kwargs)
+
+ @property
+ def service(self):
+ return self._service or (get_service_from_request(self.request) if self.request else None)
+
+ def massage_kwargs(self, record_parameters, kwargs):
+ if 'service' not in kwargs and 'service' in record_parameters:
+ kwargs['service'] = self.service
+ return super().massage_kwargs(record_parameters, kwargs)
+
+
+journal = Journal()
diff --git a/src/authentic2/journal_event_types.py b/src/authentic2/journal_event_types.py
new file mode 100644
index 000000000..3b11279bf
--- /dev/null
+++ b/src/authentic2/journal_event_types.py
@@ -0,0 +1,261 @@
+# authentic2 - versatile identity manager
+# Copyright (C) 2010-2020 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from django.utils.translation import ugettext_lazy as _
+
+from authentic2.custom_user.models import get_attributes_map
+from authentic2.apps.journal.models import EventTypeDefinition
+from authentic2.apps.journal.utils import form_to_old_new
+from authentic2.custom_user.models import User
+
+from . import models
+
+
+class EventTypeWithService(EventTypeDefinition):
+ @classmethod
+ def record(cls, user=None, service=None, session=None, references=None, data=None):
+ if service:
+ if not data:
+ data = {}
+ data['service_name'] = str(service)
+ if not references:
+ references = []
+ references = [service] + references
+ super().record(user=user, session=session, references=references, data=data)
+
+ @classmethod
+ def get_service_name(self, event):
+ (service,) = event.get_typed_references(models.Service)
+ if service is not None:
+ return str(service)
+ if 'service_name' in event.data:
+ return event.data['service_name']
+ return ''
+
+
+def login_method_label(how):
+ if how.startswith('password'):
+ return _('password')
+ elif how == 'fc':
+ return _('FranceConnect')
+ elif how == 'saml':
+ return _('SAML')
+ elif how == 'oidc':
+ return _('OpenIDConnect')
+ elif how:
+ return how
+ else:
+ return _('none')
+
+
+def get_attributes_label(attributes_new_values):
+ attributes_map = get_attributes_map()
+ for name in attributes_new_values:
+ if name in ('email', 'first_name', 'last_name'):
+ yield str(User._meta.get_field(name).verbose_name)
+ else:
+ if name in attributes_map:
+ yield attributes_map[name].label
+ else:
+ yield name
+
+
+class UserLogin(EventTypeWithService):
+ name = 'user.login'
+ label = _('login')
+
+ @classmethod
+ def record(cls, user, session, service, how):
+ super().record(user=user, session=session, service=service, data={'how': how})
+
+ @classmethod
+ def get_message(cls, event, context):
+ how = event.get_data('how')
+ return _('login using {method}').format(method=login_method_label(how))
+
+
+class UserLoginFailure(EventTypeWithService):
+ name = 'user.login.failure'
+ label = _('login failure')
+
+ @classmethod
+ def record(cls, service, username):
+ super().record(service=service, data={'username': username})
+
+ @classmethod
+ def get_message(cls, event, context):
+ username = event.get_data('username')
+ return _('login failure with username "{username}"').format(username=username)
+
+
+class UserRegistrationRequest(EventTypeDefinition):
+ name = 'user.registration.request'
+ label = _('registration request')
+
+ @classmethod
+ def record(cls, email):
+ super().record(data={'email': email.lower()})
+
+ @classmethod
+ def get_message(cls, event, context):
+ email = event.get_data('email')
+ return _('registration request with email "%s"') % email
+
+
+class UserRegistration(EventTypeWithService):
+ name = 'user.registration'
+ label = _('registration')
+
+ @classmethod
+ def record(cls, user, session, service, how):
+ super().record(user=user, session=session, service=service, data={'how': how})
+
+ @classmethod
+ def get_message(cls, event, context):
+ how = event.get_data('how')
+ return _('registration using {method}').format(method=login_method_label(how))
+
+
+class UserLogout(EventTypeWithService):
+ name = 'user.logout'
+ label = _('logout')
+
+ @classmethod
+ def record(cls, user, session, service):
+ super().record(user=user, session=session, service=service)
+
+ @classmethod
+ def get_message(cls, event, context):
+ return _('logout')
+
+
+class UserRequestPasswordReset(EventTypeDefinition):
+ name = 'user.password.reset.request'
+ label = _('password reset request')
+
+ @classmethod
+ def record(cls, user, email):
+ super().record(user=user, data={'email': email.lower()})
+
+ @classmethod
+ def get_message(cls, event, context):
+ email = event.get_data('email')
+ if email:
+ return _('password reset request with email "%s"') % email
+ return super().get_message(event, context)
+
+
+class UserResetPassword(EventTypeDefinition):
+ name = 'user.password.reset'
+ label = _('password reset')
+
+ @classmethod
+ def record(cls, user, session):
+ super().record(user=user, session=session)
+
+
+class UserResetPasswordFailure(EventTypeDefinition):
+ name = 'user.password.reset.failure'
+ label = _('password reset failure')
+
+ @classmethod
+ def record(cls, email):
+ super().record(data={'email': email})
+
+ @classmethod
+ def get_message(cls, event, context):
+ email = event.get_data('email')
+ if email:
+ return _('password reset failure with email "%s"') % email
+ return super().get_message(event, context)
+
+
+class UserChangePassword(EventTypeWithService):
+ name = 'user.password.change'
+ label = _('password change')
+
+ @classmethod
+ def record(cls, user, session, service):
+ super().record(user=user, session=session, service=service)
+
+
+class UserEdit(EventTypeWithService):
+ name = 'user.profile.edit'
+ label = _('profile edit')
+
+ @classmethod
+ def record(cls, user, session, service, form):
+ data = form_to_old_new(form)
+ super().record(user=user, session=session, service=service, data=data)
+
+ @classmethod
+ def get_message(cls, event, context):
+ new = event.get_data('new')
+ if new:
+ edited_attributes = ', '.join(get_attributes_label(new))
+ return _('profile edit (%s)') % edited_attributes
+ return super().get_message(event, context)
+
+
+class UserDeletion(EventTypeWithService):
+ name = 'user.deletion'
+ label = _('deletion')
+
+ @classmethod
+ def record(cls, user, session, service):
+ super().record(user=user, session=session, service=service)
+
+
+class UserServiceSSO(EventTypeWithService):
+ name = 'user.service.sso'
+ label = _('service single sign on')
+
+ @classmethod
+ def record(cls, user, session, service, how):
+ super().record(user=user, session=session, service=service, data={'how': how})
+
+ @classmethod
+ def get_message(cls, event, context):
+ service_name = cls.get_service_name(event)
+ return _('service single sign on with "{service}"').format(service=service_name)
+
+
+class UserServiceSSOAuthorization(EventTypeWithService):
+ name = 'user.service.sso.authorization'
+ label = _('consentment to single sign on')
+
+ @classmethod
+ def record(cls, user, session, service, **kwargs):
+ super().record(user=user, session=session, service=service, data=kwargs)
+
+ @classmethod
+ def get_message(cls, event, context):
+ service_name = cls.get_service_name(event)
+ return _('authorization of single sign on with "{service}"').format(service=service_name)
+
+
+class UserServiceSSOUnauthorization(EventTypeWithService):
+ name = 'user.service.sso.unauthorization'
+ label = _('remove consentment to single sign on')
+
+ @classmethod
+ def record(cls, user, session, service):
+ super().record(user=user, session=session, service=service)
+
+ @classmethod
+ def get_message(cls, event, context):
+ service_name = cls.get_service_name(event)
+ return _('unauthorization of single sign on with "{service}"').format(service=service_name)
diff --git a/src/authentic2/middleware.py b/src/authentic2/middleware.py
index f3b32aa04..67638cd62 100644
--- a/src/authentic2/middleware.py
+++ b/src/authentic2/middleware.py
@@ -27,12 +27,13 @@ from django.conf import settings
from django.contrib import messages
from django.utils.deprecation import MiddlewareMixin
from django.utils.encoding import force_text
+from django.utils.functional import SimpleLazyObject
from django.utils.translation import ugettext as _
from django.utils.six.moves.urllib import parse as urlparse
from django.shortcuts import render
from . import app_settings, utils, plugins
-from .utils.service import get_service_from_request
+from .utils.service import get_service_from_request, get_service_from_session
class CollectIPMiddleware(MiddlewareMixin):
@@ -208,10 +209,18 @@ class SaveServiceInSessionMiddleware:
self.get_response = get_response
def __call__(self, request):
- service = None
-
service = get_service_from_request(request)
if service:
request.session['service_pk'] = service.pk
-
+ request.service = SimpleLazyObject(lambda: get_service_from_session(request))
return self.get_response(request)
+
+
+def journal_middleware(get_response):
+ from . import journal
+
+ def middleware(request):
+ request.journal = journal.Journal(request=request)
+ return get_response(request)
+
+ return middleware
diff --git a/src/authentic2/settings.py b/src/authentic2/settings.py
index 447044351..ab8ce4feb 100644
--- a/src/authentic2/settings.py
+++ b/src/authentic2/settings.py
@@ -100,6 +100,7 @@ MIDDLEWARE = (
'django.middleware.locale.LocaleMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
+ 'authentic2.middleware.journal_middleware',
)
DATABASES['default']['ATOMIC_REQUESTS'] = True
diff --git a/src/authentic2/utils/__init__.py b/src/authentic2/utils/__init__.py
index 8d344891b..56052c2ca 100644
--- a/src/authentic2/utils/__init__.py
+++ b/src/authentic2/utils/__init__.py
@@ -447,6 +447,7 @@ def login(request, user, how, service=None, service_slug=None, nonce=None, **kwa
# prevent logint-hint to influence next use of the login page
if 'login-hint' in request.session:
del request.session['login-hint']
+ request.journal.record('user.login', how=how)
return continue_to_next_url(request, **kwargs)
@@ -757,6 +758,7 @@ def send_registration_mail(request, email, ou, template_names=None, next_url=Non
legacy_html_body_templates=['registration/activation_email.html'])
logger.info(u'registration mail sent to %s with registration URL %s...', email,
registration_url)
+ request.journal.record('user.registration.request', email=email)
def send_account_deletion_code(request, user):
@@ -823,6 +825,7 @@ def send_password_reset_mail(user, template_names=None, request=None,
sign_next_url=True,
**kwargs):
from .. import middleware
+ from authentic2.journal import journal
if not user.email:
raise ValueError('user must have an email')
@@ -852,6 +855,7 @@ def send_password_reset_mail(user, template_names=None, request=None,
per_ou_templates=True, **kwargs)
logger.info(u'password reset request for user %s, email sent to %s '
'with token %s', user, user.email, token.uuid)
+ journal.record('user.password.reset.request', email=user.email, user=user)
def batch(iterable, size):
diff --git a/src/authentic2/utils/service.py b/src/authentic2/utils/service.py
index 8468b0e42..bcce22ce9 100644
--- a/src/authentic2/utils/service.py
+++ b/src/authentic2/utils/service.py
@@ -52,9 +52,17 @@ def get_service_from_ref(ref):
def get_service_from_request(request):
service_ref = request.GET.get(SERVICE_FIELD_NAME)
- if not service_ref or '\x00' in service_ref:
- return None
- return get_service_from_ref(service_ref)
+ if service_ref and '\x00' not in service_ref:
+ return get_service_from_ref(service_ref)
+ return None
+
+
+def get_service_from_session(request):
+ session = getattr(request, 'session', None)
+ if session and 'service_pk' in session:
+ from authentic2.models import Service
+ return Service.objects.get(pk=session['service_pk'])
+ return None
def get_service_from_token(params):
diff --git a/src/authentic2/views.py b/src/authentic2/views.py
index d886d5c7e..1827ce469 100644
--- a/src/authentic2/views.py
+++ b/src/authentic2/views.py
@@ -151,6 +151,7 @@ class EditProfile(cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView):
def form_valid(self, form):
response = super(EditProfile, self).form_valid(form)
hooks.call_hooks('event', name='edit-profile', user=self.request.user, form=form)
+ self.request.journal.record('user.profile.edit', form=form)
return response
edit_profile = decorators.setting_enabled('A2_PROFILE_CAN_EDIT_PROFILE')(
@@ -575,6 +576,7 @@ def logout(request,
targets = redirect_logout_list(request)
logger.debug('Accumulated redirections : {}'.format(targets))
# Local logout
+ request.journal.record('user.logout')
auth_logout(request)
logger.info('Logged out')
local_logout_done = True
@@ -684,6 +686,7 @@ class PasswordResetView(FormView):
if is_ratelimited(self.request, key='post:email', group='pw-reset-email',
rate=app_settings.A2_EMAILS_ADDRESS_RATELIMIT, increment=True):
+ self.request.journal.record('user.password.reset.failure', email=email)
form.add_error(
'email',
_('Multiple emails have already been sent to this address. Further attempts are '
@@ -692,6 +695,7 @@ class PasswordResetView(FormView):
return self.form_invalid(form)
if is_ratelimited(self.request, key='ip', group='pw-reset-email',
rate=app_settings.A2_EMAILS_IP_RATELIMIT, increment=True):
+ self.request.journal.record('user.password.reset.failure', email=email)
form.add_error(
'email',
_('Multiple password reset attempts have already been made from this IP address. No '
@@ -783,7 +787,9 @@ class PasswordResetConfirmView(cbv.RedirectToNextURLViewMixin, FormView):
return self.finish()
def finish(self):
- return utils.simulate_authentication(self.request, self.user, 'email')
+ response = utils.simulate_authentication(self.request, self.user, 'email')
+ self.request.journal.record('user.password.reset')
+ return response
password_reset_confirm = PasswordResetConfirmView.as_view()
@@ -1136,6 +1142,11 @@ class RegistrationCompletionView(CreateView):
return self.registration_success(self.request, form.instance, form)
def registration_success(self, request, user, form):
+ request.journal.record(
+ 'user.registration',
+ user=user,
+ session=None,
+ how=self.authentication_method)
hooks.call_hooks('event', name='registration', user=user, form=form, view=self,
authentication_method=self.authentication_method,
token=self.token, service=self.service and self.service.slug)
@@ -1233,6 +1244,7 @@ class ValidateDeletionView(TemplateView):
self.user.mark_as_deleted()
logger.info(u'deletion of account %s performed', self.user)
hooks.call_hooks('event', name='delete-account', user=self.user)
+ request.journal.record('user.deletion', user=self.user)
if self.user == request.user:
# No validation message displayed, as the user will surely
# notice their own account deletion...
@@ -1289,7 +1301,9 @@ class PasswordChangeView(DjPasswordChangeView):
hooks.call_hooks('event', name='change-password', user=self.request.user, request=self.request)
messages.info(self.request, _('Password changed'))
models.PasswordReset.objects.filter(user=self.request.user).delete()
- return super(PasswordChangeView, self).form_valid(form)
+ response = super(PasswordChangeView, self).form_valid(form)
+ self.request.journal.record('user.password.change', session=self.request.session)
+ return response
def get_form_class(self):
if self.request.user.has_usable_password():
diff --git a/src/authentic2_idp_oidc/views.py b/src/authentic2_idp_oidc/views.py
index 83965aab9..4253a59bb 100644
--- a/src/authentic2_idp_oidc/views.py
+++ b/src/authentic2_idp_oidc/views.py
@@ -287,6 +287,10 @@ def authorize(request, *args, **kwargs):
expired=start + datetime.timedelta(days=365))
if pk_to_deletes:
auth_manager.filter(pk__in=pk_to_deletes).delete()
+ request.journal.record(
+ 'user.service.sso.authorization',
+ service=client,
+ scopes=list(sorted(scopes)))
logger.info(u'authorized scopes %s saved for service %s', ' '.join(scopes),
client.name)
else:
@@ -365,6 +369,10 @@ def authorize(request, *args, **kwargs):
})
# query is transfered through the hashtag
response = redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False)
+ request.journal.record(
+ 'user.service.sso',
+ service=client,
+ how=last_auth and last_auth.get('how'))
hooks.call_hooks('event', name='sso-success', idp='oidc', service=client, user=request.user)
utils.add_oidc_session(request, client)
return response
diff --git a/tests/conftest.py b/tests/conftest.py
index dc5476c39..6e12af599 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -126,6 +126,7 @@ def create_user(**kwargs):
password = kwargs.pop('password', None) or kwargs['username']
user, created = User.objects.get_or_create(**kwargs)
if password:
+ user.clear_password = password
user.set_password(password)
user.save()
return user
@@ -167,7 +168,7 @@ def user_ou1(db, ou1):
@pytest.fixture
def user_ou2(db, ou2):
- return create_user(username='john.doe', first_name=u'Jôhn', last_name=u'Dôe',
+ return create_user(username='john.doe.ou2', first_name=u'Jôhn', last_name=u'Dôe',
email='john.doe@example.net', ou=ou2)
diff --git a/tests/test_all.py b/tests/test_all.py
index 9243bfc9f..98215100e 100644
--- a/tests/test_all.py
+++ b/tests/test_all.py
@@ -41,7 +41,7 @@ from django_rbac.utils import get_role_model, get_ou_model
from authentic2 import utils, models, attribute_kinds
-from .utils import Authentic2TestCase, get_response_form, get_link_from_mail
+from .utils import Authentic2TestCase, get_response_form, get_link_from_mail, assert_event
class SerializerTests(TestCase):
@@ -215,6 +215,7 @@ class UserProfileTests(TestCase):
user = User.objects.create(username='testbot')
user.set_password('secret')
user.save()
+ self.user = user
self.client = Client()
def test_edit_profile_attributes(self):
@@ -249,6 +250,8 @@ class UserProfileTests(TestCase):
for k, v in kwargs.items())
response = self.client.post(reverse('profile_edit'), kwargs)
+ new = {'custom': 'random data', 'next_url': '', 'national_number': 'xx20153566342yy'}
+ assert_event('user.profile.edit', user=self.user, session=self.client.session, old={}, new=new)
self.assertEqual(response.status_code, 302)
response = self.client.get(reverse('account_management'))
diff --git a/tests/test_idp_oidc.py b/tests/test_idp_oidc.py
index 962be9c1f..3bb06f471 100644
--- a/tests/test_idp_oidc.py
+++ b/tests/test_idp_oidc.py
@@ -236,6 +236,13 @@ def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oi
assert authz.expired >= now()
else:
assert OIDCAuthorization.objects.count() == 0
+ utils.assert_event('user.service.sso.authorization',
+ session=app.session,
+ user=simple_user, service=oidc_client,
+ scopes=['email', 'openid', 'profile'])
+ utils.assert_event('user.service.sso', session=app.session,
+ user=simple_user, service=oidc_client,
+ how='password-on-https')
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert OIDCCode.objects.count() == 1
code = OIDCCode.objects.get()
diff --git a/tests/test_login.py b/tests/test_login.py
index 7302df195..dc9e97a69 100644
--- a/tests/test_login.py
+++ b/tests/test_login.py
@@ -21,11 +21,25 @@ from django.contrib.auth import get_user_model
from authentic2 import models
-from .utils import login, check_log
+from .utils import login, check_log, assert_event
+
+User = get_user_model()
+
+
+def test_success(db, app, simple_user):
+ login(app, simple_user)
+ assert_event('user.login', user=simple_user, session=app.session, how='password-on-https')
+ session = app.session
+ app.get('/logout/').form.submit()
+ assert_event('user.logout', user=simple_user, session=session)
+
+
+def test_failure(db, app, simple_user):
+ login(app, simple_user, password='wrong', fail=True)
+ assert_event('user.login.failure', username=simple_user.username)
def test_login_inactive_user(db, app):
- User = get_user_model()
user1 = User.objects.create(username='john.doe')
user1.set_password('john.doe')
user1.save()
@@ -39,7 +53,7 @@ def test_login_inactive_user(db, app):
assert '_auth_user_id' not in app.session
user1.is_active = False
user1.save()
- login(app, user1)
+ login(app, user2)
assert int(app.session['_auth_user_id']) == user2.id
app.get('/logout/').form.submit()
assert '_auth_user_id' not in app.session
diff --git a/tests/test_password_reset.py b/tests/test_password_reset.py
index 3e1c8490a..3681efd5d 100644
--- a/tests/test_password_reset.py
+++ b/tests/test_password_reset.py
@@ -23,14 +23,16 @@ from . import utils
def test_send_password_reset_email(app, simple_user, mailoutbox):
from authentic2.utils import send_password_reset_mail
assert len(mailoutbox) == 0
- send_password_reset_mail(
- simple_user,
- legacy_subject_templates=['registration/password_reset_subject.txt'],
- legacy_body_templates=['registration/password_reset_email.html'],
- context={
- 'base_url': 'http://testserver',
- })
+ with utils.run_on_commit_hooks():
+ send_password_reset_mail(
+ simple_user,
+ legacy_subject_templates=['registration/password_reset_subject.txt'],
+ legacy_body_templates=['registration/password_reset_email.html'],
+ context={
+ 'base_url': 'http://testserver',
+ })
assert len(mailoutbox) == 1
+ utils.assert_event('user.password.reset.request', user=simple_user, email=simple_user.email)
url = utils.get_link_from_mail(mailoutbox[0])
relative_url = url.split('testserver')[1]
resp = app.get(relative_url, status=200)
@@ -38,6 +40,7 @@ def test_send_password_reset_email(app, simple_user, mailoutbox):
resp.form.set('new_password2', '1234==aA')
resp = resp.form.submit().follow()
assert str(app.session['_auth_user_id']) == str(simple_user.pk)
+ utils.assert_event('user.password.reset', user=simple_user, session=app.session)
def test_view(app, simple_user, mailoutbox, settings):
@@ -47,6 +50,7 @@ def test_view(app, simple_user, mailoutbox, settings):
assert len(mailoutbox) == 0
settings.DEFAULT_FROM_EMAIL = 'show only addr '
resp = resp.form.submit()
+ utils.assert_event('user.password.reset.request', user=simple_user, email=simple_user.email)
assert resp['Location'].endswith('/instructions/')
resp = resp.follow()
assert simple_user.email in resp.text
diff --git a/tests/test_registration.py b/tests/test_registration.py
index de785b25f..2963d6788 100644
--- a/tests/test_registration.py
+++ b/tests/test_registration.py
@@ -25,13 +25,13 @@ from django.utils.six.moves.urllib.parse import urlparse
from authentic2 import utils, models
from authentic2.validators import EmailValidator
-from .utils import get_link_from_mail
+from .utils import get_link_from_mail, assert_event
User = get_user_model()
-def test_registration(app, db, settings, mailoutbox, external_redirect):
+def test_registration_success(app, db, settings, mailoutbox, external_redirect):
next_url, good_next_url = external_redirect
settings.LANGUAGE_CODE = 'en-us'
@@ -45,6 +45,7 @@ def test_registration(app, db, settings, mailoutbox, external_redirect):
response.form.set('email', 'testbot@entrouvert.com')
response = response.form.submit()
+ assert_event('user.registration.request', email='testbot@entrouvert.com')
assert urlparse(response['Location']).path == reverse('registration_complete')
if not good_next_url:
assert not urlparse(response['Location']).query
@@ -87,6 +88,7 @@ def test_registration(app, db, settings, mailoutbox, external_redirect):
assert 'was successful' in mailoutbox[1].body
new_user = User.objects.get()
+ assert_event('user.registration', user=new_user, how='email')
assert new_user.email == 'testbot@entrouvert.com'
assert new_user.username is None
assert new_user.check_password('T0==toto')
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 0dd789301..c3b0b9a6a 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -23,6 +23,7 @@ from django.utils.functional import lazy
from django_rbac.utils import get_ou_model
+from authentic2.journal import Journal
from authentic2.utils import (good_next_url, same_origin, select_next_url,
user_can_change_password, login,
get_authentication_events, authenticate,
@@ -91,6 +92,7 @@ def test_get_authentication_events_hows(rf, simple_user):
middleware = AuthenticationMiddleware()
middleware.process_request(request)
MessageMiddleware().process_request(request)
+ request.journal = Journal(request=request)
assert 'password' not in [ev['how'] for ev in get_authentication_events(request)]
login(request, user, 'password')
assert 'password' in [ev['how'] for ev in get_authentication_events(request)]
diff --git a/tests/test_views.py b/tests/test_views.py
index 6824a861a..5828b345a 100644
--- a/tests/test_views.py
+++ b/tests/test_views.py
@@ -16,7 +16,7 @@
# authentic2
import datetime
-from .utils import login, logout, get_link_from_mail
+from .utils import login, logout, get_link_from_mail, assert_event
import pytest
from django.urls import reverse
@@ -38,13 +38,19 @@ def test_password_change(app, simple_user):
simple_user.set_password('hop')
simple_user.save()
resp = login(app, simple_user, password='hop', path=reverse('password_change'))
+ old_session_key = app.session.session_key
resp.form['old_password'] = 'hop'
resp.form['new_password1'] = 'hopAbcde1'
resp.form['new_password2'] = 'hopAbcde1'
resp = resp.form.submit()
+ new_session_key = app.session.session_key
+
+ assert old_session_key != new_session_key, 'session\'s key has not been cycled'
+
assert resp.location == '/accounts/password/change/done/'
+ assert_event('user.password.change', user=simple_user, session=app.session)
def test_account_delete(app, simple_user, mailoutbox):
@@ -84,8 +90,8 @@ def test_account_delete_when_logged_out(app, simple_user, mailoutbox):
link = get_link_from_mail(mailoutbox[0])
logout(app)
page = app.get(link)
- assert 'You are about to delete the account of %s.' % \
- escape(simple_user.get_full_name()) in page.text
+ assert 'You are about to delete the account of %s.' % escape(
+ simple_user.get_full_name()) in page.text
response = page.form.submit(name='delete').follow().follow()
assert not User.objects.get(pk=simple_user.pk).is_active
assert len(mailoutbox) == 2
@@ -105,8 +111,8 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox):
logout(app)
login(app, user_ou1, path=reverse('account_management'))
page = app.get(link)
- assert 'You are about to delete the account of %s.' % \
- escape(simple_user.get_full_name()) in page.text
+ assert 'You are about to delete the account of %s.' % escape(
+ simple_user.get_full_name()) in page.text
response = page.form.submit(name='delete').follow()
assert not User.objects.get(pk=simple_user.pk).is_active
assert User.objects.get(pk=user_ou1.pk).is_active
@@ -117,7 +123,12 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox):
def test_account_delete_fake_token(app, simple_user, mailoutbox):
- response = app.get(reverse('validate_deletion', kwargs={'deletion_token': 'thisismostlikelynotavalidtoken'})).follow().follow()
+ response = (
+ app.get(reverse('validate_deletion',
+ kwargs={'deletion_token': 'thisismostlikelynotavalidtoken'}))
+ .follow()
+ .follow()
+ )
assert "The account deletion request is invalid, try again" in response.text
@@ -182,6 +193,8 @@ def test_views_email_ratelimit(app, db, simple_user, settings, mailoutbox, freez
response = response.form.submit()
assert len(mailoutbox) == 3
assert 'try again later' in response.text
+ if view_name == 'password_reset':
+ assert_event('user.password.reset.failure', email=simple_user.email)
# reach ip limit
for i in range(7):
diff --git a/tests/utils.py b/tests/utils.py
index a3765f6f7..69ab3bc14 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -30,10 +30,11 @@ from django.shortcuts import resolve_url
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
-from authentic2 import utils
+from authentic2 import utils, models
+from authentic2.apps.journal.models import Event
-def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None):
+def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None, fail=False):
if path:
args = args or []
kwargs = kwargs or {}
@@ -43,17 +44,24 @@ def login(app, user, path=None, password=None, remember_me=None, args=None, kwar
login_page = app.get(reverse('auth_login'))
assert login_page.request.path == reverse('auth_login')
form = login_page.form
- form.set('username', user.username if hasattr(user, 'username') else user)
+ username = user.username if hasattr(user, 'username') else user
+ form.set('username', username)
# password is supposed to be the same as username
- form.set('password', password or user.username)
+ form.set('password', password or (user.clear_password if hasattr(user, 'clear_password') else username))
if remember_me is not None:
form.set('remember_me', bool(remember_me))
- response = form.submit(name='login-password-submit').follow()
- if path:
- assert response.request.path == path
+ response = form.submit(name='login-password-submit')
+ if fail:
+ assert response.status_code == 200
+ assert '_auth_user_id' not in app.session
else:
- assert response.request.path == reverse('auth_homepage')
- assert '_auth_user_id' in app.session
+ response = response.follow()
+ if path:
+ assert response.request.path == path
+ else:
+ assert response.request.path == reverse('auth_homepage')
+ assert '_auth_user_id' in app.session
+ assert not hasattr(user, 'id') or (app.session['_auth_user_id'] == str(user.id))
return response
@@ -242,4 +250,30 @@ def call_command(*args, **kwargs):
def text_content(node):
'''Extract text content from node and all its children. Equivalent to
xmlNodeGetContent from libxml.'''
- return u''.join(node.itertext())
+ return u''.join(node.itertext()) if node is not None else ''
+
+
+def assert_event(event_type_name, user=None, session=None, service=None, **data):
+ qs = Event.objects.filter(type__name=event_type_name)
+ if user:
+ qs = qs.filter(user=user)
+ else:
+ qs = qs.filter(user__isnull=True)
+ if session:
+ qs = qs.filter(session=session.session_key)
+ else:
+ qs = qs.filter(session__isnull=True)
+ if service:
+ qs = qs.which_references(service)
+ else:
+ qs = qs.exclude(qs._which_references_query(models.Service))
+
+ assert qs.count() == 1
+
+ if data:
+ event = qs.get()
+ assert event.data, 'no event.data, should be %s' % data
+ for key, value in data.items():
+ assert event.data.get(key) == value, (
+ 'event.data[%s] != data[%s] (%s != %s)' % (key, key, event.data.get(key), value)
+ )