1754 lines
54 KiB
Python
1754 lines
54 KiB
Python
# -*- coding: utf-8 -*-
|
|
# authentic2 - versatile identity manager
|
|
# Copyright (C) 2010-2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
import urllib.parse
|
|
|
|
import ldap
|
|
import mock
|
|
import pytest
|
|
from django.contrib.auth import get_user_model
|
|
from django.core import mail, management
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.utils import timezone
|
|
from django.utils.encoding import force_bytes, force_text
|
|
from ldap.dn import escape_dn_chars
|
|
from ldaptools.slapd import Slapd, has_slapd
|
|
|
|
from authentic2 import crypto, models
|
|
from authentic2.a2_rbac.models import Role
|
|
from authentic2.a2_rbac.utils import get_default_ou
|
|
from authentic2.backends import ldap_backend
|
|
from authentic2.models import Service
|
|
from authentic2.utils import authenticate, switch_user
|
|
from django_rbac.utils import get_ou_model
|
|
|
|
from . import utils
|
|
|
|
# Concrete user model used by the assertions in this module.
User = get_user_model()

# Module-level mark: skip every test here when no slapd binary is available.
pytestmark = pytest.mark.skipif(not has_slapd(), reason='slapd is not installed')

# Identity of the main LDAP test entry.
USERNAME = 'etienne.michu'
UID = 'etienne.michu'
CN = 'Étienne Michu'
DN = 'cn=%s,o=ôrga' % (escape_dn_chars(CN),)
PASS = 'passé'
UPASS = 'passé'
EMAIL = 'etienne.michu@example.net'
CARLICENSE = '123445ABC'

# Attributes of the two organization entries (name, street, postal code, city).
EO_O, EO_STREET, EO_POSTALCODE, EO_CITY = "EO", "169 rue du Chateau", "75014", "PARIS"
EE_O, EE_STREET, EE_POSTALCODE, EE_CITY = "EE", "44 rue de l'Ouest", "75014", "PARIS"

# TLS key and certificate are looked up next to this module.
base_dir = os.path.dirname(__file__)
key_file, cert_file = (os.path.join(base_dir, name) for name in ('key.pem', 'cert.pem'))
|
|
|
|
|
|
@pytest.fixture
def slapd():
    """Yield the server built by create_slapd(), used as a context manager."""
    with create_slapd() as server:
        yield server
|
|
|
|
|
|
@pytest.fixture
def slapd_ppolicy():
    """Yield a create_slapd() server with the ppolicy module, schema and overlay added.

    The default policy entry referenced by the overlay
    (cn=default,ou=ppolicies,o=ôrga) is not created here; only its parent
    organizational unit is.
    """
    # LDIF lines must start at column 0, hence the unindented literals.
    overlay_ldif = '''
dn: olcOverlay={0}ppolicy,olcDatabase={2}mdb,cn=config
objectclass: olcOverlayConfig
objectclass: olcPPolicyConfig
olcoverlay: {0}ppolicy
olcppolicydefault: cn=default,ou=ppolicies,o=ôrga
olcppolicyforwardupdates: FALSE
olcppolicyhashcleartext: TRUE
olcppolicyuselockout: TRUE
'''
    policies_ou_ldif = '''
dn: ou=ppolicies,o=ôrga
objectclass: organizationalUnit
ou: ppolicies
'''
    with create_slapd() as slapd:
        admin_conn = slapd.get_connection_admin()
        assert admin_conn.protocol_version == ldap.VERSION3

        # Load the ppolicy module, then its schema from the system-wide file.
        load_ppolicy = (ldap.MOD_ADD, 'olcModuleLoad', [force_bytes('ppolicy')])
        admin_conn.modify_s('cn=module{0},cn=config', [load_ppolicy])
        with open('/etc/ldap/schema/ppolicy.ldif') as schema_file:
            schema_ldif = schema_file.read()
        slapd.add_ldif(schema_ldif)

        slapd.add_ldif(overlay_ldif)
        slapd.add_ldif(policies_ou_ldif)
        yield slapd
|
|
|
|
|
|
@pytest.fixture
def tls_slapd():
    """Yield a populated server started with the module's key/certificate on a free TCP port."""
    port = utils.find_free_tcp_port()
    url = 'ldap://localhost.entrouvert.org:%s' % port
    with Slapd(ldap_url=url, tls=(key_file, cert_file)) as server:
        yield create_slapd(server)
|
|
|
|
|
|
def create_slapd(slapd=None):
    """Populate a slapd server with the test directory and return it.

    :param slapd: an existing ``Slapd`` instance to populate; a new one is
        created when omitted.
    :returns: the populated ``Slapd`` instance.

    The directory contains, under ``o=ôrga``: the main user entry (``DN``),
    a ``groupOfNames`` entry listing it as member, two organization entries,
    five extra users ``mïchu0``..``mïchu4`` and a ``posixGroup`` listing
    ``UID`` as ``memberUid``.

    Values that have a module-level constant (CN, EMAIL, EO_O, EE_O, ...)
    are injected from it rather than repeated in the LDIF, so the directory
    cannot drift from what the assertions expect.
    """
    if slapd is None:
        slapd = Slapd()
    slapd.add_db('o=ôrga')

    # LDIF lines must start at column 0, hence the unindented literals.
    slapd.add_ldif(
        '''dn: o=ôrga
objectClass: organization
o: ôrga

dn: {dn}
objectClass: inetOrgPerson
userPassword: {password}
uid: {uid}
cn: {cn}
sn: Michu
gn: Étienne
l: Paris
mail: {email}
jpegPhoto:: ACOE
carLicense: {cl}
o: {eo_o}
o: {ee_o}
# memberOf is not defined on OpenLDAP so we use street for storing DN like
# memberOf values
strEET: cn=group2,o=ôrga

dn: cn=GRoup1,o=ôrga
objectClass: groupOfNames
cn: GrOuP1
member: {dn}

dn: o={eo_o},o=ôrga
objectClass: organization
o: {eo_o}
postalAddress: {eo_street}
postalCode: {eo_postalcode}
l: {eo_city}

dn: o={ee_o},o=ôrga
objectClass: organization
o: {ee_o}
postalAddress: {ee_street}
postalCode: {ee_postalcode}
l: {ee_city}

'''.format(
            dn=DN,
            uid=UID,
            cn=CN,
            email=EMAIL,
            password=PASS,
            cl=CARLICENSE,
            eo_o=EO_O,
            eo_street=EO_STREET,
            eo_postalcode=EO_POSTALCODE,
            eo_city=EO_CITY,
            ee_o=EE_O,
            ee_street=EE_STREET,
            ee_postalcode=EE_POSTALCODE,
            ee_city=EE_CITY,
        )
    )

    # Five additional users sharing the main user's name, mail and password.
    for i in range(5):
        slapd.add_ldif(
            '''dn: uid=mïchu{i},o=ôrga
objectClass: inetOrgPerson
userPassword: {password}
uid: mïchu{i}
cn: {cn}
sn: Michu
gn: Étienne
l: locality{i}
mail: {email}

'''.format(
                i=i, password=PASS, cn=CN, email=EMAIL
            )
        )

    # POSIX group referencing the main user by uid.
    group_ldif = '''dn: cn=group2,o=ôrga
gidNumber: 10
objectClass: posixGroup
memberUid: {uid}
'''.format(
        uid=UID
    )
    group_ldif += '\n\n'
    slapd.add_ldif(group_ldif)
    return slapd
