passerelle/tests/ldap/test_search_endpoint.py

460 lines
13 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import ldap
import pytest
@pytest.fixture
def ldap_configure(ldap_object):
# configure approximative indexes
conn = ldap_object.get_connection_admin()
ldif = [
(
ldap.MOD_ADD,
'olcDbIndex',
b'cn,sn,mail pres,eq,approx',
)
]
conn.modify_s('olcDatabase={%s}mdb,cn=config' % (ldap_object.db_index - 1), ldif)
# add some entries
ldap_object.add_ldif(
'''
dn: uid=johndoe,o=orga
objectClass: inetOrgPerson
uid: johndoe
cn: John Doe
sn: Doe
gn: John
dn: uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janedoe
cn: Jane Doe
sn: Doe
gn: Jane
dn: uid=janefoo,uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janefoo
cn: Jane Foo
sn: Foo
gn: Jane
'''
)
def test_server_unavailaible(app, resource):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
)
assert response.json['err'] == 1
assert response.json['data'] == [{'disabled': True, 'id': '', 'text': 'Directory server is unavailable'}]
assert response.json['err_class'] == 'directory-server-unavailable'
assert "'info': 'Transport endpoint is not connected'" in response.json['err_desc']
assert "'errno': 107" in response.json['err_desc']
assert "'desc': \"Can't contact LDAP server\"" in response.json['err_desc']
def test_q(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
},
{
'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
'dn': 'uid=johndoe,o=orga',
'id': 'johndoe',
'text': 'John Doe',
},
],
}
def test_q_prefix(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'prefix',
},
)
assert response.json == {
'err': 0,
'data': [],
}
response = app.get(
'/ldap/resource/search',
params={
'q': 'jane',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'prefix',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
},
{
'attributes': {'cn': 'Jane Foo', 'uid': 'janefoo'},
'dn': 'uid=janefoo,uid=janedoe,o=orga',
'id': 'janefoo',
'text': 'Jane Foo',
},
],
}
def test_q_approx(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'jne do',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'approx',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
},
{
'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
'dn': 'uid=johndoe,o=orga',
'id': 'johndoe',
'text': 'John Doe',
},
],
}
def test_q_exact(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Jane Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'exact',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
},
],
}
response = app.get(
'/ldap/resource/search',
params={
'q': 'Foo Bar',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'exact',
},
)
assert response.json == {
'err': 0,
'data': [],
}
def test_bad_search_op(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Jane Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'search_op': 'bad search op',
},
status=400,
)
assert response.json['err'] == 1
assert response.json['err_desc'] == "unknown search_op 'bad search op'"
assert response.json['data'] is None
def test_id(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'id': 'janedoe',
'ldap_base_dn': 'o=orga',
'ldap_attributes': 'cn',
'text_template': '{{ cn }}',
'id_attribute': 'uid',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {
'cn': 'Jane Doe',
'uid': 'janedoe',
},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
}
],
}
def test_filter(app, resource, ldap_server):
for filter in ('(cn~=Jane Doe)', 'cn~=Jane Doe'):
response = app.get(
'/ldap/resource/search',
params={
'filter': filter,
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
)
assert response.json == {
'err': 0,
'data': [
{
'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
'dn': 'uid=janedoe,o=orga',
'id': 'janedoe',
'text': 'Jane Doe',
},
{
'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
'dn': 'uid=johndoe,o=orga',
'id': 'johndoe',
'text': 'John Doe',
},
],
}
response = app.get(
'/ldap/resource/search',
params={
'filter': 'cn~=Jane Doe',
'id': 'nobody',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
)
assert response.json == {'err': 0, 'data': []}
response = app.get(
'/ldap/resource/search',
params={
'filter': 'cn~=Jane Doe',
'q': 'nobody',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
)
assert response.json == {'err': 0, 'data': []}
def test_sizelimit(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Doe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'sizelimit': '1',
},
)
assert len(response.json['data']) == 1
def test_text_template(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'id': 'janedoe',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'ldap_attributes': 'sn givenname',
'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
},
)
assert response.json['data'][0]['text'] == 'Doe Jane (janedoe)'
def test_scope(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Foo',
'scope': 'onelevel',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'ldap_attributes': 'sn givenname',
'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
},
)
assert len(response.json['data']) == 0
response = app.get(
'/ldap/resource/search',
params={
'q': 'Foo',
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
'ldap_attributes': 'sn givenname',
'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
},
)
assert len(response.json['data']) == 1
def test_missing_q_id_filter(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'ldap_base_dn': 'o=orga',
'search_attribute': 'cn',
'id_attribute': 'uid',
},
status=400,
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'filter or q or id are mandatory parameters'
assert response.json['data'] is None
def test_bad_requests(app, resource, ldap_server):
response = app.get(
'/ldap/resource/search',
params={
'q': 'Jane Doe',
'ldap_base_dn': 'o=orga',
'id_attribute': 'uid',
},
status=400,
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'search_attribute is mandatory with q parameter'
assert response.json['data'] is None
response = app.get(
'/ldap/resource/search',
params={
'filter': "(cn~=Jane Doe)",
'ldap_base_dn': 'o=orga',
'id_attribute': 'uid',
},
status=400,
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'search_attribute or text_template are mandatory parameters'
assert response.json['data'] is None
response = app.get(
'/ldap/resource/search',
params={
'filter': "(cn~=Jane Doe)",
'ldap_base_dn': 'o=orga',
'id_attribute': 'uid',
'search_attribute': 'bloqué',
},
status=400,
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'search_attribute contains non ASCII characters'
assert response.json['data'] is None
response = app.get(
'/ldap/resource/search',
params={
'filter': "(cn~=Jane Doe)",
'ldap_base_dn': 'o=orga',
'id_attribute': 'bloqué',
'search_attribute': 'cn',
},
status=400,
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'id_attribute contains non ASCII characters'
assert response.json['data'] is None
response = app.get(
'/ldap/resource/search',
params={
'filter': "(cn~=Jane Doe)",
'ldap_base_dn': 'o=orga',
'id_attribute': 'uid',
'search_attribute': 'cn',
'ldap_attributes': 'bloqué',
},
status=400,
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'ldap_attributes contains non ASCII characters'
assert response.json['data'] is None