general: remove sp_fr connector (#68918)

This commit is contained in:
Frédéric Péters 2022-09-10 17:36:47 +02:00 committed by Thomas NOËL
parent 53d419627e
commit 94f687ae51
21 changed files with 0 additions and 2199 deletions

View File

@ -1,108 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- edited with XMLSpy v2010 (http://www.altova.com) by BULL SAS (BULL SAS) -->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" elementFormDefault="qualified" attributeFormDefault="unqualified">
<xs:complexType name="DECLARANT">
<xs:sequence>
<xs:element name="identité" type="identité" minOccurs="0"/>
<xs:element name="designation-permis" type="designation-permis" minOccurs="0"/>
<xs:element name="coordonnees" type="coordonnees" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="identité">
<xs:sequence>
<xs:element name="type-personne" type="xs:boolean" minOccurs="0"/>
<xs:element name="personne-physique" type="personne-physique" minOccurs="0"/>
<xs:element name="personne-morale" type="personne-morale" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="personne-physique">
<xs:sequence>
<xs:element name="civilité" type="xs:string" minOccurs="0"/>
<xs:element name="nom" type="xs:string" minOccurs="0"/>
<xs:element name="prenom" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="personne-morale">
<xs:sequence>
<xs:element name="denomination"/>
<xs:element name="raison-sociale"/>
<xs:element name="SIRET"/>
<xs:element name="categorie-juridique"/>
<xs:element name="representant-personne-morale" type="representant-personne-morale"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="representant-personne-morale">
<xs:sequence>
<xs:element name="civilité" type="xs:string" minOccurs="0"/>
<xs:element name="nom" type="xs:string" minOccurs="0"/>
<xs:element name="prenom" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:element name="DOC">
<xs:complexType>
<xs:sequence>
<xs:element name="DECLARANT" type="DECLARANT" minOccurs="0"/>
<xs:element name="OUVERTURE-CHANTIER" type="OUVERTURE-CHANTIER" minOccurs="0"/>
<xs:element name="acceptation" type="xs:boolean" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:complexType name="designation-permis">
<xs:sequence>
<xs:element name="numero-permis_construire" type="xs:string" minOccurs="0"/>
<xs:element name="numero-permis_amenager" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="coordonnees">
<xs:sequence>
<xs:element name="adresse" type="adresse" minOccurs="0"/>
<xs:element name="courriel" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="adresse">
<xs:sequence>
<xs:element name="numero-voie" type="xs:string" minOccurs="0"/>
<xs:element name="extension" type="xs:string" minOccurs="0"/>
<xs:element name="type-voie" type="xs:string" minOccurs="0"/>
<xs:element name="nom-voie" type="xs:string" minOccurs="0"/>
<xs:element name="lieu-dit" type="xs:string" minOccurs="0"/>
<xs:element name="boite-postale" type="xs:string" minOccurs="0"/>
<xs:element name="code-postal" type="xs:string" minOccurs="0"/>
<xs:element name="localite" type="xs:string" minOccurs="0"/>
<xs:element name="bureau-cedex" type="xs:string" minOccurs="0"/>
<xs:element name="pays" type="xs:string" minOccurs="0"/>
<xs:element name="division-territoriale" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="OUVERTURE-CHANTIER">
<xs:sequence>
<xs:element name="date-ouverture" type="xs:date" minOccurs="0"/>
<xs:element name="totalite-travaux" type="xs:boolean" minOccurs="0"/>
<xs:element name="tranche-travaux" type="tranche-travaux" minOccurs="0"/>
<xs:element name="autorisation-differer-travaux" type="xs:string" minOccurs="0"/>
<xs:element name="SHON" type="xs:string" minOccurs="0"/>
<xs:element name="nombre-logements-commences" type="nombre-logements-commences" minOccurs="0"/>
<xs:element name="repartition-type-financement" type="repartition-type-financement" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="tranche-travaux">
<xs:sequence>
<xs:element name="amenagements-commences" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="nombre-logements-commences">
<xs:sequence>
<xs:element name="total-logements" type="xs:int" minOccurs="0"/>
<xs:element name="individuels" type="xs:int" minOccurs="0"/>
<xs:element name="collectifs" type="xs:int" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="repartition-type-financement">
<xs:sequence>
<xs:element name="logement-locatif-social" type="xs:int" minOccurs="0"/>
<xs:element name="accession-aidee" type="xs:int" minOccurs="0"/>
<xs:element name="pret-taux-zero" type="xs:int" minOccurs="0"/>
<xs:element name="autres-financements" type="xs:int" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:schema>

View File

@ -1,34 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib import admin
from django.utils.html import format_html
from .models import Request
class RequestAdmin(admin.ModelAdmin):
date_hierarchy = 'created'
search_fields = ['url', 'filename']
list_display = ['id', 'created', 'modified', 'state', 'filename', 'form_url']
def form_url(self, obj):
return format_html('<a href="{0}">{0}</a>', obj.url)
form_url.allow_tags = True
admin.site.register(Request, RequestAdmin)

View File

@ -1,129 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="PACS" type="PacsType"/>
<xs:complexType name="PacsType">
<xs:sequence>
<xs:element name="partenaire1" type="PartenaireType" />
<xs:element name="partenaire2" type="PartenaireType" />
<xs:element name="convention" type="ConventionType" maxOccurs="1" minOccurs="1" />
<xs:element name="residenceCommune" type="AdresseType" />
<xs:element name="attestationHonneur" type="AttestationHonneurType" />
</xs:sequence>
</xs:complexType>
<xs:complexType name = "AttestationHonneurType">
<xs:sequence>
<xs:element name="nonParente" type="xs:boolean"/>
<xs:element name="residenceCommune" type="xs:boolean"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="PartenaireType">
<xs:sequence>
<xs:element name="civilite" type="CiviliteType"></xs:element>
<xs:element name="nomNaissance" type="xs:string" />
<xs:element name="prenoms" type="xs:string" />
<xs:element name="codeNationalite" type="xs:string" maxOccurs="unbounded"/>
<xs:element name="jourNaissance" type="xs:integer" maxOccurs="1" minOccurs="0" />
<xs:element name="moisNaissance" type="xs:integer" maxOccurs="1" minOccurs="0" />
<xs:element name="anneeNaissance" type="xs:integer" />
<xs:element name="LieuNaissance" type="LieuNaissanceType" />
<xs:element name="ofpra" type="xs:boolean" />
<xs:element name="mesureJuridique" type="xs:boolean" />
<xs:element name="adressePostale" type="AdresseType" />
<xs:element name="adresseElectronique" type="xs:string" />
<xs:element name="telephone" type="xs:string" minOccurs="0"/>
<xs:element name="filiationParent1" type="FiliationType" minOccurs="0"/>
<xs:element name="filiationParent2" type="FiliationType" minOccurs="0" />
<xs:element name="titreIdentiteVerifie" type="xs:boolean"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="ConventionType">
<xs:choice>
<xs:element name="conventionType" type="ConventionTypeType" />
<xs:element name="conventionSpecifique" type="xs:boolean" />
</xs:choice>
</xs:complexType>
<xs:complexType name="ConventionTypeType">
<xs:sequence>
<xs:element name="aideMaterielMontant" type="xs:double" maxOccurs="1" minOccurs="0"/>
<xs:element name="regimePacs" type="regimePacsType" />
<xs:element name="aideMateriel" type="AideMaterielType" />
</xs:sequence>
</xs:complexType>
<xs:complexType name="AdresseType">
<xs:sequence>
<xs:element name="NumeroLibelleVoie" type="xs:string" minOccurs="0" />
<xs:element name="Complement1" type="xs:string" minOccurs="0" />
<xs:element name="Complement2" type="xs:string" minOccurs="0" />
<xs:element name="LieuDitBpCommuneDeleguee" type="xs:string" minOccurs="0" />
<xs:element name="CodePostal" type="codePostalType" />
<xs:element name="Localite" type="localiteType" />
<xs:element name="Pays" type="xs:string" />
</xs:sequence>
</xs:complexType>
<xs:complexType name="LieuNaissanceType">
<xs:sequence>
<xs:element name="localite" type="localiteType"/>
<xs:element name="codePostal" type="xs:string"/>
<xs:element name="codeInsee" type="xs:string" minOccurs="0"/>
<xs:element name="departement" type="xs:string" maxOccurs="1" minOccurs="0"/>
<xs:element name="codePays" type="xs:string"/>
</xs:sequence>
</xs:complexType>
<xs:simpleType name="localiteType">
<xs:restriction base="xs:string">
<xs:minLength value="1" />
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="codePostalType">
<xs:restriction base="xs:string">
<xs:length value="5" />
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="regimePacsType">
<xs:restriction base="xs:string">
<xs:enumeration value="indivision"/>
<xs:enumeration value="legal"/>
</xs:restriction>
</xs:simpleType>
<xs:complexType name="FiliationType">
<xs:sequence>
<xs:choice>
<xs:element name="filiationInconnu" type="xs:boolean"></xs:element>
<xs:element name="filiationConnu" type="FiliationConnuType">
</xs:element>
</xs:choice>
</xs:sequence>
</xs:complexType>
<xs:simpleType name="CiviliteType">
<xs:restriction base="xs:string">
<xs:enumeration value="M"></xs:enumeration>
<xs:enumeration value="MME"></xs:enumeration>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="TypeAideMaterielType">
<xs:restriction base="xs:string">
<xs:enumeration value="aideFixe"/>
<xs:enumeration value="aideProportionnel"/>
</xs:restriction>
</xs:simpleType>
<xs:complexType name="AideMaterielType">
<xs:sequence>
<xs:element name="typeAideMateriel" type="TypeAideMaterielType"></xs:element>
</xs:sequence>
</xs:complexType>
<xs:complexType name="FiliationConnuType">
<xs:sequence>
<xs:element name="sexe" type="SexeType"/>
<xs:element name="nomNaissance" type="xs:string" maxOccurs="1" minOccurs="0" />
<xs:element name="prenoms" type="xs:string" maxOccurs="1" minOccurs="0" />
<xs:element name="dateNaissance" type="xs:string" maxOccurs="1" minOccurs="0" />
<xs:element name="lieuNaissance" type="LieuNaissanceType" maxOccurs="1" minOccurs="0" />
</xs:sequence>
</xs:complexType>
<xs:simpleType name="SexeType">
<xs:restriction base="xs:string">
<xs:enumeration value="M"/>
<xs:enumeration value="F"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>

View File

@ -1,99 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from django.core.exceptions import ValidationError
from django.template import TemplateSyntaxError, engines
def validate_django_template(value):
try:
engines['django'].from_string(value)
except TemplateSyntaxError as e:
raise ValidationError('invalid template %s' % e)
class VariableAndExpressionWidget(forms.MultiWidget):
template_name = 'passerelle/widgets/variable_and_expression_widget.html'
def __init__(self, **kwargs):
widgets = [
forms.Select,
forms.TextInput,
]
super().__init__(widgets=widgets, **kwargs)
def decompress(self, value):
if not value:
return [None, None]
return value['variable'], value['expression']
# XXX: bug in Django https://code.djangoproject.com/ticket/29205
# required_attribute is initialized from the parent.field required
# attribute and not from each sub-field attribute
def use_required_attribute(self, initial):
return False
class VariableAndExpressionField(forms.MultiValueField):
widget = VariableAndExpressionWidget
def __init__(
self, choices=(), required=True, widget=None, label=None, initial=None, help_text='', *args, **kwargs
):
fields = [
forms.ChoiceField(choices=choices, required=required),
forms.CharField(required=False, validators=[validate_django_template]),
]
super().__init__(
fields=fields,
required=required,
widget=widget,
label=label,
initial=initial,
help_text=help_text,
require_all_fields=False,
*args,
**kwargs,
)
self.choices = choices
def _get_choices(self):
return self._choices
def _set_choices(self, value):
# Setting choices also sets the choices on the widget.
# choices can be any iterable, but we call list() on it because
# it will be consumed more than once.
if callable(value):
value = forms.CallableChoiceIterator(value)
else:
value = list(value)
self._choices = value
self.widget.widgets[0].choices = value
choices = property(_get_choices, _set_choices)
def compress(self, data_list):
try:
variable, expression = data_list
except (ValueError, TypeError):
return None
else:
return {
'variable': variable,
'expression': expression,
}

View File

@ -1,66 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from . import fields, models
class MappingForm(forms.ModelForm):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.instance.procedure and self.instance and self.instance.formdef:
choices = [('', '--------')] + [(v, v) for v in self.instance.variables]
for i, field in enumerate(self.schema_fields()):
label = field.label
label += ' (%s)' % (field.varname or 'NO VARNAME')
base_name = str(field.varname or i)
initial = self.instance.rules.get('fields', {}).get(base_name)
self.fields['field_%s' % base_name] = fields.VariableAndExpressionField(
label=label, choices=choices, initial=initial, required=False
)
def table_fields(self):
return [field for field in self if field.name.startswith('field_')]
def schema_fields(self):
if self.instance and self.instance.formdef:
schema = self.instance.formdef.schema
for i, field in enumerate(schema.fields):
if field.type in ('page', 'comment', 'title', 'subtitle'):
continue
yield field
def save(self, commit=True):
fields = {}
for key in self.cleaned_data:
if not key.startswith('field_'):
continue
if not self.cleaned_data[key]:
continue
real_key = key[len('field_') :]
value = self.cleaned_data[key].copy()
value['label'] = self.fields[key].label
fields[real_key] = value
self.instance.rules['fields'] = fields
return super().save(commit=commit)
class Meta:
model = models.Mapping
fields = [
'procedure',
'formdef',
]

View File

@ -1,141 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.20 on 2019-04-19 17:15
from __future__ import unicode_literals
import django.contrib.postgres.fields.jsonb
import django.db.models.deletion
from django.db import migrations, models
import passerelle.apps.sp_fr.models
import passerelle.utils.sftp
import passerelle.utils.wcs
class Migration(migrations.Migration):
initial = True
dependencies = [
('base', '0012_job'),
]
operations = [
migrations.CreateModel(
name='Mapping',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
(
'procedure',
models.CharField(
choices=[
(b'DOC', 'Request for construction site opening'),
(b'recensementCitoyen', 'Request for mandatory citizen census'),
(b'depotDossierPACS', 'Pre-request for citizen solidarity pact'),
],
max_length=32,
unique=True,
verbose_name='Procedure',
),
),
('formdef', passerelle.utils.wcs.FormDefField(verbose_name='Formdef')),
(
'rules',
django.contrib.postgres.fields.jsonb.JSONField(
default=passerelle.apps.sp_fr.models.default_rule, verbose_name='Rules'
),
),
],
options={
'verbose_name': 'MDEL mapping',
'verbose_name_plural': 'MDEL mappings',
},
),
migrations.CreateModel(
name='Request',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('created', models.DateTimeField(auto_now_add=True, verbose_name='Created')),
('modified', models.DateTimeField(auto_now=True, verbose_name='Created')),
('filename', models.CharField(max_length=128, verbose_name='Identifier')),
('archive', models.FileField(max_length=256, upload_to=b'', verbose_name='Archive')),
(
'state',
models.CharField(
choices=[
(b'received', 'Received'),
(b'transfered', 'Transferred'),
(b'error', 'Error'),
(b'returned', 'Returned'),
],
default=b'received',
max_length=16,
verbose_name='State',
),
),
('url', models.URLField(blank=True, verbose_name='URL')),
],
options={
'verbose_name': 'MDEL request',
'verbose_name_plural': 'MDEL requests',
},
),
migrations.CreateModel(
name='Resource',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('title', models.CharField(max_length=50, verbose_name='Title')),
('description', models.TextField(verbose_name='Description')),
('slug', models.SlugField(unique=True, verbose_name='Identifier')),
(
'input_sftp',
passerelle.utils.sftp.SFTPField(default=None, null=True, verbose_name='Input SFTP URL'),
),
(
'output_sftp',
passerelle.utils.sftp.SFTPField(default=None, null=True, verbose_name='Output SFTP URL'),
),
(
'users',
models.ManyToManyField(
blank=True,
related_name='_resource_users_+',
related_query_name='+',
to='base.ApiUser',
),
),
],
options={
'verbose_name': 'Service-Public.fr',
},
),
migrations.AddField(
model_name='request',
name='resource',
field=models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, to='sp_fr.Resource', verbose_name='Resource'
),
),
migrations.AddField(
model_name='mapping',
name='resource',
field=models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name='mappings',
to='sp_fr.Resource',
verbose_name='Resource',
),
),
migrations.AlterUniqueTogether(
name='request',
unique_together=set([('resource', 'filename')]),
),
]

