# passerelle/tests/test_astre_rest.py
#
# 388 lines
# 13 KiB
# Python

import base64
import cgi
import io
import json
import httmock
import pytest
import responses
from django.contrib.contenttypes.models import ContentType
from passerelle.apps.astre_rest.models import AstreREST
from passerelle.base.models import AccessRight, ApiUser
@pytest.fixture()
def connector(db):
    """Create the AstreREST connector shared by the tests of this module.

    An API user is created with an empty ``keytype`` and ``key``, then the
    connector itself, and finally a ``can_access`` access right binding that
    user to the connector.  The connector instance is returned.
    """
    anyone = ApiUser.objects.create(username='all', keytype='', key='')

    settings = {
        'base_url': 'http://example.astre.net/',
        'username': 'admin',
        'password': 'admin',
        'slug': 'slug-astre-rest',
        'database': 'db',
        'organism': 'somebigcity',
        'budget': '2022',
        'exercice': '01',
    }
    instance = AstreREST.objects.create(**settings)

    # The access right targets the connector through its content type and pk.
    AccessRight.objects.create(
        codename='can_access',
        apiuser=anyone,
        resource_type=ContentType.objects.get_for_model(instance),
        resource_pk=instance.pk,
    )
    return instance
@responses.activate
def test_authentication(app, connector):
    """Check the credential headers sent upstream, with and without ``auth``.

    While ``connector.auth`` is unset, the outgoing request must carry the
    ``login`` and ``database`` headers and no ``auth`` header.  Once
    ``auth = 'AD'`` has been saved, the same call must also carry
    ``auth: AD``.
    """
    endpoint = '/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what'

    assert not connector.auth
    responses.add(
        responses.GET,
        'http://example.astre.net/astre/webservices/gf/documents/entites/getRefEntite/somebigcity-2022-01/who-what',
        body='$55101$',
        status=200,
    )

    # First call: no auth mode configured on the connector.
    app.get(endpoint)
    assert len(responses.calls) == 1
    sent_headers = responses.calls[0].request.headers
    assert sent_headers['login'] == 'admin'
    assert sent_headers['database'] == 'db'
    assert 'auth' not in sent_headers

    # Second call: the auth mode is now persisted on the connector.
    connector.auth = 'AD'
    connector.save()
    app.get(endpoint)
    assert len(responses.calls) == 2
    sent_headers = responses.calls[1].request.headers
    assert sent_headers['login'] == 'admin'
    assert sent_headers['database'] == 'db'
    assert sent_headers['auth'] == 'AD'
def test_gf_documents_entites_getref(app, connector):
    """The getref endpoint must return the upstream body as ``data``."""

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/entites/getRefEntite/somebigcity-2022-01/who-what',
    )
    def fake_astre(url, request):
        return httmock.response(200, content='$55101$')

    with httmock.HTTMock(fake_astre):
        reply = app.get(
            '/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what'
        )

    payload = reply.json
    assert payload['err'] == 0
    assert payload['data'] == '$55101$'
def test_gf_documents_entites_list(app, connector):
    """The list endpoint must return upstream entities with ``id``/``text`` added.

    The expected ``id`` equals the entity's ``codeEntite`` and the expected
    ``text`` equals its ``libelleEntite``; the upstream keys are kept as-is.
    """
    upstream_entity = {
        'codeEntite': 'RMA01',
        'libelleEntite': 'Jon Doe',
        'typeEntite': 'Tiers',
        'refEntite': '$1234$',
        'codeOrganisme': 'Ville',
    }

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/entites/somebigcity-2022-01/who-what/list/foo/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=[upstream_entity])

    with httmock.HTTMock(fake_astre):
        reply = app.get(
            '/astre-rest/slug-astre-rest/gf-documents-entites-list?entity_type=who&entity_ref=what&code_list=foo'
        )

    payload = reply.json
    assert payload['err'] == 0
    assert payload['data'] == [{'id': 'RMA01', 'text': 'Jon Doe', **upstream_entity}]
def test_gf_documents_entites_read(app, connector):
    """The read endpoint must pass the upstream JSON through as ``data``."""
    upstream_json = [{'foo': 'bar'}]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/entites/read/somebigcity-2022-01/who-what/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=upstream_json)

    with httmock.HTTMock(fake_astre):
        reply = app.get(
            '/astre-rest/slug-astre-rest/gf-documents-entites-read?entity_type=who&entity_ref=what'
        )

    payload = reply.json
    assert payload['err'] == 0
    assert payload['data'] == [{'foo': 'bar'}]