|
|
|
|
|
|
def test_connection(slapd):
    """A simple bind with the main entry's DN and password must not raise."""
    slapd.get_connection().simple_bind_s(DN, PASS)
|
|
|
|
|
|
def test_simple(slapd, settings, client, db):
    """Log in through /login/ with LDAP credentials and check the resulting user."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'attributes': ['jpegPhoto'],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)
    assert response.status_code == 200
    assert force_bytes('Étienne Michu') in response.content

    # Exactly one account exists and it carries the LDAP identity.
    assert User.objects.count() == 1
    account = User.objects.get()
    assert account.username == '%s@ldap' % USERNAME
    assert account.first_name == 'Étienne'
    assert account.last_name == 'Michu'

    # Default flags and memberships.
    assert account.is_active is True
    assert account.is_superuser is False
    assert account.is_staff is False
    assert account.groups.count() == 0
    assert account.ou == get_default_ou()

    # The LDAP password is neither stored on the user nor kept in the session.
    assert not account.check_password(PASS)
    assert 'password' not in client.session['ldap-data']
|
|
|
|
|
|
def test_deactivate_orphaned_users(slapd, settings, client, db):
    """Check how many external ids point to inactive users after deactivate_orphaned_users()."""
    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
        }
    ]

    # create users as a side effect
    list(ldap_backend.LDAPBackend.get_users())
    # The block is read back only now: the 'realm' key used below is not part
    # of the dict assigned above.
    block = settings.LDAP_AUTH_SETTINGS[0]

    def inactive_external_ids():
        queryset = ldap_backend.UserExternalId.objects.filter(
            user__is_active=False, source=block['realm']
        )
        return queryset.count()

    assert inactive_external_ids() == 0

    # Remove the main entry from the directory, then run the cleanup.
    slapd.get_connection_admin().delete_s(DN)
    ldap_backend.LDAPBackend.deactivate_orphaned_users()
    assert inactive_external_ids() == 1

    # rename source realm
    renamed_block = {'url': [slapd.ldap_url], 'basedn': 'o=ôrga', 'use_tls': False, 'realm': 'test'}
    settings.LDAP_AUTH_SETTINGS = [renamed_block]

    ldap_backend.LDAPBackend.deactivate_orphaned_users()
    # Still counted against the realm of the first block.
    assert inactive_external_ids() == 6
|
|
|
|
|
|
@pytest.mark.django_db
def test_simple_with_binddn(slapd, settings, client):
    """Same checks as a plain login, with binddn/bindpw set to the main entry's credentials."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'binddn': force_text(DN),
        'bindpw': PASS,
        'basedn': 'o=ôrga',
        'use_tls': False,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)
    assert response.status_code == 200
    assert force_bytes('Étienne Michu') in response.content

    assert User.objects.count() == 1
    account = User.objects.get()
    assert account.username == '%s@ldap' % USERNAME
    assert account.first_name == 'Étienne'
    assert account.last_name == 'Michu'
    assert account.is_active is True
    assert account.is_superuser is False
    assert account.is_staff is False
    assert account.groups.count() == 0
    assert account.ou == get_default_ou()
    assert not account.check_password(PASS)
    assert 'password' not in client.session['ldap-data']
|
|
|
|
|
|
def test_double_login(slapd, simple_user, settings, app, db):
    """Log in on /admin/ as the local user, then as the LDAP user, with the same app."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'is_superuser': True,
        'is_staff': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    admin_path = '/admin/'
    utils.login(app, simple_user, path=admin_path)
    utils.login(app, UID, password=PASS, path=admin_path)
|
|
|
|
|
|
def test_login_failure(slapd, simple_user, settings, app, db):
    """A wrong password yields a 'user.login.failure' event for local and LDAP users alike."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'is_superuser': True,
        'is_staff': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    # create ldap user
    utils.login(app, UID, password=PASS, path='/admin/')
    utils.logout(app)
    ldap_user = ldap_backend.LDAPUser.objects.get(username='%s@ldap' % UID)

    failure_event = 'user.login.failure'

    utils.login(app, simple_user, password='wrong', fail=True)
    utils.assert_event(failure_event, user=simple_user, username=simple_user.username)

    utils.login(app, UID, password='wrong', fail=True)
    utils.assert_event(failure_event, user=ldap_user, username=UID)
|
|
|
|
|
|
def test_keep_password_in_session(slapd, settings, client, db):
    """With keep_password_in_session, the session holds the password, decryptable with SECRET_KEY."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'keep_password_in_session': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)
    assert response.status_code == 200
    assert force_bytes('Étienne Michu') in response.content

    assert User.objects.count() == 1
    account = User.objects.get()
    assert account.username == '%s@ldap' % USERNAME
    assert account.first_name == 'Étienne'
    assert account.last_name == 'Michu'
    assert account.ou == get_default_ou()
    assert not account.check_password(PASS)

    assert client.session['ldap-data']['password']

    # Passwords are stored per DN, keyed by the lower-cased DN.
    dn_key = DN.lower()
    stored_passwords = response.context['request'].user.ldap_data['password']
    assert dn_key in stored_passwords
    encrypted = force_bytes(response.context['request'].user.ldap_data['password'][dn_key])
    decrypted = crypto.aes_base64_decrypt(settings.SECRET_KEY, encrypted)
    assert decrypted == force_bytes(PASS)
|
|
|
|
|
|
def test_keep_password_true_or_false(slapd, settings, db):
    """The user's local password matches PASS only while 'keep_password' is True."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'keep_password': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    # First authentication, through the generic helper.
    authenticate(username=USERNAME, password=PASS)
    assert User.objects.count() == 1
    stored_user = User.objects.get()
    assert stored_user.check_password(PASS)

    # Flip the option in place and authenticate directly against the LDAP backend.
    settings.LDAP_AUTH_SETTINGS[0]['keep_password'] = False
    ldap_backend.LDAPBackend().authenticate(username=USERNAME, password=PASS)
    assert User.objects.count() == 1
    stored_user = User.objects.get()
    assert not stored_user.check_password(PASS)
|
|
|
|
|
|
@pytest.mark.django_db
def test_custom_ou(slapd, settings, client):
    """With 'ou_slug' set, the logged-in user belongs to the matching organizational unit."""
    target_ou = get_ou_model().objects.create(name='test', slug='test')
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'ou_slug': 'test',
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)
    assert response.status_code == 200
    assert force_bytes('Étienne Michu') in response.content

    assert User.objects.count() == 1
    account = User.objects.get()
    assert account.username == '%s@ldap' % USERNAME
    assert account.first_name == 'Étienne'
    assert account.last_name == 'Michu'
    assert account.ou == target_ou
    assert not account.check_password(PASS)
