240 lines
8.5 KiB
Python
240 lines
8.5 KiB
Python
# ANTS-Hub - Copyright (C) Entr'ouvert
|
|
|
|
import collections
|
|
import datetime
|
|
import functools
|
|
import logging
|
|
import secrets
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
import zoneinfo
|
|
|
|
from django.conf import settings
|
|
from django.http import JsonResponse
|
|
from django.utils.text import slugify
|
|
|
|
from ants_hub.data.models import Config, Lieu, Plage, RendezVous, TypeDeRdv, make_available_time_slots
|
|
|
|
# Time zone into which datetimes are converted before being sent to ANTS
# (used by format_date_ants).
ANTS_TIMEZONE = zoneinfo.ZoneInfo('Europe/Paris')
|
|
|
|
# Module-level logger used by the views and the authentication decorator below.
logger = logging.getLogger('ants_hub.api.ants')
|
|
|
|
|
|
def format_date_ants(dt):
    '''Serialise dt the way ANTS expects it.

    The datetime is converted to Paris local time, its UTC offset is dropped
    and a literal 'Z' is appended (the suffix does NOT mean UTC here).
    '''
    paris_iso = dt.astimezone(ANTS_TIMEZONE).isoformat()
    # Keep only what precedes the '+HH:MM' offset.
    local_part, _, _ = paris_iso.partition('+')
    return f'{local_part}Z'
|
|
|
|
|
|
# Last token value read from Config by the authenticate decorator (None until first read).
AUTH_TOKEN = None
|
|
# time.time() value recorded when AUTH_TOKEN was last refreshed (None until first read).
AUTH_TOKEN_TIME = None
|
|
|
|
|
|
def authenticate(func):
    '''Decorator protecting a view with the X-Hub-Rdv-Auth-Token header.

    Requests whose user is already authenticated are passed straight through.
    Otherwise the header must be present and equal to the token stored under
    Config.REQUEST_FROM_ANTS_AUTH_TOKEN; if not, a 401 JsonResponse carrying a
    short message is returned and the wrapped view is not called.

    The configured token is kept in the module globals AUTH_TOKEN /
    AUTH_TOKEN_TIME and reused for 60 seconds; this cache is bypassed
    whenever pytest is loaded.
    '''

    @functools.wraps(func)
    def wrapper(request, *args, **kwargs):
        global AUTH_TOKEN, AUTH_TOKEN_TIME  # pylint: disable=global-statement

        if not request.user.is_authenticated:
            header = request.headers.get('X-Hub-Rdv-Auth-Token', '')
            if not header:
                logger.warning('authentication failed, missing header X-HUB-RDV-AUTH-TOKEN')
                return JsonResponse("Missing X-HUB-RDV-AUTH-TOKEN header", status=401, safe=False)
            if AUTH_TOKEN_TIME and time.time() - AUTH_TOKEN_TIME < 60 and 'pytest' not in sys.modules:
                auth_token = AUTH_TOKEN
            else:
                auth_token = Config.get(Config.REQUEST_FROM_ANTS_AUTH_TOKEN)
                AUTH_TOKEN_TIME, AUTH_TOKEN = time.time(), auth_token
            if not auth_token:
                logger.error('authentication failed, REQUEST_FROM_ANTS_AUTH_TOKEN is not configured')
                return JsonResponse("X-HUB-RDV-AUTH-TOKEN not configured", status=401, safe=False)
            # compare_digest() raises TypeError on str arguments containing
            # non-ASCII characters, which would let any client turn the
            # expected 401 into a 500; comparing the encoded bytes keeps the
            # constant-time check and accepts arbitrary header content.
            if not secrets.compare_digest(header.encode('utf-8'), auth_token.encode('utf-8')):
                logger.warning('authentication failed, bad authentication token "%s"', header)
                return JsonResponse("X-HUB-RDV-AUTH-TOKEN header invalid", status=401, safe=False)
        return func(request, *args, **kwargs)

    return wrapper
