passerelle/tests/test_astre_rest.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

388 lines
13 KiB
Python
Raw Normal View History

import base64
import cgi
import io
import json
import httmock
import pytest
import responses
from django.contrib.contenttypes.models import ContentType
from passerelle.apps.astre_rest.models import AstreREST
from passerelle.base.models import AccessRight, ApiUser
@pytest.fixture()
def connector(db):
    """Create an AstreREST connector that the 'all' API user can access.

    ``db`` is requested but never used directly here (presumably the
    pytest-django database fixture -- confirm against the test setup).
    """
    api_user = ApiUser.objects.create(username='all', keytype='', key='')

    settings = {
        'base_url': 'http://example.astre.net/',
        'username': 'admin',
        'password': 'admin',
        'slug': 'slug-astre-rest',
        'database': 'db',
        'organism': 'somebigcity',
        'budget': '2022',
        'exercice': '01',
    }
    astre = AstreREST.objects.create(**settings)

    # Grant the API user the 'can_access' right on this connector instance.
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api_user,
        resource_type=ContentType.objects.get_for_model(astre),
        resource_pk=astre.pk,
    )
    return astre
@responses.activate
def test_authentication(app, connector):
    """Check the headers sent to Astre before and after ``auth`` is set.

    Without ``auth`` the outgoing request carries ``login`` and ``database``
    but no ``auth`` header; once ``auth`` is saved as 'AD' it is sent too.
    """
    endpoint = '/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what'

    assert not connector.auth
    responses.add(
        responses.GET,
        'http://example.astre.net/astre/webservices/gf/documents/entites/getRefEntite/somebigcity-2022-01/who-what',
        body='$55101$',
        status=200,
    )

    # First call: no authentication mode configured on the connector.
    app.get(endpoint)
    assert len(responses.calls) == 1
    first_headers = responses.calls[0].request.headers
    assert first_headers['login'] == 'admin'
    assert first_headers['database'] == 'db'
    assert 'auth' not in first_headers

    # Second call: the saved auth value must show up as a header.
    connector.auth = 'AD'
    connector.save()
    app.get(endpoint)
    assert len(responses.calls) == 2
    second_headers = responses.calls[1].request.headers
    assert second_headers['login'] == 'admin'
    assert second_headers['database'] == 'db'
    assert second_headers['auth'] == 'AD'
def test_gf_documents_entites_getref(app, connector):
    """The getref endpoint returns the string served by Astre as ``data``."""
    astre_path = '/astre/webservices/gf/documents/entites/getRefEntite/somebigcity-2022-01/who-what'

    @httmock.urlmatch(netloc='example.astre.net', path=astre_path)
    def fake_astre(url, request):
        return httmock.response(200, content='$55101$')

    with httmock.HTTMock(fake_astre):
        resp = app.get(
            '/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what'
        )

    payload = resp.json
    assert payload['err'] == 0
    assert payload['data'] == '$55101$'
def test_gf_documents_entites_list(app, connector):
    """Listed entities come back with extra ``id`` and ``text`` keys."""
    entity = {
        'codeEntite': 'RMA01',
        'libelleEntite': 'Jon Doe',
        'typeEntite': 'Tiers',
        'refEntite': '$1234$',
        'codeOrganisme': 'Ville',
    }
    # Same fields as served by the mock, plus the id/text pair.
    expected_row = {'id': 'RMA01', 'text': 'Jon Doe', **entity}

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/entites/somebigcity-2022-01/who-what/list/foo/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=[entity])

    with httmock.HTTMock(fake_astre):
        resp = app.get(
            '/astre-rest/slug-astre-rest/gf-documents-entites-list?entity_type=who&entity_ref=what&code_list=foo'
        )

    payload = resp.json
    assert payload['err'] == 0
    assert payload['data'] == [expected_row]
def test_gf_documents_entites_read(app, connector):
    """The read endpoint returns the JSON served by Astre unchanged as ``data``."""
    astre_path = '/astre/webservices/gf/documents/entites/read/somebigcity-2022-01/who-what/json'
    astre_answer = [{'foo': 'bar'}]

    @httmock.urlmatch(netloc='example.astre.net', path=astre_path)
    def fake_astre(url, request):
        return httmock.response(200, content=[{'foo': 'bar'}])

    with httmock.HTTMock(fake_astre):
        resp = app.get(
            '/astre-rest/slug-astre-rest/gf-documents-entites-read?entity_type=who&entity_ref=what'
        )

    payload = resp.json
    assert payload['err'] == 0
    assert payload['data'] == astre_answer
