# Passerelle - uniform access to data and services # Copyright (C) 2015 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a.deepcopy of the GNU Affero General Public License # along with this program. If not, see . import base64 import copy import json import os import pathlib import shutil import zipfile from xml.etree import ElementTree as etree import pytest from lxml import etree as letree import tests.utils from passerelle.apps.mdel.mdel import AttachedFile, Description, Message, get_resource_base_dir from passerelle.apps.mdel.models import MDEL, Demand from passerelle.apps.mdel.utils import parse_date from passerelle.utils import SFTP AEC_XSD = 'ActeEtatCivil-V1.A.xsd' ILE_XSD = 'ILE_v1.1.xsd' def get_mdel_base_dir(): return os.path.join(os.path.dirname(__file__), 'data', 'mdel') def get_file_from_test_base_dir(filename): path = os.path.join(get_mdel_base_dir(), filename) with open(path, 'rb') as fd: return fd.read() def validate_schema(doc, xsd): xsd_path = os.path.join(get_mdel_base_dir(), xsd) xsd = letree.parse(xsd_path).getroot() schema = letree.XMLSchema(xsd) parser = letree.XMLParser(schema=schema) letree.parse(doc, parser=parser) # Will raise an exception if schema not respected def check_zip_file(zipdir, expected_files): # check files order in zip with zipfile.ZipFile(zipdir, 'r') as zipres: files = [f.filename for f in zipres.infolist()] assert sorted(files[: len(expected_files)]) == sorted(expected_files) @pytest.fixture def setup(db): return tests.utils.setup_access_rights(MDEL.objects.create(slug='test')) @pytest.fixture( params=[ json.loads(get_file_from_test_base_dir('formdata_aec_naiss.json')), json.loads(get_file_from_test_base_dir('formdata_aec_mariage.json')), json.loads(get_file_from_test_base_dir('formdata_aec_deces.json')), ], ids=['naissance', 'mariage', 'deces'], ) def aec_payload(request): return request.param ILE_PAYLOAD = json.loads(get_file_from_test_base_dir('formdata.json')) def test_message(): ns = {'mdel': 'http://finances.gouv.fr/dgme/pec/message/v1'} xmlns = 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier' message = Message('ILE-LA', '77', '94600') xml = message.xml assert xml.get('xmlns') == xmlns assert len(list(xml)) == 2 routing = xml.find('mdel:Header', ns).find('mdel:Routing', ns) assert routing.find('mdel:MessageId', ns).text == '77' assert routing.find('mdel:FlowType', ns).text == 'ILE-LA' aller = xml.find('mdel:Body', ns).find('mdel:Content', ns).find('mdel:Aller', ns) assert len(list(aller)) == 3 assert ( aller.find('Document').find('FichierFormulaire').find('FichierDonnees').text == '77-ILE-LA-doc-.xml' ) assert aller.find('Teledemarche/IdentifiantPlateforme').text == '1' def test_description(): attached_files = [ AttachedFile('JI', 'passport', base64.b64encode(b'this is passport')), AttachedFile('JD', 'energy_bill', base64.b64encode(b'this is edf_mai_2016')), ] description = Description('ILE-LA', '77', '94600', attached_files=attached_files) xml = description.xml assert len(list(xml)) == 5 assert len(xml.findall('PieceJointe')) == 2 assert xml.find('Document').find('FichierFormulaire').find('FichierDonnees').text == '77-ILE-LA-doc-.xml' assert xml.find('Teledemarche/IdentifiantPlateforme').text == '1' def test_invalid_demand_type(app, setup): ILE_PAYLOAD_INVALID = copy.deepcopy(ILE_PAYLOAD) ILE_PAYLOAD_INVALID['extra']['demand_type'] = 'whatever' resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD_INVALID, status=200) assert resp.json['err_desc'] == "demand_type must be : ['ILE-LA', 'RCO-LA', 'AEC-LA']" def test_invalid_demand_no_form_number(app, setup): ILE_PAYLOAD_INVALID_NO = copy.deepcopy(ILE_PAYLOAD) ILE_PAYLOAD_INVALID_NO.pop('display_id') resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD_INVALID_NO, status=200) assert resp.json['err_desc'] == 'display_id is required' def test_create_rco_demand_type(app, setup): RCO_PAYLOAD = copy.deepcopy(ILE_PAYLOAD) RCO_PAYLOAD['extra']['demand_type'] = 'rco-la' resp = app.post_json('/mdel/test/create', params=RCO_PAYLOAD, status=200) assert resp.json['err_desc'] == 'RCO-LA processing not implemented' def test_create_aec_demand_type(app, setup, aec_payload): resp = app.post_json('/mdel/test/create', params=aec_payload, status=200) if aec_payload['display_id'] == '15-4': assert resp.json['data']['demand_id'] == '15-4-AEC-LA' basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '15-4-EtatCivil-0') doc = os.path.join(basedir, '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml') validate_schema(doc, AEC_XSD) check_zip_file( basedir + '.zip', ['message.xml', '15-4-EtatCivil-ent-0.xml', '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'], ) root = etree.parse(doc).getroot() assert root.tag == 'EnveloppeMetierType' # Generic tags assert root.find('DemandeActe/TypeActe/Code').text == 'NAISSANCE' assert root.find('DemandeActe/NatureActe/Code').text == 'COPIE-INTEGRALE' assert root.find('DemandeActe/MotifDemande/Commentaire').text == 'CertificatDeNationaliteFrancaise' assert root.find('DemandeActe/LieuActe/CodePostal').text == '54000' assert root.find('DemandeActe/LieuActe/CodeINSEE').text == '54395' assert root.find('DemandeActe/LieuActe/Ville').text == 'Nancy' assert root.find('DemandeActe/DateActe').text == '1958-05-19' assert root.find('DemandeActe/NombreExemplaires').text == '2' # Requester assert root.find('DemandeActe/Demandeur/Courriel').text == 'chelsea@whatever.com' assert root.find('DemandeActe/Demandeur/Civilite').text == 'MADAME' assert root.find('DemandeActe/Demandeur/Nom').text == 'Whatever' assert root.find('DemandeActe/Demandeur/Prenom').text == 'Chelsea' assert root.find('DemandeActe/Demandeur/Telephone').text == '0122334455' assert root.find('DemandeActe/Demandeur/QualiteDemandeur/Code').text == 'Fils' assert root.find('DemandeActe/Demandeur/AdresseEtrangere/Pays').text == 'Suisse' assert ( root.find('DemandeActe/Demandeur/AdresseEtrangere/Adresse').text == '3ème, Bâtiment B, 37 rue de Paris, 3800 Bern' ) assert not root.find('DemandeActe/Demandeur/AdresseFrancaise/CodePostal') assert not root.find('DemandeActe/Demandeur/AdresseFrancaise/Ville') assert not root.find('DemandeActe/Demandeur/AdresseFrancaise/Voie') # Concerned assert root.find('DemandeActe/Titulaire/Civilite').text == 'MADAME' assert root.find('DemandeActe/Titulaire/Nationalite').text == 'FRA' assert root.find('DemandeActe/Titulaire/DateDeNaissance').text == '1958-05-19' assert root.find('DemandeActe/Titulaire/PaysDeNaissance').text == 'FRA' assert root.find('DemandeActe/Titulaire/DepartementDeNaissance').text == '54' assert root.find('DemandeActe/Titulaire/NomNaissance').text == 'Whatever' assert root.find('DemandeActe/Titulaire/Prenoms').text == 'Kim' assert root.find('DemandeActe/Titulaire/Filiation/Mere/Nom').text == 'Song' assert root.find('DemandeActe/Titulaire/Filiation/Mere/Prenoms').text == 'Eloise' assert root.find('DemandeActe/Titulaire/Filiation/Pere/Nom').text == 'Whatever' assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Fritz' elif aec_payload['display_id'] == '16-1': assert resp.json['data']['demand_id'] == '16-1-AEC-LA' basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '16-1-EtatCivil-0') doc = os.path.join(basedir, '16-1-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml') validate_schema(doc, AEC_XSD) check_zip_file( basedir + '.zip', ['message.xml', '16-1-EtatCivil-ent-0.xml', '16-1-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'], ) validate_schema(doc, AEC_XSD) root = etree.parse(doc).getroot() assert root.tag == 'EnveloppeMetierType' # Generic tags assert root.find('DemandeActe/TypeActe/Code').text == 'MARIAGE' assert root.find('DemandeActe/NatureActe/Code').text == 'EXTRAIT-AVEC-FILIATION' assert root.find('DemandeActe/MotifDemande/Commentaire').text == 'Autre' assert root.find('DemandeActe/LieuActe/CodePostal').text == '54000' assert root.find('DemandeActe/LieuActe/CodeINSEE').text == '54395' assert root.find('DemandeActe/LieuActe/Ville').text == 'Nancy' assert root.find('DemandeActe/DateActe').text == '2008-08-18' assert root.find('DemandeActe/NombreExemplaires').text == '3' # Requester assert root.find('DemandeActe/Demandeur/Courriel').text == 'chelsea@whatever.com' assert root.find('DemandeActe/Demandeur/Civilite').text == 'MADAME' assert root.find('DemandeActe/Demandeur/Nom').text == 'Whatever' assert root.find('DemandeActe/Demandeur/Prenom').text == 'Chelsea' assert root.find('DemandeActe/Demandeur/Telephone').text == '0122334455' assert root.find('DemandeActe/Demandeur/QualiteDemandeur/Code').text == 'Autre' assert root.find('DemandeActe/Demandeur/QualiteDemandeur/Libelle').text == 'Sa soeur' assert root.find('DemandeActe/Demandeur/AdresseFrancaise/Voie').text == '22 rue Danton' assert root.find('DemandeActe/Demandeur/AdresseFrancaise/CodePostal').text == '94270' assert root.find('DemandeActe/Demandeur/AdresseFrancaise/Ville').text == 'Kremlin Bicetre' # Concerned assert root.find('DemandeActe/Titulaire/Civilite').text == 'MONSIEUR' assert root.find('DemandeActe/Titulaire/NomNaissance').text == 'Whatever' assert root.find('DemandeActe/Titulaire/PaysDeNaissance').text == 'PRK' assert root.find('DemandeActe/Titulaire/DateDeNaissance').text == '1978-05-19' assert root.find('DemandeActe/Titulaire/Prenoms').text == 'Josh' assert root.find('DemandeActe/Titulaire/Filiation/Mere/Nom').text == 'Song' assert root.find('DemandeActe/Titulaire/Filiation/Mere/Prenoms').text == 'Eloise' assert root.find('DemandeActe/Titulaire/Filiation/Pere/Nom').text == 'Whatever' assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Fritz' # Concerned2 assert root.find('DemandeActe/Titulaire2/Civilite').text == 'MADAME' assert root.find('DemandeActe/Titulaire2/NomNaissance').text == 'Kokey' assert root.find('DemandeActe/Titulaire2/PaysDeNaissance').text == 'SEN' assert root.find('DemandeActe/Titulaire2/DateDeNaissance').text == '1980-03-12' assert root.find('DemandeActe/Titulaire2/Prenoms').text == 'Sarah' assert root.find('DemandeActe/Titulaire2/Filiation/Mere/Nom').text == 'De' assert root.find('DemandeActe/Titulaire2/Filiation/Mere/Prenoms').text == 'Coudy' assert root.find('DemandeActe/Titulaire2/Filiation/Pere/Nom').text == 'Kokey' assert root.find('DemandeActe/Titulaire2/Filiation/Pere/Prenoms').text == 'Pascal' else: assert resp.json['data']['demand_id'] == '17-1-AEC-LA' basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '17-1-EtatCivil-0') doc = os.path.join(basedir, '17-1-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml') validate_schema(doc, AEC_XSD) check_zip_file( basedir + '.zip', ['message.xml', '17-1-EtatCivil-ent-0.xml', '17-1-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'], ) validate_schema(doc, AEC_XSD) root = etree.parse(doc).getroot() assert root.tag == 'EnveloppeMetierType' # Generic tags assert root.find('DemandeActe/TypeActe/Code').text == 'DECES' assert root.find('DemandeActe/NatureActe/Code').text == 'COPIE-INTEGRALE' assert root.find('DemandeActe/MotifDemande/Commentaire').text == 'Autre' assert root.find('DemandeActe/LieuActe/CodePostal').text == '54000' assert root.find('DemandeActe/LieuActe/CodeINSEE').text == '54395' assert root.find('DemandeActe/LieuActe/Ville').text == 'Nancy' assert root.find('DemandeActe/DateActe').text == '2014-04-26' assert root.find('DemandeActe/NombreExemplaires').text == '4' # Requester assert root.find('DemandeActe/Demandeur/Courriel').text == 'chelsea@whatever.com' assert root.find('DemandeActe/Demandeur/Civilite').text == 'MADAME' assert root.find('DemandeActe/Demandeur/Nom').text == 'Whatever' assert root.find('DemandeActe/Demandeur/Prenom').text == 'Chelsea' assert root.find('DemandeActe/Demandeur/Telephone').text == '0122334455' assert root.find('DemandeActe/Demandeur/QualiteDemandeur/Code').text == 'Autre' assert root.find('DemandeActe/Demandeur/AdresseFrancaise/CodePostal').text == '54000' assert root.find('DemandeActe/Demandeur/AdresseFrancaise/Ville').text == 'Nancy' assert root.find('DemandeActe/Demandeur/AdresseFrancaise/Voie').text == "37 Rue de l'Aigle Blanc" assert not root.find('DemandeActe/Demandeur/AdresseEtrangere/Pays') assert not root.find('DemandeActe/Demandeur/AdresseEtrangere/Adresse') # Concerned assert root.find('DemandeActe/Titulaire/Civilite').text == 'MONSIEUR' assert root.find('DemandeActe/Titulaire/NomNaissance').text == 'Yamamoto' assert root.find('DemandeActe/Titulaire/PaysDeNaissance').text == 'FRA' assert root.find('DemandeActe/Titulaire/DateDeNaissance').text == '1978-05-19' assert root.find('DemandeActe/Titulaire/Prenoms').text == 'Yosuke' assert root.find('DemandeActe/Titulaire/Filiation/Mere/Nom').text == 'Ino' assert root.find('DemandeActe/Titulaire/Filiation/Mere/Prenoms').text == 'Haruka' assert root.find('DemandeActe/Titulaire/Filiation/Pere/Nom').text == 'Yamamoto' assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Ryu' def test_create_aec_demand_type_with_user_comment(app, setup, aec_payload): AEC_PAYLOAD = dict(aec_payload) display_id = AEC_PAYLOAD['display_id'] AEC_PAYLOAD['fields']['logitud_commentaire_usager'] = 'gentle user comment' app.post_json('/mdel/test/create', params=aec_payload, status=200) # checking that attached files are referenced in -ent-.xml file basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % display_id) desc = os.path.join(basedir, '%s-EtatCivil-ent-0.xml' % display_id) root = etree.parse(desc).getroot() ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'} assert root.find('ns2:LocalAccess/ns2:CommentaireUsager', namespaces=ns).text == 'gentle user comment' def test_create_aec_demand_with_output_sftp(app, setup, aec_payload, sftpserver, caplog): setup.outcoming_sftp = SFTP( 'sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver) ) setup.save() app.post_json('/mdel/test/create', params=aec_payload, status=200) caplog.clear() demand = setup.demand_set.filter(sent=False).get() assert not demand.sent content_object = {'output': {}} with sftpserver.serve_content(content_object): setup.hourly() assert not setup.demand_set.filter(sent=False).exists() # check zip file was transfered assert set(content_object['output']) == {demand.filename} with open(demand.filepath, 'rb') as fd: assert content_object['output'][demand.filename] == fd.read() assert caplog.messages[-1:] == [f'Demand {demand.name} (pk {demand.num}) sent.'] def test_create_aec_demand_with_input_sftp(app, setup, aec_payload, sftpserver, caplog): setup.incoming_sftp = SFTP('sftp://foo:bar@{server.host}:{server.port}/input/'.format(server=sftpserver)) setup.save() with sftpserver.serve_content({'input': {'whatever': 'content'}}): setup.hourly() assert (pathlib.Path(setup.output_dir) / 'whatever').read_text() == 'content' assert caplog.messages[-1:] == ['Retrieved new responses: whatever'] def test_create_aec_demand_type_without_date_acte(app, setup): payload = json.loads(get_file_from_test_base_dir('formdata_aec_deces.json')) payload['fields'].pop('date_acte') resp = app.post_json('/mdel/test/create', params=payload, status=200) assert resp.json['err_desc'] == ' is required' def test_create_aec_demand_type_with_files(app, setup, aec_payload): AEC_PAYLOAD = dict(aec_payload) display_id = AEC_PAYLOAD['display_id'] AEC_PAYLOAD['fields'].update( { 'justificatif_identite_cni': { 'content': 'data', 'content_type': 'application/pdf', 'filename': 'cni.pdf', }, 'justificatif_domicile': { 'content': 'data', 'content_type': 'application/pdf', 'filename': 'quittance.pdf', }, } ) resp = app.post_json('/mdel/test/create', params=AEC_PAYLOAD, status=200) assert resp.json['data']['demand_id'] == '%s-AEC-LA' % display_id basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % display_id) doc = os.path.join(basedir, '%s-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' % display_id) validate_schema(doc, AEC_XSD) expected_files = [ '%s-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' % display_id, 'message.xml', '%s-EtatCivil-ent-0.xml' % display_id, 'cni.pdf', 'quittance.pdf', ] for fname in os.listdir(basedir): assert fname in expected_files # checking that attached files are referenced in -ent-.xml file desc = os.path.join(basedir, '%s-EtatCivil-ent-0.xml' % display_id) root = etree.parse(desc).getroot() ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'} assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns attached_files = root.findall('ns2:PieceJointe', namespaces=ns) assert len(attached_files) == 2 for afile in attached_files: if afile.find('ns2:Intitule', namespaces=ns).text == 'cni.pdf': assert afile.find('ns2:Code', namespaces=ns).text == 'JI' assert afile.find('ns2:Fichier', namespaces=ns).text == 'cni.pdf' else: assert afile.find('ns2:Intitule', namespaces=ns).text == 'quittance.pdf' assert afile.find('ns2:Code', namespaces=ns).text == 'JD' assert afile.find('ns2:Fichier', namespaces=ns).text == 'quittance.pdf' check_zip_file( basedir + '.zip', [ 'message.xml', '%s-EtatCivil-ent-0.xml' % display_id, '%s-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' % display_id, 'quittance.pdf', 'cni.pdf', ], ) def test_create_ile_demand_type(app, setup): resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD, status=200) assert resp.json['data']['demand_id'] == '1-14-ILE-LA' base_doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-14-ILE-LA--0') doc = os.path.join(base_doc, '1-14-ILE-LA-doc-.xml') validate_schema(doc, ILE_XSD) root = etree.parse(doc).getroot() assert root.tag == 'AvisDInscription' assert root.find('Inscription/Electeur/AdresseDeLElecteur/Localite').text == 'Nancy' assert root.find('Inscription/Electeur/AdresseDeLElecteur/CodePostal').text == '54000' assert root.find('Inscription/Electeur/AdresseDeLElecteur/TypeVoie').text == 'RUE' assert root.find('Inscription/Electeur/AdresseDeLElecteur/NomVoie').text == 'RUE DU CHEVAL BLANC' assert root.find('Inscription/Electeur/AdresseDeLElecteur/NumeroVoie').text == '37' assert root.find('Inscription/Electeur/AdresseDeLElecteur/Complement').text == 'Apt 4112, Batiment B' assert root.find('Inscription/Electeur/Nationalite').text == 'FRA' assert root.find('Inscription/Electeur/Noms/NomFamille').text == 'whatever' assert root.find('Inscription/Electeur/Prenoms/Prenom').text == 'chelsea' assert root.find('Inscription/Electeur/MethodeDeContact/URI').text == 'chelsea@whatever.com' assert root.find('Inscription/Electeur/MethodeDeContact/CanalCode').text == 'EMAIL' assert root.find('Inscription/Electeur/Sexe').text == 'F' assert root.find('Inscription/Electeur/LieuDeNaissance/Pays').text == 'CAN' assert root.find('Inscription/Electeur/LieuDeNaissance/Localite').text == 'Vancouver' assert root.find('Inscription/Electeur/LieuDeNaissance/CodePostal').text == 'V56 B68' assert root.find('Inscription/Electeur/DateDeNaissance').text == '2014-06-11' assert root.find('Inscription/TypeDeListe').text == 'cm' assert root.find('TypeDInscription').text == 'vol' assert root.find('SituationElectoraleAnterieure/SituationDeLElecteur').text == 'cci' assert root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/Pays').text == 'BEL' assert root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/Localite').text == 'Bruxelles' assert ( root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/DivisionTerritoriale').text == 'Whatever' ) # checking that attached files are referenced in -ent-.xml file desc = os.path.join(base_doc, '1-14-ILE-LA-ent-.xml') root = etree.parse(desc).getroot() ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'} assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns assert root.find('ns2:Teledemarche/ns2:NumeroTeledemarche', namespaces=ns).text == '1-14' assert root.find('ns2:Routage/ns2:Donnee/ns2:Id', namespaces=ns).text == 'CodeINSEE' assert root.find('ns2:Routage/ns2:Donnee/ns2:Valeur', namespaces=ns).text == '54395' assert root.find('ns2:Document/ns2:Code', namespaces=ns).text == '1-14-ILE-LA' assert root.find('ns2:Document/ns2:Nom', namespaces=ns).text == '1-14-ILE-LA' assert ( root.find('ns2:Document/ns2:FichierFormulaire/ns2:FichierDonnees', namespaces=ns).text == '1-14-ILE-LA-doc-.xml' ) attached_files = root.findall('ns2:PieceJointe', namespaces=ns) assert len(attached_files) == 3 for afile in attached_files: if afile.find('ns2:Intitule', namespaces=ns).text == 'mdel_passeport_recto.pdf': assert afile.find('ns2:Code', namespaces=ns).text == 'JI' assert afile.find('ns2:Fichier', namespaces=ns).text == 'mdel_passeport_recto.pdf' elif afile.find('ns2:Intitule', namespaces=ns).text == 'mdel_passeport_verso.pdf': assert afile.find('ns2:Code', namespaces=ns).text == 'JI' assert afile.find('ns2:Fichier', namespaces=ns).text == 'mdel_passeport_verso.pdf' else: assert afile.find('ns2:Intitule', namespaces=ns).text == 'mdel_edf.pdf' assert afile.find('ns2:Code', namespaces=ns).text == 'JD' assert afile.find('ns2:Fichier', namespaces=ns).text == 'mdel_edf.pdf' expected_files = [ 'message.xml', '1-14-ILE-LA-doc-.xml', '1-14-ILE-LA-ent-.xml', 'mdel_passeport_recto.pdf', 'mdel_passeport_verso.pdf', 'mdel_edf.pdf', ] for fname in os.listdir(base_doc): assert fname in expected_files # Without anterieur_situation_raw payload = copy.deepcopy(ILE_PAYLOAD) payload['fields'].pop('anterieur_situation_raw') resp = app.post_json('/mdel/test/create', params=payload, status=200) assert resp.json['err_desc'] == 'anterieur_situation_raw is required' def test_create_ile_demand_type_invalid_document_proof(app, setup): # test with missing key payload = json.loads(get_file_from_test_base_dir('formdata.json')) payload['fields'].pop('justificatif_domicile_hebergeur') resp = app.post_json('/mdel/test/create', params=payload, status=200) assert resp.json['err_desc'] == 'justificatif_domicile and all its attributes are required' # test with invalid content payload = json.loads(get_file_from_test_base_dir('formdata.json')) payload['fields']['justificatif_identite'] = None payload['fields']['justificatif_identite_verso'] = None resp = app.post_json('/mdel/test/create', params=payload, status=200) assert resp.json['err_desc'] == 'justificatif_identite and all its attributes are required' def test_get_status(app, setup): shutil.copytree( os.path.join(get_mdel_base_dir(), 'test', 'outputs'), os.path.join(get_resource_base_dir(), 'test', 'outputs'), ) resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD, status=200) demand_id = resp.json['data']['demand_id'] assert demand_id == '1-14-ILE-LA' resp = app.get('/mdel/test/status', params={'demand_id': demand_id}) data = resp.json['data'] assert data['closed'] is True assert data['status'] == 'accepted' assert data['comment'] == 'Dossier traité.' assert Demand.objects.get(demand_id='1-14-ILE-LA').status == 'accepted' Demand.objects.create(resource=setup, num='97-5', flow_type='AEC-LA', demand_id='97-5-AEC-LA') resp = app.get('/mdel/test/status', params={'demand_id': '97-5-AEC-LA'}, status=200) data = resp.json['data'] assert data['closed'] is False assert data['status'] == 'imported' assert data['comment'] == 'Le dossier a été reçu et sera traité prochainement.' assert Demand.objects.get(demand_id='97-5-AEC-LA').status == 'imported' Demand.objects.create(resource=setup, num='102-2', flow_type='AEC-LA', demand_id='102-2-AEC-LA') resp = app.get('/mdel/test/status', params={'demand_id': '102-2-AEC-LA'}, status=200) data = resp.json['data'] assert data['closed'] is True assert data['status'] == 'accepted' assert data['comment'] == 'Dossier accepté' assert Demand.objects.get(demand_id='102-2-AEC-LA').status == 'accepted' # bad zipfile filepath = os.path.join(get_resource_base_dir(), 'test', 'outputs', '102-2-aec-la--4.zip') with open(filepath, 'w') as f: f.write(' ') resp = app.get('/mdel/test/status', params={'demand_id': '102-2-AEC-LA'}, status=200) assert resp.json['err_desc'] == 'zipfile error' def test_get_status_unknown_demand(app, setup): resp = app.get('/mdel/test/status', params={'demand_id': '1-14-ILE-LA'}) assert resp.json['err_desc'] == 'demand 1-14-ILE-LA does not exist' def test_get_status_no_response(app, setup): shutil.copytree( os.path.join(get_mdel_base_dir(), 'test', 'outputs'), os.path.join(get_resource_base_dir(), 'test', 'outputs'), ) Demand.objects.create(resource=setup, num='1-15', flow_type='ILE-LA', demand_id='1-15-ILE-LA') resp = app.get('/mdel/test/status', params={'demand_id': '1-15-ILE-LA'}, status=200) data = resp.json['data'] assert data['closed'] is False assert data['status'] is None assert data['comment'] == '' assert Demand.objects.get(demand_id='1-15-ILE-LA').status is None def test_get_not_closed_status(app, setup): shutil.copytree( os.path.join(get_mdel_base_dir(), 'test', 'outputs'), os.path.join(get_resource_base_dir(), 'test', 'outputs'), ) Demand.objects.create(resource=setup, num='15-9', flow_type='AEC-LA', demand_id='15-9-AEC-LA') resp = app.get('/mdel/test/status', params={'demand_id': '15-9-AEC-LA'}, status=200) data = resp.json['data'] assert data['closed'] is False assert data['status'] == 'in progress' assert data['comment'] == 'Dossier en cours de traitement.' assert Demand.objects.get(demand_id='15-9-AEC-LA').status == 'in progress' def test_data_source_applicants(app, setup): resp = app.get('/mdel/test/applicants', status=200) data = resp.json['data'] assert len(data) == 9 # test without PersonneConcernee and Representant params = {'without': 'PersonneConcernee,Representant'} resp = app.get('/mdel/test/applicants', params=params, status=200) data = resp.json['data'] assert len(data) == 7 def test_data_source_certificates(app, setup): resp = app.get('/mdel/test/certificates', status=200) data = resp.json['data'] for datum in data: if datum['id'] == 'NAISSANCE': assert datum['text'] == 'Acte de naissance' elif datum['id'] == 'MARIAGE': assert datum['text'] == 'Acte de mariage' else: assert datum['text'] == 'Acte de décès' def test_data_source_certificate_types(app, setup): resp = app.get('/mdel/test/certificate-types', status=200) data = resp.json['data'] assert len(data) == 4 for datum in data: if datum['id'] == 'COPIE-INTEGRALE': assert datum['text'] == 'Copie intégrale' elif datum['id'] == 'EXTRAIT-AVEC-FILIATION': assert datum['text'] == 'Extrait avec filiation' elif datum['id'] == 'EXTRAIT-SANS-FILIATION': assert datum['text'] == 'Extrait sans filiation' else: assert datum['text'] == 'Extrait plurilingue' # test without COPIE-INTEGRALE and EXTRAIT-AVEC-FILIATION resp = app.get( '/mdel/test/certificate-types', params={'without': 'COPIE-INTEGRALE,EXTRAIT-AVEC-FILIATION'}, status=200, ) data = resp.json['data'] assert len(data) == 2 assert 'EXTRAIT-SANS-FILIATION' in (data[0]['id'], data[1]['id']) assert 'EXTRAIT-PLURILINGUE' in (data[0]['id'], data[1]['id']) def test_date_parsing(): from passerelle.utils.jsonresponse import APIError with pytest.raises(APIError) as error: parse_date('2018-02-29') assert 'day is out of range for month' in str(error) with pytest.raises(APIError) as error: parse_date('28-02-2018') for text in ('date', '28-02-2018', 'not iso-formated'): assert text in str(error) def test_aec_filenames_and_routing(app, setup): aec_payload = json.loads(get_file_from_test_base_dir('formdata_aec_naiss.json')) resp = app.post_json('/mdel/test/create', params=aec_payload, status=200) assert resp.json['data']['demand_id'] == '15-4-AEC-LA' basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '15-4-EtatCivil-0') assert os.path.exists(basedir) check_zip_file( basedir + '.zip', ['message.xml', '15-4-EtatCivil-ent-0.xml', '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'], ) doc = os.path.join(basedir, '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml') assert os.path.exists(doc) validate_schema(doc, AEC_XSD) ent = os.path.join(basedir, '15-4-EtatCivil-ent-0.xml') assert os.path.exists(ent) root = letree.parse(ent).getroot() num_dem = root[0] assert letree.QName(num_dem).localname == 'NumeroDemarche' assert num_dem.text == 'EtatCivil' ns = {'ns': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'} assert root.find('ns:Document/ns:Code', namespaces=ns).text == 'ActeEtatCivil-XML' assert root.find('ns:Document/ns:Nom', namespaces=ns).text == 'ActeEtatCivil-XML' assert ( root.find('ns:Document/ns:FichierFormulaire/ns:FichierDonnees', namespaces=ns).text == '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' ) message = os.path.join(basedir, 'message.xml') assert os.path.exists(message) root = letree.parse(message).getroot() ns = { 'ns1': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier', 'ns2': 'http://finances.gouv.fr/dgme/pec/message/v1', } assert root.find('ns2:Body/ns2:Content/ns2:Aller/ns1:NumeroDemarche', namespaces=ns).text == 'EtatCivil' assert ( root.find('ns2:Body/ns2:Content/ns2:Aller/ns1:Document/ns1:Code', namespaces=ns).text == 'ActeEtatCivil-XML' ) assert ( root.find('ns2:Body/ns2:Content/ns2:Aller/ns1:Document/ns1:Nom', namespaces=ns).text == 'ActeEtatCivil-XML' ) assert ( root.find( 'ns2:Body/ns2:Content/ns2:Aller/ns1:Document/ns1:FichierFormulaire/ns1:FichierDonnees', namespaces=ns, ).text == '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' ) def resource_logs(caplog): return [record.msg for record in caplog.records if record.name == 'passerelle.resource.mdel.test'] def test_create_aec_demand_with_output_sftp_error(app, setup, aec_payload, sftpserver, caplog): caplog.set_level('WARNING') setup.outcoming_sftp = SFTP( 'sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver) ) setup.save() app.post_json('/mdel/test/create', params=aec_payload, status=200) setup.hourly() demand = setup.demand_set.get() assert not demand.sent assert resource_logs(caplog) == ['Demand %s (pk %s) could not be sent: %s'] def test_create_aec_demand_with_input_sftp_error(app, setup, aec_payload, sftpserver, caplog): caplog.set_level('WARNING') setup.incoming_sftp = SFTP('sftp://foo:bar@{server.host}:{server.port}/input/'.format(server=sftpserver)) setup.save() app.post_json('/mdel/test/create', params=aec_payload, status=200) setup.hourly() assert resource_logs(caplog) == ['Could not retrieve all responses: %s']