|
|
|
|
|
|
@authenticate
def get_managed_meeting_points(request):
    '''Return every Lieu as a JSON list of meeting point descriptions.

    The website and logo of a Lieu fall back to those of its collectivite
    when the Lieu itself has none.
    '''

    def describe(lieu):
        collectivite = lieu.collectivite
        return {
            'id': str(lieu.id),
            'name': lieu.nom,
            'longitude': lieu.longitude,
            'latitude': lieu.latitude,
            'public_entry_address': lieu.numero_rue,
            'zip_code': lieu.code_postal,
            'city_name': lieu.ville,
            'website': lieu.url or collectivite.url,
            'city_logo': lieu.logo_url or collectivite.logo_url,
        }

    meeting_points = [describe(lieu) for lieu in Lieu.objects.select_related('collectivite')]
    logger.info('get_managed_meeting_points returned %d meeting points', len(meeting_points))
    # safe=False: the payload is a list, not a dict.
    return JsonResponse(meeting_points, safe=False)
|
|
|
|
|
|
def available_time_slots_parse_qs(request):
    '''Read and normalise the query-string of the available_time_slots view.

    Returns the tuple
    (meeting_point_ids, start_date, end_date, reason, documents_number).

    Raises ValueError when meeting_point_ids, start_date or end_date is
    absent or blank, or when a meeting point id is not an integer. Any other
    malformed value is silently replaced by a default:

    * start_date: today when unparsable or in the past;
    * end_date: start_date + 7 days when unparsable or before start_date,
      and never later than start_date + 180 days;
    * reason: TypeDeRdv.CNI when the name is unknown;
    * documents_number: 1 when not an integer, capped at 4.
    '''
    params = request.GET
    today = datetime.date.today

    raw_ids = params.getlist('meeting_point_ids')
    if not raw_ids:
        raise ValueError('missing meeting_point_ids')
    try:
        meeting_point_ids = [int(raw_id.strip()) for raw_id in raw_ids]
    except ValueError:
        raise ValueError('invalid meeting_point_ids')

    raw_start = params.get('start_date', '').strip()
    if not raw_start:
        raise ValueError('missing start_date')
    try:
        start_date = datetime.date.fromisoformat(raw_start)
    except ValueError:
        start_date = today()
    # Never search in the past.
    start_date = max(start_date, today())

    raw_end = params.get('end_date', '').strip()
    if not raw_end:
        raise ValueError('missing end_date')
    one_week_later = start_date + datetime.timedelta(days=7)
    try:
        end_date = datetime.date.fromisoformat(raw_end)
    except ValueError:
        end_date = one_week_later
    if end_date < start_date:
        end_date = one_week_later
    # Bound the search window to 180 days.
    end_date = min(end_date, start_date + datetime.timedelta(days=180))

    reason_name = params.get('reason', 'CNI').strip()
    try:
        reason = TypeDeRdv.from_ants_name(reason_name)
    except KeyError:
        reason = TypeDeRdv.CNI

    try:
        documents_number = int(params.get('documents_number', '').strip())
    except (ValueError, TypeError):
        documents_number = 1
    else:
        documents_number = min(documents_number, 4)

    return meeting_point_ids, start_date, end_date, reason, documents_number
