# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023  Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import os
import re
from unittest import mock
from unittest.mock import MagicMock, Mock, PropertyMock, call

import py
import pytest
import responses
from cmislib import CmisClient
from cmislib.exceptions import (
    CmisException,
    InvalidArgumentException,
    ObjectNotFoundException,
    PermissionDeniedException,
    UpdateConflictException,
)
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str

from passerelle.apps.cmis import models
from passerelle.apps.cmis.models import CmisConnector
from passerelle.base.models import AccessRight, ApiUser
from passerelle.utils.jsonresponse import APIError
from tests.test_manager import login


def b64encode(content):
    """Return *content* base64-encoded, as text suitable for a JSON payload."""
    return force_str(base64.b64encode(force_bytes(content)))


def _read_cmis_data(filename):
    """Return the raw bytes of a canned CMIS response from ``tests/data/cmis``.

    The path is resolved against the current working directory, so the test
    suite has to be started from the directory that contains ``tests/``.
    """
    with open(os.path.join(os.getcwd(), 'tests', 'data', 'cmis', filename), 'rb') as fd:
        return fd.read()


@pytest.fixture()
def setup(db):
    """Create a CMIS connector (slug ``slug-cmis``) reachable by a key-less API user."""
    api = ApiUser.objects.create(username='all', keytype='', key='')
    conn = CmisConnector.objects.create(
        cmis_endpoint='http://example.com/cmisatom', username='admin', password='admin', slug='slug-cmis'
    )
    obj_type = ContentType.objects.get_for_model(conn)
    AccessRight.objects.create(
        codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=conn.pk
    )
    return conn


def test_uploadfile(app, setup, tmpdir, monkeypatch):
    """uploadfile hands the decoded content to the gateway, honouring a top-level 'filename'."""

    class FakeCMISGateway:
        """Gateway stand-in that writes the received bytes to a local file named after the document."""

        def __init__(self, *args, **kwargs):
            pass

        def create_doc(
            self,
            file_name,
            file_path,
            file_byte_content,
            content_type=None,
            object_type=None,
            properties=None,
        ):
            assert content_type == 'image/jpeg'
            with open(file_name, 'wb') as f:
                f.write(file_byte_content)
            return Mock(properties={'toto': 'tata'})

    file_name = 'testfile.whatever'
    file_content = 'aaaa'
    # The fake gateway writes relative to the cwd: keep those files inside tmpdir.
    monkeypatch.chdir(tmpdir)
    monkeypatch.setattr(models, 'CMISGateway', FakeCMISGateway)

    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': file_name, 'content': b64encode(file_content), 'content_type': 'image/jpeg'},
        },
    )
    result_file = py.path.local(file_name)
    assert result_file.exists()
    # py.path.local.read() opens and closes the file itself (text mode by default).
    assert result_file.read() == file_content
    json_result = response.json
    assert json_result['err'] == 0
    assert json_result['data']['properties'] == {'toto': 'tata'}

    # A top-level 'filename' takes precedence over file['filename'].
    file_name_overwrite = 'testfile.whatever.overwrite'
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': file_name, 'content': b64encode(file_content), 'content_type': 'image/jpeg'},
            'filename': file_name_overwrite,
        },
    )
    result_file = py.path.local(file_name_overwrite)
    assert result_file.exists()
    assert result_file.read() == file_content
    json_result = response.json
    assert json_result['err'] == 0
    assert json_result['data']['properties'] == {'toto': 'tata'}


def test_upload_file_metadata(app, setup, monkeypatch):
    """object_type, 'properties' and flat 'properties/<name>' keys are merged into the CMIS properties."""

    class FakeFolder:
        """Folder stand-in echoing back the properties it was asked to store."""

        def createDocument(self, filename, contentFile, properties, contentType=None):
            return Mock(properties=properties)

    monkeypatch.setattr(models.CMISGateway, '_get_or_create_folder', lambda x, y: FakeFolder())
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': 'bla', 'content': b64encode('bla')},
            'object_type': 'D:dui:type',
            'properties': {
                'cmis:description': 'Coucou',
                'dui:tnumDossier': '42',
            },
            'properties/dui:ttypeStructure': 'Accueil de loisirs',
        },
    )
    assert response.json['data']['properties'] == {
        'cmis:objectTypeId': 'D:dui:type',
        'cmis:description': 'Coucou',
        'dui:tnumDossier': '42',
        'dui:ttypeStructure': 'Accueil de loisirs',
    }


