misc: validate and use a real identifier for services (#45672)

This commit is contained in:
Benjamin Dauvergne 2020-08-13 16:48:24 +02:00
parent ebbd51f306
commit 41b97f0bb2
8 changed files with 129 additions and 46 deletions

View File

@ -22,8 +22,9 @@ from django.utils.translation import ugettext as _, ugettext_lazy
from authentic2.a2_rbac.models import OrganizationalUnit as OU, Role
from authentic2.custom_user.models import User
from . import views, app_settings, utils, constants
from . import views, app_settings, utils
from .utils.views import csrf_token_check
from .utils.service import get_service_from_request
from .forms import authentication as authentication_forms
from .utils.evaluate import evaluate_condition
@ -65,11 +66,8 @@ class LoginPasswordAuthenticator(BaseAuthenticator):
def name(self):
return ugettext_lazy('Password')
def get_service_ous(self, request):
service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
if not service_slug:
return []
roles = Role.objects.filter(allowed_services__slug=service_slug).children()
def get_service_ous(self, service):
roles = Role.objects.filter(allowed_services=service).children()
if not roles:
return []
service_ou_ids = []
@ -82,20 +80,21 @@ class LoginPasswordAuthenticator(BaseAuthenticator):
return []
return OU.objects.filter(pk__in=service_ou_ids)
def get_preferred_ous(self, request):
def get_preferred_ous(self, request, service):
preferred_ous_cookie = utils.get_remember_cookie(request, 'preferred-ous')
preferred_ous = []
if preferred_ous_cookie:
preferred_ous.extend(OU.objects.filter(pk__in=preferred_ous_cookie))
# for the special case of services open to only one OU, pre-select it
for ou in self.get_service_ous(request):
if ou in preferred_ous:
continue
preferred_ous.append(ou)
if service:
for ou in self.get_service_ous(service):
if ou in preferred_ous:
continue
preferred_ous.append(ou)
return preferred_ous
def login(self, request, *args, **kwargs):
service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
service = get_service_from_request(request)
context = kwargs.get('context', {})
is_post = request.method == 'POST' and self.submit_name in request.POST
data = request.POST if is_post else None
@ -104,7 +103,7 @@ class LoginPasswordAuthenticator(BaseAuthenticator):
# Special handling when the form contains an OU selector
if app_settings.A2_LOGIN_FORM_OU_SELECTOR:
preferred_ous = self.get_preferred_ous(request)
preferred_ous = self.get_preferred_ous(request, service)
if preferred_ous:
initial['ou'] = preferred_ous[0]
@ -129,8 +128,7 @@ class LoginPasswordAuthenticator(BaseAuthenticator):
if form.cleaned_data.get('remember_me'):
request.session['remember_me'] = True
request.session.set_expiry(app_settings.A2_USER_REMEMBER_ME)
response = utils.login(request, form.get_user(), how,
service_slug=service_slug)
response = utils.login(request, form.get_user(), how, service=service)
if 'ou' in form.fields:
utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk)

View File

@ -63,6 +63,7 @@ from authentic2.saml.saml2utils import filter_attribute_private_key, \
filter_element_private_key
from .. import plugins, app_settings, constants, crypto
from .service import set_service_ref
class CleanLogMessage(logging.Filter):
@ -431,12 +432,15 @@ def last_authentication_event(request=None, session=None):
return None
def login(request, user, how, service_slug=None, nonce=None, **kwargs):
def login(request, user, how, service=None, service_slug=None, nonce=None, **kwargs):
'''Login a user model, record the authentication event and redirect to next
URL or settings.LOGIN_REDIRECT_URL.'''
from .. import hooks
from .views import check_cookie_works
if service:
assert service_slug is None
service_slug = service.slug
check_cookie_works(request)
last_login = user.last_login
auth_login(request, user)
@ -455,11 +459,12 @@ def login(request, user, how, service_slug=None, nonce=None, **kwargs):
def login_require(request, next_url=None, login_url='auth_login', service=None, login_hint=(), **kwargs):
'''Require a login and come back to current URL'''
next_url = next_url or request.get_full_path()
params = kwargs.setdefault('params', {})
params[REDIRECT_FIELD_NAME] = next_url
if service:
params['service'] = service.slug
set_service_ref(params, service)
if login_hint:
request.session['login-hint'] = list(login_hint)
elif 'login-hint' in request.session:
@ -679,13 +684,13 @@ def get_fk_model(model, fieldname):
return field.related_model
def get_registration_url(request, service_slug=None):
def get_registration_url(request, service=None):
next_url = select_next_url(request, settings.LOGIN_REDIRECT_URL)
next_url = make_url(next_url, request=request, keep_params=True,
include=(constants.NONCE_FIELD_NAME,), resolve=False)
params = {REDIRECT_FIELD_NAME: next_url}
if service_slug:
params[constants.SERVICE_FIELD_NAME] = service_slug
if service:
set_service_ref(params, service)
return make_url('registration_register', params=params)
@ -1049,12 +1054,12 @@ def same_origin(url1, url2):
def simulate_authentication(request, user, method,
backend='authentic2.backends.models_backend.ModelBackend',
service_slug=None, **kwargs):
service=None, **kwargs):
'''Simulate a normal login by forcing a backend attribute on the user instance'''
# do not modify the passed user
user = copy.deepcopy(user)
user.backend = backend
return login(request, user, method, service_slug=service_slug, **kwargs)
return login(request, user, method, service=service, **kwargs)
def get_manager_login_url():

