758 lines
33 KiB
Python
758 lines
33 KiB
Python
# Passerelle - uniform access to data and services
|
|
# Copyright (C) 2015 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a.deepcopy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import copy
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import zipfile
|
|
from xml.etree import ElementTree as etree
|
|
|
|
import pytest
|
|
from lxml import etree as letree
|
|
|
|
import tests.utils
|
|
from passerelle.apps.mdel.mdel import AttachedFile, Description, Message, get_resource_base_dir
|
|
from passerelle.apps.mdel.models import MDEL, Demand
|
|
from passerelle.apps.mdel.utils import parse_date
|
|
from passerelle.utils import SFTP
|
|
|
|
AEC_XSD = 'ActeEtatCivil-V1.A.xsd'
|
|
ILE_XSD = 'ILE_v1.1.xsd'
|
|
|
|
|
|
def get_mdel_base_dir():
|
|
return os.path.join(os.path.dirname(__file__), 'data', 'mdel')
|
|
|
|
|
|
def get_file_from_test_base_dir(filename):
|
|
path = os.path.join(get_mdel_base_dir(), filename)
|
|
with open(path, 'rb') as fd:
|
|
return fd.read()
|
|
|
|
|
|
def validate_schema(doc, xsd):
|
|
xsd_path = os.path.join(get_mdel_base_dir(), xsd)
|
|
xsd = letree.parse(xsd_path).getroot()
|
|
schema = letree.XMLSchema(xsd)
|
|
parser = letree.XMLParser(schema=schema)
|
|
letree.parse(doc, parser=parser) # Will raise an exception if schema not respected
|
|
|
|
|
|
def check_zip_file(zipdir, expected_files):
|
|
# check files order in zip
|
|
with zipfile.ZipFile(zipdir, 'r') as zipres:
|
|
files = [f.filename for f in zipres.infolist()]
|
|
assert sorted(files[: len(expected_files)]) == sorted(expected_files)
|
|
|
|
|
|
@pytest.fixture
|
|
def setup(db):
|
|
return tests.utils.setup_access_rights(MDEL.objects.create(slug='test'))
|
|
|
|
|
|
@pytest.fixture(
|
|
params=[
|
|
json.loads(get_file_from_test_base_dir('formdata_aec_naiss.json')),
|
|
json.loads(get_file_from_test_base_dir('formdata_aec_mariage.json')),
|
|
json.loads(get_file_from_test_base_dir('formdata_aec_deces.json')),
|
|
],
|
|
ids=['naissance', 'mariage', 'deces'],
|
|
)
|
|
def aec_payload(request):
|
|
return request.param
|
|
|
|
|
|
ILE_PAYLOAD = json.loads(get_file_from_test_base_dir('formdata.json'))
|
|
|
|
|
|
def test_message():
|
|
ns = {'mdel': 'http://finances.gouv.fr/dgme/pec/message/v1'}
|
|
xmlns = 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'
|
|
message = Message('ILE-LA', '77', '94600')
|
|
xml = message.xml
|
|
assert xml.get('xmlns') == xmlns
|
|
assert len(list(xml)) == 2
|
|
routing = xml.find('mdel:Header', ns).find('mdel:Routing', ns)
|
|
assert routing.find('mdel:MessageId', ns).text == '77'
|
|
assert routing.find('mdel:FlowType', ns).text == 'ILE-LA'
|
|
aller = xml.find('mdel:Body', ns).find('mdel:Content', ns).find('mdel:Aller', ns)
|
|
assert len(list(aller)) == 3
|
|
assert (
|
|
aller.find('Document').find('FichierFormulaire').find('FichierDonnees').text == '77-ILE-LA-doc-.xml'
|
|
)
|
|
assert aller.find('Teledemarche/IdentifiantPlateforme').text == '1'
|
|
|
|
|
|
def test_description():
|
|
attached_files = [
|
|
AttachedFile('JI', 'passport', base64.b64encode(b'this is passport')),
|
|
AttachedFile('JD', 'energy_bill', base64.b64encode(b'this is edf_mai_2016')),
|
|
]
|
|
|
|
description = Description('ILE-LA', '77', '94600', attached_files=attached_files)
|
|
xml = description.xml
|
|
assert len(list(xml)) == 5
|
|
assert len(xml.findall('PieceJointe')) == 2
|
|
assert xml.find('Document').find('FichierFormulaire').find('FichierDonnees').text == '77-ILE-LA-doc-.xml'
|
|
assert xml.find('Teledemarche/IdentifiantPlateforme').text == '1'
|
|
|
|
|
|
def test_invalid_demand_type(app, setup):
|
|
ILE_PAYLOAD_INVALID = copy.deepcopy(ILE_PAYLOAD)
|
|
ILE_PAYLOAD_INVALID['extra']['demand_type'] = 'whatever'
|
|
resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD_INVALID, status=200)
|
|
assert resp.json['err_desc'] == "demand_type must be : ['ILE-LA', 'RCO-LA', 'AEC-LA']"
|
|
|
|
|
|
def test_invalid_demand_no_form_number(app, setup):
|
|
ILE_PAYLOAD_INVALID_NO = copy.deepcopy(ILE_PAYLOAD)
|
|
ILE_PAYLOAD_INVALID_NO.pop('display_id')
|
|
resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD_INVALID_NO, status=200)
|
|
assert resp.json['err_desc'] == 'display_id is required'
|
|
|
|
|
|
def test_create_rco_demand_type(app, setup):
|
|
RCO_PAYLOAD = copy.deepcopy(ILE_PAYLOAD)
|
|
RCO_PAYLOAD['extra']['demand_type'] = 'rco-la'
|
|
resp = app.post_json('/mdel/test/create', params=RCO_PAYLOAD, status=200)
|
|
assert resp.json['err_desc'] == 'RCO-LA processing not implemented'
|
|
|
|
|
|
def test_create_aec_demand_type(app, setup, aec_payload):
|
|
resp = app.post_json('/mdel/test/create', params=aec_payload, status=200)
|
|
|
|
if aec_payload['display_id'] == '15-4':
|
|
assert resp.json['data']['demand_id'] == '15-4-AEC-LA'
|
|
|
|
basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '15-4-EtatCivil-0')
|
|
doc = os.path.join(basedir, '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml')
|
|
validate_schema(doc, AEC_XSD)
|
|
check_zip_file(
|
|
basedir + '.zip',
|
|
['message.xml', '15-4-EtatCivil-ent-0.xml', '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'],
|
|
)
|
|
|
|
root = etree.parse(doc).getroot()
|
|
|
|
assert root.tag == 'EnveloppeMetierType'
|
|
|
|
# Generic tags
|
|
assert root.find('DemandeActe/TypeActe/Code').text == 'NAISSANCE'
|
|
assert root.find('DemandeActe/NatureActe/Code').text == 'COPIE-INTEGRALE'
|
|
assert root.find('DemandeActe/MotifDemande/Commentaire').text == 'CertificatDeNationaliteFrancaise'
|
|
assert root.find('DemandeActe/LieuActe/CodePostal').text == '54000'
|
|
assert root.find('DemandeActe/LieuActe/CodeINSEE').text == '54395'
|
|
assert root.find('DemandeActe/LieuActe/Ville').text == 'Nancy'
|
|
assert root.find('DemandeActe/DateActe').text == '1958-05-19'
|
|
assert root.find('DemandeActe/NombreExemplaires').text == '2'
|
|
|
|
# Requester
|
|
assert root.find('DemandeActe/Demandeur/Courriel').text == 'chelsea@whatever.com'
|
|
assert root.find('DemandeActe/Demandeur/Civilite').text == 'MADAME'
|
|
assert root.find('DemandeActe/Demandeur/Nom').text == 'Whatever'
|
|
assert root.find('DemandeActe/Demandeur/Prenom').text == 'Chelsea'
|
|
assert root.find('DemandeActe/Demandeur/Telephone').text == '0122334455'
|
|
assert root.find('DemandeActe/Demandeur/QualiteDemandeur/Code').text == 'Fils'
|
|
assert root.find('DemandeActe/Demandeur/AdresseEtrangere/Pays').text == 'Suisse'
|
|
assert (
|
|
root.find('DemandeActe/Demandeur/AdresseEtrangere/Adresse').text
|
|
== '3ème, Bâtiment B, 37 rue de Paris, 3800 Bern'
|
|
)
|
|
|
|
assert not root.find('DemandeActe/Demandeur/AdresseFrancaise/CodePostal')
|
|
assert not root.find('DemandeActe/Demandeur/AdresseFrancaise/Ville')
|
|
assert not root.find('DemandeActe/Demandeur/AdresseFrancaise/Voie')
|
|
|
|
# Concerned
|
|
assert root.find('DemandeActe/Titulaire/Civilite').text == 'MADAME'
|
|
assert root.find('DemandeActe/Titulaire/Nationalite').text == 'FRA'
|
|
assert root.find('DemandeActe/Titulaire/DateDeNaissance').text == '1958-05-19'
|
|
assert root.find('DemandeActe/Titulaire/PaysDeNaissance').text == 'FRA'
|
|
assert root.find('DemandeActe/Titulaire/DepartementDeNaissance').text == '54'
|
|
assert root.find('DemandeActe/Titulaire/NomNaissance').text == 'Whatever'
|
|
assert root.find('DemandeActe/Titulaire/Prenoms').text == 'Kim'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Mere/Nom').text == 'Song'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Mere/Prenoms').text == 'Eloise'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Nom').text == 'Whatever'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Fritz'
|
|
|
|
elif aec_payload['display_id'] == '16-1':
|
|
assert resp.json['data']['demand_id'] == '16-1-AEC-LA'
|
|
|
|
basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '16-1-EtatCivil-0')
|
|
doc = os.path.join(basedir, '16-1-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml')
|
|
validate_schema(doc, AEC_XSD)
|
|
check_zip_file(
|
|
basedir + '.zip',
|
|
['message.xml', '16-1-EtatCivil-ent-0.xml', '16-1-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'],
|
|
)
|
|
validate_schema(doc, AEC_XSD)
|
|
|
|
root = etree.parse(doc).getroot()
|
|
|
|
assert root.tag == 'EnveloppeMetierType'
|
|
|
|
# Generic tags
|
|
assert root.find('DemandeActe/TypeActe/Code').text == 'MARIAGE'
|
|
assert root.find('DemandeActe/NatureActe/Code').text == 'EXTRAIT-AVEC-FILIATION'
|
|
assert root.find('DemandeActe/MotifDemande/Commentaire').text == 'Autre'
|
|
assert root.find('DemandeActe/LieuActe/CodePostal').text == '54000'
|
|
assert root.find('DemandeActe/LieuActe/CodeINSEE').text == '54395'
|
|
assert root.find('DemandeActe/LieuActe/Ville').text == 'Nancy'
|
|
assert root.find('DemandeActe/DateActe').text == '2008-08-18'
|
|
assert root.find('DemandeActe/NombreExemplaires').text == '3'
|
|
|
|
# Requester
|
|
assert root.find('DemandeActe/Demandeur/Courriel').text == 'chelsea@whatever.com'
|
|
assert root.find('DemandeActe/Demandeur/Civilite').text == 'MADAME'
|
|
assert root.find('DemandeActe/Demandeur/Nom').text == 'Whatever'
|
|
assert root.find('DemandeActe/Demandeur/Prenom').text == 'Chelsea'
|
|
assert root.find('DemandeActe/Demandeur/Telephone').text == '0122334455'
|
|
assert root.find('DemandeActe/Demandeur/QualiteDemandeur/Code').text == 'Autre'
|
|
assert root.find('DemandeActe/Demandeur/QualiteDemandeur/Libelle').text == 'Sa soeur'
|
|
assert root.find('DemandeActe/Demandeur/AdresseFrancaise/Voie').text == '22 rue Danton'
|
|
assert root.find('DemandeActe/Demandeur/AdresseFrancaise/CodePostal').text == '94270'
|
|
assert root.find('DemandeActe/Demandeur/AdresseFrancaise/Ville').text == 'Kremlin Bicetre'
|
|
|
|
# Concerned
|
|
assert root.find('DemandeActe/Titulaire/Civilite').text == 'MONSIEUR'
|
|
assert root.find('DemandeActe/Titulaire/NomNaissance').text == 'Whatever'
|
|
assert root.find('DemandeActe/Titulaire/PaysDeNaissance').text == 'PRK'
|
|
assert root.find('DemandeActe/Titulaire/DateDeNaissance').text == '1978-05-19'
|
|
assert root.find('DemandeActe/Titulaire/Prenoms').text == 'Josh'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Mere/Nom').text == 'Song'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Mere/Prenoms').text == 'Eloise'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Nom').text == 'Whatever'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Fritz'
|
|
|
|
# Concerned2
|
|
assert root.find('DemandeActe/Titulaire2/Civilite').text == 'MADAME'
|
|
assert root.find('DemandeActe/Titulaire2/NomNaissance').text == 'Kokey'
|
|
assert root.find('DemandeActe/Titulaire2/PaysDeNaissance').text == 'SEN'
|
|
assert root.find('DemandeActe/Titulaire2/DateDeNaissance').text == '1980-03-12'
|
|
assert root.find('DemandeActe/Titulaire2/Prenoms').text == 'Sarah'
|
|
assert root.find('DemandeActe/Titulaire2/Filiation/Mere/Nom').text == 'De'
|
|
assert root.find('DemandeActe/Titulaire2/Filiation/Mere/Prenoms').text == 'Coudy'
|
|
assert root.find('DemandeActe/Titulaire2/Filiation/Pere/Nom').text == 'Kokey'
|
|
assert root.find('DemandeActe/Titulaire2/Filiation/Pere/Prenoms').text == 'Pascal'
|
|
|
|
else:
|
|
assert resp.json['data']['demand_id'] == '17-1-AEC-LA'
|
|
|
|
basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '17-1-EtatCivil-0')
|
|
doc = os.path.join(basedir, '17-1-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml')
|
|
validate_schema(doc, AEC_XSD)
|
|
check_zip_file(
|
|
basedir + '.zip',
|
|
['message.xml', '17-1-EtatCivil-ent-0.xml', '17-1-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'],
|
|
)
|
|
validate_schema(doc, AEC_XSD)
|
|
|
|
root = etree.parse(doc).getroot()
|
|
|
|
assert root.tag == 'EnveloppeMetierType'
|
|
|
|
# Generic tags
|
|
assert root.find('DemandeActe/TypeActe/Code').text == 'DECES'
|
|
assert root.find('DemandeActe/NatureActe/Code').text == 'COPIE-INTEGRALE'
|
|
assert root.find('DemandeActe/MotifDemande/Commentaire').text == 'Autre'
|
|
assert root.find('DemandeActe/LieuActe/CodePostal').text == '54000'
|
|
assert root.find('DemandeActe/LieuActe/CodeINSEE').text == '54395'
|
|
assert root.find('DemandeActe/LieuActe/Ville').text == 'Nancy'
|
|
assert root.find('DemandeActe/DateActe').text == '2014-04-26'
|
|
assert root.find('DemandeActe/NombreExemplaires').text == '4'
|
|
|
|
# Requester
|
|
assert root.find('DemandeActe/Demandeur/Courriel').text == 'chelsea@whatever.com'
|
|
assert root.find('DemandeActe/Demandeur/Civilite').text == 'MADAME'
|
|
assert root.find('DemandeActe/Demandeur/Nom').text == 'Whatever'
|
|
assert root.find('DemandeActe/Demandeur/Prenom').text == 'Chelsea'
|
|
assert root.find('DemandeActe/Demandeur/Telephone').text == '0122334455'
|
|
assert root.find('DemandeActe/Demandeur/QualiteDemandeur/Code').text == 'Autre'
|
|
assert root.find('DemandeActe/Demandeur/AdresseFrancaise/CodePostal').text == '54000'
|
|
assert root.find('DemandeActe/Demandeur/AdresseFrancaise/Ville').text == 'Nancy'
|
|
assert root.find('DemandeActe/Demandeur/AdresseFrancaise/Voie').text == "37 Rue de l'Aigle Blanc"
|
|
|
|
assert not root.find('DemandeActe/Demandeur/AdresseEtrangere/Pays')
|
|
assert not root.find('DemandeActe/Demandeur/AdresseEtrangere/Adresse')
|
|
|
|
# Concerned
|
|
assert root.find('DemandeActe/Titulaire/Civilite').text == 'MONSIEUR'
|
|
assert root.find('DemandeActe/Titulaire/NomNaissance').text == 'Yamamoto'
|
|
assert root.find('DemandeActe/Titulaire/PaysDeNaissance').text == 'FRA'
|
|
assert root.find('DemandeActe/Titulaire/DateDeNaissance').text == '1978-05-19'
|
|
assert root.find('DemandeActe/Titulaire/Prenoms').text == 'Yosuke'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Mere/Nom').text == 'Ino'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Mere/Prenoms').text == 'Haruka'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Nom').text == 'Yamamoto'
|
|
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Ryu'
|
|
|
|
|
|
def test_create_aec_demand_type_with_user_comment(app, setup, aec_payload):
|
|
AEC_PAYLOAD = dict(aec_payload)
|
|
display_id = AEC_PAYLOAD['display_id']
|
|
AEC_PAYLOAD['fields']['logitud_commentaire_usager'] = 'gentle user comment'
|
|
app.post_json('/mdel/test/create', params=aec_payload, status=200)
|
|
|
|
# checking that attached files are referenced in -ent-.xml file
|
|
basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % display_id)
|
|
desc = os.path.join(basedir, '%s-EtatCivil-ent-0.xml' % display_id)
|
|
|
|
root = etree.parse(desc).getroot()
|
|
ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
|
|
assert root.find('ns2:LocalAccess/ns2:CommentaireUsager', namespaces=ns).text == 'gentle user comment'
|
|
|
|
|
|
def test_create_aec_demand_with_output_sftp(app, setup, aec_payload, sftpserver, caplog):
|
|
setup.outcoming_sftp = SFTP(
|
|
'sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver)
|
|
)
|
|
setup.save()
|
|
app.post_json('/mdel/test/create', params=aec_payload, status=200)
|
|
caplog.clear()
|
|
|
|
demand = setup.demand_set.filter(sent=False).get()
|
|
assert not demand.sent
|
|
content_object = {'output': {}}
|
|
with sftpserver.serve_content(content_object):
|
|
setup.hourly()
|
|
assert not setup.demand_set.filter(sent=False).exists()
|
|
# check zip file was transfered
|
|
assert set(content_object['output']) == {demand.filename}
|
|
with open(demand.filepath, 'rb') as fd:
|
|
assert content_object['output'][demand.filename] == fd.read()
|
|
assert caplog.messages[-1:] == [f'Demand {demand.name} (pk {demand.num}) sent.']
|
|
|
|
|
|
def test_create_aec_demand_with_input_sftp(app, setup, aec_payload, sftpserver, caplog):
|
|
setup.incoming_sftp = SFTP('sftp://foo:bar@{server.host}:{server.port}/input/'.format(server=sftpserver))
|
|
setup.save()
|
|
|
|
with sftpserver.serve_content({'input': {'whatever': 'content'}}):
|
|
setup.hourly()
|
|
assert (pathlib.Path(setup.output_dir) / 'whatever').read_text() == 'content'
|
|
assert caplog.messages[-1:] == ['Retrieved new responses: whatever']
|
|
|
|
|
|
def test_create_aec_demand_type_without_date_acte(app, setup):
|
|
payload = json.loads(get_file_from_test_base_dir('formdata_aec_deces.json'))
|
|
payload['fields'].pop('date_acte')
|
|
resp = app.post_json('/mdel/test/create', params=payload, status=200)
|
|
assert resp.json['err_desc'] == '<date_acte> is required'
|
|
|
|
|
|
def test_create_aec_demand_type_with_files(app, setup, aec_payload):
|
|
AEC_PAYLOAD = dict(aec_payload)
|
|
display_id = AEC_PAYLOAD['display_id']
|
|
AEC_PAYLOAD['fields'].update(
|
|
{
|
|
'justificatif_identite_cni': {
|
|
'content': 'data',
|
|
'content_type': 'application/pdf',
|
|
'filename': 'cni.pdf',
|
|
},
|
|
'justificatif_domicile': {
|
|
'content': 'data',
|
|
'content_type': 'application/pdf',
|
|
'filename': 'quittance.pdf',
|
|
},
|
|
}
|
|
)
|
|
resp = app.post_json('/mdel/test/create', params=AEC_PAYLOAD, status=200)
|
|
|
|
assert resp.json['data']['demand_id'] == '%s-AEC-LA' % display_id
|
|
|
|
basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % display_id)
|
|
|
|
doc = os.path.join(basedir, '%s-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' % display_id)
|
|
validate_schema(doc, AEC_XSD)
|
|
|
|
expected_files = [
|
|
'%s-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' % display_id,
|
|
'message.xml',
|
|
'%s-EtatCivil-ent-0.xml' % display_id,
|
|
'cni.pdf',
|
|
'quittance.pdf',
|
|
]
|
|
|
|
for fname in os.listdir(basedir):
|
|
assert fname in expected_files
|
|
|
|
# checking that attached files are referenced in -ent-.xml file
|
|
desc = os.path.join(basedir, '%s-EtatCivil-ent-0.xml' % display_id)
|
|
root = etree.parse(desc).getroot()
|
|
ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
|
|
assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns
|
|
|
|
attached_files = root.findall('ns2:PieceJointe', namespaces=ns)
|
|
assert len(attached_files) == 2
|
|
|
|
for afile in attached_files:
|
|
if afile.find('ns2:Intitule', namespaces=ns).text == 'cni.pdf':
|
|
assert afile.find('ns2:Code', namespaces=ns).text == 'JI'
|
|
assert afile.find('ns2:Fichier', namespaces=ns).text == 'cni.pdf'
|
|
else:
|
|
assert afile.find('ns2:Intitule', namespaces=ns).text == 'quittance.pdf'
|
|
assert afile.find('ns2:Code', namespaces=ns).text == 'JD'
|
|
assert afile.find('ns2:Fichier', namespaces=ns).text == 'quittance.pdf'
|
|
|
|
check_zip_file(
|
|
basedir + '.zip',
|
|
[
|
|
'message.xml',
|
|
'%s-EtatCivil-ent-0.xml' % display_id,
|
|
'%s-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml' % display_id,
|
|
'quittance.pdf',
|
|
'cni.pdf',
|
|
],
|
|
)
|
|
|
|
|
|
def test_create_ile_demand_type(app, setup):
|
|
resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD, status=200)
|
|
assert resp.json['data']['demand_id'] == '1-14-ILE-LA'
|
|
|
|
base_doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-14-ILE-LA--0')
|
|
doc = os.path.join(base_doc, '1-14-ILE-LA-doc-.xml')
|
|
|
|
validate_schema(doc, ILE_XSD)
|
|
root = etree.parse(doc).getroot()
|
|
|
|
assert root.tag == 'AvisDInscription'
|
|
|
|
assert root.find('Inscription/Electeur/AdresseDeLElecteur/Localite').text == 'Nancy'
|
|
assert root.find('Inscription/Electeur/AdresseDeLElecteur/CodePostal').text == '54000'
|
|
assert root.find('Inscription/Electeur/AdresseDeLElecteur/TypeVoie').text == 'RUE'
|
|
assert root.find('Inscription/Electeur/AdresseDeLElecteur/NomVoie').text == 'RUE DU CHEVAL BLANC'
|
|
assert root.find('Inscription/Electeur/AdresseDeLElecteur/NumeroVoie').text == '37'
|
|
assert root.find('Inscription/Electeur/AdresseDeLElecteur/Complement').text == 'Apt 4112, Batiment B'
|
|
assert root.find('Inscription/Electeur/Nationalite').text == 'FRA'
|
|
assert root.find('Inscription/Electeur/Noms/NomFamille').text == 'whatever'
|
|
assert root.find('Inscription/Electeur/Prenoms/Prenom').text == 'chelsea'
|
|
assert root.find('Inscription/Electeur/MethodeDeContact/URI').text == 'chelsea@whatever.com'
|
|
assert root.find('Inscription/Electeur/MethodeDeContact/CanalCode').text == 'EMAIL'
|
|
assert root.find('Inscription/Electeur/Sexe').text == 'F'
|
|
assert root.find('Inscription/Electeur/LieuDeNaissance/Pays').text == 'CAN'
|
|
assert root.find('Inscription/Electeur/LieuDeNaissance/Localite').text == 'Vancouver'
|
|
assert root.find('Inscription/Electeur/LieuDeNaissance/CodePostal').text == 'V56 B68'
|
|
assert root.find('Inscription/Electeur/DateDeNaissance').text == '2014-06-11'
|
|
assert root.find('Inscription/TypeDeListe').text == 'cm'
|
|
assert root.find('TypeDInscription').text == 'vol'
|
|
assert root.find('SituationElectoraleAnterieure/SituationDeLElecteur').text == 'cci'
|
|
assert root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/Pays').text == 'BEL'
|
|
assert root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/Localite').text == 'Bruxelles'
|
|
assert (
|
|
root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/DivisionTerritoriale').text
|
|
== 'Whatever'
|
|
)
|
|
|
|
# checking that attached files are referenced in -ent-.xml file
|
|
desc = os.path.join(base_doc, '1-14-ILE-LA-ent-.xml')
|
|
root = etree.parse(desc).getroot()
|
|
ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
|
|
assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns
|
|
assert root.find('ns2:Teledemarche/ns2:NumeroTeledemarche', namespaces=ns).text == '1-14'
|
|
assert root.find('ns2:Routage/ns2:Donnee/ns2:Id', namespaces=ns).text == 'CodeINSEE'
|
|
assert root.find('ns2:Routage/ns2:Donnee/ns2:Valeur', namespaces=ns).text == '54395'
|
|
assert root.find('ns2:Document/ns2:Code', namespaces=ns).text == '1-14-ILE-LA'
|
|
assert root.find('ns2:Document/ns2:Nom', namespaces=ns).text == '1-14-ILE-LA'
|
|
assert (
|
|
root.find('ns2:Document/ns2:FichierFormulaire/ns2:FichierDonnees', namespaces=ns).text
|
|
== '1-14-ILE-LA-doc-.xml'
|
|
)
|
|
|
|
attached_files = root.findall('ns2:PieceJointe', namespaces=ns)
|
|
assert len(attached_files) == 3
|
|
|
|
for afile in attached_files:
|
|
if afile.find('ns2:Intitule', namespaces=ns).text == 'mdel_passeport_recto.pdf':
|
|
assert afile.find('ns2:Code', namespaces=ns).text == 'JI'
|
|
assert afile.find('ns2:Fichier', namespaces=ns).text == 'mdel_passeport_recto.pdf'
|
|
elif afile.find('ns2:Intitule', namespaces=ns).text == 'mdel_passeport_verso.pdf':
|
|
assert afile.find('ns2:Code', namespaces=ns).text == 'JI'
|
|
assert afile.find('ns2:Fichier', namespaces=ns).text == 'mdel_passeport_verso.pdf'
|
|
else:
|
|
assert afile.find('ns2:Intitule', namespaces=ns).text == 'mdel_edf.pdf'
|
|
assert afile.find('ns2:Code', namespaces=ns).text == 'JD'
|
|
assert afile.find('ns2:Fichier', namespaces=ns).text == 'mdel_edf.pdf'
|
|
|
|
expected_files = [
|
|
'message.xml',
|
|
'1-14-ILE-LA-doc-.xml',
|
|
'1-14-ILE-LA-ent-.xml',
|
|
'mdel_passeport_recto.pdf',
|
|
'mdel_passeport_verso.pdf',
|
|
'mdel_edf.pdf',
|
|
]
|
|
|
|
for fname in os.listdir(base_doc):
|
|
assert fname in expected_files
|
|
|
|
# Without anterieur_situation_raw
|
|
payload = copy.deepcopy(ILE_PAYLOAD)
|
|
payload['fields'].pop('anterieur_situation_raw')
|
|
resp = app.post_json('/mdel/test/create', params=payload, status=200)
|
|
assert resp.json['err_desc'] == 'anterieur_situation_raw is required'
|
|
|
|
|
|
def test_create_ile_demand_type_invalid_document_proof(app, setup):
|
|
# test with missing key
|
|
payload = json.loads(get_file_from_test_base_dir('formdata.json'))
|
|
payload['fields'].pop('justificatif_domicile_hebergeur')
|
|
resp = app.post_json('/mdel/test/create', params=payload, status=200)
|
|
assert resp.json['err_desc'] == 'justificatif_domicile and all its attributes are required'
|
|
|
|
# test with invalid content
|
|
payload = json.loads(get_file_from_test_base_dir('formdata.json'))
|
|
payload['fields']['justificatif_identite'] = None
|
|
payload['fields']['justificatif_identite_verso'] = None
|
|
resp = app.post_json('/mdel/test/create', params=payload, status=200)
|
|
assert resp.json['err_desc'] == 'justificatif_identite and all its attributes are required'
|
|
|
|
|
|
def test_get_status(app, setup):
|
|
shutil.copytree(
|
|
os.path.join(get_mdel_base_dir(), 'test', 'outputs'),
|
|
os.path.join(get_resource_base_dir(), 'test', 'outputs'),
|
|
)
|
|
resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD, status=200)
|
|
demand_id = resp.json['data']['demand_id']
|
|
assert demand_id == '1-14-ILE-LA'
|
|
resp = app.get('/mdel/test/status', params={'demand_id': demand_id})
|
|
data = resp.json['data']
|
|
|
|
assert data['closed'] is True
|
|
assert data['status'] == 'accepted'
|
|
assert data['comment'] == 'Dossier traité.'
|
|
assert Demand.objects.get(demand_id='1-14-ILE-LA').status == 'accepted'
|
|
|
|
Demand.objects.create(resource=setup, num='97-5', flow_type='AEC-LA', demand_id='97-5-AEC-LA')
|
|
|
|
resp = app.get('/mdel/test/status', params={'demand_id': '97-5-AEC-LA'}, status=200)
|
|
data = resp.json['data']
|
|
|
|
assert data['closed'] is False
|
|
assert data['status'] == 'imported'
|
|
assert data['comment'] == 'Le dossier a été reçu et sera traité prochainement.'
|
|
assert Demand.objects.get(demand_id='97-5-AEC-LA').status == 'imported'
|
|
|
|
Demand.objects.create(resource=setup, num='102-2', flow_type='AEC-LA', demand_id='102-2-AEC-LA')
|
|
|
|
resp = app.get('/mdel/test/status', params={'demand_id': '102-2-AEC-LA'}, status=200)
|
|
data = resp.json['data']
|
|
|
|
assert data['closed'] is True
|
|
assert data['status'] == 'accepted'
|
|
assert data['comment'] == 'Dossier accepté'
|
|
assert Demand.objects.get(demand_id='102-2-AEC-LA').status == 'accepted'
|
|
|
|
# bad zipfile
|
|
filepath = os.path.join(get_resource_base_dir(), 'test', 'outputs', '102-2-aec-la--4.zip')
|
|
with open(filepath, 'w') as f:
|
|
f.write(' ')
|
|
resp = app.get('/mdel/test/status', params={'demand_id': '102-2-AEC-LA'}, status=200)
|
|
assert resp.json['err_desc'] == 'zipfile error'
|
|
|
|
|
|
def test_get_status_unknown_demand(app, setup):
|
|
resp = app.get('/mdel/test/status', params={'demand_id': '1-14-ILE-LA'})
|
|
assert resp.json['err_desc'] == 'demand 1-14-ILE-LA does not exist'
|
|
|
|
|
|
def test_get_status_no_response(app, setup):
|
|
shutil.copytree(
|
|
os.path.join(get_mdel_base_dir(), 'test', 'outputs'),
|
|
os.path.join(get_resource_base_dir(), 'test', 'outputs'),
|
|
)
|
|
Demand.objects.create(resource=setup, num='1-15', flow_type='ILE-LA', demand_id='1-15-ILE-LA')
|
|
|
|
resp = app.get('/mdel/test/status', params={'demand_id': '1-15-ILE-LA'}, status=200)
|
|
data = resp.json['data']
|
|
|
|
assert data['closed'] is False
|
|
assert data['status'] is None
|
|
assert data['comment'] == ''
|
|
assert Demand.objects.get(demand_id='1-15-ILE-LA').status is None
|
|
|
|
|
|
def test_get_not_closed_status(app, setup):
|
|
shutil.copytree(
|
|
os.path.join(get_mdel_base_dir(), 'test', 'outputs'),
|
|
os.path.join(get_resource_base_dir(), 'test', 'outputs'),
|
|
)
|
|
Demand.objects.create(resource=setup, num='15-9', flow_type='AEC-LA', demand_id='15-9-AEC-LA')
|
|
|
|
resp = app.get('/mdel/test/status', params={'demand_id': '15-9-AEC-LA'}, status=200)
|
|
data = resp.json['data']
|
|
|
|
assert data['closed'] is False
|
|
assert data['status'] == 'in progress'
|
|
assert data['comment'] == 'Dossier en cours de traitement.'
|
|
|
|
assert Demand.objects.get(demand_id='15-9-AEC-LA').status == 'in progress'
|
|
|
|
|
|
def test_data_source_applicants(app, setup):
|
|
resp = app.get('/mdel/test/applicants', status=200)
|
|
data = resp.json['data']
|
|
assert len(data) == 9
|
|
|
|
# test without PersonneConcernee and Representant
|
|
params = {'without': 'PersonneConcernee,Representant'}
|
|
resp = app.get('/mdel/test/applicants', params=params, status=200)
|
|
data = resp.json['data']
|
|
assert len(data) == 7
|
|
|
|
|
|
def test_data_source_certificates(app, setup):
|
|
resp = app.get('/mdel/test/certificates', status=200)
|
|
data = resp.json['data']
|
|
for datum in data:
|
|
if datum['id'] == 'NAISSANCE':
|
|
assert datum['text'] == 'Acte de naissance'
|
|
elif datum['id'] == 'MARIAGE':
|
|
assert datum['text'] == 'Acte de mariage'
|
|
else:
|
|
assert datum['text'] == 'Acte de décès'
|
|
|
|
|
|
def test_data_source_certificate_types(app, setup):
|
|
resp = app.get('/mdel/test/certificate-types', status=200)
|
|
data = resp.json['data']
|
|
assert len(data) == 4
|
|
for datum in data:
|
|
if datum['id'] == 'COPIE-INTEGRALE':
|
|
assert datum['text'] == 'Copie intégrale'
|
|
elif datum['id'] == 'EXTRAIT-AVEC-FILIATION':
|
|
assert datum['text'] == 'Extrait avec filiation'
|
|
elif datum['id'] == 'EXTRAIT-SANS-FILIATION':
|
|
assert datum['text'] == 'Extrait sans filiation'
|
|
else:
|
|
assert datum['text'] == 'Extrait plurilingue'
|
|
|
|
# test without COPIE-INTEGRALE and EXTRAIT-AVEC-FILIATION
|
|
resp = app.get(
|
|
'/mdel/test/certificate-types',
|
|
params={'without': 'COPIE-INTEGRALE,EXTRAIT-AVEC-FILIATION'},
|
|
status=200,
|
|
)
|
|
data = resp.json['data']
|
|
assert len(data) == 2
|
|
assert 'EXTRAIT-SANS-FILIATION' in (data[0]['id'], data[1]['id'])
|
|
assert 'EXTRAIT-PLURILINGUE' in (data[0]['id'], data[1]['id'])
|
|
|
|
|
|
def test_date_parsing():
|
|
from passerelle.utils.jsonresponse import APIError
|
|
|
|
with pytest.raises(APIError) as error:
|
|
parse_date('2018-02-29')
|
|
assert 'day is out of range for month' in str(error)
|
|
|
|
with pytest.raises(APIError) as error:
|
|
parse_date('28-02-2018')
|
|
for text in ('date', '28-02-2018', 'not iso-formated'):
|
|
assert text in str(error)
|
|
|
|
|
|
def test_aec_filenames_and_routing(app, setup):
|
|
aec_payload = json.loads(get_file_from_test_base_dir('formdata_aec_naiss.json'))
|
|
resp = app.post_json('/mdel/test/create', params=aec_payload, status=200)
|
|
|
|
assert resp.json['data']['demand_id'] == '15-4-AEC-LA'
|
|
|
|
basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '15-4-EtatCivil-0')
|
|
assert os.path.exists(basedir)
|
|
check_zip_file(
|
|
basedir + '.zip',
|
|
['message.xml', '15-4-EtatCivil-ent-0.xml', '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'],
|
|
)
|
|
|
|
doc = os.path.join(basedir, '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml')
|
|
assert os.path.exists(doc)
|
|
validate_schema(doc, AEC_XSD)
|
|
|
|
ent = os.path.join(basedir, '15-4-EtatCivil-ent-0.xml')
|
|
assert os.path.exists(ent)
|
|
root = letree.parse(ent).getroot()
|
|
num_dem = root[0]
|
|
assert letree.QName(num_dem).localname == 'NumeroDemarche'
|
|
assert num_dem.text == 'EtatCivil'
|
|
ns = {'ns': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
|
|
assert root.find('ns:Document/ns:Code', namespaces=ns).text == 'ActeEtatCivil-XML'
|
|
assert root.find('ns:Document/ns:Nom', namespaces=ns).text == 'ActeEtatCivil-XML'
|
|
assert (
|
|
root.find('ns:Document/ns:FichierFormulaire/ns:FichierDonnees', namespaces=ns).text
|
|
== '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'
|
|
)
|
|
|
|
message = os.path.join(basedir, 'message.xml')
|
|
assert os.path.exists(message)
|
|
root = letree.parse(message).getroot()
|
|
ns = {
|
|
'ns1': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier',
|
|
'ns2': 'http://finances.gouv.fr/dgme/pec/message/v1',
|
|
}
|
|
assert root.find('ns2:Body/ns2:Content/ns2:Aller/ns1:NumeroDemarche', namespaces=ns).text == 'EtatCivil'
|
|
assert (
|
|
root.find('ns2:Body/ns2:Content/ns2:Aller/ns1:Document/ns1:Code', namespaces=ns).text
|
|
== 'ActeEtatCivil-XML'
|
|
)
|
|
assert (
|
|
root.find('ns2:Body/ns2:Content/ns2:Aller/ns1:Document/ns1:Nom', namespaces=ns).text
|
|
== 'ActeEtatCivil-XML'
|
|
)
|
|
assert (
|
|
root.find(
|
|
'ns2:Body/ns2:Content/ns2:Aller/ns1:Document/ns1:FichierFormulaire/ns1:FichierDonnees',
|
|
namespaces=ns,
|
|
).text
|
|
== '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'
|
|
)
|
|
|
|
|
|
def resource_logs(caplog):
|
|
return [record.msg for record in caplog.records if record.name == 'passerelle.resource.mdel.test']
|
|
|
|
|
|
def test_create_aec_demand_with_output_sftp_error(app, setup, aec_payload, sftpserver, caplog):
|
|
caplog.set_level('WARNING')
|
|
setup.outcoming_sftp = SFTP(
|
|
'sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver)
|
|
)
|
|
setup.save()
|
|
app.post_json('/mdel/test/create', params=aec_payload, status=200)
|
|
|
|
setup.hourly()
|
|
|
|
demand = setup.demand_set.get()
|
|
assert not demand.sent
|
|
assert resource_logs(caplog) == ['Demand %s (pk %s) could not be sent: %s']
|
|
|
|
|
|
def test_create_aec_demand_with_input_sftp_error(app, setup, aec_payload, sftpserver, caplog):
|
|
caplog.set_level('WARNING')
|
|
setup.incoming_sftp = SFTP('sftp://foo:bar@{server.host}:{server.port}/input/'.format(server=sftpserver))
|
|
setup.save()
|
|
|
|
app.post_json('/mdel/test/create', params=aec_payload, status=200)
|
|
setup.hourly()
|
|
|
|
assert resource_logs(caplog) == ['Could not retrieve all responses: %s']
|