def test_uploadfile_error_if_no_file_name(app, setup):
    """A payload without any file name is rejected with a 400."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert response.json['err_desc'].startswith('"filename" or "file[\'filename\']" is required')


def test_uploadfile_error_if_non_string_file_name(app, setup):
    """A non-string file name is rejected, whether nested in 'file' or given at top level."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': 1, 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert response.json['err_desc'] == "file/filename: 1 is not of type 'string'"

    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
            'filename': 1,
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert response.json['err_desc'] == "filename: 1 is not of type 'string'"


def test_uploadfile_error_if_non_valid_file_name(app, setup):
    """A file name not matching the expected pattern is rejected in both locations."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': ',.,', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert "',.,' does not match " in response.json['err_desc']

    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
            'filename': ',.,',
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert "',.,' does not match " in response.json['err_desc']


def test_uploadfile_error_if_no_path(app, setup):
    """'path' is mandatory."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'}
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert response.json['err_desc'] == "'path' is a required property"


def test_uploadfile_error_if_non_string_path(app, setup):
    """'path' must be a string."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': 1,
            'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert response.json['err_desc'] == "path: 1 is not of type 'string'"


def test_uploadfile_error_if_no_regular_path(app, setup):
    """A 'path' without a leading slash is rejected."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': 'no/leading/slash',
            'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert "'no/leading/slash' does not match " in response.json['err_desc']


def test_uploadfile_error_if_no_file_content(app, setup):
    """file['content'] is mandatory."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': 'somefile.txt', 'content_type': 'image/jpeg'},
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert response.json['err_desc'] == "file: 'content' is a required property"


def test_uploadfile_error_if_non_string_file_content(app, setup):
    """file['content'] must be a string."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': 'somefile.txt', 'content': 1, 'content_type': 'image/jpeg'},
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert response.json['err_desc'] == "file/content: 1 is not of type 'string'"


def test_uploadfile_error_if_no_proper_base64_encoding(app, setup):
    """file['content'] must be valid base64."""
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': 'somefile.txt', 'content': '1', 'content_type': 'image/jpeg'},
        },
        expect_errors=True,
    )
    assert response.status_code == 400
    assert response.json['err'] == 1
    assert response.json['err_desc'].startswith('"file[\'content\']" must be a valid base64 string')


