add ldap connector (#66533)

This commit is contained in:
Benjamin Dauvergne 2022-08-01 19:15:30 +02:00
parent 959ce6c197
commit e905cdb516
21 changed files with 1249 additions and 12 deletions

View File

View File

@ -0,0 +1,33 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from OpenSSL import crypto
def validate_certificate(value):
try:
crypto.load_certificate(crypto.FILETYPE_PEM, value.open().read())
except Exception:
raise ValidationError(_('Invalid certificate.'))
def validate_private_key(value):
try:
crypto.load_privatekey(crypto.FILETYPE_PEM, value.open().read())
except Exception:
raise ValidationError(_('Invalid private key.'))

View File

@ -0,0 +1,97 @@
# Generated by Django 3.2.14 on 2022-08-02 14:37
from django.db import migrations, models
import passerelle.apps.ldap.forms
import passerelle.utils.models
class Migration(migrations.Migration):
initial = True
dependencies = [
('base', '0029_auto_20210202_1627'),
]
operations = [
migrations.CreateModel(
name='Resource',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('title', models.CharField(max_length=50, verbose_name='Title')),
('slug', models.SlugField(unique=True, verbose_name='Identifier')),
('description', models.TextField(verbose_name='Description')),
('ldap_url', passerelle.utils.models.LDAPURLField(max_length=512, verbose_name='Server URL')),
(
'ldap_bind_dn',
models.CharField(blank=True, max_length=256, null=True, verbose_name='Bind DN'),
),
(
'ldap_bind_password',
models.CharField(blank=True, max_length=128, null=True, verbose_name='Bind password'),
),
(
'ldap_tls_cert',
models.FileField(
verbose_name='TLS client certificate',
null=True,
blank=True,
upload_to=passerelle.utils.models.resource_file_upload_to,
validators=[passerelle.apps.ldap.forms.validate_certificate],
),
),
(
'ldap_tls_key',
models.FileField(
blank=True,
null=True,
upload_to=passerelle.utils.models.resource_file_upload_to,
validators=[passerelle.apps.ldap.forms.validate_private_key],
verbose_name='TLS client key',
),
),
(
'ldap_tls_cacert',
models.FileField(
blank=True,
null=True,
upload_to=passerelle.utils.models.resource_file_upload_to,
validators=[passerelle.apps.ldap.forms.validate_certificate],
verbose_name='TLS trusted certificate',
),
),
(
'users',
models.ManyToManyField(
blank=True,
related_name='_ldap_resource_users_+',
related_query_name='+',
to='base.ApiUser',
),
),
(
'ldap_tls_check_hostname',
models.BooleanField(
verbose_name='TLS check hostname',
default=True,
blank=True,
),
),
(
'ldap_tls_check_cert',
models.BooleanField(
verbose_name='TLS check certificate',
default=True,
blank=True,
),
),
],
options={
'verbose_name': 'LDAP',
},
),
]

View File

