diff --git a/passerelle/apps/ldap/models.py b/passerelle/apps/ldap/models.py index 28fef1d7..9389de28 100644 --- a/passerelle/apps/ldap/models.py +++ b/passerelle/apps/ldap/models.py @@ -39,6 +39,7 @@ LDAP_HAS_OPT_X_TLS_REQUIRE_SAN = hasattr(ldap, 'OPT_X_TLS_REQUIRE_SAN') # only SEARCH_OP_SUBSTRING = 'substring' SEARCH_OP_PREFIX = 'prefix' SEARCH_OP_APPROX = 'approx' +SEARCH_OP_EXACT = 'exact' class Resource(BaseResource): @@ -295,7 +296,7 @@ class Resource(BaseResource): }, 'search_op': { 'description': _( - 'Search operator, can be "substring" (the default value), "prefix" or "approx"' + 'Search operator, can be "substring" (the default value), "prefix", "approx" or "exact"' ), 'example_value': SEARCH_OP_SUBSTRING, }, @@ -305,8 +306,8 @@ class Resource(BaseResource): self, request, ldap_base_dn, - search_attribute, id_attribute, + search_attribute=None, text_template=None, ldap_attributes=None, id=None, @@ -316,23 +317,31 @@ class Resource(BaseResource): filter=None, search_op=SEARCH_OP_SUBSTRING, ): - search_attribute = search_attribute.lower() + if not q and not id and not filter: + raise APIError('filter or q or id are mandatory parameters', http_status=400) + if q and not search_attribute: + raise APIError('search_attribute is mandatory with q parameter', http_status=400) + if not search_attribute and not text_template: + raise APIError('search_attribute or text_template are mandatory parameters', http_status=400) + if search_attribute: + search_attribute = search_attribute.lower() + if not search_attribute.isascii(): + raise APIError('search_attribute contains non ASCII characters', http_status=400) id_attribute = id_attribute.lower() - if not search_attribute.isascii(): - raise APIError('search_attribute contains non ASCII characters') if not id_attribute.isascii(): - raise APIError('id_attribute contains non ASCII characters') + raise APIError('id_attribute contains non ASCII characters', http_status=400) ldap_attributes = set(ldap_attributes.split()) if ldap_attributes else set() - ldap_attributes.update([search_attribute, id_attribute]) + ldap_attributes.add(id_attribute) + if search_attribute: + ldap_attributes.add(search_attribute) if not all(attribute.isascii() for attribute in ldap_attributes): - raise APIError('ldap_attributes contains non ASCII characters') + raise APIError('ldap_attributes contains non ASCII characters', http_status=400) try: sizelimit = int(sizelimit) except (ValueError, TypeError): pass sizelimit = max(1, min(sizelimit or 30, 200)) - if not q and not id: - raise APIError('q or id are mandatory parameters', http_status=400) + ldap_filter = None if id: ldap_filter = '(%s=%s)' % (id_attribute, ldap.filter.escape_filter_chars(id)) elif q: @@ -342,12 +351,17 @@ class Resource(BaseResource): ldap_filter = '(%s=%s*)' % (search_attribute, ldap.filter.escape_filter_chars(q)) elif search_op == SEARCH_OP_APPROX: ldap_filter = '(%s~=%s)' % (search_attribute, ldap.filter.escape_filter_chars(q)) + elif search_op == SEARCH_OP_EXACT: + ldap_filter = '(%s=%s)' % (search_attribute, ldap.filter.escape_filter_chars(q)) else: - raise APIError('unknown search_op %r' % search_op) + raise APIError('unknown search_op %r' % search_op, http_status=400) if filter: if not filter.startswith('('): filter = '(%s)' % filter - ldap_filter = '(&%s%s)' % (ldap_filter, filter) + if ldap_filter: + ldap_filter = '(&%s%s)' % (ldap_filter, filter) + else: + ldap_filter = filter scopes = { 'subtree': ldap.SCOPE_SUBTREE, 'onelevel': ldap.SCOPE_ONELEVEL, diff --git a/tests/ldap/test_search_endpoint.py b/tests/ldap/test_search_endpoint.py index 36f22f31..f3f3f1da 100644 --- a/tests/ldap/test_search_endpoint.py +++ b/tests/ldap/test_search_endpoint.py @@ -179,13 +179,69 @@ def test_q_approx(app, resource, ldap_server): } +def test_q_exact(app, resource, ldap_server): + response = app.get( + '/ldap/resource/search', + params={ + 'q': 'Jane Doe', + 'ldap_base_dn': 'o=orga', + 'search_attribute': 'cn', + 'id_attribute': 'uid', + 'search_op': 'exact', + }, + ) + assert response.json == { + 'err': 0, + 'data': [ + { + 'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'}, + 'dn': 'uid=janedoe,o=orga', + 'id': 'janedoe', + 'text': 'Jane Doe', + }, + ], + } + response = app.get( + '/ldap/resource/search', + params={ + 'q': 'Foo Bar', + 'ldap_base_dn': 'o=orga', + 'search_attribute': 'cn', + 'id_attribute': 'uid', + 'search_op': 'exact', + }, + ) + assert response.json == { + 'err': 0, + 'data': [], + } + + +def test_bad_search_op(app, resource, ldap_server): + response = app.get( + '/ldap/resource/search', + params={ + 'q': 'Jane Doe', + 'ldap_base_dn': 'o=orga', + 'search_attribute': 'cn', + 'id_attribute': 'uid', + 'search_op': 'bad search op', + }, + status=400, + ) + assert response.json['err'] == 1 + assert response.json['err_desc'] == "unknown search_op 'bad search op'" + assert response.json['data'] is None + + def test_id(app, resource, ldap_server): response = app.get( '/ldap/resource/search', params={ 'id': 'janedoe', 'ldap_base_dn': 'o=orga', - 'search_attribute': 'cn', + 'ldap_attributes': 'cn', + 'text_template': '{{ cn }}', 'id_attribute': 'uid', }, ) @@ -205,6 +261,60 @@ def test_id(app, resource, ldap_server): } +def test_filter(app, resource, ldap_server): + for filter in ('(cn~=Jane Doe)', 'cn~=Jane Doe'): + response = app.get( + '/ldap/resource/search', + params={ + 'filter': filter, + 'ldap_base_dn': 'o=orga', + 'search_attribute': 'cn', + 'id_attribute': 'uid', + }, + ) + assert response.json == { + 'err': 0, + 'data': [ + { + 'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'}, + 'dn': 'uid=janedoe,o=orga', + 'id': 'janedoe', + 'text': 'Jane Doe', + }, + { + 'attributes': {'cn': 'John Doe', 'uid': 'johndoe'}, + 'dn': 'uid=johndoe,o=orga', + 'id': 'johndoe', + 'text': 'John Doe', + }, + ], + } + + response = app.get( + '/ldap/resource/search', + params={ + 'filter': 'cn~=Jane Doe', + 'id': 'nobody', + 'ldap_base_dn': 'o=orga', + 'search_attribute': 'cn', + 'id_attribute': 'uid', + }, + ) + assert response.json == {'err': 0, 'data': []} + + response = app.get( + '/ldap/resource/search', + params={ + 'filter': 'cn~=Jane Doe', + 'q': 'nobody', + 'ldap_base_dn': 'o=orga', + 'search_attribute': 'cn', + 'id_attribute': 'uid', + }, + ) + assert response.json == {'err': 0, 'data': []} + + def test_sizelimit(app, resource, ldap_server): response = app.get( '/ldap/resource/search', @@ -261,3 +371,89 @@ def test_scope(app, resource, ldap_server): }, ) assert len(response.json['data']) == 1 + + +def test_missing_q_id_filter(app, resource, ldap_server): + response = app.get( + '/ldap/resource/search', + params={ + 'ldap_base_dn': 'o=orga', + 'search_attribute': 'cn', + 'id_attribute': 'uid', + }, + status=400, + ) + assert response.json['err'] == 1 + assert response.json['err_desc'] == 'filter or q or id are mandatory parameters' + assert response.json['data'] is None + + +def test_bad_requests(app, resource, ldap_server): + response = app.get( + '/ldap/resource/search', + params={ + 'q': 'Jane Doe', + 'ldap_base_dn': 'o=orga', + 'id_attribute': 'uid', + }, + status=400, + ) + assert response.json['err'] == 1 + assert response.json['err_desc'] == 'search_attribute is mandatory with q parameter' + assert response.json['data'] is None + + response = app.get( + '/ldap/resource/search', + params={ + 'filter': "(cn~=Jane Doe)", + 'ldap_base_dn': 'o=orga', + 'id_attribute': 'uid', + }, + status=400, + ) + assert response.json['err'] == 1 + assert response.json['err_desc'] == 'search_attribute or text_template are mandatory parameters' + assert response.json['data'] is None + + response = app.get( + '/ldap/resource/search', + params={ + 'filter': "(cn~=Jane Doe)", + 'ldap_base_dn': 'o=orga', + 'id_attribute': 'uid', + 'search_attribute': 'bloqué', + }, + status=400, + ) + assert response.json['err'] == 1 + assert response.json['err_desc'] == 'search_attribute contains non ASCII characters' + assert response.json['data'] is None + + response = app.get( + '/ldap/resource/search', + params={ + 'filter': "(cn~=Jane Doe)", + 'ldap_base_dn': 'o=orga', + 'id_attribute': 'bloqué', + 'search_attribute': 'cn', + }, + status=400, + ) + assert response.json['err'] == 1 + assert response.json['err_desc'] == 'id_attribute contains non ASCII characters' + assert response.json['data'] is None + + response = app.get( + '/ldap/resource/search', + params={ + 'filter': "(cn~=Jane Doe)", + 'ldap_base_dn': 'o=orga', + 'id_attribute': 'uid', + 'search_attribute': 'cn', + 'ldap_attributes': 'bloqué', + }, + status=400, + ) + assert response.json['err'] == 1 + assert response.json['err_desc'] == 'ldap_attributes contains non ASCII characters' + assert response.json['data'] is None