def test_uploadfile_cmis_gateway_error(app, setup, monkeypatch):
    """An APIError raised by the gateway is reported as err=1 in the JSON response."""
    cmis_gateway = Mock()
    cmis_gateway.create_doc.side_effect = APIError('some error')
    cmis_gateway_cls = Mock(return_value=cmis_gateway)
    monkeypatch.setattr(models, 'CMISGateway', cmis_gateway_cls)
    response = app.post_json(
        '/cmis/slug-cmis/uploadfile',
        params={
            'path': '/some/folder/structure',
            'file': {'filename': 'file_name', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
        },
    )
    assert response.json['err'] == 1
    assert response.json['err_desc'].startswith('some error')


class TestGetOrCreateFolder:
    """Behaviour of CMISGateway._get_or_create_folder against a mocked default repository."""

    @pytest.fixture
    def default_repository(self, monkeypatch):
        """Patch models.CmisClient so the gateway sees a MagicMock as its default repository."""
        default_repository = MagicMock()
        cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=default_repository))
        monkeypatch.setattr(models, 'CmisClient', cmis_client_cls)
        return default_repository

    @pytest.fixture
    def gateway(self, default_repository):
        """Gateway wired to the mocked repository."""
        return models.CMISGateway('cmis_endpoint', 'user', 'pass', Mock())

    def test_get_or_create_folder_already_existing(self, gateway, default_repository):
        """An existing folder is returned as is."""
        default_repository.getObjectByPath.return_value = 'folder'
        assert gateway._get_or_create_folder('/whatever') == 'folder'
        default_repository.getObjectByPath.assert_has_calls([call('/whatever')])

    def test_get_or_create_folder_one_level_creation(self, gateway, default_repository):
        """A missing first-level folder is created under the root folder."""
        default_repository.getObjectByPath.side_effect = ObjectNotFoundException()
        default_repository.root_folder = Mock(createFolder=Mock(return_value='folder'))
        assert gateway._get_or_create_folder('/whatever') == 'folder'
        default_repository.getObjectByPath.assert_has_calls([call('/whatever')])
        default_repository.root_folder.createFolder.assert_called_once_with('whatever')

    def test_get_or_create_folder_two_level_creation(self, gateway, default_repository):
        """Two missing levels are looked up deepest first, then created from the root down."""
        default_repository.getObjectByPath.side_effect = [
            ObjectNotFoundException(),
            ObjectNotFoundException(),
        ]
        default_repository.root_folder.createFolder.return_value.createFolder.return_value = 'folder'
        assert gateway._get_or_create_folder('/whatever/man') == 'folder'
        assert default_repository.mock_calls == [
            call.getObjectByPath('/whatever/man'),
            call.getObjectByPath('/whatever'),
            call.root_folder.createFolder('whatever'),
            call.root_folder.createFolder().createFolder('man'),
        ]

    def test_get_or_create_folder_with_some_existing_and_some_not(self, gateway, default_repository):
        """Only the missing tail is created when a parent already exists."""
        default_repository.getObjectByPath.side_effect = [
            ObjectNotFoundException(),
            mock.DEFAULT,
            # Guard: a third lookup would be a bug.
            Exception('Boom!'),
        ]
        default_repository.getObjectByPath.return_value.createFolder.return_value = 'folder'
        assert gateway._get_or_create_folder('/whatever/man') == 'folder'
        assert default_repository.mock_calls == [
            call.getObjectByPath('/whatever/man'),
            call.getObjectByPath('/whatever'),
            call.getObjectByPath().createFolder('man'),
        ]

    def test_get_or_create_folder_permission_denied_on_get_object_by_path(self, gateway, default_repository):
        """A permission error while reading a parent folder becomes an APIError."""
        default_repository.getObjectByPath.side_effect = [
            ObjectNotFoundException(),
            PermissionDeniedException(),
        ]
        with pytest.raises(APIError, match=r'CMIS server denied reading folder /whatever'):
            gateway._get_or_create_folder('/whatever/man')

    def test_get_or_create_folder_permission_denied_on_create(self, gateway, default_repository):
        """A permission error while creating a folder becomes an APIError."""
        default_repository.getObjectByPath.side_effect = [
            ObjectNotFoundException(),
            mock.DEFAULT,
            # Guard: a third lookup would be a bug.
            Exception('Boom!'),
        ]
        default_repository.getObjectByPath.return_value.createFolder.side_effect = PermissionDeniedException()
        with pytest.raises(APIError, match=r'CMIS server denied creating folder /whatever/man'):
            gateway._get_or_create_folder('/whatever/man')

    def test_get_or_create_folder_permission_denied_on_root_folder(self, gateway, default_repository):
        """A permission error while reading the root folder becomes an APIError."""
        default_repository.getObjectByPath.side_effect = ObjectNotFoundException()
        # PropertyMock must be set on the type for attribute access to trigger it.
        type(default_repository).root_folder = PropertyMock(side_effect=PermissionDeniedException())
        with pytest.raises(APIError, match=r'CMIS server denied reading folder /'):
            gateway._get_or_create_folder('/whatever')


def test_create_doc():
    """create_doc resolves the folder, then creates the document from a readable content file."""
    gateway = models.CMISGateway('cmis_url', 'user', 'password', Mock())
    folder = Mock()
    folder.createDocument.return_value = 'doc'
    gateway._get_or_create_folder = Mock(return_value=folder)
    assert gateway.create_doc('filename', '/some/path', b'file_content') == 'doc'
    gateway._get_or_create_folder.assert_called_once_with('/some/path')
    args, kwargs = folder.createDocument.call_args
    assert args[0] == 'filename'
    content_file = kwargs['contentFile']
    assert content_file.read() == b'file_content'


@pytest.mark.parametrize(
    'cmis_exc,err_msg',
    [
        (PermissionDeniedException, 'permission denied'),
        (UpdateConflictException, 'update conflict'),
        (InvalidArgumentException, 'invalid property'),
        (CmisException, 'cmis binding error'),
    ],
)
def test_wrap_cmis_error(app, setup, monkeypatch, cmis_exc, err_msg):
    """wrap_cmis_error converts cmislib exceptions into APIError with a matching message prefix."""

    @models.wrap_cmis_error
    def dummy_func():
        raise cmis_exc('some error')

    with pytest.raises(APIError) as excinfo:
        dummy_func()
    # Must stay outside the ``with`` block, otherwise it would never run.
    assert str(excinfo.value).startswith(err_msg)


