# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Tests for the CMIS connector: uploadfile, getfile, getmetadata and type views."""

import base64
import os
import re
from unittest import mock
from unittest.mock import Mock, call

import py
import pytest
import responses
from cmislib import CmisClient
from cmislib.exceptions import (
    CmisException,
    InvalidArgumentException,
    ObjectNotFoundException,
    PermissionDeniedException,
    UpdateConflictException,
)
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str

from passerelle.apps.cmis import models as cmis_models
from passerelle.apps.cmis.models import CmisConnector
from passerelle.base.models import AccessRight, ApiUser
from passerelle.utils.jsonresponse import APIError
from tests.test_manager import login

# Endpoint exercised by most tests; the slug matches the connector built by `setup`.
UPLOAD_URL = '/cmis/slug-cmis/uploadfile'


def b64encode(content):
    """Return the base64 encoding of `content` as text."""
    return force_str(base64.b64encode(force_bytes(content)))


def _read_cmis_data(name):
    """Return the bytes of tests/data/cmis/<name>, resolved from the current directory."""
    with open('%s/tests/data/cmis/%s' % (os.getcwd(), name), 'rb') as fd:
        return fd.read()


def _upload_error_desc(app, params):
    """Post `params` to the upload endpoint, check it is rejected, return `err_desc`.

    The response must carry HTTP status 400 and `err == 1`.
    """
    response = app.post_json(UPLOAD_URL, params=params, expect_errors=True)
    assert response.status_code == 400
    assert response.json['err'] == 1
    return response.json['err_desc']


def _register_cmis_object_responses():
    """Register the mocked CMIS HTTP responses shared by the getfile/getmetadata tests.

    Must be called while `responses` is active.
    """
    cmis1_body = _read_cmis_data('cmis1.out.xml')
    cmis3_body = _read_cmis_data('cmis3.out.xml')
    responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis3_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/id', body=cmis3_body, status=200)
    responses.add(
        responses.GET,
        'http://example.com/cmisatom/test/content/test2?id=L3Rlc3QtZW8vdGVzdDI%3D',
        body=b'hello world',
        status=200,
    )


@pytest.fixture()
def setup(db):
    """Create a CMIS connector with slug 'slug-cmis' and an API user allowed to access it."""
    api = ApiUser.objects.create(username='all', keytype='', key='')
    conn = CmisConnector.objects.create(
        cmis_endpoint='http://example.com/cmisatom', username='admin', password='admin', slug='slug-cmis'
    )
    obj_type = ContentType.objects.get_for_model(conn)
    AccessRight.objects.create(
        codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=conn.pk
    )
    return conn


def test_uploadfile(app, setup, tmpdir, monkeypatch):
    """Uploaded content reaches the gateway, under file['filename'] or the overriding 'filename'."""

    class FakeCMISGateway:
        """Gateway stand-in that writes the received bytes to a local file."""

        def __init__(self, *args, **kwargs):
            pass

        def create_doc(
            self,
            file_name,
            file_path,
            file_byte_content,
            content_type=None,
            object_type=None,
            properties=None,
        ):
            assert content_type == "image/jpeg"
            with open(file_name, 'wb') as f:
                f.write(file_byte_content)
            return Mock(properties={"toto": "tata"})

    file_name = "testfile.whatever"
    file_content = 'aaaa'
    # The fake gateway writes relative to the current directory: keep it in tmpdir.
    monkeypatch.chdir(tmpdir)
    monkeypatch.setattr(cmis_models, 'CMISGateway', FakeCMISGateway)

    response = app.post_json(
        UPLOAD_URL,
        params={
            "path": "/some/folder/structure",
            "file": {"filename": file_name, "content": b64encode(file_content), "content_type": "image/jpeg"},
        },
    )
    result_file = py.path.local(file_name)
    assert result_file.exists()
    # py.path.local.read() opens the file itself; no enclosing open() is needed.
    assert result_file.read() == file_content
    json_result = response.json
    assert json_result['err'] == 0
    assert json_result['data']['properties'] == {"toto": "tata"}

    file_name_overwrite = "testfile.whatever.overwrite"
    response = app.post_json(
        UPLOAD_URL,
        params={
            "path": "/some/folder/structure",
            "file": {"filename": file_name, "content": b64encode(file_content), "content_type": "image/jpeg"},
            "filename": file_name_overwrite,
        },
    )
    result_file = py.path.local(file_name_overwrite)
    assert result_file.exists()
    assert result_file.read() == file_content
    json_result = response.json
    assert json_result['err'] == 0
    assert json_result['data']['properties'] == {"toto": "tata"}


