combo-plugin-gnm/combo_plugin_gnm/templatetags/gnm.py

750 lines
29 KiB
Python

# -*- coding: utf-8 -*-
# combo-plugin-gnm - Combo GNM plugin
# Copyright (C) 2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import OrderedDict
import datetime
import json
import math
import operator
import random
import re
from dateutil.parser import parse as dateutil_parse
from pyproj import Geod
from requests import RequestException
from django import template
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.core import signing
from django.utils import six
from django.utils.dateparse import parse_datetime
from django.utils.http import quote
from django.utils.html import format_html
from django.utils.text import slugify
from django.utils.timezone import now, is_naive, make_aware
from django.utils.safestring import mark_safe
from combo.apps.dashboard.models import DashboardCell, Tile
from combo.apps.maps.models import Map, MapLayer
from combo.data.models import Page, CellBase, ConfigJsonCell
from combo.public.views import render_cell
from combo.utils import requests
# Template library on which the filters and tags of this module are registered.
register = template.Library()
# French weekday names, Monday first (same order as datetime.weekday()).
FR_WEEKDAYS = [
    'lundi',
    'mardi',
    'mercredi',
    'jeudi',
    'vendredi',
    'samedi',
    'dimanche',
]

# Two-letter lowercase English day code -> full English day name, Monday first.
EN_ABBREV_WEEKDAYS = OrderedDict(
    (day_name[:2].lower(), day_name)
    for day_name in (
        'Monday',
        'Tuesday',
        'Wednesday',
        'Thursday',
        'Friday',
        'Saturday',
        'Sunday',
    )
)

# Day codes and full names as lists, indexable by weekday number.
EN_ABBREV_WEEKDAYS_LIST = list(EN_ABBREV_WEEKDAYS)
EN_FULL_WEEKDAYS_LIST = list(EN_ABBREV_WEEKDAYS.values())

# Two-letter English day code -> French weekday name (an ordered mapping,
# despite the "_LIST" suffix).
FR_ABBREV_WEEKDAYS_LIST = OrderedDict(zip(EN_ABBREV_WEEKDAYS_LIST, FR_WEEKDAYS))
class TimeSlot(object):
    """Interval delimited by a start and an end datetime.

    Naive bounds are passed through make_aware() so that both attributes
    always hold timezone-aware datetimes.
    """

    def __init__(self, start, end):
        self.start = make_aware(start) if is_naive(start) else start
        self.end = make_aware(end) if is_naive(end) else end

    def __repr__(self):
        bounds = (self.start.strftime('%c'), self.end.strftime('%c'))
        return '<TimeSlot start=%s - end=%s>' % bounds
def get_open_close_from_specification(specification, valid_from, base_datetime):
    '''Parse one geojson 'openinghoursspecification' entry.

    The 'opens' and 'closes' times are combined with the date of
    base_datetime, given the timezone of valid_from, then moved forward
    (by 0 to 6 days) to the weekday named by the last '/'-separated part
    of 'dayOfWeek'.

    Return a (opening_time, closing_time, day_number) tuple where
    day_number is the index of the day in EN_FULL_WEEKDAYS_LIST.
    '''
    opens_at = dateutil_parse(specification['opens']).time()
    opening_time = datetime.datetime.combine(base_datetime, opens_at)
    closes_at = dateutil_parse(specification['closes']).time()
    closing_time = datetime.datetime.combine(base_datetime, closes_at)

    opening_time = opening_time.replace(tzinfo=valid_from.tzinfo)
    closing_time = closing_time.replace(tzinfo=valid_from.tzinfo)

    day_name = specification['dayOfWeek'].split('/')[-1]
    day_number = EN_FULL_WEEKDAYS_LIST.index(day_name)

    # number of days to wait, from each datetime, until the requested weekday
    opening_time += datetime.timedelta(days=(day_number - opening_time.weekday()) % 7)
    closing_time += datetime.timedelta(days=(day_number - closing_time.weekday()) % 7)
    return (opening_time, closing_time, day_number)
def openinghours_to_datetime(codename, hour, minute):
    """
    Build a timezone-aware datetime from a two-letter day code and a time.

    The day code is translated to its English name and the resulting
    "<weekday> <hour>:<minute>:00" string is handed to dateutil's parser.
    Return None when the parser rejects the string with a ValueError
    (an unknown day code produces such a string).
    """
    weekday = EN_ABBREV_WEEKDAYS.get(codename)
    try:
        parsed = dateutil_parse('%s %d:%d:00' % (weekday, hour, minute))
        return make_aware(parsed) if is_naive(parsed) else parsed
    except ValueError:
        return None
