ants-hub/src/ants_hub/api/views/ants.py

296 lines
9.9 KiB
Python

# ANTS-Hub - Copyright (C) Entr'ouvert
import collections
import datetime
import functools
import logging
import secrets
import sys
import time
import urllib.parse
import zoneinfo
from django.conf import settings
from django.db import connection
from django.http import JsonResponse
from django.utils.text import slugify
from ants_hub.data.models import (
Config,
HoraireList,
Lieu,
Plage,
RendezVous,
TypeDeRdv,
make_available_time_slots,
)
# Timezone into which datetimes are converted before being sent to ANTS (see format_date_ants).
ANTS_TIMEZONE = zoneinfo.ZoneInfo('Europe/Paris')
# Logger used by every view and helper of this module.
logger = logging.getLogger('ants_hub.api.ants')
def format_date_ants(dt):
    '''Serialize a datetime the way ANTS wants it.

    The value is converted to ANTS_TIMEZONE (Paris local time), its UTC offset
    is dropped and a literal 'Z' suffix is appended in its place.
    '''
    paris_time = dt.astimezone(ANTS_TIMEZONE)
    # A naive datetime has the same ISO representation, minus the "+HH:MM" offset.
    return paris_time.replace(tzinfo=None).isoformat() + 'Z'
# Module-level cache maintained by authenticate(): the last value read from
# Config.REQUEST_FROM_ANTS_AUTH_TOKEN and the time.time() at which it was read.
AUTH_TOKEN = None
AUTH_TOKEN_TIME = None
def authenticate(func):
    '''View decorator enforcing the X-Hub-Rdv-Auth-Token header.

    A request whose ``request.user`` is already authenticated is passed to the
    view without any header check. Otherwise the header value must match the
    token stored under Config.REQUEST_FROM_ANTS_AUTH_TOKEN; a missing header, a
    missing configuration or a mismatch yields a 401 JSON response and the view
    is not called.

    The configured token is cached in the AUTH_TOKEN / AUTH_TOKEN_TIME module
    globals for 60 seconds; the cache is bypassed when pytest is loaded.
    '''

    @functools.wraps(func)
    def wrapper(request, *args, **kwargs):
        global AUTH_TOKEN, AUTH_TOKEN_TIME  # pylint: disable=global-statement

        if not hasattr(request, 'user') or not request.user.is_authenticated:
            header = request.headers.get('X-Hub-Rdv-Auth-Token', '')
            if not header:
                logger.warning('authentication failed, missing header X-HUB-RDV-AUTH-TOKEN')
                return JsonResponse("Missing X-HUB-RDV-AUTH-TOKEN header", status=401, safe=False)

            cache_is_fresh = AUTH_TOKEN_TIME and time.time() - AUTH_TOKEN_TIME < 60
            if cache_is_fresh and 'pytest' not in sys.modules:
                auth_token = AUTH_TOKEN
            else:
                auth_token = Config.get(Config.REQUEST_FROM_ANTS_AUTH_TOKEN)
                AUTH_TOKEN_TIME, AUTH_TOKEN = time.time(), auth_token

            if not auth_token:
                logger.error('authentication failed, REQUEST_FROM_ANTS_AUTH_TOKEN is not configured')
                return JsonResponse("X-HUB-RDV-AUTH-TOKEN not configured", status=401, safe=False)

            # compare_digest() only accepts str operands when they are pure ASCII
            # and raises TypeError otherwise; the header is client-controlled, so
            # compare encoded bytes to answer 401 instead of crashing.
            tokens_match = secrets.compare_digest(header.encode('utf-8'), auth_token.encode('utf-8'))
            if not tokens_match:
                logger.warning('authentication failed, bad authentication token "%s"', header)
                return JsonResponse("X-HUB-RDV-AUTH-TOKEN header invalid", status=401, safe=False)

        return func(request, *args, **kwargs)

    return wrapper
@authenticate
def get_managed_meeting_points(request):
    '''Return every Lieu as a JSON list of meeting-point descriptions.

    The 'website' and 'city_logo' entries use the values of the Lieu itself
    and fall back to those of its collectivite when the Lieu has none.
    '''
    meeting_points = [
        {
            'id': str(lieu.id),
            'name': lieu.nom,
            'longitude': lieu.longitude,
            'latitude': lieu.latitude,
            'public_entry_address': lieu.numero_rue,
            'zip_code': lieu.code_postal,
            'city_name': lieu.ville,
            'website': lieu.url or lieu.collectivite.url,
            'city_logo': lieu.logo_url or lieu.collectivite.logo_url,
        }
        for lieu in Lieu.objects.select_related('collectivite')
    ]
    logger.info('get_managed_meeting_points returned %d meeting points', len(meeting_points))
    # safe=False: the payload is a list, not a dict.
    return JsonResponse(meeting_points, safe=False)