def test_upload_file_metadata(app, setup, monkeypatch):
    """'object_type', 'properties' and 'properties/<name>' keys are merged into document properties."""

    class FakeFolder:
        """Folder stand-in echoing back the properties it is asked to store."""

        def createDocument(self, filename, contentFile, properties, contentType=None):
            return Mock(properties=properties)

    monkeypatch.setattr(cmis_models.CMISGateway, '_get_or_create_folder', lambda x, y: FakeFolder())
    response = app.post_json(
        UPLOAD_URL,
        params={
            "path": "/some/folder/structure",
            "file": {"filename": "bla", "content": b64encode('bla')},
            "object_type": "D:dui:type",
            "properties": {
                "cmis:description": "Coucou",
                "dui:tnumDossier": "42",
            },
            "properties/dui:ttypeStructure": "Accueil de loisirs",
        },
    )
    assert response.json['data']['properties'] == {
        "cmis:objectTypeId": "D:dui:type",
        "cmis:description": "Coucou",
        "dui:tnumDossier": "42",
        "dui:ttypeStructure": "Accueil de loisirs",
    }


def test_uploadfile_error_if_no_file_name(app, setup):
    """An upload without any file name is rejected."""
    err_desc = _upload_error_desc(
        app,
        {
            "path": "/some/folder/structure",
            "file": {"content": b64encode('aaaa'), "content_type": "image/jpeg"},
        },
    )
    assert err_desc.startswith('"filename" or "file[\'filename\']" is required')


def test_uploadfile_error_if_non_string_file_name(app, setup):
    """A non-string file name is rejected, in 'file' as well as at top level."""
    err_desc = _upload_error_desc(
        app,
        {
            "path": "/some/folder/structure",
            "file": {"filename": 1, "content": b64encode('aaaa'), "content_type": "image/jpeg"},
        },
    )
    assert err_desc == "file/filename: 1 is not of type 'string'"

    err_desc = _upload_error_desc(
        app,
        {
            "path": "/some/folder/structure",
            "file": {"content": b64encode('aaaa'), "content_type": "image/jpeg"},
            "filename": 1,
        },
    )
    assert err_desc == "filename: 1 is not of type 'string'"


def test_uploadfile_error_if_non_valid_file_name(app, setup):
    """A file name not matching the expected pattern is rejected, in both locations."""
    err_desc = _upload_error_desc(
        app,
        {
            "path": "/some/folder/structure",
            "file": {"filename": ",.,", "content": b64encode('aaaa'), "content_type": "image/jpeg"},
        },
    )
    assert "',.,' does not match " in err_desc

    err_desc = _upload_error_desc(
        app,
        {
            "path": "/some/folder/structure",
            "file": {"content": b64encode('aaaa'), "content_type": "image/jpeg"},
            "filename": ",.,",
        },
    )
    assert "',.,' does not match " in err_desc


def test_uploadfile_error_if_no_path(app, setup):
    """An upload without 'path' is rejected."""
    err_desc = _upload_error_desc(
        app,
        {"file": {"filename": 'somefile.txt', "content": b64encode('aaaa'), "content_type": "image/jpeg"}},
    )
    assert err_desc == "'path' is a required property"


def test_uploadfile_error_if_non_string_path(app, setup):
    """A non-string 'path' is rejected."""
    err_desc = _upload_error_desc(
        app,
        {
            "path": 1,
            "file": {"filename": 'somefile.txt', "content": b64encode('aaaa'), "content_type": "image/jpeg"},
        },
    )
    assert err_desc == "path: 1 is not of type 'string'"


def test_uploadfile_error_if_no_regular_path(app, setup):
    """A 'path' without a leading slash is rejected."""
    err_desc = _upload_error_desc(
        app,
        {
            "path": "no/leading/slash",
            "file": {"filename": 'somefile.txt', "content": b64encode('aaaa'), "content_type": "image/jpeg"},
        },
    )
    assert "'no/leading/slash' does not match " in err_desc


def test_uploadfile_error_if_no_file_content(app, setup):
    """An upload whose 'file' has no 'content' is rejected."""
    err_desc = _upload_error_desc(
        app,
        {
            "path": "/some/folder/structure",
            "file": {"filename": 'somefile.txt', "content_type": "image/jpeg"},
        },
    )
    assert err_desc == "file: 'content' is a required property"


