passerelle/passerelle/apps/maelis/utils.py

353 lines
14 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from copy import copy, deepcopy
from datetime import datetime
from dateutil.relativedelta import relativedelta
from django.utils import timezone
from django.utils.dateparse import parse_date
from passerelle.utils.jsonresponse import APIError
DATE_FORMAT = '%Y-%m-%d'
TIME_FORMAT = '%H:%M:%S'
DATETIME_FORMAT = DATE_FORMAT + ' ' + TIME_FORMAT
COMPONENTS = {
'PART01': {
'text': 'Matinée',
'time': '08:00:00',
},
'PART02': {
'text': 'Repas',
'time': '12:00:00',
},
'PART03': {
'text': 'Après-midi',
'time': '14:00:00',
},
}
COMPOSED_UNITS = {
'A10003132034': { # JOURNEE
'virtual_unit': 'EO0001',
'components': ['PART01', 'PART02', 'PART03'],
},
'A10003132036': { # MATIN
'virtual_unit': 'EO0001',
'components': ['PART01'],
},
'A10003132038': { # MATIN ET REPAS
'virtual_unit': 'EO0001',
'components': ['PART01', 'PART02'],
},
'A10003132040': { # APRES MIDI
'virtual_unit': 'EO0001',
'components': ['PART03'],
},
}
def normalize_invoice(invoice):
data = {
'id': '%s-%s' % (invoice.numFamily, invoice.numInvoice),
'display_id': str(invoice.numInvoice),
'label': invoice.TTFInfo.libelle,
'created': invoice.dateInvoice.strftime(DATETIME_FORMAT),
'amount': invoice.amountInvoice - invoice.amountPaid,
'paid': invoice.amountInvoice == invoice.amountPaid,
'total_amount': invoice.amountInvoice,
'pay_limit_date': invoice.dateDeadline.strftime(DATETIME_FORMAT),
'has_pdf': bool(invoice.pdfName),
'amount_paid': invoice.amountPaid,
}
if invoice.amountInvoice == invoice.amountPaid:
data.update({'amount': 0, 'pay_limit_date': '', 'online_payment': False})
return data
def normalize_activity(activity):
activity['id'] = activity['activityPortail']['idAct']
activity['text'] = activity['activityPortail']['label']
return activity
def normalize_person(person):
person['id'] = person['num']
person['text'] = '{} {}'.format(person['firstname'], person['lastname']).strip()
return person
def get_school_year(date=None):
if not date:
date = timezone.now().date()
if date.strftime('%m-%d') >= '07-31':
return date.year
else:
return date.year - 1
def week_boundaries_datetimes(date_string=None):
"""Return start and end of the week including the provided date,
or the current week if no date is provided."""
if date_string:
date = parse_date(date_string)
else:
date = timezone.now().date()
week_date_string = date.strftime('%Y-W%W')
monday_datetime = timezone.make_aware(datetime.strptime(week_date_string + '-1', "%Y-W%W-%w"))
sunday_datetime = timezone.make_aware(datetime.strptime(week_date_string + '-0', "%Y-W%W-%w"))
return monday_datetime, sunday_datetime
def get_datetime(date_string):
return datetime.combine(parse_date(date_string), datetime.min.time())
def month_range(start_datetime, end_datetime):
"""Generate first days of month for the provided date range."""
if end_datetime < start_datetime:
return
date_time = start_datetime.replace(day=1)
while date_time.strftime('%Y-%m') <= end_datetime.strftime('%Y-%m'):
yield date_time
date_time = date_time + relativedelta(months=1)
def get_events(activity, start_datetime, end_datetime):
"""Generate events from activity's open days
the events looks like the chrono ones : /api/agenda/agenda-evenement/datetimes/
(https://doc-publik.entrouvert.com/dev/api-chrono/#exemple)"""
activity_id = activity['activityPortail']['idAct']
for unit in activity['unitPortailList']:
unit_id = unit['idUnit']
for date_time in activity['openDayList']:
# readActivityList may return more days than requested
if not start_datetime <= date_time <= end_datetime:
continue
date_string = date_time.strftime(DATE_FORMAT)
time_string = date_time.strftime(TIME_FORMAT)
event_id = '%s-%s-%s' % (date_string, activity_id, unit_id)
yield event_id, {
'id': event_id,
'category_id': activity_id,
'slot_id': unit_id,
'datetime': '%s %s' % (date_string, time_string),
'category': activity['activityPortail']['label'],
'text': unit['label'],
'user_booking_status': 'not-booked',
}
def book_event(events, schedule, start_date, end_date):
"""Book event matching the provided schedule."""
activity_id = schedule['unit']['idActivity']
unit_id = schedule['unit']['id']
for day in schedule['listDays']:
if not start_date <= day['datePlanning'] <= end_date:
continue
date_string = day['datePlanning'].strftime(DATE_FORMAT)
event_id = '%s-%s-%s' % (date_string, activity_id, unit_id)
# database may be corrupted by using updateScheduleCalendars
try:
event = events[event_id]
except KeyError:
raise APIError('The planning returns an unknow day on activities: %s' % day['datePlanning'])
event['user_booking_status'] = 'booked'
def decompose_event(event):
"""Break down 'JOURNEE', 'MATIN', 'MATIN ET REPAS' and APRES MIDI' units
into 'Matin', 'Repas' and 'Après-midi' virtual units."""
if event['slot_id'] not in COMPOSED_UNITS:
yield event
return
date_time = datetime.strptime(event['datetime'], DATETIME_FORMAT)
date_string = date_time.strftime(DATE_FORMAT)
composition = COMPOSED_UNITS[event['slot_id']]
for component_id in composition['components']:
component = COMPONENTS[component_id]
new_event = copy(event)
new_event['datetime'] = '%s %s' % (date_string, component['time'])
new_event['slot_id'] = "%s%s" % (composition['virtual_unit'], component_id)
new_event['id'] = '%s-%s-%s' % (date_string, event['category_id'], new_event['slot_id'])
new_event['text'] = component['text']
yield new_event
def flatten_activities(activities, start_date, end_date):
regex = re.compile('^([0-9] ){0,1}2[0-9]{3}-2[0-9]{3} ') # prefix to remove
data = {}
for activity in activities:
if activity.get('openDayList'):
del activity['openDayList']
activity_id = activity['activityPortail']['idAct']
activity_text = activity_text_legacy = activity['activityPortail']['label']
match = regex.match(activity_text)
if match:
activity_text = activity_text[match.end() :].strip()
activity_text = activity_text.capitalize()
activity_obj = deepcopy(activity)
del activity_obj['unitPortailList']
if not activity_obj['activityPortail']['activityType']:
activity_obj['activityPortail']['activityType'] = {
"code": "?",
"libelle": "Inconnu",
"natureSpec": {"code": "?", "libelle": "Inconnu"},
}
# compute weekly planning mask parameter to use for subscribing
planning_masks = []
for year in range(start_date.year, end_date.year + 1):
for item in activity['activityPortail']['weeklyCalendarActivityList']:
if item['yearCalendar'] == year:
planning_masks.append(item['weeklyCalendarStr'])
break
if planning_masks:
activity_weekly_planning_mask = ""
for letters in zip(*planning_masks):
if '1' in letters:
activity_weekly_planning_mask += '1'
else:
activity_weekly_planning_mask += '0'
else:
activity_weekly_planning_mask = "0000000"
bus_unit_ids = []
bus_activity_id = None
if activity['activityPortail'].get('activityBusList'):
bus_activity_id = activity['activityPortail']['activityBusList'][0]['activity']['id']
for bus_unit in activity['activityPortail']['activityBusList'][0]['unitList']:
bus_unit_ids.append(bus_unit['idUnit'])
activity_info = {
'activity_id': activity_id,
'activity_text': activity_text,
'activity_object': activity_obj,
'activity_weekly_planning_mask': activity_weekly_planning_mask,
'bus_activity_id': bus_activity_id,
'bus_unit_ids': bus_unit_ids,
}
units = {}
for unit in activity['unitPortailList']:
unit_id = unit['idUnit']
unit_text = unit_text_legacy = unit['label']
match = regex.match(unit_text)
if match:
unit_text = unit_text[match.end() :].strip()
unit_text = unit_text.capitalize()
text_first_part = activity_text
if activity_text != unit_text:
text_first_part += ' / ' + unit_text
unit_obj = deepcopy(unit)
del unit_obj['placeList']
# compute subscribing parameters
unit_start_date = unit['dateStart']
unit_end_date = unit['dateEnd'] or datetime.combine(end_date, datetime.min.time())
unit_calendar_letter = unit['calendarLetter']
unit_weekly_planning = ""
for letter in activity_weekly_planning_mask:
if letter == '0':
unit_weekly_planning += unit_calendar_letter
else:
unit_weekly_planning += '1'
unit_info = {
'unit_id': unit_id,
'unit_text': unit_text,
'text_first_part': text_first_part,
'unit_object': unit_obj,
'unit_start_date': unit_start_date,
'unit_end_date': unit_end_date,
'unit_calendar_letter': unit_calendar_letter,
'unit_weekly_planning': unit_weekly_planning,
}
places = {}
for place in unit['placeList']:
place_id = place['id']
place_text_legacy = place['lib']
place_text = ' '.join([w.capitalize() for w in place_text_legacy.split(' ')])
places[place_text_legacy] = {
'id': '%s-%s-%s' % (activity_id, unit_id, place_id),
'text_legacy': '%s / %s / %s'
% (activity_text_legacy, unit_text_legacy, place_text_legacy),
'text_first_part': text_first_part,
'text': "%s / %s" % (text_first_part, place_text),
'activity_id': activity_id,
'unit_id': unit_id,
'place_id': place_id,
'activity_text': activity_text,
'unit_text': unit_text,
'place_text': place_text,
'activity_weekly_planning_mask': activity_weekly_planning_mask,
'subscribe_start_date': unit_start_date,
'subscribe_end_date': unit_end_date,
'unit_calendar_letter': unit_calendar_letter,
'unit_weekly_planning': unit_weekly_planning,
'activity_object': activity_obj,
'unit_object': unit_obj,
'place_object': place,
}
units[unit_id] = {'info': unit_info, 'places': places}
data[activity_id] = {'info': activity_info, 'units': units}
return data
def mark_subscribed_flatted_activities(flatted_activities, child_info):
for child_activity in child_info['subscribeActivityList']:
activity = flatted_activities[child_activity['idActivity']]
for child_unit in child_activity['subscribesUnit']:
unit = activity['units'].get(child_unit['idUnit'])
if not unit: # generic unit for bus line is not described on the catalog
continue
place = unit['places'][child_activity['place']]
place['user_subscribing_status'] = 'subscribed'
place['unsubscribe_start_date'] = child_unit['dateStart']
def flatted_activities_as_list(flatted_activities, publication_filter, subscribe_filter, query_date):
data = []
for activity in [a[1] for a in sorted(flatted_activities.items())]:
for unit in [u[1] for u in sorted(activity['units'].items())]:
unit_info = unit['info']
if unit_info['unit_object']['subscribePublication'] not in publication_filter:
continue
if query_date > unit_info['unit_end_date'].date():
continue
is_unit_subscribed = False
for place in unit['places'].values():
if place.get('user_subscribing_status'):
is_unit_subscribed = True
else:
place['user_subscribing_status'] = 'not-subscribed'
if subscribe_filter == 'not-subscribed' and is_unit_subscribed:
continue
for place in sorted(unit['places'].values(), key=lambda p: p['place_id']):
if not subscribe_filter or place['user_subscribing_status'] == subscribe_filter:
data.append(place)
return data