def get_period_from_data(weekday, open_close_time_string=None, opening_time=None, closing_time=None):
    """Classify an opening interval as morning, afternoon or whole day.

    The bounds are taken from opening_time / closing_time when given,
    otherwise they are computed from open_close_time_string, a
    (start_hour, start_minute, end_hour, end_minute) sequence, for the
    given weekday code.

    Return (period, all_day_hours):
    - ('am', False) when the interval closes at 12 o'clock or before,
    - ('am', True) when it opens at 12 o'clock or before and closes later,
    - ('pm', False) otherwise.
    """
    if open_close_time_string is not None:
        start_hour, start_minute, end_hour, end_minute = open_close_time_string
    if closing_time is None:
        closing_time = openinghours_to_datetime(weekday, int(end_hour), int(end_minute))
    if opening_time is None:
        opening_time = openinghours_to_datetime(weekday, int(start_hour), int(start_minute))

    if closing_time.hour <= 12:
        return ('am', False)
    if opening_time.hour <= 12:
        return ('am', True)
    return ('pm', False)
def get_slots_from_mdr_format(data, today):
    """Process data from Maison du rhone geojson data from data.grandlyon.fr
    (/ws/grandlyon/ter_territoire.maison_du_rhone/all.json)

    data is a feature (its 'properties' member is used when present) whose
    opening hours are stored under '<french weekday>_am' / '<french
    weekday>_pm' keys with values like '9h00-12h30'.  today is the datetime
    from which the seven days to come are scanned.

    Return (slots, known_format): slots lists the TimeSlot objects found, in
    chronological order beginning from today; known_format tells whether
    any key of data looks like one of the expected keys.  Values that do
    not match the expected pattern, or that hold an impossible time (for
    example '25h00'), are ignored.
    """
    if 'properties' in data:
        data = data['properties']

    # a single pattern recognising any '<weekday>_am' / '<weekday>_pm' key
    mdr_keys_re = re.compile('|'.join(
        '%s_%s' % (day, period) for period in ('am', 'pm') for day in FR_WEEKDAYS))
    known_format = any(mdr_keys_re.search(data_key) for data_key in data.keys())

    hours_re = re.compile(r'(\d?\d)h(\d\d)-(\d?\d)h(\d\d)')
    slots = []
    for i in range(7):
        for period in ('am', 'pm'):
            hours = data.get('%s_%s' % (FR_WEEKDAYS[today.weekday()], period))
            if not hours:
                continue
            match = hours_re.match(hours)
            if match is None:
                continue
            start_hour, start_minute, end_hour, end_minute = [int(x) for x in match.groups()]
            try:
                start = datetime.datetime(
                    today.year, today.month, today.day, start_hour, start_minute,
                    tzinfo=today.tzinfo)
                end = datetime.datetime(
                    today.year, today.month, today.day, end_hour, end_minute,
                    tzinfo=today.tzinfo)
            except ValueError:
                # hour or minute out of range, treat as invalid input data
                continue
            # add to slots the opening hours in chronological order beginning from today
            slots.append(TimeSlot(start, end))
        today = today + datetime.timedelta(days=1)
    return (slots, known_format)
def parse_opening_hours_data(mairie_data):
    """Parse the entries of the 'openinghours' member of mairie_data.

    Each entry is expected to look like '<days> HH:MM-HH:MM' where <days>
    is a comma-separated list of two-letter day codes and/or intervals
    ('Mo-Fr').

    Yield (days_list, time_table) tuples: days_list holds the lowercase day
    codes concerned, time_table maps 'start_day', 'start_hour',
    'start_minute', 'end_hour', 'end_minute' (and 'end_day' for intervals)
    to the strings found in the entry.  Entries, days or intervals that
    cannot be understood are skipped.
    """
    for openinghours in mairie_data.get('openinghours', []):
        # format is comma-separated days and/or intervals, or only one day
        match = re.match(
            r'(\w\w(?:(?:,|\-)?\w\w)*) (\d\d?):(\d\d?)-(\d\d?):(\d\d?)', openinghours)
        if match is None:  # invalid input data
            continue
        groups = match.groups()
        hours = groups[1:]
        for day in groups[0].split(','):
            if '-' in day:
                # interval
                interval = re.match(r'(\w\w)-(\w\w)', day)
                if interval is None:  # e.g. 'motu-we'
                    continue
                time_table = dict(zip(
                    ('start_day', 'end_day', 'start_hour', 'start_minute', 'end_hour', 'end_minute'),
                    interval.groups() + hours))
                try:
                    first = EN_ABBREV_WEEKDAYS_LIST.index(time_table['start_day'].lower())
                    last = EN_ABBREV_WEEKDAYS_LIST.index(time_table['end_day'].lower())
                except ValueError:  # unknown day code
                    continue
                # an interval whose end day comes before its start day
                # gives an empty list
                days_list = EN_ABBREV_WEEKDAYS_LIST[first:last + 1]
            else:
                # one day
                time_table = dict(zip(
                    ('start_day', 'start_hour', 'start_minute', 'end_hour', 'end_minute'),
                    (day,) + hours))
                day_code = day.lower()
                if day_code not in EN_ABBREV_WEEKDAYS_LIST:  # unknown day code
                    continue
                days_list = [day_code]
            yield (days_list, time_table)