def test_uploadfile_error_if_non_string_file_content(app, setup):
    """A non-string file content is rejected."""
    err_desc = _upload_error_desc(
        app,
        {
            "path": "/some/folder/structure",
            "file": {"filename": 'somefile.txt', "content": 1, "content_type": "image/jpeg"},
        },
    )
    assert err_desc == "file/content: 1 is not of type 'string'"


def test_uploadfile_error_if_no_proper_base64_encoding(app, setup):
    """A file content that is not valid base64 is rejected."""
    err_desc = _upload_error_desc(
        app,
        {
            "path": "/some/folder/structure",
            "file": {"filename": 'somefile.txt', "content": "1", "content_type": "image/jpeg"},
        },
    )
    assert err_desc.startswith('"file[\'content\']" must be a valid base64 string')


def test_uploadfile_cmis_gateway_error(app, setup, monkeypatch):
    """An APIError raised by the gateway is reported in the JSON response."""
    cmis_gateway = Mock()
    cmis_gateway.create_doc.side_effect = APIError("some error")
    cmis_gateway_cls = Mock(return_value=cmis_gateway)
    monkeypatch.setattr(cmis_models, 'CMISGateway', cmis_gateway_cls)
    response = app.post_json(
        UPLOAD_URL,
        params={
            "path": "/some/folder/structure",
            "file": {"filename": "file_name", "content": b64encode('aaaa'), "content_type": "image/jpeg"},
        },
    )
    assert response.json['err'] == 1
    assert response.json['err_desc'].startswith("some error")


def test_get_or_create_folder_already_existing(monkeypatch):
    """An existing folder is returned as found by getObjectByPath."""
    default_repository = Mock()
    default_repository.getObjectByPath.return_value = 'folder'
    cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=default_repository))
    monkeypatch.setattr(cmis_models, 'CmisClient', cmis_client_cls)

    gateway = cmis_models.CMISGateway('cmis_endpoint', 'user', 'pass', Mock())
    assert gateway._get_or_create_folder('/whatever') == 'folder'
    default_repository.getObjectByPath.assert_has_calls([call('/whatever')])


def test_get_or_create_folder_one_level_creation(monkeypatch):
    """A missing first-level folder is created under the root folder."""
    root_folder = Mock()
    root_folder.createFolder.return_value = 'folder'
    default_repository = Mock(
        rootFolder=root_folder, **{'getObjectByPath.side_effect': ObjectNotFoundException()}
    )
    cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=default_repository))
    monkeypatch.setattr(cmis_models, 'CmisClient', cmis_client_cls)

    gateway = cmis_models.CMISGateway('cmis-url', 'user', 'password', Mock())
    assert gateway._get_or_create_folder('/whatever') == 'folder'
    default_repository.getObjectByPath.assert_has_calls([call('/whatever'), call('/whatever')])
    root_folder.createFolder.assert_called_once_with('whatever')


def test_get_or_create_folder_two_level_creation(monkeypatch):
    """Both levels of a missing two-level path are created, parent first."""
    whatever_folder = Mock()
    whatever_folder.createFolder.return_value = 'folder'
    root_folder = Mock()
    root_folder.createFolder.return_value = whatever_folder
    default_repository = Mock(rootFolder=root_folder)
    default_repository.getObjectByPath.side_effect = ObjectNotFoundException()
    cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=default_repository))
    monkeypatch.setattr(cmis_models, 'CmisClient', cmis_client_cls)

    gateway = cmis_models.CMISGateway('cmis_url', 'user', 'password', Mock())
    assert gateway._get_or_create_folder('/whatever/man') == 'folder'
    default_repository.getObjectByPath.assert_has_calls(
        [call('/whatever/man'), call('/whatever'), call('/whatever/man')]
    )
    root_folder.createFolder.assert_called_once_with('whatever')
    whatever_folder.createFolder.assert_called_once_with('man')


def test_get_or_create_folder_with_some_existing_and_some_not(monkeypatch):
    """Only the missing leaf folder is created when its parent already exists."""
    whatever_folder = Mock()
    whatever_folder.createFolder.return_value = 'folder'

    def getObjectByPath(path):
        if path == '/whatever':
            return whatever_folder
        elif path == '/whatever/man':
            raise ObjectNotFoundException()
        else:
            raise Exception("I should not be called with: %s" % path)

    root_folder = Mock()
    default_repository = Mock(rootFolder=root_folder)
    default_repository.getObjectByPath.side_effect = getObjectByPath
    cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=default_repository))
    monkeypatch.setattr(cmis_models, 'CmisClient', cmis_client_cls)

    gateway = cmis_models.CMISGateway('cmis_url', 'user', 'password', Mock())
    assert gateway._get_or_create_folder('/whatever/man') == 'folder'
    root_folder.createFolder.assert_not_called()
    whatever_folder.createFolder.assert_called_once_with('man')