def test_gf_documents_entites_search(app, connector):
    """The search endpoint must return upstream matches with ``id``/``text`` added.

    The mock only matches requests whose query string is ``s=what``.  The
    expected ``id`` equals the match's ``codeEntite`` and the expected
    ``text`` equals its ``libEntite``.
    """
    upstream_match = {
        'codeEntite': 'RMA01',
        'libEntite': 'Jon Doe',
        'typeEntite': 'Tiers',
        'refEntite': '$1234$',
        'codeOrganisme': 'Ville',
    }

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/entites/search/somebigcity-2022-01/who/json',
        query='s=what',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=[upstream_match])

    with httmock.HTTMock(fake_astre):
        reply = app.get('/astre-rest/slug-astre-rest/gf-documents-entites-search?entity_type=who&s=what')

    payload = reply.json
    assert payload['err'] == 0
    assert payload['data'] == [{'id': 'RMA01', 'text': 'Jon Doe', **upstream_match}]
@responses.activate
def test_gf_documents_gedmanager_document_create(app, connector):
    """Create a GED document and inspect the multipart request sent upstream.

    The JSON posted to the connector carries a base64-encoded file.  The test
    checks that exactly one upstream request is made, that it is
    ``multipart/form-data``, that its ``nomFichier`` field is the requested
    filename and its ``fichier`` part holds the decoded bytes, and that the
    upstream JSON comes back as ``data``.
    """
    raw_content = b'aaaa'
    responses.add(
        responses.POST,
        'http://example.astre.net/astre/webservices/gf/documents/gedmanager/document/create/json',
        json=[{'foo': 'bar'}],
        status=200,
    )
    payload = {
        'file': {
            'filename': 'foo.txt',
            'content': base64.b64encode(raw_content).decode('utf-8'),
            'content_type': 'text/plain',
        },
        'entity_type': 'entitytype',
        'entity_ref': 'entityref',
        'astre_ref': 'astreref',
        'ged_ref': 'gedref',
        'doc_type': 'doctype',
        'description': 'description',
        'filename': 'somefile.ext',
        'indpj': 'indpj',
        'domain_code': 'domaincode',
    }

    reply = app.post_json(
        '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-create', params=payload
    )

    assert len(responses.calls) == 1
    upstream_request = responses.calls[0].request
    content_type = upstream_request.headers['content-type']
    assert content_type.startswith('multipart/form-data')

    # Decode the multipart body: the boundary is handed over as bytes and
    # the request's Content-Length is passed along with it.
    _, options = cgi.parse_header(content_type)
    options['boundary'] = options['boundary'].encode('utf-8')
    options['CONTENT-LENGTH'] = upstream_request.headers['Content-Length']
    fields = cgi.parse_multipart(io.BytesIO(upstream_request.body), options)

    assert fields['nomFichier'] == ['somefile.ext']
    assert 'fichier' in fields
    assert fields['fichier'][0] == raw_content

    result = reply.json
    assert result['err'] == 0
    assert result['data'] == [{'foo': 'bar'}]
def test_gf_documents_gedmanager_document_delete(app, connector):
    """The delete endpoint must pass the upstream JSON through as ``data``."""

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/gedmanager/document/delete/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=[{'foo': 'bar'}])

    payload = {
        'entity_type': 'entitytype',
        'entity_ref': 'entityref',
        'document_ref': 'documentref',
    }
    target = '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-delete'

    with httmock.HTTMock(fake_astre):
        reply = app.post_json(target, params=payload)

    result = reply.json
    assert result['err'] == 0
    assert result['data'] == [{'foo': 'bar'}]
def test_gf_documents_gedmanager_document_delete_error(app, connector):
    """An upstream ``code``/``message`` answer must surface as a connector error.

    Upstream replies with HTTP 200 and a JSON object holding ``code`` and
    ``message``; the connector response is expected to have ``err == 1`` and
    an ``err_desc`` of the form ``"<code>: <message>"``.
    """

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/gedmanager/document/delete/json',
    )
    def fake_astre(url, request):
        upstream_error = {'code': 'DataError', 'message': 'La référence du document est obligatoire.'}
        return httmock.response(200, content=upstream_error)

    payload = {
        'entity_type': 'entitytype',
        'entity_ref': 'entityref',
        'document_ref': 'documentref',
    }
    target = '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-delete'

    with httmock.HTTMock(fake_astre):
        reply = app.post_json(target, params=payload)

    result = reply.json
    assert result['err'] == 1
    assert result['err_desc'] == 'DataError: La référence du document est obligatoire.'
