388 lines
13 KiB
Python
388 lines
13 KiB
Python
import base64
|
|
import cgi # noqa pylint: disable=deprecated-module
|
|
import io
|
|
import json
|
|
|
|
import httmock
|
|
import pytest
|
|
import responses
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
from passerelle.apps.astre_rest.models import AstreREST
|
|
from passerelle.base.models import AccessRight, ApiUser
|
|
|
|
|
|
@pytest.fixture()
|
|
def connector(db):
|
|
api = ApiUser.objects.create(username='all', keytype='', key='')
|
|
connector = AstreREST.objects.create(
|
|
base_url='http://example.astre.net/',
|
|
username='admin',
|
|
password='admin',
|
|
slug='slug-astre-rest',
|
|
database='db',
|
|
organism='somebigcity',
|
|
budget='2022',
|
|
exercice='01',
|
|
)
|
|
obj_type = ContentType.objects.get_for_model(connector)
|
|
AccessRight.objects.create(
|
|
codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=connector.pk
|
|
)
|
|
return connector
|
|
|
|
|
|
@responses.activate
|
|
def test_authentication(app, connector):
|
|
assert not connector.auth
|
|
responses.add(
|
|
responses.GET,
|
|
'http://example.astre.net/astre/webservices/gf/documents/entites/getRefEntite/somebigcity-2022-01/who-what',
|
|
body='$55101$',
|
|
status=200,
|
|
)
|
|
|
|
app.get('/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what')
|
|
assert len(responses.calls) == 1
|
|
assert responses.calls[0].request.headers['login'] == 'admin'
|
|
assert responses.calls[0].request.headers['database'] == 'db'
|
|
assert 'auth' not in responses.calls[0].request.headers
|
|
|
|
connector.auth = 'AD'
|
|
connector.save()
|
|
app.get('/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what')
|
|
assert len(responses.calls) == 2
|
|
assert responses.calls[1].request.headers['login'] == 'admin'
|
|
assert responses.calls[1].request.headers['database'] == 'db'
|
|
assert responses.calls[1].request.headers['auth'] == 'AD'
|
|
|
|
|
|
def test_gf_documents_entites_getref(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/entites/getRefEntite/somebigcity-2022-01/who-what',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(200, content='$55101$')
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.get(
|
|
'/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what'
|
|
)
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == '$55101$'
|
|
|
|
|
|
def test_gf_documents_entites_list(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/entites/somebigcity-2022-01/who-what/list/foo/json',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(
|
|
200,
|
|
content=[
|
|
{
|
|
'codeEntite': 'RMA01',
|
|
'libelleEntite': 'Jon Doe',
|
|
'typeEntite': 'Tiers',
|
|
'refEntite': '$1234$',
|
|
'codeOrganisme': 'Ville',
|
|
}
|
|
],
|
|
)
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.get(
|
|
'/astre-rest/slug-astre-rest/gf-documents-entites-list?entity_type=who&entity_ref=what&code_list=foo'
|
|
)
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [
|
|
{
|
|
'id': 'RMA01',
|
|
'text': 'Jon Doe',
|
|
'codeEntite': 'RMA01',
|
|
'libelleEntite': 'Jon Doe',
|
|
'typeEntite': 'Tiers',
|
|
'refEntite': '$1234$',
|
|
'codeOrganisme': 'Ville',
|
|
}
|
|
]
|
|
|
|
|
|
def test_gf_documents_entites_read(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/entites/read/somebigcity-2022-01/who-what/json',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(200, content=[{'foo': 'bar'}])
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.get(
|
|
'/astre-rest/slug-astre-rest/gf-documents-entites-read?entity_type=who&entity_ref=what'
|
|
)
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [{'foo': 'bar'}]
|
|
|
|
|
|
def test_gf_documents_entites_search(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/entites/search/somebigcity-2022-01/who/json',
|
|
query='s=what',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(
|
|
200,
|
|
content=[
|
|
{
|
|
'codeEntite': 'RMA01',
|
|
'libEntite': 'Jon Doe',
|
|
'typeEntite': 'Tiers',
|
|
'refEntite': '$1234$',
|
|
'codeOrganisme': 'Ville',
|
|
}
|
|
],
|
|
)
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.get('/astre-rest/slug-astre-rest/gf-documents-entites-search?entity_type=who&s=what')
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [
|
|
{
|
|
'id': 'RMA01',
|
|
'text': 'Jon Doe',
|
|
'codeEntite': 'RMA01',
|
|
'libEntite': 'Jon Doe',
|
|
'typeEntite': 'Tiers',
|
|
'refEntite': '$1234$',
|
|
'codeOrganisme': 'Ville',
|
|
}
|
|
]
|
|
|
|
|
|
@responses.activate
|
|
def test_gf_documents_gedmanager_document_create(app, connector):
|
|
responses.add(
|
|
responses.POST,
|
|
'http://example.astre.net/astre/webservices/gf/documents/gedmanager/document/create/json',
|
|
json=[{'foo': 'bar'}],
|
|
status=200,
|
|
)
|
|
|
|
params = {
|
|
'file': {
|
|
'filename': 'foo.txt',
|
|
'content': base64.b64encode(b'aaaa').decode('utf-8'),
|
|
'content_type': 'text/plain',
|
|
},
|
|
'entity_type': 'entitytype',
|
|
'entity_ref': 'entityref',
|
|
'astre_ref': 'astreref',
|
|
'ged_ref': 'gedref',
|
|
'doc_type': 'doctype',
|
|
'description': 'description',
|
|
'filename': 'somefile.ext',
|
|
'indpj': 'indpj',
|
|
'domain_code': 'domaincode',
|
|
}
|
|
|
|
resp = app.post_json('/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-create', params=params)
|
|
assert len(responses.calls) == 1
|
|
made_request = responses.calls[0].request
|
|
assert made_request.headers['content-type'].startswith('multipart/form-data')
|
|
_, pdict = cgi.parse_header(made_request.headers['content-type'])
|
|
pdict['boundary'] = bytes(pdict['boundary'], 'utf-8')
|
|
pdict['CONTENT-LENGTH'] = made_request.headers['Content-Length']
|
|
postvars = cgi.parse_multipart(io.BytesIO(made_request.body), pdict)
|
|
assert postvars['nomFichier'] == ['somefile.ext']
|
|
assert 'fichier' in postvars
|
|
file_data = postvars['fichier'][0]
|
|
assert file_data == b'aaaa'
|
|
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [{'foo': 'bar'}]
|
|
|
|
|
|
def test_gf_documents_gedmanager_document_delete(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/gedmanager/document/delete/json',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(200, content=[{'foo': 'bar'}])
|
|
|
|
params = {
|
|
'entity_type': 'entitytype',
|
|
'entity_ref': 'entityref',
|
|
'document_ref': 'documentref',
|
|
}
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.post_json(
|
|
'/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-delete', params=params
|
|
)
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [{'foo': 'bar'}]
|
|
|
|
|
|
def test_gf_documents_gedmanager_document_delete_error(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/gedmanager/document/delete/json',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(
|
|
200, content={'code': 'DataError', 'message': 'La référence du document est obligatoire.'}
|
|
)
|
|
|
|
params = {
|
|
'entity_type': 'entitytype',
|
|
'entity_ref': 'entityref',
|
|
'document_ref': 'documentref',
|
|
}
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.post_json(
|
|
'/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-delete', params=params
|
|
)
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 1
|
|
assert json_resp['err_desc'] == 'DataError: La référence du document est obligatoire.'
|
|
|
|
|
|
def test_gf_documents_gedmanager_document_read(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/gedmanager/document/read/somebigcity-2022-01/who-what/foo/json',
|
|
query='metadonnees=all',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(200, content=[{'foo': 'bar'}])
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.get(
|
|
'/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-read?entity_type=who&entity_ref=what&document_ref=foo'
|
|
)
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [{'foo': 'bar'}]
|
|
|
|
|
|
@responses.activate
|
|
def test_gf_documents_gedmanager_document_update(app, connector):
|
|
responses.add(
|
|
responses.POST,
|
|
'http://example.astre.net/astre/webservices/gf/documents/gedmanager/document/update/json',
|
|
json=[{'foo': 'bar'}],
|
|
status=200,
|
|
)
|
|
|
|
params = {
|
|
'file': {
|
|
'filename': 'foo.txt',
|
|
'content': base64.b64encode(b'aaaa').decode('utf-8'),
|
|
'content_type': 'text/plain',
|
|
},
|
|
'entity_type': 'entitytype',
|
|
'entity_ref': 'entityref',
|
|
'astre_ref': 'astreref',
|
|
'ged_ref': 'gedref',
|
|
'doc_type': 'doctype',
|
|
'description': 'description',
|
|
'filename': 'somefile.ext',
|
|
'indpj': 'indpj',
|
|
'domain_code': 'domaincode',
|
|
}
|
|
|
|
resp = app.post_json('/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-update', params=params)
|
|
assert len(responses.calls) == 1
|
|
made_request = responses.calls[0].request
|
|
assert made_request.headers['content-type'].startswith('multipart/form-data')
|
|
_, pdict = cgi.parse_header(made_request.headers['content-type'])
|
|
pdict['boundary'] = bytes(pdict['boundary'], 'utf-8')
|
|
pdict['CONTENT-LENGTH'] = made_request.headers['Content-Length']
|
|
postvars = cgi.parse_multipart(io.BytesIO(made_request.body), pdict)
|
|
assert postvars['nomFichier'] == ['somefile.ext']
|
|
assert 'fichier' in postvars
|
|
file_data = postvars['fichier'][0]
|
|
assert file_data == b'aaaa'
|
|
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [{'foo': 'bar'}]
|
|
|
|
|
|
def test_gf_documents_referentiel_domainepj(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/referentiel/domainepj/list/json',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(
|
|
200, content=[{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]
|
|
)
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.get('/astre-rest/slug-astre-rest/gf-documents-referentiel-domainepj')
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [
|
|
{'id': '1', 'text': 'foo', 'code': '1', 'libelle': 'foo'},
|
|
{'id': '2', 'text': 'bar', 'code': '2', 'libelle': 'bar'},
|
|
]
|
|
|
|
|
|
def test_gf_documents_referentiel_typedocument(app, connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(
|
|
200, content=[{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]
|
|
)
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
resp = app.get('/astre-rest/slug-astre-rest/gf-documents-referentiel-typedocument')
|
|
json_resp = resp.json
|
|
assert json_resp['err'] == 0
|
|
assert json_resp['data'] == [
|
|
{'id': '1', 'text': 'foo', 'code': '1', 'libelle': 'foo'},
|
|
{'id': '2', 'text': 'bar', 'code': '2', 'libelle': 'bar'},
|
|
]
|
|
|
|
|
|
def test_check_status_ok(connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(
|
|
200, content=[{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]
|
|
)
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
assert connector.check_status() is None
|
|
|
|
|
|
def test_check_status_notok(connector):
|
|
@httmock.urlmatch(
|
|
netloc='example.astre.net',
|
|
path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
|
|
)
|
|
def astre_mock(url, request):
|
|
return httmock.response(200, content='<html></html>')
|
|
|
|
with httmock.HTTMock(astre_mock):
|
|
with pytest.raises(json.JSONDecodeError):
|
|
connector.check_status()
|