misc: integration of journal authentic views (#47155)

This commit is contained in:
Benjamin Dauvergne 2020-10-13 19:08:00 +02:00
parent 9a1631b18a
commit 1cc04e3ad7
18 changed files with 465 additions and 39 deletions

View File

@ -133,6 +133,10 @@ class LoginPasswordAuthenticator(BaseAuthenticator):
utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk)
return response
else:
username = form.cleaned_data.get('username', '').strip()
if username:
request.journal.record('user.login.failure', username=username)
context['form'] = form
return render(request, 'authentic2/login_password_form.html', context)

37
src/authentic2/journal.py Normal file
View File

@ -0,0 +1,37 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from authentic2.utils.service import get_service_from_request
from authentic2.apps.journal.journal import Journal
class Journal(Journal):
def __init__(self, **kwargs):
self._service = kwargs.pop('service', None)
super().__init__(**kwargs)
@property
def service(self):
return self._service or (get_service_from_request(self.request) if self.request else None)
def massage_kwargs(self, record_parameters, kwargs):
if 'service' not in kwargs and 'service' in record_parameters:
kwargs['service'] = self.service
return super().massage_kwargs(record_parameters, kwargs)
journal = Journal()

View File

@ -0,0 +1,261 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.utils.translation import ugettext_lazy as _
from authentic2.custom_user.models import get_attributes_map
from authentic2.apps.journal.models import EventTypeDefinition
from authentic2.apps.journal.utils import form_to_old_new
from authentic2.custom_user.models import User
from . import models
class EventTypeWithService(EventTypeDefinition):
@classmethod
def record(cls, user=None, service=None, session=None, references=None, data=None):
if service:
if not data:
data = {}
data['service_name'] = str(service)
if not references:
references = []
references = [service] + references
super().record(user=user, session=session, references=references, data=data)
@classmethod
def get_service_name(self, event):
(service,) = event.get_typed_references(models.Service)
if service is not None:
return str(service)
if 'service_name' in event.data:
return event.data['service_name']
return ''
def login_method_label(how):
if how.startswith('password'):
return _('password')
elif how == 'fc':
return _('FranceConnect')
elif how == 'saml':
return _('SAML')
elif how == 'oidc':
return _('OpenIDConnect')
elif how:
return how
else:
return _('none')
def get_attributes_label(attributes_new_values):
attributes_map = get_attributes_map()
for name in attributes_new_values:
if name in ('email', 'first_name', 'last_name'):
yield str(User._meta.get_field(name).verbose_name)
else:
if name in attributes_map:
yield attributes_map[name].label
else:
yield name
class UserLogin(EventTypeWithService):
name = 'user.login'
label = _('login')
@classmethod
def record(cls, user, session, service, how):
super().record(user=user, session=session, service=service, data={'how': how})
@classmethod
def get_message(cls, event, context):
how = event.get_data('how')
return _('login using {method}').format(method=login_method_label(how))
class UserLoginFailure(EventTypeWithService):
name = 'user.login.failure'
label = _('login failure')
@classmethod
def record(cls, service, username):
super().record(service=service, data={'username': username})
@classmethod
def get_message(cls, event, context):
username = event.get_data('username')
return _('login failure with username "{username}"').format(username=username)
class UserRegistrationRequest(EventTypeDefinition):
name = 'user.registration.request'
label = _('registration request')
@classmethod
def record(cls, email):
super().record(data={'email': email.lower()})
@classmethod
def get_message(cls, event, context):
email = event.get_data('email')
return _('registration request with email "%s"') % email
class UserRegistration(EventTypeWithService):
name = 'user.registration'
label = _('registration')
@classmethod
def record(cls, user, session, service, how):
super().record(user=user, session=session, service=service, data={'how': how})
@classmethod
def get_message(cls, event, context):
how = event.get_data('how')
return _('registration using {method}').format(method=login_method_label(how))
class UserLogout(EventTypeWithService):
name = 'user.logout'
label = _('logout')
@classmethod
def record(cls, user, session, service):
super().record(user=user, session=session, service=service)
@classmethod
def get_message(cls, event, context):
return _('logout')
class UserRequestPasswordReset(EventTypeDefinition):
name = 'user.password.reset.request'
label = _('password reset request')
@classmethod
def record(cls, user, email):
super().record(user=user, data={'email': email.lower()})
@classmethod
def get_message(cls, event, context):
email = event.get_data('email')
if email:
return _('password reset request with email "%s"') % email
return super().get_message(event, context)
class UserResetPassword(EventTypeDefinition):
name = 'user.password.reset'
label = _('password reset')
@classmethod
def record(cls, user, session):
super().record(user=user, session=session)
class UserResetPasswordFailure(EventTypeDefinition):
name = 'user.password.reset.failure'
label = _('password reset failure')
@classmethod
def record(cls, email):
super().record(data={'email': email})
@classmethod
def get_message(cls, event, context):
email = event.get_data('email')
if email:
return _('password reset failure with email "%s"') % email
return super().get_message(event, context)
class UserChangePassword(EventTypeWithService):
name = 'user.password.change'
label = _('password change')
@classmethod
def record(cls, user, session, service):
super().record(user=user, session=session, service=service)
class UserEdit(EventTypeWithService):
name = 'user.profile.edit'
label = _('profile edit')
@classmethod
def record(cls, user, session, service, form):
data = form_to_old_new(form)
super().record(user=user, session=session, service=service, data=data)
@classmethod
def get_message(cls, event, context):
new = event.get_data('new')
if new:
edited_attributes = ', '.join(get_attributes_label(new))
return _('profile edit (%s)') % edited_attributes
return super().get_message(event, context)
class UserDeletion(EventTypeWithService):
name = 'user.deletion'
label = _('deletion')
@classmethod
def record(cls, user, session, service):
super().record(user=user, session=session, service=service)
class UserServiceSSO(EventTypeWithService):
name = 'user.service.sso'
label = _('service single sign on')
@classmethod
def record(cls, user, session, service, how):
super().record(user=user, session=session, service=service, data={'how': how})
@classmethod
def get_message(cls, event, context):
service_name = cls.get_service_name(event)
return _('service single sign on with "{service}"').format(service=service_name)
class UserServiceSSOAuthorization(EventTypeWithService):
name = 'user.service.sso.authorization'
label = _('consentment to single sign on')
@classmethod
def record(cls, user, session, service, **kwargs):
super().record(user=user, session=session, service=service, data=kwargs)
@classmethod
def get_message(cls, event, context):
service_name = cls.get_service_name(event)
return _('authorization of single sign on with "{service}"').format(service=service_name)
class UserServiceSSOUnauthorization(EventTypeWithService):
name = 'user.service.sso.unauthorization'
label = _('remove consentment to single sign on')
@classmethod
def record(cls, user, session, service):
super().record(user=user, session=session, service=service)
@classmethod
def get_message(cls, event, context):
service_name = cls.get_service_name(event)
return _('unauthorization of single sign on with "{service}"').format(service=service_name)