View File

@ -0,0 +1,68 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from authentic2.constants import SERVICE_FIELD_NAME
def service_ref(service):
if service.ou:
return '%s %s' % (service.ou.slug, service.slug)
else:
return service.slug
def get_service_from_ref(ref):
from authentic2.models import Service
splitted = ref.split(' ')
try:
ou_slug, service_slug = splitted
except ValueError:
pass
else:
return Service.objects.filter(ou__slug=ou_slug, slug=service_slug).first()
try:
service_slug, = splitted
except ValueError:
return None
service = Service.objects.filter(ou__isnull=True, slug=service_slug).first()
if service:
return service
try:
return Service.objects.get(slug=service_slug)
except (Service.DoesNotExist, Service.MultipleObjectsReturned):
return None
def get_service_from_request(request):
service_ref = request.GET.get(SERVICE_FIELD_NAME)
if not service_ref or '\x00' in service_ref:
return None
return get_service_from_ref(service_ref)
def get_service_from_token(params):
ref = params.get(SERVICE_FIELD_NAME)
if not ref:
return None
return get_service_from_ref(ref)
def set_service_ref(params, service):
params[SERVICE_FIELD_NAME] = service_ref(service)

View File

@ -54,6 +54,7 @@ from django.template import loader
from authentic2.custom_user.models import iter_attributes
from . import (utils, app_settings, decorators, constants,
models, cbv, hooks, validators, attribute_kinds)
from .utils.service import get_service_from_request, get_service_from_token, set_service_ref
from .utils import switch_user
from .a2_rbac.utils import get_default_ou
from .a2_rbac.models import OrganizationalUnit as OU
@ -258,6 +259,8 @@ def login(request, template_name='authentic2/login.html',
redirect_to = request.GET.get(redirect_field_name)
service = get_service_from_request(request)
if not redirect_to or ' ' in redirect_to:
redirect_to = settings.LOGIN_REDIRECT_URL
# Heavier security check -- redirects to http://example.com should
@ -272,8 +275,7 @@ def login(request, template_name='authentic2/login.html',
blocks = []
registration_url = utils.get_registration_url(
request, service_slug=request.GET.get(constants.SERVICE_FIELD_NAME))
registration_url = utils.get_registration_url(request, service=service)
context = {
'cancel': nonce is not None,
@ -313,11 +315,12 @@ def login(request, template_name='authentic2/login.html',
parameters = {'request': request,
'context': context}
remote_addr = request.META.get('REMOTE_ADDR')
service = request.GET.get('service')
login_hint = set(request.session.get('login-hint', []))
show_ctx = dict(remote_addr=remote_addr, login_hint=login_hint)
if service and models.Service.objects.filter(slug=service).exists():
show_ctx['service_slug'] = service
if service:
show_ctx['service_ou_slug'] = service.ou and service.ou.slug
show_ctx['service_slug'] = service.slug
show_ctx['service'] = service
# check if the authenticator has multiple instances
if hasattr(authenticator, 'instances'):
for instance_id, instance in authenticator.instances(**parameters):
@ -846,9 +849,9 @@ class BaseRegistrationView(FormView):
self.token[field] = form.cleaned_data[field]
# propagate service to the registration completion view
if constants.SERVICE_FIELD_NAME in self.request.GET:
self.token[constants.SERVICE_FIELD_NAME] = \
self.request.GET[constants.SERVICE_FIELD_NAME]
service = get_service_from_request(self.request)
if service:
set_service_ref(self.token, service)
self.token.pop(REDIRECT_FIELD_NAME, None)
self.token.pop('email', None)
@ -933,7 +936,7 @@ class RegistrationCompletionView(CreateView):
self.email_is_unique |= self.ou.email_is_unique
self.init_fields_labels_and_help_texts()
# if registration is done during an SSO add the service to the registration event
self.service = self.token.get(constants.SERVICE_FIELD_NAME)
self.service = get_service_from_token(self.token)
return super(RegistrationCompletionView, self) \
.dispatch(request, *args, **kwargs)
@ -1051,7 +1054,7 @@ class RegistrationCompletionView(CreateView):
utils.simulate_authentication(
request, self.users[0],
method=self.authentication_method,
service_slug=self.service)
service=self.service)
return utils.redirect(request, self.get_success_url())
confirm_data = self.token.get('confirm_data', False)
@ -1090,7 +1093,7 @@ class RegistrationCompletionView(CreateView):
utils.simulate_authentication(
request, user,
method=self.authentication_method,
service_slug=self.service)
service=self.service)
return utils.redirect(request, self.get_success_url())
return super(RegistrationCompletionView, self).post(request, *args, **kwargs)
@ -1130,12 +1133,12 @@ class RegistrationCompletionView(CreateView):
def registration_success(self, request, user, form):
hooks.call_hooks('event', name='registration', user=user, form=form, view=self,
authentication_method=self.authentication_method,
token=self.token, service=self.service)
token=self.token, service=self.service and self.service.slug)
self.token_obj.delete()
utils.simulate_authentication(
request, user,
method=self.authentication_method,
service_slug=self.service)
service=self.service)
message_template = loader.get_template('authentic2/registration_success_message.html')
messages.info(self.request, message_template.render(request=request))
self.send_registration_success_email(user)

