sigerly: add sigerly connector (#47856)

This commit is contained in:
Nicolas Roche 2020-10-19 15:16:42 +02:00
parent 2d649b2247
commit de1e45a50e
7 changed files with 496 additions and 0 deletions

View File

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2020-10-19 13:26
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('base', '0022_auto_20200715_1033'),
]
operations = [
migrations.CreateModel(
name='Sigerly',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('title', models.CharField(max_length=50, verbose_name='Title')),
('slug', models.SlugField(unique=True, verbose_name='Identifier')),
('description', models.TextField(verbose_name='Description')),
('basic_auth_username', models.CharField(blank=True, max_length=128, verbose_name='Basic authentication username')),
('basic_auth_password', models.CharField(blank=True, max_length=128, verbose_name='Basic authentication password')),
('client_certificate', models.FileField(blank=True, null=True, upload_to='', verbose_name='TLS client certificate')),
('trusted_certificate_authorities', models.FileField(blank=True, null=True, upload_to='', verbose_name='TLS trusted CAs')),
('verify_cert', models.BooleanField(default=True, verbose_name='TLS verify certificates')),
('http_proxy', models.CharField(blank=True, max_length=128, verbose_name='HTTP and HTTPS proxy')),
('base_url', models.CharField(help_text='example: https://sig.sigerly.fr/syecl_intervention/webservicev2/', max_length=256, verbose_name='Service URL')),
('users', models.ManyToManyField(blank=True, related_name='_sigerly_users_+', related_query_name='+', to='base.ApiUser')),
],
options={
'verbose_name': 'Sigerly',
},
),
]

View File

@ -0,0 +1,154 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from urllib.parse import urljoin
from django.db import models
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource, HTTPResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
CREATE_SCHEMA = {
'$schema': 'http://json-schema.org/draft-04/schema#',
"type": "object",
'required': ['demandeur', 'id_typeinterv', 'id_urgence', 'elements'],
'properties': {
'demandeur': {
'description': "Nom du demandeur",
'type': 'string',
},
'id_typeinterv': {
'description': "Type de l'intervention",
'type': 'string',
},
'id_urgence': {
'description': 'Urgence',
'type': 'string',
},
'id_qualification': {
'description': 'Qualification',
'type': 'string',
},
'observations': {
'description': 'Observations',
'type': 'string',
},
'elements': {
'description': "Identifiant de l'objet : liste séparée par ':'",
'type': 'string',
'pattern': r'^[0-9A-Z :]+$',
},
},
}
QUERY_SCHEMA = {
'$schema': 'http://json-schema.org/draft-04/schema#',
"type": "object",
'properties': {
'id_intervention': {
'description': 'Rechercher une intervention par son numéro'
' (non cumulable avec les autres filtres)',
'type': 'string',
},
'date_debut_demande': {
'description': 'Rechercher toutes les interventions dont la date de demande'
' est supérieure ou égale à la date renseignée (YYYY-MM-DD)',
'type': 'string',
},
'date_fin_demande': {
'description': 'Rechercher toutes les interventions dont la date de demande'
' est inférieure ou égale à la date renseignée (YYYY-MM-DD)',
'type': 'string',
},
'insee': {
'description': "Code INSEE de la commune : liste séparée par ':'",
'type': 'string',
'pattern': r'^[0-9A-Z :]+$',
},
},
}
class Sigerly(BaseResource, HTTPResource):
base_url = models.CharField(
max_length=256,
blank=False,
verbose_name=_('Webservice Base URL'),
help_text='exemple: https://sig.sigerly.fr/syecl_intervention_preprod/webservicev2/',
)
category = _('Business Process Connectors')
class Meta:
verbose_name = 'Sigerly'
def request(self, uri, json):
url = urljoin(self.base_url, uri)
headers = {'Accept': 'application/json'}
response = self.requests.post(url, json=json, headers=headers)
if not response.ok:
raise APIError('Sigerly returned bad response code from API: %s' % response.status_code)
try:
json_response = response.json()
except ValueError:
raise APIError('Sigerly returned invalid JSON content: %r' % response.content[:1024])
return json_response
@endpoint(
perm='can_access',
methods=['post'],
description='Envoyer une demande',
post={'request_body': {'schema': {'application/json': CREATE_SCHEMA}}},
)
def create(self, request, post_data):
post_data['id_typeinterv'] = int(post_data['id_typeinterv'])
post_data['id_urgence'] = int(post_data['id_urgence'])
post_data['id_qualification'] = int(post_data['id_qualification'])
post_data['elements'] = [x.strip() for x in post_data['elements'].split(':') if x.strip()]
response = self.request('createIntervention.php', json=post_data)
if not response.get('success', None):
raise APIError(response.get('message', None))
if not response.get('message', None):
raise APIError('No intervention id returned')
return {'data': response}
@endpoint(
perm='can_access',
methods=['post'],
description='Récupérer le statut dune demande',
post={'request_body': {'schema': {'application/json': QUERY_SCHEMA}}},
)
def query(self, request, post_data):
if post_data.get('id_intervention', None):
post_data['id_intervention'] = int(post_data['id_intervention'])
post_data.pop('date_debut_demande', None)
post_data.pop('date_fin_demande', None)
post_data.pop('insee', None)
else:
post_data.pop('id_intervention', None)
if post_data.get('insee'):
post_data['insee'] = [x.strip() for x in post_data['insee'].split(':') if x.strip()]
response = self.request('getIntervention.php', json=post_data)
for record in response:
record['id'] = record.get('code_inter')
record['text'] = '%(code_inter)s: %(libelle_intervention)s' % record
return {'data': response}