def test_create_doc():
    """create_doc resolves the folder and hands it the file name and a readable content file."""
    gateway = cmis_models.CMISGateway('cmis_url', 'user', 'password', Mock())
    folder = Mock()
    folder.createDocument.return_value = 'doc'
    gateway._get_or_create_folder = Mock(return_value=folder)

    assert gateway.create_doc('filename', '/some/path', b'file_content') == 'doc'
    gateway._get_or_create_folder.assert_called_once_with('/some/path')
    args, kwargs = folder.createDocument.call_args
    assert args[0] == 'filename'
    content_file = kwargs['contentFile']
    assert content_file.read() == b'file_content'


@pytest.mark.parametrize(
    "cmis_exc,err_msg",
    [
        (PermissionDeniedException, "permission denied"),
        (UpdateConflictException, "update conflict"),
        (InvalidArgumentException, "invalid property"),
        (CmisException, "cmis binding error"),
    ],
)
def test_wrap_cmis_error(app, setup, monkeypatch, cmis_exc, err_msg):
    """wrap_cmis_error turns cmislib exceptions into APIError with a matching message prefix."""

    @cmis_models.wrap_cmis_error
    def dummy_func():
        raise cmis_exc("some error")

    with pytest.raises(APIError) as excinfo:
        dummy_func()
    assert str(excinfo.value).startswith(err_msg)


def test_re_file_path():
    """FILE_PATH_PATTERN accepts absolute paths without trailing or doubled slashes."""
    RE_FILE_PATH = re.compile(cmis_models.FILE_PATH_PATTERN)
    assert RE_FILE_PATH.match('/')
    assert RE_FILE_PATH.match('/some')
    assert RE_FILE_PATH.match('/some/path')
    assert RE_FILE_PATH.match('/SOME/PATH')
    assert RE_FILE_PATH.match('/some/long/path')
    assert RE_FILE_PATH.match('/some/digits/12/and/CAPITALS')
    assert RE_FILE_PATH.match('/some/!#$%&+-^_`~;[]{}+=~')
    assert not RE_FILE_PATH.match('/trailing/slash/')
    assert not RE_FILE_PATH.match('no/leading/slash')
    assert not RE_FILE_PATH.match('/multiple//slash')
    assert not RE_FILE_PATH.match('')


def test_re_file_name():
    """FILE_NAME_PATTERN accepts dotted names in lower and upper case."""
    RE_FILE_NAME = re.compile(cmis_models.FILE_NAME_PATTERN)
    assert RE_FILE_NAME.match('toto.tata')
    assert RE_FILE_NAME.match('TOTO.TATA')


def test_cmis_types_view(setup, app, admin_user, monkeypatch):
    """Browse the object types pages: base types, their children and properties."""

    class FakeCmisType:
        """Type definition stand-in with two properties and optional children."""

        class FakeCmisProperty:
            def __init__(self, id):
                self.id = id
                self.description = 'cmis:prop'
                self.propertyType = 'string'
                self.required = True

        def __init__(self, id, children=None):
            self.id = id
            self.description = 'hop'
            prop = self.FakeCmisProperty('cmis:prop')
            prop2 = self.FakeCmisProperty('cmis:prop2')
            self.properties = {prop.id: prop, prop2.id: prop2}
            self.children = children or []

    class FakeCmisRepo:
        """Repository stand-in resolving types among root types and their direct children."""

        def __init__(self, root_types):
            self.root_types = root_types

        def getTypeDefinition(self, id):
            if id in self.root_types:
                return self.root_types[id]
            for root_type in self.root_types.values():
                for child in root_type.children:
                    if child.id == id:
                        return child
            raise ObjectNotFoundException

        def getTypeDefinitions(self):
            return self.root_types.values()

        def getTypeChildren(self, id):
            return self.getTypeDefinition(id).children

    children = [FakeCmisType('cmis:child1'), FakeCmisType('cmis:child2')]
    root_type1 = FakeCmisType('cmis:root1', children=children)
    root_type2 = FakeCmisType('cmis:root2')
    root_types = {root_type1.id: root_type1, root_type2.id: root_type2}
    repo = FakeCmisRepo(root_types)
    cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=repo))
    monkeypatch.setattr(cmis_models, 'CmisClient', cmis_client_cls)

    app = login(app)
    resp = app.get('/cmis/slug-cmis/')
    resp = resp.click('Explore available object types')
    assert all(type_id in resp.text for type_id in root_types)
    assert 'Back to' not in resp.text
    assert 'Children' not in resp.text
    assert 'Properties' not in resp.text

    resp = resp.click(root_type1.id)
    assert all(prop_id in resp.text for prop_id in root_type1.properties)
    assert all(child.id in resp.text for child in root_type1.children)

    resp = resp.click(children[0].id)
    assert "No more children." in resp.text

    resp = resp.click("Back to base types list")
    resp = resp.click(root_type2.id)
    assert "No more children." in resp.text

    # An unknown type id must answer 404; webtest checks the status itself.
    app.get('/manage/cmis/slug-cmis/type?id=wrong', status=404)


