passerelle/passerelle/contrib/toulouse_axel/models.py

1114 lines
43 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import copy
import datetime
import logging
import os
import re
import xml.etree.ElementTree as ET
from collections import namedtuple
from django.db import models
from django.http import HttpResponse
from django.utils.encoding import force_text
from django.utils.translation import ugettext_lazy as _
import xmlschema
from passerelle.base.models import BaseResource
from passerelle.compat import json_loads
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.xml import JSONSchemaFromXMLSchema
from . import utils
logger = logging.getLogger('passerelle.contrib.toulouse_axel')
BASE_XSD_PATH = os.path.join(os.path.dirname(__file__), 'xsd')
# JSON schema of the request body accepted by the pay_invoice endpoint:
# an object whose 'transaction_date' and 'transaction_id' are both mandatory.
PAYMENT_SCHEMA = {
'type': 'object',
'properties': {
# deep copy so that the shared utils.datetime_type definition is never mutated
'transaction_date': copy.deepcopy(utils.datetime_type),
'transaction_id': {
'type': 'string',
}
},
'required': ['transaction_date', 'transaction_id']
}
class AxelSchema(JSONSchemaFromXMLSchema):
    """JSON <-> XML converter aware of the Axel date and OUI/NON types.

    ``type_map`` associates the Axel XSD type names with the suffix of the
    ``schema_*`` / ``encode_*`` / ``decode_*`` hooks defined below.
    """

    type_map = {
        '{urn:AllAxelTypes}DATEREQUIREDType': 'date',
        '{urn:AllAxelTypes}DATEType': 'date_optional',
        '{urn:AllAxelTypes}OUINONREQUIREDType': 'bool',
        '{urn:AllAxelTypes}OUINONType': 'bool_optional',
    }

    # -- dates -------------------------------------------------------------

    @classmethod
    def schema_date(cls):
        """Return the JSON schema used for a date value."""
        return {
            'type': 'string',
            'pattern': '[0-9]{4}-[0-9]{2}-[0-9]{2}',
        }

    def encode_date(self, obj):
        """Convert a JSON date string to the XML date format.

        A value that cannot be parsed is returned untouched.
        """
        try:
            parsed = datetime.datetime.strptime(obj, utils.json_date_format)
            return parsed.strftime(utils.xml_date_format)
        except ValueError:
            return obj

    def encode_date_optional(self, obj):
        """Like ``encode_date`` but falsy values are passed through as is."""
        return self.encode_date(obj) if obj else obj

    def decode_date(self, data):
        """Return a copy of ``data`` whose text is in the JSON date format."""
        parsed = datetime.datetime.strptime(data.text, utils.xml_date_format)
        json_value = parsed.strftime(utils.json_date_format)
        return xmlschema.ElementData(
            tag=data.tag,
            text=json_value,
            content=data.content,
            attributes=data.attributes)

    def decode_date_optional(self, data):
        """Like ``decode_date`` but elements without text are returned as is."""
        if data.text:
            return self.decode_date(data)
        return data

    # -- booleans ----------------------------------------------------------

    @classmethod
    def schema_bool(cls):
        """Return a private copy of the shared boolean JSON schema."""
        return copy.deepcopy(utils.boolean_type)

    def encode_bool(self, obj):
        """Delegate the boolean encoding to ``utils.encode_bool``."""
        return utils.encode_bool(obj)

    def decode_bool(self, data):
        """Return a copy of ``data`` whose text is True only for 'oui' (any case)."""
        is_yes = data.text.lower() == 'oui'
        return xmlschema.ElementData(
            tag=data.tag,
            text=is_yes,
            content=data.content,
            attributes=data.attributes)

    @classmethod
    def schema_bool_optional(cls):
        """Return the boolean schema extended to also accept the empty string."""
        optional_schema = cls.schema_bool()
        optional_schema['oneOf'].append({'type': 'string', 'enum': ['']})
        return optional_schema

    def encode_bool_optional(self, obj):
        """Encode exactly like a mandatory boolean."""
        return self.encode_bool(obj)

    def decode_bool_optional(self, data):
        """Like ``decode_bool`` but elements without text are returned as is."""
        if data.text:
            return self.decode_bool(data)
        return data
class AxelError(Exception):
    """Error raised around an Axel call.

    Besides the message, the exception carries the XML request and the XML
    response (when known) so that callers can report them.
    """

    def __init__(self, message, xml_request=None, xml_response=None, *args):
        super(AxelError, self).__init__(message, *args)
        self.xml_response = xml_response
        self.xml_request = xml_request
        self.message = message

    def __str__(self):
        # only the message is shown, never the extra positional arguments
        return self.message
def xml_schema_converter(name, root_element):
    """Build the AxelSchema converter for the XSD file ``name``.

    ``name`` is relative to BASE_XSD_PATH. None is returned when the file
    does not exist.
    """
    xsd_path = os.path.join(BASE_XSD_PATH, name)
    if os.path.exists(xsd_path):
        return AxelSchema(xsd_path, root_element)
    return None