View File

@ -0,0 +1,56 @@
[
{
"annule" : null,
"code_inter" : "DP.20.116.1",
"date_cr1" : null,
"date_emission" : "2020-11-19",
"date_inter_planif1" : null,
"date_inter_rea_1" : null,
"date_valid" : "2020-11-19",
"date_valid_planif_1" : null,
"date_validfact" : null,
"elements" : [
{
"icon_gmap" : "luminaire.png",
"id_interv" : 22014,
"idelement" : 42282,
"ident" : "LIMW003D",
"libelle_objetgeo" : "Luminaire",
"nom_rue" : "Voie d'accès parc des Sports",
"the_geom" : "0101000020110F000038A23D18A5372041C775D7FE5BF35541",
"travaux" : []
},
{
"icon_gmap" : "luminaire.png",
"id_interv" : 22014,
"idelement" : 42283,
"ident" : "LIMW003C",
"libelle_objetgeo" : "Luminaire",
"nom_rue" : "Voie d'accès parc des Sports",
"the_geom" : "0101000020110F00006F56132FA53720417E4630FB5BF35541",
"travaux" : []
}
],
"flag_mob" : null,
"garantie" : null,
"id_commune" : 29,
"id_demande_web" : 10914,
"id_entreprise" : 2,
"id_entreprise_2" : null,
"id_traitement" : null,
"id_typeinterv" : 5,
"id_urgence" : 1,
"idinterv" : 22014,
"libcause" : null,
"libcommune" : "LIMONEST",
"libelle_intervention" : "DEPANNAGE EP",
"libelle_urgence" : "Normale",
"libentreprise" : "EIFFAGE",
"libentreprise2" : null,
"libterritoire" : "NORD",
"num_mandat" : null,
"ref_technique" : null,
"valid_entreprise" : null,
"valid_sydev" : null
}
]

View File