def available_time_slots_parse_qs(request):
    '''Extract and normalize the query-string parameters of available_time_slots.

    Returns a tuple (meeting_point_ids, start_date, end_date, reason,
    documents_number).

    Raises ValueError when meeting_point_ids, start_date or end_date is
    missing, or when a meeting point id is not an integer. Other malformed
    values are replaced by defaults instead of being rejected:

    * start_date: today when unparsable or in the past;
    * end_date: start_date + 7 days when unparsable or before start_date,
      and never more than 180 days after start_date;
    * reason: TypeDeRdv.CNI when absent or unknown;
    * documents_number: 1 when absent or unparsable, capped at 4.
    '''
    params = request.GET

    raw_ids = params.getlist('meeting_point_ids')
    if not raw_ids:
        raise ValueError('missing meeting_point_ids')
    try:
        meeting_point_ids = [int(raw_id.strip()) for raw_id in raw_ids]
    except ValueError:
        raise ValueError('invalid meeting_point_ids')

    raw_start = params.get('start_date', '').strip()
    if not raw_start:
        raise ValueError('missing start_date')
    try:
        start_date = datetime.date.fromisoformat(raw_start)
    except ValueError:
        start_date = datetime.date.today()
    # Never look into the past.
    start_date = max(start_date, datetime.date.today())

    raw_end = params.get('end_date', '').strip()
    if not raw_end:
        raise ValueError('missing end_date')
    try:
        end_date = datetime.date.fromisoformat(raw_end)
    except ValueError:
        end_date = None
    if end_date is None or end_date < start_date:
        end_date = start_date + datetime.timedelta(days=7)
    latest_end = start_date + datetime.timedelta(days=180)
    end_date = min(end_date, latest_end)

    try:
        reason = TypeDeRdv.from_ants_name(params.get('reason', 'CNI').strip())
    except KeyError:
        reason = TypeDeRdv.CNI

    try:
        requested_documents = int(params.get('documents_number', '').strip())
        documents_number = min(requested_documents, 4)
    except (ValueError, TypeError):
        documents_number = 1

    return meeting_point_ids, start_date, end_date, reason, documents_number
