ldap: add method to get ppolicy operational attributes (#51239)

Fixes: #51239

License: MIT
This commit is contained in:
Loïc Dachary 2021-02-12 19:02:26 +01:00 committed by Benjamin Dauvergne
parent 99f55b78b4
commit 1f6378256e
2 changed files with 111 additions and 12 deletions

View File

@ -299,21 +299,25 @@ def map_text(d):
raise NotImplementedError
def password_policy_control_messages(ctrl):
def password_policy_control_messages(ctrl, attributes):
messages = []
if ctrl.error:
error = ppolicy.PasswordPolicyError.namedValues[ctrl.error]
error2message = {
'passwordExpired': _('The password expired'),
'accountLocked': _('The account is locked.'),
'passwordExpired': _('The password expired after {pwdmaxage}').format(**attributes),
'accountLocked': _(
'The account is locked since {pwdaccountlockedtime[0]} after {pwdmaxfailure} failures.'
).format(**attributes),
'changeAfterReset': _('The password was reset and must be changed.'),
'passwordModNotAllowed': _('It is not possible to modify the password.'),
'mustSupplyOldPassword': _('The old password must be supplied.'),
'insufficientPasswordQuality': _('The password does not meet the quality requirements.'),
'passwordTooShort': _('The password is too short.'),
'passwordTooYoung': _('It is too soon to change the password.'),
'passwordInHistory': _('This password was recently used and cannot be used again.'),
'passwordTooShort': _('The password is too short {pwdminlength}.').format(**attributes),
'passwordTooYoung': _('It is too soon to change the password {pwdminage}.').format(**attributes),
'passwordInHistory': _(
'This password is among the last {pwdhistory} password that were used and cannot be used again.'
).format(**attributes),
}
messages.append(error2message.get(error, _('Unexpected error {error}').format(error=error)))
return messages
@ -621,6 +625,7 @@ class LDAPBackend(object):
'user_attributes': [],
# https://www.python-ldap.org/en/python-ldap-3.3.0/reference/ldap.html#ldap-controls
'use_controls': True,
'ppolicy_dn': '',
}
_REQUIRED = ('url', 'basedn')
_TO_ITERABLE = ('url', 'groupsu', 'groupstaff', 'groupactive')
@ -659,11 +664,12 @@ class LDAPBackend(object):
log.debug('got config %r', blocks)
return blocks
@staticmethod
def process_controls(request, authz_id, ctrls):
@classmethod
def process_controls(cls, request, block, conn, authz_id, ctrls):
attributes = cls.get_ppolicy_attributes(block, conn, authz_id)
for c in ctrls:
if c.controlType == ppolicy.PasswordPolicyControl.controlType:
message = ' '.join(password_policy_control_messages(c))
message = ' '.join(password_policy_control_messages(c, attributes))
if request is not None:
messages.add_message(request, messages.WARNING, message)
if c.graceAuthNsRemaining or c.timeBeforeExpiration:
@ -791,7 +797,7 @@ class LDAPBackend(object):
else:
serverctrls = []
results = conn.simple_bind_s(authz_id, password, serverctrls=serverctrls)
self.process_controls(request, authz_id, results[3])
self.process_controls(request, block, conn, authz_id, results[3])
user_login_success(authz_id)
if not block['connect_with_user_credentials']:
try:
@ -803,7 +809,7 @@ class LDAPBackend(object):
except ldap.INVALID_CREDENTIALS as e:
if block.get('use_controls') and len(e.args) > 0 and 'ctrls' in e.args[0]:
self.process_controls(
request, authz_id, DecodeControlTuples(e.args[0]['ctrls'])
request, block, conn, authz_id, DecodeControlTuples(e.args[0]['ctrls'])
)
attributes = self.get_ldap_attributes(block, conn, authz_id)
user = self.lookup_existing_user(authz_id, block, attributes)
@ -1167,6 +1173,52 @@ class LDAPBackend(object):
attributes.add(at_mapping[key])
return list(set(attribute.lower() for attribute in attributes))
@classmethod
def get_ppolicy_attributes(cls, block, conn, dn):
def get_attributes(dn, attributes):
try:
results = conn.search_s(dn, ldap.SCOPE_BASE, u'(objectclass=*)', attributes)
except ldap.LDAPError as e:
log.error('unable to retrieve attributes of dn %r: %r', dn, e)
return {}
results = cls.normalize_ldap_results(results)
attributes_results.update(results[0][1])
return attributes_results
user_attributes = [
'pwdaccountlockedtime',
'pwdchangedtime',
'pwdfailuretime',
'pwdgraceusetime',
'pwdhistory',
'pwdreset',
]
ppolicy_attributes = [
'pwdminage',
'pwdmaxage',
'pwdinhistory',
'pwdcheckquality',
'pwdminlength',
'pwdexpirewarning',
'pwdgraceauthnlimit',
'pwdlockout',
'pwdlockoutduration',
'pwdmaxfailure',
'pwdmaxrecordedfailure',
'pwdfailurecountinterval',
'pwdmustchange',
'pwdallowuserchange',
'pwdsafemodify',
]
attributes_results = {k: [] for k in user_attributes + ppolicy_attributes}
attributes_results.update(get_attributes(dn, user_attributes))
ppolicy_dn = block.get('ppolicy_dn')
if ppolicy_dn:
attributes_results.update(**get_attributes(ppolicy_dn, ppolicy_attributes))
return attributes_results
@classmethod
def get_ldap_attributes(cls, block, conn, dn):
"""Retrieve some attributes from LDAP, add mandatory values then apply

View File

@ -1166,7 +1166,7 @@ def test_authenticate_ppolicy_pwdMaxFailure(slapd_ppolicy, settings, db, caplog)
ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog)
assert 'account is locked' not in caplog.text
assert authenticate(username=USERNAME, password='incorrect') is None
assert 'account is locked' in caplog.text
assert 'account is locked since 20' in caplog.text
def test_do_not_use_controls(slapd_ppolicy, settings, db, caplog):
@ -1190,6 +1190,53 @@ def test_do_not_use_controls(slapd_ppolicy, settings, db, caplog):
assert 'account is locked' not in caplog.text
def test_get_ppolicy_attributes(slapd_ppolicy, settings, db):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd_ppolicy.ldap_url],
'basedn': u'o=ôrga',
'ppolicy_dn': u'cn=default,ou=ppolicies,o=ôrga',
'use_tls': False,
}]
pwdMaxAge = 1
slapd_ppolicy.add_ldif('''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: {pwdMaxAge}
pwdInHistory: 1
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: TRUE
pwdSafeModify: FALSE
'''.format(pwdMaxAge=pwdMaxAge))
user = authenticate(username=USERNAME, password=UPASS)
assert user.check_password(UPASS)
password = u'ogutOmyetew4'
user.set_password(password)
time.sleep(pwdMaxAge * 3)
conn = ldap_backend.LDAPBackend.get_connection(settings.LDAP_AUTH_SETTINGS[0])
attributes = ldap_backend.LDAPBackend.get_ppolicy_attributes(settings.LDAP_AUTH_SETTINGS[0], conn, DN)
assert 'pwdchangedtime' in attributes
assert attributes['pwdmaxage'] == [str(pwdMaxAge)]
def test_authenticate_ppolicy_pwdGraceAuthnLimit(slapd_ppolicy, settings, db, caplog):
settings.LDAP_AUTH_SETTINGS = [
{