ldap: optionally collects messages from ppolicy

Enable PasswordPolicyControl[0] in authenticate() and log the
information it returns, on success or error. In the context of a
request, this information is also set as a message[1] to be displayed
to the user.

[0] https://github.com/python-ldap/python-ldap/blob/python-ldap-3.3.1/Lib/ldap/controls/ppolicy.py
[1] https://docs.djangoproject.com/en/3.1/ref/contrib/messages/

Fixes: #50959

License: MIT
This commit is contained in:
Loïc Dachary 2021-02-09 08:46:22 +01:00 committed by Benjamin Dauvergne
parent 8efb3ee192
commit 814e0192f3
2 changed files with 340 additions and 6 deletions

View File

@ -23,7 +23,9 @@ try:
from ldap.filter import filter_format
from ldap.dn import escape_dn_chars
from ldap.ldapobject import ReconnectLDAPObject as NativeLDAPObject
from ldap.controls import SimplePagedResultsControl
from ldap.controls import SimplePagedResultsControl, DecodeControlTuples
from ldap.controls import ppolicy
from pyasn1.codec.der import decoder
PYTHON_LDAP3 = [int(x) for x in ldap.__version__.split('.')] >= [3]
LDAPObject = NativeLDAPObject
except ImportError:
@ -34,6 +36,7 @@ import random
import base64
import os
import json
import time
# code originaly copied from by now merely inspired by
# http://www.amherst.k12.oh.us/django-ldap.html
@ -41,11 +44,13 @@ import json
from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.utils.encoding import force_bytes, force_text
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _, ngettext
from authentic2.a2_rbac.models import Role
@ -232,6 +237,36 @@ def map_text(d):
raise NotImplementedError
def password_policy_control_messages(ctrl):
messages = []
if ctrl.error:
error = ppolicy.PasswordPolicyError.namedValues[ctrl.error]
error2message = {
'passwordExpired': _('The password expired'),
'accountLocked': _('The account is locked.'),
'changeAfterReset': _('The password was reset and must be changed.'),
'passwordModNotAllowed': _('It is not possible to modify the password.'),
'mustSupplyOldPassword': _('The old password must be supplied.'),
'insufficientPasswordQuality': _('The password does not meet the quality requirements.'),
'passwordTooShort': _('The password is too short.'),
'passwordTooYoung': _('It is too soon to change the password.'),
'passwordInHistory': _('This password was recently used and cannot be used again.'),
}
messages.append(error2message.get(error, _('Unexpected error {error}').format(error=error)))
return messages
if ctrl.timeBeforeExpiration:
timeBeforeExpiration = time.asctime(time.localtime(time.time() + ctrl.timeBeforeExpiration))
messages.append(_('The password will expire at {timeBeforeExpiration}.').format(
timeBeforeExpiration=timeBeforeExpiration))
if ctrl.graceAuthNsRemaining:
messages.append(ngettext(
'This password expired: this is the last time it can be used.',
'This password expired and can only be used {graceAuthNsRemaining} times, including this one.',
ctrl.graceAuthNsRemaining).format(graceAuthNsRemaining=ctrl.graceAuthNsRemaining))
return messages
class LDAPUser(User):
SESSION_LDAP_DATA_KEY = 'ldap-data'
_changed = False
@ -358,7 +393,11 @@ class LDAPUser(User):
if not conn:
log.warning('ldap: set_password failed, could not get a connection')
return
self.ldap_backend.modify_password(conn, self.block, self.dn, _current_password, new_password)
try:
self.ldap_backend.modify_password(conn, self.block, self.dn, _current_password, new_password)
except ldap.STRONG_AUTH_REQUIRED:
log.warning('ldap: set_password failed, STRONG_AUTH_REQUIRED')
return
self._current_password = new_password
self.keep_password_in_session(new_password)
if self.block['keep_password']:
@ -507,6 +546,8 @@ class LDAPBackend(object):
'can_reset_password': False,
# mapping from LDAP attributes to User attributes
'user_attributes': [],
# https://www.python-ldap.org/en/python-ldap-3.3.0/reference/ldap.html#ldap-controls
'use_controls': True,
}
_REQUIRED = ('url', 'basedn')
_TO_ITERABLE = ('url', 'groupsu', 'groupstaff', 'groupactive')
@ -537,6 +578,17 @@ class LDAPBackend(object):
log.debug('got config %r', blocks)
return blocks
@staticmethod
def process_controls(request, authz_id, ctrls):
for c in ctrls:
if c.controlType == ppolicy.PasswordPolicyControl.controlType:
message = ' '.join(password_policy_control_messages(c))
if request is not None:
messages.add_message(request, messages.WARNING, message)
else:
message = str(vars(c))
log.info('ldap: bind error with authz_id "%s" -> "%s"', authz_id, message)
def authenticate(self, request=None, username=None, password=None, realm=None, ou=None):
if username is None or password is None:
return None
@ -566,11 +618,11 @@ class LDAPBackend(object):
log.error(
"account name authentication filter doesn't contain '%s'")
continue
user = self.authenticate_block(block, uid, password)
user = self.authenticate_block(request, block, uid, password)
if user is not None:
return user
def authenticate_block(self, block, username, password):
def authenticate_block(self, request, block, username, password):
for conn in self.get_connections(block):
ldap_uri = conn.get_option(ldap.OPT_URI)
authz_ids = []
@ -628,7 +680,12 @@ class LDAPBackend(object):
if failed:
continue
try:
conn.simple_bind_s(authz_id, password)
if block.get('use_controls'):
serverctrls = [ppolicy.PasswordPolicyControl()]
else:
serverctrls = []
results = conn.simple_bind_s(authz_id, password, serverctrls=serverctrls)
self.process_controls(request, authz_id, results[3])
user_login_success(authz_id)
if not block['connect_with_user_credentials']:
try:
@ -637,7 +694,9 @@ class LDAPBackend(object):
log.exception(u'rebind failure after login bind')
raise ldap.SERVER_DOWN
break
except ldap.INVALID_CREDENTIALS:
except ldap.INVALID_CREDENTIALS as e:
if block.get('use_controls') and len(e.args) > 0 and 'ctrls' in e.args[0]:
self.process_controls(request, authz_id, DecodeControlTuples(e.args[0]['ctrls']))
user_login_failure(authz_id)
pass
else:

View File

@ -20,6 +20,7 @@ import os
import pytest
import mock
import time
import ldap
from ldap.dn import escape_dn_chars
@ -77,6 +78,38 @@ def slapd():
with create_slapd() as s:
yield s
@pytest.fixture
def slapd_ppolicy():
with create_slapd() as slapd:
conn = slapd.get_connection_admin()
assert conn.protocol_version == ldap.VERSION3
conn.modify_s(
'cn=module{0},cn=config',
[
(ldap.MOD_ADD, 'olcModuleLoad', [
force_bytes('ppolicy')
])
])
with open('/etc/ldap/schema/ppolicy.ldif') as fd:
slapd.add_ldif(fd.read())
slapd.add_ldif('''
dn: olcOverlay={0}ppolicy,olcDatabase={2}mdb,cn=config
objectclass: olcOverlayConfig
objectclass: olcPPolicyConfig
olcoverlay: {0}ppolicy
olcppolicydefault: cn=default,ou=ppolicies,o=ôrga
olcppolicyforwardupdates: FALSE
olcppolicyhashcleartext: TRUE
olcppolicyuselockout: TRUE
''')
slapd.add_ldif('''
dn: ou=ppolicies,o=ôrga
objectclass: organizationalUnit
ou: ppolicies
''')
yield slapd
@pytest.fixture
def tls_slapd():
@ -891,6 +924,248 @@ def test_set_password(slapd, settings, db):
assert user.pk == user2.pk
def test_login_ppolicy_pwdMaxFailure(slapd_ppolicy, settings, db, app):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd_ppolicy.ldap_url],
'basedn': u'o=ôrga',
'use_tls': False,
}]
pwdMaxFailure = 2
slapd_ppolicy.add_ldif('''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: 0
pwdInHistory: 0
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: {pwdMaxFailure}
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: FALSE
pwdSafeModify: FALSE
'''.format(pwdMaxFailure=pwdMaxFailure))
for _ in range(pwdMaxFailure):
response = app.get('/login/')
response.form.set('username', USERNAME)
response.form.set('password', 'invalid')
response = response.form.submit(name='login-password-submit')
assert 'Incorrect Username or password' in str(response.pyquery('.errornotice'))
assert 'account is locked' not in str(response.pyquery('.messages'))
response = app.get('/login/')
response.form.set('username', USERNAME)
response.form.set('password', 'invalid')
response = response.form.submit(name='login-password-submit')
assert 'account is locked' in str(response.pyquery('.messages'))
def ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog):
pwdMaxFailure = 2
slapd_ppolicy.add_ldif('''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: 0
pwdInHistory: 0
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: {pwdMaxFailure}
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: FALSE
pwdSafeModify: FALSE
'''.format(pwdMaxFailure=pwdMaxFailure))
for _ in range(pwdMaxFailure):
assert authenticate(username=USERNAME, password='incorrect') is None
assert "failed to login" in caplog.text
def test_authenticate_ppolicy_pwdMaxFailure(slapd_ppolicy, settings, db, caplog):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd_ppolicy.ldap_url],
'basedn': u'o=ôrga',
'use_tls': False,
}]
ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog)
assert 'account is locked' not in caplog.text
assert authenticate(username=USERNAME, password='incorrect') is None
assert 'account is locked' in caplog.text
def test_do_not_use_controls(slapd_ppolicy, settings, db, caplog):
"""
Same as test_authenticate_ppolicy_pwdMaxFailure but with use_controls
deactivated and therefore not logging when an account is locked.
"""
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd_ppolicy.ldap_url],
'basedn': u'o=ôrga',
'use_tls': False,
'use_controls': False,
}]
ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog)
assert 'account is locked' not in caplog.text
assert authenticate(username=USERNAME, password='incorrect') is None
# this following line is the difference with test_authenticate_ppolicy_pwdMaxFailure
assert 'account is locked' not in caplog.text
def test_authenticate_ppolicy_pwdGraceAuthnLimit(slapd_ppolicy, settings, db, caplog):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd_ppolicy.ldap_url],
'basedn': u'o=ôrga',
'use_tls': False,
}]
pwdMaxAge = 1
pwdGraceAuthnLimit = 2
slapd_ppolicy.add_ldif('''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: {pwdMaxAge}
pwdInHistory: 1
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: {pwdGraceAuthnLimit}
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: TRUE
pwdSafeModify: FALSE
'''.format(pwdMaxAge=pwdMaxAge, pwdGraceAuthnLimit=pwdGraceAuthnLimit))
user = authenticate(username=USERNAME, password=UPASS)
assert user.check_password(UPASS)
password = u'ogutOmyetew4'
user.set_password(password)
time.sleep(pwdMaxAge * 3)
assert 'used 2 time' not in caplog.text
assert authenticate(username=USERNAME, password=password) is not None
assert 'used 2 times' in caplog.text
assert 'last time' not in caplog.text
assert authenticate(username=USERNAME, password=password) is not None
assert 'last time' in caplog.text
def test_authenticate_ppolicy_pwdExpireWarning(slapd_ppolicy, settings, db, caplog):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd_ppolicy.ldap_url],
'basedn': u'o=ôrga',
'use_tls': False,
}]
pwdMaxAge = 3600
slapd_ppolicy.add_ldif('''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: {pwdMaxAge}
pwdInHistory: 1
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: {pwdMaxAge}
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: TRUE
pwdSafeModify: FALSE
'''.format(pwdMaxAge=pwdMaxAge))
user = authenticate(username=USERNAME, password=UPASS)
assert user.check_password(UPASS)
password = u'ogutOmyetew4'
user.set_password(password)
time.sleep(2)
assert 'password will expire' not in caplog.text
assert authenticate(username=USERNAME, password=password) is not None
assert 'password will expire' in caplog.text
def test_authenticate_ppolicy_pwdAllowUserChange(slapd_ppolicy, settings, db, caplog):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd_ppolicy.ldap_url],
'basedn': u'o=ôrga',
'use_tls': False,
}]
slapd_ppolicy.add_ldif('''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: 0
pwdInHistory: 0
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: FALSE
pwdSafeModify: FALSE
''')
user = authenticate(username=USERNAME, password=UPASS)
assert user.set_password(u'ogutOmyetew4') is None
assert 'STRONG_AUTH_REQUIRED' in caplog.text
def test_ou_selector(slapd, settings, app, ou1):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd.ldap_url],