@ -0,0 +1,91 @@
[
{
"annule" : null,
"code_inter" : "VN.20.291.11",
"date_cr1" : null,
"date_emission" : "2020-11-19",
"date_inter_planif1" : null,
"date_inter_rea_1" : null,
"date_inter_rea_2" : null,
"date_valid" : null,
"date_valid_planif_1" : null,
"date_valid_planif_2" : null,
"date_validfact" : null,
"elements" : [],
"flag_mob" : null,
"garantie" : null,
"id_commune" : 6,
"id_demande_web" : null,
"id_entreprise" : 5,
"id_entreprise_2" : 6,
"id_suite" : null,
"id_traitement" : 1,
"id_typeinterv" : 4,
"id_urgence" : null,
"idinterv" : 25080,
"inseecommune" : "069291",
"libcause" : null,
"libcommune" : "ST SYMPHORIEN D OZON",
"libelle_intervention" : "VISITE DE NUIT",
"libelle_urgence" : null,
"libentreprise" : "SERPOLLET",
"libentreprise2" : "INEO",
"libterritoire" : "SUD",
"mail" : "Mikael.BOBROSKY@serpollet.com",
"num_mandat" : null,
"ref_technique" : null,
"toodego" : null,
"valid_entreprise" : null,
"valid_sydev" : null
},
{
"annule" : null,
"code_inter" : "DP.20.291.53",
"date_cr1" : "2020-11-19",
"date_emission" : "2020-11-19",
"date_inter_planif1" : "2020-11-19",
"date_inter_rea_1" : "2020-11-19",
"date_inter_rea_2" : "2020-11-19",
"date_valid" : "2020-11-19",
"date_valid_planif_1" : "19/11/2020",
"date_valid_planif_2" : "2020-11-19",
"date_validfact" : null,
"elements" : [
{
"icon_gmap" : "armoire.png",
"id_interv" : 27948,
"idelement" : 51681,
"ident" : "AB",
"libelle_objetgeo" : "Armoire",
"nom_rue" : "Portes de Lyon (avenue des)",
"the_geom" : "0101000020110F0000C7A520BC497F20415B9A1EFB2AD45541",
"travaux" : []
}
],
"flag_mob" : null,
"garantie" : null,
"id_commune" : 6,
"id_demande_web" : 15879,
"id_entreprise" : 5,
"id_entreprise_2" : null,
"id_suite" : 2,
"id_traitement" : 1,
"id_typeinterv" : 5,
"id_urgence" : null,
"idinterv" : 27948,
"inseecommune" : "069291",
"libcause" : null,
"libcommune" : "ST SYMPHORIEN D OZON",
"libelle_intervention" : "DEPANNAGE EP",
"libelle_urgence" : null,
"libentreprise" : "SERPOLLET",
"libentreprise2" : null,
"libterritoire" : "SUD",
"mail" : "Mikael.BOBROSKY@serpollet.com",
"num_mandat" : null,
"ref_technique" : null,
"toodego" : null,
"valid_entreprise" : 1,
"valid_sydev" : 1
}
]

View File