def get_slots_from_mairie_format(data, base_datetime):
    """Process mairie json and return the opening slots in chronological order.

    data is a feature (its 'properties' member is used when present) with
    'openinghours' and/or 'openinghoursspecification' members;
    base_datetime is the reference datetime.

    Return (slots, exclusion_slots, known_format):
    - slots: TimeSlot objects sorted by start, built from the
      specifications currently valid that define 'opens' and 'closes',
      and from the 'openinghours' entries,
    - exclusion_slots: TimeSlot objects covering the validity period of
      the specifications that do not define both 'opens' and 'closes',
    - known_format: whether one of the two members is present and not
      empty.
    """
    if 'properties' in data:
        data = data['properties']

    known_format = False
    slots = []
    exclusion_slots = []
    # default validity period of specifications that do not give one
    previous_week = base_datetime - datetime.timedelta(7)
    next_week = base_datetime + datetime.timedelta(7)

    if data.get('openinghours', []) or data.get('openinghoursspecification', []):
        known_format = True

    # prepare annual opening exclusions
    for specification in data.get('openinghoursspecification', []):
        valid_from = previous_week
        if specification.get('validFrom'):
            valid_from = parse_datetime(specification.get('validFrom'))
        valid_through = next_week
        if specification.get('validThrough'):
            valid_through = parse_datetime(specification.get('validThrough'))
        if not valid_from or not valid_through:
            # parse_datetime() did not recognise the value
            continue
        if specification.get('opens') and specification.get('closes'):
            # case when opening periods are defined
            if valid_from <= base_datetime < valid_through:
                (opening_time, closing_time, day_number) = get_open_close_from_specification(
                    specification, valid_from, base_datetime)
                slots.append(TimeSlot(opening_time, closing_time))
        else:
            # case when exclusions are defined
            exclusion_slots.append(TimeSlot(valid_from, valid_through))

    for days_list, time_table in parse_opening_hours_data(data):
        for weekday in days_list:
            start = openinghours_to_datetime(
                weekday, int(time_table['start_hour']), int(time_table['start_minute']))
            end = openinghours_to_datetime(
                weekday, int(time_table['end_hour']), int(time_table['end_minute']))
            if start is None or end is None:
                # time that could not be turned into a datetime (for
                # example an hour out of range), ignore this slot
                continue
            slots.append(TimeSlot(start, end))

    # order slots and cycle the list beginning with 'base_datetime'
    slots = sorted(slots, key=operator.attrgetter('start'))
    if slots:
        def timedelta_key_func(slot):
            return slot.start - base_datetime
        # NOTE(review): the list being sorted by start, the slot with the
        # smallest (signed) delta is always the first one, so this rotation
        # currently leaves the order unchanged.
        nearest_slot_index = slots.index(min(slots, key=timedelta_key_func))
        slots = slots[nearest_slot_index:] + slots[:nearest_slot_index]
    return (slots, exclusion_slots, known_format)