# Value returned by Operation.__call__: the decoded JSON response together
# with the serialized XML request and the pretty-printed XML response.
OperationResult = namedtuple('OperationResult', ['json_response', 'xml_request', 'xml_response'])
class Operation(object):
    """Callable wrapper around one operation of the Axel ``getData`` service.

    The request and response converters are built from the XSD files
    ``<prefix>Q_<operation>.xsd`` and ``<prefix>R_<operation>.xsd``; a
    converter is None when its XSD file does not exist.
    """

    def __init__(self, operation, prefix='Dui/', request_root_element='PORTAIL'):
        self.operation = operation
        self.request_converter = xml_schema_converter(
            '%sQ_%s.xsd' % (prefix, operation), request_root_element)
        self.response_converter = xml_schema_converter(
            '%sR_%s.xsd' % (prefix, operation), 'PORTAILSERVICE')
        # CamelCase -> dash separated lowercase, e.g. 'RefVerifDui' -> 'ref-verif-dui'
        self.name = re.sub(
            '(.?)([A-Z])',
            lambda s: s.group(1) + ('-' if s.group(1) else '') + s.group(2).lower(),
            operation)
        self.snake_name = self.name.replace('-', '_')

    @property
    def request_schema(self):
        """JSON schema of the request, flagged with 'flatten' and 'merge_extra'."""
        schema = self.request_converter.json_schema
        schema['flatten'] = True
        schema['merge_extra'] = True
        return schema

    def __call__(self, resource, request_data=None):
        """Run the operation through ``resource.soap_client()``.

        ``request_data`` is encoded and validated against the request XSD
        (when a request converter exists), sent with ``getData``, and the
        answer is decoded with the response converter.

        Returns an OperationResult.
        Raises AxelError when the request or the response is invalid, or
        when the response status is not 'OK'.
        """
        client = resource.soap_client()

        serialized_request = ''
        if self.request_converter:
            try:
                serialized_request = self.request_converter.encode(request_data)
            except xmlschema.XMLSchemaValidationError as e:
                raise AxelError('invalid request %s' % str(e))
            utils.indent(serialized_request)
            serialized_request = force_text(ET.tostring(serialized_request))
            try:
                self.request_converter.xml_schema.validate(serialized_request)
            except xmlschema.XMLSchemaValidationError as e:
                raise AxelError(
                    'invalid request %s' % str(e),
                    xml_request=serialized_request)

        result = client.service.getData(
            self.operation,
            serialized_request,
            '')  # FIXME: What is the user parameter for ?

        xml_result = ET.fromstring(result.encode('utf-8'))
        utils.indent(xml_result)
        pretty_result = force_text(ET.tostring(xml_result))

        # find() returns None for a missing node: report a malformed response
        # as an AxelError instead of crashing with an AttributeError
        status_node = xml_result.find('RESULTAT/STATUS')
        if status_node is None or status_node.text != 'OK':
            comment_node = xml_result.find('RESULTAT/COMMENTAIRES')
            msg = comment_node.text if comment_node is not None else None
            if msg is None:
                # AxelError.__str__ must return a string
                msg = 'invalid response: missing RESULTAT/STATUS or RESULTAT/COMMENTAIRES'
            raise AxelError(
                msg,
                xml_request=serialized_request,
                xml_response=pretty_result)

        try:
            return OperationResult(
                json_response=self.response_converter.decode(xml_result),
                xml_request=serialized_request,
                xml_response=pretty_result
            )
        except xmlschema.XMLSchemaValidationError as e:
            raise AxelError(
                'invalid response %s' % str(e),
                xml_request=serialized_request,
                xml_response=pretty_result)
# Module-level Operation instances, one per Axel operation used by the
# connector. Unless stated otherwise the XSD files are looked up under the
# 'Dui/' prefix and the request root element is 'PORTAIL'.
ref_date_gestion_dui = Operation('RefDateGestionDui')
ref_verif_dui = Operation('RefVerifDui')
ref_famille_dui = Operation('RefFamilleDui')
form_maj_famille_dui = Operation('FormMajFamilleDui')
form_paiement_dui = Operation('FormPaiementDui')
ref_facture_a_payer = Operation('RefFactureAPayer')
# XSD files of this operation are looked up without the 'Dui/' prefix
ref_facture_pdf = Operation('RefFacturePDF', prefix='')
list_dui_factures = Operation('ListeDuiFacturesPayeesRecettees', request_root_element='LISTFACTURE')
enfants_activites = Operation('EnfantsActivites', request_root_element='DUI')
reservation_periode = Operation('ReservationPeriode')
# Connector resource for the Toulouse Axel web service; the service is
# reached through the WSDL stored in ``wsdl_url``.
class ToulouseAxel(BaseResource):
# URL of the WSDL of the Axel web service (mandatory)
wsdl_url = models.CharField(
max_length=128,
blank=False,
verbose_name=_('WSDL URL'),
help_text=_('Toulouse Axel WSDL URL'))
category = _('Business Process Connectors')
class Meta:
verbose_name = _('Toulouse Axel')
def check_status(self):
    """Fetch the WSDL and raise if the HTTP response is an error."""
    self.requests.get(self.wsdl_url).raise_for_status()