def test_re_file_path():
    """FILE_PATH_PATTERN accepts absolute paths without trailing or doubled slashes."""
    file_path_re = re.compile(models.FILE_PATH_PATTERN)
    assert file_path_re.match('/')
    assert file_path_re.match('/some')
    assert file_path_re.match('/some/path')
    assert file_path_re.match('/SOME/PATH')
    assert file_path_re.match('/some/long/path')
    assert file_path_re.match('/some/digits/12/and/CAPITALS')
    assert file_path_re.match('/some/!#$%&+-^_`~;[]{}+=~')
    assert not file_path_re.match('/trailing/slash/')
    assert not file_path_re.match('no/leading/slash')
    assert not file_path_re.match('/multiple//slash')
    assert not file_path_re.match('')


def test_re_file_name():
    """FILE_NAME_PATTERN accepts dotted names in either case."""
    file_name_re = re.compile(models.FILE_NAME_PATTERN)
    assert file_name_re.match('toto.tata')
    assert file_name_re.match('TOTO.TATA')


def test_cmis_types_view(setup, app, admin_user, monkeypatch):
    """The management UI lets an admin browse CMIS types, their properties and their children."""

    class FakeCmisType:
        """Type stand-in exposing two properties and an optional list of children."""

        class FakeCmisProperty:
            def __init__(self, id):
                self.id = id
                self.description = 'cmis:prop'
                self.propertyType = 'string'
                self.required = True

        def __init__(self, id, children=None):
            self.id = id
            self.description = 'hop'
            prop = self.FakeCmisProperty('cmis:prop')
            prop2 = self.FakeCmisProperty('cmis:prop2')
            self.properties = {prop.id: prop, prop2.id: prop2}
            self.children = children or []

    class FakeCmisRepo:
        """Repository stand-in resolving root types and their direct children only."""

        def __init__(self, root_types):
            self.root_types = root_types

        def getTypeDefinition(self, id):
            if id in self.root_types:
                return self.root_types[id]
            for root_type in self.root_types.values():
                for child in root_type.children:
                    if child.id == id:
                        return child
            raise ObjectNotFoundException

        def getTypeDefinitions(self):
            return self.root_types.values()

        def getTypeChildren(self, id):
            return self.getTypeDefinition(id).children

    children = [FakeCmisType('cmis:child1'), FakeCmisType('cmis:child2')]
    root_type1 = FakeCmisType('cmis:root1', children=children)
    root_type2 = FakeCmisType('cmis:root2')
    root_types = {root_type1.id: root_type1, root_type2.id: root_type2}
    repo = FakeCmisRepo(root_types)

    cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=repo))
    monkeypatch.setattr(models, 'CmisClient', cmis_client_cls)

    app = login(app)
    resp = app.get('/cmis/slug-cmis/')
    resp = resp.click('Explore available object types')
    assert all(type_id in resp.text for type_id in root_types)
    assert 'Back to' not in resp.text
    assert 'Children' not in resp.text
    assert 'Properties' not in resp.text

    resp = resp.click(root_type1.id)
    assert all(prop_id in resp.text for prop_id in root_type1.properties)
    assert all(child.id in resp.text for child in root_type1.children)

    resp = resp.click(children[0].id)
    assert 'No more children.' in resp.text

    resp = resp.click('Back to base types list')
    resp = resp.click(root_type2.id)
    assert 'No more children.' in resp.text

    # An unknown type id answers 404; ``status=404`` makes the test client assert it.
    app.get('/manage/cmis/slug-cmis/type?id=wrong', status=404)