View File

@ -1,49 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2020-05-04 12:02
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('sp_fr', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='mapping',
name='procedure',
field=models.CharField(
choices=[
('DOC', 'Request for construction site opening'),
('recensementCitoyen', 'Request for mandatory citizen census'),
('depotDossierPACS', 'Pre-request for citizen solidarity pact'),
],
max_length=32,
unique=True,
verbose_name='Procedure',
),
),
migrations.AlterField(
model_name='request',
name='archive',
field=models.FileField(max_length=256, upload_to='', verbose_name='Archive'),
),
migrations.AlterField(
model_name='request',
name='state',
field=models.CharField(
choices=[
('received', 'Received'),
('transfered', 'Transferred'),
('error', 'Error'),
('returned', 'Returned'),
],
default='received',
max_length=16,
verbose_name='State',
),
),
]

View File

@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2020-05-04 12:06
from __future__ import unicode_literals
from django.db import migrations
from passerelle.utils.db import EnsureJsonbType
class Migration(migrations.Migration):
dependencies = [
('sp_fr', '0002_auto_20200504_1402'),
]
operations = [
EnsureJsonbType(model_name='Mapping', field_name='rules'),
]

View File

@ -1,805 +0,0 @@
# -*- coding: utf-8 -*-
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import collections
import datetime
import os
import re
import stat
import zipfile
from django.contrib.postgres.fields import JSONField
from django.core.files import File
from django.db import models, transaction
from django.template import engines
from django.urls import reverse
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
from lxml import etree as ET
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.conversion import normalize
from passerelle.utils.sftp import SFTPField
from passerelle.utils.wcs import FormDefField, get_wcs_choices
from passerelle.utils.xml import text_content
from .xsd import Schema
MAX_REQUESTS_PER_ITERATION = 200
PROCEDURE_DOC = 'DOC'
PROCEDURE_RCO = 'recensementCitoyen'
PROCEDURE_DDPACS = 'depotDossierPACS'
PROCEDURES = [
(PROCEDURE_DOC, _('Request for construction site opening')),
(PROCEDURE_RCO, _('Request for mandatory citizen census')),
(PROCEDURE_DDPACS, _('Pre-request for citizen solidarity pact')),
]
FILE_PATTERN = re.compile(r'^(?P<identifier>.*)-(?P<procedure>[a-zA-Z0-9]+)-(?P<sequence>\d+).zip$')
ENT_PATTERN = re.compile(r'^.*-ent-\d+(?:-.*)?.xml$')
NSMAP = {'dgme-metier': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
ROUTAGE_XPATH = ET.XPath(
('dgme-metier:Routage/dgme-metier:Donnee/dgme-metier:Valeur/text()'), namespaces=NSMAP
)
EMAIL_XPATH = ET.XPath(('dgme-metier:Teledemarche/dgme-metier:Email/text()'), namespaces=NSMAP)
DOCUMENTS_XPATH = ET.XPath('dgme-metier:Document', namespaces=NSMAP)
PIECE_JOINTE_XPATH = ET.XPath('dgme-metier:PieceJointe', namespaces=NSMAP)
CODE_XPATH = ET.XPath('dgme-metier:Code', namespaces=NSMAP)
FICHIER_XPATH = ET.XPath('dgme-metier:Fichier', namespaces=NSMAP)
FICHIER_DONNEES_XPATH = ET.XPath('.//dgme-metier:FichierDonnees', namespaces=NSMAP)
ET.register_namespace(
'dgme-metier', 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'
)
def simplify(s):
'''Simplify XML node tag names because XSD from DGME are garbage'''
if not s:
return ''
if not isinstance(s, str):
s = str(s, 'utf-8', 'ignore')
s = normalize(s)
s = re.sub(r'[^\w\s\'-_]', '', s)
s = s.replace('-', '_')
s = re.sub(r'[\s\']+', '', s)
return s.strip().lower()
class Resource(BaseResource):
category = _('Business Process Connectors')
input_sftp = SFTPField(verbose_name=_('Input SFTP URL'), null=True)
output_sftp = SFTPField(verbose_name=_('Output SFTP URL'), null=True)
def check_status(self):
with self.input_sftp.client() as sftp:
sftp.listdir()
with self.output_sftp.client() as sftp:
sftp.listdir()
get_wcs_choices(session=self.requests)
@endpoint(name='ping', show=False, description=_('Check SFTP availability'))
def ping(self, request):
# deprecated endpoint
self.check_status()
return {'err': 0}
def hourly(self):
self.run_loop()
def run_loop(self, count=0):
if count == 0:
count = MAX_REQUESTS_PER_ITERATION
with transaction.atomic():
# lock resource
r = Resource.objects.select_for_update(skip_locked=True).filter(pk=self.pk)
if not r:
# already locked
self.logger.info('did nothing')
return
with self.input_sftp.client() as sftp:
try:
sftp.lstat('DONE')
except IOError:
sftp.mkdir('DONE')
try:
sftp.lstat('FAILED')
except IOError:
sftp.mkdir('FAILED')
def helper():
for file_stat in sftp.listdir_attr():
if stat.S_ISDIR(file_stat.st_mode):
continue
yield file_stat.filename
for filename in helper():
m = FILE_PATTERN.match(filename)
if not m:
self.logger.info(
'file "%s" did not match pattern %s, moving to FAILED/', filename, FILE_PATTERN
)
sftp.rename(filename, 'FAILED/' + filename)
continue
procedure = m.group('procedure')
try:
mapping = self.mappings.get(procedure=procedure)
except Mapping.DoesNotExist:
self.logger.info(
'no mapping for procedure "%s" for file "%s", moving to FAILED/',
procedure,
filename,
)
continue
handler = self.FileHandler(
resource=self,
sftp=sftp,
filename=filename,
identifier=m.group('identifier'),
procedure=procedure,
sequence=m.group('sequence'),
mapping=mapping,
)
if not handler.request:
count -= 1
try:
move, error = handler()
except Exception:
count -= 1
self.logger.exception('handling of file "%s" failed', filename)
sftp.rename(filename, 'FAILED/' + filename)
else:
if move and error:
count -= 1
self.logger.error('handling of file "%s" failed: %s', filename, error)
sftp.rename(filename, 'FAILED/' + filename)
else:
if error:
count -= 1
self.logger.warning('handling of file "%s" failed: %s', filename, error)
elif move:
count -= 1
sftp.rename(filename, 'DONE/' + filename)
if not count:
break
class FileHandler:
def __init__(self, resource, sftp, filename, identifier, procedure, sequence, mapping):
self.resource = resource
self.sftp = sftp
self.filename = filename
self.identifier = identifier
self.procedure = procedure
self.sequence = sequence
self.mapping = mapping
self.variables = list(self.mapping.variables)
self.request = Request.objects.filter(resource=resource, filename=filename).first()
def __call__(self):
if not self.request:
with self.sftp.open(self.filename) as fd:
with transaction.atomic():
self.request = Request.objects.create(resource=self.resource, filename=self.filename)
self.request.state = Request.STATE_RECEIVED
self.request.archive.save(self.filename, File(fd))
if self.request.state == Request.STATE_RECEIVED:
with self.request.archive as fd:
# error during processing are fatal, we want to log them
data, error = self.process(fd)
if not data:
return False, error
try:
backoffice_url = self.transfer(data)
except Exception as e:
return False, 'error during transfer to w.c.s %r' % e
self.request.url = backoffice_url
self.request.state = Request.STATE_TRANSFERED
self.request.save()
if self.request.state == Request.STATE_TRANSFERED:
try:
self.response()
except Exception as e:
return False, 'error during response to service-public.fr %r' % e
self.request.state = Request.STATE_RETURNED
self.request.save()
self.resource.logger.info('%s responded, closed', self.request.filename)
return True, None
def process(self, fd):
try:
with zipfile.ZipFile(fd) as archive:
# sort files
doc_files = []
ent_files = []
attachments = {}
for name in archive.namelist():
if ENT_PATTERN.match(name):
ent_files.append(name)
if len(ent_files) != 1:
return False, 'too many/few ent files found: %s' % ent_files
ent_file = ent_files[0]
with archive.open(ent_file) as fd:
document = ET.parse(fd)
for pj_node in PIECE_JOINTE_XPATH(document):
code = CODE_XPATH(pj_node)[0].text
code = 'pj_' + code.lower().replace('-', '_')
fichier = FICHIER_XPATH(pj_node)[0].text
attachments.setdefault(code, []).append(fichier)
for doc_node in DOCUMENTS_XPATH(document):
code = CODE_XPATH(doc_node)[0].text
code = 'doc_' + code.lower().replace('-', '_')
fichier = FICHIER_DONNEES_XPATH(doc_node)[0].text
attachments.setdefault(code, []).append(fichier)
doc_files = [
value for l in attachments.values() for value in l if value.lower().endswith('.xml')
]
if len(doc_files) != 1:
return False, 'too many/few doc files found: %s' % doc_files
for key, attachment in attachments.items():
if len(attachment) > 1:
return False, 'too many attachments of kind %s: %r' % (key, attachment)
name = attachment[0]
with archive.open(name) as zip_fd:
content = zip_fd.read()
attachments[key] = {
'filename': name,
'content': base64.b64encode(content).decode('ascii'),
'content_type': 'application/octet-stream',
}
if self.procedure == PROCEDURE_RCO and not attachments:
return False, 'no attachments but RCO requires them'
doc_file = doc_files[0]
insee_codes = ROUTAGE_XPATH(document)
if len(insee_codes) != 1:
return False, 'too many/few insee codes found: %s' % insee_codes
insee_code = insee_codes[0]
email = EMAIL_XPATH(document)
email = email[0] if email else ''
data = {
'insee_code': insee_code,
'email': email,
}
data.update(attachments)
with archive.open(doc_file) as fd:
document = ET.parse(fd)
data.update(self.extract_data(document))
if hasattr(self, 'update_data_%s' % self.procedure):
getattr(self, 'update_data_%s' % self.procedure)(data)
except zipfile.BadZipfile:
return False, 'could not load zipfile'
return data, None
def transfer(self, data):
formdef = self.mapping.formdef
formdef.session = self.resource.requests
with formdef.submit() as submitter:
submitter.submission_channel = 'web'
submitter.submission_context = {
'mdel_procedure': self.procedure,
'mdel_identifier': self.identifier,
'mdel_sequence': self.sequence,
}
fields = self.mapping.rules.get('fields', {})
for name in fields:
field = fields[name]
variable = field['variable']
expression = field['expression']
value = data.get(variable)
if expression.strip():
template = engines['django'].from_string(expression)
context = data.copy()
context['value'] = value
value = template.render(context)
if not value:
continue
submitter.set(name, value)
return submitter.result.backoffice_url
def response(self):
with self.resource.output_sftp.client() as client:
with client.open(self.request.response_zip_filename, mode='w') as fd:
self.request.build_response_zip(
fd, etat='100', commentaire='Demande transmise à la collectivité'
)
with self.resource.input_sftp.client() as client:
with client.open('DONE/' + self.request.response_zip_filename, mode='w') as fd:
self.request.build_response_zip(
fd, etat='100', commentaire='Demande transmise à la collectivité'
)
def get_data(self, data, name):
# prevent error in manual mapping
assert name in self.variables, 'variable "%s" is unknown' % name
return data.get(name, '')
def update_data_DOC(self, data):
def get(name):
return self.get_data(data, name)
numero_permis_construire = get('doc_declarant_designation_permis_numero_permis_construire')
numero_permis_amenager = get('doc_declarant_designation_permis_numero_permis_amenager')
data['type_permis'] = (
'Un permis de construire' if numero_permis_construire else 'Un permis d\'aménager'
)
data['numero_permis'] = numero_permis_construire or numero_permis_amenager
particulier = get('doc_declarant_identite_type_personne').strip().lower() == 'true'
data['type_declarant'] = 'Un particulier' if particulier else 'Une personne morale'
if particulier:
data['nom'] = get('doc_declarant_identite_personne_physique_nom')
data['prenoms'] = get('doc_declarant_identite_personne_physique_prenom')
else:
data['nom'] = get('doc_declarant_identite_personne_morale_representant_personne_morale_nom')
data['prenoms'] = get(
'doc_declarant_identite_personne_morale_representant_personne_morale_prenom'
)
mapping = {
'1000': 'Monsieur',
'1001': 'Madame',
'1002': 'Madame et Monsieur',
}
if particulier:
data['civilite_particulier'] = mapping.get(
get('doc_declarant_identite_personne_physique_civilite'), ''
)
else:
data['civilite_pm'] = mapping.get(
get('doc_declarant_identite_personne_morale_representant_personne_morale_civilite'), ''
)
data['portee'] = (
'Pour la totalité des travaux'
if get('doc_ouverture_chantier_totalite_travaux').lower().strip() == 'true'
else 'Pour une tranche des travaux'
)
def update_data_recensementCitoyen(self, data):
def get(name):
return self.get_data(data, name)
motif = get('recensementcitoyen_formalite_formalitemotifcode_1') or get(
'recensementcitoyen_formalite_formalitemotifcode_2'
)
data['motif'] = {'RECENSEMENT': '1', 'EXEMPTION': '2'}[motif]
if data['motif'] == '2':
data['motif_exempte'] = (
"Titulaire d'une carte d'invalidité de 80% minimum"
if get('recensementcitoyen_formalite_formalitemotifcode_2') == 'INFIRME'
else "Autre situation"
)
data['justificatif_exemption'] = get('pj_je')
data['double_nationalite'] = 'Oui' if get('recensementcitoyen_personne_nationalite') else 'Non'
data['residence_differente'] = (
'Oui' if get('recensementcitoyen_personne_adresseresidence_localite') else 'Non'
)
data['civilite'] = 'Monsieur' if get('recensementcitoyen_personne_civilite') == 'M' else 'Madame'
def get_lieu_naissance(variable, code):
for idx in ['', '_1', '_2']:
v = variable + idx
if get(v + '_code') == code:
return get(v + '_nom')
data['cp_naissance'] = get_lieu_naissance('recensementcitoyen_personne_lieunaissance', 'AUTRE')
data['commune_naissance'] = get_lieu_naissance(
'recensementcitoyen_personne_lieunaissance', 'COMMUNE'
)
data['justificatif_identite'] = get('pj_ji')
situation_matrimoniale = get('recensementcitoyen_personne_situationfamille_situationmatrimoniale')
data['situation_familiale'] = {
'Célibataire': 'Célibataire',
'Marié': 'Marié(e)',
}.get(situation_matrimoniale, 'Autres')
if data['situation_familiale'] == 'Autres':
data['situation_familiale_precision'] = situation_matrimoniale
pupille = get('recensementcitoyen_personne_situationfamille_pupille')
data['pupille'] = 'Oui' if pupille else 'Non'
data['pupille_categorie'] = {
'NATION': "Pupille de la nation",
'ETAT': "Pupille de l'État",
}.get(pupille)
for idx in ['', '_1', '_2']:
code = get('recensementcitoyen_personne_methodecontact%s_canalcode' % idx)
uri = get('recensementcitoyen_personne_methodecontact%s_uri' % idx)
if code == 'EMAIL':
data['courriel'] = uri
if code == 'TEL':
data['telephone_fixe'] = uri
data['justificatif_famille'] = get('pj_jf')
data['filiation_inconnue_p1'] = not get('recensementcitoyen_filiationpere_nomfamille')
data['filiation_inconnue_p2'] = not get('recensementcitoyen_filiationmere_nomfamille')
data['cp_naissance_p1'] = get_lieu_naissance(
'recensementcitoyen_filiationpere_lieunaissance', 'AUTRE'
)
data['cp_naissance_p2'] = get_lieu_naissance(
'recensementcitoyen_filiationmere_lieunaissance', 'AUTRE'
)
data['commune_naissance_p1'] = get_lieu_naissance(
'recensementcitoyen_filiationpere_lieunaissance', 'COMMUNE'
)
data['commune_naissance_p2'] = get_lieu_naissance(
'recensementcitoyen_filiationmere_lieunaissance', 'COMMUNE'
)
for key in data:
if key.endswith('_datenaissance') and data[key]:
data[key] = datetime.datetime.strptime(data[key], '%d/%m/%Y').date().strftime('%Y-%m-%d')
def update_data_depotDossierPACS(self, data):
def get(name):
return self.get_data(data, name)
civilite_p1 = get('pacs_partenaire1_civilite')
data['civilite_p1'] = 'Monsieur' if civilite_p1 == 'M' else 'Madame'
data['acte_naissance_p1'] = get('pj_an')
data['identite_verifiee_p1'] = (
'Oui' if get('pacs_partenaire1_titreidentiteverifie') == 'true' else 'Non'
)
civilite_p2 = get('pacs_partenaire2_civilite')
data['civilite_p2'] = 'Monsieur' if civilite_p2 == 'M' else 'Madame'
data['acte_naissance_p2'] = get('pj_anp')
data['identite_verifiee_p2'] = (
'Oui' if get('pacs_partenaire2_titreidentiteverifie') == 'true' else 'Non'
)
data['type_convention'] = '2' if get('pacs_convention_conventionspecifique') == 'true' else '1'
data['aide_materielle'] = (
'1'
if get('pacs_convention_conventiontype_aidemateriel_typeaidemateriel') == 'aideProportionnel'
else '2'
)
data['regime'] = '1' if get('pacs_convention_conventiontype_regimepacs') == 'legal' else '2'
data['convention_specifique'] = get('pj_cp')
def extract_data(self, document):
'''Convert XML into a dictionnary of values'''
root = document.getroot()
def tag_name(node):
return simplify(ET.QName(node.tag).localname)
def helper(path, node):
if len(node):
tags = collections.Counter(tag_name(child) for child in node)
counter = collections.Counter()
for child in node:
name = tag_name(child)
if tags[name] > 1:
counter[name] += 1
name += '_%s' % counter[name]
for p, value in helper(path + [name], child):
yield p, value
else:
yield path, text_content(node)
# case of multiple nodes
new_path = path[:-1] + [path[-1] + '_1']
yield new_path, text_content(node)
return {'_'.join(path): value for path, value in helper([tag_name(root)], root)}
def export_json(self):
d = super().export_json()
d['mappings'] = [mapping.export_json() for mapping in self.mappings.all()]
return d
@classmethod
def import_json_real(cls, overwrite, instance, d, **kwargs):
mappings_json = d.pop('mappings', [])
instance = super().import_json_real(overwrite, instance, d, **kwargs)
if instance and overwrite:
instance.mappings.all().delete()
for mapping_json in mappings_json:
Mapping.import_json(mapping_json, instance)
return instance
class Meta:
verbose_name = _('Service-Public.fr')
def default_rule():
return {}
class Mapping(models.Model):
resource = models.ForeignKey(
Resource, verbose_name=_('Resource'), related_name='mappings', on_delete=models.CASCADE
)
procedure = models.CharField(verbose_name=_('Procedure'), choices=PROCEDURES, unique=True, max_length=32)
formdef = FormDefField(verbose_name=_('Formdef'))
rules = JSONField(verbose_name=_('Rules'), default=default_rule)
def get_absolute_url(self):
return reverse('sp-fr-mapping-edit', kwargs=dict(slug=self.resource.slug, pk=self.pk))
@property
def xsd(self):
path = os.path.join(os.path.dirname(__file__), '%s.XSD' % self.procedure)
with open(path, 'rb') as fd:
doc = ET.parse(fd)
schema = Schema()
schema.visit(doc.getroot())
return schema
@property
def variables(self):
yield 'insee_code'
yield 'email'
for path, dummy in self.xsd.paths():
names = [simplify(tag.localname) for tag in path]
yield '_'.join(names)
if hasattr(self, 'variables_%s' % self.procedure):
for variable in getattr(self, 'variables_%s' % self.procedure):
yield variable
@property
def variables_DOC(self):
yield 'type_permis'
yield 'numero_permis'
yield 'type_declarant'
yield 'nom'
yield 'prenoms'
yield 'civilite_particulier'
yield 'civilite_pm'
yield 'portee'
@property
def variables_recensementCitoyen(self):
yield 'motif'
yield 'motif_exempte'
yield 'justificatif_exemption'
yield 'double_nationalite'
yield 'residence_differente'
yield 'civilite'
yield 'cp_naissance'
yield 'commune_naissance'
yield 'pj_je'
yield 'pj_ji'
yield 'situation_familiale'
yield 'situation_familiale_precision'
yield 'pupille'
yield 'pupille_categorie'
yield 'courriel'
yield 'telephone_fixe'
yield 'pj_jf'
yield 'filiation_inconnue_p1'
yield 'filiation_inconnue_p2'
yield 'cp_naissance_p1'
yield 'cp_naissance_p2'
yield 'commune_naissance_p1'
yield 'commune_naissance_p2'
@property
def variables_depotDossierPACS(self):
yield 'pj_an'
yield 'pj_anp'
yield 'pj_cp'
yield 'doc_15725_01'
yield 'doc_flux_pacs'
yield 'doc_recappdf'
yield 'civilite_p1'
yield 'acte_naissance_p1'
yield 'identite_verifiee_p1'
yield 'civilite_p2'
yield 'acte_naissance_p2'
yield 'identite_verifiee_p2'
yield 'type_convention'
yield 'aide_materielle'
yield 'regime'
yield 'convention_specifique'
def __str__(self):
return gettext('Mapping from "{procedure}" to formdef "{formdef}"').format(
procedure=self.get_procedure_display(), formdef=self.formdef.title if self.formdef else '-'
)
def export_json(self):
return {
'procedure': self.procedure,
'formdef': str(self.formdef),
'rules': self.rules,
}
@classmethod
def import_json(cls, d, resource):
mapping = cls.objects.filter(resource=resource, procedure=d['procedure']).first() or cls(
resource=resource, procedure=d['procedure']
)
mapping.formdef = d['formdef']
mapping.rules = d['rules']
mapping.save()
return mapping
class Meta:
verbose_name = _('MDEL mapping')
verbose_name_plural = _('MDEL mappings')
class Request(models.Model):
# To prevent mixing errors from analysing archive from s-p.fr and errors
# from pushing to w.c.s we separate processing with three steps:
# - receiving, i.e. copying zipfile from SFTP and storing them locally
# - processing, i.e. openeing the zipfile and extracting content as we need it
# - transferring, pushing content as a new form in w.c.s.
STATE_RECEIVED = 'received'
STATE_TRANSFERED = 'transfered'
STATE_RETURNED = 'returned'
STATE_ERROR = 'error'
STATES = [
(STATE_RECEIVED, _('Received')),
(STATE_TRANSFERED, _('Transferred')),
(STATE_ERROR, _('Error')),
(STATE_RETURNED, _('Returned')),
]
resource = models.ForeignKey(Resource, verbose_name=_('Resource'), on_delete=models.CASCADE)
created = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
modified = models.DateTimeField(verbose_name=_('Created'), auto_now=True)
filename = models.CharField(verbose_name=_('Identifier'), max_length=128)
archive = models.FileField(verbose_name=_('Archive'), max_length=256)
state = models.CharField(verbose_name=_('State'), choices=STATES, default=STATE_RECEIVED, max_length=16)
url = models.URLField(verbose_name=_('URL'), blank=True)
def delete(self, *args, **kwargs):
try:
self.archive.delete()
except Exception:
self.resource.logger.error('could not delete %s', self.archive)
return super().delete(*args, **kwargs)
@property
def message_xml(self):
# FileField can be closed, or open, you never know, and used as a
# contextmanager, __enter__ does not re-open/re-seek(0) it :/
self.archive.open()
# pylint: disable=not-context-manager
with self.archive as fd:
with zipfile.ZipFile(fd) as archive:
with archive.open('message.xml') as message_xml_fd:
s = message_xml_fd.read()
return ET.fromstring(s)
@property
def id_enveloppe(self):
message_xml = self.message_xml
ns = {
'pec': 'http://finances.gouv.fr/dgme/pec/message/v1',
'mdel': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier',
}
return message_xml.find('.//{%(pec)s}MessageId' % ns).text.split()[1]
def build_message_xml_retour(self, etat, commentaire):
message_xml = self.message_xml
ns = {
'pec': 'http://finances.gouv.fr/dgme/pec/message/v1',
'mdel': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier',
}
template = '''<ns2:Message xmlns:ns2="http://finances.gouv.fr/dgme/pec/message/v1"
xmlns="http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier">
<ns2:Header>
<ns2:Routing>
<ns2:MessageId/>
<ns2:RefToMessageId/>
<ns2:FlowType/>
<ns2:Sender/>
<ns2:Recipients>
<ns2:Recipient/>
</ns2:Recipients>
</ns2:Routing>
<ns2:Security>
<ns2:Horodatage>false</ns2:Horodatage>
</ns2:Security>
</ns2:Header>
<ns2:Body>
<ns2:Content><ns2:Retour>
<ns2:Enveloppe>
<ns2:NumeroTeledemarche/>
<ns2:MotDePasse/>
</ns2:Enveloppe>
<ns2:Instruction>
<ns2:Maj>
<ns2:Etat/>
<ns2:Commentaire/>
</ns2:Maj>
</ns2:Instruction>
</ns2:Retour>
</ns2:Content>
</ns2:Body>
</ns2:Message>'''
response = ET.XML(template)
message_id = message_xml.find('.//{%(pec)s}MessageId' % ns).text
# maybe could work with str(uuid.uuid4().hex), which would be more unique, we will never know
response.find('.//{%(pec)s}MessageId' % ns).text = 'RET-1-' + message_id
response.find('.//{%(pec)s}RefToMessageId' % ns).text = message_id
response.find('.//{%(pec)s}FlowType' % ns).text = message_xml.find('.//{%(pec)s}FlowType' % ns).text
response.find('.//{%(pec)s}Sender' % ns).extend(message_xml.find('.//{%(pec)s}Recipient' % ns))
response.find('.//{%(pec)s}Recipient' % ns).extend(message_xml.find('.//{%(pec)s}Sender' % ns))
response.find('.//{%(pec)s}FlowType' % ns).text = message_xml.find('.//{%(pec)s}FlowType' % ns).text
# Strangely the same node in the response does not have the same
# namespace as the node in the request, whatever...
response.find('.//{%(pec)s}NumeroTeledemarche' % ns).text = message_xml.find(
'.//{%(mdel)s}NumeroTeledemarche' % ns
).text
response.find('.//{%(pec)s}MotDePasse' % ns).text = message_xml.find(
'.//{%(mdel)s}MotDePasse' % ns
).text
response.find('.//{%(pec)s}Etat' % ns).text = '100'
response.find('.//{%(pec)s}Commentaire' % ns).text = 'Dossier transmis à la collectivité'
return response
def build_response_zip(self, fd_or_filename, etat, commentaire):
with zipfile.ZipFile(fd_or_filename, 'w') as archive:
message_xml = self.build_message_xml_retour(etat=etat, commentaire=commentaire)
archive.writestr(
'message.xml',
'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
+ ET.tostring(message_xml, encoding='utf-8').decode(),
)
@property
def response_zip_filename(self):
m = FILE_PATTERN.match(self.filename)
numero_teledossier = m.group('identifier')
code_demarche = m.group('procedure')
id_enveloppe = self.id_enveloppe
numero_sequence = '1'
return '%s-%s-%s-%s.zip' % (numero_teledossier, code_demarche, id_enveloppe, numero_sequence)
class Meta:
verbose_name = _('MDEL request')
verbose_name_plural = _('MDEL requests')
unique_together = (('resource', 'filename'),)

View File

@ -1,137 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- edited with XMLSpy v2010 rel. 2 (http://www.altova.com) by BULL SAS (BULL SAS) -->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="RecensementCitoyen">
<xs:complexType>
<xs:sequence>
<xs:element name="Convention" type="xs:string"/>
<xs:element name="Formalite">
<xs:complexType>
<xs:sequence>
<xs:element name="Identifiant" type="xs:string"/>
<xs:element name="FormaliteType" type="xs:string"/>
<xs:element name="DateSoumission" type="xs:string"/>
<xs:element name="FormaliteMotifCode" type="xs:string" maxOccurs="2"/>
<xs:element name="FormaliteModeCode" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="Personne">
<xs:complexType>
<xs:sequence>
<xs:element name="Civilite" type="xs:string"/>
<xs:element name="Sexe" type="xs:string"/>
<xs:element name="NomFamille" type="xs:string"/>
<xs:element name="NomUsage" type="xs:string"/>
<xs:element name="PrenomUsuel" type="xs:string"/>
<xs:element name="Prenom" type="xs:string" maxOccurs="2"/>
<xs:element name="DateNaissance" type="xs:string"/>
<xs:element name="PaysNaissance" type="xs:string"/>
<xs:element name="Nationalite" type="xs:string"/>
<xs:element name="CodeINSEENaissance" type="xs:string"/>
<xs:element name="LieuNaissance" maxOccurs="2">
<xs:complexType>
<xs:sequence>
<xs:element name="Code" type="xs:string"/>
<xs:element name="Nom" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="AdresseDomicile">
<xs:complexType>
<xs:sequence>
<xs:element name="PointDeRemise" type="xs:string"/>
<xs:element name="Complement" type="xs:string"/>
<xs:element name="NumeroVoie" type="xs:string"/>
<xs:element name="Extension" type="xs:string"/>
<xs:element name="TypeVoie" type="xs:string"/>
<xs:element name="NomVoie" type="xs:string"/>
<xs:element name="LieuDit" type="xs:string"/>
<xs:element name="CodePostal" type="xs:string"/>
<xs:element name="Localite" type="xs:string"/>
<xs:element name="CodeINSEE" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="AdresseResidence">
<xs:complexType>
<xs:sequence>
<xs:element name="PointDeRemise" type="xs:string"/>
<xs:element name="Complement" type="xs:string"/>
<xs:element name="NumeroVoie" type="xs:string"/>
<xs:element name="Extension" type="xs:string"/>
<xs:element name="TypeVoie" type="xs:string"/>
<xs:element name="NomVoie" type="xs:string"/>
<xs:element name="LieuDit" type="xs:string"/>
<xs:element name="CodePostal" type="xs:string"/>
<xs:element name="Localite" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="SituationFamille">
<xs:complexType>
<xs:sequence>
<xs:element name="SituationMatrimoniale" type="xs:string"/>
<xs:element name="NombreEnfants" type="xs:string"/>
<xs:element name="Pupille" type="xs:string"/>
<xs:element name="NombreFrereSoeur" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="MethodeContact" maxOccurs="2">
<xs:complexType>
<xs:sequence>
<xs:element name="URI" type="xs:string"/>
<xs:element name="CanalCode" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="FiliationPere">
<xs:complexType>
<xs:sequence>
<xs:element name="NomFamille" type="xs:string"/>
<xs:element name="PrenomUsuel" type="xs:string"/>
<xs:element name="Prenom" type="xs:string" maxOccurs="2"/>
<xs:element name="DateNaissance" type="xs:string"/>
<xs:element name="PaysNaissance" type="xs:string"/>
<xs:element name="Nationalite" type="xs:string"/>
<xs:element name="CodeINSEENaissance" type="xs:string"/>
<xs:element name="LieuNaissance" maxOccurs="2">
<xs:complexType>
<xs:sequence>
<xs:element name="Code" type="xs:string"/>
<xs:element name="Nom" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="FiliationMere">
<xs:complexType>
<xs:sequence>
<xs:element name="NomFamille" type="xs:string"/>
<xs:element name="PrenomUsuel" type="xs:string"/>
<xs:element name="Prenom" type="xs:string" maxOccurs="2"/>
<xs:element name="DateNaissance" type="xs:string"/>
<xs:element name="PaysNaissance" type="xs:string"/>
<xs:element name="Nationalite" type="xs:string"/>
<xs:element name="CodeINSEENaissance" type="xs:string"/>
<xs:element name="LieuNaissance" maxOccurs="2">
<xs:complexType>
<xs:sequence>
<xs:element name="Code" type="xs:string"/>
<xs:element name="Nom" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>

View File

@ -1,6 +0,0 @@
<div class="variable-widget">
{% include widget.subwidgets.0.template_name with widget=widget.subwidgets.0 %}
</div>
<div class="expression-widget">
{% include widget.subwidgets.1.template_name with widget=widget.subwidgets.1 %}
</div>

View File

@ -1,9 +0,0 @@
{% extends "passerelle/manage/resource_child_confirm_delete.html" %}
{% block resource-child-breadcrumb %}
{% if object.id %}
<a href="#">{{ object.get_procedure_display }}</a>
{% else %}
<a href="#">{% trans "Add mapping" %}</a>
{% endif %}
{% endblock %}

View File

@ -1,55 +0,0 @@
{% extends "passerelle/manage/resource_child_form.html" %}
{% load i18n %}
{% comment %}
{% block resource-child-breadcrumb %}
{% if object.id %}
<a href="#">{{ object }}</a>
{% else %}
<a href="#">{% trans "Add mapping" %}</a>
{% endif %}
{% endblock %}
{% endcomment %}
{% block form %}
{% if form.errors %}
<div class="errornotice">
<p>{% trans "There were errors processing your form." %}</p>
{% for error in form.non_field_errors %}
<p>{{ error }}</p>
{% endfor %}
{% for field in form %}
{% if field.is_hidden and field.errors %}
<p>
{% for error in field.errors %}
{% blocktrans with name=field.name %}(Hidden field {{name}}) {{ error }}{% endblocktrans %}
{% if not forloop.last %}<br>{% endif %}
{% endfor %}
</p>
{% endif %}
{% endfor %}
</div>
{% endif %}
{% include "gadjo/widget.html" with field=form.procedure %}
{% include "gadjo/widget.html" with field=form.formdef%}
{% if form.table_fields %}
<table class="main">
<thead>
<tr>
<td>Label</td>
<td>Variable</td>
</tr>
</thead>
<tbody>
{% for field in form.table_fields %}
<tr>
<td>{{ field.label_tag }}</td>
<td>{{ field }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
{% endblock %}

View File

@ -1,42 +0,0 @@
{% extends "passerelle/manage/service_view.html" %}
{% load i18n passerelle %}
{% block description %}
<p>
{% blocktrans %}
Connector to forms published by <a href="https://www.service-public.fr/">service-public.fr</a>
{% endblocktrans %}
<a href="{% url "sp-fr-run" slug=object.slug %}">Run</a>
</p>
{{ block.super }}
{% endblock %}
{% block extra-tab-buttons %}
<button role="tab" aria-selected="false" aria-controls="panel-mappings" id="tab-mappings" tabindex="-1">{% trans "Mappings" %}</button>
{% endblock %}
{% block extra-tab-panels %}
<div id="panel-mappings" role="tabpanel" tabindex="-1" aria-labelledby="tab-mappings" hidden>
<ul>
{% for mapping in object.mappings.all %}
<li>
<fieldset class="gadjo-foldable gadjo-folded" id="sp-fr-mapping-{{ mapping.pk}}">
<legend class="gadjo-foldable-widget">
<a href="{% url "sp-fr-mapping-edit" connector=object.get_connector_slug slug=object.slug pk=mapping.pk %}">{% blocktrans with procedure=mapping.get_procedure_display formdef=mapping.formdef.title %}From procedure {{ procedure }} to form {{ formdef }}{% endblocktrans %}</a>
</legend>
<div class="gadjo-folding">
{% for key, value in mapping.rules.fields.items %}
{% if value %}
<p>{{ value.label }}&nbsp;: {{ value.variable }} {% if value.expression %}({% trans "with expression" %} <tt>{{ value.expression }}</tt>){% endif %}</p>
{% endif %}
{% endfor %}
<a rel="popup" class="delete" href="{% url "sp-fr-mapping-delete" connector=object.get_connector_slug slug=object.slug pk=mapping.pk %}">{% trans "Delete" %}</a>
</div>
</fieldset>
</li>
{% endfor %}
</ul>
<p><a class="button" href="{% url "sp-fr-mapping-new" slug=object.slug %}">{% trans "Add" %}</a></p>
</div>
{% endblock %}

View File

@ -1,32 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.urls import re_path
from . import views
management_urlpatterns = [
re_path(r'^(?P<slug>[\w,-]+)/mapping/new/$', views.MappingNew.as_view(), name='sp-fr-mapping-new'),
re_path(
r'^(?P<slug>[\w,-]+)/mapping/(?P<pk>\d+)/$', views.MappingEdit.as_view(), name='sp-fr-mapping-edit'
),
re_path(
r'^(?P<slug>[\w,-]+)/mapping/(?P<pk>\d+)/delete/$',
views.MappingDelete.as_view(),
name='sp-fr-mapping-delete',
),
re_path(r'^(?P<slug>[\w,-]+)/run/$', views.run, name='sp-fr-run'),
]

View File

@ -1,67 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.views.generic import CreateView, DeleteView, UpdateView
from passerelle.base.mixins import ResourceChildViewMixin
from . import forms, models
class StayIfChanged:
has_changed = False
def form_valid(self, form):
if set(form.changed_data) & set(['procedure', 'formdef']):
self.has_changed = True
return super().form_valid(form)
def get_success_url(self):
if self.has_changed:
return self.get_changed_url()
return super().get_success_url()
def get_changed_url(self):
return ''
class MappingNew(StayIfChanged, ResourceChildViewMixin, CreateView):
model = models.Mapping
form_class = forms.MappingForm
def form_valid(self, form):
form.instance.resource = self.resource
return super().form_valid(form)
def get_changed_url(self):
return self.object.get_absolute_url()
class MappingEdit(StayIfChanged, ResourceChildViewMixin, UpdateView):
model = models.Mapping
form_class = forms.MappingForm
class MappingDelete(ResourceChildViewMixin, DeleteView):
model = models.Mapping
def run(request, connector, slug):
resource = get_object_or_404(models.Resource, slug=slug)
resource.run_loop()
return HttpResponseRedirect(resource.get_absolute_url())

View File

@ -1,320 +0,0 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import isodate
from lxml import etree as ET
from zeep.utils import qname_attr
def parse_bool(boolean):
return boolean.lower() == 'true'
def parse_date(date):
if isinstance(date, datetime.date):
return date
return datetime.datetime.strptime('%Y-%m-%d', date).date()
XSD = 'http://www.w3.org/2001/XMLSchema'
ns = {'xsd': XSD}
SCHEMA = ET.QName(XSD, 'schema')
ANNOTATION = ET.QName(XSD, 'annotation')
ELEMENT = ET.QName(XSD, 'element')
ATTRIBUTE = ET.QName(XSD, 'attribute')
COMPLEX_TYPE = ET.QName(XSD, 'complexType')
SIMPLE_TYPE = ET.QName(XSD, 'simpleType')
COMPLEX_CONTENT = ET.QName(XSD, 'complexContent')
EXTENSION = ET.QName(XSD, 'extension')
RESTRICTION = ET.QName(XSD, 'restriction')
SEQUENCE = ET.QName(XSD, 'sequence')
CHOICE = ET.QName(XSD, 'choice')
ALL = ET.QName(XSD, 'all')
BOOLEAN = ET.QName(XSD, 'boolean')
STRING = ET.QName(XSD, 'string')
DATE = ET.QName(XSD, 'date')
INT = ET.QName(XSD, 'int')
INTEGER = ET.QName(XSD, 'integer')
DATE_TIME = ET.QName(XSD, 'dateTime')
ANY_TYPE = ET.QName(XSD, 'anyType')
TYPE_CASTER = {
BOOLEAN: parse_bool,
STRING: str,
DATE: parse_date,
INT: int,
INTEGER: int,
DATE_TIME: isodate.parse_datetime,
ANY_TYPE: lambda v: v,
}
class Schema:
def __init__(self):
self.types = {}
self.elements = {}
self.target_namespace = None
self.element_form_default = 'qualified'
self.attribute_form_default = 'unqualified'
self.nsmap = {}
def visit(self, root):
assert root.tag == SCHEMA
assert set(root.attrib) <= set(['targetNamespace', 'elementFormDefault', 'attributeFormDefault']), (
'unsupported schema attributes %s' % root.attrib
)
self.target_namespace = root.get('targetNamespace')
self.element_form_default = root.get('elementFormDefault', self.element_form_default)
self.attribute_form_default = root.get('attributeFormDefault', self.attribute_form_default)
self.nsmap = root.nsmap
self.reverse_nsmap = {value: key for key, value in self.nsmap.items()}
# first pass
for node in root:
if node.tag == COMPLEX_TYPE:
name = qname_attr(node, 'name')
assert name, 'unsupported top complexType without name'
self.types[name] = {}
elif node.tag == ELEMENT:
name = qname_attr(node, 'name')
assert name, 'unsupported top element without name'
self.elements[name] = {}
elif node.tag == SIMPLE_TYPE:
name = qname_attr(node, 'name')
assert name, 'unsupported top simpleType without name'
self.types[name] = {}
else:
raise NotImplementedError('unsupported top element %s' % node)
# second pass
for node in root:
if node.tag == COMPLEX_TYPE:
d = self.visit_complex_type(node)
target = self.types
elif node.tag == SIMPLE_TYPE:
d = self.visit_simple_type(node)
target = self.types
elif node.tag == ELEMENT:
d = self.visit_element(node)
target = self.elements
else:
raise NotImplementedError
if not d['name'].namespace and self.target_namespace:
d['name'] = ET.QName(self.target_namespace, d['name'].localname)
target[d['name']] = d
def visit_simple_type(self, node):
# ignore annotations
children = [child for child in node if child.tag != ANNOTATION]
d = {}
name = qname_attr(node, 'name')
if name:
d['name'] = name
assert len(children) == 1, list(node)
assert children[0].tag == RESTRICTION
xsd_type = qname_attr(children[0], 'base')
assert xsd_type == STRING
d['type'] = STRING
return d
def visit_complex_content(self, node):
d = {}
name = qname_attr(node, 'name')
if name:
d['name'] = name
assert len(node) == 1
assert node[0].tag == EXTENSION
xsd_type = qname_attr(node[0], 'base')
d['type'] = xsd_type
return d
def visit_complex_type(self, node):
# ignore annotations
children = [child for child in node if child.tag != ANNOTATION]
if children and children[0].tag in (SEQUENCE, CHOICE, ALL, COMPLEX_CONTENT):
if children[0].tag == SEQUENCE:
d = self.visit_sequence(children[0])
elif children[0].tag == CHOICE:
d = self.visit_choice(children[0])
elif children[0].tag == ALL:
d = self.visit_all(children[0])
elif children[0].tag == COMPLEX_CONTENT:
d = self.visit_complex_content(children[0])
children = children[1:]
else:
d = {}
for child in children:
assert child.tag == ATTRIBUTE, 'unsupported complexType with child %s' % child
name = qname_attr(child, 'name')
assert name, 'attribute without a name %s' % ET.tostring(child)
assert set(child.attrib) <= set(['use', 'type', 'name']), child.attrib
attributes = d.setdefault('attributes', {})
xsd_type = qname_attr(child, 'type')
attributes[name] = {
'name': name,
'use': child.get('use', 'optional'),
'type': xsd_type,
}
name = qname_attr(node, 'name')
if name:
d['name'] = name
return d
def visit_element(self, node, top=False):
# ignore annotations
assert set(node.attrib.keys()) <= set(['name', 'type', 'minOccurs', 'maxOccurs']), node.attrib
children = [child for child in node if child.tag != ANNOTATION]
# we handle elements with a name and one child, an anonymous complex type
# or element without children referencing a complex type
name = qname_attr(node, 'name')
assert name is not None
min_occurs = node.attrib.get('minOccurs') or 1
max_occurs = node.attrib.get('maxOccurs') or 1
d = {
'name': name,
'min_occurs': int(min_occurs),
'max_occurs': max_occurs if max_occurs == 'unbounded' else int(max_occurs),
}
if len(children) == 1:
ctype_node = children[0]
assert ctype_node.tag == COMPLEX_TYPE
assert ctype_node.attrib == {}
d.update(self.visit_complex_type(ctype_node))
return d
elif len(children) == 0:
xsd_type = qname_attr(node, 'type')
if xsd_type is None:
xsd_type = STRING
d['type'] = xsd_type
return d
else:
raise NotImplementedError('unsupported element with more than one children %s' % list(node))
def visit_sequence(self, node):
assert set(node.attrib) <= set(['maxOccurs']), node.attrib
sequence = []
for element_node in node:
assert element_node.tag in (
ELEMENT,
CHOICE,
), 'unsupported sequence with child not an element or a choice %s' % ET.tostring(element_node)
if element_node.tag == ELEMENT:
sequence.append(self.visit_element(element_node))
elif element_node.tag == CHOICE:
sequence.append(self.visit_choice(element_node))
d = {
'sequence': sequence,
}
if 'maxOccurs' in node.attrib:
d['max_occurs'] = node.get('maxOccurs', 1)
return d
def visit_all(self, node):
return self.visit_sequence(node)
def visit_choice(self, node):
assert node.attrib == {}, 'unsupported choice with attributes %s' % node.attrib
choice = []
for element_node in node:
assert element_node.tag == ELEMENT, 'unsupported sequence with child not an element %s' % node
choice.append(self.visit_element(element_node))
return {'choice': choice}
def qname_display(self, name):
if name.namespace in self.reverse_nsmap:
name = '%s:%s' % (self.reverse_nsmap[name.namespace], name.localname)
return str(name)
def paths(self):
roots = sorted(self.elements.keys())
def helper(path, ctype, is_type=False):
name = None
if 'name' in ctype:
name = ctype['name']
max_occurs = ctype.get('max_occurs', 1)
max_occurs = 3 if max_occurs == 'unbounded' else max_occurs
if 'type' in ctype:
if name and not is_type:
path = path + [name]
xsd_type = ctype['type']
if xsd_type in self.types:
sub_type = self.types[xsd_type]
for subpath in helper(path, sub_type, is_type=True):
yield subpath
else:
if max_occurs > 1:
for i in range(max_occurs):
yield path[:-1] + [
ET.QName(name.namespace, name.localname + '_%d' % (i + 1))
], xsd_type
yield path, xsd_type
else:
for extension in (
[''] if max_occurs == 1 else [''] + ['_%s' % i for i in list(range(1, max_occurs + 1))]
):
new_path = path
if name and not is_type:
new_path = new_path + [ET.QName(name.namespace, name.localname + extension)]
if 'sequence' in ctype:
for sub_ctype in ctype['sequence']:
for subpath in helper(new_path, sub_ctype):
yield subpath
elif 'choice' in ctype:
for sub_ctype in ctype['choice']:
for subpath in helper(new_path, sub_ctype):
yield subpath
for root in roots:
for path in helper([], self.elements[root]):
yield path
class Path:
def __init__(self, path, xsd_type):
assert path
self.path = path
self.xsd_type = xsd_type
try:
self.caster = TYPE_CASTER[xsd_type]
except KeyError:
raise KeyError(str(xsd_type))
def resolve(self, root):
def helper(node, path):
if not path:
return node
else:
for child in node:
if child.tag == path[0]:
return helper(child, path[1:])
if root.tag != self.path[0]:
return None
child = helper(root, self.path[1:])
if child is not None and child.text and not list(child):
return self.caster(child.text)
def __str__(self):
return '.'.join(str(name) for name in self.path)

View File

@ -170,7 +170,6 @@ INSTALLED_APPS = (
'passerelle.apps.sivin',
'passerelle.apps.soap',
'passerelle.apps.solis',
'passerelle.apps.sp_fr',
'passerelle.apps.twilio',
'passerelle.apps.vivaticket',
# backoffice templates and static

View File

@ -1,81 +0,0 @@
# -*- coding: utf-8 -*-
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import mock
import pytest
import tests.utils
from passerelle.apps.sp_fr.models import Resource
from passerelle.utils.sftp import SFTP
from passerelle.utils.wcs import FormDefRef, get_wcs_choices
DUMMY_CONTENT = {
'DILA': {
'a.zip': 'a',
}
}
@pytest.fixture
def spfr(settings, wcs_host, db, sftpserver):
wcs_host.add_api_secret('test', 'test')
settings.KNOWN_SERVICES = {
'wcs': {
'eservices': {
'title': 'Démarches',
'url': wcs_host.url,
'secret': 'test',
'orig': 'test',
}
}
}
yield tests.utils.make_resource(
Resource,
title='Test 1',
slug='test1',
description='Connecteur de test',
input_sftp=SFTP('sftp://john:doe@{server.host}:{server.port}/DILA/'.format(server=sftpserver)),
output_sftp=SFTP('sftp://john:doe@{server.host}:{server.port}/DILA/'.format(server=sftpserver)),
)
def test_resource(spfr):
assert [x[1] for x in get_wcs_choices()] == ['---------', 'D\xe9marches - Demande']
def test_sftp_access(spfr, sftpserver):
with sftpserver.serve_content(DUMMY_CONTENT):
with spfr.input_sftp.client() as input_sftp:
assert input_sftp.listdir() == ['a.zip']
with spfr.output_sftp.client() as output_sftp:
assert output_sftp.listdir() == ['a.zip']
def test_import_export(spfr):
# mock FormDefRef.formdef property to prevent w.c.s. API calls
with mock.patch.object(FormDefRef, 'formdef') as mock_formdef:
mock_formdef.__get__ = mock.Mock(return_value=None)
mapping = spfr.mappings.create(procedure='DOC', formdef=FormDefRef('wcs:formdef1'), rules={'a': 'b'})
serialization = spfr.export_json()
spfr.delete()
new_spfr = spfr.__class__.import_json(serialization)
assert dict(spfr.__dict__, _state=None, id=None, logger=None) == dict(
new_spfr.__dict__, id=None, logger=None, _state=None
)
assert dict(
new_spfr.mappings.get().__dict__, _resource_cache=None, resource_id=None, id=None, _state=None
) == dict(mapping.__dict__, _resource_cache=None, resource_id=None, id=None, _state=None)