authentic/src/authentic2/authenticators.py

166 lines
6.4 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from django.db.models import Count
from django.shortcuts import render
from django.utils.translation import ugettext as _
from django.utils.translation import ugettext_lazy
from authentic2.a2_rbac.models import OrganizationalUnit as OU
from authentic2.a2_rbac.models import Role
from authentic2.custom_user.models import User
from . import app_settings, utils, views
from .forms import authentication as authentication_forms
from .utils.evaluate import evaluate_condition
from .utils.service import get_service_from_request
from .utils.views import csrf_token_check
logger = logging.getLogger(__name__)
class BaseAuthenticator:
def __init__(self, show_condition=None, **kwargs):
self.show_condition = show_condition
def get_show_condition(self, instance_id=None):
if isinstance(self.show_condition, dict):
if instance_id and instance_id in self.show_condition:
return self.show_condition[instance_id]
else:
return self.show_condition
def shown(self, instance_id=None, ctx=()):
show_condition = self.get_show_condition(instance_id)
if not show_condition:
return True
ctx = dict(ctx, id=instance_id)
try:
return evaluate_condition(show_condition, ctx, on_raise=True)
except Exception as e:
logger.error(e)
return False
class LoginPasswordAuthenticator(BaseAuthenticator):
id = 'password'
submit_name = 'login-password-submit'
priority = 0
def enabled(self):
return app_settings.A2_AUTH_PASSWORD_ENABLE
def name(self):
return ugettext_lazy('Password')
def get_service_ous(self, service):
roles = Role.objects.filter(allowed_services=service).children()
if not roles:
return []
service_ou_ids = []
qs = (
User.objects.filter(roles__in=roles)
.values_list('ou')
.annotate(count=Count('ou'))
.order_by('-count')
)
for ou_id, count in qs:
if not ou_id:
continue
service_ou_ids.append(ou_id)
if not service_ou_ids:
return []
return OU.objects.filter(pk__in=service_ou_ids)
def get_preferred_ous(self, request, service):
preferred_ous_cookie = utils.get_remember_cookie(request, 'preferred-ous')
preferred_ous = []
if preferred_ous_cookie:
preferred_ous.extend(OU.objects.filter(pk__in=preferred_ous_cookie))
# for the special case of services open to only one OU, pre-select it
if service:
for ou in self.get_service_ous(service):
if ou in preferred_ous:
continue
preferred_ous.append(ou)
return preferred_ous
def login(self, request, *args, **kwargs):
service = get_service_from_request(request)
context = kwargs.get('context', {})
is_post = request.method == 'POST' and self.submit_name in request.POST
data = request.POST if is_post else None
initial = {}
preferred_ous = []
request.failed_logins = set()
# Special handling when the form contains an OU selector
if app_settings.A2_LOGIN_FORM_OU_SELECTOR:
preferred_ous = self.get_preferred_ous(request, service)
if preferred_ous:
initial['ou'] = preferred_ous[0]
form = authentication_forms.AuthenticationForm(
request=request, data=data, initial=initial, preferred_ous=preferred_ous
)
if app_settings.A2_ACCEPT_EMAIL_AUTHENTICATION:
form.fields['username'].label = _('Username or email')
if app_settings.A2_USERNAME_LABEL:
form.fields['username'].label = app_settings.A2_USERNAME_LABEL
is_secure = request.is_secure
context['submit_name'] = self.submit_name
if is_post:
csrf_token_check(request, form)
if form.is_valid():
if is_secure:
how = 'password-on-https'
else:
how = 'password'
if form.cleaned_data.get('remember_me'):
request.session['remember_me'] = True
request.session.set_expiry(app_settings.A2_USER_REMEMBER_ME)
response = utils.login(request, form.get_user(), how, service=service)
if 'ou' in form.fields:
utils.prepend_remember_cookie(
request, response, 'preferred-ous', form.cleaned_data['ou'].pk
)
if hasattr(request, 'needs_password_change'):
del request.needs_password_change
return utils.redirect(
request, 'password_change', params={'next': response.url}, resolve=True
)
return response
else:
username = form.cleaned_data.get('username', '').strip()
if request.failed_logins:
for user in request.failed_logins:
request.journal.record('user.login.failure', user=user, username=username)
elif username:
request.journal.record('user.login.failure', username=username)
context['form'] = form
return render(request, 'authentic2/login_password_form.html', context)
def profile(self, request, *args, **kwargs):
return views.login_password_profile(request, *args, **kwargs)
def registration(self, request, *args, **kwargs):
context = kwargs.get('context', {})
return render(request, 'authentic2/login_password_registration_form.html', context)