@ -0,0 +1,378 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import contextlib
import ldap
import ldap.filter
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
from OpenSSL import crypto
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.models import LDAPURLField, resource_file_upload_to
from passerelle.utils.templates import render_to_string
from . import forms
SEARCH_OP_SUBSTRING = 'substring'
SEARCH_OP_PREFIX = 'prefix'
SEARCH_OP_APPROX = 'approx'
class Resource(BaseResource):
ldap_url = LDAPURLField(verbose_name=_('Server URL'), max_length=512)
ldap_bind_dn = models.CharField(verbose_name=_('Bind DN'), max_length=256, null=True, blank=True)
ldap_bind_password = models.CharField(
verbose_name=_('Bind password'), max_length=128, null=True, blank=True
)
ldap_tls_cert = models.FileField(
verbose_name=_('TLS client certificate'),
upload_to=resource_file_upload_to,
null=True,
blank=True,
validators=[forms.validate_certificate],
)
ldap_tls_key = models.FileField(
verbose_name=_('TLS client key'),
upload_to=resource_file_upload_to,
null=True,
blank=True,
validators=[forms.validate_private_key],
)
ldap_tls_cacert = models.FileField(
verbose_name=_('TLS trusted certificate'),
upload_to=resource_file_upload_to,
null=True,
blank=True,
validators=[forms.validate_certificate],
)
ldap_tls_check_hostname = models.BooleanField(
verbose_name=_('TLS check hostname'),
default=True,
blank=True,
)
ldap_tls_check_cert = models.BooleanField(
verbose_name=_('TLS check certificate'),
default=True,
blank=True,
)
category = _('Misc')
class Meta:
verbose_name = _('LDAP')
def tls_cert(self, value):
if not value.name:
return None
with value as fd:
content = fd.read()
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, content)
name = ','.join(
'%s=%s' % (a.decode(), b.decode()) for a, b in cert.get_subject().get_components()
)
except Exception:
name = ('%s bytes') % len(content)
return format_html(
'<a href="data:application/octet-string;base64,{}" target="_blank" download="tls.crt">{}<a/>',
base64.b64encode(content).decode(),
name,
)
def clean(self):
if bool(self.ldap_bind_dn) != bool(self.ldap_bind_password):
raise ValidationError('Bind DN and password must be set together.')
if bool(self.ldap_tls_cert.name) != bool(self.ldap_tls_key.name):
raise ValidationError('Client certificate and key must be set together.')
def get_description_fields(self):
fields = super().get_description_fields()
fields = [
(field, self.tls_cert(value) if field.name == 'ldap_tls_cert' else value)
for field, value in fields
]
return fields
def check_status(self):
with self.get_connection() as conn:
conn.whoami_s()
@contextlib.contextmanager
def get_connection(self):
conn = ldap.initialize(self.ldap_url)
conn.set_option(ldap.OPT_TIMEOUT, 5)
conn.set_option(ldap.OPT_NETWORK_TIMEOUT, 5)
if self.ldap_tls_check_hostname:
conn.set_option(ldap.OPT_X_TLS_REQUIRE_SAN, ldap.OPT_X_TLS_DEMAND)
else:
conn.set_option(ldap.OPT_X_TLS_REQUIRE_SAN, ldap.OPT_X_TLS_NEVER)
if self.ldap_tls_check_cert:
conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
else:
conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
if self.ldap_tls_cert.name and self.ldap_tls_key.name:
conn.set_option(ldap.OPT_X_TLS_CERTFILE, self.ldap_tls_cert.path)
conn.set_option(ldap.OPT_X_TLS_KEYFILE, self.ldap_tls_key.path)
if self.ldap_tls_cacert.name:
conn.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ldap_tls_cacert.path)
conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
if self.ldap_bind_dn:
conn.simple_bind_s(self.ldap_bind_dn, self.ldap_bind_password or '')
else:
conn.simple_bind_s()
yield conn
conn.unbind()
def ldap_search(self, base_dn, scope, ldap_filter, ldap_attributes, sizelimit=-1, timeout=5):
with self.get_connection() as conn:
message_id = conn.search_ext(
base_dn, scope, ldap_filter, ldap_attributes, timeout=timeout, sizelimit=sizelimit
)
while True:
try:
dummy, entries = conn.result(message_id, all=0)
except ldap.SIZELIMIT_EXCEEDED:
break
if not entries:
break
for dn, attributes in entries:
if dn:
decoded_attributes = cidict()
# decode values to unicode, if possible, and keep only the first value
for k, values in attributes.items():
decoded_values = []
for value in values:
try:
decoded_values.append(value.decode())
except UnicodeDecodeError:
pass
if decoded_values:
if len(decoded_values) == 1:
decoded_attributes[k] = decoded_values[0]
else:
decoded_attributes[k] = decoded_values
yield dn, decoded_attributes
def search(
self,
ldap_base_dn,
scope,
ldap_filter,
ldap_attributes,
sizelimit,
id_attribute,
search_attribute,
text_template,
):
ldap_attributes = tuple(sorted(ldap_attributes))
cache_fingerprint = str(
[
ldap_base_dn,
scope,
ldap_filter,
ldap_attributes,
sizelimit,
id_attribute,
search_attribute,
text_template,
]
)
cache_key = f'ldap-{self.id}-{hash(cache_fingerprint)}'
cache_value = cache.get(cache_key)
if cache_value and cache_value[0] == cache_fingerprint:
return {'data': cache_value[1]}
try:
entries = list(
self.ldap_search(ldap_base_dn, scope, ldap_filter, ldap_attributes, sizelimit=sizelimit)
)
except ldap.LDAPError as e:
# add a disabled entry to show something on search errors, with display_disabled_items on w.c.s.
return {
'err': 1,
'data': [
{
'id': '',
'text': _('Directory server is unavailable'),
'disabled': True,
}
],
'err_class': 'directory-server-unavailable',
'err_desc': str(e),
}
data = []
for dn, attributes in entries:
entry_id = attributes.get(id_attribute)
if not entry_id:
continue
if text_template:
entry_text = render_to_string(text_template, attributes)
else:
entry_text = attributes.get(search_attribute)
data.append(
{
'id': entry_id,
'text': entry_text,
'dn': dn,
'attributes': attributes,
}
)
data.sort(key=lambda x: (x['text'], x['id']))
cache.set(cache_key, (cache_fingerprint, data))
return {'data': data}
@endpoint(
description=_('Search'),
name='search',
perm='can_access',
parameters={
'ldap_base_dn': {
'description': _('Base DN for the LDAP search'),
'example_value': 'dc=company,dc=com',
},
'search_attribute': {
'description': _('Attribute to search for the substring search'),
'example_value': 'cn',
},
'id_attribute': {
'description': _('Attribute used as a unique identifier'),
'example_value': 'uid',
},
'text_template': {
'description': _(
'Optional template string based on LDAP attributes '
'to create a text value, if none given the search_attribute is used'
),
'example_value': '{{ givenName }} {{ surname }}',
},
'ldap_attributes': {
'description': _('Space separated list of LDAP attributes to retrieve'),
'example_value': 'l sn givenName locality',
},
'id': {
'description': _('Identifier for exact retrieval, using the id_attribute'),
'example_value': 'johndoe',
},
'q': {
'description': _('Substring to search in the search_attribute'),
'example_value': 'John Doe',
},
'sizelimit': {
'description': _('Maximum number of entries to retrieve, between 1 and 200, default is 30.')
},
'scope': {
'description': _('Scope of the LDAP search, subtree or onelevel, default is subtree.'),
},
'filter': {
'description': _('Extra LDAP filter.'),
'example_value': 'objectClass=*',
},
'search_op': {
'description': _(
'Search operator, can be "substring" (the default value), "prefix" or "approx"'
),
'example_value': SEARCH_OP_SUBSTRING,
},
},
)
def search_endpoint(
self,
request,
ldap_base_dn,
search_attribute,
id_attribute,
text_template=None,
ldap_attributes=None,
id=None,
q=None,
sizelimit=None,
scope=None,
filter=None,
search_op=SEARCH_OP_SUBSTRING,
):
search_attribute = search_attribute.lower()
id_attribute = id_attribute.lower()
if not search_attribute.isascii():
raise APIError('search_attribute contains non ASCII characters')
if not id_attribute.isascii():
raise APIError('id_attribute contains non ASCII characters')
ldap_attributes = set(ldap_attributes.split()) if ldap_attributes else set()
ldap_attributes.update([search_attribute, id_attribute])
if not all(attribute.isascii() for attribute in ldap_attributes):
raise APIError('ldap_attributes contains non ASCII characters')
try:
sizelimit = int(sizelimit)
except (ValueError, TypeError):
pass
sizelimit = max(1, min(sizelimit or 30, 200))
if not q and not id:
raise APIError('q or id are mandatory parameters', http_status=400)
if id:
ldap_filter = '(%s=%s)' % (id_attribute, ldap.filter.escape_filter_chars(id))
elif q:
if search_op == SEARCH_OP_SUBSTRING:
ldap_filter = '(%s=*%s*)' % (search_attribute, ldap.filter.escape_filter_chars(q))
elif search_op == SEARCH_OP_PREFIX:
ldap_filter = '(%s=%s*)' % (search_attribute, ldap.filter.escape_filter_chars(q))
elif search_op == SEARCH_OP_APPROX:
ldap_filter = '(%s~=%s)' % (search_attribute, ldap.filter.escape_filter_chars(q))
else:
raise APIError('unknown search_op %r' % search_op)
if filter:
if not filter.startswith('('):
filter = '(%s)' % filter
ldap_filter = '(&%s%s)' % (ldap_filter, filter)
scopes = {
'subtree': ldap.SCOPE_SUBTREE,
'onelevel': ldap.SCOPE_ONELEVEL,
}
scope = scopes.get(scope, ldap.SCOPE_SUBTREE)
return self.search(
ldap_base_dn=ldap_base_dn,
scope=scope,
ldap_filter=ldap_filter,
ldap_attributes=ldap_attributes,
sizelimit=sizelimit,
id_attribute=id_attribute,
search_attribute=search_attribute,
text_template=text_template,
)
# use a case-insensitive dictionnary to handle map of attribute to values.
class cidict(dict):
'''Case insensitive dictionnary'''
def __setitem__(self, key, value):
super().__setitem__(key.lower(), value)
def __getitem__(self, key):
return super().__getitem__(key.lower())
def __contains__(self, key):
return super().__contains__(key.lower())
def get(self, key, default=None, /):
return super().get(key.lower(), default)

