667 lines
24 KiB
Python
667 lines
24 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2023 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import os
|
|
import re
|
|
from unittest import mock
|
|
from unittest.mock import MagicMock, Mock, PropertyMock, call
|
|
|
|
import py
|
|
import pytest
|
|
import responses
|
|
from cmislib import CmisClient
|
|
from cmislib.exceptions import (
|
|
CmisException,
|
|
InvalidArgumentException,
|
|
ObjectNotFoundException,
|
|
PermissionDeniedException,
|
|
UpdateConflictException,
|
|
)
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.urls import reverse
|
|
from django.utils.encoding import force_bytes, force_str
|
|
|
|
from passerelle.apps.cmis import models
|
|
from passerelle.apps.cmis.models import CmisConnector
|
|
from passerelle.base.models import AccessRight, ApiUser
|
|
from passerelle.utils.jsonresponse import APIError
|
|
from tests.test_manager import login
|
|
|
|
|
|
def b64encode(content):
    """Return ``content`` base64-encoded, converted back to a text string."""
    encoded = base64.b64encode(force_bytes(content))
    return force_str(encoded)
|
|
|
|
|
|
@pytest.fixture()
def setup(db):
    """Create a CMIS connector plus an API user granted ``can_access`` on it.

    Returns the created ``CmisConnector`` (slug ``slug-cmis``).
    """
    api_user = ApiUser.objects.create(username='all', keytype='', key='')
    connector = CmisConnector.objects.create(
        cmis_endpoint='http://example.com/cmisatom',
        username='admin',
        password='admin',
        slug='slug-cmis',
    )
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api_user,
        resource_type=ContentType.objects.get_for_model(connector),
        resource_pk=connector.pk,
    )
    return connector
|
|
|
|
|
|
def test_uploadfile(app, setup, tmpdir, monkeypatch):
    """Upload a file, first under its own name, then under a ``filename`` override.

    ``models.CMISGateway`` is replaced by a fake whose ``create_doc`` writes the
    received bytes into a file named after the requested document, relative to
    the current directory (switched to ``tmpdir``), so the test can inspect
    what reached the gateway.
    """

    class FakeCMISGateway:
        def __init__(self, *args, **kwargs):
            pass

        def create_doc(
            self,
            file_name,
            file_path,
            file_byte_content,
            content_type=None,
            object_type=None,
            properties=None,
        ):
            assert content_type == 'image/jpeg'
            with open(file_name, 'wb') as f:
                f.write(file_byte_content)
            return Mock(properties={'toto': 'tata'})

    file_name = 'testfile.whatever'
    file_content = 'aaaa'
    monkeypatch.chdir(tmpdir)
    monkeypatch.setattr(models, 'CMISGateway', FakeCMISGateway)

    def upload_and_check(extra_params, stored_name):
        # Post the upload request and verify both the stored file and the JSON answer.
        params = {
            'path': '/some/folder/structure',
            'file': {'filename': file_name, 'content': b64encode(file_content), 'content_type': 'image/jpeg'},
        }
        params.update(extra_params)
        response = app.post_json('/cmis/slug-cmis/uploadfile', params=params)

        result_file = py.path.local(stored_name)
        assert result_file.exists()
        # read() opens and closes the file on its own; the extra handle the
        # previous version opened around this call was never used.
        assert result_file.read() == file_content

        json_result = response.json
        assert json_result['err'] == 0
        assert json_result['data']['properties'] == {'toto': 'tata'}

    # the document is stored under the name carried by the file payload
    upload_and_check({}, file_name)

    # a top-level "filename" parameter takes precedence over file['filename']
    file_name_overwrite = 'testfile.whatever.overwrite'
    upload_and_check({'filename': file_name_overwrite}, file_name_overwrite)
