passerelle/passerelle/contrib/toulouse_axel/models.py

1669 lines
66 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import json
import logging
import os
from django.core.cache import cache
from django.db import models
from django.http import HttpResponse
from django.utils import dateformat
from django.utils.dates import WEEKDAYS as WEEKDAYS_LABELS
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.contrib.utils import axel
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from . import schemas, utils
logger = logging.getLogger('passerelle.contrib.toulouse_axel')
BASE_XSD_PATH = os.path.join(os.path.dirname(__file__), 'xsd')
WEEKDAYS = {
0: 'monday',
1: 'tuesday',
2: 'wednesday',
3: 'thursday',
4: 'friday',
5: 'saturday',
6: 'sunday',
}
class ToulouseAxel(BaseResource):
wsdl_url = models.CharField(
max_length=128, blank=False, verbose_name=_('WSDL URL'), help_text=_('Toulouse Axel WSDL URL')
)
category = _('Business Process Connectors')
_category_ordering = ['DUI', 'CAN-CLA', _('Invoices')]
class Meta:
verbose_name = _('Toulouse Axel')
def check_status(self):
response = self.requests.get(self.wsdl_url)
response.raise_for_status()
@endpoint(
display_order=1,
description=_('Lock a resource'),
perm='can_access',
parameters={
'key': {'description': _('Key of the resource to lock')},
'locker': {'description': _('Identifier of the locker (can be empty)')},
},
)
def lock(self, request, key, locker):
if not key:
raise APIError('key is empty', err_code='bad-request', http_status=400)
lock, dummy = Lock.objects.get_or_create(resource=self, key=key, defaults={'locker': locker})
return {'key': key, 'locked': True, 'locker': lock.locker, 'lock_date': lock.lock_date}
@endpoint(
display_order=2,
description=_('Unlock a resource'),
perm='can_access',
parameters={
'key': {'description': _('Key of the resource to unlock')},
},
)
def unlock(self, request, key):
try:
lock = Lock.objects.get(resource=self, key=key)
lock.delete()
return {'key': key, 'locked': False, 'locker': lock.locker, 'lock_date': lock.lock_date}
except Lock.DoesNotExist:
return {'key': key, 'locked': False}
@endpoint(
display_order=3,
description=_('Get the lock status of a resource'),
perm='can_access',
parameters={
'key': {'description': _('Key of the resource')},
},
)
def locked(self, request, key):
try:
lock = Lock.objects.get(resource=self, key=key)
return {'key': key, 'locked': True, 'locker': lock.locker, 'lock_date': lock.lock_date}
except Lock.DoesNotExist:
return {'key': key, 'locked': False}
def get_management_dates(self):
cache_key = 'toulouse-axel-%s-management-dates' % self.pk
result = cache.get(cache_key)
if result is not None:
return result
try:
result = schemas.ref_date_gestion_dui(self)
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
management_dates = {}
for key, value in result.json_response['DATA']['PORTAIL']['DUIDATEGESTION'].items():
management_dates[key] = value
management_dates[key.lower().replace('-', '_')] = value
cache.set(cache_key, management_dates, 3600) # 1 hour
return management_dates
@endpoint(display_order=4, description=_("Get dates of the update management"), perm='can_access')
def management_dates(self, request):
return {'data': self.get_management_dates()}
def check_dui(self, post_data):
try:
result = schemas.ref_verif_dui(self, {'PORTAIL': {'DUI': post_data}})
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
dui_data = result.json_response['DATA']['PORTAIL']['DUI']
code = dui_data['CODE']
if code not in [2, 3]:
# 2: RL1; 3: RL2
raise APIError('Wrong DUI status', err_code='dui-code-error-%s' % code)
return result
@endpoint(
display_category='DUI',
display_order=1,
description=_('Create link between user and Toulouse Axel'),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
post={
'request_body': {
'schema': {
'application/json': schemas.LINK_SCHEMA,
}
}
},
)
def link(self, request, NameID, post_data):
if not NameID:
raise APIError('NameID is empty', err_code='bad-request', http_status=400)
post_data['IDPERSONNE'] = ''
try:
result = self.check_dui(post_data)
except APIError as e:
if not hasattr(e, 'err_code') or e.err_code == 'error':
raise
raise APIError('Person not found', err_code='not-found')
dui_data = result.json_response['DATA']['PORTAIL']['DUI']
link, created = self.link_set.get_or_create(
name_id=NameID, defaults={'dui': dui_data['IDDUI'], 'person_id': dui_data['IDPERSONNE']}
)
if not created and (link.dui != dui_data['IDDUI'] or link.person_id != dui_data['IDPERSONNE']):
raise APIError('Data conflict', err_code='conflict')
return {
'link': link.pk,
'created': created,
'dui': link.dui,
'data': {
'xml_request': result.xml_request,
'xml_response': result.xml_response,
},
}
def get_link(self, name_id):
try:
return self.link_set.get(name_id=name_id)
except Link.DoesNotExist:
raise APIError('Person not found', err_code='not-found')
@endpoint(
display_category='DUI',
display_order=2,
description=_('Delete link between user and Toulouse Axel'),
methods=['post'],
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
)
def unlink(self, request, NameID):
link = self.get_link(NameID)
link_id = link.pk
link.delete()
return {'link': link_id, 'deleted': True, 'dui': link.dui}
@endpoint(
display_category='DUI',
display_order=3,
description=_("Check DUI status"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
)
def active_dui(self, request, NameID):
# get link if exists
try:
link = self.get_link(NameID)
except APIError:
raise APIError('Unknown NameID', err_code='unknown')
# get family info
try:
family_data = self.get_family_data(link.dui)
except APIError:
raise APIError('No family info', err_code='no-family-info')
# to get the corresponding RL
rl = None
for key in ['RL1', 'RL2']:
if key not in family_data:
continue
if family_data[key]['IDPERSONNE'] == link.person_id:
rl = family_data[key]
break
if rl is None:
raise APIError('No corresponding RL', err_code='no-rl')
# now check DUI status
post_data = {
'IDDUI': family_data['IDDUI'],
'IDPERSONNE': '',
'PRENOM': rl['PRENOM'],
'NOM': rl['NOM'],
'NAISSANCE': rl['DATENAISSANCE'],
}
self.check_dui(post_data)
return {'data': family_data}
@endpoint(
display_order=5,
description=_("Get a referential"),
perm='can_access',
pattern=r'^(?P<code>[\w-]+)/?$',
example_pattern='{code}',
parameters={
'code': {
'description': _(
'Referential code. Possible values: situation_familiale, csp, lien_parente, type_regime, regime'
),
'example_value': 'csp',
},
},
)
def referential(self, request, code):
if code not in ['situation_familiale', 'csp', 'lien_parente', 'type_regime', 'regime']:
raise APIError('Referential not found', err_code='not-found')
references = getattr(utils, '{}_mapping'.format(code))
if references is None:
raise APIError('Referential not found', err_code='not-found', http_status=404)
return {'data': [{'id': key, 'text': val} for key, val in references.items()]}
def get_family_data(self, dui, check_registrations=False, with_management_dates=False):
try:
result = schemas.ref_famille_dui(self, {'PORTAIL': {'DUI': {'IDDUI': dui}}})
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
if not result.json_response['DATA']['PORTAIL']:
raise APIError('Family not found', err_code='not-found')
family_data = result.json_response['DATA']['PORTAIL']['DUI']
today = datetime.date.today()
current_reference_year = utils.get_reference_year_from_date(today)
next_reference_year = current_reference_year + 1
if check_registrations:
children_registred = self.are_children_registered(dui=dui)
for child in family_data.get('ENFANT', []):
child['clae_cantine_current'] = children_registred.get(child['IDPERSONNE'])
if with_management_dates:
family_data['management_dates'] = self.get_management_dates()
family_data['annee_reference'] = current_reference_year
family_data['annee_reference_short'] = str(current_reference_year)[2:]
family_data['annee_reference_label'] = '{}/{}'.format(current_reference_year, next_reference_year)
family_data['SITUATIONFAMILIALE_label'] = utils.get_label(
utils.situation_familiale_mapping, family_data['SITUATIONFAMILIALE']
)
for key in ['RL1', 'RL2']:
if key not in family_data:
continue
rl = family_data[key]
rl['CSP_label'] = utils.get_label(utils.csp_mapping, rl['CSP'])
for child in family_data.get('ENFANT', []):
child['id'] = child['IDPERSONNE']
child['text'] = '{} {}'.format(child['PRENOM'], child['NOM']).strip()
for i, contact in enumerate(child.get('CONTACT', [])):
contact['id'] = i
contact['text'] = '{} {}'.format(contact['PRENOM'], contact['NOM']).strip()
contact['LIENPARENTE_label'] = utils.get_label(
utils.lien_parente_mapping, contact['LIENPARENTE']
)
if 'REVENUS' in family_data:
family_data['REVENUS']['TYPEREGIME_label'] = utils.get_label(
utils.type_regime_mapping, family_data['REVENUS']['TYPEREGIME']
)
return family_data
@endpoint(
display_category='DUI',
display_order=4,
description=_("Get information about user's family"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
)
def family_info(self, request, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True, with_management_dates=True)
return {'data': family_data}
@endpoint(
display_category='DUI',
display_order=5,
description=_("Get information about children"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
)
def children_info(self, request, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True)
return {'data': family_data.get('ENFANT', [])}
@endpoint(
display_category='DUI',
display_order=6,
description=_("Get information about a child"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
},
)
def child_info(self, request, idpersonne, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True)
for child in family_data.get('ENFANT', []):
if child['IDPERSONNE'] == idpersonne:
return {'data': child}
raise APIError('Child not found', err_code='not-found')
@endpoint(
display_category='DUI',
display_order=7,
description=_("Get information about a child's contacts"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
},
)
def child_contacts_info(self, request, idpersonne, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True)
for child in family_data['ENFANT']:
if child['IDPERSONNE'] == idpersonne:
return {'data': child.get('CONTACT', [])}
raise APIError('Child not found', err_code='not-found')
@endpoint(
display_category='DUI',
display_order=7,
description=_("Get information about children contacts"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
)
def children_contacts_info(self, request, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True)
seen = set()
contacts = []
for child in family_data['ENFANT']:
for contact in child.get('CONTACT', []):
contact_str = '%(NOM)s %(PRENOM)s %(TELFIXE)s %(TELPORTABLE)s' % contact
if contact_str not in seen:
contacts.append(contact)
contact['id'] = len(seen)
seen.add(contact_str)
return {'data': contacts}
def pre_sanitize_update_family_data(self, post_data):
# before json payload validation, check maj fields and remove empty blocks
# transform ENFANT list to dict, where the key is the field IDPERSONNE
# because children in post_data are maybe not in the same order than on Axel side
children = {}
children_by_index = {}
for i, child in enumerate(post_data.get('ENFANT', [])):
# check if IDPERSONNE is filled
if child.get('IDPERSONNE'):
children[child['IDPERSONNE']] = child
children_by_index[str(i)] = child['IDPERSONNE']
post_data['ENFANT'] = children
# sanitize post_data
flags = sorted(schemas.UPDATE_FAMILY_FLAGS.keys())
for flag in flags:
flag_value = post_data.get(flag)
flag_value = axel.encode_bool(flag_value)
# no update for the related block
if flag_value == 'OUI':
continue
# build the xml elements to cross
key = schemas.UPDATE_FAMILY_FLAGS[flag]
# special case for ENFANT flags
if key.startswith('ENFANT/'):
# replace the index by IDPERSONNE value
index = key.split('/')[1]
if index not in children_by_index:
# no child with IDPERSONNE found in post_data
continue
key = key.replace('ENFANT/%s' % index, 'ENFANT/%s' % children_by_index[index])
elements = key.split('/')
schema = schemas.UPDATE_FAMILY_SCHEMA
data = post_data
# find the structure in schema and data containing the element to remove
not_found = False
for element in elements[:-1]:
if schema.get('type') == 'array':
schema = schema['items']
else:
schema = schema['properties'][element]
try:
data = data[element]
except (IndexError, KeyError):
not_found = True
break
if not_found:
continue
element_to_remove = elements[-1]
if element_to_remove == 'ADRESSE':
# empty all subelements
for k in data[element_to_remove].keys():
data[element_to_remove][k] = None
elif element_to_remove in ['HANDICAP', 'ALLERGIE']:
if element_to_remove in data:
# will be filled in sanitize_update_family_data
data[element_to_remove]['_to_reset'] = True
elif element_to_remove in data:
# remove block
data.pop(element_to_remove)
# transform ENFANT dict to a list back
post_data['ENFANT'] = list(post_data['ENFANT'].values())
# remove incomplete CONTACT
for child in post_data['ENFANT']:
if 'CONTACT' not in child:
continue
child['CONTACT'] = [c for c in child['CONTACT'] if c['NOM'] and c['PRENOM']]
# if ENFANT block is empty, remove it
if not post_data['ENFANT']:
post_data.pop('ENFANT')
utils.upperize(post_data)
schemas.UPDATE_FAMILY_SCHEMA['pre_process'] = pre_sanitize_update_family_data
def sanitize_update_family_data(self, dui, post_data):
family_data = None
def fill_handicap_fields(child_data, orig_family_data):
child_id = child_data['IDPERSONNE']
for orig_child in orig_family_data.get('ENFANT', []):
# find the correct child in family info
if orig_child['IDPERSONNE'] != child_id:
continue
# reset handicap related fields
handicap_fields = [
'AUTREDIFFICULTE',
'ECOLESPECIALISEE',
'INDICATEURAUXILIAIREVS',
'INDICATEURECOLE',
'INDICATEURHANDICAP',
'INDICATEURNOTIFMDPH',
]
for key in handicap_fields:
child_data['SANITAIRE'][key] = orig_child['SANITAIRE'][key]
def fill_allergie_fields(child_data, orig_family_data):
child_id = child_data['IDPERSONNE']
for orig_child in orig_family_data.get('ENFANT', []):
# find the correct child in family info
if orig_child['IDPERSONNE'] != child_id:
continue
# reset allergie related fields
child_data['SANITAIRE']['ALLERGIE'] = orig_child['SANITAIRE'].get('ALLERGIE', [])
for child in post_data.get('ENFANT', []):
if 'SANITAIRE' not in child:
continue
# check if HANDICAP fields are to be filled
if child['SANITAIRE']['HANDICAP'].pop('_to_reset', False) is True:
# get family info
if family_data is None:
family_data = self.get_family_data(dui)
# and fill HANDICAP fields
fill_handicap_fields(child, family_data)
child['SANITAIRE'].pop('HANDICAP')
else:
# transform HANDICAP block
child['SANITAIRE'].update(child['SANITAIRE'].pop('HANDICAP'))
if 'ALLERGIE' not in child['SANITAIRE']:
# ALLERGIE block is optional
continue
# check if ALLERGIE fields are to be filled
if child['SANITAIRE']['ALLERGIE'].pop('_to_reset', False) is True:
# get family info
if family_data is None:
family_data = self.get_family_data(dui)
# and fill ALLERGIE fields
fill_allergie_fields(child, family_data)
else:
# transform ALLERGIE block
new_allergie = []
for key in ['ASTHME', 'MEDICAMENTEUSES', 'ALIMENTAIRES']:
if axel.encode_bool(child['SANITAIRE']['ALLERGIE'][key]) == 'OUI':
new_allergie.append(
{
'TYPE': key,
'ALLERGIQUE': 'OUI',
'NOMALLERGIE': None,
}
)
if child['SANITAIRE']['ALLERGIE']['AUTRES']:
new_allergie.append(
{
'TYPE': 'AUTRES',
'ALLERGIQUE': 'OUI',
'NOMALLERGIE': child['SANITAIRE']['ALLERGIE']['AUTRES'],
}
)
child['SANITAIRE']['ALLERGIE'] = new_allergie
if not child['SANITAIRE'].get('ALLERGIE'):
# remove ALLERGIE block if empty
child['SANITAIRE'].pop('ALLERGIE')
# retrieve RL not posted fields
for rl in ['RL1', 'RL2']:
if rl not in post_data:
continue
if family_data is None:
family_data = self.get_family_data(dui)
# fill missing fields
for key in ['IDPERSONNE', 'NOM', 'PRENOM', 'NOMJEUNEFILLE', 'DATENAISSANCE', 'CIVILITE']:
post_data[rl][key] = family_data[rl][key]
post_data[rl]['INDICATEURRL'] = '1' if rl == 'RL1' else '2'
# fill NBENFANTSACHARGE
if 'REVENUS' in post_data:
if family_data is None:
family_data = self.get_family_data(dui)
post_data['REVENUS']['NBENFANTSACHARGE'] = family_data.get('REVENUS', {}).get('NBENFANTSACHARGE')
# remove flags
for flag in schemas.UPDATE_FAMILY_FLAGS:
post_data.pop(flag)
@endpoint(
display_category='DUI',
display_order=8,
description=_("Update information about user's family"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
post={
'request_body': {
'schema': {
'application/json': schemas.UPDATE_FAMILY_SCHEMA,
}
}
},
)
def update_family_info(self, request, NameID, post_data):
link = self.get_link(NameID)
# prepare data
post_data['IDDUI'] = link.dui
post_data['DATEDEMANDE'] = datetime.date.today().strftime(axel.json_date_format)
self.sanitize_update_family_data(dui=link.dui, post_data=post_data)
if 'RL2' in post_data and post_data['RL2'].get('IDPERSONNE') == link.person_id:
post_data['QUIACTUALISEDUI'] = '2'
else:
post_data['QUIACTUALISEDUI'] = '1'
try:
result = schemas.form_maj_famille_dui(self, {'PORTAIL': {'DUI': post_data}})
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={
'error_post_data': post_data,
'xml_request': e.xml_request,
'xml_response': e.xml_response,
},
)
return {
'updated': True,
'dui': link.dui,
'data': {
'xml_request': result.xml_request,
'xml_response': result.xml_response,
},
}
def get_invoices(self, regie_id, dui=None, name_id=None):
assert name_id or dui
if name_id:
dui = self.get_link(name_id).dui
try:
result = schemas.ref_facture_a_payer(self, {'PORTAIL': {'DUI': {'IDDUI': dui}}})
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
data = result.json_response['DATA']['PORTAIL']['DUI']
result = []
for facture in data.get('FACTURES', []):
if facture['IDREGIE'] != regie_id:
continue
result.append(utils.normalize_invoice(facture, dui))
return result
def get_historical_invoices(self, dui=None, name_id=None):
assert name_id or dui
if name_id:
dui = self.get_link(name_id).dui
try:
result = schemas.list_dui_factures(self, {'LISTFACTURE': {'NUMDUI': dui, 'DEBUT': '1970-01-01'}})
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
data = result.json_response['DATA']['PORTAIL']['LISTFACTURE']
result = []
for direction in data.get('DIRECTION', []):
for facture in direction.get('FACTURE', []):
result.append(
utils.normalize_invoice(
facture,
dui,
historical=True,
vendor_base={
'NUMDIRECTION': direction['NUMDIRECTION'],
'IDDIRECTION': direction['IDDIRECTION'],
'LIBDIRECTION': direction['LIBDIRECTION'],
},
)
)
return result
def get_invoice(self, regie_id, invoice_id, dui=None, name_id=None, historical=None):
if historical:
invoices_data = self.get_historical_invoices(dui=dui, name_id=name_id)
else:
invoices_data = self.get_invoices(regie_id=regie_id, dui=dui, name_id=name_id)
for invoice in invoices_data:
if invoice['display_id'] == invoice_id:
return invoice
@endpoint(
display_category=_('Invoices'),
display_order=1,
name='regie',
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoices/?$',
example_pattern='{regie_id}/invoices',
description=_("Get invoices to pay"),
parameters={
'NameID': {'description': _('Publik ID'), 'blank': False},
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
},
)
def invoices(self, request, regie_id, NameID):
invoices_data = self.get_invoices(regie_id=regie_id, name_id=NameID)
return {'data': invoices_data}
@endpoint(
display_category=_('Invoices'),
display_order=2,
name='regie',
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoices/history/?$',
example_pattern='{regie_id}/invoices/history',
description=_("Get invoices already paid"),
parameters={
'NameID': {'description': _('Publik ID'), 'blank': False},
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
},
)
def invoices_history(self, request, regie_id, NameID):
invoices_data = self.get_historical_invoices(name_id=NameID)
return {'data': invoices_data}
@endpoint(
display_category=_('Invoices'),
display_order=3,
name='regie',
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>(historical-)?[\w-]+-\d+)/?$',
example_pattern='{regie_id}/invoice/{invoice_id}',
description=_('Get invoice details'),
parameters={
'NameID': {'description': _('Publik ID')},
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'},
},
)
def invoice(self, request, regie_id, invoice_id, NameID=None):
historical = invoice_id.startswith('historical-')
if historical:
invoice_id = invoice_id[len('historical-') :]
real_invoice_id = invoice_id.split('-')[-1]
dui = invoice_id[: -(len(real_invoice_id) + 1)]
invoice = self.get_invoice(
regie_id=regie_id, dui=dui, invoice_id=real_invoice_id, historical=historical
)
if invoice is None:
raise APIError('Invoice not found', err_code='not-found')
return {'data': invoice}
@endpoint(
display_category=_('Invoices'),
display_order=4,
name='regie',
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>(historical-)?[\w-]+-\d+)/pdf/?$',
example_pattern='{regie_id}/invoice/{invoice_id}/pdf',
description=_('Get invoice as a PDF file'),
parameters={
'NameID': {'description': _('Publik ID')},
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'},
},
)
def invoice_pdf(self, request, regie_id, invoice_id, NameID):
# check that invoice is related to current user
real_invoice_id = invoice_id.split('-')[-1]
historical = invoice_id.startswith('historical-')
try:
invoice = self.get_invoice(
regie_id=regie_id, name_id=NameID, invoice_id=real_invoice_id, historical=historical
)
except APIError as e:
e.http_status = 404
raise
if invoice is None:
raise APIError('Invoice not found', err_code='not-found', http_status=404)
# check that PDF is available
if not invoice['has_pdf']:
raise APIError('PDF not available', err_code='not-available', http_status=404)
try:
result = schemas.ref_facture_pdf(
self, {'PORTAIL': {'FACTUREPDF': {'IDFACTURE': int(invoice['display_id'])}}}
)
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
http_status=404,
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
b64content = base64.b64decode(result.json_response['DATA']['PORTAIL']['PDF']['@FILE'])
if not b64content:
raise APIError('PDF error', err_code='error', http_status=404)
response = HttpResponse(content_type='application/pdf')
response['Content-Disposition'] = 'attachment; filename="%s.pdf"' % invoice_id
response.write(b64content)
return response
@endpoint(
display_category=_('Invoices'),
display_order=5,
name='regie',
methods=['post'],
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>[\w-]+-\d+)/pay/?$',
example_pattern='{regie_id}/invoice/{invoice_id}/pay',
description=_('Notify an invoice as paid'),
parameters={
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'},
},
post={
'request_body': {
'schema': {
'application/json': schemas.PAYMENT_SCHEMA,
}
}
},
)
def pay_invoice(self, request, regie_id, invoice_id, **kwargs):
data = json.loads(request.body)
dui, invoice_id = invoice_id.rsplit('-', 1)
invoice = self.get_invoice(regie_id=regie_id, dui=dui, invoice_id=invoice_id)
if invoice is None:
raise APIError('Invoice not found', err_code='not-found')
transaction_amount = invoice['amount']
transaction_id = data['transaction_id']
transaction_date = axel.parse_datetime(data['transaction_date'])
if transaction_date is None:
raise APIError('invalid transaction_date')
transaction_date = axel.encode_datetime(transaction_date)
post_data = {
'IDFACTURE': int(invoice_id),
'IDREGIEENCAISSEMENT': '',
'MONTANTPAYE': transaction_amount,
'DATEPAIEMENT': transaction_date,
'REFERENCE': transaction_id,
}
try:
schemas.form_paiement_dui(self, {'PORTAIL': {'DUI': post_data}})
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
return {'data': True}
def get_children_activities(self, dui, reference_year):
cache_key = 'toulouse-axel-%s-children-activities-%s-%s' % (self.pk, dui, reference_year)
result = cache.get(cache_key)
if result is not None:
return result
try:
result = schemas.enfants_activites(
self,
{
'DUI': {
'IDDUI': dui,
'ANNEEREFERENCE': str(reference_year),
'TYPESACTIVITES': 'MAT,MIDI,SOIR,GARD',
}
},
)
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
children_activities = result.json_response['DATA']['PORTAIL']['DUI'].get('ENFANT', [])
for child in children_activities:
child['REGIME_label'] = utils.get_label(utils.regime_mapping, child['REGIME'])
data = {}
# format and filter children
for child in children_activities:
# exclude child in private schools
if child['LIBELLEECOLE'].startswith('PRIVEE'):
continue
# exclude also child with more than one registration per activity_type or missing activity
activity_types = [a['TYPEACTIVITE'] for a in child.get('ACTIVITE', [])]
activity_types.sort()
if activity_types not in (['MAT', 'MIDI', 'SOIR'], ['GARD', 'MAT', 'MIDI', 'SOIR']):
# GARD is optional
continue
# ok, store child
data[child['IDPERSONNE']] = child
cache.set(cache_key, data, 30) # 30 seconds
return data
def get_child_activities(self, dui, reference_year, child_id):
children_activities = self.get_children_activities(dui=dui, reference_year=reference_year)
if child_id not in children_activities:
raise APIError('Child not found', err_code='not-found')
return children_activities[child_id]
def are_children_registered(self, dui):
# check reference_year
today = datetime.date.today()
reference_year = utils.get_reference_year_from_date(today)
try:
children_activities = self.get_children_activities(dui=dui, reference_year=reference_year)
except APIError:
# don't fail on the check
return {}
return {child_id: bool(child.get('ACTIVITE', [])) for child_id, child in children_activities.items()}
def get_booking_data(self, dui, child_id, booking_date):
start_date, end_date = utils.get_week_dates_from_date(booking_date)
cache_key = 'toulouse-axel-%s-booking-data-%s-%s-%s' % (
self.pk,
dui,
child_id,
start_date.isoformat(),
)
result = cache.get(cache_key)
if result is not None:
return result
reference_year = utils.get_reference_year_from_date(booking_date)
# first get activities information for the child
child_activities = self.get_child_activities(
dui=dui, reference_year=reference_year, child_id=child_id
)
# then get booking of the requested week for the child
activity_ids = [act['IDACTIVITE'] for act in child_activities.get('ACTIVITE', [])]
activity_data = []
for activity_id in activity_ids:
activity_data.append(
{
'IDACTIVITE': activity_id,
'ANNEEREFERENCE': str(reference_year),
'DATEDEBUT': start_date.strftime(axel.json_date_format),
'DATEDFIN': end_date.strftime(axel.json_date_format),
}
)
try:
data = schemas.reservation_periode(
self,
{
'PORTAIL': {
'DUI': {
'IDDUI': dui,
'ENFANT': {
'IDPERSONNE': child_id,
'ACTIVITE': activity_data,
},
}
}
},
)
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
child_booking = None
for child in data.json_response['DATA']['PORTAIL']['DUI'].get('ENFANT', []):
if child['IDPERSONNE'] == child_id:
child_booking = child
break
if child_booking is None:
# should not happen
raise APIError('Child not found', err_code='not-found')
# build the response payload: add booking to activities information
booking_days = {}
for booking in child_booking.get('ACTIVITE', []):
booking_days[booking['IDACTIVITE']] = {
'raw_value': booking['JOUR'],
'days': {
'monday': utils.get_booking(booking['JOUR'][0]),
'tuesday': utils.get_booking(booking['JOUR'][1]),
'wednesday': utils.get_booking(booking['JOUR'][2]),
'thursday': utils.get_booking(booking['JOUR'][3]),
'friday': utils.get_booking(booking['JOUR'][4]),
},
}
for activity in child_activities.get('ACTIVITE', []):
activity['id'] = activity['IDACTIVITE']
start_date = datetime.datetime.strptime(activity['DATEENTREE'], axel.json_date_format)
end_date = datetime.datetime.strptime(activity['DATESORTIE'], axel.json_date_format)
activity['text'] = '{} (inscription du {} au {})'.format(
activity['LIBELLEACTIVITE'],
start_date.strftime(axel.xml_date_format),
end_date.strftime(axel.xml_date_format),
)
activity['annee_reference'] = reference_year
activity['annee_reference_short'] = str(reference_year)[2:]
activity['annee_reference_label'] = '{}/{}'.format(reference_year, reference_year + 1)
activity['booking'] = booking_days.get(activity['IDACTIVITE'], {})
cache.set(cache_key, child_activities, 30) # 30 seconds
return child_activities
@endpoint(
display_category='CAN-CLA',
display_order=1,
description=_("Get the list of reference years available for bookings"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'pivot_date': {
'description': _('Pivot date (format MM-DD). After this date, next year is available.')
},
},
)
def clae_years(self, request, NameID, pivot_date):
link = self.get_link(NameID)
today = datetime.date.today()
reference_year = utils.get_reference_year_from_date(today)
# get pivot date
try:
pivot_date = datetime.datetime.strptime(
'%s-%s' % (reference_year, pivot_date), axel.json_date_format
).date()
except ValueError:
raise APIError('bad date format, should be MM-DD', err_code='bad-request', http_status=400)
# adjust pivot year
if pivot_date.month <= 7:
# between january and july, reference year is the year just before
pivot_date = pivot_date.replace(year=reference_year + 1)
data = [
{
'id': str(reference_year),
'text': '%s/%s' % (reference_year, reference_year + 1),
'type': 'encours',
'refdate': today.strftime(axel.json_date_format),
}
]
if today < pivot_date:
# date pivot not in the past, return only current year
return {'data': data}
children_activities = self.get_children_activities(dui=link.dui, reference_year=reference_year + 1)
if not children_activities:
# no activities for next year, return only current year
return {'data': data}
try:
next_ref_date = today.replace(year=today.year + 1)
except ValueError:
# 02/29 ?
next_ref_date = today + datetime.timedelta(days=366)
# return also next year
data.append(
{
'id': str(reference_year + 1),
'text': '%s/%s' % (reference_year + 1, reference_year + 2),
'type': 'suivante',
'refdate': next_ref_date.strftime(axel.json_date_format),
}
)
return {'data': data}
@endpoint(
display_category='CAN-CLA',
display_order=2,
description=_("Get information about CLAE activities of all children for the year"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'booking_date': {'description': _('Booking date (to get reference year)')},
},
)
def clae_children_activities_info(self, request, NameID, booking_date):
link = self.get_link(NameID)
try:
booking_date = datetime.datetime.strptime(booking_date, axel.json_date_format)
except ValueError:
raise APIError('bad date format, should be YYYY-MM-DD', err_code='bad-request', http_status=400)
reference_year = utils.get_reference_year_from_date(booking_date)
children_activities = self.get_children_activities(dui=link.dui, reference_year=reference_year)
for child in children_activities.values():
child['id'] = child['IDPERSONNE']
child['text'] = '{} {}'.format(child['PRENOM'], child['NOM']).strip()
return {'data': list(children_activities.values())}
@endpoint(
display_category='CAN-CLA',
display_order=3,
description=_("Get the list of CLAE booked activities of a child, for a period"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
'start_date': {'description': _('Start date of the period')},
'end_date': {'description': _('End date of the period')},
},
)
def clae_booking_activities_info(self, request, NameID, idpersonne, start_date, end_date):
link = self.get_link(NameID)
try:
start_date = datetime.datetime.strptime(start_date, axel.json_date_format).date()
end_date = datetime.datetime.strptime(end_date, axel.json_date_format).date()
except ValueError:
raise APIError('bad date format, should be YYYY-MM-DD', err_code='bad-request', http_status=400)
today = datetime.date.today()
in_8_days = today + datetime.timedelta(days=8)
def get_activities_for_week(week_start_date, week_end_date):
booking_data = self.get_booking_data(
dui=link.dui, child_id=idpersonne, booking_date=week_start_date
).get('ACTIVITE', [])
booking_data = {d['TYPEACTIVITE']: d for d in booking_data}
start_date, end_date = utils.get_week_dates_from_date(week_start_date)
week = 'week:%s:%s' % (
start_date.strftime(axel.json_date_format),
end_date.strftime(axel.json_date_format),
)
day_date = week_start_date
while day_date <= week_end_date:
day = WEEKDAYS[day_date.weekday()]
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if activity_type not in booking_data:
continue
activity = booking_data[activity_type]
booked = activity['booking']['days'][day]
if booked is not None:
yield {
'day': day_date.strftime(axel.json_date_format),
'activity_id': activity['id'],
'activity_type': activity_type,
'activity_label': activity['LIBELLEACTIVITE'],
'booked': booked,
'bookable': day_date >= in_8_days,
'week': week,
}
day_date = day_date + datetime.timedelta(days=1)
# find the first week from start_date
week_start_date, week_end_date = utils.get_week_dates_from_date(start_date)
result = []
# cross all weeks until end date
while week_end_date <= end_date:
result += list(get_activities_for_week(max(start_date, week_start_date), week_end_date))
if week_end_date == end_date:
break
week_start_date = week_start_date + datetime.timedelta(days=7)
week_end_date = min(week_end_date + datetime.timedelta(days=7), end_date)
return {'data': result}
def get_min_and_max_possible_days(self, dui, reference_year, child_id):
child_activities = self.get_child_activities(
dui=dui, reference_year=reference_year, child_id=child_id
)
if not child_activities.get('ACTIVITE', []):
return None, None
entree_dates = [act['DATEENTREE'] for act in child_activities.get('ACTIVITE', [])]
sortie_dates = [act['DATESORTIE'] for act in child_activities.get('ACTIVITE', [])]
return (
datetime.datetime.strptime(max(entree_dates), axel.json_date_format).date(),
datetime.datetime.strptime(min(sortie_dates), axel.json_date_format).date(),
)
@endpoint(
display_category='CAN-CLA',
display_order=4,
description=_("Get possible days to book an activity of a child, for a period"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
'activity_type': {'description': _('Activity type (MAT, MIDI, SOIR, GARD)')},
'start_date': {'description': _('Start date of the period')},
'end_date': {'description': _('End date of the period')},
},
)
def clae_booking_activity_possible_days(
self, request, NameID, idpersonne, activity_type, start_date, end_date
):
link = self.get_link(NameID)
try:
start_date = datetime.datetime.strptime(start_date, axel.json_date_format).date()
end_date = datetime.datetime.strptime(end_date, axel.json_date_format).date()
except ValueError:
raise APIError('bad date format, should be YYYY-MM-DD', err_code='bad-request', http_status=400)
if activity_type not in ['MAT', 'MIDI', 'SOIR', 'GARD']:
raise APIError(
'bad activity_type, should be MAT, MIDI, SOIR or GARD',
err_code='bad-request',
http_status=400,
)
today = datetime.date.today()
# be sure that start_date is after today + 8 days
start_date = max(start_date, today + datetime.timedelta(days=8))
# besure that start_date is after greatest DATEENTREE,
# and end_date is before smallest DATESORTIE
reference_year = utils.get_reference_year_from_date(start_date)
possible_days_min, possible_days_max = self.get_min_and_max_possible_days(
dui=link.dui, reference_year=reference_year, child_id=idpersonne
)
if possible_days_min and possible_days_max:
start_date = max(start_date, possible_days_min)
end_date = min(end_date, possible_days_max)
# if start_date is a saturday or a sunday, jump to the next monday
if start_date.weekday() > 4:
start_date = start_date + datetime.timedelta(days=7 - start_date.weekday())
def get_activity_days_for_week(week_start_date, week_end_date):
# ask Axel for the booking of a week (starts may be a monday, ends a friday)
activities = self.get_booking_data(
dui=link.dui, child_id=idpersonne, booking_date=week_start_date
).get('ACTIVITE', [])
activity = None
for act in activities:
if act['TYPEACTIVITE'] == activity_type:
activity = act
break
if activity is None:
return
day_date = week_start_date
while day_date <= week_end_date:
day = WEEKDAYS[day_date.weekday()]
activity_day = {
'id': '{}:{}:{}:{}'.format(
idpersonne, activity_type, activity['id'], day_date.strftime(axel.json_date_format)
),
'text': dateformat.format(day_date, 'l j F Y'),
'disabled': activity['booking']['days'][day] is None,
'prefill': activity['booking']['days'][day],
'details': activity,
}
day_date = day_date + datetime.timedelta(days=1)
yield activity_day
# find the first week from start_date
week_start_date, week_end_date = utils.get_week_dates_from_date(start_date)
activity_days = []
# cross all weeks until end date
while week_end_date <= end_date:
activity_days += list(get_activity_days_for_week(max(start_date, week_start_date), week_end_date))
if week_end_date == end_date:
break
week_start_date = week_start_date + datetime.timedelta(days=7)
week_end_date = min(week_end_date + datetime.timedelta(days=7), end_date)
return {'data': activity_days}
@endpoint(
display_category='CAN-CLA',
display_order=5,
description=_("Get annual possible days to book an activity of a child"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
'activity_type': {'description': _('Activity type (MAT, MIDI, SOIR, GARD)')},
'booking_date': {'description': _('Booking date (to get reference year)')},
},
)
def clae_booking_activity_annual_possible_days(
self, request, NameID, idpersonne, activity_type, booking_date
):
link = self.get_link(NameID)
try:
booking_date = datetime.datetime.strptime(booking_date, axel.json_date_format).date()
except ValueError:
raise APIError('bad date format, should be YYYY-MM-DD', err_code='bad-request', http_status=400)
if activity_type not in ['MAT', 'MIDI', 'SOIR', 'GARD']:
raise APIError(
'bad activity_type, should be MAT, MIDI, SOIR or GARD',
err_code='bad-request',
http_status=400,
)
reference_year = utils.get_reference_year_from_date(booking_date)
activities = self.get_child_activities(
dui=link.dui, reference_year=reference_year, child_id=idpersonne
).get('ACTIVITE', [])
activity = None
for act in activities:
if act['TYPEACTIVITE'] == activity_type:
activity = act
break
if activity is None:
return {'data': []}
activity_days = []
for i, day in enumerate(['monday', 'tuesday', 'wednesday', 'thursday', 'friday']):
disabled = False
if activity_type == 'GARD' and day != 'wednesday':
disabled = True
elif activity_type == 'SOIR' and day == 'wednesday':
disabled = True
activity_days.append(
{
'id': '{}:{}:{}:{}'.format(idpersonne, activity_type, activity['IDACTIVITE'], day),
'text': WEEKDAYS_LABELS[i],
'disabled': disabled,
}
)
return {'data': activity_days}
@endpoint(
display_category='CAN-CLA',
display_order=6,
description=_("Get booked days for an activity of a child, for a period"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
'activity_type': {'description': _('Activity type (MAT, MIDI, SOIR, GARD)')},
'start_date': {'description': _('Start date of the period')},
'end_date': {'description': _('End date of the period')},
},
)
def clae_booking_activity_prefill(self, request, NameID, idpersonne, activity_type, start_date, end_date):
possible_days = self.clae_booking_activity_possible_days(
request, NameID, idpersonne, activity_type, start_date, end_date
)
return {'data': [d['id'] for d in possible_days['data'] if d['prefill'] is True]}
@endpoint(
display_category='CAN-CLA',
display_order=7,
description=_("CLAE/Cantine booking"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
post={
'request_body': {
'schema': {
'application/json': schemas.BOOKING_SCHEMA,
}
}
},
)
def clae_booking(self, request, NameID, post_data):
link = self.get_link(NameID)
# check dates
today = datetime.date.today()
start_date_min = today + datetime.timedelta(days=8)
start_date = datetime.datetime.strptime(post_data['booking_start_date'], axel.json_date_format).date()
reference_year = utils.get_reference_year_from_date(start_date)
end_date_max = datetime.date(reference_year + 1, 7, 31)
end_date = datetime.datetime.strptime(post_data['booking_end_date'], axel.json_date_format).date()
if start_date > end_date:
raise APIError(
'booking_start_date should be before booking_end_date',
err_code='bad-request',
http_status=400,
)
if start_date < start_date_min:
raise APIError(
'booking_start_date min value: %s' % start_date_min, err_code='bad-request', http_status=400
)
if end_date > end_date_max:
raise APIError(
'booking_end_date max value: %s' % end_date_max, err_code='bad-request', http_status=400
)
# get known activities for this child, to have the ids
child_activities_info = self.get_child_activities(
dui=link.dui, reference_year=reference_year, child_id=post_data['child_id']
)
child_known_activities_by_type = {
a['TYPEACTIVITE']: a for a in child_activities_info.get('ACTIVITE', [])
}
# build activity list to post
activities_by_type = {}
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if post_data['booking_list_%s' % activity_type] is None:
# exclude if None (not updated)
continue
if activity_type not in child_known_activities_by_type:
# exclude activity types not registered for the child
continue
activities_by_type[activity_type] = {
'IDACTIVITE': child_known_activities_by_type[activity_type]['IDACTIVITE'],
'ANNEEREFERENCE': str(reference_year),
'PERIODE': [],
}
def get_week_pattern(week_start_date, week_end_date, activity_type, activity_id):
week_pattern = '00000'
day_date = week_start_date
# cross days of the week to find bookings
while day_date <= week_end_date:
key = '{}:{}:{}:{}'.format(
post_data['child_id'],
activity_type,
activity_id,
day_date.strftime(axel.json_date_format),
)
if key in post_data['booking_list_%s' % activity_type]:
week_pattern = (
week_pattern[: day_date.weekday()] + '1' + week_pattern[day_date.weekday() + 1 :]
)
day_date = day_date + datetime.timedelta(days=1)
return week_pattern
# find the first week from start_date
week_start_date, week_end_date = utils.get_week_dates_from_date(start_date)
# cross all weeks until the last week of the posted period
booking_dates = set()
while week_start_date <= end_date:
for activity_type, activity in activities_by_type.items():
real_start_date = max(start_date, week_start_date)
real_end_date = min(end_date, week_end_date)
booking_dates.add(real_start_date)
activity_id = activity['IDACTIVITE']
week_pattern = get_week_pattern(real_start_date, real_end_date, activity_type, activity_id)
activity['PERIODE'].append(
{
'DATEDEBUT': real_start_date.strftime(axel.json_date_format),
'DATEDFIN': real_end_date.strftime(axel.json_date_format),
'SEMAINETYPE': week_pattern,
}
)
week_start_date = week_start_date + datetime.timedelta(days=7)
week_end_date = week_end_date + datetime.timedelta(days=7)
# build data
data = {
'IDDUI': link.dui,
'DATEDEMANDE': today.strftime(axel.json_date_format),
'ENFANT': [
{
'IDPERSONNE': post_data['child_id'],
'REGIME': post_data.get('regime') or child_activities_info['REGIME'],
}
],
}
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if activity_type in activities_by_type:
if 'ACTIVITE' not in data['ENFANT'][0]:
data['ENFANT'][0]['ACTIVITE'] = []
data['ENFANT'][0]['ACTIVITE'].append(activities_by_type[activity_type])
try:
result = schemas.reservation_annuelle(self, {'PORTAIL': {'DUI': data}})
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
# invalidate caches
# invalidate get_children_activities cache
cache_key = 'toulouse-axel-%s-children-activities-%s-%s' % (self.pk, link.dui, reference_year)
cache.delete(cache_key)
for booking_date in sorted(booking_dates):
# invalidate get_booking_data cache for each week crossed
start_date, end_date = utils.get_week_dates_from_date(booking_date)
cache_key = 'toulouse-axel-%s-booking-data-%s-%s-%s' % (
self.pk,
link.dui,
post_data['child_id'],
start_date.isoformat(),
)
cache.delete(cache_key)
return {
'updated': True,
'data': {
'xml_request': result.xml_request,
'xml_response': result.xml_response,
},
}
@endpoint(
display_category='CAN-CLA',
display_order=8,
description=_("CLAE/Cantine annual booking"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
post={
'request_body': {
'schema': {
'application/json': schemas.ANNUAL_BOOKING_SCHEMA,
}
}
},
)
def clae_booking_annual(self, request, NameID, post_data):
link = self.get_link(NameID)
# build dates of the period
today = datetime.date.today()
start_date_min = today + datetime.timedelta(days=8)
start_date = datetime.datetime.strptime(post_data['booking_date'], axel.json_date_format).date()
start_date = max(start_date, start_date_min)
reference_year = utils.get_reference_year_from_date(start_date)
end_date = datetime.date(reference_year + 1, 7, 31)
# get known activities for this child, to have the ids
child_activities_info = self.get_child_activities(link.dui, reference_year, post_data['child_id'])
child_known_activities_by_type = {
a['TYPEACTIVITE']: a for a in child_activities_info.get('ACTIVITE', [])
}
# build activity list to post
activities_by_type = {}
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if post_data['booking_list_%s' % activity_type] is None:
# exclude if None (not updated)
continue
if activity_type not in child_known_activities_by_type:
# exclude activity types not registered for the child
continue
week_pattern = ''
activity_id = child_known_activities_by_type[activity_type]['IDACTIVITE']
# cross days of the week to find bookings
for i, day in enumerate(['monday', 'tuesday', 'wednesday', 'thursday', 'friday']):
key = '{}:{}:{}:{}'.format(post_data['child_id'], activity_type, activity_id, day)
week_pattern += key in post_data['booking_list_%s' % activity_type] and '1' or '0'
activities_by_type[activity_type] = {
'IDACTIVITE': activity_id,
'ANNEEREFERENCE': str(reference_year),
'PERIODE': [
{
'DATEDEBUT': start_date.strftime(axel.json_date_format),
'DATEDFIN': end_date.strftime(axel.json_date_format),
'SEMAINETYPE': week_pattern,
}
],
}
# build data
data = {
'IDDUI': link.dui,
'DATEDEMANDE': today.strftime(axel.json_date_format),
'ENFANT': [
{
'IDPERSONNE': post_data['child_id'],
'REGIME': post_data.get('regime') or child_activities_info['REGIME'],
}
],
}
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if activity_type in activities_by_type:
if 'ACTIVITE' not in data['ENFANT'][0]:
data['ENFANT'][0]['ACTIVITE'] = []
data['ENFANT'][0]['ACTIVITE'].append(activities_by_type[activity_type])
try:
result = schemas.reservation_annuelle(self, {'PORTAIL': {'DUI': data}})
except axel.AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request, 'xml_response': e.xml_response},
)
# invalidate cache
# invalidate get_children_activities cache
cache_key = 'toulouse-axel-%s-children-activities-%s-%s' % (self.pk, link.dui, reference_year)
cache.delete(cache_key)
booking_date = utils.get_week_dates_from_date(start_date)[0]
while booking_date <= end_date:
# invalidate get_booking_data cache for each monday from now to the end of the reference year
cache_key = 'toulouse-axel-%s-booking-data-%s-%s-%s' % (
self.pk,
link.dui,
post_data['child_id'],
booking_date.isoformat(),
)
cache.delete(cache_key)
booking_date += datetime.timedelta(days=7)
return {
'updated': True,
'data': {
'xml_request': result.xml_request,
'xml_response': result.xml_response,
},
}
class Link(models.Model):
resource = models.ForeignKey(ToulouseAxel, on_delete=models.CASCADE)
name_id = models.CharField(blank=False, max_length=256)
dui = models.CharField(blank=False, max_length=128)
person_id = models.CharField(blank=False, max_length=128)
class Meta:
unique_together = ('resource', 'name_id')
class Lock(models.Model):
resource = models.ForeignKey(ToulouseAxel, on_delete=models.CASCADE)
key = models.CharField(max_length=256)
lock_date = models.DateTimeField(auto_now_add=True)
locker = models.CharField(max_length=256, blank=True)
class Meta:
unique_together = ('resource', 'key')