def test_gf_documents_entites_search(app, connector):
    """Search results come back with extra ``id`` and ``text`` keys."""
    hit = {
        'codeEntite': 'RMA01',
        'libEntite': 'Jon Doe',
        'typeEntite': 'Tiers',
        'refEntite': '$1234$',
        'codeOrganisme': 'Ville',
    }
    # Same fields as served by the mock, plus the id/text pair.
    expected_row = {'id': 'RMA01', 'text': 'Jon Doe', **hit}

    # The mock only matches when the search term is forwarded as ``s=what``.
    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/entites/search/somebigcity-2022-01/who/json',
        query='s=what',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=[hit])

    with httmock.HTTMock(fake_astre):
        resp = app.get('/astre-rest/slug-astre-rest/gf-documents-entites-search?entity_type=who&s=what')

    payload = resp.json
    assert payload['err'] == 0
    assert payload['data'] == [expected_row]
@responses.activate
def test_gf_documents_gedmanager_document_create(app, connector):
    """Document creation posts a multipart form carrying the decoded file.

    The outgoing request is parsed back to check the ``nomFichier`` field and
    the raw bytes of the ``fichier`` part.
    """
    responses.add(
        responses.POST,
        'http://example.astre.net/astre/webservices/gf/documents/gedmanager/document/create/json',
        json=[{'foo': 'bar'}],
        status=200,
    )

    upload = {
        'filename': 'foo.txt',
        'content': base64.b64encode(b'aaaa').decode('utf-8'),
        'content_type': 'text/plain',
    }
    payload = {
        'file': upload,
        'entity_type': 'entitytype',
        'entity_ref': 'entityref',
        'astre_ref': 'astreref',
        'ged_ref': 'gedref',
        'doc_type': 'doctype',
        'description': 'description',
        'filename': 'somefile.ext',
        'indpj': 'indpj',
        'domain_code': 'domaincode',
    }
    resp = app.post_json('/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-create', params=payload)

    assert len(responses.calls) == 1
    sent = responses.calls[0].request
    content_type = sent.headers['content-type']
    assert content_type.startswith('multipart/form-data')

    # NOTE(review): the cgi module was deprecated in Python 3.11 and removed
    # in 3.13 (PEP 594); this parsing will need a replacement on newer Pythons.
    _, options = cgi.parse_header(content_type)
    # parse_multipart wants the boundary as bytes and the body length.
    options['boundary'] = options['boundary'].encode('utf-8')
    options['CONTENT-LENGTH'] = sent.headers['Content-Length']
    fields = cgi.parse_multipart(io.BytesIO(sent.body), options)

    assert fields['nomFichier'] == ['somefile.ext']
    assert 'fichier' in fields
    assert fields['fichier'][0] == b'aaaa'

    body = resp.json
    assert body['err'] == 0
    assert body['data'] == [{'foo': 'bar'}]
def test_gf_documents_gedmanager_document_delete(app, connector):
    """A successful delete returns the JSON served by Astre as ``data``."""
    astre_answer = [{'foo': 'bar'}]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/gedmanager/document/delete/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=[{'foo': 'bar'}])

    payload = {
        'entity_type': 'entitytype',
        'entity_ref': 'entityref',
        'document_ref': 'documentref',
    }
    with httmock.HTTMock(fake_astre):
        resp = app.post_json(
            '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-delete', params=payload
        )

    body = resp.json
    assert body['err'] == 0
    assert body['data'] == astre_answer
def test_gf_documents_gedmanager_document_delete_error(app, connector):
    """A code/message answer from Astre (HTTP 200) is reported as an error.

    The error description is expected to read '<code>: <message>'.
    """
    astre_error = {'code': 'DataError', 'message': 'La référence du document est obligatoire.'}

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/gedmanager/document/delete/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=astre_error)

    payload = {
        'entity_type': 'entitytype',
        'entity_ref': 'entityref',
        'document_ref': 'documentref',
    }
    with httmock.HTTMock(fake_astre):
        resp = app.post_json(
            '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-delete', params=payload
        )

    body = resp.json
    assert body['err'] == 1
    assert body['err_desc'] == 'DataError: La référence du document est obligatoire.'