|
|
|
|
|
|
@authenticate
def available_time_slots(request):
    '''Return the bookable slots of the requested meeting points as JSON.

    The query-string is parsed by available_time_slots_parse_qs; a
    ValueError raised there yields a 422 response describing the problem.
    On success the response maps each lieu pk having at least one slot to a
    list of {'datetime', 'callback_url'} entries, a given datetime being
    reported at most once per lieu.
    '''
    try:
        (
            meeting_point_ids,
            start_date,
            end_date,
            reason,
            documents_number,
        ) = available_time_slots_parse_qs(request)
    except ValueError as e:
        logger.warning('available_time_slots received a bad request %s', e)
        error = {'loc': 'query-string', 'msg': str(e), 'type': 'bad request'}
        return JsonResponse({'detail': [error]}, status=422)

    plages = (
        Plage.objects.filter(lieu__id__in=meeting_point_ids)
        .filter(type_de_rdv__in=[reason])
        .filter(date__gte=start_date, date__lte=end_date)
        .filter(personnes=documents_number)
        .select_related('lieu', 'lieu__collectivite')
        .order_by('date')
        .values_list(
            'date',
            'horaires',
            'duree',
            'lieu__pk',
            'lieu__nom',
            'lieu__collectivite__pk',
            'lieu__collectivite__nom',
        )
    )

    # Callback URLs are rooted at settings.BASE_URL when it is set, otherwise
    # at the root URL of the current request.
    base_url = getattr(settings, 'BASE_URL', None)
    if base_url:

        def absolutize(path):
            return base_url + path

    else:
        request_root = request.build_absolute_uri('/')

        def absolutize(path):
            return urllib.parse.urljoin(request_root, path)

    slots_by_lieu = collections.defaultdict(list)
    seen_by_lieu = collections.defaultdict(set)

    for row in plages:
        day, horaires, duree, lieu_pk, lieu_nom, collectivite_pk, collectivite_nom = row
        generated = make_available_time_slots(
            day,
            horaires,
            duree,
            lieu_pk=lieu_pk,
            lieu_slug=slugify(lieu_nom),
            collectivite_pk=collectivite_pk,
            collectivite_slug=slugify(collectivite_nom),
        )
        for slot_datetime, path in generated:
            callback_url = absolutize(path)
            already_reported = seen_by_lieu[lieu_pk]
            if slot_datetime in already_reported:
                # prevent the same datetime to be reported multiple times
                continue
            already_reported.add(slot_datetime)
            # slots_by_lieu is only touched here so that a lieu without any
            # slot never shows up in the response.
            slots_by_lieu[lieu_pk].append(
                {
                    'datetime': format_date_ants(slot_datetime),
                    'callback_url': callback_url,
                }
            )

    slot_count = sum(len(slots) for slots in slots_by_lieu.values())
    logger.info(
        'available_time_slots returned %d slots (ids=%s start=%s end=%s reason=%s persons=%s)',
        slot_count,
        ','.join(str(pk) for pk in meeting_point_ids),
        start_date,
        end_date,
        reason.name,
        documents_number,
    )
    return JsonResponse(slots_by_lieu)
|
|
|
|
|
|
@authenticate
def search_application_ids(request):
    '''Return, as JSON, the RendezVous matching the given application_ids.

    Blank identifiers and identifiers of 64 characters or more are ignored;
    when none is left a 422 response is returned. Otherwise the response
    maps each identifiant_predemande having at least one RendezVous to a
    list of meeting descriptions.
    '''
    stripped_ids = (raw.strip() for raw in request.GET.getlist('application_ids'))
    application_ids = [value for value in stripped_ids if value and len(value) < 64]

    if not application_ids:
        logger.warning('search_application_ids received a bad request "missing application_ids"')
        error = {
            'loc': 'query-string',
            'msg': 'missing application_ids',
            'type': 'bad request',
        }
        return JsonResponse({'detail': [error]}, status=422)

    meetings = RendezVous.objects.filter(identifiant_predemande__in=application_ids).select_related(
        'lieu', 'lieu__collectivite'
    )

    rdv_by_identifiant_predemande = collections.defaultdict(list)
    for rdv in meetings:
        rdv_by_identifiant_predemande[rdv.identifiant_predemande].append(
            {
                'meeting_point': rdv.lieu.nom,
                'datetime': format_date_ants(rdv.date),
                'management_url': request.build_absolute_uri(rdv.make_gestion_url()),
                'cancel_url': request.build_absolute_uri(rdv.make_annulation_url()),
            }
        )

    rdv_count = sum(len(found) for found in rdv_by_identifiant_predemande.values())
    logger.info(
        'search_application_ids returned %d meetings (application_ids=%s)',
        rdv_count,
        ','.join(application_ids),
    )
    return JsonResponse(rdv_by_identifiant_predemande)
|