@register.assignment_tag
def get_mairie_opening_hours(mairie_data):
    """Process Mairie Geojson to extract data of each day's opening hours

    mairie_data is a feature (its 'properties' member is used when
    present).  The 'openinghours' member is used first; when it gives
    nothing, the currently valid 'openinghoursspecification' entries that
    define 'opens' and 'closes' are used instead.

    Return:
    - '' when mairie_data is empty,
    - None when no opening hours were found at all,
    - otherwise a list of (french weekday, {'am': ..., 'pm': ...}) tuples,
      from which Saturday and Sunday are left out when they have no hours.
      Values are strings like '09h00-12h00', None when closed, or '' for
      the afternoon of a day whose hours are held by the 'am' entry.
    """
    if not mairie_data:
        return ''
    if 'properties' in mairie_data:
        mairie_data = mairie_data['properties']

    base_datetime = now()
    opening_hours_dict = OrderedDict(
        (weekday, {'am': None, 'pm': None}) for weekday in EN_ABBREV_WEEKDAYS_LIST)

    def record_hours(weekday, period, all_day_hours, label):
        if all_day_hours and period == 'am':
            # empty string so that the afternoon is not displayed as closed
            opening_hours_dict[weekday]['pm'] = ''
        opening_hours_dict[weekday][period] = label

    known_format = False
    for days_list, time_table in parse_opening_hours_data(mairie_data):
        known_format = True
        hours = (time_table['start_hour'], time_table['start_minute'],
                 time_table['end_hour'], time_table['end_minute'])
        for weekday in days_list:
            (period, all_day_hours) = get_period_from_data(
                weekday, open_close_time_string=hours)
            record_hours(weekday, period, all_day_hours, "%sh%s-%sh%s" % hours)

    if not known_format:
        # some mairie only have openinghoursspecification (e.g. Jonage)
        previous_week = base_datetime - datetime.timedelta(7)
        next_week = base_datetime + datetime.timedelta(7)
        for specification in mairie_data.get('openinghoursspecification', []):
            valid_from = previous_week
            if specification.get('validFrom'):
                valid_from = parse_datetime(specification.get('validFrom'))
            valid_through = next_week
            if specification.get('validThrough'):
                valid_through = parse_datetime(specification.get('validThrough'))
            if not valid_from or not valid_through:
                continue
            # only consider entries that define opening periods; empty or
            # null values are ignored, as in get_slots_from_mairie_format()
            if not (specification.get('opens') and specification.get('closes')):
                continue
            # only consider the period that is valid at base_datetime
            if not (valid_from <= base_datetime < valid_through):
                continue
            (opening_time, closing_time, day_number) = get_open_close_from_specification(
                specification, valid_from, base_datetime)
            abbr_day_of_week = EN_ABBREV_WEEKDAYS_LIST[day_number]
            (period, all_day_hours) = get_period_from_data(
                abbr_day_of_week, opening_time=opening_time, closing_time=closing_time)
            record_hours(abbr_day_of_week, period, all_day_hours, "%sh%s-%sh%s" % (
                opening_time.strftime('%H'), opening_time.strftime('%M'),
                closing_time.strftime('%H'), closing_time.strftime('%M')))

    if not any(hours['am'] or hours['pm'] for hours in opening_hours_dict.values()):
        # always closed, return None to mark unavailability
        return None

    return [
        (FR_ABBREV_WEEKDAYS_LIST[weekday], hours)
        for weekday, hours in opening_hours_dict.items()
        if not (weekday in ['sa', 'su'] and not hours['am'] and not hours['pm'])
    ]
@register.filter
def as_opening_hours_badge(data):
    """Return an HTML badge telling whether a place is currently open.

    data is a geojson feature in the "maison du rhone" format or in the
    mairie format.  Return '' when data is empty or in an unknown format,
    otherwise a safe '<div class="badge ...">' string whose class is
    'open', 'soon-to-be-closed' (less than an hour left) or 'closed', with
    a label giving the closing time or the next opening time.
    """
    if not data:
        return ''

    base_datetime = now()
    exclusion_slots = []
    today = base_datetime.date()

    (slots, known_format) = get_slots_from_mdr_format(data, base_datetime)
    if not known_format:
        (slots, exclusion_slots, known_format) = get_slots_from_mairie_format(data, base_datetime)
    if not known_format:
        return ''

    def is_excluded(slot):
        return any(
            slot.start >= exclusion.start and slot.end <= exclusion.end
            for exclusion in exclusion_slots)

    # remove past slots and exclude special timeslots; a slot ending exactly
    # now is over, keeping it would match none of the branches below
    slots = [
        slot for slot in slots
        if slot.end > base_datetime and not is_excluded(slot)]

    # parse slots to return the right html
    if not slots:
        klass = 'closed'
        label = u'Fermé'
    elif base_datetime < slots[0].start:
        klass = 'closed'
        verb = u'Réouvre'
        if slots[0].start.weekday() == today.weekday():
            day_label = ''
            if slots[0].start.hour < 12:
                verb = 'Ouvre'
        elif slots[0].start.weekday() == (today.weekday() + 1) % 7:
            day_label = u'demain'
        else:
            day_label = FR_WEEKDAYS[slots[0].start.weekday()]
        label = u'%s %s à %sh%02d' % (verb, day_label, slots[0].start.hour, slots[0].start.minute)
    else:
        # currently inside the first slot; total_seconds() is used as the
        # "seconds" attribute alone ignores the days part of the delta
        if (slots[0].end - base_datetime).total_seconds() < 3600:
            klass = 'soon-to-be-closed'
        else:
            klass = 'open'
        label = u"Ouvert jusqu'à %sh%02d" % (slots[0].end.hour, slots[0].end.minute)
    return mark_safe(u'<div class="badge %s"><span>%s</span></div>' % (klass, label))