|
|
|
|
|
|
def test_upload_file_metadata(app, setup, monkeypatch):
    """Check the properties handed to ``createDocument`` for an upload with metadata.

    The fake folder echoes the properties it receives, so the response exposes
    exactly what the connector passed on.
    """

    class FakeFolder:
        def createDocument(self, filename, contentFile, properties, contentType=None):
            return Mock(properties=properties)

    def fake_get_or_create_folder(gateway, path):
        return FakeFolder()

    monkeypatch.setattr(models.CMISGateway, '_get_or_create_folder', fake_get_or_create_folder)

    payload = {
        'path': '/some/folder/structure',
        'file': {'filename': 'bla', 'content': b64encode('bla')},
        'object_type': 'D:dui:type',
        'properties': {
            'cmis:description': 'Coucou',
            'dui:tnumDossier': '42',
        },
        'properties/dui:ttypeStructure': 'Accueil de loisirs',
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload)

    expected_properties = {
        'cmis:objectTypeId': 'D:dui:type',
        'cmis:description': 'Coucou',
        'dui:tnumDossier': '42',
        'dui:ttypeStructure': 'Accueil de loisirs',
    }
    assert resp.json['data']['properties'] == expected_properties
|
|
|
|
|
|
def test_uploadfile_error_if_no_file_name(app, setup):
    """A payload carrying no file name at all is rejected with a 400."""
    payload = {
        'path': '/some/folder/structure',
        'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)

    assert resp.status_code == 400
    body = resp.json
    assert body['err'] == 1
    assert body['err_desc'].startswith('"filename" or "file[\'filename\']" is required')
|
|
|
|
|
|
def test_uploadfile_error_if_non_string_file_name(app, setup):
    """A non-string file name is rejected, in the file payload or at top level."""
    cases = [
        (
            {
                'path': '/some/folder/structure',
                'file': {'filename': 1, 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
            },
            "file/filename: 1 is not of type 'string'",
        ),
        (
            {
                'path': '/some/folder/structure',
                'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
                'filename': 1,
            },
            "filename: 1 is not of type 'string'",
        ),
    ]
    for payload, expected_desc in cases:
        resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)
        assert resp.status_code == 400
        assert resp.json['err'] == 1
        assert resp.json['err_desc'] == expected_desc
|
|
|
|
|
|
def test_uploadfile_error_if_non_valid_file_name(app, setup):
    """A file name failing the name pattern is rejected, wherever it is given."""
    payloads = [
        # invalid name inside the file payload
        {
            'path': '/some/folder/structure',
            'file': {'filename': ',.,', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
        },
        # invalid name given as the top-level override
        {
            'path': '/some/folder/structure',
            'file': {'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
            'filename': ',.,',
        },
    ]
    for payload in payloads:
        resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)
        assert resp.status_code == 400
        assert resp.json['err'] == 1
        assert "',.,' does not match " in resp.json['err_desc']
|
|
|
|
|
|
def test_uploadfile_error_if_no_path(app, setup):
    """Omitting ``path`` yields a 400 naming the missing property."""
    payload = {
        'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'}
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)

    assert resp.status_code == 400
    body = resp.json
    assert body['err'] == 1
    assert body['err_desc'] == "'path' is a required property"
|
|
|
|
|
|
def test_uploadfile_error_if_non_string_path(app, setup):
    """A numeric ``path`` is rejected with a 400 type error."""
    payload = {
        'path': 1,
        'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)

    assert resp.status_code == 400
    body = resp.json
    assert body['err'] == 1
    assert body['err_desc'] == "path: 1 is not of type 'string'"
|
|
|
|
|
|
def test_uploadfile_error_if_no_regular_path(app, setup):
    """A ``path`` without a leading slash is rejected with a 400 pattern error."""
    payload = {
        'path': 'no/leading/slash',
        'file': {'filename': 'somefile.txt', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)

    assert resp.status_code == 400
    body = resp.json
    assert body['err'] == 1
    assert "'no/leading/slash' does not match " in body['err_desc']
|
|
|
|
|
|
def test_uploadfile_error_if_no_file_content(app, setup):
    """A file payload without ``content`` is rejected with a 400."""
    payload = {
        'path': '/some/folder/structure',
        'file': {'filename': 'somefile.txt', 'content_type': 'image/jpeg'},
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)

    assert resp.status_code == 400
    body = resp.json
    assert body['err'] == 1
    assert body['err_desc'] == "file: 'content' is a required property"
|
|
|
|
|
|
def test_uploadfile_error_if_non_string_file_content(app, setup):
    """A numeric file ``content`` is rejected with a 400 type error."""
    payload = {
        'path': '/some/folder/structure',
        'file': {'filename': 'somefile.txt', 'content': 1, 'content_type': 'image/jpeg'},
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)

    assert resp.status_code == 400
    body = resp.json
    assert body['err'] == 1
    assert body['err_desc'] == "file/content: 1 is not of type 'string'"
|
|
|
|
|
|
def test_uploadfile_error_if_no_proper_base64_encoding(app, setup):
    """File ``content`` that is a string but not valid base64 is rejected with a 400."""
    payload = {
        'path': '/some/folder/structure',
        'file': {'filename': 'somefile.txt', 'content': '1', 'content_type': 'image/jpeg'},
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload, expect_errors=True)

    assert resp.status_code == 400
    body = resp.json
    assert body['err'] == 1
    assert body['err_desc'].startswith('"file[\'content\']" must be a valid base64 string')
|
|
|
|
|
|
def test_uploadfile_cmis_gateway_error(app, setup, monkeypatch):
    """An ``APIError`` raised by the gateway's ``create_doc`` is reported in the JSON answer."""
    failing_gateway = Mock()
    failing_gateway.create_doc.side_effect = APIError('some error')
    monkeypatch.setattr(models, 'CMISGateway', Mock(return_value=failing_gateway))

    payload = {
        'path': '/some/folder/structure',
        'file': {'filename': 'file_name', 'content': b64encode('aaaa'), 'content_type': 'image/jpeg'},
    }
    resp = app.post_json('/cmis/slug-cmis/uploadfile', params=payload)

    body = resp.json
    assert body['err'] == 1
    assert body['err_desc'].startswith('some error')
|
|
|
|
|
|
class TestGetOrCreateFolder:
    """Tests of ``CMISGateway._get_or_create_folder`` against a mocked repository."""

    @pytest.fixture
    def default_repository(self, monkeypatch):
        """Patch ``models.CmisClient`` so its default repository is a ``MagicMock``."""
        repository = MagicMock()
        fake_client = Mock(spec=CmisClient, defaultRepository=repository)
        monkeypatch.setattr(models, 'CmisClient', Mock(return_value=fake_client))
        return repository

    @pytest.fixture
    def gateway(self, default_repository):
        """Build a gateway once the CMIS client has been patched."""
        return models.CMISGateway('cmis_endpoint', 'user', 'pass', Mock())

    def test_get_or_create_folder_already_existing(self, gateway, default_repository):
        """The folder found by path is returned as is."""
        default_repository.getObjectByPath.return_value = 'folder'

        found = gateway._get_or_create_folder('/whatever')

        assert found == 'folder'
        default_repository.getObjectByPath.assert_has_calls([call('/whatever')])

    def test_get_or_create_folder_one_level_creation(self, gateway, default_repository):
        """A missing first-level folder is created from the root folder."""
        create_folder = Mock(return_value='folder')
        default_repository.getObjectByPath.side_effect = ObjectNotFoundException()
        default_repository.root_folder = Mock(createFolder=create_folder)

        created = gateway._get_or_create_folder('/whatever')

        assert created == 'folder'
        default_repository.getObjectByPath.assert_has_calls([call('/whatever')])
        default_repository.root_folder.createFolder.assert_called_once_with('whatever')

    def test_get_or_create_folder_two_level_creation(self, gateway, default_repository):
        """With nothing existing, both levels are created, starting from the root folder."""
        default_repository.getObjectByPath.side_effect = [
            ObjectNotFoundException(),
            ObjectNotFoundException(),
        ]
        default_repository.root_folder.createFolder.return_value.createFolder.return_value = 'folder'

        created = gateway._get_or_create_folder('/whatever/man')

        assert created == 'folder'
        expected_calls = [
            call.getObjectByPath('/whatever/man'),
            call.getObjectByPath('/whatever'),
            call.root_folder.createFolder('whatever'),
            call.root_folder.createFolder().createFolder('man'),
        ]
        assert default_repository.mock_calls == expected_calls

    def test_get_or_create_folder_with_some_existing_and_some_not(self, gateway, default_repository):
        """When the parent exists, only the missing leaf is created under it."""
        default_repository.getObjectByPath.side_effect = [
            ObjectNotFoundException(),
            mock.DEFAULT,
            # any third lookup would make the test fail loudly
            Exception('Boom!'),
        ]
        default_repository.getObjectByPath.return_value.createFolder.return_value = 'folder'

        created = gateway._get_or_create_folder('/whatever/man')

        assert created == 'folder'
        expected_calls = [
            call.getObjectByPath('/whatever/man'),
            call.getObjectByPath('/whatever'),
            call.getObjectByPath().createFolder('man'),
        ]
        assert default_repository.mock_calls == expected_calls

    def test_get_or_create_folder_permission_denied_on_get_object_by_path(self, gateway, default_repository):
        """A permission error while looking up the parent is turned into an ``APIError``."""
        default_repository.getObjectByPath.side_effect = [
            ObjectNotFoundException(),
            PermissionDeniedException(),
        ]

        with pytest.raises(APIError, match=r'CMIS server denied reading folder /whatever'):
            gateway._get_or_create_folder('/whatever/man')

    def test_get_or_create_folder_permission_denied_on_create(self, gateway, default_repository):
        """A permission error while creating the leaf is turned into an ``APIError``."""
        default_repository.getObjectByPath.side_effect = [
            ObjectNotFoundException(),
            mock.DEFAULT,
            # any third lookup would make the test fail loudly
            Exception('Boom!'),
        ]
        parent_folder = default_repository.getObjectByPath.return_value
        parent_folder.createFolder.side_effect = PermissionDeniedException()

        with pytest.raises(APIError, match=r'CMIS server denied creating folder /whatever/man'):
            gateway._get_or_create_folder('/whatever/man')

    def test_get_or_create_folder_permission_denied_on_root_folder(self, gateway, default_repository):
        """A permission error while reading the root folder is turned into an ``APIError``."""
        default_repository.getObjectByPath.side_effect = ObjectNotFoundException()
        # a PropertyMock has to be installed on the mock's type to intercept attribute access
        type(default_repository).root_folder = PropertyMock(side_effect=PermissionDeniedException())

        with pytest.raises(APIError, match=r'CMIS server denied reading folder /'):
            gateway._get_or_create_folder('/whatever')
|
|
|
|
|
|
def test_create_doc():
    """``create_doc`` fetches the target folder and creates the document in it."""
    gateway = models.CMISGateway('cmis_url', 'user', 'password', Mock())
    target_folder = Mock()
    target_folder.createDocument.return_value = 'doc'
    gateway._get_or_create_folder = Mock(return_value=target_folder)

    created = gateway.create_doc('filename', '/some/path', b'file_content')

    assert created == 'doc'
    gateway._get_or_create_folder.assert_called_once_with('/some/path')

    positional, keywords = target_folder.createDocument.call_args
    assert positional[0] == 'filename'
    # the content is handed over as a file-like object
    assert keywords['contentFile'].read() == b'file_content'
|
|
|
|
|
|
@pytest.mark.parametrize(
    'cmis_exc,err_msg',
    [
        (PermissionDeniedException, 'permission denied'),
        (UpdateConflictException, 'update conflict'),
        (InvalidArgumentException, 'invalid property'),
        (CmisException, 'cmis binding error'),
    ],
)
def test_wrap_cmis_error(app, setup, monkeypatch, cmis_exc, err_msg):
    """``wrap_cmis_error`` converts each cmislib exception into an ``APIError``.

    The resulting message must start with the text associated with the
    exception class.
    """

    @models.wrap_cmis_error
    def dummy_func():
        raise cmis_exc('some error')

    with pytest.raises(APIError) as raised:
        dummy_func()

    message = str(raised.value)
    assert message.startswith(err_msg)
|
|
|
|
|
|
def test_re_file_path():
    """``FILE_PATH_PATTERN`` accepts absolute, slash-separated paths only."""
    from passerelle.apps.cmis.models import FILE_PATH_PATTERN

    path_re = re.compile(FILE_PATH_PATTERN)

    accepted = (
        '/',
        '/some',
        '/some/path',
        '/SOME/PATH',
        '/some/long/path',
        '/some/digits/12/and/CAPITALS',
        '/some/!#$%&+-^_`~;[]{}+=~',
    )
    rejected = (
        '/trailing/slash/',
        'no/leading/slash',
        '/multiple//slash',
        '',
    )

    for path in accepted:
        assert path_re.match(path)
    for path in rejected:
        assert not path_re.match(path)
|
|
|
|
|
|
def test_re_file_name():
    """``FILE_NAME_PATTERN`` accepts lower-case and upper-case dotted names."""
    from passerelle.apps.cmis.models import FILE_NAME_PATTERN

    name_re = re.compile(FILE_NAME_PATTERN)
    for name in ('toto.tata', 'TOTO.TATA'):
        assert name_re.match(name)
|
|
|
|
|
|
def test_cmis_types_view(setup, app, admin_user, monkeypatch):
    """Browse the object-type explorer pages against a fake CMIS repository.

    The fake repository exposes two root types; the first one has two
    children, the second one has none.
    """

    class FakeCmisType:
        class FakeCmisProperty:
            def __init__(self, id):
                self.id = id
                self.description = 'cmis:prop'
                self.propertyType = 'string'
                self.required = True

        def __init__(self, id, children=None):
            self.id = id
            self.description = 'hop'
            first_prop = self.FakeCmisProperty('cmis:prop')
            second_prop = self.FakeCmisProperty('cmis:prop2')
            self.properties = {first_prop.id: first_prop, second_prop.id: second_prop}
            self.children = children or []

    class FakeCmisRepo:
        def __init__(self, root_types):
            self.root_types = root_types

        def getTypeDefinition(self, type_id):
            # root types are looked up first, then the direct children of each root type
            if type_id in self.root_types:
                return self.root_types[type_id]
            matching_children = (
                child
                for root in self.root_types.values()
                for child in root.children
                if child.id == type_id
            )
            found = next(matching_children, None)
            if found is None:
                raise ObjectNotFoundException
            return found

        def getTypeDefinitions(self):
            return self.root_types.values()

        def getTypeChildren(self, type_id):
            return self.getTypeDefinition(type_id).children

    children = [FakeCmisType('cmis:child1'), FakeCmisType('cmis:child2')]
    root_type1 = FakeCmisType('cmis:root1', children=children)
    root_type2 = FakeCmisType('cmis:root2')
    root_types = {root_type1.id: root_type1, root_type2.id: root_type2}

    fake_client = Mock(spec=CmisClient, defaultRepository=FakeCmisRepo(root_types))
    monkeypatch.setattr(models, 'CmisClient', Mock(return_value=fake_client))

    app = login(app)
    resp = app.get('/cmis/slug-cmis/')

    # base types list: every root type is shown, without detail sections
    resp = resp.click('Explore available object types')
    for type_id in root_types:
        assert type_id in resp.text
    for absent in ('Back to', 'Children', 'Properties'):
        assert absent not in resp.text

    # first root type: its properties and its children are listed
    resp = resp.click(root_type1.id)
    for prop_id in root_type1.properties:
        assert prop_id in resp.text
    for child in root_type1.children:
        assert child.id in resp.text

    # a child type has no children of its own
    resp = resp.click(children[0].id)
    assert 'No more children.' in resp.text

    # second root type has no children either
    resp = resp.click('Back to base types list')
    resp = resp.click(root_type2.id)
    assert 'No more children.' in resp.text

    # an unknown type id yields a 404
    app.get('/manage/cmis/slug-cmis/type?id=wrong', status=404)
|
|
|
|
|
|
@pytest.mark.parametrize('debug', (False, True))
@responses.activate
def test_raw_uploadfile(app, setup, debug, caplog):
    """Upload a file with the CMIS HTTP exchanges replayed from recorded bodies.

    Simulates the following query:

        $ http https://passerelle.dev.publik.love/cmis/ged/uploadfile
            file:='{"filename": "test2", "content": "c2FsdXQK"}' path=/test-eo

    Also checks that no log record comes from a ``cmislib`` logger, with or
    without the connector log level set to DEBUG.
    """

    def read_body(fixture_path):
        with open(fixture_path, 'rb') as fd:
            return fd.read()

    caplog.set_level('DEBUG')
    file_name = 'test2'
    file_content = 'salut\n'
    path = '/test-eo'
    url = reverse(
        'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'uploadfile', 'slug': setup.slug}
    )
    if debug:
        setup.set_log_level('DEBUG')

    cmis1_body = read_body('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd())
    cmis2_body = read_body('%s/tests/data/cmis/cmis2.out.xml' % os.getcwd())
    cmis3_body = read_body('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd())

    responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis2_body, status=200)
    responses.add(responses.POST, 'http://example.com/cmisatom/test/children', body=cmis3_body, status=200)

    payload = {
        'path': path,
        'file': {'filename': file_name, 'content': b64encode(file_content), 'content_type': 'image/jpeg'},
    }
    resp = app.post_json(url, params=payload)

    result = resp.json
    assert result['err'] == 0
    properties = result['data']['properties']
    assert properties['cmis:objectTypeId'] == 'cmis:document'
    assert properties['cmis:name'] == file_name

    cmislib_records = [record for record in caplog.records if 'cmislib' in record.name]
    assert not cmislib_records
|
|
|
|
|
|
def test_cmis_check_status(app, setup, monkeypatch):
    """``check_status`` lets a ``CmisException`` raised on ``repo`` access propagate."""
    broken_gateway = Mock()
    # a PropertyMock has to be installed on the mock's type to intercept attribute access
    type(broken_gateway).repo = PropertyMock(side_effect=CmisException)
    monkeypatch.setattr(models, 'CMISGateway', Mock(return_value=broken_gateway))

    with pytest.raises(CmisException):
        setup.check_status()
|
|
|
|
|
|
@responses.activate
def test_get_file(app, setup):
    """``getfile`` returns the document content, addressed by path or by object id."""

    def read_body(fixture_path):
        with open(fixture_path, 'rb') as fd:
            return fd.read()

    endpoint = reverse(
        'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'getfile', 'slug': setup.slug}
    )
    url = endpoint + '?raise=1'

    cmis1_body = read_body('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd())
    cmis3_body = read_body('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd())

    responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis3_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/id', body=cmis3_body, status=200)
    responses.add(
        responses.GET,
        'http://example.com/cmisatom/test/content/test2?id=L3Rlc3QtZW8vdGVzdDI%3D',
        body=b'hello world',
        status=200,
    )

    # first by path, then by object id
    for object_id in ('/test/file', 'c4bc9d00-5bf0-404d-8f0a-a6260f6d21ae;1.0'):
        resp = app.get(url, params={'object_id': object_id})
        assert resp.content_type == 'application/octet-stream'
        assert resp.content == b'hello world'
|
|
|
|
|
|
@responses.activate
def test_get_metadata(app, setup):
    """``getmetadata`` returns the document properties, addressed by object id or by path."""

    def read_body(fixture_path):
        with open(fixture_path, 'rb') as fd:
            return fd.read()

    url = reverse(
        'generic-endpoint', kwargs={'connector': 'cmis', 'endpoint': 'getmetadata', 'slug': setup.slug}
    )

    cmis1_body = read_body('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd())
    cmis3_body = read_body('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd())

    responses.add(responses.GET, 'http://example.com/cmisatom', body=cmis1_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', body=cmis3_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/id', body=cmis3_body, status=200)
    responses.add(
        responses.GET,
        'http://example.com/cmisatom/test/content/test2?id=L3Rlc3QtZW8vdGVzdDI%3D',
        body=b'hello world',
        status=200,
    )

    # first by object id, then by path
    for object_id in ('c4bc9d00-5bf0-404d-8f0a-a6260f6d21ae;1.0', '/test/file'):
        resp = app.get(url, params={'object_id': object_id})
        data = resp.json['data']
        assert data['cmis']['contentStreamFileName'] == 'test2'
        assert data['rsj']['idInsertis'] == '21N284563'
|
|
|
|
|
|
@responses.activate
def test_get_file_404_error(app, setup, caplog):
    """A 404 from the CMIS server on the path lookup yields an ``APIError`` answer.

    The ``getmetadata`` endpoint is queried, and nothing may be logged at
    ERROR level.
    """
    with open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb') as fd:
        repository_body = fd.read()
    responses.add(responses.GET, 'http://example.com/cmisatom', body=repository_body, status=200)
    responses.add(responses.GET, 'http://example.com/cmisatom/test/path', status=404)

    resp = app.get('/cmis/slug-cmis/getmetadata', params={'object_id': '/test/file'})

    assert 'ERROR' not in caplog.text
    expected_answer = {
        'err': 1,
        'err_class': 'passerelle.utils.jsonresponse.APIError',
        'err_desc': 'CMIS server did not found path /test/file',
        'data': None,
    }
    assert resp.json == expected_answer
|