View File

@ -47,6 +47,7 @@ from authentic2 import utils as a2_utils, hooks, constants
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.forms.passwords import SetPasswordForm
from authentic2.utils import views as views_utils
from authentic2.utils.service import get_service_from_request, set_service_ref
from . import app_settings, models, utils
@ -372,7 +373,7 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
def get(self, request, *args, **kwargs):
registration = True if 'registration' in request.GET else False
'''Request an access grant code and associate it to the current user'''
self.service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
self.service = get_service_from_request(request)
if request.user.is_authenticated:
# Prevent to add a link with an FC account already linked with another user.
try:
@ -446,7 +447,7 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
return self.redirect(request)
if user:
views_utils.check_cookie_works(request)
a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
a2_utils.login(request, user, 'france-connect', service=self.service)
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
request.session.set_expiry(0)
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
@ -457,8 +458,8 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
return self.redirect(request)
else:
params = {}
if self.service_slug:
params[constants.SERVICE_FIELD_NAME] = self.service_slug
if self.service:
set_service_ref(params, self.service)
if registration:
return self.redirect_and_come_back(request,
a2_utils.make_url('fc-registration',
@ -496,8 +497,9 @@ class RegistrationView(PopupViewMixin, LoggerMixin, View):
params = {
REDIRECT_FIELD_NAME: redirect_to,
}
if constants.SERVICE_FIELD_NAME in request.GET:
params[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME]
service = get_service_from_request(request)
if service:
set_service_ref(params, service)
if self.get_in_popup():
params['popup'] = ''
redirect_to = a2_utils.make_url('fc-login-or-link', params=params)
@ -510,8 +512,8 @@ class RegistrationView(PopupViewMixin, LoggerMixin, View):
data['valid_email'] = False
data['franceconnect'] = True
data['authentication_method'] = 'france-connect'
if constants.SERVICE_FIELD_NAME in request.GET:
data[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME]
if service:
set_service_ref(data, service)
activation_url = a2_utils.build_activation_url(request,
next_url=redirect_to,
**data)

View File

@ -32,6 +32,8 @@ from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from authentic2.models import Service
from authentic2_auth_fc import models
from authentic2_auth_fc.utils import requests_retry_session
@ -41,6 +43,11 @@ from ..utils import login
User = get_user_model()
@pytest.fixture(autouse=True)
def service(db):
return Service.objects.create(name='portail', slug='portail')
def path(url):
return urlparse.urlparse(url).path

View File

@ -960,11 +960,11 @@ def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simpl
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
assert query['service'] == ['client']
assert query['service'] == ['default client']
response = response.follow().click('Register')
location = urlparse.urlparse(response.request.url)
query = urlparse.parse_qs(location.query)
assert query['service'] == ['client']
assert query['service'] == ['default client']
response.form.set('email', 'john.doe@example.com')
response = response.form.submit()

View File

@ -346,7 +346,7 @@ class Scenario(object):
reverse('auth_login'),
**{
'nonce': '*',
SERVICE_FIELD_NAME: self.sp.slug,
SERVICE_FIELD_NAME: 'default ' + self.sp.slug,
REDIRECT_FIELD_NAME: make_url(
'a2-idp-saml-continue',
params={NONCE_FIELD_NAME: request_id}),