passerelle/tests/test_mdel.py

758 lines
33 KiB
Python

# Passerelle - uniform access to data and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import copy
import json
import os
import pathlib
import shutil
import zipfile
from xml.etree import ElementTree as etree
import pytest
from lxml import etree as letree
import tests.utils
from passerelle.apps.mdel.mdel import AttachedFile, Description, Message, get_resource_base_dir
from passerelle.apps.mdel.models import MDEL, Demand
from passerelle.apps.mdel.utils import parse_date
from passerelle.utils import SFTP
# File names of the XSD schemas used to validate generated documents; they are
# passed to validate_schema(), which resolves them against the MDEL test data
# directory.
AEC_XSD = 'ActeEtatCivil-V1.A.xsd'
ILE_XSD = 'ILE_v1.1.xsd'
def get_mdel_base_dir():
    """Return the path of the ``data/mdel`` directory located next to this module."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'data', 'mdel')
def get_file_from_test_base_dir(filename):
    """Return the raw bytes of ``filename`` read from the MDEL test data directory."""
    fixture_path = os.path.join(get_mdel_base_dir(), filename)
    with open(fixture_path, mode='rb') as handle:
        content = handle.read()
    return content
def validate_schema(doc, xsd):
    """Validate the XML document ``doc`` against the XSD file named ``xsd``.

    ``xsd`` is a file name resolved against the MDEL test data directory.
    The document is parsed with a schema-aware parser, which raises an
    exception if the schema is not respected.
    """
    schema_root = letree.parse(os.path.join(get_mdel_base_dir(), xsd)).getroot()
    validating_parser = letree.XMLParser(schema=letree.XMLSchema(schema_root))
    letree.parse(doc, parser=validating_parser)
def check_zip_file(zipdir, expected_files):
    """Assert that the archive at ``zipdir`` starts with the expected members.

    Only the first ``len(expected_files)`` entries of the archive are
    considered, and both sides are sorted before being compared, so the
    relative order of the members inside that leading slice is not checked.
    """
    with zipfile.ZipFile(zipdir, 'r') as archive:
        member_names = archive.namelist()
    leading_members = member_names[: len(expected_files)]
    assert sorted(leading_members) == sorted(expected_files)
@pytest.fixture
def setup(db):
    """Create the ``test`` MDEL connector and return what ``setup_access_rights`` gives back for it."""
    connector = MDEL.objects.create(slug='test')
    return tests.utils.setup_access_rights(connector)
@pytest.fixture(
    params=[
        json.loads(get_file_from_test_base_dir('formdata_aec_naiss.json')),
        json.loads(get_file_from_test_base_dir('formdata_aec_mariage.json')),
        json.loads(get_file_from_test_base_dir('formdata_aec_deces.json')),
    ],
    ids=['naissance', 'mariage', 'deces'],
)
def aec_payload(request):
    """Provide, in turn, the birth, marriage and death AEC form payloads.

    The payloads are loaded once, when the module is imported, so the same
    objects would otherwise be handed to every test. A deep copy is returned
    so that a test altering its payload (including nested dicts such as
    ``fields``) cannot leak the change into the tests that run afterwards.
    """
    return copy.deepcopy(request.param)
# Reference ILE form payload, loaded once at import time from formdata.json.
# It is shared by several tests: use copy.deepcopy() before modifying it.
ILE_PAYLOAD = json.loads(get_file_from_test_base_dir('formdata.json'))
def test_message():
    """Check the header routing and the body content of a generated ``Message``."""
    namespaces = {'mdel': 'http://finances.gouv.fr/dgme/pec/message/v1'}
    root = Message('ILE-LA', '77', '94600').xml

    expected_xmlns = 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'
    assert root.get('xmlns') == expected_xmlns
    assert len(list(root)) == 2

    header = root.find('mdel:Header', namespaces)
    routing = header.find('mdel:Routing', namespaces)
    assert routing.find('mdel:MessageId', namespaces).text == '77'
    assert routing.find('mdel:FlowType', namespaces).text == 'ILE-LA'

    body = root.find('mdel:Body', namespaces)
    aller = body.find('mdel:Content', namespaces).find('mdel:Aller', namespaces)
    assert len(list(aller)) == 3
    data_file = aller.find('Document').find('FichierFormulaire').find('FichierDonnees')
    assert data_file.text == '77-ILE-LA-doc-.xml'
    assert aller.find('Teledemarche/IdentifiantPlateforme').text == '1'
def test_description():
    """Check the children of a ``Description`` built with two attached files."""
    passport = AttachedFile('JI', 'passport', base64.b64encode(b'this is passport'))
    energy_bill = AttachedFile('JD', 'energy_bill', base64.b64encode(b'this is edf_mai_2016'))
    root = Description('ILE-LA', '77', '94600', attached_files=[passport, energy_bill]).xml

    assert len(list(root)) == 5
    assert len(root.findall('PieceJointe')) == 2
    data_file = root.find('Document').find('FichierFormulaire').find('FichierDonnees')
    assert data_file.text == '77-ILE-LA-doc-.xml'
    assert root.find('Teledemarche/IdentifiantPlateforme').text == '1'
def test_invalid_demand_type(app, setup):
    """An unknown ``demand_type`` is reported through ``err_desc``."""
    payload = copy.deepcopy(ILE_PAYLOAD)
    payload['extra']['demand_type'] = 'whatever'
    response = app.post_json('/mdel/test/create', params=payload, status=200)
    expected_error = "demand_type must be : ['ILE-LA', 'RCO-LA', 'AEC-LA']"
    assert response.json['err_desc'] == expected_error
def test_invalid_demand_no_form_number(app, setup):
    """A payload without ``display_id`` is reported through ``err_desc``."""
    payload = copy.deepcopy(ILE_PAYLOAD)
    del payload['display_id']
    response = app.post_json('/mdel/test/create', params=payload, status=200)
    assert response.json['err_desc'] == 'display_id is required'
def test_create_rco_demand_type(app, setup):
    """Posting an ``rco-la`` demand yields the 'not implemented' error."""
    payload = copy.deepcopy(ILE_PAYLOAD)
    payload['extra']['demand_type'] = 'rco-la'
    response = app.post_json('/mdel/test/create', params=payload, status=200)
    assert response.json['err_desc'] == 'RCO-LA processing not implemented'
def _check_demande_acte(root, expected, absent=()):
    """Assert the text of elements below ``DemandeActe`` and the absence of others.

    ``expected`` maps paths relative to ``DemandeActe`` to the text the first
    matching element must carry; ``absent`` lists relative paths that must
    not match any element.
    """
    for path, text in expected.items():
        node = root.find('DemandeActe/' + path)
        assert node is not None, 'missing element DemandeActe/%s' % path
        assert node.text == text, 'unexpected text for DemandeActe/%s' % path
    for path in absent:
        # Compare with None explicitly: an Element without children is falsy,
        # so ``assert not root.find(...)`` would pass even when it exists.
        assert root.find('DemandeActe/' + path) is None, 'unexpected element DemandeActe/%s' % path


def test_create_aec_demand_type(app, setup, aec_payload):
    """Create an AEC demand for each payload and check the generated document and zip.

    The expectations depend on the payload's ``display_id``: ``15-4`` is the
    birth certificate, ``16-1`` the marriage certificate and anything else is
    checked as the ``17-1`` death certificate.
    """
    resp = app.post_json('/mdel/test/create', params=aec_payload, status=200)

    # Values asserted identically for the three payloads.
    shared = {
        'LieuActe/CodePostal': '54000',
        'LieuActe/CodeINSEE': '54395',
        'LieuActe/Ville': 'Nancy',
        'Demandeur/Courriel': 'chelsea@whatever.com',
        'Demandeur/Civilite': 'MADAME',
        'Demandeur/Nom': 'Whatever',
        'Demandeur/Prenom': 'Chelsea',
        'Demandeur/Telephone': '0122334455',
    }

    if aec_payload['display_id'] == '15-4':
        number = '15-4'
        expected = {
            # Generic tags
            'TypeActe/Code': 'NAISSANCE',
            'NatureActe/Code': 'COPIE-INTEGRALE',
            'MotifDemande/Commentaire': 'CertificatDeNationaliteFrancaise',
            'DateActe': '1958-05-19',
            'NombreExemplaires': '2',
            # Requester
            'Demandeur/QualiteDemandeur/Code': 'Fils',
            'Demandeur/AdresseEtrangere/Pays': 'Suisse',
            'Demandeur/AdresseEtrangere/Adresse': '3ème, Bâtiment B, 37 rue de Paris, 3800 Bern',
            # Concerned
            'Titulaire/Civilite': 'MADAME',
            'Titulaire/Nationalite': 'FRA',
            'Titulaire/DateDeNaissance': '1958-05-19',
            'Titulaire/PaysDeNaissance': 'FRA',
            'Titulaire/DepartementDeNaissance': '54',
            'Titulaire/NomNaissance': 'Whatever',
            'Titulaire/Prenoms': 'Kim',
            'Titulaire/Filiation/Mere/Nom': 'Song',
            'Titulaire/Filiation/Mere/Prenoms': 'Eloise',
            'Titulaire/Filiation/Pere/Nom': 'Whatever',
            'Titulaire/Filiation/Pere/Prenoms': 'Fritz',
        }
        absent = (
            'Demandeur/AdresseFrancaise/CodePostal',
            'Demandeur/AdresseFrancaise/Ville',
            'Demandeur/AdresseFrancaise/Voie',
        )
    elif aec_payload['display_id'] == '16-1':
        number = '16-1'
        expected = {
            # Generic tags
            'TypeActe/Code': 'MARIAGE',
            'NatureActe/Code': 'EXTRAIT-AVEC-FILIATION',
            'MotifDemande/Commentaire': 'Autre',
            'DateActe': '2008-08-18',
            'NombreExemplaires': '3',
            # Requester
            'Demandeur/QualiteDemandeur/Code': 'Autre',
            'Demandeur/QualiteDemandeur/Libelle': 'Sa soeur',
            'Demandeur/AdresseFrancaise/Voie': '22 rue Danton',
            'Demandeur/AdresseFrancaise/CodePostal': '94270',
            'Demandeur/AdresseFrancaise/Ville': 'Kremlin Bicetre',
            # Concerned
            'Titulaire/Civilite': 'MONSIEUR',
            'Titulaire/NomNaissance': 'Whatever',
            'Titulaire/PaysDeNaissance': 'PRK',
            'Titulaire/DateDeNaissance': '1978-05-19',
            'Titulaire/Prenoms': 'Josh',
            'Titulaire/Filiation/Mere/Nom': 'Song',
            'Titulaire/Filiation/Mere/Prenoms': 'Eloise',
            'Titulaire/Filiation/Pere/Nom': 'Whatever',
            'Titulaire/Filiation/Pere/Prenoms': 'Fritz',
            # Concerned2
            'Titulaire2/Civilite': 'MADAME',
            'Titulaire2/NomNaissance': 'Kokey',
            'Titulaire2/PaysDeNaissance': 'SEN',
            'Titulaire2/DateDeNaissance': '1980-03-12',
            'Titulaire2/Prenoms': 'Sarah',
            'Titulaire2/Filiation/Mere/Nom': 'De',
            'Titulaire2/Filiation/Mere/Prenoms': 'Coudy',
            'Titulaire2/Filiation/Pere/Nom': 'Kokey',
            'Titulaire2/Filiation/Pere/Prenoms': 'Pascal',
        }
        absent = ()
    else:
        number = '17-1'
        expected = {
            # Generic tags
            'TypeActe/Code': 'DECES',
            'NatureActe/Code': 'COPIE-INTEGRALE',
            'MotifDemande/Commentaire': 'Autre',
            'DateActe': '2014-04-26',
            'NombreExemplaires': '4',
            # Requester
            'Demandeur/QualiteDemandeur/Code': 'Autre',
            'Demandeur/AdresseFrancaise/CodePostal': '54000',
            'Demandeur/AdresseFrancaise/Ville': 'Nancy',
            'Demandeur/AdresseFrancaise/Voie': "37 Rue de l'Aigle Blanc",
            # Concerned
            'Titulaire/Civilite': 'MONSIEUR',
            'Titulaire/NomNaissance': 'Yamamoto',
            'Titulaire/PaysDeNaissance': 'FRA',
            'Titulaire/DateDeNaissance': '1978-05-19',
            'Titulaire/Prenoms': 'Yosuke',
            'Titulaire/Filiation/Mere/Nom': 'Ino',
            'Titulaire/Filiation/Mere/Prenoms': 'Haruka',
            'Titulaire/Filiation/Pere/Nom': 'Yamamoto',
            'Titulaire/Filiation/Pere/Prenoms': 'Ryu',
        }
        absent = (
            'Demandeur/AdresseEtrangere/Pays',
            'Demandeur/AdresseEtrangere/Adresse',
        )

    assert resp.json['data']['demand_id'] == '%s-AEC-LA' % number

    basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % number)
    doc_name = '%s-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' % number
    doc = os.path.join(basedir, doc_name)
    validate_schema(doc, AEC_XSD)
    check_zip_file(
        basedir + '.zip',
        ['message.xml', '%s-EtatCivil-ent-0.xml' % number, doc_name],
    )

    root = etree.parse(doc).getroot()
    assert root.tag == 'EnveloppeMetierType'
    _check_demande_acte(root, {**shared, **expected}, absent)
def test_create_aec_demand_type_with_user_comment(app, setup, aec_payload):
    """The user comment of an AEC demand is written to the ``-ent-`` file."""
    # Deep copy: a shallow ``dict()`` copy would still share the nested
    # ``fields`` dict with the fixture payload, which may be reused by other
    # tests, and the comment would leak into them.
    payload = copy.deepcopy(aec_payload)
    display_id = payload['display_id']
    payload['fields']['logitud_commentaire_usager'] = 'gentle user comment'
    app.post_json('/mdel/test/create', params=payload, status=200)

    # checking that the user comment is present in the -ent-.xml file
    basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % display_id)
    desc = os.path.join(basedir, '%s-EtatCivil-ent-0.xml' % display_id)
    root = etree.parse(desc).getroot()
    ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
    assert root.find('ns2:LocalAccess/ns2:CommentaireUsager', namespaces=ns).text == 'gentle user comment'
def test_create_aec_demand_with_output_sftp(app, setup, aec_payload, sftpserver, caplog):
    """A created demand is pushed to the outgoing SFTP directory by ``hourly()``."""
    sftp_url = 'sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver)
    setup.outcoming_sftp = SFTP(sftp_url)
    setup.save()

    app.post_json('/mdel/test/create', params=aec_payload, status=200)
    caplog.clear()
    demand = setup.demand_set.filter(sent=False).get()
    assert not demand.sent

    served = {'output': {}}
    with sftpserver.serve_content(served):
        setup.hourly()
        assert not setup.demand_set.filter(sent=False).exists()
        # check zip file was transferred
        assert set(served['output']) == {demand.filename}
        with open(demand.filepath, 'rb') as fd:
            assert served['output'][demand.filename] == fd.read()

    expected_log = f'Demand {demand.name} (pk {demand.num}) sent.'
    assert caplog.messages[-1:] == [expected_log]
def test_create_aec_demand_with_input_sftp(app, setup, aec_payload, sftpserver, caplog):
    """Files served in the incoming SFTP directory are fetched by ``hourly()``."""
    sftp_url = 'sftp://foo:bar@{server.host}:{server.port}/input/'.format(server=sftpserver)
    setup.incoming_sftp = SFTP(sftp_url)
    setup.save()

    with sftpserver.serve_content({'input': {'whatever': 'content'}}):
        setup.hourly()

    retrieved = pathlib.Path(setup.output_dir) / 'whatever'
    assert retrieved.read_text() == 'content'
    assert caplog.messages[-1:] == ['Retrieved new responses: whatever']
def test_create_aec_demand_type_without_date_acte(app, setup):
    """A death certificate payload without ``date_acte`` is rejected."""
    raw_payload = get_file_from_test_base_dir('formdata_aec_deces.json')
    payload = json.loads(raw_payload)
    del payload['fields']['date_acte']
    response = app.post_json('/mdel/test/create', params=payload, status=200)
    assert response.json['err_desc'] == '<date_acte> is required'
def test_create_aec_demand_type_with_files(app, setup, aec_payload):
    """Create an AEC demand carrying two proofs and check they are written and referenced."""
    # Deep copy: a shallow ``dict()`` copy would still share the nested
    # ``fields`` dict with the fixture payload, which may be reused by other
    # tests, and the attached files would leak into them.
    payload = copy.deepcopy(aec_payload)
    display_id = payload['display_id']
    payload['fields'].update(
        {
            'justificatif_identite_cni': {
                'content': 'data',
                'content_type': 'application/pdf',
                'filename': 'cni.pdf',
            },
            'justificatif_domicile': {
                'content': 'data',
                'content_type': 'application/pdf',
                'filename': 'quittance.pdf',
            },
        }
    )
    resp = app.post_json('/mdel/test/create', params=payload, status=200)
    assert resp.json['data']['demand_id'] == '%s-AEC-LA' % display_id

    basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % display_id)
    doc_name = '%s-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' % display_id
    ent_name = '%s-EtatCivil-ent-0.xml' % display_id
    validate_schema(os.path.join(basedir, doc_name), AEC_XSD)

    # every file written in the demand directory must be an expected one
    expected_files = [doc_name, 'message.xml', ent_name, 'cni.pdf', 'quittance.pdf']
    for fname in os.listdir(basedir):
        assert fname in expected_files

    # checking that attached files are referenced in -ent-.xml file
    root = etree.parse(os.path.join(basedir, ent_name)).getroot()
    ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
    assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns
    attached_files = root.findall('ns2:PieceJointe', namespaces=ns)
    assert len(attached_files) == 2
    # file title -> expected proof code
    expected_codes = {'cni.pdf': 'JI', 'quittance.pdf': 'JD'}
    for afile in attached_files:
        title = afile.find('ns2:Intitule', namespaces=ns).text
        assert title in expected_codes
        assert afile.find('ns2:Code', namespaces=ns).text == expected_codes[title]
        assert afile.find('ns2:Fichier', namespaces=ns).text == title

    check_zip_file(
        basedir + '.zip',
        ['message.xml', ent_name, doc_name, 'quittance.pdf', 'cni.pdf'],
    )
def test_create_ile_demand_type(app, setup):
    """Create an ILE demand and check the generated document, envelope and files."""
    resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD, status=200)
    assert resp.json['data']['demand_id'] == '1-14-ILE-LA'

    base_doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-14-ILE-LA--0')
    doc = os.path.join(base_doc, '1-14-ILE-LA-doc-.xml')
    validate_schema(doc, ILE_XSD)

    root = etree.parse(doc).getroot()
    assert root.tag == 'AvisDInscription'
    document_texts = [
        ('Inscription/Electeur/AdresseDeLElecteur/Localite', 'Nancy'),
        ('Inscription/Electeur/AdresseDeLElecteur/CodePostal', '54000'),
        ('Inscription/Electeur/AdresseDeLElecteur/TypeVoie', 'RUE'),
        ('Inscription/Electeur/AdresseDeLElecteur/NomVoie', 'RUE DU CHEVAL BLANC'),
        ('Inscription/Electeur/AdresseDeLElecteur/NumeroVoie', '37'),
        ('Inscription/Electeur/AdresseDeLElecteur/Complement', 'Apt 4112, Batiment B'),
        ('Inscription/Electeur/Nationalite', 'FRA'),
        ('Inscription/Electeur/Noms/NomFamille', 'whatever'),
        ('Inscription/Electeur/Prenoms/Prenom', 'chelsea'),
        ('Inscription/Electeur/MethodeDeContact/URI', 'chelsea@whatever.com'),
        ('Inscription/Electeur/MethodeDeContact/CanalCode', 'EMAIL'),
        ('Inscription/Electeur/Sexe', 'F'),
        ('Inscription/Electeur/LieuDeNaissance/Pays', 'CAN'),
        ('Inscription/Electeur/LieuDeNaissance/Localite', 'Vancouver'),
        ('Inscription/Electeur/LieuDeNaissance/CodePostal', 'V56 B68'),
        ('Inscription/Electeur/DateDeNaissance', '2014-06-11'),
        ('Inscription/TypeDeListe', 'cm'),
        ('TypeDInscription', 'vol'),
        ('SituationElectoraleAnterieure/SituationDeLElecteur', 'cci'),
        ('SituationElectoraleAnterieure/PaysUeDerniereInscription/Pays', 'BEL'),
        ('SituationElectoraleAnterieure/PaysUeDerniereInscription/Localite', 'Bruxelles'),
        ('SituationElectoraleAnterieure/PaysUeDerniereInscription/DivisionTerritoriale', 'Whatever'),
    ]
    for path, text in document_texts:
        assert root.find(path).text == text

    # checking that attached files are referenced in -ent-.xml file
    desc = os.path.join(base_doc, '1-14-ILE-LA-ent-.xml')
    root = etree.parse(desc).getroot()
    ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
    assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns
    envelope_texts = [
        ('ns2:Teledemarche/ns2:NumeroTeledemarche', '1-14'),
        ('ns2:Routage/ns2:Donnee/ns2:Id', 'CodeINSEE'),
        ('ns2:Routage/ns2:Donnee/ns2:Valeur', '54395'),
        ('ns2:Document/ns2:Code', '1-14-ILE-LA'),
        ('ns2:Document/ns2:Nom', '1-14-ILE-LA'),
        ('ns2:Document/ns2:FichierFormulaire/ns2:FichierDonnees', '1-14-ILE-LA-doc-.xml'),
    ]
    for path, text in envelope_texts:
        assert root.find(path, namespaces=ns).text == text

    attached_files = root.findall('ns2:PieceJointe', namespaces=ns)
    assert len(attached_files) == 3
    # file title -> expected proof code
    expected_codes = {
        'mdel_passeport_recto.pdf': 'JI',
        'mdel_passeport_verso.pdf': 'JI',
        'mdel_edf.pdf': 'JD',
    }
    for afile in attached_files:
        title = afile.find('ns2:Intitule', namespaces=ns).text
        assert title in expected_codes
        assert afile.find('ns2:Code', namespaces=ns).text == expected_codes[title]
        assert afile.find('ns2:Fichier', namespaces=ns).text == title

    expected_files = {
        'message.xml',
        '1-14-ILE-LA-doc-.xml',
        '1-14-ILE-LA-ent-.xml',
        'mdel_passeport_recto.pdf',
        'mdel_passeport_verso.pdf',
        'mdel_edf.pdf',
    }
    assert set(os.listdir(base_doc)) <= expected_files

    # Without anterieur_situation_raw
    payload = copy.deepcopy(ILE_PAYLOAD)
    del payload['fields']['anterieur_situation_raw']
    resp = app.post_json('/mdel/test/create', params=payload, status=200)
    assert resp.json['err_desc'] == 'anterieur_situation_raw is required'
def test_create_ile_demand_type_invalid_document_proof(app, setup):
    """Missing or empty proof documents are rejected with an explicit error."""
    # test with missing key
    missing_key = json.loads(get_file_from_test_base_dir('formdata.json'))
    del missing_key['fields']['justificatif_domicile_hebergeur']
    response = app.post_json('/mdel/test/create', params=missing_key, status=200)
    assert response.json['err_desc'] == 'justificatif_domicile and all its attributes are required'

    # test with invalid content
    invalid_content = json.loads(get_file_from_test_base_dir('formdata.json'))
    for field in ('justificatif_identite', 'justificatif_identite_verso'):
        invalid_content['fields'][field] = None
    response = app.post_json('/mdel/test/create', params=invalid_content, status=200)
    assert response.json['err_desc'] == 'justificatif_identite and all its attributes are required'
def test_get_status(app, setup):
    """Check the status endpoint against the sample responses copied to the outputs directory."""
    sample_outputs = os.path.join(get_mdel_base_dir(), 'test', 'outputs')
    outputs_dir = os.path.join(get_resource_base_dir(), 'test', 'outputs')
    shutil.copytree(sample_outputs, outputs_dir)

    created = app.post_json('/mdel/test/create', params=ILE_PAYLOAD, status=200)
    demand_id = created.json['data']['demand_id']
    assert demand_id == '1-14-ILE-LA'

    data = app.get('/mdel/test/status', params={'demand_id': demand_id}).json['data']
    assert data['closed'] is True
    assert data['status'] == 'accepted'
    assert data['comment'] == 'Dossier traité.'
    assert Demand.objects.get(demand_id='1-14-ILE-LA').status == 'accepted'

    Demand.objects.create(resource=setup, num='97-5', flow_type='AEC-LA', demand_id='97-5-AEC-LA')
    data = app.get('/mdel/test/status', params={'demand_id': '97-5-AEC-LA'}, status=200).json['data']
    assert data['closed'] is False
    assert data['status'] == 'imported'
    assert data['comment'] == 'Le dossier a été reçu et sera traité prochainement.'
    assert Demand.objects.get(demand_id='97-5-AEC-LA').status == 'imported'

    Demand.objects.create(resource=setup, num='102-2', flow_type='AEC-LA', demand_id='102-2-AEC-LA')
    data = app.get('/mdel/test/status', params={'demand_id': '102-2-AEC-LA'}, status=200).json['data']
    assert data['closed'] is True
    assert data['status'] == 'accepted'
    assert data['comment'] == 'Dossier accepté'
    assert Demand.objects.get(demand_id='102-2-AEC-LA').status == 'accepted'

    # bad zipfile: overwrite the response archive with content that is not a zip
    broken_zip = os.path.join(get_resource_base_dir(), 'test', 'outputs', '102-2-aec-la--4.zip')
    with open(broken_zip, 'w') as handle:
        handle.write(' ')
    failed = app.get('/mdel/test/status', params={'demand_id': '102-2-AEC-LA'}, status=200)
    assert failed.json['err_desc'] == 'zipfile error'
def test_get_status_unknown_demand(app, setup):
    """Asking the status of a demand that was never created yields an error."""
    query = {'demand_id': '1-14-ILE-LA'}
    response = app.get('/mdel/test/status', params=query)
    assert response.json['err_desc'] == 'demand 1-14-ILE-LA does not exist'
def test_get_status_no_response(app, setup):
    """Check the status reported for demand ``1-15-ILE-LA``: open, no status, empty comment."""
    sample_outputs = os.path.join(get_mdel_base_dir(), 'test', 'outputs')
    outputs_dir = os.path.join(get_resource_base_dir(), 'test', 'outputs')
    shutil.copytree(sample_outputs, outputs_dir)

    Demand.objects.create(resource=setup, num='1-15', flow_type='ILE-LA', demand_id='1-15-ILE-LA')
    response = app.get('/mdel/test/status', params={'demand_id': '1-15-ILE-LA'}, status=200)
    status_data = response.json['data']

    assert status_data['closed'] is False
    assert status_data['status'] is None
    assert status_data['comment'] == ''
    assert Demand.objects.get(demand_id='1-15-ILE-LA').status is None
def test_get_not_closed_status(app, setup):
    """Check the status reported for demand ``15-9-AEC-LA``: open and in progress."""
    sample_outputs = os.path.join(get_mdel_base_dir(), 'test', 'outputs')
    outputs_dir = os.path.join(get_resource_base_dir(), 'test', 'outputs')
    shutil.copytree(sample_outputs, outputs_dir)

    Demand.objects.create(resource=setup, num='15-9', flow_type='AEC-LA', demand_id='15-9-AEC-LA')
    response = app.get('/mdel/test/status', params={'demand_id': '15-9-AEC-LA'}, status=200)
    status_data = response.json['data']

    assert status_data['closed'] is False
    assert status_data['status'] == 'in progress'
    assert status_data['comment'] == 'Dossier en cours de traitement.'
    assert Demand.objects.get(demand_id='15-9-AEC-LA').status == 'in progress'
def test_data_source_applicants(app, setup):
    """The applicants data source lists 9 entries, 7 once two are excluded."""
    everyone = app.get('/mdel/test/applicants', status=200).json['data']
    assert len(everyone) == 9

    # test without PersonneConcernee and Representant
    query = {'without': 'PersonneConcernee,Representant'}
    filtered = app.get('/mdel/test/applicants', params=query, status=200).json['data']
    assert len(filtered) == 7
def test_data_source_certificates(app, setup):
    """Each certificate entry carries the label matching its identifier."""
    response = app.get('/mdel/test/certificates', status=200)
    labels = {
        'NAISSANCE': 'Acte de naissance',
        'MARIAGE': 'Acte de mariage',
    }
    for entry in response.json['data']:
        # any identifier other than the two above is expected to be the death certificate
        assert entry['text'] == labels.get(entry['id'], 'Acte de décès')
def test_data_source_certificate_types(app, setup):
    """The certificate types data source lists 4 labelled entries and honours ``without``."""
    entries = app.get('/mdel/test/certificate-types', status=200).json['data']
    assert len(entries) == 4
    labels = {
        'COPIE-INTEGRALE': 'Copie intégrale',
        'EXTRAIT-AVEC-FILIATION': 'Extrait avec filiation',
        'EXTRAIT-SANS-FILIATION': 'Extrait sans filiation',
    }
    for entry in entries:
        # any identifier other than the three above is expected to be the multilingual extract
        assert entry['text'] == labels.get(entry['id'], 'Extrait plurilingue')

    # test without COPIE-INTEGRALE and EXTRAIT-AVEC-FILIATION
    response = app.get(
        '/mdel/test/certificate-types',
        params={'without': 'COPIE-INTEGRALE,EXTRAIT-AVEC-FILIATION'},
        status=200,
    )
    remaining = response.json['data']
    assert len(remaining) == 2
    remaining_ids = (remaining[0]['id'], remaining[1]['id'])
    assert 'EXTRAIT-SANS-FILIATION' in remaining_ids
    assert 'EXTRAIT-PLURILINGUE' in remaining_ids
def test_date_parsing():
    """``parse_date`` raises ``APIError`` for an impossible date and for a non ISO date."""
    from passerelle.utils.jsonresponse import APIError

    with pytest.raises(APIError) as error:
        parse_date('2018-02-29')
    # The message is checked after the ``with`` block (statements following
    # the raising call inside it would never run) and on ``error.value``,
    # the exception itself, rather than on the ExceptionInfo wrapper.
    assert 'day is out of range for month' in str(error.value)

    with pytest.raises(APIError) as error:
        parse_date('28-02-2018')
    message = str(error.value)
    for text in ('date', '28-02-2018', 'not iso-formated'):
        assert text in message
def test_aec_filenames_and_routing(app, setup):
    """Check file names and routing data generated for the birth certificate demand."""
    payload = json.loads(get_file_from_test_base_dir('formdata_aec_naiss.json'))
    resp = app.post_json('/mdel/test/create', params=payload, status=200)
    assert resp.json['data']['demand_id'] == '15-4-AEC-LA'

    doc_name = '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'
    ent_name = '15-4-EtatCivil-ent-0.xml'
    metier_ns = 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'

    basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '15-4-EtatCivil-0')
    assert os.path.exists(basedir)
    check_zip_file(basedir + '.zip', ['message.xml', ent_name, doc_name])

    doc = os.path.join(basedir, doc_name)
    assert os.path.exists(doc)
    validate_schema(doc, AEC_XSD)

    # envelope file
    ent = os.path.join(basedir, ent_name)
    assert os.path.exists(ent)
    ent_root = letree.parse(ent).getroot()
    first_child = ent_root[0]
    assert letree.QName(first_child).localname == 'NumeroDemarche'
    assert first_child.text == 'EtatCivil'
    ent_ns = {'ns': metier_ns}
    ent_texts = [
        ('ns:Document/ns:Code', 'ActeEtatCivil-XML'),
        ('ns:Document/ns:Nom', 'ActeEtatCivil-XML'),
        ('ns:Document/ns:FichierFormulaire/ns:FichierDonnees', doc_name),
    ]
    for path, text in ent_texts:
        assert ent_root.find(path, namespaces=ent_ns).text == text

    # message file
    message = os.path.join(basedir, 'message.xml')
    assert os.path.exists(message)
    message_root = letree.parse(message).getroot()
    message_ns = {
        'ns1': metier_ns,
        'ns2': 'http://finances.gouv.fr/dgme/pec/message/v1',
    }
    aller = 'ns2:Body/ns2:Content/ns2:Aller/'
    message_texts = [
        (aller + 'ns1:NumeroDemarche', 'EtatCivil'),
        (aller + 'ns1:Document/ns1:Code', 'ActeEtatCivil-XML'),
        (aller + 'ns1:Document/ns1:Nom', 'ActeEtatCivil-XML'),
        (aller + 'ns1:Document/ns1:FichierFormulaire/ns1:FichierDonnees', doc_name),
    ]
    for path, text in message_texts:
        assert message_root.find(path, namespaces=message_ns).text == text
def resource_logs(caplog):
    """Return the unformatted ``msg`` of every captured record emitted by the test connector logger."""
    logger_name = 'passerelle.resource.mdel.test'
    messages = []
    for record in caplog.records:
        if record.name == logger_name:
            messages.append(record.msg)
    return messages
def test_create_aec_demand_with_output_sftp_error(app, setup, aec_payload, sftpserver, caplog):
    """Run ``hourly()`` without serving SFTP content: the demand stays unsent and a warning is logged."""
    caplog.set_level('WARNING')
    sftp_url = 'sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver)
    setup.outcoming_sftp = SFTP(sftp_url)
    setup.save()

    app.post_json('/mdel/test/create', params=aec_payload, status=200)
    setup.hourly()

    unsent = setup.demand_set.get()
    assert not unsent.sent
    expected_logs = ['Demand %s (pk %s) could not be sent: %s']
    assert resource_logs(caplog) == expected_logs
def test_create_aec_demand_with_input_sftp_error(app, setup, aec_payload, sftpserver, caplog):
    """Run ``hourly()`` without serving SFTP content: a retrieval warning is logged."""
    caplog.set_level('WARNING')
    sftp_url = 'sftp://foo:bar@{server.host}:{server.port}/input/'.format(server=sftpserver)
    setup.incoming_sftp = SFTP(sftp_url)
    setup.save()

    app.post_json('/mdel/test/create', params=aec_payload, status=200)
    setup.hourly()

    expected_logs = ['Could not retrieve all responses: %s']
    assert resource_logs(caplog) == expected_logs