@register.filter
def onlymoov_duration(string):
    """Format the hours and minutes of an onlymoov duration string.

    Durations look like "PT1H16M3S" or "PT1M35S"; seconds are ignored.
    Return '<hours part> <minutes part>' (either part may be empty), or
    '?' when the string does not start with the expected pattern.
    """
    match = re.match(r'PT(\d+H)?(\d+M)', string)
    if match is None:  # didn't match
        return '?'
    hours_part, minutes_part = match.groups()

    hours = ''
    if hours_part:
        nb_hours = int(hours_part[:-1])
        hours = ('%s heures' if nb_hours > 1 else '%s heure') % nb_hours

    nb_minutes = int(minutes_part[:-1])
    minutes = '%d min' % nb_minutes if nb_minutes != 0 else ''
    return '%s %s' % (hours, minutes)
@register.filter
def place_page(cell):
    """Return the page, using the 'place' template, that holds a cell with
    the same key and parameters as the given cell, or None if there is none.
    """
    lookup = {
        'key': cell.key,
        'parameters': cell.parameters,
        'page__template_name': 'place',
    }
    try:
        fixed_place_cell = ConfigJsonCell.objects.get(**lookup)
    except ConfigJsonCell.DoesNotExist:
        return None
    return fixed_place_cell.page
@register.filter
def place_page_url(cell):
    """Return the online URL of the place page matching the cell, or ''
    when place_page() finds no such page.
    """
    page = place_page(cell)
    return '' if page is None else page.get_online_url()
@register.filter
def is_place_page(page):
    """Tell whether page is set and uses the 'place' template."""
    if page:
        return page.template_name == 'place'
    return False
@register.assignment_tag
def get_tile_picture_size(page):
    """Return the picture size specification to use for tiles of the page:
    '1300' on place pages, '300x300' elsewhere.
    """
    return '1300' if is_place_page(page) else '300x300'
def _find_collectivity(name):
    """Return the {'slug', 'label'} dict of the collectivity named *name*.

    settings.COLLECTIVITY_LABELS (slug -> label) is searched for an exact
    label first, then for a label with the same slugified form; None is
    returned when nothing matches.  The label returned is always the one
    found in the settings.
    """
    for slug, label in settings.COLLECTIVITY_LABELS.items():
        if label == name:
            return {'slug': slug, 'label': label}
    name_slug = slugify(name)
    for slug, label in settings.COLLECTIVITY_LABELS.items():
        if slugify(label) == name_slug:
            return {'slug': slug, 'label': label}
    return None


@register.filter
def as_producer(slug, default_slug=None):
    """Return a {'slug': ..., 'label': ...} dict describing a producer.

    slug is either a string (site slug, formdef reference
    "<site>:<form>", or text containing a "producer-<name>" keyword) or a
    dict describing a form, from which 'form_digest' (a collectivity name
    between parentheses), 'keywords' ("producer-<name>") and 'site_slug'
    are used, in that order.

    The producer is then looked up in the hobo services, in
    settings.PRODUCER_LABELS and in settings.COLLECTIVITY_LABELS.  When
    nothing matches, the result for default_slug is returned if given,
    else the 'toodego' producer.
    """
    if isinstance(slug, dict):
        # actually a form
        form = slug
        if form.get('form_digest'):
            parenthesis = re.match(r'.*\((.*)\)', form['form_digest'])
            if parenthesis:
                collectivity = _find_collectivity(parenthesis.group(1))
                if collectivity:
                    return collectivity
        for keyword in form.get('keywords') or []:
            if keyword.startswith('producer-'):
                slug = keyword.split('-', 1)[1]
                break
        else:
            slug = form.get('site_slug')

    # a missing slug (e.g. form without 'site_slug') ends up on the defaults
    slug = slug or ''

    if ':' in slug:  # formdef_reference
        slug = slug.split(':')[0]

    if slug.startswith('_'):
        producer = slug.split('_')[1].replace('hobo-', '')
    else:
        producer = slug

    if slug == 'eservices':
        # handle collectivity sites, they are individually named
        # "eservices" but have the collectivity slug as a template
        # variable.
        producer = settings.TEMPLATE_VARS.get('gnm_commune', 'grandlyon')
        if producer and settings.TEMPLATE_VARS.get('gnm_commune_name'):
            return {'slug': producer,
                    'label': settings.TEMPLATE_VARS.get('gnm_commune_name')}

    keyword_match = re.search(r'(^|\W)producer-([\w-]*)(\W|$)', producer)
    if keyword_match:
        producer = keyword_match.group(2).strip()

    if producer.startswith('Lyon '):  # assume sth like "Lyon 7eme"
        producer = 'Lyon'

    for service_key in ('hobo-%s' % producer, '_interco_hobo-%s' % producer):
        service = settings.KNOWN_SERVICES['hobo'].get(service_key)
        if service:
            return {'slug': producer, 'label': service['title']}

    if producer in settings.PRODUCER_LABELS:
        return {'slug': producer, 'label': settings.PRODUCER_LABELS[producer]}

    collectivity = _find_collectivity(producer)
    if collectivity:
        return collectivity

    if default_slug:
        return as_producer(default_slug)
    return {'slug': 'toodego', 'label': 'Toodego'}
