add settings to filter user authorized to authenticate (fixes #19597)
Added: - A2_USER_FILTER and A2_USER_EXCLUDE settings, contains kwargs for User.filter() and User.exclude(), - two helper methods for backends: authentic2.backend.get_user_queryset() and authentic2.backend.is_user_authenticable(), - all backends modified to use those.
This commit is contained in:
parent
1c68e6730b
commit
4aec4f62cb
|
@ -173,6 +173,14 @@ default_settings = dict(
|
|||
A2_API_USERS_REQUIRED_FIELDS=Setting(
|
||||
default=(),
|
||||
definition='List of fields to require on user\'s API, override other settings'),
|
||||
A2_USER_FILTER=Setting(
|
||||
default={},
|
||||
definition='Filters (as in QuerySet.filter() to apply to User queryset before '
|
||||
'authentication'),
|
||||
A2_USER_EXCLUDE=Setting(
|
||||
default={},
|
||||
definition='Exclusion filter (as in QuerySet.exclude() to apply to User queryset before '
|
||||
'authentication'),
|
||||
)
|
||||
|
||||
app_settings = AppSettings(default_settings)
|
||||
|
|
|
@ -3,9 +3,13 @@ from django.db import transaction
|
|||
import logging
|
||||
|
||||
from authentic2.compat import get_user_model
|
||||
from authentic2.backends import is_user_authenticable
|
||||
|
||||
from . import models, app_settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AuthenticationError(Exception):
|
||||
pass
|
||||
|
||||
|
@ -25,6 +29,9 @@ class SSLBackend:
|
|||
if cert is None:
|
||||
return None
|
||||
else:
|
||||
if not is_user_authenticable(cert.user):
|
||||
logger.info('SSLAuth: authentication refused by user filters')
|
||||
return None
|
||||
return cert.user
|
||||
|
||||
def get_user(self, user_id):
|
||||
|
@ -116,7 +123,6 @@ settings')
|
|||
|
||||
return user
|
||||
|
||||
|
||||
def build_user(self, username, ssl_info):
|
||||
"""
|
||||
create a valid (and stored) django user to be associated with the
|
||||
|
|
|
@ -1,2 +1,29 @@
|
|||
from django.contrib.auth import get_user_model
|
||||
from authentic2 import app_settings
|
||||
|
||||
|
||||
def get_user_queryset():
|
||||
User = get_user_model()
|
||||
|
||||
qs = User.objects.all()
|
||||
|
||||
if app_settings.A2_USER_FILTER:
|
||||
qs = qs.filter(**app_settings.A2_USER_FILTER)
|
||||
|
||||
if app_settings.A2_USER_EXCLUDE:
|
||||
qs = qs.exclude(**app_settings.A2_USER_EXCLUDE)
|
||||
|
||||
return qs
|
||||
|
||||
|
||||
def is_user_authenticable(user):
|
||||
# if user is None, don't care about the authenticable status
|
||||
if user is None:
|
||||
return True
|
||||
if not app_settings.A2_USER_FILTER and not app_settings.A2_USER_EXCLUDE:
|
||||
return True
|
||||
return get_user_queryset().filter(pk=user.pk).exists()
|
||||
|
||||
|
||||
from .ldap_backend import LDAPBackend
|
||||
from .models_backend import ModelBackend
|
||||
|
|
|
@ -39,6 +39,9 @@ from authentic2.a2_rbac.utils import get_default_ou
|
|||
from authentic2.ldap_utils import FilterFormatter
|
||||
from authentic2.utils import utf8_encode
|
||||
|
||||
from authentic2.backends import is_user_authenticable
|
||||
|
||||
|
||||
DEFAULT_CA_BUNDLE = ''
|
||||
|
||||
CA_BUNDLE_PATHS = [
|
||||
|
@ -837,6 +840,10 @@ class LDAPBackend(object):
|
|||
self.populate_user(user, dn, username, conn, block, attributes)
|
||||
if not user.pk or getattr(user, '_changed', False):
|
||||
user.save()
|
||||
|
||||
if not is_user_authenticable(user):
|
||||
return None
|
||||
|
||||
user_login_success(user.get_username())
|
||||
return user
|
||||
|
||||
|
|
|
@ -7,12 +7,16 @@ from django.contrib.auth.backends import ModelBackend
|
|||
from .. import app_settings
|
||||
from authentic2.user_login_failure import user_login_success, user_login_failure
|
||||
|
||||
from authentic2.backends import get_user_queryset
|
||||
|
||||
|
||||
def upn(username, realm):
|
||||
'''Build an UPN from a username and a realm'''
|
||||
return u'{0}@{1}'.format(username, realm)
|
||||
|
||||
PROXY_USER_MODEL = None
|
||||
|
||||
|
||||
class ModelBackend(ModelBackend):
|
||||
"""
|
||||
Authenticates against settings.AUTH_USER_MODEL.
|
||||
|
@ -51,7 +55,7 @@ class ModelBackend(ModelBackend):
|
|||
if not username:
|
||||
return
|
||||
query = self.get_query(username, realm)
|
||||
users = UserModel.objects.filter(query)
|
||||
users = get_user_queryset().filter(query)
|
||||
# order by username to make username without realm come before usernames with realms
|
||||
# i.e. "toto" should come before "toto@example.com"
|
||||
users = users.order_by('-is_active', UserModel.USERNAME_FIELD)
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
import pam
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
|
||||
from authentic2.backends import is_user_authenticable
|
||||
from authentic2.compat import get_user_model
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PAMBackend:
|
||||
def authenticate(self, username=None, password=None):
|
||||
|
@ -17,12 +20,16 @@ class PAMBackend:
|
|||
user = User(username=username, password='not stored here')
|
||||
|
||||
if getattr(settings, 'PAM_IS_SUPERUSER', False):
|
||||
user.is_superuser = True
|
||||
user.is_superuser = True
|
||||
|
||||
if getattr(settings, 'PAM_IS_STAFF', user.is_superuser):
|
||||
user.is_staff = True
|
||||
user.is_staff = True
|
||||
|
||||
user.save()
|
||||
if not is_user_authenticable(user):
|
||||
logger.info(u'auth_pam: authentication refused by user filters')
|
||||
return None
|
||||
|
||||
return user
|
||||
return None
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ from django.contrib.auth.backends import ModelBackend
|
|||
|
||||
from django_rbac.utils import get_ou_model
|
||||
|
||||
from authentic2.backends import is_user_authenticable
|
||||
from authentic2.crypto import base64url_encode
|
||||
from authentic2 import app_settings
|
||||
|
||||
|
@ -238,9 +239,14 @@ class OIDCBackend(ModelBackend):
|
|||
if created:
|
||||
logger.info(u'auth_oidc: created user %s for sub %s and issuer %s',
|
||||
user, id_token.sub, id_token.iss)
|
||||
|
||||
if linked:
|
||||
logger.info(u'auth_oidc: linked user %s to sub %s and issuer %s',
|
||||
user, id_token.sub, id_token.iss)
|
||||
|
||||
if not is_user_authenticable(user):
|
||||
logger.info(u'auth_oidc: authentication refused by user filters')
|
||||
return None
|
||||
return user
|
||||
|
||||
def get_saml2_authn_context(self):
|
||||
|
|
|
@ -1,15 +1,25 @@
|
|||
import logging
|
||||
|
||||
from mellon.backends import SAMLBackend
|
||||
|
||||
from authentic2.middleware import StoreRequestMiddleware
|
||||
from authentic2.backends import is_user_authenticable
|
||||
|
||||
from . import app_settings
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SAMLBackend(SAMLBackend):
|
||||
def authenticate(self, saml_attributes, **credentials):
|
||||
if not app_settings.enable:
|
||||
return None
|
||||
return super(SAMLBackend, self).authenticate(saml_attributes, **credentials)
|
||||
user = super(SAMLBackend, self).authenticate(saml_attributes, **credentials)
|
||||
if not is_user_authenticable(user):
|
||||
logger.error(u'auth_saml: authentication refused by user filters')
|
||||
return None
|
||||
return user
|
||||
|
||||
def get_saml2_authn_context(self):
|
||||
# Pass AuthnContextClassRef from the previous IdP
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
from django.contrib.auth import authenticate
|
||||
from authentic2.backends import is_user_authenticable
|
||||
|
||||
|
||||
def test_user_filters(settings, db, simple_user, user_ou1, ou1):
|
||||
assert authenticate(username=simple_user.username, password=simple_user.username)
|
||||
assert is_user_authenticable(simple_user)
|
||||
assert is_user_authenticable(user_ou1)
|
||||
assert authenticate(username=user_ou1.username, password=user_ou1.username)
|
||||
settings.A2_USER_FILTER = {'ou__slug': 'ou1'}
|
||||
assert not authenticate(username=simple_user.username, password=simple_user.username)
|
||||
assert authenticate(username=user_ou1.username, password=user_ou1.username)
|
||||
assert not is_user_authenticable(simple_user)
|
||||
assert is_user_authenticable(user_ou1)
|
||||
settings.A2_USER_EXCLUDE = {'ou__slug': 'ou1'}
|
||||
assert not authenticate(username=simple_user.username, password=simple_user.username)
|
||||
assert not authenticate(username=user_ou1.username, password=user_ou1.username)
|
||||
assert not is_user_authenticable(simple_user)
|
||||
assert not is_user_authenticable(user_ou1)
|
||||
settings.A2_USER_FILTER = {}
|
||||
assert authenticate(username=simple_user.username, password=simple_user.username)
|
||||
assert not authenticate(username=user_ou1.username, password=user_ou1.username)
|
||||
assert is_user_authenticable(simple_user)
|
||||
assert not is_user_authenticable(user_ou1)
|
Loading…
Reference in New Issue