passerelle/tests/test_cmis.py

667 lines
24 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import os
import re
from unittest import mock
from unittest.mock import MagicMock, Mock, PropertyMock, call
import py
import pytest
import responses
from cmislib import CmisClient
from cmislib.exceptions import (
CmisException,
InvalidArgumentException,
ObjectNotFoundException,
PermissionDeniedException,
UpdateConflictException,
)
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str
from passerelle.apps.cmis import models
from passerelle.apps.cmis.models import CmisConnector
from passerelle.base.models import AccessRight, ApiUser
from passerelle.utils.jsonresponse import APIError
from tests.test_manager import login
def b64encode(content):
return force_str(base64.b64encode(force_bytes(content)))
@pytest.fixture()
def setup(db):
api = ApiUser.objects.create(username='all', keytype='', key='')
conn = CmisConnector.objects.create(
cmis_endpoint='http://example.com/cmisatom', username='admin', password='admin', slug='slug-cmis'
)
obj_type = ContentType.objects.get_for_model(conn)
AccessRight.objects.create(
codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=conn.pk
)
return conn
def test_uploadfile(app, setup, tmpdir, monkeypatch):
class FakeCMISGateway:
def __init__(self, *args, **kwargs):
pass
def create_doc(
self,
file_name,
file_path,
file_byte_content,
content_type=None,
object_type=None,
properties=None,
):
assert content_type == 'image/jpeg'
with open(file_name, 'wb') as f:
f.write(file_byte_content)
return Mock(properties={'toto': 'tata'})
file_name = 'testfile.whatever'
file_content = 'aaaa'
monkeypatch.chdir(tmpdir)
monkeypatch.setattr(models, 'CMISGateway', FakeCMISGateway)
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': file_name, 'content': b64encode(file_content), 'content_type': 'image/jpeg'},
},
)
result_file = py.path.local(file_name)
assert result_file.exists()
with result_file.open('rb'):
assert result_file.read() == file_content
json_result = response.json
assert json_result['err'] == 0
assert json_result['data']['properties'] == {'toto': 'tata'}
file_name_overwrite = 'testfile.whatever.overwrite'
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': file_name, 'content': b64encode(file_content), 'content_type': 'image/jpeg'},
'filename': file_name_overwrite,
},
)
result_file = py.path.local(file_name_overwrite)
assert result_file.exists()
with result_file.open('rb'):
assert result_file.read() == file_content
json_result = response.json
assert json_result['err'] == 0
assert json_result['data']['properties'] == {'toto': 'tata'}
def test_upload_file_metadata(app, setup, monkeypatch):
class FakeFolder:
def createDocument(self, filename, contentFile, properties, contentType=None):
return Mock(properties=properties)
monkeypatch.setattr(models.CMISGateway, '_get_or_create_folder', lambda x, y: FakeFolder())
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': 'bla', 'content': b64encode('bla')},
'object_type': 'D:dui:type',
'properties': {
'cmis:description': 'Coucou',
'dui:tnumDossier': '42',
},
'properties/dui:ttypeStructure': 'Accueil de loisirs',
},
)
assert response.json['data']['properties'] == {
'cmis:objectTypeId': 'D:dui:type',
'cmis:description': 'Coucou',
'dui:tnumDossier': '42',
'dui:ttypeStructure': 'Accueil de loisirs',
}
def test_uploadfile_error_if_no_file_name(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert response.json['err_desc'].startswith('"filename" or "file[\'filename\']" is required')
def test_uploadfile_error_if_non_string_file_name(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': 1, 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert response.json['err_desc'] == "file/filename: 1 is not of type 'string'"
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
'filename': 1,
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert response.json['err_desc'] == "filename: 1 is not of type 'string'"
def test_uploadfile_error_if_non_valid_file_name(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': ',.,', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert "',.,' does not match " in response.json['err_desc']
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
'filename': ',.,',
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert "',.,' does not match " in response.json['err_desc']
def test_uploadfile_error_if_no_path(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'}
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert response.json['err_desc'] == "'path' is a required property"
def test_uploadfile_error_if_non_string_path(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': 1,
'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert response.json['err_desc'] == "path: 1 is not of type 'string'"
def test_uploadfile_error_if_no_regular_path(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': 'no/leading/slash',
'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert "'no/leading/slash' does not match " in response.json['err_desc']
def test_uploadfile_error_if_no_file_content(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': 'somefile.txt', 'content_type': 'image/jpeg'},
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert response.json['err_desc'] == "file: 'content' is a required property"
def test_uploadfile_error_if_non_string_file_content(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': 'somefile.txt', 'content': 1, 'content_type': 'image/jpeg'},
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert response.json['err_desc'] == "file/content: 1 is not of type 'string'"
def test_uploadfile_error_if_no_proper_base64_encoding(app, setup):
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': 'somefile.txt', 'content': '1', 'content_type': 'image/jpeg'},
},
expect_errors=True,
)
assert response.status_code == 400
assert response.json['err'] == 1
assert response.json['err_desc'].startswith('"file[\'content\']" must be a valid base64 string')
def test_uploadfile_cmis_gateway_error(app, setup, monkeypatch):
cmis_gateway = Mock()
cmis_gateway.create_doc.side_effect = APIError('some error')
cmis_gateway_cls = Mock(return_value=cmis_gateway)
monkeypatch.setattr(models, 'CMISGateway', cmis_gateway_cls)
response = app.post_json(
'/cmis/slug-cmis/uploadfile',
params={
'path': '/some/folder/structure',
'file': {'filename': 'file_name', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
},
)
assert response.json['err'] == 1
assert response.json['err_desc'].startswith('some error')
class TestGetOrCreateFolder:
@pytest.fixture
def default_repository(self, monkeypatch):
default_repository = MagicMock()
cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=default_repository))
monkeypatch.setattr(models, 'CmisClient', cmis_client_cls)
return default_repository
@pytest.fixture
def gateway(self, default_repository):
return models.CMISGateway('cmis_endpoint', 'user', 'pass', Mock())
def test_get_or_create_folder_already_existing(self, gateway, default_repository):
default_repository.getObjectByPath.return_value = 'folder'
assert gateway._get_or_create_folder('/whatever') == 'folder'
default_repository.getObjectByPath.assert_has_calls([call('/whatever')])
def test_get_or_create_folder_one_level_creation(self, gateway, default_repository):
default_repository.getObjectByPath.side_effect = ObjectNotFoundException()
default_repository.root_folder = Mock(createFolder=Mock(return_value='folder'))
assert gateway._get_or_create_folder('/whatever') == 'folder'
default_repository.getObjectByPath.assert_has_calls([call('/whatever')])
default_repository.root_folder.createFolder.assert_called_once_with('whatever')
def test_get_or_create_folder_two_level_creation(self, gateway, default_repository):
default_repository.getObjectByPath.side_effect = [
ObjectNotFoundException(),
ObjectNotFoundException(),
]
default_repository.root_folder.createFolder.return_value.createFolder.return_value = 'folder'
assert gateway._get_or_create_folder('/whatever/man') == 'folder'
assert default_repository.mock_calls == [
call.getObjectByPath('/whatever/man'),
call.getObjectByPath('/whatever'),
call.root_folder.createFolder('whatever'),
call.root_folder.createFolder().createFolder('man'),
]
def test_get_or_create_folder_with_some_existing_and_some_not(self, gateway, default_repository):
default_repository.getObjectByPath.side_effect = [
ObjectNotFoundException(),
mock.DEFAULT,
Exception('Boom!'),
]
default_repository.getObjectByPath.return_value.createFolder.return_value = 'folder'
assert gateway._get_or_create_folder('/whatever/man') == 'folder'
assert default_repository.mock_calls == [
call.getObjectByPath('/whatever/man'),
call.getObjectByPath('/whatever'),
call.getObjectByPath().createFolder('man'),
]
def test_get_or_create_folder_permission_denied_on_get_object_by_path(self, gateway, default_repository):
default_repository.getObjectByPath.side_effect = [
ObjectNotFoundException(),
PermissionDeniedException(),
]
with pytest.raises(APIError, match=r'CMIS server denied reading folder /whatever'):
gateway._get_or_create_folder('/whatever/man')
def test_get_or_create_folder_permission_denied_on_create(self, gateway, default_repository):
default_repository.getObjectByPath.side_effect = [
ObjectNotFoundException(),
mock.DEFAULT,
Exception('Boom!'),
]
default_repository.getObjectByPath.return_value.createFolder.side_effect = PermissionDeniedException()
with pytest.raises(APIError, match=r'CMIS server denied creating folder /whatever/man'):
gateway._get_or_create_folder('/whatever/man')
def test_get_or_create_folder_permission_denied_on_root_folder(self, gateway, default_repository):
default_repository.getObjectByPath.side_effect = ObjectNotFoundException()
type(default_repository).root_folder = PropertyMock(side_effect=PermissionDeniedException())
with pytest.raises(APIError, match=r'CMIS server denied reading folder /'):
gateway._get_or_create_folder('/whatever')
def test_create_doc():
gateway = models.CMISGateway('cmis_url', 'user', 'password', Mock())
folder = Mock()
folder.createDocument.return_value = 'doc'
gateway._get_or_create_folder = Mock(return_value=folder)
assert gateway.create_doc('filename', '/some/path', b'file_content') == 'doc'
gateway._get_or_create_folder.assert_called_once_with('/some/path')
args, kwargs = folder.createDocument.call_args
assert args[0] == 'filename'
content_file = kwargs['contentFile']
assert content_file.read() == b'file_content'
@pytest.mark.parametrize(
'cmis_exc,err_msg',
[
(PermissionDeniedException, 'permission denied'),
(UpdateConflictException, 'update conflict'),
(InvalidArgumentException, 'invalid property'),
(CmisException, 'cmis binding error'),
],
)
def test_wrap_cmis_error(app, setup, monkeypatch, cmis_exc, err_msg):
@models.wrap_cmis_error
def dummy_func():
raise cmis_exc('some error')
with pytest.raises(APIError) as excinfo:
dummy_func()
assert str(excinfo.value).startswith(err_msg)
def test_re_file_path():
from passerelle.apps.cmis.models import FILE_PATH_PATTERN
RE_FILE_PATH = re.compile(FILE_PATH_PATTERN)
assert RE_FILE_PATH.match('/')
assert RE_FILE_PATH.match('/some')
assert RE_FILE_PATH.match('/some/path')
assert RE_FILE_PATH.match('/SOME/PATH')
assert RE_FILE_PATH.match('/some/long/path')
assert RE_FILE_PATH.match('/some/digits/12/and/CAPITALS')
assert RE_FILE_PATH.match('/some/!#$%&+-^_`~;[]{}+=~')
assert not RE_FILE_PATH.match('/trailing/slash/')
assert not RE_FILE_PATH.match('no/leading/slash')
assert not RE_FILE_PATH.match('/multiple//slash')
assert not RE_FILE_PATH.match('')
def test_re_file_name():
from passerelle.apps.cmis.models import FILE_NAME_PATTERN
RE_FILE_NAME = re.compile(FILE_NAME_PATTERN)
assert RE_FILE_NAME.match('toto.tata')
assert RE_FILE_NAME.match('TOTO.TATA')
def test_cmis_types_view(setup, app, admin_user, monkeypatch):
class FakeCmisType:
class FakeCmisProperty:
def __init__(self, id):
self.id = id
self.description = 'cmis:prop'
self.propertyType = 'string'
self.required = True
def __init__(self, id, children=None):
self.id = id
self.description = 'hop'
prop = self.FakeCmisProperty('cmis:prop')
prop2 = self.FakeCmisProperty('cmis:prop2')
self.properties = {prop.id: prop, prop2.id: prop2}
self.children = children or []
class FakeCmisRepo:
def __init__(self, root_types):
self.root_types = root_types
def getTypeDefinition(self, id):
if id in self.root_types:
return self.root_types[id]
for type in self.root_types.values():
for child in type.children:
if child.id == id:
return child
raise ObjectNotFoundException
def getTypeDefinitions(self):
return self.root_types.values()
def getTypeChildren(self, id):
return self.getTypeDefinition(id).children
children = [FakeCmisType('cmis:child1'), FakeCmisType('cmis:child2')]
root_type1 = FakeCmisType('cmis:root1', children=children)
root_type2 = FakeCmisType('cmis:root2')
root_types = {root_type1.id: root_type1, root_type2.id: root_type2}
repo = FakeCmisRepo(root_types)
cmis_client_cls = Mock(return_value=Mock(spec=CmisClient, defaultRepository=repo))
monkeypatch.setattr(models, 'CmisClient', cmis_client_cls)
app = login(app)
resp = app.get('/cmis/slug-cmis/')
resp = resp.click('Explore available object types')
assert all(id in resp.text for id in root_types)
assert 'Back to' not in resp.text
assert 'Children' not in resp.text
assert 'Properties' not in resp.text
resp = resp.click(root_type1.id)
assert all(id in resp.text for id in root_type1.properties)
assert all(child.id in resp.text for child in root_type1.children)
resp = resp.click(children[0].id)
assert 'No more children.' in resp.text
resp = resp.click('Back to base types list')
resp = resp.click(root_type2.id)
assert 'No more children.' in resp.text
resp = app.get('/manage/cmis/slug-cmis/type?id=wrong', status=404)
@pytest.mark.parametrize('debug', (False, True))
@responses.activate
def test_raw_uploadfile(app, setup, debug, caplog):
""" Simulate the bellow bash query :
$ http https://passerelle.dev.publik.love/cmis/ged/uploadfile \
file:='{"filename": "test2", "content": "c2FsdXQK"}' path=/test-eo
"""
caplog.set_level('DEBUG')
file_name = 'test2'
file_content = 'salut\n'
path = '/test-eo'
url = reverse(
'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'uploadfile', 'slug': setup.slug}
)
if debug:
setup.set_log_level('DEBUG')
with open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb') as fd:
cmis1_body = fd.read()
with open('%s/tests/data/cmis/cmis2.out.xml' % os.getcwd(), 'rb') as fd:
cmis2_body = fd.read()
with open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb') as fd:
cmis3_body = fd.read()
responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis2_body, status=200)
responses.add(responses.POST, 'http://example.com/cmisatom/test/children', body=cmis3_body, status=200)
params = {
'path': path,
'file': {'filename': file_name, 'content': b64encode(file_content), 'content_type': 'image/jpeg'},
}
response = app.post_json(url, params=params)
json_result = response.json
assert json_result['err'] == 0
assert json_result['data']['properties']['cmis:objectTypeId'] == 'cmis:document'
assert json_result['data']['properties']['cmis:name'] == file_name
assert not any('cmislib' in record.name for record in caplog.records)
def test_cmis_check_status(app, setup, monkeypatch):
cmis_gateway = Mock()
type(cmis_gateway).repo = mock.PropertyMock(side_effect=CmisException)
cmis_gateway_cls = Mock(return_value=cmis_gateway)
monkeypatch.setattr(models, 'CMISGateway', cmis_gateway_cls)
with pytest.raises(CmisException):
setup.check_status()
@responses.activate
def test_get_file(app, setup):
url = (
reverse('generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'getfile', 'slug': setup.slug})
+ '?raise=1'
)
with open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb') as fd:
cmis1_body = fd.read()
with open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb') as fd:
cmis3_body = fd.read()
responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis3_body, status=200)
responses.add(responses.GET, 'http://example.com/cmisatom/test/id', body=cmis3_body, status=200)
responses.add(
responses.GET,
'http://example.com/cmisatom/test/content/test2?id=L3Rlc3QtZW8vdGVzdDI%3D',
body=b'hello world',
status=200,
)
response = app.get(url, params={'object_id': '/test/file'})
assert response.content_type == 'application/octet-stream'
assert response.content == b'hello world'
response = app.get(url, params={'object_id': 'c4bc9d00-5bf0-404d-8f0a-a6260f6d21ae;1.0'})
assert response.content_type == 'application/octet-stream'
assert response.content == b'hello world'
@responses.activate
def test_get_metadata(app, setup):
url = reverse(
'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'getmetadata', 'slug': setup.slug}
)
with open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb') as fd:
cmis1_body = fd.read()
with open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb') as fd:
cmis3_body = fd.read()
responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis3_body, status=200)
responses.add(responses.GET, 'http://example.com/cmisatom/test/id', body=cmis3_body, status=200)
responses.add(
responses.GET,
'http://example.com/cmisatom/test/content/test2?id=L3Rlc3QtZW8vdGVzdDI%3D',
body=b'hello world',
status=200,
)
response = app.get(url, params={'object_id': 'c4bc9d00-5bf0-404d-8f0a-a6260f6d21ae;1.0'})
assert response.json['data']['cmis']['contentStreamFileName'] == 'test2'
assert response.json['data']['rsj']['idInsertis'] == '21N284563'
response = app.get(url, params={'object_id': '/test/file'})
assert response.json['data']['cmis']['contentStreamFileName'] == 'test2'
assert response.json['data']['rsj']['idInsertis'] == '21N284563'
@responses.activate
def test_get_file_404_error(app, setup, caplog):
with open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb') as fd:
cmis1_body = fd.read()
responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
responses.add(responses.GET, 'http://example.com/cmisatom/test/path', status=404)
response = app.get('/cmis/slug-cmis/getmetadata', params={'object_id': '/test/file'})
assert 'ERROR' not in caplog.text
assert response.json == {
'err': 1,
'err_class': 'passerelle.utils.jsonresponse.APIError',
'err_desc': 'CMIS server did not found path /test/file',
'data': None,
}