@register.filter
def as_commune(user_data):
    """Return a dict describing the commune matching user_data, or None.

    user_data is either a string, expected to be a (page) slug, or a
    mapping with a 'city' or 'address_city' entry.  The known portals
    (get_gnm_collectivities()) are searched first, their result carries
    'gnm': True; for a mapping, the pages under the 'mairie' page are
    searched next (annexes excluded).
    """
    if not user_data:
        return None

    if isinstance(user_data, six.string_types):
        # user_data is expected to be (page) slug
        for collectivity in get_gnm_collectivities():
            if slugify(collectivity['label']) in user_data:
                collectivity['gnm'] = True
                return collectivity
        return None

    city = user_data.get('city') or user_data.get('address_city')
    if not city:
        return None

    # first look for known portals
    for collectivity in get_gnm_collectivities():
        if collectivity.get('label') == city:
            return {
                'label': city,
                'slug': slugify(city),
                'url': collectivity['url'],
                'gnm': True,
            }

    # if not found look in mairie pages
    mairie_pages = Page.objects.filter(
        parent__slug='mairie', slug__icontains=slugify(city))
    mairie_pages = mairie_pages.exclude(slug__icontains='annexe')
    if not mairie_pages.exists():
        return None
    return {
        'label': city,
        'slug': slugify(city),
        'url': mairie_pages[0].get_online_url(),
    }