View File

@ -4,6 +4,7 @@ import django.db.models.deletion
from django.db import migrations, models
import passerelle.apps.sector.models
import passerelle.utils.models
class Migration(migrations.Migration):
@ -43,7 +44,7 @@ class Migration(migrations.Migration):
'csv_file',
models.FileField(
help_text='CSV file',
upload_to=passerelle.apps.sector.models.upload_to,
upload_to=passerelle.utils.models.resource_file_upload_to,
verbose_name='Sectorization file',
),
),

View File

@ -32,6 +32,7 @@ from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.models import resource_file_upload_to
PARITY_ALL = 0
PARITY_ODD = 1
@ -56,14 +57,10 @@ CSV_TITLES = [
MAX_HOUSENUMBER = 999_999
def upload_to(instance, filename):
return '%s/%s/%s' % (instance.get_connector_slug(), instance.slug, filename)
class SectorResource(BaseResource):
csv_file = models.FileField(
_('Sectorization file'),
upload_to=upload_to,
upload_to=resource_file_upload_to,
help_text=_('CSV file'),
)
titles_in_first_line = models.BooleanField(

View File

@ -4,6 +4,7 @@ from __future__ import unicode_literals
from django.db import migrations, models
import passerelle.contrib.nancypoll.models
import passerelle.utils.models
class Migration(migrations.Migration):
@ -17,7 +18,7 @@ class Migration(migrations.Migration):
model_name='nancypoll',
name='csv_file',
field=models.FileField(
upload_to=passerelle.contrib.nancypoll.models.upload_to, verbose_name='CSV File'
upload_to=passerelle.utils.models.resource_file_upload_to, verbose_name='CSV File'
),
),
]

View File