# Request body schema of the link endpoint: the DUI part of the RefVerifDui
# request schema without IDPERSONNE (the link endpoint sets it to '' itself).
LINK_SCHEMA = copy.deepcopy(ref_verif_dui.request_schema['properties']['PORTAIL']['properties']['DUI'])
LINK_SCHEMA['properties'].pop('IDPERSONNE')
LINK_SCHEMA['required'].remove('IDPERSONNE')
# Maps every 'maj:*' flag of the update payload to the '/'-separated path of
# the block it controls (the paths are walked by
# pre_sanitize_update_family_data).
UPDATE_FAMILY_FLAGS = {
'maj:adresse': 'ADRESSE',
'maj:rl1': 'RL1',
'maj:rl1_adresse_employeur': 'RL1/ADREMPLOYEUR',
'maj:rl2': 'RL2',
'maj:rl2_adresse_employeur': 'RL2/ADREMPLOYEUR',
'maj:revenus': 'REVENUS',
}
# Flags whose schema is the plain boolean type: unlike the other flags they
# do not get the extra null / empty string alternatives when
# UPDATE_FAMILY_SCHEMA is built.
UPDATE_FAMILY_REQUIRED_FLAGS = [
'maj:adresse',
'maj:rl1',
'maj:rl2',
'maj:revenus',
]
# declare the flags of the children with index 0 to 5
for i in range(0, 6):
UPDATE_FAMILY_FLAGS.update({
'maj:enfant_%s' % i: 'ENFANT/%s' % i,
'maj:enfant_%s_sanitaire' % i: 'ENFANT/%s/SANITAIRE' % i,
'maj:enfant_%s_sanitaire_medecin' % i: 'ENFANT/%s/SANITAIRE/MEDECIN' % i,
'maj:enfant_%s_sanitaire_vaccin' % i: 'ENFANT/%s/SANITAIRE/VACCIN' % i,
'maj:enfant_%s_sanitaire_allergie' % i: 'ENFANT/%s/SANITAIRE/ALLERGIE' % i,
'maj:enfant_%s_sanitaire_handicap' % i: 'ENFANT/%s/SANITAIRE/HANDICAP' % i,
'maj:enfant_%s_assurance' % i: 'ENFANT/%s/ASSURANCE' % i,
'maj:enfant_%s_contact' % i: 'ENFANT/%s/CONTACT' % i,
})
UPDATE_FAMILY_REQUIRED_FLAGS.append('maj:enfant_%s' % i)
# Request body schema of the update_family_info endpoint. It starts as a copy
# of the DUI part of the FormMajFamilleDui request schema and is then adapted
# to the payload actually posted to the connector.
UPDATE_FAMILY_SCHEMA = copy.deepcopy(
form_maj_famille_dui.request_schema['properties']['PORTAIL']['properties']['DUI'])
# add the 'maj:*' flags to the schema
for flag in sorted(UPDATE_FAMILY_FLAGS.keys()):
flag_type = copy.deepcopy(utils.boolean_type)
if flag not in UPDATE_FAMILY_REQUIRED_FLAGS:
flag_type['oneOf'].append({'type': 'null'})
flag_type['oneOf'].append({'type': 'string', 'enum': ['']})
UPDATE_FAMILY_SCHEMA['properties'][flag] = flag_type
UPDATE_FAMILY_SCHEMA['required'].append(flag)
# these three fields are not posted: update_family_info sets them itself
UPDATE_FAMILY_SCHEMA['properties'].pop('IDDUI')
UPDATE_FAMILY_SCHEMA['properties'].pop('DATEDEMANDE')
UPDATE_FAMILY_SCHEMA['properties'].pop('QUIACTUALISEDUI')
UPDATE_FAMILY_SCHEMA['required'].remove('IDDUI')
UPDATE_FAMILY_SCHEMA['required'].remove('DATEDEMANDE')
UPDATE_FAMILY_SCHEMA['required'].remove('QUIACTUALISEDUI')
# RL1/RL2 fields that are not posted: sanitize_update_family_data fills them
for key in ['IDPERSONNE', 'NOM', 'PRENOM', 'NOMJEUNEFILLE', 'DATENAISSANCE', 'CIVILITE', 'INDICATEURRL']:
UPDATE_FAMILY_SCHEMA['properties']['RL1']['properties'].pop(key)
UPDATE_FAMILY_SCHEMA['properties']['RL1']['required'].remove(key)
UPDATE_FAMILY_SCHEMA['properties']['RL2']['properties'].pop(key)
UPDATE_FAMILY_SCHEMA['properties']['RL2']['required'].remove(key)
# NBENFANTSACHARGE is not posted either: sanitize_update_family_data fills it
UPDATE_FAMILY_SCHEMA['properties']['REVENUS']['properties'].pop('NBENFANTSACHARGE')
UPDATE_FAMILY_SCHEMA['properties']['REVENUS']['required'].remove('NBENFANTSACHARGE')
# the handicap related fields of SANITAIRE are grouped in a HANDICAP sub-object
handicap_fields = [
'AUTREDIFFICULTE',
'ECOLESPECIALISEE',
'INDICATEURAUXILIAIREVS',
'INDICATEURECOLE',
'INDICATEURHANDICAP',
'INDICATEURNOTIFMDPH',
]
sanitaire_properties = UPDATE_FAMILY_SCHEMA['properties']['ENFANT']['items']['properties']['SANITAIRE']['properties']
sanitaire_required = UPDATE_FAMILY_SCHEMA['properties']['ENFANT']['items']['properties']['SANITAIRE']['required']
sanitaire_properties['HANDICAP'] = {
'type': 'object',
'properties': {},
'required': handicap_fields,
}
sanitaire_required.append('HANDICAP')
# move every handicap field definition from SANITAIRE to SANITAIRE/HANDICAP
for key in handicap_fields:
field = sanitaire_properties.pop(key)
sanitaire_properties['HANDICAP']['properties'][key] = field
sanitaire_required.remove(key)
# the ALLERGIE definition coming from the XSD is replaced by an object with
# one entry per kind of allergy; sanitize_update_family_data turns the posted
# object into a list of TYPE/ALLERGIQUE/NOMALLERGIE items
sanitaire_properties.pop('ALLERGIE')
sanitaire_properties['ALLERGIE'] = {
'type': 'object',
'properties': {},
'required': ['ASTHME', 'MEDICAMENTEUSES', 'ALIMENTAIRES', 'AUTRES'],
}
for key in ['ASTHME', 'MEDICAMENTEUSES', 'ALIMENTAIRES']:
sanitaire_properties['ALLERGIE']['properties'][key] = copy.deepcopy(utils.boolean_type)
# AUTRES is a free text (at most 50 characters) or null
sanitaire_properties['ALLERGIE']['properties']['AUTRES'] = {
'oneOf': [
{'type': 'null'},
{
'type': 'string',
'minLength': 0,
'maxLength': 50,
}
]
}
UPDATE_FAMILY_SCHEMA['unflatten'] = True
@endpoint(
    description=_('Lock a resource'),
    perm='can_access',
    parameters={
        'key': {'description': _('Key of the resource to lock')},
        'locker': {'description': _('Identifier of the locker (can be empty)')}
    })
def lock(self, request, key, locker):
    """Get or create the lock identified by ``key`` on this resource.

    ``locker`` is only stored when the lock is created; an already
    existing lock keeps its own locker. Raises a 400 APIError when ``key``
    is empty.
    """
    if not key:
        raise APIError('key is empty', err_code='bad-request', http_status=400)
    lock_obj, _created = Lock.objects.get_or_create(
        resource=self,
        key=key,
        defaults={'locker': locker})
    return {
        'key': key,
        'locked': True,
        'locker': lock_obj.locker,
        'lock_date': lock_obj.lock_date,
    }
@endpoint(
    description=_('Unlock a resource'),
    perm='can_access',
    parameters={
        'key': {'description': _('Key of the resource to unlock')},
    })
def unlock(self, request, key):
    """Delete the lock identified by ``key``, if any.

    The answer always says 'locked': False; the locker and the lock date
    of the deleted lock are added when a lock existed.
    """
    try:
        lock_obj = Lock.objects.get(resource=self, key=key)
    except Lock.DoesNotExist:
        return {'key': key, 'locked': False}
    lock_obj.delete()
    return {
        'key': key,
        'locked': False,
        'locker': lock_obj.locker,
        'lock_date': lock_obj.lock_date,
    }
@endpoint(
    description=_('Get the lock status of a resource'),
    perm='can_access',
    parameters={
        'key': {'description': _('Key of the resource')},
    })