View File

@ -27,12 +27,13 @@ from django.conf import settings
from django.contrib import messages
from django.utils.deprecation import MiddlewareMixin
from django.utils.encoding import force_text
from django.utils.functional import SimpleLazyObject
from django.utils.translation import ugettext as _
from django.utils.six.moves.urllib import parse as urlparse
from django.shortcuts import render
from . import app_settings, utils, plugins
from .utils.service import get_service_from_request
from .utils.service import get_service_from_request, get_service_from_session
class CollectIPMiddleware(MiddlewareMixin):
@ -208,10 +209,18 @@ class SaveServiceInSessionMiddleware:
self.get_response = get_response
def __call__(self, request):
service = None
service = get_service_from_request(request)
if service:
request.session['service_pk'] = service.pk
request.service = SimpleLazyObject(lambda: get_service_from_session(request))
return self.get_response(request)
def journal_middleware(get_response):
from . import journal
def middleware(request):
request.journal = journal.Journal(request=request)
return get_response(request)
return middleware

View File

@ -100,6 +100,7 @@ MIDDLEWARE = (
'django.middleware.locale.LocaleMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'authentic2.middleware.journal_middleware',
)
DATABASES['default']['ATOMIC_REQUESTS'] = True

View File

@ -447,6 +447,7 @@ def login(request, user, how, service=None, service_slug=None, nonce=None, **kwa
# prevent logint-hint to influence next use of the login page
if 'login-hint' in request.session:
del request.session['login-hint']
request.journal.record('user.login', how=how)
return continue_to_next_url(request, **kwargs)
@ -757,6 +758,7 @@ def send_registration_mail(request, email, ou, template_names=None, next_url=Non
legacy_html_body_templates=['registration/activation_email.html'])
logger.info(u'registration mail sent to %s with registration URL %s...', email,
registration_url)
request.journal.record('user.registration.request', email=email)
def send_account_deletion_code(request, user):
@ -823,6 +825,7 @@ def send_password_reset_mail(user, template_names=None, request=None,
sign_next_url=True,
**kwargs):
from .. import middleware
from authentic2.journal import journal
if not user.email:
raise ValueError('user must have an email')
@ -852,6 +855,7 @@ def send_password_reset_mail(user, template_names=None, request=None,
per_ou_templates=True, **kwargs)
logger.info(u'password reset request for user %s, email sent to %s '
'with token %s', user, user.email, token.uuid)
journal.record('user.password.reset.request', email=user.email, user=user)
def batch(iterable, size):

View File

@ -52,9 +52,17 @@ def get_service_from_ref(ref):
def get_service_from_request(request):
service_ref = request.GET.get(SERVICE_FIELD_NAME)
if not service_ref or '\x00' in service_ref:
return None
return get_service_from_ref(service_ref)
if service_ref and '\x00' not in service_ref:
return get_service_from_ref(service_ref)
return None
def get_service_from_session(request):
session = getattr(request, 'session', None)
if session and 'service_pk' in session:
from authentic2.models import Service
return Service.objects.get(pk=session['service_pk'])
return None
def get_service_from_token(params):

View File

@ -151,6 +151,7 @@ class EditProfile(cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView):
def form_valid(self, form):
response = super(EditProfile, self).form_valid(form)
hooks.call_hooks('event', name='edit-profile', user=self.request.user, form=form)
self.request.journal.record('user.profile.edit', form=form)
return response
edit_profile = decorators.setting_enabled('A2_PROFILE_CAN_EDIT_PROFILE')(
@ -575,6 +576,7 @@ def logout(request,
targets = redirect_logout_list(request)
logger.debug('Accumulated redirections : {}'.format(targets))
# Local logout
request.journal.record('user.logout')
auth_logout(request)
logger.info('Logged out')
local_logout_done = True
@ -684,6 +686,7 @@ class PasswordResetView(FormView):
if is_ratelimited(self.request, key='post:email', group='pw-reset-email',
rate=app_settings.A2_EMAILS_ADDRESS_RATELIMIT, increment=True):
self.request.journal.record('user.password.reset.failure', email=email)
form.add_error(
'email',
_('Multiple emails have already been sent to this address. Further attempts are '
@ -692,6 +695,7 @@ class PasswordResetView(FormView):
return self.form_invalid(form)
if is_ratelimited(self.request, key='ip', group='pw-reset-email',
rate=app_settings.A2_EMAILS_IP_RATELIMIT, increment=True):
self.request.journal.record('user.password.reset.failure', email=email)
form.add_error(
'email',
_('Multiple password reset attempts have already been made from this IP address. No '
@ -783,7 +787,9 @@ class PasswordResetConfirmView(cbv.RedirectToNextURLViewMixin, FormView):
return self.finish()
def finish(self):
return utils.simulate_authentication(self.request, self.user, 'email')
response = utils.simulate_authentication(self.request, self.user, 'email')
self.request.journal.record('user.password.reset')
return response
password_reset_confirm = PasswordResetConfirmView.as_view()
@ -1136,6 +1142,11 @@ class RegistrationCompletionView(CreateView):
return self.registration_success(self.request, form.instance, form)
def registration_success(self, request, user, form):
request.journal.record(
'user.registration',
user=user,
session=None,
how=self.authentication_method)
hooks.call_hooks('event', name='registration', user=user, form=form, view=self,
authentication_method=self.authentication_method,
token=self.token, service=self.service and self.service.slug)
@ -1233,6 +1244,7 @@ class ValidateDeletionView(TemplateView):
self.user.mark_as_deleted()
logger.info(u'deletion of account %s performed', self.user)
hooks.call_hooks('event', name='delete-account', user=self.user)
request.journal.record('user.deletion', user=self.user)
if self.user == request.user:
# No validation message displayed, as the user will surely
# notice their own account deletion...
@ -1289,7 +1301,9 @@ class PasswordChangeView(DjPasswordChangeView):
hooks.call_hooks('event', name='change-password', user=self.request.user, request=self.request)
messages.info(self.request, _('Password changed'))
models.PasswordReset.objects.filter(user=self.request.user).delete()
return super(PasswordChangeView, self).form_valid(form)
response = super(PasswordChangeView, self).form_valid(form)
self.request.journal.record('user.password.change', session=self.request.session)
return response
def get_form_class(self):
if self.request.user.has_usable_password():

View File

@ -287,6 +287,10 @@ def authorize(request, *args, **kwargs):
expired=start + datetime.timedelta(days=365))
if pk_to_deletes:
auth_manager.filter(pk__in=pk_to_deletes).delete()
request.journal.record(
'user.service.sso.authorization',
service=client,
scopes=list(sorted(scopes)))
logger.info(u'authorized scopes %s saved for service %s', ' '.join(scopes),
client.name)
else:
@ -365,6 +369,10 @@ def authorize(request, *args, **kwargs):
})
# query is transfered through the hashtag
response = redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False)
request.journal.record(
'user.service.sso',
service=client,
how=last_auth and last_auth.get('how'))
hooks.call_hooks('event', name='sso-success', idp='oidc', service=client, user=request.user)
utils.add_oidc_session(request, client)
return response

View File

@ -126,6 +126,7 @@ def create_user(**kwargs):
password = kwargs.pop('password', None) or kwargs['username']
user, created = User.objects.get_or_create(**kwargs)
if password:
user.clear_password = password
user.set_password(password)
user.save()
return user
@ -167,7 +168,7 @@ def user_ou1(db, ou1):
@pytest.fixture
def user_ou2(db, ou2):
return create_user(username='john.doe', first_name=u'Jôhn', last_name=u'Dôe',
return create_user(username='john.doe.ou2', first_name=u'Jôhn', last_name=u'Dôe',
email='john.doe@example.net', ou=ou2)

View File

@ -41,7 +41,7 @@ from django_rbac.utils import get_role_model, get_ou_model
from authentic2 import utils, models, attribute_kinds
from .utils import Authentic2TestCase, get_response_form, get_link_from_mail
from .utils import Authentic2TestCase, get_response_form, get_link_from_mail, assert_event
class SerializerTests(TestCase):
@ -215,6 +215,7 @@ class UserProfileTests(TestCase):
user = User.objects.create(username='testbot')
user.set_password('secret')
user.save()
self.user = user
self.client = Client()
def test_edit_profile_attributes(self):
@ -249,6 +250,8 @@ class UserProfileTests(TestCase):
for k, v in kwargs.items())
response = self.client.post(reverse('profile_edit'), kwargs)
new = {'custom': 'random data', 'next_url': '', 'national_number': 'xx20153566342yy'}
assert_event('user.profile.edit', user=self.user, session=self.client.session, old={}, new=new)
self.assertEqual(response.status_code, 302)
response = self.client.get(reverse('account_management'))

View File

@ -236,6 +236,13 @@ def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oi
assert authz.expired >= now()
else:
assert OIDCAuthorization.objects.count() == 0
utils.assert_event('user.service.sso.authorization',
session=app.session,
user=simple_user, service=oidc_client,
scopes=['email', 'openid', 'profile'])
utils.assert_event('user.service.sso', session=app.session,
user=simple_user, service=oidc_client,
how='password-on-https')
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert OIDCCode.objects.count() == 1
code = OIDCCode.objects.get()

View File

@ -21,11 +21,25 @@ from django.contrib.auth import get_user_model
from authentic2 import models
from .utils import login, check_log
from .utils import login, check_log, assert_event
User = get_user_model()
def test_success(db, app, simple_user):
login(app, simple_user)
assert_event('user.login', user=simple_user, session=app.session, how='password-on-https')
session = app.session
app.get('/logout/').form.submit()
assert_event('user.logout', user=simple_user, session=session)
def test_failure(db, app, simple_user):
login(app, simple_user, password='wrong', fail=True)
assert_event('user.login.failure', username=simple_user.username)
def test_login_inactive_user(db, app):
User = get_user_model()
user1 = User.objects.create(username='john.doe')
user1.set_password('john.doe')
user1.save()
@ -39,7 +53,7 @@ def test_login_inactive_user(db, app):
assert '_auth_user_id' not in app.session
user1.is_active = False
user1.save()
login(app, user1)
login(app, user2)
assert int(app.session['_auth_user_id']) == user2.id
app.get('/logout/').form.submit()
assert '_auth_user_id' not in app.session

View File

@ -23,14 +23,16 @@ from . import utils
def test_send_password_reset_email(app, simple_user, mailoutbox):
from authentic2.utils import send_password_reset_mail
assert len(mailoutbox) == 0
send_password_reset_mail(
simple_user,
legacy_subject_templates=['registration/password_reset_subject.txt'],
legacy_body_templates=['registration/password_reset_email.html'],
context={
'base_url': 'http://testserver',
})
with utils.run_on_commit_hooks():
send_password_reset_mail(
simple_user,
legacy_subject_templates=['registration/password_reset_subject.txt'],
legacy_body_templates=['registration/password_reset_email.html'],
context={
'base_url': 'http://testserver',
})
assert len(mailoutbox) == 1
utils.assert_event('user.password.reset.request', user=simple_user, email=simple_user.email)
url = utils.get_link_from_mail(mailoutbox[0])
relative_url = url.split('testserver')[1]
resp = app.get(relative_url, status=200)
@ -38,6 +40,7 @@ def test_send_password_reset_email(app, simple_user, mailoutbox):
resp.form.set('new_password2', '1234==aA')
resp = resp.form.submit().follow()
assert str(app.session['_auth_user_id']) == str(simple_user.pk)
utils.assert_event('user.password.reset', user=simple_user, session=app.session)
def test_view(app, simple_user, mailoutbox, settings):
@ -47,6 +50,7 @@ def test_view(app, simple_user, mailoutbox, settings):
assert len(mailoutbox) == 0
settings.DEFAULT_FROM_EMAIL = 'show only addr <noreply@example.net>'
resp = resp.form.submit()
utils.assert_event('user.password.reset.request', user=simple_user, email=simple_user.email)
assert resp['Location'].endswith('/instructions/')
resp = resp.follow()
assert simple_user.email in resp.text

View File

@ -25,13 +25,13 @@ from django.utils.six.moves.urllib.parse import urlparse
from authentic2 import utils, models
from authentic2.validators import EmailValidator
from .utils import get_link_from_mail
from .utils import get_link_from_mail, assert_event
User = get_user_model()
def test_registration(app, db, settings, mailoutbox, external_redirect):
def test_registration_success(app, db, settings, mailoutbox, external_redirect):
next_url, good_next_url = external_redirect
settings.LANGUAGE_CODE = 'en-us'
@ -45,6 +45,7 @@ def test_registration(app, db, settings, mailoutbox, external_redirect):
response.form.set('email', 'testbot@entrouvert.com')
response = response.form.submit()
assert_event('user.registration.request', email='testbot@entrouvert.com')
assert urlparse(response['Location']).path == reverse('registration_complete')
if not good_next_url:
assert not urlparse(response['Location']).query
@ -87,6 +88,7 @@ def test_registration(app, db, settings, mailoutbox, external_redirect):
assert 'was successful' in mailoutbox[1].body
new_user = User.objects.get()
assert_event('user.registration', user=new_user, how='email')
assert new_user.email == 'testbot@entrouvert.com'
assert new_user.username is None
assert new_user.check_password('T0==toto')

View File

@ -23,6 +23,7 @@ from django.utils.functional import lazy
from django_rbac.utils import get_ou_model
from authentic2.journal import Journal
from authentic2.utils import (good_next_url, same_origin, select_next_url,
user_can_change_password, login,
get_authentication_events, authenticate,
@ -91,6 +92,7 @@ def test_get_authentication_events_hows(rf, simple_user):
middleware = AuthenticationMiddleware()
middleware.process_request(request)
MessageMiddleware().process_request(request)
request.journal = Journal(request=request)
assert 'password' not in [ev['how'] for ev in get_authentication_events(request)]
login(request, user, 'password')
assert 'password' in [ev['how'] for ev in get_authentication_events(request)]

View File

@ -16,7 +16,7 @@
# authentic2
import datetime
from .utils import login, logout, get_link_from_mail
from .utils import login, logout, get_link_from_mail, assert_event
import pytest
from django.urls import reverse
@ -38,13 +38,19 @@ def test_password_change(app, simple_user):
simple_user.set_password('hop')
simple_user.save()
resp = login(app, simple_user, password='hop', path=reverse('password_change'))
old_session_key = app.session.session_key
resp.form['old_password'] = 'hop'
resp.form['new_password1'] = 'hopAbcde1'
resp.form['new_password2'] = 'hopAbcde1'
resp = resp.form.submit()
new_session_key = app.session.session_key
assert old_session_key != new_session_key, 'session\'s key has not been cycled'
assert resp.location == '/accounts/password/change/done/'
assert_event('user.password.change', user=simple_user, session=app.session)
def test_account_delete(app, simple_user, mailoutbox):
@ -84,8 +90,8 @@ def test_account_delete_when_logged_out(app, simple_user, mailoutbox):
link = get_link_from_mail(mailoutbox[0])
logout(app)
page = app.get(link)
assert 'You are about to delete the account of <strong>%s</strong>.' % \
escape(simple_user.get_full_name()) in page.text
assert 'You are about to delete the account of <strong>%s</strong>.' % escape(
simple_user.get_full_name()) in page.text
response = page.form.submit(name='delete').follow().follow()
assert not User.objects.get(pk=simple_user.pk).is_active
assert len(mailoutbox) == 2
@ -105,8 +111,8 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox):
logout(app)
login(app, user_ou1, path=reverse('account_management'))
page = app.get(link)
assert 'You are about to delete the account of <strong>%s</strong>.' % \
escape(simple_user.get_full_name()) in page.text
assert 'You are about to delete the account of <strong>%s</strong>.' % escape(
simple_user.get_full_name()) in page.text
response = page.form.submit(name='delete').follow()
assert not User.objects.get(pk=simple_user.pk).is_active
assert User.objects.get(pk=user_ou1.pk).is_active
@ -117,7 +123,12 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox):
def test_account_delete_fake_token(app, simple_user, mailoutbox):
response = app.get(reverse('validate_deletion', kwargs={'deletion_token': 'thisismostlikelynotavalidtoken'})).follow().follow()
response = (
app.get(reverse('validate_deletion',
kwargs={'deletion_token': 'thisismostlikelynotavalidtoken'}))
.follow()
.follow()
)
assert "The account deletion request is invalid, try again" in response.text
@ -182,6 +193,8 @@ def test_views_email_ratelimit(app, db, simple_user, settings, mailoutbox, freez
response = response.form.submit()
assert len(mailoutbox) == 3
assert 'try again later' in response.text
if view_name == 'password_reset':
assert_event('user.password.reset.failure', email=simple_user.email)
# reach ip limit
for i in range(7):

View File

@ -30,10 +30,11 @@ from django.shortcuts import resolve_url
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
from authentic2 import utils
from authentic2 import utils, models
from authentic2.apps.journal.models import Event
def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None):
def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None, fail=False):
if path:
args = args or []
kwargs = kwargs or {}
@ -43,17 +44,24 @@ def login(app, user, path=None, password=None, remember_me=None, args=None, kwar
login_page = app.get(reverse('auth_login'))
assert login_page.request.path == reverse('auth_login')
form = login_page.form
form.set('username', user.username if hasattr(user, 'username') else user)
username = user.username if hasattr(user, 'username') else user
form.set('username', username)
# password is supposed to be the same as username
form.set('password', password or user.username)
form.set('password', password or (user.clear_password if hasattr(user, 'clear_password') else username))
if remember_me is not None:
form.set('remember_me', bool(remember_me))
response = form.submit(name='login-password-submit').follow()
if path:
assert response.request.path == path
response = form.submit(name='login-password-submit')
if fail:
assert response.status_code == 200
assert '_auth_user_id' not in app.session
else:
assert response.request.path == reverse('auth_homepage')
assert '_auth_user_id' in app.session
response = response.follow()
if path:
assert response.request.path == path
else:
assert response.request.path == reverse('auth_homepage')
assert '_auth_user_id' in app.session
assert not hasattr(user, 'id') or (app.session['_auth_user_id'] == str(user.id))
return response
@ -242,4 +250,30 @@ def call_command(*args, **kwargs):
def text_content(node):
'''Extract text content from node and all its children. Equivalent to
xmlNodeGetContent from libxml.'''
return u''.join(node.itertext())
return u''.join(node.itertext()) if node is not None else ''
def assert_event(event_type_name, user=None, session=None, service=None, **data):
qs = Event.objects.filter(type__name=event_type_name)
if user:
qs = qs.filter(user=user)
else:
qs = qs.filter(user__isnull=True)
if session:
qs = qs.filter(session=session.session_key)
else:
qs = qs.filter(session__isnull=True)
if service:
qs = qs.which_references(service)
else:
qs = qs.exclude(qs._which_references_query(models.Service))
assert qs.count() == 1
if data:
event = qs.get()
assert event.data, 'no event.data, should be %s' % data
for key, value in data.items():
assert event.data.get(key) == value, (
'event.data[%s] != data[%s] (%s != %s)' % (key, key, event.data.get(key), value)
)