# Switch read by available_time_slots(): True queries Plage through the Django
# ORM, False uses the hand-written SQL query.
USE_ORM = False
@authenticate
def available_time_slots(request):
    '''Return the available time slots of the requested meeting points as JSON.

    The query string is parsed by available_time_slots_parse_qs(); when it
    raises ValueError a 422 response describing the problem is returned.

    On success the response is an object mapping each meeting point id to a
    list of {'datetime': ..., 'callback_url': ...} entries. A given datetime is
    reported at most once per meeting point. Callback URLs are prefixed with
    settings.BASE_URL when it is set, and otherwise resolved against the root
    URL of the current request.
    '''
    try:
        meeting_point_ids, start_date, end_date, reason, documents_number = available_time_slots_parse_qs(
            request
        )
    except ValueError as e:
        logger.warning('available_time_slots received a bad request %s', e)
        return JsonResponse(
            {
                'detail': [
                    {
                        'loc': 'query-string',
                        'msg': str(e),
                        'type': 'bad request',
                    }
                ]
            },
            status=422,
        )

    types_de_rdv = [reason]
    base_url = getattr(settings, 'BASE_URL', None)
    # Only needed (and only computed) when no BASE_URL is configured.
    base_callback_url = None if base_url else request.build_absolute_uri('/')

    if USE_ORM:
        qs = Plage.objects.filter(lieu__id__in=meeting_point_ids)
        qs = qs.filter(type_de_rdv__in=types_de_rdv)
        qs = qs.filter(date__gte=start_date, date__lte=end_date)
        qs = qs.filter(personnes=documents_number)
        qs = qs.select_related('lieu', 'lieu__collectivite')
        qs = qs.order_by('date').values_list(
            'date',
            'horaires',
            'duree',
            'lieu__pk',
            'lieu__nom',
            'lieu__collectivite__pk',
            'lieu__collectivite__nom',
        )
    else:
        # Only lists of "%s" placeholders are interpolated into the SQL text;
        # every value is passed separately to the database driver.
        # meeting_point_ids is never empty (the parser rejects that case), so
        # the IN (...) clause is always well-formed.
        lieu_placeholders = ','.join(['%s'] * len(meeting_point_ids))
        type_placeholders = ','.join(['%s'] * len(types_de_rdv))
        with connection.cursor() as cur:
            # Restrict to the requested meeting points and order by date so
            # that this branch returns the same rows as the ORM branch.
            cur.execute(
                '''\
SELECT plage.date, plage.horaires, plage.duree, lieu.id, lieu.nom, collectivite.id, collectivite.nom
FROM
    ants_hub_plage AS plage,
    ants_hub_lieu AS lieu,
    ants_hub_collectivite AS collectivite
WHERE
    plage.lieu_id = lieu.id
AND
    lieu.collectivite_id = collectivite.id
AND
    lieu.id IN (%s)
AND
    plage.type_de_rdv IN (%s)
AND
    plage.date >= %%s
AND
    plage.date <= %%s
AND
    plage.personnes = %%s
ORDER BY plage.date'''
                % (lieu_placeholders, type_placeholders),
                [*meeting_point_ids, *types_de_rdv, start_date, end_date, documents_number],
            )
            rows = cur.fetchall()
        # The raw column holds the serialized form; rebuild the HoraireList.
        qs = [
            (date, HoraireList.from_str(horaires), duree, lieu_pk, lieu_nom, collectivite_pk, collectivite_nom)
            for date, horaires, duree, lieu_pk, lieu_nom, collectivite_pk, collectivite_nom in rows
        ]

    slots_by_lieu = collections.defaultdict(list)
    date_by_lieu = collections.defaultdict(set)
    slot_count = 0
    for date, horaires, duree, lieu_pk, lieu_nom, collectivite_pk, collectivite_nom in qs:
        lieu_slug = slugify(lieu_nom)
        collectivite_slug = slugify(collectivite_nom)
        for slot_datetime, url in make_available_time_slots(
            date,
            horaires,
            duree,
            lieu_pk=lieu_pk,
            lieu_slug=lieu_slug,
            collectivite_pk=collectivite_pk,
            collectivite_slug=collectivite_slug,
        ):
            if slot_datetime in date_by_lieu[lieu_pk]:
                # prevent the same datetime to be reported multiple times
                continue
            date_by_lieu[lieu_pk].add(slot_datetime)
            if base_url:
                callback_url = base_url + url
            else:
                callback_url = urllib.parse.urljoin(base_callback_url, url)
            slots_by_lieu[lieu_pk].append(
                {
                    'datetime': format_date_ants(slot_datetime),
                    'callback_url': callback_url,
                }
            )
            slot_count += 1

    logger.info(
        'available_time_slots returned %d slots (ids=%s start=%s end=%s reason=%s persons=%s)',
        slot_count,
        ','.join(map(str, meeting_point_ids)),
        start_date,
        end_date,
        reason.name,
        documents_number,
    )
    return JsonResponse(slots_by_lieu)
@authenticate
def search_application_ids(request):
    '''Return the RendezVous matching the requested application ids as JSON.

    The 'application_ids' query-string values are stripped; empty values and
    values of 64 characters or more are ignored. When nothing remains, a 422
    response is returned. Otherwise the response maps each matching
    identifiant_predemande to the list of its meetings (meeting point name,
    datetime, management URL and cancellation URL).
    '''
    stripped_ids = (raw_id.strip() for raw_id in request.GET.getlist('application_ids'))
    application_ids = [app_id for app_id in stripped_ids if app_id and len(app_id) < 64]

    if not application_ids:
        logger.warning('search_application_ids received a bad request "missing application_ids"')
        error = {
            'loc': 'query-string',
            'msg': 'missing application_ids',
            'type': 'bad request',
        }
        return JsonResponse({'detail': [error]}, status=422)

    meetings = RendezVous.objects.filter(identifiant_predemande__in=application_ids).select_related(
        'lieu', 'lieu__collectivite'
    )

    rdv_by_identifiant_predemande = {}
    for rdv in meetings:
        description = {
            'meeting_point': rdv.lieu.nom,
            'datetime': format_date_ants(rdv.date),
            'management_url': request.build_absolute_uri(rdv.make_gestion_url()),
            'cancel_url': request.build_absolute_uri(rdv.make_annulation_url()),
        }
        rdv_by_identifiant_predemande.setdefault(rdv.identifiant_predemande, []).append(description)

    # Every meeting was appended exactly once, so the total is the sum of the list lengths.
    rdv_count = sum(map(len, rdv_by_identifiant_predemande.values()))
    logger.info(
        'search_application_ids returned %d meetings (application_ids=%s)',
        rdv_count,
        ','.join(application_ids),
    )
    return JsonResponse(rdv_by_identifiant_predemande)