def test_gf_documents_gedmanager_document_read(app, connector):
    """The document read endpoint must pass the upstream JSON through as ``data``.

    The mock only matches requests whose query string is ``metadonnees=all``.
    """
    upstream_json = [{'foo': 'bar'}]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/gedmanager/document/read/somebigcity-2022-01/who-what/foo/json',
        query='metadonnees=all',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=upstream_json)

    with httmock.HTTMock(fake_astre):
        reply = app.get(
            '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-read?entity_type=who&entity_ref=what&document_ref=foo'
        )

    result = reply.json
    assert result['err'] == 0
    assert result['data'] == [{'foo': 'bar'}]
@responses.activate
def test_gf_documents_gedmanager_document_update(app, connector):
    """Update a GED document and inspect the multipart request sent upstream.

    The JSON posted to the connector carries a base64-encoded file.  The test
    checks that exactly one upstream request is made, that it is
    ``multipart/form-data``, that its ``nomFichier`` field is the requested
    filename and its ``fichier`` part holds the decoded bytes, and that the
    upstream JSON comes back as ``data``.
    """
    raw_content = b'aaaa'
    responses.add(
        responses.POST,
        'http://example.astre.net/astre/webservices/gf/documents/gedmanager/document/update/json',
        json=[{'foo': 'bar'}],
        status=200,
    )
    payload = {
        'file': {
            'filename': 'foo.txt',
            'content': base64.b64encode(raw_content).decode('utf-8'),
            'content_type': 'text/plain',
        },
        'entity_type': 'entitytype',
        'entity_ref': 'entityref',
        'astre_ref': 'astreref',
        'ged_ref': 'gedref',
        'doc_type': 'doctype',
        'description': 'description',
        'filename': 'somefile.ext',
        'indpj': 'indpj',
        'domain_code': 'domaincode',
    }

    reply = app.post_json(
        '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-update', params=payload
    )

    assert len(responses.calls) == 1
    upstream_request = responses.calls[0].request
    content_type = upstream_request.headers['content-type']
    assert content_type.startswith('multipart/form-data')

    # Decode the multipart body: the boundary is handed over as bytes and
    # the request's Content-Length is passed along with it.
    _, options = cgi.parse_header(content_type)
    options['boundary'] = options['boundary'].encode('utf-8')
    options['CONTENT-LENGTH'] = upstream_request.headers['Content-Length']
    fields = cgi.parse_multipart(io.BytesIO(upstream_request.body), options)

    assert fields['nomFichier'] == ['somefile.ext']
    assert 'fichier' in fields
    assert fields['fichier'][0] == raw_content

    result = reply.json
    assert result['err'] == 0
    assert result['data'] == [{'foo': 'bar'}]
def test_gf_documents_referentiel_domainepj(app, connector):
    """The domainepj referential must return upstream items with ``id``/``text`` added.

    For each item the expected ``id`` equals its ``code`` and the expected
    ``text`` equals its ``libelle``.
    """
    upstream_items = [{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]
    expected = [
        {'id': '1', 'text': 'foo', 'code': '1', 'libelle': 'foo'},
        {'id': '2', 'text': 'bar', 'code': '2', 'libelle': 'bar'},
    ]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/referentiel/domainepj/list/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=upstream_items)

    with httmock.HTTMock(fake_astre):
        reply = app.get('/astre-rest/slug-astre-rest/gf-documents-referentiel-domainepj')

    result = reply.json
    assert result['err'] == 0
    assert result['data'] == expected
def test_gf_documents_referentiel_typedocument(app, connector):
    """The typedocument referential must return upstream items with ``id``/``text`` added.

    For each item the expected ``id`` equals its ``code`` and the expected
    ``text`` equals its ``libelle``.
    """
    upstream_items = [{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]
    expected = [
        {'id': '1', 'text': 'foo', 'code': '1', 'libelle': 'foo'},
        {'id': '2', 'text': 'bar', 'code': '2', 'libelle': 'bar'},
    ]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=upstream_items)

    with httmock.HTTMock(fake_astre):
        reply = app.get('/astre-rest/slug-astre-rest/gf-documents-referentiel-typedocument')

    result = reply.json
    assert result['err'] == 0
    assert result['data'] == expected
def test_check_status_ok(connector):
    """``check_status()`` must return ``None`` when typedocument/list answers with JSON."""
    upstream_items = [{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=upstream_items)

    with httmock.HTTMock(fake_astre):
        outcome = connector.check_status()

    assert outcome is None
def test_check_status_notok(connector):
    """``check_status()`` must raise ``json.JSONDecodeError`` on an HTML answer.

    Upstream replies with HTTP 200 but an ``<html></html>`` body instead of
    JSON.
    """

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content='<html></html>')

    with httmock.HTTMock(fake_astre), pytest.raises(json.JSONDecodeError):
        connector.check_status()