@pytest.mark.parametrize('debug', (False, True))
@responses.activate
def test_raw_uploadfile(app, setup, debug, caplog):
    r"""Upload through the real gateway against canned CMIS HTTP responses.

    Simulates the bash query below:

    $ http https://passerelle.dev.publik.love/cmis/ged/uploadfile \
        file:='{"filename": "test2", "content": "c2FsdXQK"}' path=/test-eo

    Whatever the connector log level, no record from a logger whose name
    contains 'cmislib' may be captured.
    """
    caplog.set_level('DEBUG')
    file_name = 'test2'
    file_content = 'salut\n'
    path = '/test-eo'
    url = reverse(
        'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'uploadfile', 'slug': setup.slug}
    )
    if debug:
        setup.set_log_level('DEBUG')

    responses.add(
        responses.GET, 'http://example.com/cmisatom', body=_read_cmis_data('cmis1.out.xml'), status=200
    )
    responses.add(
        responses.GET,
        'http://example.com/cmisatom/test/path',
        body=_read_cmis_data('cmis2.out.xml'),
        status=200,
    )
    responses.add(
        responses.POST,
        'http://example.com/cmisatom/test/children',
        body=_read_cmis_data('cmis3.out.xml'),
        status=200,
    )

    params = {
        'path': path,
        'file': {'filename': file_name, 'content': b64encode(file_content), 'content_type': 'image/jpeg'},
    }
    response = app.post_json(url, params=params)
    json_result = response.json
    assert json_result['err'] == 0
    assert json_result['data']['properties']['cmis:objectTypeId'] == 'cmis:document'
    assert json_result['data']['properties']['cmis:name'] == file_name
    assert not any('cmislib' in record.name for record in caplog.records)


def test_cmis_check_status(app, setup, monkeypatch):
    """check_status lets a CmisException raised while reading the repository propagate."""
    cmis_gateway = Mock()
    # PropertyMock must be set on the type for attribute access to trigger it.
    type(cmis_gateway).repo = PropertyMock(side_effect=CmisException)
    cmis_gateway_cls = Mock(return_value=cmis_gateway)
    monkeypatch.setattr(models, 'CMISGateway', cmis_gateway_cls)
    with pytest.raises(CmisException):
        setup.check_status()


@responses.activate
def test_get_file(app, setup):
    """getfile returns the document content, addressed either by path or by object id."""
    url = (
        reverse('generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'getfile', 'slug': setup.slug})
        + '?raise=1'
    )
    cmis1_body = _read_cmis_data('cmis1.out.xml')
    cmis3_body = _read_cmis_data('cmis3.out.xml')

    responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis3_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/id', body=cmis3_body, status=200)
    responses.add(
        responses.GET,
        'http://example.com/cmisatom/test/content/test2?id=L3Rlc3QtZW8vdGVzdDI%3D',
        body=b'hello world',
        status=200,
    )

    response = app.get(url, params={'object_id': '/test/file'})
    assert response.content_type == 'application/octet-stream'
    assert response.content == b'hello world'

    response = app.get(url, params={'object_id': 'c4bc9d00-5bf0-404d-8f0a-a6260f6d21ae;1.0'})
    assert response.content_type == 'application/octet-stream'
    assert response.content == b'hello world'


@responses.activate
def test_get_metadata(app, setup):
    """getmetadata returns properties grouped by prefix, addressed by object id or by path."""
    url = reverse(
        'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'getmetadata', 'slug': setup.slug}
    )
    cmis1_body = _read_cmis_data('cmis1.out.xml')
    cmis3_body = _read_cmis_data('cmis3.out.xml')

    responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis3_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/id', body=cmis3_body, status=200)
    responses.add(
        responses.GET,
        'http://example.com/cmisatom/test/content/test2?id=L3Rlc3QtZW8vdGVzdDI%3D',
        body=b'hello world',
        status=200,
    )

    response = app.get(url, params={'object_id': 'c4bc9d00-5bf0-404d-8f0a-a6260f6d21ae;1.0'})
    assert response.json['data']['cmis']['contentStreamFileName'] == 'test2'
    assert response.json['data']['rsj']['idInsertis'] == '21N284563'

    response = app.get(url, params={'object_id': '/test/file'})
    assert response.json['data']['cmis']['contentStreamFileName'] == 'test2'
    assert response.json['data']['rsj']['idInsertis'] == '21N284563'


@responses.activate
def test_get_file_404_error(app, setup, caplog):
    """A 404 from the CMIS server yields an APIError payload without any ERROR log.

    NOTE(review): despite its name this test requests the ``getmetadata``
    endpoint, not ``getfile`` - confirm this is intended.
    """
    responses.add(
        responses.GET, 'http://example.com/cmisatom', body=_read_cmis_data('cmis1.out.xml'), status=200
    )
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', status=404)

    response = app.get('/cmis/slug-cmis/getmetadata', params={'object_id': '/test/file'})
    assert 'ERROR' not in caplog.text
    assert response.json == {
        'err': 1,
        'err_class': 'passerelle.utils.jsonresponse.APIError',
        'err_desc': 'CMIS server did not found path /test/file',
        'data': None,
    }