@register.assignment_tag
def get_suggestions(request, user_data, places_data):
    """Fill the dashboard of the user with suggested tiles, when it is empty.

    user_data is a mapping of user attributes ('city' / 'address_city',
    'address_street', 'address_number'); places_data, when it has a
    non-empty 'data' list, gives the address to use in place of the user
    address.

    Return ['no user'] for anonymous requests, ['already filled'] when the
    user already has tiles, otherwise the list of tile descriptions that
    were saved as ConfigJsonCell and Tile objects.
    """
    # fill initial dashboard based on this layout:
    ## au quotidien
    # mairie tile
    # closest velov/tcl/swimming pool/etc. tiles
    ## environnement
    # air quality
    # pollen
    if not getattr(request, 'user', None) or not request.user.is_authenticated():
        # no user
        return ['no user']

    dashboard = DashboardCell.objects.all().filter(page__snapshot__isnull=True)[0]
    if Tile.objects.filter(dashboard=dashboard, user=request.user).exists():
        # dashboard already filled
        return ['already filled']

    mairie_tile = None
    service_tiles = []
    # default air quality tile, replaced below when the address is geocoded
    airquality_tile = {
        'key': 'airquality',
        'parameters': {
            'lon': settings.COMBO_MAP_DEFAULT_POSITION['lng'],
            'lat': settings.COMBO_MAP_DEFAULT_POSITION['lat'],
        },
    }
    pollen_tile = {'key': 'pollen'}

    city = user_data.get('city') or user_data.get('address_city')
    if city:
        # get commune tile for the user city
        maplayer = MapLayer.objects.get(slug='mairie')
        try:
            data_result = requests.get(maplayer.geojson_url, timeout=2,
                                       without_user=True, cache_duration=300).json()
        except RequestException:
            pass
        else:
            city_slug = slugify(city)
            if data_result.get('features'):
                for feature in data_result['features']:
                    if 'Annexe' in feature['properties']['nom']:
                        continue
                    if city_slug in slugify(feature['properties']['nom']):
                        mairie_tile = {'key': maplayer.slug, 'properties': feature['properties']}
                        break

    address = None
    if places_data and places_data.get('data'):
        place_data = places_data['data'][0]
        address = u'%(adresse)s, %(ville)s, France' % place_data['content']
    elif user_data.get('address_street'):
        if not user_data.get('address_number'):
            user_data['address_number'] = ''
        address = u'%(address_number)s %(address_street)s, %(address_city)s, France' % user_data

    coords = None
    if address:
        nominatim_url = settings.COMBO_GEOCODING_SERVICE
        url = '%s/search?q=%s&accept-language=fr&format=json' % (
            nominatim_url, quote(address.encode('utf-8')))
        try:
            search_result = requests.get(url, timeout=2, without_user=True,
                                         cache_duration=300).json()
        except RequestException:
            # geocoding is best-effort, go on without coordinates
            search_result = None
        if search_result:
            coords = {'lon': search_result[0]['lon'], 'lat': search_result[0]['lat']}

    if coords:
        airquality_tile = {'key': 'airquality', 'parameters': coords}
        # bounding box around the address, sent as BBOX to the layers
        lat1, lat2 = float(coords['lat']) - 0.008, float(coords['lat']) + 0.008
        lon1, lon2 = float(coords['lon']) - 0.006, float(coords['lon']) + 0.006
        geod = Geod(ellps='WGS84')
        for maplayer in MapLayer.objects.filter(slug__in=('velov', 'piscine', 'tcl', 'bibliotheque', 'mdr')):
            url = maplayer.geojson_url + '&BBOX=%s,%s,%s,%s' % (lat1, lon1, lat2, lon2)
            try:
                data_result = requests.get(url, timeout=2, without_user=True,
                                           cache_duration=300).json()
            except RequestException:
                continue
            features = data_result.get('features')
            if not features:
                continue
            for feature in features:
                # third value returned by Geod.inv() is the distance
                feature['distance'] = geod.inv(
                    float(coords['lon']),
                    float(coords['lat']),
                    float(feature['geometry']['coordinates'][0]),
                    float(feature['geometry']['coordinates'][1]))[2]
            features.sort(key=lambda x: x['distance'])
            # take closest feature
            if features:
                service_tiles.append({'key': maplayer.slug, 'properties': features[0]['properties']})

    tiles = []
    if mairie_tile or service_tiles:
        if mairie_tile:
            tiles.append(mairie_tile)
        if service_tiles:
            random.shuffle(service_tiles)
            tiles.extend(service_tiles)

    if airquality_tile or pollen_tile:
        tiles.append({'key': 'group-title', 'parameters': {'text': 'Environnement'}})
        if airquality_tile:
            tiles.append(airquality_tile)
        if pollen_tile:
            tiles.append(pollen_tile)

    for i, tile_data in enumerate(tiles):
        if 'properties' in tile_data:
            # keep the feature properties named by the form of the cell type
            cell_form_keys = [x['varname'] for x in settings.JSON_CELL_TYPES[tile_data['key']].get('form') or {}]
            tile_data['parameters'] = {}
            for key in cell_form_keys:
                tile_data['parameters'][key] = tile_data['properties'].get(key)
        cell = ConfigJsonCell(
            key=tile_data['key'],
            parameters=tile_data.get('parameters', {}),
            order=0,
            page_id=dashboard.page_id,
            placeholder='_suggested_tile')
        cell.save()
        tile = Tile(
            dashboard=dashboard,
            cell=cell,
            user=request.user,
            order=i+1)
        tile.save()

    return tiles
@register.assignment_tag
def get_gnm_portal_url():
    """Return the URL of the '_interco_portal' combo service when it is
    known, else the URL of the 'portal' combo service.
    """
    combo_services = settings.KNOWN_SERVICES['combo']
    service_key = '_interco_portal' if '_interco_portal' in combo_services else 'portal'
    return combo_services[service_key].get('url')
@register.assignment_tag
def get_gnm_collectivities():
    """Return the collectivity portals as {'url', 'label'} dicts sorted by label.

    A combo service is kept when its key ends with '_portal' and the key,
    stripped of its first character and of that suffix, names a hobo
    service; the label is the title of that hobo service.  'SAU' and
    'Villeurbanne' are left out.
    """
    collectivities = []
    for key, service in settings.KNOWN_SERVICES['combo'].items():
        if not key.endswith('_portal'):
            continue
        hobo_key = key.split('_portal')[0][1:]
        matching_hobo = settings.KNOWN_SERVICES['hobo'].get(hobo_key)
        if not matching_hobo:
            continue
        if matching_hobo['title'] in ('SAU', 'Villeurbanne'):  # blacklist
            continue
        collectivities.append({'url': service.get('url'), 'label': matching_hobo['title']})
    return sorted(collectivities, key=operator.itemgetter('label'))