|
|
|
|
|
|
def test_wrong_ou(slapd, settings, client, db):
    """Logging in with an 'ou_slug' for which no OU was created raises ImproperlyConfigured."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'ou_slug': 'test',
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    with pytest.raises(ImproperlyConfigured):
        client.post('/login/', form_data, follow=True)
|
|
|
|
|
|
def test_dn_formatter():
    """Check escaping done by FilterFormatter and DnFormatter for str, list and tuple values."""
    from authentic2.ldap_utils import DnFormatter, FilterFormatter

    template = 'uid={uid}'

    filter_formatter = FilterFormatter()
    assert filter_formatter.format(template, uid='john doe') == 'uid=john doe'
    # The same value given as a string, a list or a tuple formats identically.
    for raw in ('(#$!"?éé', ['(#$!"?éé'], ('(#$!"?éé',)):
        assert filter_formatter.format(template, uid=raw) == 'uid=\\28#$!"?éé'

    dn_formatter = DnFormatter()
    for raw in ('john doé!#$"\'-_', ['john doé!#$"\'-_']):
        assert dn_formatter.format(template, uid=raw) == 'uid=john doé!#$\\"\'-_'
|
|
|
|
|
|
def test_group_mapping(slapd, settings, client, db):
    """With 'create_group', logging in creates Django group 'Group1' and adds the user to it."""
    from django.contrib.auth.models import Group

    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'create_group': True,
        'group_mapping': [
            ['cn=group1,o=ôrga', ['Group1']],
        ],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    group1 = Group.objects.filter(name='Group1')
    assert group1.count() == 0

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)

    assert Group.objects.filter(name='Group1').count() == 1
    logged_user = response.context['user']
    assert logged_user.username == '%s@ldap' % USERNAME
    assert logged_user.groups.count() == 1
|
|
|
|
|
|
def test_posix_group_mapping(slapd, settings, client, db):
    """Group mapping also works for a posixGroup matched through a memberUid 'group_filter'."""
    from django.contrib.auth.models import Group

    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'create_group': True,
        'group_mapping': [
            ['cn=group2,o=ôrga', ['Group2']],
        ],
        'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    assert Group.objects.filter(name='Group2').count() == 0

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)

    assert Group.objects.filter(name='Group2').count() == 1
    logged_user = response.context['user']
    assert logged_user.username == '%s@ldap' % USERNAME
    assert logged_user.groups.count() == 1
|
|
|
|
|
|
def test_group_to_role_mapping(slapd, settings, client, db):
    """The user gets Role1 and Role2 from group DNs written with a different case than the directory."""
    for role_name in ('Role1', 'Role2'):
        Role.objects.get_or_create(name=role_name)

    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        # memberOf is not defined on OpenLDAP so we use street for storing DN like
        # memberOf values
        'member_of_attribute': 'STReet',
        'group_to_role_mapping': [
            ['cn=GrouP1,o=ôrga', ['Role1']],
            ['cn=GrouP2,o=ôrga', ['Role2']],
        ],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)

    logged_user = response.context['user']
    assert logged_user.username == '%s@ldap' % USERNAME
    role_names = set(logged_user.roles.values_list('name', flat=True))
    assert role_names == {'Role1', 'Role2'}
|
|
|
|
|
|
def test_posix_group_to_role_mapping(slapd, settings, client, db):
    """A posixGroup matched through 'group_filter' gives the user exactly one role."""
    Role.objects.get_or_create(name='Role2')
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'group_to_role_mapping': [
            ['cn=group2,o=ôrga', ['Role2']],
        ],
        'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)

    logged_user = response.context['user']
    assert logged_user.username == '%s@ldap' % USERNAME
    assert logged_user.roles.count() == 1
|
|
|
|
|
|
def test_group_to_role_mapping_modify_disabled(slapd, settings, db, app, admin, client):
    """A role mapped from an LDAP group is shown as '(LDAP)' and cannot be edited in /manage/.

    Checks, in order: the role listing, the user's role pages (restricted to
    the user's OU and with ``search-ou=all``), and the role's own page.
    """
    role = Role.objects.create(name='Role3')
    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'group_to_role_mapping': [
                ['cn=group1,o=ôrga', ['Role3']],
            ],
        }
    ]
    response = client.post(
        '/login/', {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}, follow=True
    )
    user = response.context['user']
    assert user.roles.count() == 1

    utils.login(app, admin, '/manage/')

    # Role listing.
    response = app.get('/manage/roles/')
    q = response.pyquery.remove_namespaces()
    assert q('table tbody td.name').text() == 'Role3 (LDAP)'

    # User's roles, restricted to the user's own OU: membership checkbox is disabled.
    response = app.get('/manage/users/%s/roles/?search-ou=%s' % (user.pk, user.ou.pk))
    q = response.pyquery.remove_namespaces()
    assert q('table tbody td.name').text() == 'Role3 (LDAP)'
    assert q('table tbody td.member input').attr('disabled')

    # Same page across all OUs.
    response = app.get('/manage/users/%s/roles/?search-ou=all' % user.pk)
    q = response.pyquery.remove_namespaces()
    assert q('table tbody td.name').text() == 'Role3 (LDAP)'
    assert q('table tbody td.member input').attr('disabled')

    # Role page: no add form, inheritance editing disabled, no removal link.
    response = app.get('/manage/roles/%s/' % (role.pk))
    assert 'synchronised from LDAP' in response.text
    q = response.pyquery.remove_namespaces()
    assert not q('form.manager-m2m-add-form')
    assert q('div.role-inheritance .role-add.disabled')
    # Compound class selector: an <a> carrying both classes. The former
    # 'a.icon-remove-sign js-remove-object' was a descendant selector for a
    # <js-remove-object> element, so the assertion could never fail.
    assert not q('table tbody td a.icon-remove-sign.js-remove-object')
|
|
|
|
|
|
def test_group_su(slapd, settings, client, db):
    """Membership of a 'groupsu' group makes the user superuser, not staff, and creates no Group."""
    from django.contrib.auth.models import Group

    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'groupsu': ['cn=group1,o=ôrga'],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
    response = client.post('/login/', form_data, follow=True)

    assert Group.objects.count() == 0
    logged_user = response.context['user']
    assert logged_user.username == '%s@ldap' % USERNAME
    assert logged_user.is_superuser
    assert not logged_user.is_staff
|
|
|
|
|
|
def test_group_staff(slapd, settings, client, db):
    """Membership of a 'groupstaff' group makes the user staff, not superuser, and creates no Group."""
    from django.contrib.auth.models import Group

    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'groupstaff': ['cn=group1,o=ôrga'],
        }
    ]
    # USERNAME replaces a hard-coded copy of the same login, as in the sibling tests.
    response = client.post(
        '/login/', {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}, follow=True
    )
    assert Group.objects.count() == 0
    assert response.context['user'].username == '%s@ldap' % USERNAME
    assert response.context['user'].is_staff
    assert not response.context['user'].is_superuser
|
|
|
|
|
|
def test_get_users(slapd, settings, db, monkeypatch, caplog):
    """Count the save()/bulk_create() calls made by LDAPBackend.get_users() across several runs.

    Scenarios, in order: initial provisioning, a run with nothing changed,
    a run after deleting one user, a run after upper-casing the uid in the
    directory, and a run with a conflicting user whose username differs
    only by case.
    """
    # Import the submodule that actually defines QuerySet, which is patched below.
    import django.db.models.query
    from django.contrib.auth.models import Group

    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'create_group': True,
            'group_mapping': [
                ['cn=group2,o=ôrga', ['Group2']],
            ],
            'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
            'group_to_role_mapping': [
                ['cn=unknown,o=dn', ['Role2']],
            ],
        }
    ]
    save = mock.Mock(wraps=ldap_backend.LDAPUser.save)
    bulk_create = mock.Mock(wraps=django.db.models.query.QuerySet.bulk_create)

    # Plain functions are installed (not the mocks themselves) so that they
    # bind as methods and forward ``self`` to the wrapping mock.
    monkeypatch.setattr(ldap_backend.LDAPUser, 'save', lambda *args, **kwargs: save(*args, **kwargs))
    monkeypatch.setattr(
        django.db.models.query.QuerySet, 'bulk_create', lambda *args, **kwargs: bulk_create(*args, **kwargs)
    )

    assert Group.objects.count() == 0
    # Provision all users and their groups
    assert User.objects.count() == 0
    users = list(ldap_backend.LDAPBackend.get_users())
    assert len(users) == 6
    assert User.objects.count() == 6
    assert bulk_create.call_count == 1
    assert save.call_count == 18
    assert Group.objects.count() == 1
    assert Group.objects.get().user_set.count() == 1

    # Check that if nothing changed no save() is made
    save.reset_mock()
    bulk_create.reset_mock()
    with utils.check_log(caplog, 'ldap: unknown group "cn=unknown,o=dn" mapped to a role'):
        users = list(ldap_backend.LDAPBackend.get_users())
    assert save.call_count == 0
    assert bulk_create.call_count == 0

    # Check that if we delete 1 user, only this user is created
    save.reset_mock()
    bulk_create.reset_mock()
    User.objects.filter(username='etienne.michu@ldap').delete()
    assert User.objects.count() == 5
    users = list(ldap_backend.LDAPBackend.get_users())
    assert len(users) == 6
    assert User.objects.count() == 6
    assert save.call_count == 3
    assert bulk_create.call_count == 1

    # uppercase user uid in the directory and check that no new user is created
    conn = slapd.get_connection_admin()
    ldif = [(ldap.MOD_REPLACE, 'uid', force_bytes(UID.upper()))]
    conn.modify_s(DN, ldif)
    save.reset_mock()
    bulk_create.reset_mock()
    users = list(ldap_backend.LDAPBackend.get_users())
    assert len(users) == 6
    assert User.objects.count() == 6
    assert save.call_count == 0
    assert bulk_create.call_count == 0

    # create user with the same username, but case-different
    save.reset_mock()
    bulk_create.reset_mock()
    u = ldap_backend.LDAPUser.objects.create(username=UID.capitalize())
    ldap_backend.UserExternalId.objects.create(external_id=UID.capitalize(), source='ldap', user=u)
    # set user login time as if he logged in
    user = ldap_backend.LDAPUser.objects.get(username='%s@ldap' % UID)
    user.last_login = timezone.now()
    user.save()
    assert ldap_backend.LDAPUser.objects.count() == 7
    users = list(ldap_backend.LDAPBackend.get_users())
    assert len(users) == 6
    assert ldap_backend.LDAPUser.objects.filter(username='%s' % UID.capitalize()).count() == 0
|
|
|
|
|
|
def test_set_mandatory_roles(slapd, settings, db):
    """Roles named in 'set_mandatory_roles' that exist are given to provisioned users.

    Uses the module-level ``Role`` import; the former function-local
    re-import of the same name was redundant.
    """
    Role.objects.get_or_create(name='tech')
    Role.objects.get_or_create(name='admin')
    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'create_group': True,
            'group_mapping': [
                ['cn=group2,o=ôrga', ['Group2']],
            ],
            'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
            'set_mandatory_roles': ['tech', 'admin'],
        }
    ]

    # get_users() is a generator: consume it so the users are provisioned.
    list(ldap_backend.LDAPBackend.get_users())
    assert User.objects.first().roles.count() == 2
|
|
|
|
|
|
def test_nocreate_mandatory_roles(slapd, settings, db):
    """When the roles named in 'set_mandatory_roles' do not exist, the user ends up with no role."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'create_group': True,
        'group_mapping': [
            ['cn=group2,o=ôrga', ['Group2']],
        ],
        'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
        'set_mandatory_roles': ['tech', 'admin'],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    # Consume the generator so the users are provisioned.
    list(ldap_backend.LDAPBackend.get_users())
    first_user = User.objects.first()
    assert first_user.roles.count() == 0