def test_gf_documents_gedmanager_document_read(app, connector):
    """Reading a document returns the JSON served by Astre as ``data``."""
    astre_answer = [{'foo': 'bar'}]

    # The mock only matches when the request carries ``metadonnees=all``.
    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/gedmanager/document/read/somebigcity-2022-01/who-what/foo/json',
        query='metadonnees=all',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=[{'foo': 'bar'}])

    with httmock.HTTMock(fake_astre):
        resp = app.get(
            '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-read?entity_type=who&entity_ref=what&document_ref=foo'
        )

    payload = resp.json
    assert payload['err'] == 0
    assert payload['data'] == astre_answer
@responses.activate
def test_gf_documents_gedmanager_document_update(app, connector):
    """Document update posts a multipart form carrying the decoded file.

    The outgoing request is parsed back to check the ``nomFichier`` field and
    the raw bytes of the ``fichier`` part.
    """
    responses.add(
        responses.POST,
        'http://example.astre.net/astre/webservices/gf/documents/gedmanager/document/update/json',
        json=[{'foo': 'bar'}],
        status=200,
    )

    upload = {
        'filename': 'foo.txt',
        'content': base64.b64encode(b'aaaa').decode('utf-8'),
        'content_type': 'text/plain',
    }
    payload = {
        'file': upload,
        'entity_type': 'entitytype',
        'entity_ref': 'entityref',
        'astre_ref': 'astreref',
        'ged_ref': 'gedref',
        'doc_type': 'doctype',
        'description': 'description',
        'filename': 'somefile.ext',
        'indpj': 'indpj',
        'domain_code': 'domaincode',
    }
    resp = app.post_json('/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-update', params=payload)

    assert len(responses.calls) == 1
    sent = responses.calls[0].request
    content_type = sent.headers['content-type']
    assert content_type.startswith('multipart/form-data')

    # NOTE(review): the cgi module was deprecated in Python 3.11 and removed
    # in 3.13 (PEP 594); this parsing will need a replacement on newer Pythons.
    _, options = cgi.parse_header(content_type)
    # parse_multipart wants the boundary as bytes and the body length.
    options['boundary'] = options['boundary'].encode('utf-8')
    options['CONTENT-LENGTH'] = sent.headers['Content-Length']
    fields = cgi.parse_multipart(io.BytesIO(sent.body), options)

    assert fields['nomFichier'] == ['somefile.ext']
    assert 'fichier' in fields
    assert fields['fichier'][0] == b'aaaa'

    body = resp.json
    assert body['err'] == 0
    assert body['data'] == [{'foo': 'bar'}]
def test_gf_documents_referentiel_domainepj(app, connector):
    """Each domainepj entry comes back with ``id`` and ``text`` keys added."""
    astre_rows = [{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]
    expected_rows = [
        {'id': '1', 'text': 'foo', 'code': '1', 'libelle': 'foo'},
        {'id': '2', 'text': 'bar', 'code': '2', 'libelle': 'bar'},
    ]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/referentiel/domainepj/list/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=astre_rows)

    with httmock.HTTMock(fake_astre):
        resp = app.get('/astre-rest/slug-astre-rest/gf-documents-referentiel-domainepj')

    payload = resp.json
    assert payload['err'] == 0
    assert payload['data'] == expected_rows
def test_gf_documents_referentiel_typedocument(app, connector):
    """Each typedocument entry comes back with ``id`` and ``text`` keys added."""
    astre_rows = [{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]
    expected_rows = [
        {'id': '1', 'text': 'foo', 'code': '1', 'libelle': 'foo'},
        {'id': '2', 'text': 'bar', 'code': '2', 'libelle': 'bar'},
    ]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=astre_rows)

    with httmock.HTTMock(fake_astre):
        resp = app.get('/astre-rest/slug-astre-rest/gf-documents-referentiel-typedocument')

    payload = resp.json
    assert payload['err'] == 0
    assert payload['data'] == expected_rows
def test_check_status_ok(connector):
    """check_status() returns None when the typedocument list is valid JSON."""
    astre_rows = [{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}]

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=astre_rows)

    with httmock.HTTMock(fake_astre):
        status = connector.check_status()
        assert status is None
def test_check_status_notok(connector):
    """check_status() raises JSONDecodeError when Astre answers with HTML."""
    html_body = '<html></html>'

    @httmock.urlmatch(
        netloc='example.astre.net',
        path='/astre/webservices/gf/documents/referentiel/typedocument/list/json',
    )
    def fake_astre(url, request):
        return httmock.response(200, content=html_body)

    with httmock.HTTMock(fake_astre), pytest.raises(json.JSONDecodeError):
        connector.check_status()