def locked(self, request, key):
    """Tell whether a lock identified by ``key`` exists on this resource."""
    try:
        lock_obj = Lock.objects.get(resource=self, key=key)
    except Lock.DoesNotExist:
        return {'key': key, 'locked': False}
    return {
        'key': key,
        'locked': True,
        'locker': lock_obj.locker,
        'lock_date': lock_obj.lock_date,
    }
@endpoint(
    description=_("Get dates of the update management"),
    perm='can_access')
def management_dates(self, request):
    """Return the DUIDATEGESTION block of the RefDateGestionDui operation."""
    try:
        result = ref_date_gestion_dui(self)
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)
    portail = result.json_response['DATA']['PORTAIL']
    return {'data': portail['DUIDATEGESTION']}
@endpoint(
    description=_('Create link between user and Toulouse Axel'),
    perm='can_access',
    parameters={
        'NameID': {'description': _('Publik ID')},
    },
    post={
        'request_body': {
            'schema': {
                'application/json': LINK_SCHEMA,
            }
        }
    })
def link(self, request, NameID, post_data):
    """Link ``NameID`` to the DUI matching the posted identity.

    The posted data is checked with the RefVerifDui operation; the link is
    created when the returned CODE is 2 or 3. An existing link pointing to
    another DUI or person is reported as a conflict.
    """
    if not NameID:
        raise APIError('NameID is empty', err_code='bad-request', http_status=400)

    # IDPERSONNE is not part of the posted schema, it is sent empty
    post_data['IDPERSONNE'] = ''
    try:
        result = ref_verif_dui(self, {'PORTAIL': {'DUI': post_data}})
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)

    dui_data = result.json_response['DATA']['PORTAIL']['DUI']
    # 2: RL1; 3: RL2
    if dui_data['CODE'] not in (2, 3):
        raise APIError('Person not found', err_code='not-found')

    dui_id = dui_data['IDDUI']
    person_id = dui_data['IDPERSONNE']
    link, created = self.link_set.get_or_create(
        name_id=NameID,
        defaults={'dui': dui_id, 'person_id': person_id})
    if not created:
        if link.dui != dui_id or link.person_id != person_id:
            raise APIError('Data conflict', err_code='conflict')

    xml_exchange = {
        'xml_request': result.xml_request,
        'xml_response': result.xml_response,
    }
    return {
        'link': link.pk,
        'created': created,
        'dui': link.dui,
        'data': xml_exchange,
    }
def get_link(self, name_id):
    """Return the Link of ``name_id`` on this resource.

    Raises a 'not-found' APIError when there is none.
    """
    links = self.link_set
    try:
        return links.get(name_id=name_id)
    except Link.DoesNotExist:
        raise APIError('Person not found', err_code='not-found')
@endpoint(
    description=_('Delete link between user and Toulouse Axel'),
    methods=['post'],
    perm='can_access',
    parameters={
        'NameID': {'description': _('Publik ID')},
    })
def unlink(self, request, NameID):
    """Delete the link of ``NameID`` and report its former id and DUI."""
    link = self.get_link(NameID)
    # build the answer first: the primary key is read before the deletion
    answer = {'link': link.pk, 'deleted': True, 'dui': link.dui}
    link.delete()
    return answer
@endpoint(
    description=_("Get a referential"),
    perm='can_access',
    pattern=r'^(?P<code>[\w-]+)/?$',
    example_pattern='csp',
    parameters={
        'code': {'description': _('Referential code. Possible values: situation_familiale, csp, lien_parente, type_regime')},
    })
def referential(self, request, code):
    """Return the ``<code>_mapping`` referential of ``utils`` as id/text items.

    Raises a 'not-found' APIError when ``code`` is not one of the known
    referentials or when ``utils`` has no mapping for it.
    """
    if code not in ['situation_familiale', 'csp', 'lien_parente', 'type_regime']:
        raise APIError('Referential not found', err_code='not-found')
    # default to None: without it a missing mapping raises AttributeError and
    # the check below can never be reached
    references = getattr(utils, '{}_mapping'.format(code), None)
    if references is None:
        raise APIError('Referential not found', err_code='not-found', http_status=404)
    return {'data': [{'id': key, 'text': val} for key, val in references.items()]}
def get_family_data(self, dui, check_registrations=False):
    """Return the family data of ``dui`` enriched with labels and ids.

    The DUI block of the RefFamilleDui response is completed in place with
    ``*_label`` entries and with 'id' / 'text' entries on children and
    contacts. With ``check_registrations`` each child also gets the
    'clae_cantine_current' and 'clae_cantine_next' entries.

    Raises APIError when the Axel call fails.
    """
    try:
        result = ref_famille_dui(self, {'PORTAIL': {'DUI': {'IDDUI': dui}}})
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)

    family_data = result.json_response['DATA']['PORTAIL']['DUI']
    children = family_data.get('ENFANT', [])

    if check_registrations:
        today = datetime.date.today()
        current_year = utils.get_reference_year_from_date(today)
        registered_current = self.are_children_registered(
            dui=dui, reference_year=current_year)
        registered_next = self.are_children_registered(
            dui=dui, reference_year=current_year + 1)
        for child in children:
            child_id = child['IDPERSONNE']
            child['clae_cantine_current'] = registered_current.get(child_id)
            child['clae_cantine_next'] = registered_next.get(child_id)

    family_data['SITUATIONFAMILIALE_label'] = utils.get_label(
        utils.situation_familiale_mapping, family_data['SITUATIONFAMILIALE'])

    for rl_key in ('RL1', 'RL2'):
        if rl_key in family_data:
            rl = family_data[rl_key]
            rl['CSP_label'] = utils.get_label(utils.csp_mapping, rl['CSP'])

    for child in children:
        child['id'] = child['IDPERSONNE']
        child['text'] = '{} {}'.format(child['PRENOM'], child['NOM']).strip()
        for position, contact in enumerate(child.get('CONTACT', [])):
            # contacts are identified by their position in the list
            contact['id'] = position
            contact['text'] = '{} {}'.format(contact['PRENOM'], contact['NOM']).strip()
            contact['LIENPARENTE_label'] = utils.get_label(
                utils.lien_parente_mapping, contact['LIENPARENTE'])

    if 'REVENUS' in family_data:
        revenus = family_data['REVENUS']
        revenus['TYPEREGIME_label'] = utils.get_label(
            utils.type_regime_mapping, revenus['TYPEREGIME'])

    return family_data