|
|
|
|
|
|
def test_from_slug_set_mandatory_roles(slapd, settings, db):
    """Entries of 'set_mandatory_roles' may match a role by slug ('tech' for the role named 'Tech').

    Uses the module-level ``Role`` import; the former function-local
    re-import of the same name was redundant.
    """
    Role.objects.get_or_create(name='Tech', slug='tech')
    Role.objects.get_or_create(name='Admin', slug='admin')
    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'create_group': True,
            'group_mapping': [
                ['cn=group2,o=ôrga', ['Group2']],
            ],
            'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
            'set_mandatory_roles': ['tech', 'admin'],
        }
    ]

    # get_users() is a generator: consume it so the users are provisioned.
    list(ldap_backend.LDAPBackend.get_users())
    assert User.objects.first().roles.count() == 2
|
|
|
|
|
|
def test_multiple_slug_set_mandatory_roles(slapd, settings, db):
    """When two roles share the slug 'tech' (in different services), no role is assigned.

    Uses the module-level ``Role`` import; the former function-local
    re-import of the same name was redundant.
    """
    service1 = Service.objects.create(name='s1', slug='s1')
    service2 = Service.objects.create(name='s2', slug='s2')
    Role.objects.create(name='foo', slug='tech', service=service1)
    Role.objects.create(name='bar', slug='tech', service=service2)
    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'create_group': True,
            'group_mapping': [
                ['cn=group2,o=ôrga', ['Group2']],
            ],
            'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
            'set_mandatory_roles': ['tech'],
        }
    ]

    # get_users() is a generator: consume it so the users are provisioned.
    list(ldap_backend.LDAPBackend.get_users())
    assert User.objects.first().roles.count() == 0
|
|
|
|
|
|
def test_multiple_name_set_mandatory_roles(slapd, settings, db):
    """When two roles share the name 'tech' (in different OUs), no role is assigned.

    Uses the module-level ``Role`` import; the former function-local
    re-import of the same name was redundant.
    """
    OU = get_ou_model()
    ou1 = OU.objects.create(name='test1', slug='test1')
    ou2 = OU.objects.create(name='test2', slug='test2')
    Role.objects.create(name='tech', slug='foo', ou=ou1)
    Role.objects.create(name='tech', slug='bar', ou=ou2)
    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'create_group': True,
            'group_mapping': [
                ['cn=group2,o=ôrga', ['Group2']],
            ],
            'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
            'set_mandatory_roles': ['tech'],
        }
    ]

    # get_users() is a generator: consume it so the users are provisioned.
    list(ldap_backend.LDAPBackend.get_users())
    assert User.objects.first().roles.count() == 0
|
|
|
|
|
|
@pytest.fixture
def slapd_strict_acl(slapd):
    """Return the slapd fixture after replacing the olcAccess rule of the o=ôrga database."""
    # forbid modifications by user themselves
    external_conn = slapd.get_connection_external()
    matches = external_conn.search_s('cn=config', ldap.SCOPE_SUBTREE, 'olcSuffix=o=ôrga')
    database_dn = matches[0][0]

    # Identities under o=ôrga get "none"; everyone else gets "manage".
    acl_rule = force_bytes('{0}to * by dn.subtree="o=ôrga" none by * manage')
    external_conn.modify_s(database_dn, [(ldap.MOD_REPLACE, 'olcAccess', [acl_rule])])
    return slapd