@ -7,6 +7,7 @@ from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.models import resource_file_upload_to
COLUMN_NAMES = (
'street_start_number, street_end_number,,,street_side,,,,code,id,text,address,,,street_name,,canton,,,'
@ -17,12 +18,8 @@ def to_unicode(value):
return force_str(value, 'utf-8')
def upload_to(instance, filename):
return '%s/%s/%s' % (instance.get_connector_slug(), instance.slug, filename)
class NancyPoll(BaseResource):
csv_file = models.FileField(_('CSV File'), upload_to=upload_to)
csv_file = models.FileField(_('CSV File'), upload_to=resource_file_upload_to)
category = _('Data Sources')
class Meta:

View File

@ -150,6 +150,7 @@ INSTALLED_APPS = (
'passerelle.apps.gesbac',
'passerelle.apps.holidays',
'passerelle.apps.jsondatastore',
'passerelle.apps.ldap',
'passerelle.apps.maelis',
'passerelle.apps.mdel',
'passerelle.apps.mdel_ddpacs',

22
passerelle/utils/forms.py Normal file
View File

@ -0,0 +1,22 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from django.core import validators
class LDAPURLField(forms.URLField):
default_validators = [validators.URLValidator(schemes=['ldap', 'ldaps'])]

View File

@ -0,0 +1,37 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.core import validators
from django.db import models
from .forms import LDAPURLField as LDAPURLFormField
def resource_file_upload_to(instance, filename):
return '%s/%s/%s' % (instance.get_connector_slug(), instance.slug, filename)
class LDAPURLField(models.URLField):
default_validators = [validators.URLValidator(schemes=['ldap', 'ldaps'])]
def formfield(self, **kwargs):
return super().formfield(
**{
'form_class': LDAPURLFormField,
**kwargs,
}
)

View File

@ -165,6 +165,8 @@ setup(
'pytz',
'vobject',
'Levenshtein',
'python-ldap',
'pyOpenSSL',
],
cmdclass={
'build': build,

0
tests/ldap/__init__.py Normal file
View File

19
tests/ldap/cert.pem Normal file
View File

@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDBjCCAe6gAwIBAgIUTKopT76CFlsVcI7FAilaYLILz0owDQYJKoZIhvcNAQEL
BQAwIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVudHJvdXZlcnQub3JnMB4XDTE4MTIw
NTE2NTkyNFoXDTI4MTIwMjE2NTkyNFowIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVu
dHJvdXZlcnQub3JnMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4xsL
n25yittbjk5bcKvY2I8zPivL6YWn2MJimaQQSNzCw/8POmVPLmMIb3lcZjydFRad
+RTxZfnuvCCJrnGrG7hOsJNenTLLU0ugN/yQ1869cM07a9tjSzL7NCz9H1NIK1+Q
cBsTExc77dOWpwWI9TjqYYRL+zex3ml8cdqcQ7BQUQxAvA4UU63DM2G+5O3dE7l8
uvyBUU3kW/shHyhfweWNXO8IXXIjvDfPYkOsjc6en2kFMr+sENSUKgfDKjz/Uzqy
S7LBb4tkJALZM8QP56VeQAG1JZF2J2/y1RqBfIGRIEkYoaHcj6UATZa1xcZjMubL
z3otRNYcRXKJMYWGbQIDAQABozIwMDAJBgNVHRMEAjAAMCMGA1UdEQQcMBqCGGxv
Y2FsaG9zdC5lbnRyb3V2ZXJ0Lm9yZzANBgkqhkiG9w0BAQsFAAOCAQEAFVPavBah
mIjgnTjq6ZbFxXTNJW0TrqN8olbKJ6SfwWVk0I8px7POekFaXd+egsFJlWYyH9q4
HkKotddRYYrWoXcPiodNfUa+bRnh2WYl2rEGMW5dbBf/MYCDts68c3SoA7JIYJ8w
0QZGAkijKNtVML0/FrLuJWbfFBAWH8JB46BcAg/8flbMHAULzV3F1g/v0A3FG3Y/
9fVr+lN5qs+NB9NXIMdf5wXrmJQYRjotyOjUO6yTFqDFvqE7DEpKQD5hnvqJoXCz
zYQS1DjH1qSRc5vC8I7YlJowCfnI9MsEICSrsk75DhT091aJC2XX93o4zhfNxmO5
Kj28hP87GHgNIg==
-----END CERTIFICATE-----

115
tests/ldap/conftest.py Normal file
View File

@ -0,0 +1,115 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import os.path
import pathlib
import socket
import pytest
from ldaptools.slapd import Slapd, has_slapd
pytestmark = pytest.mark.skipif(not has_slapd(), reason='slapd is not installed')
base_dir = os.path.dirname(__file__)
cert_file = os.path.join(base_dir, 'cert.pem')
key_file = os.path.join(base_dir, 'key.pem')
@pytest.fixture
def cert():
return pathlib.Path(cert_file)
@pytest.fixture
def key():
return pathlib.Path(key_file)
@pytest.fixture
def cert_content(cert):
with cert.open(mode='rb') as fd:
return fd.read()
@pytest.fixture
def key_content(key):
with key.open(mode='rb') as fd:
return fd.read()
def find_free_tcp_port():
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]
@pytest.fixture
def ldap_params():
return {
'ldap_url': 'ldap://localhost.entrouvert.org:%s' % find_free_tcp_port(),
}
@pytest.fixture
def ldap_object(ldap_params):
with Slapd(**ldap_params) as slapd:
yield slapd
@pytest.fixture
def ldap_configure():
pass
@pytest.fixture
def ldap_server(ldap_object, ldap_configure):
return ldap_object
@pytest.fixture
def resource_class(db):
from passerelle.apps.ldap.models import Resource
return Resource
@pytest.fixture
def resource_params(ldap_params):
return {
'title': 'resource',
'slug': 'resource',
'description': 'resource',
'ldap_url': ldap_params['ldap_url'],
}
@pytest.fixture
def resource_access_rights(resource_object):
from tests.utils import setup_access_rights
setup_access_rights(resource_object)
@pytest.fixture
def resource_object(resource_class, resource_params):
return resource_class.objects.create(**resource_params)
@pytest.fixture
def resource(resource_object, resource_access_rights):
return resource_object

28
tests/ldap/key.pem Normal file
View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDjGwufbnKK21uO
Tltwq9jYjzM+K8vphafYwmKZpBBI3MLD/w86ZU8uYwhveVxmPJ0VFp35FPFl+e68
IImucasbuE6wk16dMstTS6A3/JDXzr1wzTtr22NLMvs0LP0fU0grX5BwGxMTFzvt
05anBYj1OOphhEv7N7HeaXxx2pxDsFBRDEC8DhRTrcMzYb7k7d0TuXy6/IFRTeRb
+yEfKF/B5Y1c7whdciO8N89iQ6yNzp6faQUyv6wQ1JQqB8MqPP9TOrJLssFvi2Qk
AtkzxA/npV5AAbUlkXYnb/LVGoF8gZEgSRihodyPpQBNlrXFxmMy5svPei1E1hxF
cokxhYZtAgMBAAECggEAOUZI2BxyprJLlMgOJ4wvU+5JbhR9iJc8jV34n+bQdI+4
TtW0cXW7UmeHaRWiR+Zhd0AM9xRhDObLXoaWMnhYPtVsgvunkN2OiaM49OWtYb+x
5xDbO4hIsl5ZG/98lrnaKZYgRyWM2fOyGXiTNewfbji8Y3uJ7gFNylmwGMaZQjhr
YNaqNEV7Vs2n7oERxqzKG9947oBAx2hpmoaW6eMyXcWl2ov7iHpJSKUBKho+5PWc
J731no0OsGuS+3jHa/0nZXrT8nKmemyDMdSfWmtTv659L/guFInZpHPfFVLf56vc
J6zb/IzEJV+Nh7CfBsMbHlTYBeUFlRWsy9t70+OZwQKBgQD3J4aVN4vJhXX1zDgL
dVAczwLGKXY38BoBjOeRtVhXHs5p/eNIqeZ2YbYwBBy3nL414Un7gqb9fDtg0i3n
5mQIOWhvpIYUxtwIPgYwzumxxp/n7XdU4BPDbxejZxkuC7AR5bB34pwJAJvWRGEf
0X1TxJlqULhiZ6g18O3S0oiJtwKBgQDrO9ROaj6kkxjYHmljBZIXXhdKDcn0AqPi
w20Aaafx0oxNQAoq8Gtu22Z1QHwRdBeUJwqCbmHVCCwbMf/568zFAANuT9bKMe6X
J0p0nTDiyn8w9MfduFuG4cUMn4oK6dIuYlscguoPQCvQdciwG+djqwTrHib5TEbm
jeKEkY2A+wKBgQDvXt+wy2BeqBzMF6M8Lb2OeUwVgniVyrxVPhPVgk5x6ks+SoAD
k1G62/3o2UK67lsmsfDGYA69uMGFj2qYjAHcGUW1wyF9I/BdJz01rmCWJmoe5VXK
5U8e3AyH3MV9XCKF4vCb2+UFrwo/ZnCusWVxaRqw5kb+P6ihvZuIsRE+VwKBgQC7
2duBg3bjFlUQwbiHSzuPTaRrjvdn1XPq8wVo/vcPNoS0bB+yiqxAqxT3LbfmeD8c
INFTt7KI3S3byeIRQy0TZR9YSInOjnFqZAYheiY/9lX8Un4Jod/1pvYlToJ+lJs0
T3dTHXitFSHoJydM+/ucrEYRPNMC4tb75vKty06lYQKBgQCx49+g5kQaaRZ+4psw
+eolMpAwKDkpK5gYen6OsrT8m4hpxTmtiteMsH5Avb/fxqoJWLOjhN4EnEZTMJzr
LyGoKsTv7rhZwhRznE15rOzxmldWrcCkl7DGuM2GcKgguhCYF7U7KA+vUCeqCE0H
LA2grkY+TxFpg1pwYdF1hekmTw==
-----END PRIVATE KEY-----

170
tests/ldap/test_manager.py Normal file
View File

@ -0,0 +1,170 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import pytest
from django.core.files.base import ContentFile
from webtest import Upload
pytestmark = pytest.mark.django_db
def login(app, username='admin', password='admin'):
login_page = app.get('/login/')
login_form = login_page.forms[0]
login_form['username'] = username
login_form['password'] = password
resp = login_form.submit()
assert resp.status_int == 302
return app
@pytest.fixture
def app(app, admin_user):
login(app)
return app
def test_add(app, db, cert_content, key_content, resource_class):
response = app.get('/manage/ldap/add')
response.form.set('slug', 'resource')
response.form.set('title', 'resource')
response.form.set('description', 'resource')
response.form.set('ldap_url', 'ldap://localhost.entrouvert.org')
response.form.set('ldap_bind_dn', 'uid=user,o=orga')
response.form.set('ldap_bind_password', 'password')
response.form.set('ldap_tls_cert', Upload('cert.pem', cert_content, 'application/octet-stream'))
response.form.set('ldap_tls_key', Upload('key.pem', key_content, 'application/octet-stream'))
response = response.form.submit(status=302)
assert resource_class.objects.count() == 1
resource = resource_class.objects.get()
assert resource.ldap_url == 'ldap://localhost.entrouvert.org'
assert resource.ldap_bind_dn == 'uid=user,o=orga'
assert resource.ldap_bind_password == 'password'
with resource.ldap_tls_cert as fd:
assert fd.read() == cert_content
with resource.ldap_tls_key as fd:
assert fd.read() == key_content
def test_missing_bind_password(app, db, cert_content, key_content, resource_class):
response = app.get('/manage/ldap/add')
response.form.set('slug', 'resource')
response.form.set('title', 'resource')
response.form.set('description', 'resource')
response.form.set('ldap_url', 'ldap://localhost.entrouvert.org')
response.form.set('ldap_bind_dn', 'uid=user,o=orga')
response = response.form.submit(status=200)
def test_missing_bind_dn(app, db, cert_content, key_content, resource_class):
response = app.get('/manage/ldap/add')
response.form.set('slug', 'resource')
response.form.set('title', 'resource')
response.form.set('description', 'resource')
response.form.set('ldap_url', 'ldap://localhost.entrouvert.org')
response.form.set('ldap_bind_password', 'password')
response = response.form.submit(status=200)
def test_missing_tls_key(app, db, cert_content, key_content, resource_class):
response = app.get('/manage/ldap/add')
response.form.set('slug', 'resource')
response.form.set('title', 'resource')
response.form.set('description', 'resource')
response.form.set('ldap_url', 'ldap://localhost.entrouvert.org')
response.form.set('ldap_tls_cert', Upload('cert.pem', cert_content, 'application/octet-stream'))
response = response.form.submit(status=200)
def test_missing_tls_cert(app, db, cert_content, key_content, resource_class):
response = app.get('/manage/ldap/add')
response.form.set('slug', 'resource')
response.form.set('title', 'resource')
response.form.set('description', 'resource')
response.form.set('ldap_url', 'ldap://localhost.entrouvert.org')
response.form.set('ldap_tls_key', Upload('key.pem', key_content, 'application/octet-stream'))
response = response.form.submit(status=200)
EXPORT_JSON = {
'resources': [
{
'@type': 'passerelle-resource',
'access_rights': [{'apiuser': 'all', 'codename': 'can_access'}],
'description': 'resource',
'ldap_bind_dn': None,
'ldap_bind_password': None,
'ldap_tls_cert': {
'name': 'cert.pem',
'content': 'LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCakNDQWU2Z0F3SUJBZ0lVVEtvcFQ3NkNGbHNWY0k3RkFpbGFZTElMejBvd0RRWUpLb1pJaHZjTkFRRUwKQlFBd0l6RWhNQjhHQTFVRUF3d1liRzlqWVd4b2IzTjBMbVZ1ZEhKdmRYWmxjblF1YjNKbk1CNFhEVEU0TVRJdwpOVEUyTlRreU5Gb1hEVEk0TVRJd01qRTJOVGt5TkZvd0l6RWhNQjhHQTFVRUF3d1liRzlqWVd4b2IzTjBMbVZ1CmRISnZkWFpsY25RdWIzSm5NSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQTR4c0wKbjI1eWl0dGJqazViY0t2WTJJOHpQaXZMNllXbjJNSmltYVFRU056Q3cvOFBPbVZQTG1NSWIzbGNaanlkRlJhZAorUlR4WmZudXZDQ0pybkdyRzdoT3NKTmVuVExMVTB1Z04veVExODY5Y00wN2E5dGpTekw3TkN6OUgxTklLMStRCmNCc1RFeGM3N2RPV3B3V0k5VGpxWVlSTCt6ZXgzbWw4Y2RxY1E3QlFVUXhBdkE0VVU2M0RNMkcrNU8zZEU3bDgKdXZ5QlVVM2tXL3NoSHloZndlV05YTzhJWFhJanZEZlBZa09zamM2ZW4ya0ZNcitzRU5TVUtnZkRLanovVXpxeQpTN0xCYjR0a0pBTFpNOFFQNTZWZVFBRzFKWkYySjIveTFScUJmSUdSSUVrWW9hSGNqNlVBVFphMXhjWmpNdWJMCnozb3RSTlljUlhLSk1ZV0diUUlEQVFBQm96SXdNREFKQmdOVkhSTUVBakFBTUNNR0ExVWRFUVFjTUJxQ0dHeHYKWTJGc2FHOXpkQzVsYm5SeWIzVjJaWEowTG05eVp6QU5CZ2txaGtpRzl3MEJBUXNGQUFPQ0FRRUFGVlBhdkJhaAptSWpnblRqcTZaYkZ4WFROSlcwVHJxTjhvbGJLSjZTZndXVmswSThweDdQT2VrRmFYZCtlZ3NGSmxXWXlIOXE0CkhrS290ZGRSWVlyV29YY1Bpb2ROZlVhK2JSbmgyV1lsMnJFR01XNWRiQmYvTVlDRHRzNjhjM1NvQTdKSVlKOHcKMFFaR0FraWpLTnRWTUwwL0ZyTHVKV2JmRkJBV0g4SkI0NkJjQWcvOGZsYk1IQVVMelYzRjFnL3YwQTNGRzNZLwo5ZlZyK2xONXFzK05COU5YSU1kZjV3WHJtSlFZUmpvdHlPalVPNnlURnFERnZxRTdERXBLUUQ1aG52cUpvWEN6CnpZUVMxRGpIMXFTUmM1dkM4STdZbEpvd0Nmbkk5TXNFSUNTcnNrNzVEaFQwOTFhSkMyWFg5M280emhmTnhtTzUKS2oyOGhQODdHSGdOSWc9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==', # pylint: disable=line-too-long
},
'ldap_tls_key': {
'name': 'key.pem',
'content': 'LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2d0lCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktrd2dnU2xBZ0VBQW9JQkFRRGpHd3VmYm5LSzIxdU8KVGx0d3E5allqek0rSzh2cGhhZll3bUtacEJCSTNNTEQvdzg2WlU4dVl3aHZlVnhtUEowVkZwMzVGUEZsK2U2OApJSW11Y2FzYnVFNndrMTZkTXN0VFM2QTMvSkRYenIxd3pUdHIyMk5MTXZzMExQMGZVMGdyWDVCd0d4TVRGenZ0CjA1YW5CWWoxT09waGhFdjdON0hlYVh4eDJweERzRkJSREVDOERoUlRyY016WWI3azdkMFR1WHk2L0lGUlRlUmIKK3lFZktGL0I1WTFjN3doZGNpTzhOODlpUTZ5TnpwNmZhUVV5djZ3UTFKUXFCOE1xUFA5VE9ySkxzc0Z2aTJRawpBdGt6eEEvbnBWNUFBYlVsa1hZbmIvTFZHb0Y4Z1pFZ1NSaWhvZHlQcFFCTmxyWEZ4bU15NXN2UGVpMUUxaHhGCmNva3hoWVp0QWdNQkFBRUNnZ0VBT1VaSTJCeHlwckpMbE1nT0o0d3ZVKzVKYmhSOWlKYzhqVjM0bitiUWRJKzQKVHRXMGNYVzdVbWVIYVJXaVIrWmhkMEFNOXhSaERPYkxYb2FXTW5oWVB0VnNndnVua04yT2lhTTQ5T1d0WWIreAo1eERiTzRoSXNsNVpHLzk4bHJuYUtaWWdSeVdNMmZPeUdYaVROZXdmYmppOFkzdUo3Z0ZOeWxtd0dNYVpRamhyCllOYXFORVY3VnMybjdvRVJ4cXpLRzk5NDdvQkF4MmhwbW9hVzZlTXlYY1dsMm92N2lIcEpTS1VCS2hvKzVQV2MKSjczMW5vME9zR3VTKzNqSGEvMG5aWHJUOG5LbWVteURNZFNmV210VHY2NTlML2d1RkluWnBIUGZGVkxmNTZ2YwpKNnpiL0l6RUpWK05oN0NmQnNNYkhsVFlCZVVGbFJXc3k5dDcwK09ad1FLQmdRRDNKNGFWTjR2SmhYWDF6RGdMCmRWQWN6d0xHS1hZMzhCb0JqT2VSdFZoWEhzNXAvZU5JcWVaMlliWXdCQnkzbkw0MTRVbjdncWI5ZkR0ZzBpM24KNW1RSU9XaHZwSVlVeHR3SVBnWXd6dW14eHAvbjdYZFU0QlBEYnhlalp4a3VDN0FSNWJCMzRwd0pBSnZXUkdFZgowWDFUeEpscVVMaGlaNmcxOE8zUzBvaUp0d0tCZ1FEck85Uk9hajZra3hqWUhtbGpCWklYWGhkS0RjbjBBcVBpCncyMEFhYWZ4MG94TlFBb3E4R3R1MjJaMVFId1JkQmVVSndxQ2JtSFZDQ3diTWYvNTY4ekZBQU51VDliS01lNlgKSjBwMG5URGl5bjh3OU1mZHVGdUc0Y1VNbjRvSzZkSXVZbHNjZ3VvUFFDdlFkY2l3RytkanF3VHJIaWI1VEVibQpqZUtFa1kyQSt3S0JnUUR2WHQrd3kyQmVxQnpNRjZNOExiMk9lVXdWZ25pVnlyeFZQaFBWZ2s1eDZrcytTb0FECmsxRzYyLzNvMlVLNjdsc21zZkRHWUE2OXVNR0ZqMnFZakFIY0dVVzF3eUY5SS9CZEp6MDFybUNXSm1vZTVWWEsKNVU4ZTNBeUgzTVY5WENLRjR2Q2IyK1VGcndvL1puQ3VzV1Z4YVJxdzVrYitQNmlodlp1SXNSRStWd0tCZ1FDNwoyZHVCZzNiakZsVVF3YmlIU3p1UFRhUnJqdmRuMVhQcTh3Vm8vdmNQTm9TMGJCK3lpcXhBcXhUM0xiZm1lRDhjCklORlR0N0tJM1MzYnllSVJReTBUWlI5WVNJbk9qbkZxWkFZaGVpWS85bFg4VW40Sm9kLzFwdllsVG9KK2xKczAKVDNkVEhYaXRGU0hvSnlkTSsvdWNyRVlSUE5NQzR0Yjc1dkt0eTA2bFlRS0JnUUN4NDkrZzVrUWFhUlorNHBzdworZW9sTXBBd0tEa3BLNWdZZW42T3NyVDhtNGhweFRtdGl0ZU1zSDVBdmIvZnhxb0pXTE9qaE40RW5FWlRNSnpyCkx5R29Lc1R2N3JoWndoUnpuRTE1ck96eG1sZFdyY0NrbDdER3VNMkdjS2dndWhDWUY3VTdLQSt2VUNlcUNFMEgKTEEyZ3JrWStUeEZwZzFwd1lkRjFoZWttVHc9PQotLS0tLUVORCBQUklWQVRFIEtFWS0tLS0tCg==', # pylint: disable=line-too-long
},
'ldap_tls_cacert': None,
'ldap_tls_check_hostname': True,
'ldap_tls_check_cert': True,
'ldap_url': 'ldap://localhost.entrouvert.org:52271',
'log_level': 'INFO',
'resource_type': 'ldap.resource',
'slug': 'resource',
'title': 'resource',
}
],
}
class TestImportExport:
@pytest.fixture
def resource_params(self, resource_params, cert_content, key_content):
return {
**resource_params,
'ldap_url': 'ldap://localhost.entrouvert.org:52271',
'ldap_tls_cert': ContentFile(cert_content, name='cert.pem'),
'ldap_tls_key': ContentFile(key_content, name='key.pem'),
}
def test_import(self, app, resource_class, resource_params):
assert not resource_class.objects.count()
response = app.get('/manage/')
response = response.click('Import')
response.form.set('site_json', Upload('ldap.json', json.dumps(EXPORT_JSON).encode()))
response.form.set('import_users', False)
response = response.form.submit(status=302)
instance = resource_class.objects.get()
for key, value in resource_params.items():
instance_value = getattr(instance, key)
if isinstance(value, ContentFile):
with instance_value as fd1:
with value as fd2:
assert fd1.read() == fd2.read()
else:
assert instance_value == value
def test_export(self, app, resource, cert_content, key_content):
response = app.get('/ldap/resource/')
response = response.click('Export')
content = response.json
for r in content['resources']:
if r.get('ldap_tls_cert'):
r['ldap_tls_cert']['name'] = 'cert.pem'
if r.get('ldap_tls_key'):
r['ldap_tls_key']['name'] = 'key.pem'
assert content == EXPORT_JSON

72
tests/ldap/test_model.py Normal file
View File

@ -0,0 +1,72 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import ldap
import pytest
from django.core.files.base import ContentFile
def test_get_connection(resource):
resource.get_connection()
class TestCheckStatus:
def test_nok(self, resource):
with pytest.raises(ldap.LDAPError):
resource.check_status()
def test_ok(self, resource, ldap_server):
resource.check_status()
class TestTLSAuthentication:
@pytest.fixture
def ldap_params(self, ldap_params, key, cert):
ldap_params['ldap_url'] = ldap_params['ldap_url'].replace('ldap:', 'ldaps:')
return {**ldap_params, 'tls': (str(key), str(cert))}
@pytest.fixture
def ldap_configure(self, ldap_object, cert):
conn = ldap_object.get_connection_admin()
conn.modify_s(
'cn=config',
[
(ldap.MOD_ADD, 'olcTLSCACertificateFile', str(cert).encode()),
(ldap.MOD_ADD, 'olcTLSVerifyClient', b'demand'),
],
)
@pytest.fixture
def resource_params(self, resource_params, cert_content, key_content):
return {
**resource_params,
'ldap_tls_cert': ContentFile(cert_content, name='cert.pem'),
'ldap_tls_key': ContentFile(key_content, name='key.pem'),
'ldap_tls_cacert': ContentFile(cert_content, name='cert.pem'),
}
def test_ok(self, resource, ldap_server):
resource.check_status()
class TestLdapSearch:
def test_nok(self, resource):
with pytest.raises(ldap.LDAPError):
list(resource.ldap_search('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*']))
def test_ok(self, resource, ldap_server):
entries = list(resource.ldap_search('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*']))
assert entries == [('o=orga', {'o': 'orga', 'objectclass': 'organization'})]