@endpoint(
    description=_("Get information about user's family"),
    perm='can_access',
    parameters={
        'NameID': {'description': _('Publik ID')},
    })
def family_info(self, request, NameID):
    """Return the family data of the DUI linked to ``NameID``."""
    dui = self.get_link(NameID).dui
    return {'data': self.get_family_data(dui, check_registrations=True)}
@endpoint(
    description=_("Get information about children"),
    perm='can_access',
    parameters={
        'NameID': {'description': _('Publik ID')},
    })
def children_info(self, request, NameID):
    """Return the children (possibly none) of the DUI linked to ``NameID``."""
    dui = self.get_link(NameID).dui
    family = self.get_family_data(dui, check_registrations=True)
    children = family.get('ENFANT', [])
    return {'data': children}
@endpoint(
    description=_("Get information about a child"),
    perm='can_access',
    parameters={
        'NameID': {'description': _('Publik ID')},
        'idpersonne': {'description': _('Child ID')},
    })
def child_info(self, request, idpersonne, NameID):
    """Return the child ``idpersonne`` of the family linked to ``NameID``.

    Raises a 'not-found' APIError when the family has no such child.
    """
    dui = self.get_link(NameID).dui
    family = self.get_family_data(dui, check_registrations=True)
    wanted = next(
        (child for child in family.get('ENFANT', []) if child['IDPERSONNE'] == idpersonne),
        None)
    if wanted is None:
        raise APIError('Child not found', err_code='not-found')
    return {'data': wanted}
@endpoint(
    description=_("Get information about a child's contacts"),
    perm='can_access',
    parameters={
        'NameID': {'description': _('Publik ID')},
        'idpersonne': {'description': _('Child ID')},
    })
def child_contacts_info(self, request, idpersonne, NameID):
    """Return the contacts of the child ``idpersonne`` of the linked family.

    Raises a 'not-found' APIError when the family has no such child.
    """
    link = self.get_link(NameID)
    family_data = self.get_family_data(link.dui, check_registrations=True)
    # a family may have no ENFANT block at all: answer 'Child not found' as
    # child_info does, instead of failing with a KeyError
    for child in family_data.get('ENFANT', []):
        if child['IDPERSONNE'] == idpersonne:
            return {'data': child.get('CONTACT', [])}
    raise APIError('Child not found', err_code='not-found')
# Clean the posted update payload in place, according to its 'maj:*' flags.
# For every flag whose encoded value is not 'OUI', the block designated by
# UPDATE_FAMILY_FLAGS is removed from post_data (ADRESSE and HANDICAP are
# special cases, see below). Nothing is returned.
# The function is registered as the 'pre_process' entry of
# UPDATE_FAMILY_SCHEMA right after its definition.
def pre_sanitize_update_family_data(self, post_data):
# before json payload validation, check maj fields and remove empty blocks
# transform ENFANT list to dict, where the key is the field IDPERSONNE
# because children in post_data are maybe not in the same order than on Axel side
children = {}
# position of the child in the posted list (as a string) -> its IDPERSONNE
children_by_index = {}
for i, child in enumerate(post_data.get('ENFANT', [])):
# check if IDPERSONNE is filled
# (children without IDPERSONNE are dropped from the payload)
if child.get('IDPERSONNE'):
children[child['IDPERSONNE']] = child
children_by_index[str(i)] = child['IDPERSONNE']
post_data['ENFANT'] = children
# sanitize post_data
flags = sorted(self.UPDATE_FAMILY_FLAGS.keys())
for flag in flags:
flag_value = post_data.get(flag)
flag_value = utils.encode_bool(flag_value)
# no update for the related block
# (when the flag is 'OUI' the posted block is kept untouched)
if flag_value == 'OUI':
continue
# build the xml elements to cross
key = self.UPDATE_FAMILY_FLAGS[flag]
# special case for ENFANT flags
if key.startswith('ENFANT/'):
# replace the index by IDPERSONNE value
index = key.split('/')[1]
if index not in children_by_index:
# no child with IDPERSONNE found in post_data
continue
key = key.replace('ENFANT/%s' % index, 'ENFANT/%s' % children_by_index[index])
elements = key.split('/')
schema = self.UPDATE_FAMILY_SCHEMA
data = post_data
# find the structure in schema and data containing the element to remove
not_found = False
for element in elements[:-1]:
if schema.get('type') == 'array':
schema = schema['items']
else:
schema = schema['properties'][element]
try:
data = data[element]
except (IndexError, KeyError):
# the parent block is absent from the payload: nothing to remove
not_found = True
break
if not_found:
continue
element_to_remove = elements[-1]
if element_to_remove == 'ADRESSE':
# empty all subelements
# (the block itself is kept, only its values are set to None)
for k in data[element_to_remove].keys():
data[element_to_remove][k] = None
elif element_to_remove == 'HANDICAP':
# will be filled in sanitize_update_family_data
data[element_to_remove]['_to_reset'] = True
elif element_to_remove in data:
# remove block
data.pop(element_to_remove)
# transform ENFANT dict to a list back
post_data['ENFANT'] = list(post_data['ENFANT'].values())
# if ENFANT block is empty, remove it
if not post_data['ENFANT']:
post_data.pop('ENFANT')
# expose the function through the schema; presumably picked up by the
# endpoint machinery before the JSON validation - TODO confirm
UPDATE_FAMILY_SCHEMA['pre_process'] = pre_sanitize_update_family_data
# Turn the posted update payload into the structure sent to Axel, in place:
# - HANDICAP blocks marked '_to_reset' get the handicap values currently
#   known for the child, then every HANDICAP block is merged back into
#   SANITAIRE
# - the ALLERGIE object becomes a list of TYPE/ALLERGIQUE/NOMALLERGIE items
# - the RL1/RL2 and REVENUS fields that are not posted are copied from the
#   current family data
# - the 'maj:*' flags are removed
# The family data is fetched with get_family_data at most once, and only
# when one of the steps needs it. Nothing is returned.
def sanitize_update_family_data(self, dui, post_data):
family_data = None
for i, child_data in enumerate(post_data.get('ENFANT', [])):
child_id = child_data['IDPERSONNE']
# check if HANDICAP fields are to be filled
if 'SANITAIRE' not in child_data:
continue
# the marker is removed from the payload whatever its value
if child_data['SANITAIRE']['HANDICAP'].pop('_to_reset', False) is not True:
continue
# get family info
if family_data is None:
family_data = self.get_family_data(dui)
for orig_child in family_data.get('ENFANT', []):
# find the correct child in family info
if orig_child['IDPERSONNE'] != child_id:
continue
# reset handicap related fields
handicap_fields = [
'AUTREDIFFICULTE',
'ECOLESPECIALISEE',
'INDICATEURAUXILIAIREVS',
'INDICATEURECOLE',
'INDICATEURHANDICAP',
'INDICATEURNOTIFMDPH',
]
for key in handicap_fields:
child_data['SANITAIRE']['HANDICAP'][key] = orig_child['SANITAIRE'][key]
for child in post_data.get('ENFANT', []):
if 'SANITAIRE' not in child:
continue
# transform HANDICAP block
# (its entries are moved one level up, directly into SANITAIRE)
child['SANITAIRE'].update(child['SANITAIRE'].pop('HANDICAP'))
if 'ALLERGIE' not in child['SANITAIRE']:
continue
# transform ALLERGIE block
new_allergie = []
for key in ['ASTHME', 'MEDICAMENTEUSES', 'ALIMENTAIRES']:
new_allergie.append({
'TYPE': key,
'ALLERGIQUE': child['SANITAIRE']['ALLERGIE'][key],
'NOMALLERGIE': None,
})
# an 'AUTRES' item is only added when a text was given
if child['SANITAIRE']['ALLERGIE']['AUTRES']:
new_allergie.append({
'TYPE': 'AUTRES',
'ALLERGIQUE': 'OUI',
'NOMALLERGIE': child['SANITAIRE']['ALLERGIE']['AUTRES'],
})
child['SANITAIRE']['ALLERGIE'] = new_allergie
# retrieve RL not posted fields
for rl in ['RL1', 'RL2']:
if rl not in post_data:
continue
if family_data is None:
family_data = self.get_family_data(dui)
# fill missing fields
for key in ['IDPERSONNE', 'NOM', 'PRENOM', 'NOMJEUNEFILLE', 'DATENAISSANCE', 'CIVILITE']:
post_data[rl][key] = family_data[rl][key]
post_data[rl]['INDICATEURRL'] = '1' if rl == 'RL1' else '2'
# fill NBENFANTSACHARGE
if 'REVENUS' in post_data:
if family_data is None:
family_data = self.get_family_data(dui)
post_data['REVENUS']['NBENFANTSACHARGE'] = family_data.get('REVENUS', {}).get('NBENFANTSACHARGE')
# remove flags
# (every flag is expected to be in post_data: a missing one raises KeyError)
for flag in self.UPDATE_FAMILY_FLAGS.keys():
post_data.pop(flag)
@endpoint(
    description=_("Update information about user's family"),
    perm='can_access',
    parameters={
        'NameID': {'description': _('Publik ID')},
    },
    post={
        'request_body': {
            'schema': {
                'application/json': UPDATE_FAMILY_SCHEMA,
            }
        }
    })
def update_family_info(self, request, NameID, post_data):
    """Send the posted family update to Axel (FormMajFamilleDui).

    The payload is completed with the DUI, the date of the day and the
    QUIACTUALISEDUI marker before being sent. Raises APIError when the
    Axel call fails; the error data then also contains the payload.
    """
    link = self.get_link(NameID)

    # prepare data
    post_data['IDDUI'] = link.dui
    post_data['DATEDEMANDE'] = datetime.date.today().strftime('%Y-%m-%d')
    self.sanitize_update_family_data(dui=link.dui, post_data=post_data)

    # '2' when the linked person is the RL2 of the payload, '1' otherwise
    linked_person_is_rl2 = (
        'RL2' in post_data
        and post_data['RL2'].get('IDPERSONNE') == link.person_id)
    post_data['QUIACTUALISEDUI'] = '2' if linked_person_is_rl2 else '1'

    try:
        result = form_maj_famille_dui(self, {'PORTAIL': {'DUI': post_data}})
    except AxelError as e:
        error_data = {
            'error_post_data': post_data,
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)

    xml_exchange = {
        'xml_request': result.xml_request,
        'xml_response': result.xml_response,
    }
    return {'updated': True, 'dui': link.dui, 'data': xml_exchange}
def get_invoices(self, regie_id, dui=None, name_id=None):
    """Return the normalized invoices to pay of a DUI for ``regie_id``.

    The DUI is either given or looked up from ``name_id`` (which takes
    precedence). Raises APIError when the Axel call fails.
    """
    assert name_id or dui
    if name_id:
        dui = self.get_link(name_id).dui

    try:
        result = ref_facture_a_payer(self, {'PORTAIL': {'DUI': {'IDDUI': dui}}})
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)

    dui_block = result.json_response['DATA']['PORTAIL']['DUI']
    # only the invoices of the requested regie are kept
    return [
        utils.normalize_invoice(facture, dui)
        for facture in dui_block.get('FACTURES', [])
        if facture['IDREGIE'] == regie_id
    ]
def get_historical_invoices(self, name_id):
    """Return the normalized historical invoices of the DUI linked to ``name_id``.

    The invoices are requested from 1970-01-01 and each one is normalized
    with the description of its direction as vendor base. Raises APIError
    when the Axel call fails.
    """
    link = self.get_link(name_id)
    request_data = {'LISTFACTURE': {'NUMDUI': link.dui, 'DEBUT': '1970-01-01'}}
    try:
        result = list_dui_factures(self, request_data)
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)

    listing = result.json_response['DATA']['PORTAIL']['LISTFACTURE']
    invoices = []
    for direction in listing.get('DIRECTION', []):
        for facture in direction.get('FACTURE', []):
            # a fresh dict for every invoice, so none of them share it
            vendor_base = {
                'NUMDIRECTION': direction['NUMDIRECTION'],
                'IDDIRECTION': direction['IDDIRECTION'],
                'LIBDIRECTION': direction['LIBDIRECTION'],
            }
            normalized = utils.normalize_invoice(
                facture,
                link.dui,
                historical=True,
                vendor_base=vendor_base)
            invoices.append(normalized)
    return invoices
def get_invoice(self, regie_id, invoice_id, dui=None, name_id=None, historical=None):
    """Return the invoice whose 'display_id' is ``invoice_id``, or None.

    The invoice is searched among the historical invoices when
    ``historical`` is true, among the invoices to pay otherwise.
    """
    if historical:
        candidates = self.get_historical_invoices(name_id=name_id)
    else:
        candidates = self.get_invoices(regie_id=regie_id, dui=dui, name_id=name_id)
    return next(
        (candidate for candidate in candidates if candidate['display_id'] == invoice_id),
        None)
@endpoint(
    name='regie',
    perm='can_access',
    pattern=r'^(?P<regie_id>[\w-]+)/invoices/?$',
    example_pattern='{regie_id}/invoices',
    description=_("Get invoices to pay"),
    parameters={
        'NameID': {'description': _('Publik ID')},
        'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'}
    })
def invoices(self, request, regie_id, NameID):
    """Return the invoices to pay of ``NameID`` for ``regie_id``."""
    return {'data': self.get_invoices(regie_id=regie_id, name_id=NameID)}
@endpoint(
    name='regie',
    perm='can_access',
    pattern=r'^(?P<regie_id>[\w-]+)/invoices/history/?$',
    example_pattern='{regie_id}/invoices/history',
    description=_("Get invoices already paid"),
    parameters={
        'NameID': {'description': _('Publik ID')},
        'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'}
    })
def invoices_history(self, request, regie_id, NameID):
    """Return the historical invoices of ``NameID``.

    ``regie_id`` is part of the URL but is not used to filter the result.
    """
    return {'data': self.get_historical_invoices(name_id=NameID)}
@endpoint(
    name='regie',
    perm='can_access',
    pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>(historical-)?\w+-\d+)/?$',
    example_pattern='{regie_id}/invoice/{invoice_id}',
    description=_('Get invoice details'),
    parameters={
        'NameID': {'description': _('Publik ID')},
        'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
        'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'}
    })
def invoice(self, request, regie_id, invoice_id, NameID):
    """Return the details of one invoice of ``NameID``.

    The real invoice number is the part after the last '-' of
    ``invoice_id``; a 'historical-' prefix selects the historical
    invoices. Raises a 'not-found' APIError when no invoice matches.
    """
    is_historical = invoice_id.startswith('historical-')
    real_invoice_id = invoice_id.rsplit('-', 1)[-1]
    found = self.get_invoice(
        regie_id=regie_id,
        name_id=NameID,
        invoice_id=real_invoice_id,
        historical=is_historical)
    if found is None:
        raise APIError('Invoice not found', err_code='not-found')
    return {'data': found}
@endpoint(
    name='regie',
    perm='can_access',
    pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>(historical-)?\w+-\d+)/pdf/?$',
    example_pattern='{regie_id}/invoice/{invoice_id}/pdf',
    description=_('Get invoice as a PDF file'),
    parameters={
        'NameID': {'description': _('Publik ID')},
        'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
        'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'}
    })
def invoice_pdf(self, request, regie_id, invoice_id, NameID):
    """Return the PDF of an invoice of ``NameID`` as an attachment.

    Every failure (unknown invoice, PDF not available, Axel error, empty
    document) is reported as an APIError with HTTP status 404.
    """
    # check that invoice is related to current user
    is_historical = invoice_id.startswith('historical-')
    real_invoice_id = invoice_id.rsplit('-', 1)[-1]
    try:
        invoice = self.get_invoice(
            regie_id=regie_id,
            name_id=NameID,
            invoice_id=real_invoice_id,
            historical=is_historical)
    except APIError as e:
        # errors of the lookup are also answered with a 404
        e.http_status = 404
        raise
    if invoice is None:
        raise APIError('Invoice not found', err_code='not-found', http_status=404)

    # check that PDF is available
    if not invoice['has_pdf']:
        raise APIError('PDF not available', err_code='not-available', http_status=404)

    try:
        result = ref_facture_pdf(
            self,
            {'PORTAIL': {'FACTUREPDF': {'IDFACTURE': int(invoice['display_id'])}}})
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError(
            'Axel error: %s' % e,
            err_code='error',
            http_status=404,
            data=error_data)

    # the document comes base64 encoded in the '@FILE' attribute
    encoded_pdf = result.json_response['DATA']['PORTAIL']['PDF']['@FILE']
    pdf_content = base64.b64decode(encoded_pdf)
    if not pdf_content:
        raise APIError('PDF error', err_code='error', http_status=404)

    response = HttpResponse(content_type='application/pdf')
    response['Content-Disposition'] = 'attachment; filename="%s.pdf"' % invoice_id
    response.write(pdf_content)
    return response
@endpoint(
    name='regie',
    methods=['post'],
    perm='can_access',
    pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>\w+-\d+)/pay/?$',
    example_pattern='{regie_id}/invoice/{invoice_id}/pay',
    description=_('Notify an invoice as paid'),
    parameters={
        'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
        'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'}
    },
    post={
        'request_body': {
            'schema': {
                'application/json': PAYMENT_SCHEMA,
            }
        }
    })
def pay_invoice(self, request, regie_id, invoice_id, **kwargs):
    """Notify Axel (FormPaiementDui) that an invoice has been paid.

    ``invoice_id`` has the form '<dui>-<invoice number>'. The paid amount
    is the amount of the invoice as currently known by Axel. Raises a
    'not-found' APIError for an unknown invoice and an 'error' APIError
    when the Axel call fails.
    """
    payment = json_loads(request.body)
    dui, invoice_id = invoice_id.split('-')

    invoice = self.get_invoice(regie_id=regie_id, dui=dui, invoice_id=invoice_id)
    if invoice is None:
        raise APIError('Invoice not found', err_code='not-found')

    amount = invoice['amount']
    reference = payment['transaction_id']
    payment_date = utils.encode_datetime(payment['transaction_date'])
    post_data = {
        'IDFACTURE': int(invoice_id),
        'IDREGIEENCAISSEMENT': '',
        'MONTANTPAYE': amount,
        'DATEPAIEMENT': payment_date,
        'REFERENCE': reference,
    }
    try:
        form_paiement_dui(self, {'PORTAIL': {'DUI': post_data}})
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)
    return {'data': True}
def get_children_activities(self, dui, reference_year):
    """Return the activities of the children of ``dui``, keyed by IDPERSONNE.

    The EnfantsActivites operation is called for ``reference_year`` and
    the activity types MAT, MIDI, SOIR and GARD. Raises APIError when the
    Axel call fails.
    """
    request_data = {
        'DUI': {
            'IDDUI': dui,
            'ANNEEREFERENCE': str(reference_year),
            'TYPESACTIVITES': 'MAT,MIDI,SOIR,GARD',
        }
    }
    try:
        result = enfants_activites(self, request_data)
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)

    dui_block = result.json_response['DATA']['PORTAIL']['DUI']
    by_child_id = {}
    for child in dui_block.get('ENFANT', []):
        by_child_id[child['IDPERSONNE']] = child
    return by_child_id
def get_child_activities(self, dui, reference_year, child_id):
    """Return the activities of one child, or raise a 'not-found' APIError."""
    activities_by_child = self.get_children_activities(
        dui=dui, reference_year=reference_year)
    if child_id in activities_by_child:
        return activities_by_child[child_id]
    raise APIError('Child not found', err_code='not-found')
def are_children_registered(self, dui, reference_year):
    """Tell, for each child of ``dui``, whether it has at least one activity.

    Returns a dict IDPERSONNE -> bool. An empty dict is returned when the
    check is not done: year other than the current or the next reference
    year, next year asked outside of June and July, or Axel failure.
    """
    today = datetime.date.today()
    current_year = utils.get_reference_year_from_date(today)
    next_year = current_year + 1

    # don't check registration for other years than current and next
    if reference_year != current_year and reference_year != next_year:
        return {}
    # check registration for next year only in june or july
    if reference_year == next_year and today.month not in [6, 7]:
        return {}

    try:
        activities_by_child = self.get_children_activities(
            dui=dui, reference_year=reference_year)
    except APIError:
        # don't fail on the check
        return {}

    registered = {}
    for child_id, child in activities_by_child.items():
        registered[child_id] = bool(child.get('ACTIVITE', []))
    return registered
@endpoint(
    description=_("Get information about CLAE booking"),
    perm='can_access',
    parameters={
        'NameID': {'description': _('Publik ID')},
        'idpersonne': {'description': _('Child ID')},
        'booking_date': {'description': _('Booking date')},
    })
def clae_booking_info(self, request, NameID, idpersonne, booking_date):
    """Return the activities of a child with the booking of one week.

    The week is the one given by ``utils.get_week_dates_from_date`` for
    ``booking_date``. Each activity of the child gets a 'booking' entry
    (an empty dict when Axel returns no booking for it).

    Raises a 400 APIError for a badly formatted date, a 'not-found'
    APIError for an unknown child and an 'error' APIError when an Axel
    call fails.
    """
    link = self.get_link(NameID)
    try:
        booking_date = datetime.datetime.strptime(booking_date, utils.json_date_format)
    except ValueError:
        raise APIError('bad date format', err_code='bad-request', http_status=400)

    reference_year = utils.get_reference_year_from_date(booking_date)

    # first get activities information for the child
    child_activities = self.get_child_activities(
        dui=link.dui,
        reference_year=reference_year,
        child_id=idpersonne)

    # then get booking of the requested week for the child
    activity_ids = [act['IDACTIVITE'] for act in child_activities.get('ACTIVITE', [])]
    start_date, end_date = utils.get_week_dates_from_date(booking_date)
    activity_data = [
        {
            'IDACTIVITE': activity_id,
            'ANNEEREFERENCE': str(reference_year),
            'DATEDEBUT': start_date.strftime(utils.xml_date_format),
            'DATEDFIN': end_date.strftime(utils.xml_date_format),
        }
        for activity_id in activity_ids
    ]
    request_data = {'PORTAIL': {
        'DUI': {
            'IDDUI': link.dui,
            'ENFANT': {
                'IDPERSONNE': idpersonne,
                'ACTIVITE': activity_data,
            }
        }
    }}
    try:
        data = reservation_periode(self, request_data)
    except AxelError as e:
        error_data = {
            'xml_request': e.xml_request,
            'xml_response': e.xml_response,
        }
        raise APIError('Axel error: %s' % e, err_code='error', data=error_data)

    booked_children = data.json_response['DATA']['PORTAIL']['DUI'].get('ENFANT', [])
    child_booking = next(
        (child for child in booked_children if child['IDPERSONNE'] == idpersonne),
        None)
    if child_booking is None:
        # should not happen
        raise APIError('Child not found', err_code='not-found')

    # build the response payload: add booking to activities information
    # (JOUR is read by position, one item per day from monday to friday)
    weekdays = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday')
    booking_days = {}
    for booking in child_booking.get('ACTIVITE', []):
        raw_days = booking['JOUR']
        days = {}
        for position, weekday in enumerate(weekdays):
            days[weekday] = utils.get_booking(booking['JOUR'][position])
        booking_days[booking['IDACTIVITE']] = {
            'raw_value': raw_days,
            'days': days,
        }

    for activity in child_activities.get('ACTIVITE', []):
        activity['booking'] = booking_days.get(activity['IDACTIVITE'], {})

    return {'data': child_activities}
# Association between a user (name_id) and an Axel family file (dui) on a
# given connector; person_id is the Axel identifier of the linked person.
class Link(models.Model):
resource = models.ForeignKey(ToulouseAxel, on_delete=models.CASCADE)
name_id = models.CharField(blank=False, max_length=256)
dui = models.CharField(blank=False, max_length=128)
person_id = models.CharField(blank=False, max_length=128)
class Meta:
# at most one link per user and per connector
unique_together = ('resource', 'name_id')
# Named lock managed by the lock / unlock / locked endpoints of a connector.
class Lock(models.Model):
resource = models.ForeignKey(ToulouseAxel, on_delete=models.CASCADE)
key = models.CharField(max_length=256)
# set once, when the lock is created
lock_date = models.DateTimeField(auto_now_add=True)
# identifier of the locker, may be empty
locker = models.CharField(max_length=256, blank=True)
class Meta:
# at most one lock per key and per connector
unique_together = ('resource', 'key')