@ -30,6 +30,7 @@ INSTALLED_APPS += (
'passerelle.contrib.nancypoll',
'passerelle.contrib.planitech',
'passerelle.contrib.rsa13',
'passerelle.contrib.sigerly',
'passerelle.contrib.solis_apa',
'passerelle.contrib.solis_afi_mss',
'passerelle.contrib.strasbourg_eu',

157
tests/test_sigerly.py Normal file
View File

@ -0,0 +1,157 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
import httmock
import pytest
from passerelle.contrib.sigerly.models import Sigerly
import utils
TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'sigerly')
@pytest.fixture
def connector(db):
return utils.setup_access_rights(Sigerly.objects.create(slug='test', base_url='https://dummy-server.org'))
def json_get_data(filename):
with open(os.path.join(TEST_BASE_DIR, "%s.json" % filename)) as fd:
return json.dumps(json.load(fd))
def get_endpoint(name):
return utils.generic_endpoint_url('sigerly', name)
@pytest.mark.parametrize(
'status_code, content, err_desc',
[
(500, 'xxx', 'bad response code from API: 500'),
(400, 'xxx', 'bad response code from API: 400'),
(200, 'not json', 'invalid JSON content'),
],
)
def test_request_error(app, connector, status_code, content, err_desc):
endpoint = get_endpoint('query')
@httmock.all_requests
def sigerly_mock(url, request):
return httmock.response(status_code, content)
with httmock.HTTMock(sigerly_mock):
resp = app.post_json(endpoint, params={})
assert resp.json['err']
assert err_desc in resp.json['err_desc']
def test_create(app, connector):
endpoint = get_endpoint('create')
payload = {
'demandeur': 'Test webservice',
'id_typeinterv': '5',
'id_urgence': '1',
'id_qualification': '8',
'observations': 'Test webservice',
'elements': 'LIMW003D:LIMWW003C',
}
@httmock.urlmatch(netloc='dummy-server.org', path='/createIntervention.php', method='POST')
def sigerly_mock(url, request):
assert request.headers['Accept'] == 'application/json'
assert json.loads(request.body)['elements'] == ['LIMW003D', 'LIMWW003C']
return httmock.response(200, json.dumps({'success': True, 'message': '7830'}))
with httmock.HTTMock(sigerly_mock):
resp = app.post_json(endpoint, params=payload)
assert not resp.json['err']
assert resp.json['data']['message'] == '7830' # id unusable within query endpoint
@pytest.mark.parametrize(
'success, message, err_desc',
[
(False, 'an error message', 'an error message'),
(True, '', 'No intervention id returned'),
],
)
def test_create_error(app, connector, success, message, err_desc):
endpoint = get_endpoint('create')
payload = {
'demandeur': 'Test webservice',
'id_typeinterv': '5',
'id_urgence': '1',
'id_qualification': '8',
'observations': 'Test webservice',
'elements': 'LIMW003D:LIMWW003C',
}
@httmock.urlmatch(netloc='dummy-server.org', path='/createIntervention.php', method='POST')
def sigerly_mock(url, request):
assert request.headers['Accept'] == 'application/json'
return httmock.response(200, json.dumps({'success': success, 'message': message}))
with httmock.HTTMock(sigerly_mock):
resp = app.post_json(endpoint, params=payload)
assert resp.json['err']
assert resp.json['err_desc'] == err_desc
def test_query_id(app, connector):
endpoint = get_endpoint('query')
payload = {
'id_intervention': '10914',
}
@httmock.urlmatch(netloc='dummy-server.org', path='/getIntervention.php', method='POST')
def sigerly_mock(url, request):
assert request.headers['Accept'] == 'application/json'
assert json.loads(request.body) == {'id_intervention': 10914}
return httmock.response(200, json_get_data('getIntervention_1'))
with httmock.HTTMock(sigerly_mock):
resp = app.post_json(endpoint, params=payload)
assert not resp.json['err']
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['id_demande_web'] == 10914
assert resp.json['data'][0]['idinterv'] == 22014
elements = resp.json['data'][0]['elements']
assert len(elements) == 2
assert [x['ident'] for x in elements] == ['LIMW003D', 'LIMW003C']
def test_query_filters(app, connector):
endpoint = get_endpoint('query')
payload = {'date_debut_demande': '19/11/2020', 'date_fin_demande': '19/11/2020', 'insee': '::069291:::069283::'}
@httmock.urlmatch(netloc='dummy-server.org', path='/getIntervention.php', method='POST')
def sigerly_mock(url, request):
assert request.headers['Accept'] == 'application/json'
assert json.loads(request.body)['insee'] == ['069291', '069283']
return httmock.response(200, json_get_data('getIntervention_2'))
with httmock.HTTMock(sigerly_mock):
resp = app.post_json(endpoint, params=payload)
assert not resp.json['err']
assert len(resp.json['data']) == 2
assert [x['date_emission'] for x in resp.json['data']] == ['2020-11-19'] * 2
assert [x['inseecommune'] for x in resp.json['data']] == ['069291'] * 2