View File

@ -0,0 +1,265 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import ldap
import pytest
@pytest.fixture
def ldap_configure(ldap_object):
# configure approximative indexes
conn = ldap_object.get_connection_admin()
ldif = [
(
ldap.MOD_ADD,
'olcDbIndex',
b'cn,sn,mail pres,eq,approx',
)
]
conn.modify_s('olcDatabase={%s}mdb,cn=config' % (ldap_object.db_index - 1), ldif)
# add some entries
ldap_object.add_ldif(
'''
dn: uid=johndoe,o=orga
objectClass: inetOrgPerson
uid: johndoe
cn: John Doe
sn: Doe
gn: John
dn: uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janedoe
cn: Jane Doe
sn: Doe
gn: Jane
dn: uid=janefoo,uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janefoo
cn: Jane Foo
sn: Foo
gn: Jane
'''
)
def test_server_unavailaible(app, resource):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
)
assert response.json == {
'data': [{'disabled': True, 'id': '', 'text': 'Directory server is unavailable'}],
'err': 1,
'err_class': 'directory-server-unavailable',
'err_desc': '{\'result\': -1, \'desc\': "Can\'t contact LDAP server", '
"'errno': 107, 'ctrls': [], 'info': 'Transport endpoint is not "
"connected'}",
}
def test_q(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
},
{
'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
'dn': 'uid=johndoe,o=orga',
'id': 'johndoe',
'text': 'John Doe',
},
],
}
def test_q_prefix(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'prefix',
},
)
assert response.json == {
'err': 0,
'data': [],
}
response = app.get(
'/ldap/resource/search',
params={
'q': 'jane',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'prefix',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
},
{
'attributes': {'cn': 'Jane Foo', 'uid': 'janefoo'},
'dn': 'uid=janefoo,uid=janedoe,o=orga',
'id': 'janefoo',
'text': 'Jane Foo',
},
],
}
def test_q_approx(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'jne do',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'approx',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
},
{
'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
'dn': 'uid=johndoe,o=orga',
'id': 'johndoe',
'text': 'John Doe',
},
],
}
def test_id(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'id': 'janedoe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {
'cn': 'Jane Doe',
'uid': 'janedoe',
},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
}
],
}
def test_sizelimit(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'sizelimit': '1',
},
)
assert len(response.json['data']) == 1
def test_text_template(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'id': 'janedoe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'ldap_attributes': 'sn givenname',
'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
},
)
assert response.json['data'][0]['text'] == 'Doe Jane (janedoe)'
def test_scope(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Foo',
'scope': 'onelevel',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'ldap_attributes': 'sn givenname',
'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
},
)
assert len(response.json['data']) == 0
response = app.get(
'/ldap/resource/search',
params={
'q': 'Foo',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'ldap_attributes': 'sn givenname',
'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
},
)
assert len(response.json['data']) == 1

View File

@ -46,6 +46,7 @@ deps =
responses
zeep<3.3
codestyle: pre-commit
ldaptools
commands =
./get_wcs.sh
py.test {posargs: --numprocesses {env:NUMPROCESSES:1} --dist loadfile {env:FAST:} {env:COVERAGE:} {env:JUNIT:} tests/}
@ -80,5 +81,6 @@ deps =
pytest-freezegun
responses
mohawk
ldaptools
commands =
./pylint.sh passerelle/ tests/