import base64 import cgi import io import json import httmock import pytest import responses from django.contrib.contenttypes.models import ContentType from passerelle.apps.astre_rest.models import AstreREST from passerelle.base.models import AccessRight, ApiUser @pytest.fixture() def connector(db): api = ApiUser.objects.create(username='all', keytype='', key='') connector = AstreREST.objects.create( base_url='http://example.astre.net/', username='admin', password='admin', slug='slug-astre-rest', database='db', organism='somebigcity', budget='2022', exercice='01', ) obj_type = ContentType.objects.get_for_model(connector) AccessRight.objects.create( codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=connector.pk ) return connector @responses.activate def test_authentication(app, connector): assert not connector.auth responses.add( responses.GET, 'http://example.astre.net/astre/webservices/gf/documents/entites/getRefEntite/somebigcity-2022-01/who-what', body='$55101$', status=200, ) app.get('/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what') assert len(responses.calls) == 1 assert responses.calls[0].request.headers['login'] == 'admin' assert responses.calls[0].request.headers['database'] == 'db' assert 'auth' not in responses.calls[0].request.headers connector.auth = 'AD' connector.save() app.get('/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what') assert len(responses.calls) == 2 assert responses.calls[1].request.headers['login'] == 'admin' assert responses.calls[1].request.headers['database'] == 'db' assert responses.calls[1].request.headers['auth'] == 'AD' def test_gf_documents_entites_getref(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/entites/getRefEntite/somebigcity-2022-01/who-what', ) def astre_mock(url, request): return httmock.response(200, content='$55101$') with httmock.HTTMock(astre_mock): resp = app.get( '/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what' ) json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == '$55101$' def test_gf_documents_entites_list(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/entites/somebigcity-2022-01/who-what/list/foo/json', ) def astre_mock(url, request): return httmock.response( 200, content=[ { 'codeEntite': 'RMA01', 'libelleEntite': 'Jon Doe', 'typeEntite': 'Tiers', 'refEntite': '$1234$', 'codeOrganisme': 'Ville', } ], ) with httmock.HTTMock(astre_mock): resp = app.get( '/astre-rest/slug-astre-rest/gf-documents-entites-list?entity_type=who&entity_ref=what&code_list=foo' ) json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [ { 'id': 'RMA01', 'text': 'Jon Doe', 'codeEntite': 'RMA01', 'libelleEntite': 'Jon Doe', 'typeEntite': 'Tiers', 'refEntite': '$1234$', 'codeOrganisme': 'Ville', } ] def test_gf_documents_entites_read(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/entites/read/somebigcity-2022-01/who-what/json', ) def astre_mock(url, request): return httmock.response(200, content=[{'foo': 'bar'}]) with httmock.HTTMock(astre_mock): resp = app.get( '/astre-rest/slug-astre-rest/gf-documents-entites-read?entity_type=who&entity_ref=what' ) json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [{'foo': 'bar'}] def test_gf_documents_entites_search(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/entites/search/somebigcity-2022-01/who/json', query='s=what', ) def astre_mock(url, request): return httmock.response( 200, content=[ { 'codeEntite': 'RMA01', 'libEntite': 'Jon Doe', 'typeEntite': 'Tiers', 'refEntite': '$1234$', 'codeOrganisme': 'Ville', } ], ) with httmock.HTTMock(astre_mock): resp = app.get('/astre-rest/slug-astre-rest/gf-documents-entites-search?entity_type=who&s=what') json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [ { 'id': 'RMA01', 'text': 'Jon Doe', 'codeEntite': 'RMA01', 'libEntite': 'Jon Doe', 'typeEntite': 'Tiers', 'refEntite': '$1234$', 'codeOrganisme': 'Ville', } ] @responses.activate def test_gf_documents_gedmanager_document_create(app, connector): responses.add( responses.POST, 'http://example.astre.net/astre/webservices/gf/documents/gedmanager/document/create/json', json=[{'foo': 'bar'}], status=200, ) params = { 'file': { 'filename': 'foo.txt', 'content': base64.b64encode(b'aaaa').decode('utf-8'), 'content_type': 'text/plain', }, 'entity_type': 'entitytype', 'entity_ref': 'entityref', 'astre_ref': 'astreref', 'ged_ref': 'gedref', 'doc_type': 'doctype', 'description': 'description', 'filename': 'somefile.ext', 'indpj': 'indpj', 'domain_code': 'domaincode', } resp = app.post_json('/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-create', params=params) assert len(responses.calls) == 1 made_request = responses.calls[0].request assert made_request.headers['content-type'].startswith('multipart/form-data') _, pdict = cgi.parse_header(made_request.headers["content-type"]) pdict["boundary"] = bytes(pdict["boundary"], "utf-8") pdict['CONTENT-LENGTH'] = made_request.headers['Content-Length'] postvars = cgi.parse_multipart(io.BytesIO(made_request.body), pdict) assert postvars['nomFichier'] == ['somefile.ext'] assert 'fichier' in postvars file_data = postvars['fichier'][0] assert file_data == b'aaaa' json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [{'foo': 'bar'}] def test_gf_documents_gedmanager_document_delete(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/gedmanager/document/delete/json', ) def astre_mock(url, request): return httmock.response(200, content=[{'foo': 'bar'}]) params = { 'entity_type': 'entitytype', 'entity_ref': 'entityref', 'document_ref': 'documentref', } with httmock.HTTMock(astre_mock): resp = app.post_json( '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-delete', params=params ) json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [{'foo': 'bar'}] def test_gf_documents_gedmanager_document_delete_error(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/gedmanager/document/delete/json', ) def astre_mock(url, request): return httmock.response( 200, content={'code': 'DataError', 'message': 'La référence du document est obligatoire.'} ) params = { 'entity_type': 'entitytype', 'entity_ref': 'entityref', 'document_ref': 'documentref', } with httmock.HTTMock(astre_mock): resp = app.post_json( '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-delete', params=params ) json_resp = resp.json assert json_resp['err'] == 1 assert json_resp['err_desc'] == 'DataError: La référence du document est obligatoire.' def test_gf_documents_gedmanager_document_read(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/gedmanager/document/read/somebigcity-2022-01/who-what/foo/json', query='metadonnees=all', ) def astre_mock(url, request): return httmock.response(200, content=[{'foo': 'bar'}]) with httmock.HTTMock(astre_mock): resp = app.get( '/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-read?entity_type=who&entity_ref=what&document_ref=foo' ) json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [{'foo': 'bar'}] @responses.activate def test_gf_documents_gedmanager_document_update(app, connector): responses.add( responses.POST, 'http://example.astre.net/astre/webservices/gf/documents/gedmanager/document/update/json', json=[{'foo': 'bar'}], status=200, ) params = { 'file': { 'filename': 'foo.txt', 'content': base64.b64encode(b'aaaa').decode('utf-8'), 'content_type': 'text/plain', }, 'entity_type': 'entitytype', 'entity_ref': 'entityref', 'astre_ref': 'astreref', 'ged_ref': 'gedref', 'doc_type': 'doctype', 'description': 'description', 'filename': 'somefile.ext', 'indpj': 'indpj', 'domain_code': 'domaincode', } resp = app.post_json('/astre-rest/slug-astre-rest/gf-documents-gedmanager-document-update', params=params) assert len(responses.calls) == 1 made_request = responses.calls[0].request assert made_request.headers['content-type'].startswith('multipart/form-data') _, pdict = cgi.parse_header(made_request.headers["content-type"]) pdict["boundary"] = bytes(pdict["boundary"], "utf-8") pdict['CONTENT-LENGTH'] = made_request.headers['Content-Length'] postvars = cgi.parse_multipart(io.BytesIO(made_request.body), pdict) assert postvars['nomFichier'] == ['somefile.ext'] assert 'fichier' in postvars file_data = postvars['fichier'][0] assert file_data == b'aaaa' json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [{'foo': 'bar'}] def test_gf_documents_referentiel_domainepj(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/referentiel/domainepj/list/json', ) def astre_mock(url, request): return httmock.response( 200, content=[{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}] ) with httmock.HTTMock(astre_mock): resp = app.get('/astre-rest/slug-astre-rest/gf-documents-referentiel-domainepj') json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [ {'id': '1', 'text': 'foo', 'code': '1', 'libelle': 'foo'}, {'id': '2', 'text': 'bar', 'code': '2', 'libelle': 'bar'}, ] def test_gf_documents_referentiel_typedocument(app, connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/referentiel/typedocument/list/json', ) def astre_mock(url, request): return httmock.response( 200, content=[{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}] ) with httmock.HTTMock(astre_mock): resp = app.get('/astre-rest/slug-astre-rest/gf-documents-referentiel-typedocument') json_resp = resp.json assert json_resp['err'] == 0 assert json_resp['data'] == [ {'id': '1', 'text': 'foo', 'code': '1', 'libelle': 'foo'}, {'id': '2', 'text': 'bar', 'code': '2', 'libelle': 'bar'}, ] def test_check_status_ok(connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/referentiel/typedocument/list/json', ) def astre_mock(url, request): return httmock.response( 200, content=[{'code': '1', 'libelle': 'foo'}, {'code': '2', 'libelle': 'bar'}] ) with httmock.HTTMock(astre_mock): assert connector.check_status() is None def test_check_status_notok(connector): @httmock.urlmatch( netloc='example.astre.net', path='/astre/webservices/gf/documents/referentiel/typedocument/list/json', ) def astre_mock(url, request): return httmock.response(200, content='') with httmock.HTTMock(astre_mock): with pytest.raises(json.JSONDecodeError): connector.check_status()