@register.inclusion_tag('combo/gnm/place_map.html')
def gnm_place_map(lat, lng):
    """Return the template context of a map whose zoom is fixed to 17,
    with lat and lng stored as 'init_lat' and 'init_lng'.
    """
    map_cell = Map()
    for zoom_attribute in ('initial_zoom', 'min_zoom', 'max_zoom'):
        setattr(map_cell, zoom_attribute, '17')
    context = map_cell.get_cell_extra_context({})
    context.update({'init_lat': lat, 'init_lng': lng})
    return context
@register.inclusion_tag('combo/gnm/airquality_map.html', takes_context=True)
def gnm_airquality_map(context):
    """Push the extra context of a map, initially set on the device
    location with zoom 15 (10 to 19 allowed), onto context and return it.
    """
    map_cell = Map()
    map_settings = (
        ('initial_state', 'device-location'),
        ('initial_zoom', '15'),
        ('min_zoom', '10'),
        ('max_zoom', '19'),
    )
    for attribute, value in map_settings:
        setattr(map_cell, attribute, value)
    extra_context = map_cell.get_cell_extra_context({})
    context.push(extra_context)
    return context
# Translation table (code point -> JSON unicode escape) for the characters
# '>', '<' and '&'.
_json_script_escapes = {
    ord(character): '\\u%04X' % ord(character) for character in '><&'
}
@register.filter(is_safe=True)
def json_script(value, element_id):
    """Serialize value as JSON inside a <script type="application/json">
    element having the given id.

    The '>', '<' and '&' characters of the JSON are replaced by their
    unicode escapes; the id is escaped by format_html().
    """
    json_str = json.dumps(value, cls=DjangoJSONEncoder)
    replacements = (
        ('>', '\\u003E'),
        ('<', '\\u003C'),
        ('&', '\\u0026'),
    )
    for character, escape in replacements:
        json_str = json_str.replace(character, escape)
    return format_html(
        '<script id="{}" type="application/json">{}</script>',
        element_id, mark_safe(json_str)
    )
@register.assignment_tag
def get_goto_cell(page, request):
    """Return the cell designated by the 'to' query string parameter.

    Return None when the parameter is missing, is not a valid id or does
    not match any ConfigJsonCell.  A cell that belongs to a page using the
    'place' template is not returned as is: a copy of it is saved on the
    given page, in the '_auto_tile' placeholder, and returned.
    """
    try:
        cell = ConfigJsonCell.objects.get(id=request.GET['to'])
    except (ConfigJsonCell.DoesNotExist, KeyError, ValueError):
        # ValueError: the parameter comes from the query string and may
        # not be a number
        return None
    if cell.page.template_name != 'place':
        return cell
    # create an alternate version of cell
    cell.id = None
    cell.placeholder = '_auto_tile'
    cell.page = page
    cell.save()
    return cell
@register.assignment_tag
def get_collectivity_slugs():
    """Return the keys of settings.COLLECTIVITY_LABELS as a list."""
    return list(settings.COLLECTIVITY_LABELS)
@register.filter
def indice_values(indices):
    """Yield the non-empty values stored in indices under 'indice_j-1',
    'indice_j' and 'indice_j+1', in that order.
    """
    for key in ('indice_j-1', 'indice_j', 'indice_j+1'):
        value = indices.get(key)
        if value:
            yield value
@register.filter
def airquality_hack(cell, request):
    """Return cell, saved as a new object holding its position when it is
    an 'airquality' cell without parameters; any other cell is returned
    untouched.

    The position is read from the signed 'ctx' query string parameter
    ('q_lon' and 'q_lat') when present, else from the "lon,lat" segment
    that comes second to last in the request path.
    """
    if cell.key != 'airquality' or cell.parameters:
        return cell

    # Cell on airquality dynamic page, it has empty cell.parameters as it
    # gets those from the query string. In order to get un/favorite link
    # to work we need to duplicate the cell into a concrete object with a
    # copy of query parameters within.
    signed_ctx = request.GET.get('ctx')
    if signed_ctx:
        ctx = signing.loads(signed_ctx)
        lon, lat = ctx['q_lon'], ctx['q_lat']
    else:
        position_segment = request.path.split('/')[-2]
        lon, lat = position_segment.split(',')

    cell.parameters = {'lon': lon, 'lat': lat}
    cell.placeholder = '_auto_tile'
    cell.id = None
    cell.save()
    return cell