|
|
|
|
|
|
def test_no_connect_with_user_credentials(slapd_strict_acl, db, settings, app):
    """Under the strict ACL, login only succeeds once 'connect_with_user_credentials' is False."""
    slapd = slapd_strict_acl
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'create_group': True,
        'group_mapping': [
            ['cn=group2,o=ôrga', ['Group2']],
        ],
        'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
        'set_mandatory_roles': ['tech', 'admin'],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]
    full_name = force_bytes('Étienne Michu')

    def submit_login_form():
        # Fill and submit the login form; redirects are left to the caller.
        page = app.get('/login/')
        page.form.set('username', USERNAME)
        page.form.set('password', PASS)
        return page.form.submit('login-password-submit')

    # First attempt: the form comes back without the user's name.
    response = submit_login_form()
    assert response.status_code == 200
    assert full_name not in response.body

    # Second attempt, after flipping the option in place.
    settings.LDAP_AUTH_SETTINGS[0]['connect_with_user_credentials'] = False
    response = submit_login_form().follow()
    assert full_name in response.body
|
|
|
|
|
|
def reset_password_ldap_user(settings, app):
    """Drive the whole password-reset flow for the LDAP user and return the new password.

    Not a test itself: it expects ``settings.LDAP_AUTH_SETTINGS`` to be
    already configured, and sets 'can_reset_password' on its first block
    partway through.
    """
    assert User.objects.count() == 0

    # first login
    page = app.get('/login/')
    page.form['username'] = USERNAME
    page.form['password'] = PASS
    page = page.form.submit('login-password-submit').follow()
    assert User.objects.count() == 1
    assert 'Étienne Michu' in str(page)
    account = User.objects.get()
    assert account.email == EMAIL

    # logout
    page = page.click('Logout').maybe_follow()

    # Ask for a reset mail.
    page = page.click('Reset it!')
    page.form['email'] = EMAIL
    assert len(mail.outbox) == 0
    page = page.form.submit().maybe_follow()
    assert len(mail.outbox) == 1
    reset_email_url = utils.get_link_from_mail(mail.outbox[0])

    # Before 'can_reset_password' is set, the link redirects to a page with the login form.
    page = app.get(reset_email_url, status=302)
    page = page.maybe_follow()
    assert 'login-password-submit' in page.text

    # With 'can_reset_password' set, the same link shows the new-password form.
    settings.LDAP_AUTH_SETTINGS[0]['can_reset_password'] = True
    page = app.get(reset_email_url, status=200)
    new_password = 'Aa1xxxxx'
    for field_name in ('new_password1', 'new_password2'):
        page.form[field_name] = new_password
    page = page.form.submit(status=302).maybe_follow()

    assert app.session['_auth_user_backend'] == 'authentic2.backends.ldap_backend.LDAPBackendPasswordLost'
    template_user = page.context['user']
    assert 'carlicense' in template_user.get_attributes(object(), {})

    # logout
    page = page.click('Logout').maybe_follow()
    return new_password
|
|
|
|
|
|
def test_reset_password_ldap_user(slapd, settings, app, db):
    """After the reset flow, only the new password binds and the user has no usable local password."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'binddn': force_text(slapd.root_bind_dn),
        'bindpw': force_text(slapd.root_bind_password),
        'basedn': 'o=ôrga',
        'use_tls': False,
        'attributes': ['uid', 'carLicense'],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    changed_password = reset_password_ldap_user(settings, app)

    # verify password has changed
    slapd.get_connection().bind_s(DN, changed_password)
    with pytest.raises(ldap.INVALID_CREDENTIALS):
        slapd.get_connection().bind_s(DN, PASS)

    account = User.objects.get()
    assert not account.has_usable_password()
|
|
|
|
|
|
def test_user_cannot_change_password(slapd, settings, app, db):
    """With 'user_can_change_password' False, the account page has no 'Password' and the change URL redirects."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'binddn': force_text(slapd.root_bind_dn),
        'bindpw': force_text(slapd.root_bind_password),
        'basedn': 'o=ôrga',
        'use_tls': False,
        'user_can_change_password': False,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]
    assert User.objects.count() == 0

    # first login
    page = app.get('/login/')
    page.form['username'] = USERNAME
    page.form['password'] = PASS
    page = page.form.submit('login-password-submit').follow()

    account_page = page.click('Your account')
    assert 'Password' not in account_page

    change_response = app.get('/accounts/password/change/')
    assert change_response['Location'].endswith('/accounts/')
|
|
|
|
|
|
def test_tls(db, tls_slapd, settings, client):
    """Login works only with TLS and a client certificate once the server demands client verification."""
    admin_conn = tls_slapd.get_connection_admin()
    tls_changes = [
        (ldap.MOD_ADD, 'olcTLSCACertificateFile', force_bytes(cert_file)),
        (ldap.MOD_ADD, 'olcTLSVerifyClient', b'demand'),
    ]
    admin_conn.modify_s('cn=config', tls_changes)

    full_name = force_bytes('Étienne Michu')
    username_field = force_bytes('name="username"')

    def login_with(**tls_options):
        # Install a single LDAP block with the given TLS options, then post the login form.
        block = {'url': [tls_slapd.ldap_url], 'basedn': 'o=ôrga'}
        block.update(tls_options)
        settings.LDAP_AUTH_SETTINGS = [block]
        form_data = {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}
        return client.post('/login/', form_data, follow=True)

    # without TLS it does not work
    response = login_with(use_tls=False)
    assert response.status_code == 200
    assert full_name not in response.content
    assert username_field in response.content

    # without TLS client authentication it does not work
    response = login_with(use_tls=True, cacertfile=cert_file)
    assert response.status_code == 200
    assert full_name not in response.content
    assert username_field in response.content

    # now it works !
    response = login_with(use_tls=True, cacertfile=cert_file, certfile=cert_file, keyfile=key_file)
    assert response.status_code == 200
    assert full_name in response.content
    assert username_field not in response.content
|
|
|
|
|
|
def test_user_attributes(slapd, settings, client, db):
    """Check that the ``user_attributes`` mapping copies LDAP ``l`` to ``locality``.

    Six accounts log in one after the other; after each login the matching
    ``<login>@ldap`` user must carry the expected ``locality`` value.
    """
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'user_attributes': [
            {
                'from_ldap': 'l',
                'to_user': 'locality',
            },
        ],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    # create a locality attribute
    models.Attribute.objects.create(
        label='locality',
        name='locality',
        kind='string',
        required=False,
        user_visible=True,
        user_editable=False,
        asked_on_registration=False,
        multiple=False,
    )

    # (login name, expected locality) pairs, in login order
    expectations = [(USERNAME, 'Paris')]
    expectations.extend(('mïchu%s' % index, 'locality%s' % index) for index in range(5))

    for login, expected_locality in expectations:
        client.post(
            '/login/',
            {
                'login-password-submit': '1',
                'username': login,
                'password': PASS,
            },
            follow=True,
        )
        account = User.objects.get(username='%s@ldap' % login)
        assert account.attributes.locality == expected_locality
        # drop the session before the next login
        client.session.flush()
|
|
|
|
|
|
def test_set_password(slapd, settings, db):
    """Check that ``set_password`` on an LDAP user changes the password used to authenticate."""
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    login = 'etienne.michu'
    old_password = 'passé'
    new_password = 'àbon'

    user = authenticate(username=login, password=old_password)
    assert user
    assert user.check_password(old_password)

    user.set_password(new_password)
    assert user.check_password(new_password)

    # authenticating with the new password must yield the same account
    same_user = authenticate(username=login, password=new_password)
    assert user.pk == same_user.pk
