misc: integration of journal authentic views (#47155)
This commit is contained in:
parent
9a1631b18a
commit
1cc04e3ad7
|
@ -133,6 +133,10 @@ class LoginPasswordAuthenticator(BaseAuthenticator):
|
|||
utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk)
|
||||
|
||||
return response
|
||||
else:
|
||||
username = form.cleaned_data.get('username', '').strip()
|
||||
if username:
|
||||
request.journal.record('user.login.failure', username=username)
|
||||
context['form'] = form
|
||||
return render(request, 'authentic2/login_password_form.html', context)
|
||||
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2020 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from authentic2.utils.service import get_service_from_request
|
||||
|
||||
from authentic2.apps.journal.journal import Journal
|
||||
|
||||
|
||||
class Journal(Journal):
|
||||
def __init__(self, **kwargs):
|
||||
self._service = kwargs.pop('service', None)
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@property
|
||||
def service(self):
|
||||
return self._service or (get_service_from_request(self.request) if self.request else None)
|
||||
|
||||
def massage_kwargs(self, record_parameters, kwargs):
|
||||
if 'service' not in kwargs and 'service' in record_parameters:
|
||||
kwargs['service'] = self.service
|
||||
return super().massage_kwargs(record_parameters, kwargs)
|
||||
|
||||
|
||||
journal = Journal()
|
|
@ -0,0 +1,261 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2020 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from authentic2.custom_user.models import get_attributes_map
|
||||
from authentic2.apps.journal.models import EventTypeDefinition
|
||||
from authentic2.apps.journal.utils import form_to_old_new
|
||||
from authentic2.custom_user.models import User
|
||||
|
||||
from . import models
|
||||
|
||||
|
||||
class EventTypeWithService(EventTypeDefinition):
|
||||
@classmethod
|
||||
def record(cls, user=None, service=None, session=None, references=None, data=None):
|
||||
if service:
|
||||
if not data:
|
||||
data = {}
|
||||
data['service_name'] = str(service)
|
||||
if not references:
|
||||
references = []
|
||||
references = [service] + references
|
||||
super().record(user=user, session=session, references=references, data=data)
|
||||
|
||||
@classmethod
|
||||
def get_service_name(self, event):
|
||||
(service,) = event.get_typed_references(models.Service)
|
||||
if service is not None:
|
||||
return str(service)
|
||||
if 'service_name' in event.data:
|
||||
return event.data['service_name']
|
||||
return ''
|
||||
|
||||
|
||||
def login_method_label(how):
|
||||
if how.startswith('password'):
|
||||
return _('password')
|
||||
elif how == 'fc':
|
||||
return _('FranceConnect')
|
||||
elif how == 'saml':
|
||||
return _('SAML')
|
||||
elif how == 'oidc':
|
||||
return _('OpenIDConnect')
|
||||
elif how:
|
||||
return how
|
||||
else:
|
||||
return _('none')
|
||||
|
||||
|
||||
def get_attributes_label(attributes_new_values):
|
||||
attributes_map = get_attributes_map()
|
||||
for name in attributes_new_values:
|
||||
if name in ('email', 'first_name', 'last_name'):
|
||||
yield str(User._meta.get_field(name).verbose_name)
|
||||
else:
|
||||
if name in attributes_map:
|
||||
yield attributes_map[name].label
|
||||
else:
|
||||
yield name
|
||||
|
||||
|
||||
class UserLogin(EventTypeWithService):
|
||||
name = 'user.login'
|
||||
label = _('login')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service, how):
|
||||
super().record(user=user, session=session, service=service, data={'how': how})
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
how = event.get_data('how')
|
||||
return _('login using {method}').format(method=login_method_label(how))
|
||||
|
||||
|
||||
class UserLoginFailure(EventTypeWithService):
|
||||
name = 'user.login.failure'
|
||||
label = _('login failure')
|
||||
|
||||
@classmethod
|
||||
def record(cls, service, username):
|
||||
super().record(service=service, data={'username': username})
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
username = event.get_data('username')
|
||||
return _('login failure with username "{username}"').format(username=username)
|
||||
|
||||
|
||||
class UserRegistrationRequest(EventTypeDefinition):
|
||||
name = 'user.registration.request'
|
||||
label = _('registration request')
|
||||
|
||||
@classmethod
|
||||
def record(cls, email):
|
||||
super().record(data={'email': email.lower()})
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
email = event.get_data('email')
|
||||
return _('registration request with email "%s"') % email
|
||||
|
||||
|
||||
class UserRegistration(EventTypeWithService):
|
||||
name = 'user.registration'
|
||||
label = _('registration')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service, how):
|
||||
super().record(user=user, session=session, service=service, data={'how': how})
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
how = event.get_data('how')
|
||||
return _('registration using {method}').format(method=login_method_label(how))
|
||||
|
||||
|
||||
class UserLogout(EventTypeWithService):
|
||||
name = 'user.logout'
|
||||
label = _('logout')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service):
|
||||
super().record(user=user, session=session, service=service)
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
return _('logout')
|
||||
|
||||
|
||||
class UserRequestPasswordReset(EventTypeDefinition):
|
||||
name = 'user.password.reset.request'
|
||||
label = _('password reset request')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, email):
|
||||
super().record(user=user, data={'email': email.lower()})
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
email = event.get_data('email')
|
||||
if email:
|
||||
return _('password reset request with email "%s"') % email
|
||||
return super().get_message(event, context)
|
||||
|
||||
|
||||
class UserResetPassword(EventTypeDefinition):
|
||||
name = 'user.password.reset'
|
||||
label = _('password reset')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session):
|
||||
super().record(user=user, session=session)
|
||||
|
||||
|
||||
class UserResetPasswordFailure(EventTypeDefinition):
|
||||
name = 'user.password.reset.failure'
|
||||
label = _('password reset failure')
|
||||
|
||||
@classmethod
|
||||
def record(cls, email):
|
||||
super().record(data={'email': email})
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
email = event.get_data('email')
|
||||
if email:
|
||||
return _('password reset failure with email "%s"') % email
|
||||
return super().get_message(event, context)
|
||||
|
||||
|
||||
class UserChangePassword(EventTypeWithService):
|
||||
name = 'user.password.change'
|
||||
label = _('password change')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service):
|
||||
super().record(user=user, session=session, service=service)
|
||||
|
||||
|
||||
class UserEdit(EventTypeWithService):
|
||||
name = 'user.profile.edit'
|
||||
label = _('profile edit')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service, form):
|
||||
data = form_to_old_new(form)
|
||||
super().record(user=user, session=session, service=service, data=data)
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
new = event.get_data('new')
|
||||
if new:
|
||||
edited_attributes = ', '.join(get_attributes_label(new))
|
||||
return _('profile edit (%s)') % edited_attributes
|
||||
return super().get_message(event, context)
|
||||
|
||||
|
||||
class UserDeletion(EventTypeWithService):
|
||||
name = 'user.deletion'
|
||||
label = _('deletion')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service):
|
||||
super().record(user=user, session=session, service=service)
|
||||
|
||||
|
||||
class UserServiceSSO(EventTypeWithService):
|
||||
name = 'user.service.sso'
|
||||
label = _('service single sign on')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service, how):
|
||||
super().record(user=user, session=session, service=service, data={'how': how})
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
service_name = cls.get_service_name(event)
|
||||
return _('service single sign on with "{service}"').format(service=service_name)
|
||||
|
||||
|
||||
class UserServiceSSOAuthorization(EventTypeWithService):
|
||||
name = 'user.service.sso.authorization'
|
||||
label = _('consentment to single sign on')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service, **kwargs):
|
||||
super().record(user=user, session=session, service=service, data=kwargs)
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
service_name = cls.get_service_name(event)
|
||||
return _('authorization of single sign on with "{service}"').format(service=service_name)
|
||||
|
||||
|
||||
class UserServiceSSOUnauthorization(EventTypeWithService):
|
||||
name = 'user.service.sso.unauthorization'
|
||||
label = _('remove consentment to single sign on')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, service):
|
||||
super().record(user=user, session=session, service=service)
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
service_name = cls.get_service_name(event)
|
||||
return _('unauthorization of single sign on with "{service}"').format(service=service_name)
|
|
@ -27,12 +27,13 @@ from django.conf import settings
|
|||
from django.contrib import messages
|
||||
from django.utils.deprecation import MiddlewareMixin
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.functional import SimpleLazyObject
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from django.shortcuts import render
|
||||
|
||||
from . import app_settings, utils, plugins
|
||||
from .utils.service import get_service_from_request
|
||||
from .utils.service import get_service_from_request, get_service_from_session
|
||||
|
||||
|
||||
class CollectIPMiddleware(MiddlewareMixin):
|
||||
|
@ -208,10 +209,18 @@ class SaveServiceInSessionMiddleware:
|
|||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request):
|
||||
service = None
|
||||
|
||||
service = get_service_from_request(request)
|
||||
if service:
|
||||
request.session['service_pk'] = service.pk
|
||||
|
||||
request.service = SimpleLazyObject(lambda: get_service_from_session(request))
|
||||
return self.get_response(request)
|
||||
|
||||
|
||||
def journal_middleware(get_response):
|
||||
from . import journal
|
||||
|
||||
def middleware(request):
|
||||
request.journal = journal.Journal(request=request)
|
||||
return get_response(request)
|
||||
|
||||
return middleware
|
||||
|
|
|
@ -100,6 +100,7 @@ MIDDLEWARE = (
|
|||
'django.middleware.locale.LocaleMiddleware',
|
||||
'django.contrib.auth.middleware.AuthenticationMiddleware',
|
||||
'django.contrib.messages.middleware.MessageMiddleware',
|
||||
'authentic2.middleware.journal_middleware',
|
||||
)
|
||||
|
||||
DATABASES['default']['ATOMIC_REQUESTS'] = True
|
||||
|
|
|
@ -447,6 +447,7 @@ def login(request, user, how, service=None, service_slug=None, nonce=None, **kwa
|
|||
# prevent logint-hint to influence next use of the login page
|
||||
if 'login-hint' in request.session:
|
||||
del request.session['login-hint']
|
||||
request.journal.record('user.login', how=how)
|
||||
return continue_to_next_url(request, **kwargs)
|
||||
|
||||
|
||||
|
@ -757,6 +758,7 @@ def send_registration_mail(request, email, ou, template_names=None, next_url=Non
|
|||
legacy_html_body_templates=['registration/activation_email.html'])
|
||||
logger.info(u'registration mail sent to %s with registration URL %s...', email,
|
||||
registration_url)
|
||||
request.journal.record('user.registration.request', email=email)
|
||||
|
||||
|
||||
def send_account_deletion_code(request, user):
|
||||
|
@ -823,6 +825,7 @@ def send_password_reset_mail(user, template_names=None, request=None,
|
|||
sign_next_url=True,
|
||||
**kwargs):
|
||||
from .. import middleware
|
||||
from authentic2.journal import journal
|
||||
|
||||
if not user.email:
|
||||
raise ValueError('user must have an email')
|
||||
|
@ -852,6 +855,7 @@ def send_password_reset_mail(user, template_names=None, request=None,
|
|||
per_ou_templates=True, **kwargs)
|
||||
logger.info(u'password reset request for user %s, email sent to %s '
|
||||
'with token %s', user, user.email, token.uuid)
|
||||
journal.record('user.password.reset.request', email=user.email, user=user)
|
||||
|
||||
|
||||
def batch(iterable, size):
|
||||
|
|
|
@ -52,9 +52,17 @@ def get_service_from_ref(ref):
|
|||
|
||||
def get_service_from_request(request):
|
||||
service_ref = request.GET.get(SERVICE_FIELD_NAME)
|
||||
if not service_ref or '\x00' in service_ref:
|
||||
return None
|
||||
return get_service_from_ref(service_ref)
|
||||
if service_ref and '\x00' not in service_ref:
|
||||
return get_service_from_ref(service_ref)
|
||||
return None
|
||||
|
||||
|
||||
def get_service_from_session(request):
|
||||
session = getattr(request, 'session', None)
|
||||
if session and 'service_pk' in session:
|
||||
from authentic2.models import Service
|
||||
return Service.objects.get(pk=session['service_pk'])
|
||||
return None
|
||||
|
||||
|
||||
def get_service_from_token(params):
|
||||
|
|
|
@ -151,6 +151,7 @@ class EditProfile(cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView):
|
|||
def form_valid(self, form):
|
||||
response = super(EditProfile, self).form_valid(form)
|
||||
hooks.call_hooks('event', name='edit-profile', user=self.request.user, form=form)
|
||||
self.request.journal.record('user.profile.edit', form=form)
|
||||
return response
|
||||
|
||||
edit_profile = decorators.setting_enabled('A2_PROFILE_CAN_EDIT_PROFILE')(
|
||||
|
@ -575,6 +576,7 @@ def logout(request,
|
|||
targets = redirect_logout_list(request)
|
||||
logger.debug('Accumulated redirections : {}'.format(targets))
|
||||
# Local logout
|
||||
request.journal.record('user.logout')
|
||||
auth_logout(request)
|
||||
logger.info('Logged out')
|
||||
local_logout_done = True
|
||||
|
@ -684,6 +686,7 @@ class PasswordResetView(FormView):
|
|||
|
||||
if is_ratelimited(self.request, key='post:email', group='pw-reset-email',
|
||||
rate=app_settings.A2_EMAILS_ADDRESS_RATELIMIT, increment=True):
|
||||
self.request.journal.record('user.password.reset.failure', email=email)
|
||||
form.add_error(
|
||||
'email',
|
||||
_('Multiple emails have already been sent to this address. Further attempts are '
|
||||
|
@ -692,6 +695,7 @@ class PasswordResetView(FormView):
|
|||
return self.form_invalid(form)
|
||||
if is_ratelimited(self.request, key='ip', group='pw-reset-email',
|
||||
rate=app_settings.A2_EMAILS_IP_RATELIMIT, increment=True):
|
||||
self.request.journal.record('user.password.reset.failure', email=email)
|
||||
form.add_error(
|
||||
'email',
|
||||
_('Multiple password reset attempts have already been made from this IP address. No '
|
||||
|
@ -783,7 +787,9 @@ class PasswordResetConfirmView(cbv.RedirectToNextURLViewMixin, FormView):
|
|||
return self.finish()
|
||||
|
||||
def finish(self):
|
||||
return utils.simulate_authentication(self.request, self.user, 'email')
|
||||
response = utils.simulate_authentication(self.request, self.user, 'email')
|
||||
self.request.journal.record('user.password.reset')
|
||||
return response
|
||||
|
||||
password_reset_confirm = PasswordResetConfirmView.as_view()
|
||||
|
||||
|
@ -1136,6 +1142,11 @@ class RegistrationCompletionView(CreateView):
|
|||
return self.registration_success(self.request, form.instance, form)
|
||||
|
||||
def registration_success(self, request, user, form):
|
||||
request.journal.record(
|
||||
'user.registration',
|
||||
user=user,
|
||||
session=None,
|
||||
how=self.authentication_method)
|
||||
hooks.call_hooks('event', name='registration', user=user, form=form, view=self,
|
||||
authentication_method=self.authentication_method,
|
||||
token=self.token, service=self.service and self.service.slug)
|
||||
|
@ -1233,6 +1244,7 @@ class ValidateDeletionView(TemplateView):
|
|||
self.user.mark_as_deleted()
|
||||
logger.info(u'deletion of account %s performed', self.user)
|
||||
hooks.call_hooks('event', name='delete-account', user=self.user)
|
||||
request.journal.record('user.deletion', user=self.user)
|
||||
if self.user == request.user:
|
||||
# No validation message displayed, as the user will surely
|
||||
# notice their own account deletion...
|
||||
|
@ -1289,7 +1301,9 @@ class PasswordChangeView(DjPasswordChangeView):
|
|||
hooks.call_hooks('event', name='change-password', user=self.request.user, request=self.request)
|
||||
messages.info(self.request, _('Password changed'))
|
||||
models.PasswordReset.objects.filter(user=self.request.user).delete()
|
||||
return super(PasswordChangeView, self).form_valid(form)
|
||||
response = super(PasswordChangeView, self).form_valid(form)
|
||||
self.request.journal.record('user.password.change', session=self.request.session)
|
||||
return response
|
||||
|
||||
def get_form_class(self):
|
||||
if self.request.user.has_usable_password():
|
||||
|
|
|
@ -287,6 +287,10 @@ def authorize(request, *args, **kwargs):
|
|||
expired=start + datetime.timedelta(days=365))
|
||||
if pk_to_deletes:
|
||||
auth_manager.filter(pk__in=pk_to_deletes).delete()
|
||||
request.journal.record(
|
||||
'user.service.sso.authorization',
|
||||
service=client,
|
||||
scopes=list(sorted(scopes)))
|
||||
logger.info(u'authorized scopes %s saved for service %s', ' '.join(scopes),
|
||||
client.name)
|
||||
else:
|
||||
|
@ -365,6 +369,10 @@ def authorize(request, *args, **kwargs):
|
|||
})
|
||||
# query is transfered through the hashtag
|
||||
response = redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False)
|
||||
request.journal.record(
|
||||
'user.service.sso',
|
||||
service=client,
|
||||
how=last_auth and last_auth.get('how'))
|
||||
hooks.call_hooks('event', name='sso-success', idp='oidc', service=client, user=request.user)
|
||||
utils.add_oidc_session(request, client)
|
||||
return response
|
||||
|
|
|
@ -126,6 +126,7 @@ def create_user(**kwargs):
|
|||
password = kwargs.pop('password', None) or kwargs['username']
|
||||
user, created = User.objects.get_or_create(**kwargs)
|
||||
if password:
|
||||
user.clear_password = password
|
||||
user.set_password(password)
|
||||
user.save()
|
||||
return user
|
||||
|
@ -167,7 +168,7 @@ def user_ou1(db, ou1):
|
|||
|
||||
@pytest.fixture
|
||||
def user_ou2(db, ou2):
|
||||
return create_user(username='john.doe', first_name=u'Jôhn', last_name=u'Dôe',
|
||||
return create_user(username='john.doe.ou2', first_name=u'Jôhn', last_name=u'Dôe',
|
||||
email='john.doe@example.net', ou=ou2)
|
||||
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ from django_rbac.utils import get_role_model, get_ou_model
|
|||
|
||||
from authentic2 import utils, models, attribute_kinds
|
||||
|
||||
from .utils import Authentic2TestCase, get_response_form, get_link_from_mail
|
||||
from .utils import Authentic2TestCase, get_response_form, get_link_from_mail, assert_event
|
||||
|
||||
|
||||
class SerializerTests(TestCase):
|
||||
|
@ -215,6 +215,7 @@ class UserProfileTests(TestCase):
|
|||
user = User.objects.create(username='testbot')
|
||||
user.set_password('secret')
|
||||
user.save()
|
||||
self.user = user
|
||||
self.client = Client()
|
||||
|
||||
def test_edit_profile_attributes(self):
|
||||
|
@ -249,6 +250,8 @@ class UserProfileTests(TestCase):
|
|||
for k, v in kwargs.items())
|
||||
|
||||
response = self.client.post(reverse('profile_edit'), kwargs)
|
||||
new = {'custom': 'random data', 'next_url': '', 'national_number': 'xx20153566342yy'}
|
||||
assert_event('user.profile.edit', user=self.user, session=self.client.session, old={}, new=new)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
response = self.client.get(reverse('account_management'))
|
||||
|
|
|
@ -236,6 +236,13 @@ def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oi
|
|||
assert authz.expired >= now()
|
||||
else:
|
||||
assert OIDCAuthorization.objects.count() == 0
|
||||
utils.assert_event('user.service.sso.authorization',
|
||||
session=app.session,
|
||||
user=simple_user, service=oidc_client,
|
||||
scopes=['email', 'openid', 'profile'])
|
||||
utils.assert_event('user.service.sso', session=app.session,
|
||||
user=simple_user, service=oidc_client,
|
||||
how='password-on-https')
|
||||
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
|
||||
assert OIDCCode.objects.count() == 1
|
||||
code = OIDCCode.objects.get()
|
||||
|
|
|
@ -21,11 +21,25 @@ from django.contrib.auth import get_user_model
|
|||
|
||||
from authentic2 import models
|
||||
|
||||
from .utils import login, check_log
|
||||
from .utils import login, check_log, assert_event
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
def test_success(db, app, simple_user):
|
||||
login(app, simple_user)
|
||||
assert_event('user.login', user=simple_user, session=app.session, how='password-on-https')
|
||||
session = app.session
|
||||
app.get('/logout/').form.submit()
|
||||
assert_event('user.logout', user=simple_user, session=session)
|
||||
|
||||
|
||||
def test_failure(db, app, simple_user):
|
||||
login(app, simple_user, password='wrong', fail=True)
|
||||
assert_event('user.login.failure', username=simple_user.username)
|
||||
|
||||
|
||||
def test_login_inactive_user(db, app):
|
||||
User = get_user_model()
|
||||
user1 = User.objects.create(username='john.doe')
|
||||
user1.set_password('john.doe')
|
||||
user1.save()
|
||||
|
@ -39,7 +53,7 @@ def test_login_inactive_user(db, app):
|
|||
assert '_auth_user_id' not in app.session
|
||||
user1.is_active = False
|
||||
user1.save()
|
||||
login(app, user1)
|
||||
login(app, user2)
|
||||
assert int(app.session['_auth_user_id']) == user2.id
|
||||
app.get('/logout/').form.submit()
|
||||
assert '_auth_user_id' not in app.session
|
||||
|
|
|
@ -23,14 +23,16 @@ from . import utils
|
|||
def test_send_password_reset_email(app, simple_user, mailoutbox):
|
||||
from authentic2.utils import send_password_reset_mail
|
||||
assert len(mailoutbox) == 0
|
||||
send_password_reset_mail(
|
||||
simple_user,
|
||||
legacy_subject_templates=['registration/password_reset_subject.txt'],
|
||||
legacy_body_templates=['registration/password_reset_email.html'],
|
||||
context={
|
||||
'base_url': 'http://testserver',
|
||||
})
|
||||
with utils.run_on_commit_hooks():
|
||||
send_password_reset_mail(
|
||||
simple_user,
|
||||
legacy_subject_templates=['registration/password_reset_subject.txt'],
|
||||
legacy_body_templates=['registration/password_reset_email.html'],
|
||||
context={
|
||||
'base_url': 'http://testserver',
|
||||
})
|
||||
assert len(mailoutbox) == 1
|
||||
utils.assert_event('user.password.reset.request', user=simple_user, email=simple_user.email)
|
||||
url = utils.get_link_from_mail(mailoutbox[0])
|
||||
relative_url = url.split('testserver')[1]
|
||||
resp = app.get(relative_url, status=200)
|
||||
|
@ -38,6 +40,7 @@ def test_send_password_reset_email(app, simple_user, mailoutbox):
|
|||
resp.form.set('new_password2', '1234==aA')
|
||||
resp = resp.form.submit().follow()
|
||||
assert str(app.session['_auth_user_id']) == str(simple_user.pk)
|
||||
utils.assert_event('user.password.reset', user=simple_user, session=app.session)
|
||||
|
||||
|
||||
def test_view(app, simple_user, mailoutbox, settings):
|
||||
|
@ -47,6 +50,7 @@ def test_view(app, simple_user, mailoutbox, settings):
|
|||
assert len(mailoutbox) == 0
|
||||
settings.DEFAULT_FROM_EMAIL = 'show only addr <noreply@example.net>'
|
||||
resp = resp.form.submit()
|
||||
utils.assert_event('user.password.reset.request', user=simple_user, email=simple_user.email)
|
||||
assert resp['Location'].endswith('/instructions/')
|
||||
resp = resp.follow()
|
||||
assert simple_user.email in resp.text
|
||||
|
|
|
@ -25,13 +25,13 @@ from django.utils.six.moves.urllib.parse import urlparse
|
|||
from authentic2 import utils, models
|
||||
from authentic2.validators import EmailValidator
|
||||
|
||||
from .utils import get_link_from_mail
|
||||
from .utils import get_link_from_mail, assert_event
|
||||
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
def test_registration(app, db, settings, mailoutbox, external_redirect):
|
||||
def test_registration_success(app, db, settings, mailoutbox, external_redirect):
|
||||
next_url, good_next_url = external_redirect
|
||||
|
||||
settings.LANGUAGE_CODE = 'en-us'
|
||||
|
@ -45,6 +45,7 @@ def test_registration(app, db, settings, mailoutbox, external_redirect):
|
|||
response.form.set('email', 'testbot@entrouvert.com')
|
||||
response = response.form.submit()
|
||||
|
||||
assert_event('user.registration.request', email='testbot@entrouvert.com')
|
||||
assert urlparse(response['Location']).path == reverse('registration_complete')
|
||||
if not good_next_url:
|
||||
assert not urlparse(response['Location']).query
|
||||
|
@ -87,6 +88,7 @@ def test_registration(app, db, settings, mailoutbox, external_redirect):
|
|||
assert 'was successful' in mailoutbox[1].body
|
||||
|
||||
new_user = User.objects.get()
|
||||
assert_event('user.registration', user=new_user, how='email')
|
||||
assert new_user.email == 'testbot@entrouvert.com'
|
||||
assert new_user.username is None
|
||||
assert new_user.check_password('T0==toto')
|
||||
|
|
|
@ -23,6 +23,7 @@ from django.utils.functional import lazy
|
|||
|
||||
from django_rbac.utils import get_ou_model
|
||||
|
||||
from authentic2.journal import Journal
|
||||
from authentic2.utils import (good_next_url, same_origin, select_next_url,
|
||||
user_can_change_password, login,
|
||||
get_authentication_events, authenticate,
|
||||
|
@ -91,6 +92,7 @@ def test_get_authentication_events_hows(rf, simple_user):
|
|||
middleware = AuthenticationMiddleware()
|
||||
middleware.process_request(request)
|
||||
MessageMiddleware().process_request(request)
|
||||
request.journal = Journal(request=request)
|
||||
assert 'password' not in [ev['how'] for ev in get_authentication_events(request)]
|
||||
login(request, user, 'password')
|
||||
assert 'password' in [ev['how'] for ev in get_authentication_events(request)]
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
# authentic2
|
||||
|
||||
import datetime
|
||||
from .utils import login, logout, get_link_from_mail
|
||||
from .utils import login, logout, get_link_from_mail, assert_event
|
||||
import pytest
|
||||
|
||||
from django.urls import reverse
|
||||
|
@ -38,13 +38,19 @@ def test_password_change(app, simple_user):
|
|||
simple_user.set_password('hop')
|
||||
simple_user.save()
|
||||
resp = login(app, simple_user, password='hop', path=reverse('password_change'))
|
||||
old_session_key = app.session.session_key
|
||||
|
||||
resp.form['old_password'] = 'hop'
|
||||
resp.form['new_password1'] = 'hopAbcde1'
|
||||
resp.form['new_password2'] = 'hopAbcde1'
|
||||
resp = resp.form.submit()
|
||||
|
||||
new_session_key = app.session.session_key
|
||||
|
||||
assert old_session_key != new_session_key, 'session\'s key has not been cycled'
|
||||
|
||||
assert resp.location == '/accounts/password/change/done/'
|
||||
assert_event('user.password.change', user=simple_user, session=app.session)
|
||||
|
||||
|
||||
def test_account_delete(app, simple_user, mailoutbox):
|
||||
|
@ -84,8 +90,8 @@ def test_account_delete_when_logged_out(app, simple_user, mailoutbox):
|
|||
link = get_link_from_mail(mailoutbox[0])
|
||||
logout(app)
|
||||
page = app.get(link)
|
||||
assert 'You are about to delete the account of <strong>%s</strong>.' % \
|
||||
escape(simple_user.get_full_name()) in page.text
|
||||
assert 'You are about to delete the account of <strong>%s</strong>.' % escape(
|
||||
simple_user.get_full_name()) in page.text
|
||||
response = page.form.submit(name='delete').follow().follow()
|
||||
assert not User.objects.get(pk=simple_user.pk).is_active
|
||||
assert len(mailoutbox) == 2
|
||||
|
@ -105,8 +111,8 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox):
|
|||
logout(app)
|
||||
login(app, user_ou1, path=reverse('account_management'))
|
||||
page = app.get(link)
|
||||
assert 'You are about to delete the account of <strong>%s</strong>.' % \
|
||||
escape(simple_user.get_full_name()) in page.text
|
||||
assert 'You are about to delete the account of <strong>%s</strong>.' % escape(
|
||||
simple_user.get_full_name()) in page.text
|
||||
response = page.form.submit(name='delete').follow()
|
||||
assert not User.objects.get(pk=simple_user.pk).is_active
|
||||
assert User.objects.get(pk=user_ou1.pk).is_active
|
||||
|
@ -117,7 +123,12 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox):
|
|||
|
||||
|
||||
def test_account_delete_fake_token(app, simple_user, mailoutbox):
|
||||
response = app.get(reverse('validate_deletion', kwargs={'deletion_token': 'thisismostlikelynotavalidtoken'})).follow().follow()
|
||||
response = (
|
||||
app.get(reverse('validate_deletion',
|
||||
kwargs={'deletion_token': 'thisismostlikelynotavalidtoken'}))
|
||||
.follow()
|
||||
.follow()
|
||||
)
|
||||
assert "The account deletion request is invalid, try again" in response.text
|
||||
|
||||
|
||||
|
@ -182,6 +193,8 @@ def test_views_email_ratelimit(app, db, simple_user, settings, mailoutbox, freez
|
|||
response = response.form.submit()
|
||||
assert len(mailoutbox) == 3
|
||||
assert 'try again later' in response.text
|
||||
if view_name == 'password_reset':
|
||||
assert_event('user.password.reset.failure', email=simple_user.email)
|
||||
|
||||
# reach ip limit
|
||||
for i in range(7):
|
||||
|
|
|
@ -30,10 +30,11 @@ from django.shortcuts import resolve_url
|
|||
from django.utils import six
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from authentic2 import utils
|
||||
from authentic2 import utils, models
|
||||
from authentic2.apps.journal.models import Event
|
||||
|
||||
|
||||
def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None):
|
||||
def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None, fail=False):
|
||||
if path:
|
||||
args = args or []
|
||||
kwargs = kwargs or {}
|
||||
|
@ -43,17 +44,24 @@ def login(app, user, path=None, password=None, remember_me=None, args=None, kwar
|
|||
login_page = app.get(reverse('auth_login'))
|
||||
assert login_page.request.path == reverse('auth_login')
|
||||
form = login_page.form
|
||||
form.set('username', user.username if hasattr(user, 'username') else user)
|
||||
username = user.username if hasattr(user, 'username') else user
|
||||
form.set('username', username)
|
||||
# password is supposed to be the same as username
|
||||
form.set('password', password or user.username)
|
||||
form.set('password', password or (user.clear_password if hasattr(user, 'clear_password') else username))
|
||||
if remember_me is not None:
|
||||
form.set('remember_me', bool(remember_me))
|
||||
response = form.submit(name='login-password-submit').follow()
|
||||
if path:
|
||||
assert response.request.path == path
|
||||
response = form.submit(name='login-password-submit')
|
||||
if fail:
|
||||
assert response.status_code == 200
|
||||
assert '_auth_user_id' not in app.session
|
||||
else:
|
||||
assert response.request.path == reverse('auth_homepage')
|
||||
assert '_auth_user_id' in app.session
|
||||
response = response.follow()
|
||||
if path:
|
||||
assert response.request.path == path
|
||||
else:
|
||||
assert response.request.path == reverse('auth_homepage')
|
||||
assert '_auth_user_id' in app.session
|
||||
assert not hasattr(user, 'id') or (app.session['_auth_user_id'] == str(user.id))
|
||||
return response
|
||||
|
||||
|
||||
|
@ -242,4 +250,30 @@ def call_command(*args, **kwargs):
|
|||
def text_content(node):
|
||||
'''Extract text content from node and all its children. Equivalent to
|
||||
xmlNodeGetContent from libxml.'''
|
||||
return u''.join(node.itertext())
|
||||
return u''.join(node.itertext()) if node is not None else ''
|
||||
|
||||
|
||||
def assert_event(event_type_name, user=None, session=None, service=None, **data):
|
||||
qs = Event.objects.filter(type__name=event_type_name)
|
||||
if user:
|
||||
qs = qs.filter(user=user)
|
||||
else:
|
||||
qs = qs.filter(user__isnull=True)
|
||||
if session:
|
||||
qs = qs.filter(session=session.session_key)
|
||||
else:
|
||||
qs = qs.filter(session__isnull=True)
|
||||
if service:
|
||||
qs = qs.which_references(service)
|
||||
else:
|
||||
qs = qs.exclude(qs._which_references_query(models.Service))
|
||||
|
||||
assert qs.count() == 1
|
||||
|
||||
if data:
|
||||
event = qs.get()
|
||||
assert event.data, 'no event.data, should be %s' % data
|
||||
for key, value in data.items():
|
||||
assert event.data.get(key) == value, (
|
||||
'event.data[%s] != data[%s] (%s != %s)' % (key, key, event.data.get(key), value)
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue