passerelle/passerelle/contrib/toulouse_axel/models.py

1502 lines
62 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import logging
import os
from django.core.cache import cache
from django.db import models
from django.http import HttpResponse
from django.utils import dateformat
from django.utils.dates import WEEKDAYS_REV
from django.utils.dates import WEEKDAYS as WEEKDAYS_LABELS
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.compat import json_loads
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from . import schemas
from . import utils
from .exceptions import AxelError
logger = logging.getLogger('passerelle.contrib.toulouse_axel')
BASE_XSD_PATH = os.path.join(os.path.dirname(__file__), 'xsd')
WEEKDAYS = {v: k for k, v in WEEKDAYS_REV.items()}
class ToulouseAxel(BaseResource):
wsdl_url = models.CharField(
max_length=128,
blank=False,
verbose_name=_('WSDL URL'),
help_text=_('Toulouse Axel WSDL URL'))
category = _('Business Process Connectors')
_category_ordering = ['DUI', 'CAN-CLA', _('Invoices')]
class Meta:
verbose_name = _('Toulouse Axel')
def check_status(self):
response = self.requests.get(self.wsdl_url)
response.raise_for_status()
@endpoint(
display_order=1,
description=_('Lock a resource'),
perm='can_access',
parameters={
'key': {'description': _('Key of the resource to lock')},
'locker': {'description': _('Identifier of the locker (can be empty)')}
})
def lock(self, request, key, locker):
if not key:
raise APIError('key is empty', err_code='bad-request', http_status=400)
lock, created = Lock.objects.get_or_create(resource=self, key=key, defaults={'locker': locker})
return {'key': key, 'locked': True, 'locker': lock.locker, 'lock_date': lock.lock_date}
@endpoint(
display_order=2,
description=_('Unlock a resource'),
perm='can_access',
parameters={
'key': {'description': _('Key of the resource to unlock')},
})
def unlock(self, request, key):
try:
lock = Lock.objects.get(resource=self, key=key)
lock.delete()
return {'key': key, 'locked': False, 'locker': lock.locker, 'lock_date': lock.lock_date}
except Lock.DoesNotExist:
return {'key': key, 'locked': False}
@endpoint(
display_order=3,
description=_('Get the lock status of a resource'),
perm='can_access',
parameters={
'key': {'description': _('Key of the resource')},
})
def locked(self, request, key):
try:
lock = Lock.objects.get(resource=self, key=key)
return {'key': key, 'locked': True, 'locker': lock.locker, 'lock_date': lock.lock_date}
except Lock.DoesNotExist:
return {'key': key, 'locked': False}
def get_management_dates(self):
cache_key = 'toulouse-axel-%s-management-dates' % self.pk
result = cache.get(cache_key)
if result is not None:
return result
try:
result = schemas.ref_date_gestion_dui(self)
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
management_dates = {}
for key, value in result.json_response['DATA']['PORTAIL']['DUIDATEGESTION'].items():
management_dates[key] = value
management_dates[key.lower().replace('-', '_')] = value
cache.set(cache_key, management_dates, 3600) # 1 hour
return management_dates
@endpoint(
display_order=4,
description=_("Get dates of the update management"),
perm='can_access')
def management_dates(self, request):
return {'data': self.get_management_dates()}
def check_dui(self, post_data):
try:
result = schemas.ref_verif_dui(self, {'PORTAIL': {'DUI': post_data}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
dui_data = result.json_response['DATA']['PORTAIL']['DUI']
code = dui_data['CODE']
if code not in [2, 3]:
# 2: RL1; 3: RL2
raise APIError('Wrong DUI status', err_code='dui-code-error-%s' % code)
return result
@endpoint(
display_category='DUI',
display_order=1,
description=_('Create link between user and Toulouse Axel'),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
post={
'request_body': {
'schema': {
'application/json': schemas.LINK_SCHEMA,
}
}
})
def link(self, request, NameID, post_data):
if not NameID:
raise APIError('NameID is empty', err_code='bad-request', http_status=400)
post_data['IDPERSONNE'] = ''
try:
result = self.check_dui(post_data)
except APIError as e:
if e.err_code == 'error':
raise
raise APIError('Person not found', err_code='not-found')
dui_data = result.json_response['DATA']['PORTAIL']['DUI']
link, created = self.link_set.get_or_create(
name_id=NameID,
defaults={
'dui': dui_data['IDDUI'],
'person_id': dui_data['IDPERSONNE']})
if not created and (link.dui != dui_data['IDDUI'] or link.person_id != dui_data['IDPERSONNE']):
raise APIError('Data conflict', err_code='conflict')
return {
'link': link.pk,
'created': created,
'dui': link.dui,
'data': {
'xml_request': result.xml_request,
'xml_response': result.xml_response,
}
}
def get_link(self, name_id):
try:
return self.link_set.get(name_id=name_id)
except Link.DoesNotExist:
raise APIError('Person not found', err_code='not-found')
@endpoint(
display_category='DUI',
display_order=2,
description=_('Delete link between user and Toulouse Axel'),
methods=['post'],
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
})
def unlink(self, request, NameID):
link = self.get_link(NameID)
link_id = link.pk
link.delete()
return {'link': link_id, 'deleted': True, 'dui': link.dui}
@endpoint(
display_category='DUI',
display_order=3,
description=_("Check DUI status"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
})
def active_dui(self, request, NameID):
# get link if exists
try:
link = self.get_link(NameID)
except APIError:
raise APIError('Unknown NameID', err_code='unknown')
# get family info
try:
family_data = self.get_family_data(link.dui)
except APIError:
raise APIError('No family info', err_code='no-family-info')
# to get the corresponding RL
rl = None
for key in ['RL1', 'RL2']:
if key not in family_data:
continue
if family_data[key]['IDPERSONNE'] == link.person_id:
rl = family_data[key]
break
if rl is None:
raise APIError('No corresponding RL', err_code='no-rl')
# now check DUI status
post_data = {
'IDDUI': family_data['IDDUI'],
'IDPERSONNE': '',
'PRENOM': rl['PRENOM'],
'NOM': rl['NOM'],
'NAISSANCE': rl['DATENAISSANCE'],
}
self.check_dui(post_data)
return {'data': family_data}
@endpoint(
display_order=5,
description=_("Get a referential"),
perm='can_access',
pattern=r'^(?P<code>[\w-]+)/?$',
example_pattern='{code}',
parameters={
'code': {'description': _('Referential code. Possible values: situation_familiale, csp, lien_parente, type_regime, regime'),
'example_value': 'csp'},
})
def referential(self, request, code):
if code not in ['situation_familiale', 'csp', 'lien_parente', 'type_regime', 'regime']:
raise APIError('Referential not found', err_code='not-found')
references = getattr(utils, '{}_mapping'.format(code))
if references is None:
raise APIError('Referential not found', err_code='not-found', http_status=404)
return {'data': [{'id': key, 'text': val} for key, val in references.items()]}
def get_family_data(self, dui, check_registrations=False, with_management_dates=False):
try:
result = schemas.ref_famille_dui(self, {'PORTAIL': {'DUI': {'IDDUI': dui}}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
family_data = result.json_response['DATA']['PORTAIL']['DUI']
today = datetime.date.today()
current_reference_year = utils.get_reference_year_from_date(today)
next_reference_year = current_reference_year + 1
if check_registrations:
children_registred = self.are_children_registered(dui=dui)
for child in family_data.get('ENFANT', []):
child['clae_cantine_current'] = children_registred.get(child['IDPERSONNE'])
if with_management_dates:
family_data['management_dates'] = self.get_management_dates()
family_data['annee_reference'] = current_reference_year
family_data['annee_reference_short'] = str(current_reference_year)[2:]
family_data['annee_reference_label'] = '{}/{}'.format(current_reference_year, next_reference_year)
family_data['SITUATIONFAMILIALE_label'] = utils.get_label(utils.situation_familiale_mapping, family_data['SITUATIONFAMILIALE'])
for key in ['RL1', 'RL2']:
if key not in family_data:
continue
rl = family_data[key]
rl['CSP_label'] = utils.get_label(utils.csp_mapping, rl['CSP'])
for child in family_data.get('ENFANT', []):
child['id'] = child['IDPERSONNE']
child['text'] = '{} {}'.format(child['PRENOM'], child['NOM']).strip()
for i, contact in enumerate(child.get('CONTACT', [])):
contact['id'] = i
contact['text'] = '{} {}'.format(contact['PRENOM'], contact['NOM']).strip()
contact['LIENPARENTE_label'] = utils.get_label(utils.lien_parente_mapping, contact['LIENPARENTE'])
if 'REVENUS' in family_data:
family_data['REVENUS']['TYPEREGIME_label'] = utils.get_label(utils.type_regime_mapping, family_data['REVENUS']['TYPEREGIME'])
return family_data
@endpoint(
display_category='DUI',
display_order=4,
description=_("Get information about user's family"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
})
def family_info(self, request, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True, with_management_dates=True)
return {'data': family_data}
@endpoint(
display_category='DUI',
display_order=5,
description=_("Get information about children"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
})
def children_info(self, request, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True)
return {'data': family_data.get('ENFANT', [])}
@endpoint(
display_category='DUI',
display_order=6,
description=_("Get information about a child"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
})
def child_info(self, request, idpersonne, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True)
for child in family_data.get('ENFANT', []):
if child['IDPERSONNE'] == idpersonne:
return {'data': child}
raise APIError('Child not found', err_code='not-found')
@endpoint(
display_category='DUI',
display_order=7,
description=_("Get information about a child's contacts"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
})
def child_contacts_info(self, request, idpersonne, NameID):
link = self.get_link(NameID)
family_data = self.get_family_data(link.dui, check_registrations=True)
for child in family_data['ENFANT']:
if child['IDPERSONNE'] == idpersonne:
return {'data': child.get('CONTACT', [])}
raise APIError('Child not found', err_code='not-found')
def pre_sanitize_update_family_data(self, post_data):
# before json payload validation, check maj fields and remove empty blocks
# transform ENFANT list to dict, where the key is the field IDPERSONNE
# because children in post_data are maybe not in the same order than on Axel side
children = {}
children_by_index = {}
for i, child in enumerate(post_data.get('ENFANT', [])):
# check if IDPERSONNE is filled
if child.get('IDPERSONNE'):
children[child['IDPERSONNE']] = child
children_by_index[str(i)] = child['IDPERSONNE']
post_data['ENFANT'] = children
# sanitize post_data
flags = sorted(schemas.UPDATE_FAMILY_FLAGS.keys())
for flag in flags:
flag_value = post_data.get(flag)
flag_value = utils.encode_bool(flag_value)
# no update for the related block
if flag_value == 'OUI':
continue
# build the xml elements to cross
key = schemas.UPDATE_FAMILY_FLAGS[flag]
# special case for ENFANT flags
if key.startswith('ENFANT/'):
# replace the index by IDPERSONNE value
index = key.split('/')[1]
if index not in children_by_index:
# no child with IDPERSONNE found in post_data
continue
key = key.replace('ENFANT/%s' % index, 'ENFANT/%s' % children_by_index[index])
elements = key.split('/')
schema = schemas.UPDATE_FAMILY_SCHEMA
data = post_data
# find the structure in schema and data containing the element to remove
not_found = False
for element in elements[:-1]:
if schema.get('type') == 'array':
schema = schema['items']
else:
schema = schema['properties'][element]
try:
data = data[element]
except (IndexError, KeyError):
not_found = True
break
if not_found:
continue
element_to_remove = elements[-1]
if element_to_remove == 'ADRESSE':
# empty all subelements
for k in data[element_to_remove].keys():
data[element_to_remove][k] = None
elif element_to_remove in ['HANDICAP', 'ALLERGIE']:
if element_to_remove in data:
# will be filled in sanitize_update_family_data
data[element_to_remove]['_to_reset'] = True
elif element_to_remove in data:
# remove block
data.pop(element_to_remove)
# transform ENFANT dict to a list back
post_data['ENFANT'] = list(post_data['ENFANT'].values())
# remove incomplete CONTACT
for child in post_data['ENFANT']:
if 'CONTACT' not in child:
continue
child['CONTACT'] = [c for c in child['CONTACT'] if c['NOM'] and c['PRENOM']]
# if ENFANT block is empty, remove it
if not post_data['ENFANT']:
post_data.pop('ENFANT')
utils.upperize(post_data)
schemas.UPDATE_FAMILY_SCHEMA['pre_process'] = pre_sanitize_update_family_data
def sanitize_update_family_data(self, dui, post_data):
family_data = None
def fill_handicap_fields(child_data, orig_family_data):
child_id = child_data['IDPERSONNE']
for orig_child in orig_family_data.get('ENFANT', []):
# find the correct child in family info
if orig_child['IDPERSONNE'] != child_id:
continue
# reset handicap related fields
handicap_fields = [
'AUTREDIFFICULTE',
'ECOLESPECIALISEE',
'INDICATEURAUXILIAIREVS',
'INDICATEURECOLE',
'INDICATEURHANDICAP',
'INDICATEURNOTIFMDPH',
]
for key in handicap_fields:
child_data['SANITAIRE'][key] = orig_child['SANITAIRE'][key]
def fill_allergie_fields(child_data, orig_family_data):
child_id = child_data['IDPERSONNE']
for orig_child in orig_family_data.get('ENFANT', []):
# find the correct child in family info
if orig_child['IDPERSONNE'] != child_id:
continue
# reset allergie related fields
child_data['SANITAIRE']['ALLERGIE'] = orig_child['SANITAIRE'].get('ALLERGIE', [])
for child in post_data.get('ENFANT', []):
if 'SANITAIRE' not in child:
continue
# check if HANDICAP fields are to be filled
if child['SANITAIRE']['HANDICAP'].pop('_to_reset', False) is True:
# get family info
if family_data is None:
family_data = self.get_family_data(dui)
# and fill HANDICAP fields
fill_handicap_fields(child, family_data)
child['SANITAIRE'].pop('HANDICAP')
else:
# transform HANDICAP block
child['SANITAIRE'].update(child['SANITAIRE'].pop('HANDICAP'))
if 'ALLERGIE' not in child['SANITAIRE']:
# ALLERGIE block is optional
continue
# check if ALLERGIE fields are to be filled
if child['SANITAIRE']['ALLERGIE'].pop('_to_reset', False) is True:
# get family info
if family_data is None:
family_data = self.get_family_data(dui)
# and fill ALLERGIE fields
fill_allergie_fields(child, family_data)
else:
# transform ALLERGIE block
new_allergie = []
for key in ['ASTHME', 'MEDICAMENTEUSES', 'ALIMENTAIRES']:
if utils.encode_bool(child['SANITAIRE']['ALLERGIE'][key]) == 'OUI':
new_allergie.append({
'TYPE': key,
'ALLERGIQUE': 'OUI',
'NOMALLERGIE': None,
})
if child['SANITAIRE']['ALLERGIE']['AUTRES']:
new_allergie.append({
'TYPE': 'AUTRES',
'ALLERGIQUE': 'OUI',
'NOMALLERGIE': child['SANITAIRE']['ALLERGIE']['AUTRES'],
})
child['SANITAIRE']['ALLERGIE'] = new_allergie
if not child['SANITAIRE'].get('ALLERGIE'):
# remove ALLERGIE block if empty
child['SANITAIRE'].pop('ALLERGIE')
# retrieve RL not posted fields
for rl in ['RL1', 'RL2']:
if rl not in post_data:
continue
if family_data is None:
family_data = self.get_family_data(dui)
# fill missing fields
for key in ['IDPERSONNE', 'NOM', 'PRENOM', 'NOMJEUNEFILLE', 'DATENAISSANCE', 'CIVILITE']:
post_data[rl][key] = family_data[rl][key]
post_data[rl]['INDICATEURRL'] = '1' if rl == 'RL1' else '2'
# fill NBENFANTSACHARGE
if 'REVENUS' in post_data:
if family_data is None:
family_data = self.get_family_data(dui)
post_data['REVENUS']['NBENFANTSACHARGE'] = family_data.get('REVENUS', {}).get('NBENFANTSACHARGE')
# remove flags
for flag in schemas.UPDATE_FAMILY_FLAGS.keys():
post_data.pop(flag)
@endpoint(
display_category='DUI',
display_order=8,
description=_("Update information about user's family"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
post={
'request_body': {
'schema': {
'application/json': schemas.UPDATE_FAMILY_SCHEMA,
}
}
})
def update_family_info(self, request, NameID, post_data):
link = self.get_link(NameID)
# prepare data
post_data['IDDUI'] = link.dui
post_data['DATEDEMANDE'] = datetime.date.today().strftime(utils.json_date_format)
self.sanitize_update_family_data(dui=link.dui, post_data=post_data)
if 'RL2' in post_data and post_data['RL2'].get('IDPERSONNE') == link.person_id:
post_data['QUIACTUALISEDUI'] = '2'
else:
post_data['QUIACTUALISEDUI'] = '1'
try:
result = schemas.form_maj_famille_dui(self, {'PORTAIL': {'DUI': post_data}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'error_post_data': post_data,
'xml_request': e.xml_request,
'xml_response': e.xml_response})
return {
'updated': True,
'dui': link.dui,
'data': {
'xml_request': result.xml_request,
'xml_response': result.xml_response,
}
}
def get_invoices(self, regie_id, dui=None, name_id=None):
assert name_id or dui
if name_id:
dui = self.get_link(name_id).dui
try:
result = schemas.ref_facture_a_payer(self, {'PORTAIL': {'DUI': {'IDDUI': dui}}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
data = result.json_response['DATA']['PORTAIL']['DUI']
result = []
for facture in data.get('FACTURES', []):
if facture['IDREGIE'] != regie_id:
continue
result.append(utils.normalize_invoice(facture, dui))
return result
def get_historical_invoices(self, name_id):
link = self.get_link(name_id)
try:
result = schemas.list_dui_factures(
self,
{'LISTFACTURE': {'NUMDUI': link.dui, 'DEBUT': '1970-01-01'}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
data = result.json_response['DATA']['PORTAIL']['LISTFACTURE']
result = []
for direction in data.get('DIRECTION', []):
for facture in direction.get('FACTURE', []):
result.append(
utils.normalize_invoice(
facture,
link.dui,
historical=True,
vendor_base={
'NUMDIRECTION': direction['NUMDIRECTION'],
'IDDIRECTION': direction['IDDIRECTION'],
'LIBDIRECTION': direction['LIBDIRECTION'],
}))
return result
def get_invoice(self, regie_id, invoice_id, dui=None, name_id=None, historical=None):
if historical:
invoices_data = self.get_historical_invoices(name_id=name_id)
else:
invoices_data = self.get_invoices(regie_id=regie_id, dui=dui, name_id=name_id)
for invoice in invoices_data:
if invoice['display_id'] == invoice_id:
return invoice
@endpoint(
display_category=_('Invoices'),
display_order=1,
name='regie',
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoices/?$',
example_pattern='{regie_id}/invoices',
description=_("Get invoices to pay"),
parameters={
'NameID': {'description': _('Publik ID')},
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'}
})
def invoices(self, request, regie_id, NameID):
invoices_data = self.get_invoices(regie_id=regie_id, name_id=NameID)
return {'data': invoices_data}
@endpoint(
display_category=_('Invoices'),
display_order=2,
name='regie',
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoices/history/?$',
example_pattern='{regie_id}/invoices/history',
description=_("Get invoices already paid"),
parameters={
'NameID': {'description': _('Publik ID')},
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'}
})
def invoices_history(self, request, regie_id, NameID):
invoices_data = self.get_historical_invoices(name_id=NameID)
return {'data': invoices_data}
@endpoint(
display_category=_('Invoices'),
display_order=3,
name='regie',
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>(historical-)?\w+-\d+)/?$',
example_pattern='{regie_id}/invoice/{invoice_id}',
description=_('Get invoice details'),
parameters={
'NameID': {'description': _('Publik ID')},
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'}
})
def invoice(self, request, regie_id, invoice_id, NameID):
real_invoice_id = invoice_id.split('-')[-1]
historical = invoice_id.startswith('historical-')
invoice = self.get_invoice(regie_id=regie_id, name_id=NameID, invoice_id=real_invoice_id, historical=historical)
if invoice is None:
raise APIError('Invoice not found', err_code='not-found')
return {'data': invoice}
@endpoint(
display_category=_('Invoices'),
display_order=4,
name='regie',
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>(historical-)?\w+-\d+)/pdf/?$',
example_pattern='{regie_id}/invoice/{invoice_id}/pdf',
description=_('Get invoice as a PDF file'),
parameters={
'NameID': {'description': _('Publik ID')},
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'}
})
def invoice_pdf(self, request, regie_id, invoice_id, NameID):
# check that invoice is related to current user
real_invoice_id = invoice_id.split('-')[-1]
historical = invoice_id.startswith('historical-')
try:
invoice = self.get_invoice(regie_id=regie_id, name_id=NameID, invoice_id=real_invoice_id, historical=historical)
except APIError as e:
e.http_status = 404
raise
if invoice is None:
raise APIError('Invoice not found', err_code='not-found', http_status=404)
# check that PDF is available
if not invoice['has_pdf']:
raise APIError('PDF not available', err_code='not-available', http_status=404)
try:
result = schemas.ref_facture_pdf(self, {'PORTAIL': {'FACTUREPDF': {'IDFACTURE': int(invoice['display_id'])}}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
http_status=404,
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
b64content = base64.b64decode(result.json_response['DATA']['PORTAIL']['PDF']['@FILE'])
if not b64content:
raise APIError('PDF error', err_code='error', http_status=404)
response = HttpResponse(content_type='application/pdf')
response['Content-Disposition'] = 'attachment; filename="%s.pdf"' % invoice_id
response.write(b64content)
return response
@endpoint(
display_category=_('Invoices'),
display_order=5,
name='regie',
methods=['post'],
perm='can_access',
pattern=r'^(?P<regie_id>[\w-]+)/invoice/(?P<invoice_id>\w+-\d+)/pay/?$',
example_pattern='{regie_id}/invoice/{invoice_id}/pay',
description=_('Notify an invoice as paid'),
parameters={
'regie_id': {'description': _('Regie identifier'), 'example_value': '42-PERISCOL'},
'invoice_id': {'description': _('Invoice identifier'), 'example_value': 'DUI-42'}
},
post={
'request_body': {
'schema': {
'application/json': schemas.PAYMENT_SCHEMA,
}
}
})
def pay_invoice(self, request, regie_id, invoice_id, **kwargs):
data = json_loads(request.body)
dui, invoice_id = invoice_id.split('-')
invoice = self.get_invoice(regie_id=regie_id, dui=dui, invoice_id=invoice_id)
if invoice is None:
raise APIError('Invoice not found', err_code='not-found')
transaction_amount = invoice['amount']
transaction_id = data['transaction_id']
transaction_date = utils.parse_datetime(data['transaction_date'])
if transaction_date is None:
raise APIError('invalid transaction_date')
transaction_date = utils.encode_datetime(transaction_date)
post_data = {
'IDFACTURE': int(invoice_id),
'IDREGIEENCAISSEMENT': '',
'MONTANTPAYE': transaction_amount,
'DATEPAIEMENT': transaction_date,
'REFERENCE': transaction_id,
}
try:
schemas.form_paiement_dui(self, {'PORTAIL': {'DUI': post_data}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
return {'data': True}
def get_children_activities(self, dui, reference_year):
cache_key = 'toulouse-axel-%s-children-activities-%s-%s' % (self.pk, dui, reference_year)
result = cache.get(cache_key)
if result is not None:
return result
try:
result = schemas.enfants_activites(self, {
'DUI': {
'IDDUI': dui,
'ANNEEREFERENCE': str(reference_year),
'TYPESACTIVITES': 'MAT,MIDI,SOIR,GARD',
}
})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
children_activities = result.json_response['DATA']['PORTAIL']['DUI'].get('ENFANT', [])
for child in children_activities:
child['REGIME_label'] = utils.get_label(utils.regime_mapping, child['REGIME'])
data = {}
# format and filter children
for child in children_activities:
# exclude child in private schools
if child['LIBELLEECOLE'].startswith('PRIVEE'):
continue
# exclude also child with more than one registration per activity_type or missing activity
activity_types = [a['TYPEACTIVITE'] for a in child.get('ACTIVITE', [])]
activity_types.sort()
if activity_types != ['MAT', 'MIDI', 'SOIR'] and activity_types != ['GARD', 'MAT', 'MIDI', 'SOIR']:
# GARD is optional
continue
# ok, store child
data[child['IDPERSONNE']] = child
cache.set(cache_key, data, 30) # 30 seconds
return data
def get_child_activities(self, dui, reference_year, child_id):
children_activities = self.get_children_activities(dui=dui, reference_year=reference_year)
if child_id not in children_activities:
raise APIError('Child not found', err_code='not-found')
return children_activities[child_id]
def are_children_registered(self, dui):
# check reference_year
today = datetime.date.today()
reference_year = utils.get_reference_year_from_date(today)
try:
children_activities = self.get_children_activities(dui=dui, reference_year=reference_year)
except APIError:
# don't fail on the check
return {}
return {child_id: bool(child.get('ACTIVITE', [])) for child_id, child in children_activities.items()}
def get_booking_data(self, dui, child_id, booking_date):
start_date, end_date = utils.get_week_dates_from_date(booking_date)
cache_key = 'toulouse-axel-%s-booking-data-%s-%s-%s' % (self.pk, dui, child_id, start_date.isoformat())
result = cache.get(cache_key)
if result is not None:
return result
reference_year = utils.get_reference_year_from_date(booking_date)
# first get activities information for the child
child_activities = self.get_child_activities(
dui=dui,
reference_year=reference_year,
child_id=child_id)
# then get booking of the requested week for the child
activity_ids = [act['IDACTIVITE'] for act in child_activities.get('ACTIVITE', [])]
activity_data = []
for activity_id in activity_ids:
activity_data.append({
'IDACTIVITE': activity_id,
'ANNEEREFERENCE': str(reference_year),
'DATEDEBUT': start_date.strftime(utils.json_date_format),
'DATEDFIN': end_date.strftime(utils.json_date_format),
})
try:
data = schemas.reservation_periode(self, {'PORTAIL': {
'DUI': {
'IDDUI': dui,
'ENFANT': {
'IDPERSONNE': child_id,
'ACTIVITE': activity_data,
}
}
}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
child_booking = None
for child in data.json_response['DATA']['PORTAIL']['DUI'].get('ENFANT', []):
if child['IDPERSONNE'] == child_id:
child_booking = child
break
if child_booking is None:
# should not happen
raise APIError('Child not found', err_code='not-found')
# build the response payload: add booking to activities information
booking_days = {}
for booking in child_booking.get('ACTIVITE', []):
booking_days[booking['IDACTIVITE']] = {
'raw_value': booking['JOUR'],
'days': {
'monday': utils.get_booking(booking['JOUR'][0]),
'tuesday': utils.get_booking(booking['JOUR'][1]),
'wednesday': utils.get_booking(booking['JOUR'][2]),
'thursday': utils.get_booking(booking['JOUR'][3]),
'friday': utils.get_booking(booking['JOUR'][4]),
}
}
for activity in child_activities.get('ACTIVITE', []):
activity['id'] = activity['IDACTIVITE']
start_date = datetime.datetime.strptime(activity['DATEENTREE'], utils.json_date_format)
end_date = datetime.datetime.strptime(activity['DATESORTIE'], utils.json_date_format)
activity['text'] = u'{} (inscription du {} au {})'.format(
activity['LIBELLEACTIVITE'],
start_date.strftime(utils.xml_date_format),
end_date.strftime(utils.xml_date_format))
activity['annee_reference'] = reference_year
activity['annee_reference_short'] = str(reference_year)[2:]
activity['annee_reference_label'] = '{}/{}'.format(reference_year, reference_year + 1)
activity['booking'] = booking_days.get(activity['IDACTIVITE'], {})
cache.set(cache_key, child_activities, 30) # 30 seconds
return child_activities
@endpoint(
display_category='CAN-CLA',
display_order=1,
description=_("Get the list of reference years available for bookings"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'pivot_date': {'description': _('Pivot date (format MM-DD). After this date, next year is available.')},
})
def clae_years(self, request, NameID, pivot_date):
link = self.get_link(NameID)
today = datetime.date.today()
reference_year = utils.get_reference_year_from_date(today)
# get pivot date
try:
pivot_date = datetime.datetime.strptime('%s-%s' % (reference_year, pivot_date), utils.json_date_format).date()
except ValueError:
raise APIError('bad date format, should be MM-DD', err_code='bad-request', http_status=400)
# adjust pivot year
if pivot_date.month <= 7:
# between january and july, reference year is the year just before
pivot_date = pivot_date.replace(year=reference_year + 1)
data = [{
'id': str(reference_year),
'text': '%s/%s' % (reference_year, reference_year + 1),
'type': 'encours',
'refdate': today.strftime(utils.json_date_format)
}]
if today < pivot_date:
# date pivot not in the past, return only current year
return {'data': data}
children_activities = self.get_children_activities(dui=link.dui, reference_year=reference_year + 1)
if not children_activities:
# no activities for next year, return only current year
return {'data': data}
try:
next_ref_date = today.replace(year=today.year + 1)
except ValueError:
# 02/29 ?
next_ref_date = today + datetime.timedelta(days=366)
# return also next year
data.append({
'id': str(reference_year + 1),
'text': '%s/%s' % (reference_year + 1, reference_year + 2),
'type': 'suivante',
'refdate': next_ref_date.strftime(utils.json_date_format)
})
return {'data': data}
@endpoint(
display_category='CAN-CLA',
display_order=2,
description=_("Get information about CLAE activities of all children for the year"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'booking_date': {'description': _('Booking date (to get reference year)')},
})
def clae_children_activities_info(self, request, NameID, booking_date):
link = self.get_link(NameID)
try:
booking_date = datetime.datetime.strptime(booking_date, utils.json_date_format)
except ValueError:
raise APIError('bad date format, should be YYYY-MM-DD', err_code='bad-request', http_status=400)
reference_year = utils.get_reference_year_from_date(booking_date)
children_activities = self.get_children_activities(dui=link.dui, reference_year=reference_year)
for child in children_activities.values():
child['id'] = child['IDPERSONNE']
child['text'] = '{} {}'.format(child['PRENOM'], child['NOM']).strip()
return {'data': list(children_activities.values())}
@endpoint(
display_category='CAN-CLA',
display_order=3,
description=_("Get the list of CLAE booked activities of a child, for a period"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
'start_date': {'description': _('Start date of the period')},
'end_date': {'description': _('End date of the period')},
})
def clae_booking_activities_info(self, request, NameID, idpersonne, start_date, end_date):
link = self.get_link(NameID)
try:
start_date = datetime.datetime.strptime(start_date, utils.json_date_format).date()
end_date = datetime.datetime.strptime(end_date, utils.json_date_format).date()
except ValueError:
raise APIError('bad date format, should be YYYY-MM-DD', err_code='bad-request', http_status=400)
today = datetime.date.today()
in_8_days = today + datetime.timedelta(days=8)
def get_activities_for_week(week_start_date, week_end_date):
booking_data = self.get_booking_data(dui=link.dui, child_id=idpersonne, booking_date=week_start_date).get('ACTIVITE', [])
booking_data = {d['TYPEACTIVITE']: d for d in booking_data}
start_date, end_date = utils.get_week_dates_from_date(week_start_date)
week = 'week:%s:%s' % (start_date.strftime(utils.json_date_format), end_date.strftime(utils.json_date_format))
day_date = week_start_date
while day_date <= week_end_date:
day = WEEKDAYS[day_date.weekday()]
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if activity_type not in booking_data:
continue
activity = booking_data[activity_type]
booked = activity['booking']['days'][day]
if booked is not None:
yield {
'day': day_date.strftime(utils.json_date_format),
'activity_id': activity['id'],
'activity_type': activity_type,
'activity_label': activity['LIBELLEACTIVITE'],
'booked': booked,
'bookable': day_date >= in_8_days,
'week': week,
}
day_date = day_date + datetime.timedelta(days=1)
# find the first week from start_date
week_start_date, week_end_date = utils.get_week_dates_from_date(start_date)
result = []
# cross all weeks until end date
while week_end_date <= end_date:
result += [a for a in get_activities_for_week(max(start_date, week_start_date), week_end_date)]
if week_end_date == end_date:
break
week_start_date = week_start_date + datetime.timedelta(days=7)
week_end_date = min(week_end_date + datetime.timedelta(days=7), end_date)
return {'data': result}
def get_min_and_max_possible_days(self, dui, reference_year, child_id):
child_activities = self.get_child_activities(
dui=dui,
reference_year=reference_year,
child_id=child_id)
if not child_activities.get('ACTIVITE', []):
return None, None
entree_dates = [act['DATEENTREE'] for act in child_activities.get('ACTIVITE', [])]
sortie_dates = [act['DATESORTIE'] for act in child_activities.get('ACTIVITE', [])]
return (
datetime.datetime.strptime(max(entree_dates), utils.json_date_format).date(),
datetime.datetime.strptime(min(sortie_dates), utils.json_date_format).date()
)
@endpoint(
display_category='CAN-CLA',
display_order=4,
description=_("Get possible days to book an activity of a child, for a period"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
'activity_type': {'description': _('Activity type (MAT, MIDI, SOIR, GARD)')},
'start_date': {'description': _('Start date of the period')},
'end_date': {'description': _('End date of the period')},
})
def clae_booking_activity_possible_days(self, request, NameID, idpersonne, activity_type, start_date, end_date):
link = self.get_link(NameID)
try:
start_date = datetime.datetime.strptime(start_date, utils.json_date_format).date()
end_date = datetime.datetime.strptime(end_date, utils.json_date_format).date()
except ValueError:
raise APIError('bad date format, should be YYYY-MM-DD', err_code='bad-request', http_status=400)
if activity_type not in ['MAT', 'MIDI', 'SOIR', 'GARD']:
raise APIError('bad activity_type, should be MAT, MIDI, SOIR or GARD', err_code='bad-request', http_status=400)
today = datetime.date.today()
# be sure that start_date is after today + 8 days
start_date = max(start_date, today + datetime.timedelta(days=8))
# besure that start_date is after greatest DATEENTREE,
# and end_date is before smallest DATESORTIE
reference_year = utils.get_reference_year_from_date(today)
possible_days_min, possible_days_max = self.get_min_and_max_possible_days(
dui=link.dui, reference_year=reference_year, child_id=idpersonne)
if possible_days_min and possible_days_max:
start_date = max(start_date, possible_days_min)
end_date = min(end_date, possible_days_max)
# if start_date is a saturday or a sunday, jump to the next monday
if start_date.weekday() > 4:
start_date = start_date + datetime.timedelta(days=7 - start_date.weekday())
def get_activity_days_for_week(week_start_date, week_end_date):
# ask Axel for the booking of a week (starts may be a monday, ends a friday)
activities = self.get_booking_data(
dui=link.dui, child_id=idpersonne, booking_date=week_start_date).get('ACTIVITE', [])
activity = None
for act in activities:
if act['TYPEACTIVITE'] == activity_type:
activity = act
break
if activity is None:
return
day_date = week_start_date
while day_date <= week_end_date:
day = WEEKDAYS[day_date.weekday()]
activity_day = {
'id': '{}:{}:{}:{}'.format(idpersonne, activity_type, activity['id'], day_date.strftime(utils.json_date_format)),
'text': dateformat.format(day_date, 'l j F Y'),
'disabled': activity['booking']['days'][day] is None,
'prefill': activity['booking']['days'][day],
'details': activity,
}
day_date = day_date + datetime.timedelta(days=1)
yield activity_day
# find the first week from start_date
week_start_date, week_end_date = utils.get_week_dates_from_date(start_date)
activity_days = []
# cross all weeks until end date
while week_end_date <= end_date:
activity_days += [d for d in get_activity_days_for_week(max(start_date, week_start_date), week_end_date)]
if week_end_date == end_date:
break
week_start_date = week_start_date + datetime.timedelta(days=7)
week_end_date = min(week_end_date + datetime.timedelta(days=7), end_date)
return {'data': activity_days}
@endpoint(
display_category='CAN-CLA',
display_order=5,
description=_("Get annual possible days to book an activity of a child"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
'activity_type': {'description': _('Activity type (MAT, MIDI, SOIR, GARD)')},
})
def clae_booking_activity_annual_possible_days(self, request, NameID, idpersonne, activity_type):
link = self.get_link(NameID)
if activity_type not in ['MAT', 'MIDI', 'SOIR', 'GARD']:
raise APIError('bad activity_type, should be MAT, MIDI, SOIR or GARD', err_code='bad-request', http_status=400)
today = datetime.date.today()
reference_year = utils.get_reference_year_from_date(today)
activities = self.get_child_activities(
dui=link.dui,
reference_year=reference_year,
child_id=idpersonne).get('ACTIVITE', [])
activity = None
for act in activities:
if act['TYPEACTIVITE'] == activity_type:
activity = act
break
if activity is None:
return {'data': []}
activity_days = []
for i, day in enumerate(['monday', 'tuesday', 'wednesday', 'thursday', 'friday']):
disabled = False
if activity_type == 'GARD' and day != 'wednesday':
disabled = True
elif activity_type == 'SOIR' and day == 'wednesday':
disabled = True
activity_days.append({
'id': '{}:{}:{}:{}'.format(idpersonne, activity_type, activity['IDACTIVITE'], day),
'text': WEEKDAYS_LABELS[i],
'disabled': disabled,
})
return {'data': activity_days}
@endpoint(
display_category='CAN-CLA',
display_order=6,
description=_("Get booked days for an activity of a child, for a period"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
'idpersonne': {'description': _('Child ID')},
'activity_type': {'description': _('Activity type (MAT, MIDI, SOIR, GARD)')},
'start_date': {'description': _('Start date of the period')},
'end_date': {'description': _('End date of the period')},
})
def clae_booking_activity_prefill(self, request, NameID, idpersonne, activity_type, start_date, end_date):
possible_days = self.clae_booking_activity_possible_days(request, NameID, idpersonne, activity_type, start_date, end_date)
return {'data': [d['id'] for d in possible_days['data'] if d['prefill'] is True]}
@endpoint(
display_category='CAN-CLA',
display_order=7,
description=_("CLAE/Cantine booking"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
post={
'request_body': {
'schema': {
'application/json': schemas.BOOKING_SCHEMA,
}
}
})
def clae_booking(self, request, NameID, post_data):
link = self.get_link(NameID)
# check dates
today = datetime.date.today()
reference_year = utils.get_reference_year_from_date(today)
start_date_min = today + datetime.timedelta(days=8)
end_date_max = datetime.date(reference_year+1, 7, 31)
start_date = datetime.datetime.strptime(post_data['booking_start_date'], utils.json_date_format).date()
end_date = datetime.datetime.strptime(post_data['booking_end_date'], utils.json_date_format).date()
if start_date > end_date:
raise APIError('booking_start_date should be before booking_end_date', err_code='bad-request', http_status=400)
if start_date < start_date_min:
raise APIError('booking_start_date min value: %s' % start_date_min, err_code='bad-request', http_status=400)
if end_date > end_date_max:
raise APIError('booking_end_date max value: %s' % end_date_max, err_code='bad-request', http_status=400)
# get known activities for this child, to have the ids
child_activities_info = self.get_child_activities(link.dui, reference_year, post_data['child_id'])
child_known_activities_by_type = {a['TYPEACTIVITE']: a for a in child_activities_info.get('ACTIVITE', [])}
# build activity list to post
activities_by_type = {}
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if post_data['booking_list_%s' % activity_type] is None:
# exclude if None (not updated)
continue
if activity_type not in child_known_activities_by_type:
# exclude activity types not registered for the child
continue
activities_by_type[activity_type] = {
'IDACTIVITE': child_known_activities_by_type[activity_type]['IDACTIVITE'],
'ANNEEREFERENCE': str(reference_year),
'PERIODE': [],
}
def get_week_pattern(week_start_date, week_end_date, activity_type, activity_id):
week_pattern = '00000'
day_date = week_start_date
# cross days of the week to find bookings
while day_date <= week_end_date:
key = '{}:{}:{}:{}'.format(post_data['child_id'], activity_type, activity_id, day_date.strftime(utils.json_date_format))
if key in post_data['booking_list_%s' % activity_type]:
week_pattern = week_pattern[:day_date.weekday()] + '1' + week_pattern[day_date.weekday() + 1:]
day_date = day_date + datetime.timedelta(days=1)
return week_pattern
# find the first week from start_date
week_start_date, week_end_date = utils.get_week_dates_from_date(start_date)
# cross all weeks until the last week of the posted period
booking_dates = set()
while week_start_date <= end_date:
for activity_type, activity in activities_by_type.items():
real_start_date = max(start_date, week_start_date)
real_end_date = min(end_date, week_end_date)
booking_dates.add(real_start_date)
activity_id = activity['IDACTIVITE']
week_pattern = get_week_pattern(real_start_date, real_end_date, activity_type, activity_id)
activity['PERIODE'].append({
'DATEDEBUT': real_start_date.strftime(utils.json_date_format),
'DATEDFIN': real_end_date.strftime(utils.json_date_format),
'SEMAINETYPE': week_pattern,
})
week_start_date = week_start_date + datetime.timedelta(days=7)
week_end_date = week_end_date + datetime.timedelta(days=7)
# build data
data = {
'IDDUI': link.dui,
'DATEDEMANDE': today.strftime(utils.json_date_format),
'ENFANT': [
{
'IDPERSONNE': post_data['child_id'],
'REGIME': post_data.get('regime') or child_activities_info['REGIME'],
}
]
}
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if activity_type in activities_by_type:
if 'ACTIVITE' not in data['ENFANT'][0]:
data['ENFANT'][0]['ACTIVITE'] = []
data['ENFANT'][0]['ACTIVITE'].append(activities_by_type[activity_type])
try:
result = schemas.reservation_annuelle(self, {'PORTAIL': {'DUI': data}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
# invalidate caches
# invalidate get_children_activities cache
cache_key = 'toulouse-axel-%s-children-activities-%s-%s' % (self.pk, link.dui, reference_year)
cache.delete(cache_key)
for booking_date in sorted(booking_dates):
# invalidate get_booking_data cache for each week crossed
start_date, end_date = utils.get_week_dates_from_date(booking_date)
cache_key = 'toulouse-axel-%s-booking-data-%s-%s-%s' % (self.pk, link.dui, post_data['child_id'], start_date.isoformat())
cache.delete(cache_key)
return {
'updated': True,
'data': {
'xml_request': result.xml_request,
'xml_response': result.xml_response,
}
}
@endpoint(
display_category='CAN-CLA',
display_order=8,
description=_("CLAE/Cantine annual booking"),
perm='can_access',
parameters={
'NameID': {'description': _('Publik ID')},
},
post={
'request_body': {
'schema': {
'application/json': schemas.ANNUAL_BOOKING_SCHEMA,
}
}
})
def clae_booking_annual(self, request, NameID, post_data):
link = self.get_link(NameID)
# build dates of the period
today = datetime.date.today()
reference_year = utils.get_reference_year_from_date(today)
start_date = today + datetime.timedelta(days=8)
end_date = datetime.date(reference_year+1, 7, 31)
# get known activities for this child, to have the ids
child_activities_info = self.get_child_activities(link.dui, reference_year, post_data['child_id'])
child_known_activities_by_type = {a['TYPEACTIVITE']: a for a in child_activities_info.get('ACTIVITE', [])}
# build activity list to post
activities_by_type = {}
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if post_data['booking_list_%s' % activity_type] is None:
# exclude if None (not updated)
continue
if activity_type not in child_known_activities_by_type:
# exclude activity types not registered for the child
continue
week_pattern = ''
activity_id = child_known_activities_by_type[activity_type]['IDACTIVITE']
# cross days of the week to find bookings
for i, day in enumerate(['monday', 'tuesday', 'wednesday', 'thursday', 'friday']):
key = '{}:{}:{}:{}'.format(post_data['child_id'], activity_type, activity_id, day)
week_pattern += key in post_data['booking_list_%s' % activity_type] and '1' or '0'
activities_by_type[activity_type] = {
'IDACTIVITE': activity_id,
'ANNEEREFERENCE': str(reference_year),
'PERIODE': [{
'DATEDEBUT': start_date.strftime(utils.json_date_format),
'DATEDFIN': end_date.strftime(utils.json_date_format),
'SEMAINETYPE': week_pattern,
}],
}
# build data
data = {
'IDDUI': link.dui,
'DATEDEMANDE': today.strftime(utils.json_date_format),
'ENFANT': [
{
'IDPERSONNE': post_data['child_id'],
'REGIME': post_data.get('regime') or child_activities_info['REGIME'],
}
]
}
for activity_type in ['MAT', 'MIDI', 'SOIR', 'GARD']:
if activity_type in activities_by_type:
if 'ACTIVITE' not in data['ENFANT'][0]:
data['ENFANT'][0]['ACTIVITE'] = []
data['ENFANT'][0]['ACTIVITE'].append(activities_by_type[activity_type])
try:
result = schemas.reservation_annuelle(self, {'PORTAIL': {'DUI': data}})
except AxelError as e:
raise APIError(
'Axel error: %s' % e,
err_code='error',
data={'xml_request': e.xml_request,
'xml_response': e.xml_response})
# invalidate cache
# invalidate get_children_activities cache
cache_key = 'toulouse-axel-%s-children-activities-%s-%s' % (self.pk, link.dui, reference_year)
cache.delete(cache_key)
booking_date = utils.get_week_dates_from_date(start_date)[0]
while booking_date <= end_date:
# invalidate get_booking_data cache for each monday from now to the end of the reference year
cache_key = 'toulouse-axel-%s-booking-data-%s-%s-%s' % (self.pk, link.dui, post_data['child_id'], booking_date.isoformat())
cache.delete(cache_key)
booking_date += datetime.timedelta(days=7)
return {
'updated': True,
'data': {
'xml_request': result.xml_request,
'xml_response': result.xml_response,
}
}
class Link(models.Model):
resource = models.ForeignKey(ToulouseAxel, on_delete=models.CASCADE)
name_id = models.CharField(blank=False, max_length=256)
dui = models.CharField(blank=False, max_length=128)
person_id = models.CharField(blank=False, max_length=128)
class Meta:
unique_together = ('resource', 'name_id')
class Lock(models.Model):
resource = models.ForeignKey(ToulouseAxel, on_delete=models.CASCADE)
key = models.CharField(max_length=256)
lock_date = models.DateTimeField(auto_now_add=True)
locker = models.CharField(max_length=256, blank=True)
class Meta:
unique_together = ('resource', 'key')