@pytest.mark.parametrize('debug', (False, True))
@responses.activate
def test_raw_uploadfile(app, setup, debug, caplog):
    """
    Simulate the below bash query :

    $ http https://passerelle.dev.publik.love/cmis/ged/uploadfile \
    file:='{"filename": "test2", "content": "c2FsdXQK"}' path=/test-eo
    """
    caplog.set_level('DEBUG')
    file_name = "test2"
    file_content = 'salut\n'
    path = "/test-eo"
    url = reverse(
        'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'uploadfile', 'slug': setup.slug}
    )
    if debug:
        setup.set_log_level('DEBUG')

    cmis1_body = _read_cmis_data('cmis1.out.xml')
    cmis2_body = _read_cmis_data('cmis2.out.xml')
    cmis3_body = _read_cmis_data('cmis3.out.xml')
    responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis2_body, status=200)
    responses.add(responses.POST, 'http://example.com/cmisatom/test/children', body=cmis3_body, status=200)

    params = {
        "path": path,
        "file": {"filename": file_name, "content": b64encode(file_content), "content_type": "image/jpeg"},
    }
    response = app.post_json(url, params=params)
    json_result = response.json
    assert json_result['err'] == 0
    assert json_result['data']['properties']['cmis:objectTypeId'] == "cmis:document"
    assert json_result['data']['properties']['cmis:name'] == file_name
    # No record from a cmislib logger may show up, whatever the connector log level.
    assert not any('cmislib' in record.name for record in caplog.records)


def test_cmis_check_status(app, setup, monkeypatch):
    """check_status lets a CmisException raised on repository access propagate."""
    cmis_gateway = Mock()
    # PropertyMock must be set on the type so that reading `.repo` raises.
    type(cmis_gateway).repo = mock.PropertyMock(side_effect=CmisException)
    cmis_gateway_cls = Mock(return_value=cmis_gateway)
    monkeypatch.setattr(cmis_models, 'CMISGateway', cmis_gateway_cls)
    with pytest.raises(CmisException):
        setup.check_status()


@responses.activate
def test_get_file(app, setup):
    """getfile returns the document content, addressed by path or by object id."""
    url = (
        reverse('generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'getfile', 'slug': setup.slug})
        + '?raise=1'
    )
    _register_cmis_object_responses()

    response = app.get(url, params={'object_id': '/test/file'})
    assert response.content_type == 'application/octet-stream'
    assert response.content == b'hello world'

    response = app.get(url, params={'object_id': 'c4bc9d00-5bf0-404d-8f0a-a6260f6d21ae;1.0'})
    assert response.content_type == 'application/octet-stream'
    assert response.content == b'hello world'


@responses.activate
def test_get_metadata(app, setup):
    """getmetadata returns properties grouped by prefix, addressed by object id or by path."""
    url = reverse(
        'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'getmetadata', 'slug': setup.slug}
    )
    _register_cmis_object_responses()

    response = app.get(url, params={'object_id': 'c4bc9d00-5bf0-404d-8f0a-a6260f6d21ae;1.0'})
    assert response.json['data']['cmis']['contentStreamFileName'] == 'test2'
    assert response.json['data']['rsj']['idInsertis'] == '21N284563'

    response = app.get(url, params={'object_id': '/test/file'})
    assert response.json['data']['cmis']['contentStreamFileName'] == 'test2'
    assert response.json['data']['rsj']['idInsertis'] == '21N284563'