|
|
|
|
|
|
def test_login_ppolicy_pwdMaxFailure(slapd_ppolicy, settings, db, app):
    """Check the login page messages around the ``pwdMaxFailure`` threshold.

    The first ``pwdMaxFailure`` wrong-password submissions must show the
    generic error without a lock message; the following one must show
    'account is locked'.
    """
    ldap_block = {
        'url': [slapd_ppolicy.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'use_controls': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    max_failures = 2
    # LDIF body stays at column 0: leading whitespace is significant in LDIF.
    policy_ldif = '''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: 0
pwdInHistory: 0
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: {pwdMaxFailure}
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: FALSE
pwdSafeModify: FALSE
'''.format(
        pwdMaxFailure=max_failures
    )
    slapd_ppolicy.add_ldif(policy_ldif)

    def submit_wrong_password():
        # Load a fresh login form and submit it with an invalid password.
        page = app.get('/login/')
        page.form.set('username', USERNAME)
        page.form.set('password', 'invalid')
        return page.form.submit(name='login-password-submit')

    for _ in range(max_failures):
        response = submit_wrong_password()
        assert 'Incorrect Username or password' in str(response.pyquery('.errornotice'))
        assert 'account is locked' not in str(response.pyquery('.messages'))

    response = submit_wrong_password()
    assert 'account is locked' in str(response.pyquery('.messages'))
|
|
|
|
|
|
def ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog):
    """Install a lockout policy and fail authentication ``pwdMaxFailure`` times.

    Shared helper: it loads a default password policy with ``pwdMaxFailure``
    set to 2, then authenticates with a wrong password that many times,
    checking each attempt returns ``None`` and logs "failed to login".
    """
    max_failures = 2
    # LDIF body stays at column 0: leading whitespace is significant in LDIF.
    policy_ldif = '''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: 0
pwdInHistory: 0
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: {pwdMaxFailure}
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: FALSE
pwdSafeModify: FALSE
'''.format(
        pwdMaxFailure=max_failures
    )
    slapd_ppolicy.add_ldif(policy_ldif)

    attempts_left = max_failures
    while attempts_left > 0:
        outcome = authenticate(username=USERNAME, password='incorrect')
        assert outcome is None
        assert "failed to login" in caplog.text
        attempts_left -= 1
|
|
|
|
|
|
def test_authenticate_ppolicy_pwdMaxFailure(slapd_ppolicy, settings, db, caplog):
    """Check that the lock is logged on the attempt following ``pwdMaxFailure`` failures.

    Runs with ``use_controls`` enabled; the lock message must be absent from
    the log after the helper's failures and present after one more attempt.
    """
    ldap_block = {
        'url': [slapd_ppolicy.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'use_controls': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog)
    assert 'account is locked' not in caplog.text

    # one more failed attempt
    extra_attempt = authenticate(username=USERNAME, password='incorrect')
    assert extra_attempt is None
    assert 'account is locked since 20' in caplog.text
|
|
|
|
|
|
def test_do_not_use_controls(slapd_ppolicy, settings, db, caplog):
    """
    Counterpart of test_authenticate_ppolicy_pwdMaxFailure with use_controls
    switched off: no 'account is locked' message must ever be logged.
    """
    ldap_block = {
        'url': [slapd_ppolicy.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'use_controls': False,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog)
    assert 'account is locked' not in caplog.text

    extra_attempt = authenticate(username=USERNAME, password='incorrect')
    assert extra_attempt is None
    # this following line is the difference with test_authenticate_ppolicy_pwdMaxFailure
    assert 'account is locked' not in caplog.text
|
|
|
|
|
|
def test_get_ppolicy_attributes(slapd_ppolicy, settings, db):
    """Check what ``LDAPBackend.get_ppolicy_attributes`` returns for the test user.

    After a password change the result must contain ``pwdchangedtime`` and a
    ``pwdmaxage`` equal to the value configured in the policy entry.
    """
    ldap_block = {
        'url': [slapd_ppolicy.ldap_url],
        'basedn': 'o=ôrga',
        'ppolicy_dn': 'cn=default,ou=ppolicies,o=ôrga',
        'use_tls': False,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    max_age = 1
    # LDIF body stays at column 0: leading whitespace is significant in LDIF.
    policy_ldif = '''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: {pwdMaxAge}
pwdInHistory: 1
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: TRUE
pwdSafeModify: FALSE
'''.format(
        pwdMaxAge=max_age
    )
    slapd_ppolicy.add_ldif(policy_ldif)

    user = authenticate(username=USERNAME, password=UPASS)
    assert user.check_password(UPASS)
    new_password = 'ogutOmyetew4'
    user.set_password(new_password)

    # wait three times the configured pwdMaxAge
    time.sleep(max_age * 3)

    backend = ldap_backend.LDAPBackend
    active_block = settings.LDAP_AUTH_SETTINGS[0]
    conn = backend.get_connection(active_block)
    attributes = backend.get_ppolicy_attributes(active_block, conn, DN)
    assert 'pwdchangedtime' in attributes
    assert attributes['pwdmaxage'] == [str(max_age)]
|
|
|
|
|
|
def test_authenticate_ppolicy_pwdGraceAuthnLimit(slapd_ppolicy, settings, db, caplog):
    """Check the log messages emitted on successive logins after ``pwdMaxAge`` has elapsed.

    The policy sets ``pwdMaxAge`` to 1 and ``pwdGraceAuthnLimit`` to 2. After
    waiting, two successful authentications must log 'used 2 times' and then
    'last time' respectively.
    """
    ldap_block = {
        'url': [slapd_ppolicy.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'use_controls': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    max_age = 1
    grace_limit = 2
    # LDIF body stays at column 0: leading whitespace is significant in LDIF.
    policy_ldif = '''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: {pwdMaxAge}
pwdInHistory: 1
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: {pwdGraceAuthnLimit}
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: TRUE
pwdSafeModify: FALSE
'''.format(
        pwdMaxAge=max_age, pwdGraceAuthnLimit=grace_limit
    )
    slapd_ppolicy.add_ldif(policy_ldif)

    user = authenticate(username=USERNAME, password=UPASS)
    assert user.check_password(UPASS)
    new_password = 'ogutOmyetew4'
    user.set_password(new_password)

    # wait three times the configured pwdMaxAge
    time.sleep(max_age * 3)

    assert 'used 2 time' not in caplog.text
    first_login = authenticate(username=USERNAME, password=new_password)
    assert first_login is not None
    assert 'used 2 times' in caplog.text

    assert 'last time' not in caplog.text
    second_login = authenticate(username=USERNAME, password=new_password)
    assert second_login is not None
    assert 'last time' in caplog.text
|
|
|
|
|
|
def test_authenticate_ppolicy_pwdExpireWarning(slapd_ppolicy, settings, db, caplog):
    """Check that authenticating inside the expiry-warning window logs a warning.

    ``pwdExpireWarning`` is set to the same value as ``pwdMaxAge`` (3600), and
    a successful authentication after a password change must log
    'password will expire'.
    """
    ldap_block = {
        'url': [slapd_ppolicy.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'use_controls': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    max_age = 3600
    # LDIF body stays at column 0: leading whitespace is significant in LDIF.
    policy_ldif = '''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: {pwdMaxAge}
pwdInHistory: 1
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: {pwdMaxAge}
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: TRUE
pwdSafeModify: FALSE
'''.format(
        pwdMaxAge=max_age
    )
    slapd_ppolicy.add_ldif(policy_ldif)

    user = authenticate(username=USERNAME, password=UPASS)
    assert user.check_password(UPASS)
    new_password = 'ogutOmyetew4'
    user.set_password(new_password)

    time.sleep(2)

    assert 'password will expire' not in caplog.text
    logged_in = authenticate(username=USERNAME, password=new_password)
    assert logged_in is not None
    assert 'password will expire' in caplog.text
|
|
|
|
|
|
def test_login_ppolicy_pwdExpireWarning(slapd_ppolicy, settings, app, db, caplog):
    """Check that a login inside the expiry-warning window redirects to password change.

    ``pwdExpireWarning`` is set to the same value as ``pwdMaxAge`` (3600); the
    login form submission must answer with a ``Location`` containing
    '/password/change/'.
    """
    ldap_block = {
        'url': [slapd_ppolicy.ldap_url],
        'binddn': force_text(slapd_ppolicy.root_bind_dn),
        'bindpw': force_text(slapd_ppolicy.root_bind_password),
        'basedn': 'o=ôrga',
        'use_tls': False,
        'attributes': ['carLicense'],
        'use_controls': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    max_age = 3600
    # LDIF body stays at column 0: leading whitespace is significant in LDIF.
    policy_ldif = '''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
objectclass: pwdPolicyChecker
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: {pwdMaxAge}
pwdInHistory: 1
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: {pwdMaxAge}
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: TRUE
pwdSafeModify: FALSE
'''.format(
        pwdMaxAge=max_age
    )
    slapd_ppolicy.add_ldif(policy_ldif)

    # the helper returns the password to log in with afterwards
    new_password = reset_password_ldap_user(settings, app)

    time.sleep(2)

    login_page = app.get('/login/')
    login_form = login_page.form
    login_form['username'] = USERNAME
    login_form['password'] = new_password
    response = login_form.submit('login-password-submit')
    assert '/password/change/' in response['Location']
|
|
|
|
|
|
def test_authenticate_ppolicy_pwdAllowUserChange(slapd_ppolicy, settings, db, caplog):
    """Check ``set_password`` under a policy with ``pwdAllowUserChange: FALSE``.

    The call must return ``None`` and 'STRONG_AUTH_REQUIRED' must show up in
    the captured log.
    """
    ldap_block = {
        'url': [slapd_ppolicy.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'use_controls': True,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    # LDIF body stays at column 0: leading whitespace is significant in LDIF.
    policy_ldif = '''
dn: cn=default,ou=ppolicies,o=ôrga
cn: default
objectclass: top
objectclass: device
objectclass: pwdPolicy
pwdAttribute: userPassword
pwdMinAge: 0
pwdMaxAge: 0
pwdInHistory: 0
pwdCheckQuality: 0
pwdMinLength: 0
pwdExpireWarning: 0
pwdGraceAuthnLimit: 0
pwdLockout: TRUE
pwdLockoutDuration: 0
pwdMaxFailure: 0
pwdMaxRecordedFailure: 0
pwdFailureCountInterval: 0
pwdMustChange: FALSE
pwdAllowUserChange: FALSE
pwdSafeModify: FALSE
'''
    slapd_ppolicy.add_ldif(policy_ldif)

    user = authenticate(username=USERNAME, password=UPASS)
    change_result = user.set_password('ogutOmyetew4')
    assert change_result is None
    assert 'STRONG_AUTH_REQUIRED' in caplog.text
|
|
|
|
|
|
def test_ou_selector(slapd, settings, app, ou1):
    """Check the login form OU selector when the LDAP block sets ``ou_slug`` to ``ou1``.

    Selecting the default OU must show an error and open no session;
    selecting ``ou1`` must open one.
    """
    ldap_block = {
        'url': [slapd.ldap_url],
        'binddn': force_text(DN),
        'bindpw': PASS,
        'basedn': 'o=ôrga',
        'ou_slug': ou1.slug,
        'use_tls': False,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]
    settings.A2_LOGIN_FORM_OU_SELECTOR = True

    def submit_login(ou):
        # Fill a fresh login form, choosing the given OU, and submit it.
        page = app.get('/login/')
        page.form.set('username', USERNAME)
        page.form.set('password', PASS)
        page.form.set('ou', str(ou.pk))
        return page.form.submit(name='login-password-submit')

    # Check login to the wrong ou does not work
    response = submit_login(get_default_ou())
    assert response.pyquery('.errornotice')
    assert '_auth_user_id' not in app.session

    # Check login to the proper ou works
    response = submit_login(ou1).follow()
    assert '_auth_user_id' in app.session
|
|
|
|
|
|
def test_ou_selector_default_ou(slapd, settings, app, ou1):
    """Check the login form OU selector when the LDAP block sets no ``ou_slug``.

    Selecting ``ou1`` must show an error and open no session; selecting the
    default OU must open one.
    """
    ldap_block = {
        'url': [slapd.ldap_url],
        'binddn': force_text(DN),
        'bindpw': PASS,
        'basedn': 'o=ôrga',
        'use_tls': False,
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]
    settings.A2_LOGIN_FORM_OU_SELECTOR = True

    def submit_login(ou):
        # Fill a fresh login form, choosing the given OU, and submit it.
        page = app.get('/login/')
        page.form.set('username', USERNAME)
        page.form.set('password', PASS)
        page.form.set('ou', str(ou.pk))
        return page.form.submit(name='login-password-submit')

    # Check login to the wrong ou does not work
    response = submit_login(ou1)
    assert response.pyquery('.errornotice')
    assert '_auth_user_id' not in app.session

    # Check login to the proper ou works
    response = submit_login(get_default_ou()).follow()
    assert '_auth_user_id' in app.session
|
|
|
|
|
|
def test_sync_ldap_users(slapd, settings, app, db, capsys):
    """Check the ``sync-ldap-users`` management command.

    A first run at verbosity 2 must print 7 lines and leave 6 users whose
    names, ``locality`` attribute and external id match the directory; a
    second run must print a single line.
    """
    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'user_attributes': [
                {
                    'from_ldap': 'l',
                    'to_user': 'locality',
                },
            ],
        }
    ]

    # create a locality attribute
    models.Attribute.objects.create(
        label='locality',
        name='locality',
        kind='string',
        required=False,
        user_visible=True,
        user_editable=False,
        asked_on_registration=False,
        multiple=False,
    )

    assert User.objects.count() == 0
    capsys.readouterr()  # discard anything captured so far
    management.call_command('sync-ldap-users', verbosity=2)
    assert len(capsys.readouterr().out.splitlines()) == 7
    assert User.objects.count() == 6

    # Fetch the users once and assert per user, so that a failure reports the
    # offending value instead of a bare ``all(...) is False``.
    for user in User.objects.all():
        assert user.first_name == 'Étienne'
        assert user.attributes.first_name == 'Étienne'
        assert user.last_name == 'Michu'
        assert user.attributes.last_name == 'Michu'
        assert user.attributes.locality == 'Paris' or user.attributes.locality.startswith('locality')
        # the external id is the URL-quoted local part of the username
        expected_external_id = urllib.parse.quote(user.username.split('@')[0].encode('utf-8'))
        assert user.userexternalid_set.first().external_id == expected_external_id

    capsys.readouterr()
    management.call_command('sync-ldap-users', verbosity=2)
    assert len(capsys.readouterr().out.splitlines()) == 1
|
|
|
|
|
|
def test_alert_on_wrong_user_filter(slapd, settings, client, db, caplog):
    """Check that a ``user_filter`` without a ``%s`` placeholder is reported in the log.

    The login attempt runs inside ``utils.check_log``, which is given the
    expected log message; the HTTP response itself is not inspected.
    """
    settings.LDAP_AUTH_SETTINGS = [
        {
            'url': [slapd.ldap_url],
            'basedn': 'o=ôrga',
            'use_tls': False,
            'user_filter': '(&(objectClass=user)(sAMAccountName=*)',  # wrong
        }
    ]
    with utils.check_log(caplog, "account name authentication filter doesn't contain '%s'"):
        client.post(
            '/login/', {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}, follow=True
        )
|
|
|
|
|
|
def test_get_attributes(slapd, settings, db, rf):
    """Check ``get_attributes`` while the LDAP server is up, stopped, and restarted.

    The same attributes must be returned while slapd is stopped, although
    ``check_password`` fails during that time. After the server is restarted
    and ``sn`` is modified in the directory, the new value must be returned.
    """
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'attributes': ['uid', 'carLicense'],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    expected = {
        'dn': 'cn=étienne michu,o=\xf4rga',
        'givenname': ['Étienne'],
        'mail': ['etienne.michu@example.net'],
        'sn': ['Michu'],
        'uid': ['etienne.michu'],
        'carlicense': ['123445ABC'],
    }

    user = authenticate(username=USERNAME, password=UPASS)
    assert user
    assert user.get_attributes(object(), {}) == expected

    # simulate LDAP down
    slapd.stop()
    assert user.get_attributes(object(), {}) == expected
    assert not user.check_password(UPASS)

    # simulate LDAP come back up
    slapd.start()
    assert user.check_password(UPASS)

    # modify LDAP record and check attributes are updated
    admin_conn = slapd.get_connection_admin()
    changes = [(ldap.MOD_REPLACE, 'sn', [b'Micho'])]
    admin_conn.modify_s(DN, changes)
    assert user.get_attributes(object(), {}) == dict(expected, sn=['Micho'])
|
|
|
|
|
|
@pytest.mark.django_db
def test_get_extra_attributes(slapd, settings, client):
    """Check the ``extra_attributes`` setting with JSON serialization.

    After login, the ``orga`` attribute must decode to a list of two mappings
    matching the EO_* and EE_* module constants.
    """
    orga_lookup = {
        'loop_over_attribute': 'o',
        'filter': '(&(objectclass=organization)(o={item}))',
        'basedn': 'o=ôrga',
        'scope': 'sub',
        'mapping': {
            'id': 'o',
            'street': 'postalAddress',
            'city': 'l',
            'postal_code': 'postalCode',
        },
        'serialization': 'json',
    }
    ldap_block = {
        'url': [slapd.ldap_url],
        'basedn': 'o=ôrga',
        'use_tls': False,
        'groupstaff': ['cn=group1,o=ôrga'],
        'attributes': ['uid'],
        'extra_attributes': {'orga': orga_lookup},
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    credentials = {'login-password-submit': '1', 'username': 'etienne.michu', 'password': PASS}
    response = client.post('/login/', credentials, follow=True)
    user = response.context['user']
    fetched_attrs = user.get_attributes(object(), {})
    assert UID in fetched_attrs.get('uid')
    assert 'orga' in fetched_attrs

    # the value is a JSON document, decode it before inspecting
    orgas = json.loads(fetched_attrs.get('orga'))
    assert isinstance(orgas, list)
    assert len(orgas) == 2

    expected_eo = {'id': EO_O, 'street': EO_STREET, 'city': EO_CITY, 'postal_code': EO_POSTALCODE}
    expected_ee = {'id': EE_O, 'street': EE_STREET, 'city': EE_CITY, 'postal_code': EE_POSTALCODE}
    assert expected_eo in orgas
    assert expected_ee in orgas
|
|
|
|
|
|
def test_config_to_lowercase():
    """Check the lowercasing done by ``LDAPBackend.update_default``.

    A mixed-case configuration is passed through ``update_default`` and the
    keys supplied here are compared with the expected values. Per that
    expectation, attribute names and group DNs come back lowercased, while
    ``to_user``, mapped group names and mandatory values keep their case.
    """
    config = {
        'fname_field': 'givenName',
        'lname_field': 'surName',
        'email_field': 'EMAIL',
        'attributes': ['ZoB', 'CoiN'],
        'mandatory_attributes_values': {
            'XXX': ['A'],
        },
        'member_of_attribute': 'memberOf',
        'group_mapping': [
            ['CN=coin,OU=Groups,DC=coin,DC=Fr', ['Group 1']],
        ],
        'group_to_role_mapping': [
            ['CN=coin,OU=Groups,DC=coin,DC=Fr', ['Group 1']],
        ],
        'attribute_mappings': [
            ['XXX', 'YYY'],
        ],
        'external_id_tuples': [['A', 'B', 'C']],
        'user_attributes': [
            {
                'from_ldap': 'ABC',
                'to_user': 'Phone',
            }
        ],
    }

    full_config = dict(config, url='ldap://example.net', basedn='dc=coin,dc=fr')
    ldap_backend.LDAPBackend.update_default(full_config)

    # only keep keys we are interested in
    config_normalized = {key: value for key, value in full_config.items() if key in config}

    expected = {
        'fname_field': 'givenname',
        'lname_field': 'surname',
        'email_field': 'email',
        'attributes': ['zob', 'coin'],
        'mandatory_attributes_values': {'xxx': ['A']},
        'member_of_attribute': 'memberof',
        'group_mapping': [['cn=coin,ou=groups,dc=coin,dc=fr', ['Group 1']]],
        'group_to_role_mapping': [['cn=coin,ou=groups,dc=coin,dc=fr', ['Group 1']]],
        'attribute_mappings': [
            ['xxx', 'yyy'],
        ],
        'external_id_tuples': [
            ['a', 'b', 'c'],
        ],
        'user_attributes': [
            {
                'from_ldap': 'abc',
                'to_user': 'Phone',
            }
        ],
    }
    assert config_normalized == expected
|
|
|
|
|
|
def test_switch_user_ldap_user(slapd, settings, app, db):
    """Check that a switch-user URL works for a user synchronised from LDAP.

    The session must use the ``LDAPBackendPasswordLost`` backend and the user
    in the template context must expose the ``carlicense`` attribute.
    """
    ldap_block = {
        'url': [slapd.ldap_url],
        'binddn': force_text(slapd.root_bind_dn),
        'bindpw': force_text(slapd.root_bind_password),
        'basedn': 'o=ôrga',
        'use_tls': False,
        'attributes': ['carLicense'],
    }
    settings.LDAP_AUTH_SETTINGS = [ldap_block]

    # get all users
    management.call_command('sync-ldap-users', verbosity=2)

    ldap_user = User.objects.get(username=USERNAME + '@ldap')
    switch_url = switch_user.build_url(ldap_user)
    response = app.get(switch_url).follow()

    expected_backend = 'authentic2.backends.ldap_backend.LDAPBackendPasswordLost'
    assert app.session['_auth_user_backend'] == expected_backend

    template_user = response.context['user']
    fetched_attrs = template_user.get_attributes(object(), {})
    assert 'carlicense' in fetched_attrs
|
|
|
|
|
|
def test_build_external_id(slapd, settings, client, db):
|
|
backend = ldap_backend.LDAPBackend()
|
|
|
|
assert backend.build_external_id(['uid'], {'uid': 'john.doe'}) == 'john.doe'
|
|
assert backend.build_external_id(['uid'], {}) is None
|