whitelist send_registration_email_next_url using HMAC signature (#34115)
This commit is contained in:
parent
7e3faddbb1
commit
26be52b49f
|
@ -21,3 +21,4 @@ AUTHENTICATION_EVENTS_SESSION_KEY = 'authentication-events'
|
|||
SWITCH_USER_SESSION_KEY = '_switch_user'
|
||||
LAST_LOGIN_SESSION_KEY = '_last_login'
|
||||
SERVICE_FIELD_NAME = 'service'
|
||||
NEXT_URL_SIGNATURE = 'next-signature'
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import struct
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
|
@ -24,6 +25,8 @@ from Crypto.Hash import SHA256
|
|||
from Crypto.Hash import HMAC
|
||||
from Crypto import Random
|
||||
|
||||
from django.utils.crypto import constant_time_compare
|
||||
|
||||
|
||||
class DecryptionError(Exception):
|
||||
pass
|
||||
|
@ -177,3 +180,17 @@ def aes_base64url_deterministic_decrypt(key, urlencoded, salt, raise_on_error=Tr
|
|||
if not raise_on_error:
|
||||
return None
|
||||
raise
|
||||
|
||||
|
||||
def hmac_url(key, url):
|
||||
if hasattr(key, 'isnumeric'):
|
||||
key = key.encode('utf-8')
|
||||
if hasattr(url, 'isnumeric'):
|
||||
url = url.encode('utf-8', 'replace')
|
||||
return base64.b32encode(hmac.HMAC(key=key, msg=url, digestmod=hashlib.sha256).digest()).decode('ascii').strip('=')
|
||||
|
||||
|
||||
def check_hmac_url(key, url, signature):
|
||||
if hasattr(signature, 'isnumeric'):
|
||||
signature = signature.encode('utf-8', 'replace')
|
||||
return constant_time_compare(signature, hmac_url(key, url).encode('ascii'))
|
||||
|
|
|
@ -63,7 +63,7 @@ except ImportError:
|
|||
from authentic2.saml.saml2utils import filter_attribute_private_key, \
|
||||
filter_element_private_key
|
||||
|
||||
from . import plugins, app_settings, constants
|
||||
from . import plugins, app_settings, constants, crypto
|
||||
|
||||
|
||||
class CleanLogMessage(logging.Filter):
|
||||
|
@ -732,7 +732,7 @@ def send_account_deletion_mail(request, user):
|
|||
logger.info(u'account deletion mail sent to %s', user.email)
|
||||
|
||||
|
||||
def build_reset_password_url(user, request=None, next_url=None, set_random_password=True):
|
||||
def build_reset_password_url(user, request=None, next_url=None, set_random_password=True, sign_next_url=True):
|
||||
'''Build a reset password URL'''
|
||||
from .compat import default_token_generator
|
||||
|
||||
|
@ -745,7 +745,10 @@ def build_reset_password_url(user, request=None, next_url=None, set_random_passw
|
|||
if request:
|
||||
reset_url = request.build_absolute_uri(reset_url)
|
||||
if next_url:
|
||||
reset_url += '?' + urlparse.urlencode({'next': next_url})
|
||||
params = {'next': next_url}
|
||||
if sign_next_url:
|
||||
params[constants.NEXT_URL_SIGNATURE] = crypto.hmac_url(settings.SECRET_KEY, next_url)
|
||||
reset_url += '?' + urlparse.urlencode(params)
|
||||
return reset_url, token
|
||||
|
||||
|
||||
|
@ -755,6 +758,7 @@ def send_password_reset_mail(user, template_names=None, request=None,
|
|||
legacy_subject_templates=['registration/password_reset_subject.txt'],
|
||||
legacy_body_templates=['registration/password_reset_email.html'],
|
||||
set_random_password=True,
|
||||
sign_next_url=True,
|
||||
**kwargs):
|
||||
from . import middleware
|
||||
|
||||
|
@ -776,7 +780,8 @@ def send_password_reset_mail(user, template_names=None, request=None,
|
|||
|
||||
# Build reset URL
|
||||
ctx['reset_url'], token = build_reset_password_url(user, request=request, next_url=next_url,
|
||||
set_random_password=set_random_password)
|
||||
set_random_password=set_random_password,
|
||||
sign_next_url=sign_next_url)
|
||||
|
||||
send_templated_mail(user.email, template_names, ctx, request=request,
|
||||
legacy_subject_templates=legacy_subject_templates,
|
||||
|
@ -895,6 +900,11 @@ def good_next_url(request, next_url):
|
|||
return True
|
||||
if same_origin(request.build_absolute_uri(), next_url):
|
||||
return True
|
||||
signature = request.POST.get(constants.NEXT_URL_SIGNATURE) or request.GET.get(constants.NEXT_URL_SIGNATURE)
|
||||
|
||||
if signature:
|
||||
return crypto.check_hmac_url(settings.SECRET_KEY, next_url, signature)
|
||||
|
||||
for origin in app_settings.A2_REDIRECT_WHITELIST:
|
||||
if same_origin(next_url, origin):
|
||||
return True
|
||||
|
|
|
@ -644,7 +644,7 @@ class PasswordResetView(cbv.NextURLViewMixin, FormView):
|
|||
def get_form_kwargs(self, **kwargs):
|
||||
kwargs = super(PasswordResetView, self).get_form_kwargs(**kwargs)
|
||||
initial = kwargs.setdefault('initial', {})
|
||||
initial['next_url'] = self.request.GET.get(REDIRECT_FIELD_NAME, '')
|
||||
initial['next_url'] = utils.select_next_url(self.request, '')
|
||||
return kwargs
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
|
|
|
@ -34,6 +34,7 @@ from django_rbac.utils import get_role_model, get_ou_model
|
|||
from authentic2.a2_rbac.models import Role
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.models import Service
|
||||
from authentic2.utils import good_next_url
|
||||
|
||||
from utils import login, basic_authorization_header, get_link_from_mail
|
||||
|
||||
|
@ -560,7 +561,7 @@ def test_api_users_create_email_is_unique(settings, app, superuser):
|
|||
assert resp.json['errors']['email']
|
||||
|
||||
|
||||
def test_api_users_create_send_mail(app, settings, superuser):
|
||||
def test_api_users_create_send_mail(app, settings, superuser, rf):
|
||||
from authentic2.models import Attribute
|
||||
|
||||
# Use case is often that Email is the main identifier
|
||||
|
@ -575,6 +576,7 @@ def test_api_users_create_send_mail(app, settings, superuser):
|
|||
'email': 'john.doe@example.net',
|
||||
'title': 'Mr',
|
||||
'send_registration_email': True,
|
||||
'send_registration_email_next_url': 'http://example.com/',
|
||||
}
|
||||
assert len(mail.outbox) == 0
|
||||
resp = app.post_json('/api/users/', params=payload, status=201)
|
||||
|
@ -586,9 +588,11 @@ def test_api_users_create_send_mail(app, settings, superuser):
|
|||
resp = app.get(relative_url, status=200)
|
||||
resp.form.set('new_password1', '1234==aA')
|
||||
resp.form.set('new_password2', '1234==aA')
|
||||
resp = resp.form.submit().follow()
|
||||
resp = resp.form.submit()
|
||||
# Check user was properly logged in
|
||||
assert str(app.session['_auth_user_id']) == str(user_id)
|
||||
assert not good_next_url(rf.get('/'), 'http://example.com')
|
||||
assert resp.location == 'http://example.com/'
|
||||
|
||||
|
||||
def test_api_users_create_force_password_reset(app, client, settings, superuser):
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2019 Entr'ouvert
|
||||
#
|
||||
|
@ -70,3 +71,12 @@ def test_deterministic_encryption():
|
|||
for i in range(1000):
|
||||
assert crypto.aes_base64url_deterministic_decrypt(key, crypted1, salt,
|
||||
max_count=count) == raw
|
||||
|
||||
|
||||
def test_hmac_url():
|
||||
key = u'é'
|
||||
url = 'https://example.invalid/'
|
||||
assert crypto.check_hmac_url(key, url, crypto.hmac_url(key, url))
|
||||
key = u'é'
|
||||
url = u'https://example.invalid/\u0000'
|
||||
assert crypto.check_hmac_url(key, url, crypto.hmac_url(key, url))
|
||||
|
|
Loading…
Reference in New Issue