combo-plugin-gnm/combo_plugin_gnm/templatetags/gnm.py

888 lines
31 KiB
Python

# -*- coding: utf-8 -*-
# combo-plugin-gnm - Combo GNM plugin
# Copyright (C) 2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import OrderedDict
import datetime
import json
import operator
import random
import re
from pyproj import Geod
from requests import RequestException
from django import template
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.core import signing
from django.utils import six
from django.utils.dateparse import parse_date, parse_datetime
from django.utils.http import quote
from django.utils.html import format_html
from django.utils.text import slugify
from django.utils.timezone import now, is_naive, make_aware
from django.utils.safestring import mark_safe
from combo.apps.dashboard.models import DashboardCell, Tile
from combo.apps.maps.models import Map, MapLayer
from combo.data.models import Page, CellBase, ConfigJsonCell
from combo.public.views import render_cell
from combo.utils import requests
# Template library instance on which the tags and filters of this module are registered.
register = template.Library()
# French weekday names, Monday first (same order as datetime.weekday()).
FR_WEEKDAYS = [
    'lundi',
    'mardi',
    'mercredi',
    'jeudi',
    'vendredi',
    'samedi',
    'dimanche',
]
# Two-letter English abbreviation -> full English weekday name, Monday first.
EN_ABBREV_WEEKDAYS = OrderedDict(
    zip(
        ('mo', 'tu', 'we', 'th', 'fr', 'sa', 'su'),
        ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'),
    )
)
# Ordered lists of the abbreviations and of the full English names.
EN_ABBREV_WEEKDAYS_LIST = list(EN_ABBREV_WEEKDAYS)
EN_FULL_WEEKDAYS_LIST = list(EN_ABBREV_WEEKDAYS.values())
# Despite its name this is a mapping: English abbreviation -> French weekday name.
FR_ABBREV_WEEKDAYS_LIST = OrderedDict(
    (abbrev, FR_WEEKDAYS[position]) for position, abbrev in enumerate(EN_ABBREV_WEEKDAYS_LIST)
)
class TimeSlot(object):
    """Interval between two datetimes, stored as ``start`` and ``end``.

    Naive boundaries are converted with ``make_aware`` before being stored;
    aware boundaries are kept untouched.
    """

    def __init__(self, start, end):
        self.start = make_aware(start) if is_naive(start) else start
        self.end = make_aware(end) if is_naive(end) else end

    def __repr__(self):
        # '%c' renders each boundary with the locale's date and time representation
        boundaries = tuple(moment.strftime('%c') for moment in (self.start, self.end))
        return '<TimeSlot start=%s - end=%s>' % boundaries
def openinghours_to_datetime(day_number, hour, minute, base_datetime):
    """Return the datetime of the next occurrence of a weekday at hour:minute.

    The date is the first day, starting from base_datetime's own day, whose
    weekday equals ``day_number`` modulo 7 (0 is Monday).  Returns None when
    ``hour``/``minute`` do not form a valid time of day.
    """
    # accept numbers outside 0-6 (e.g. "sunday + 1") by wrapping around the week
    target_weekday = day_number % 7
    days_ahead = (7 + target_weekday - base_datetime.weekday()) % 7
    target_day = base_datetime + datetime.timedelta(days=days_ahead)
    if is_naive(target_day):
        target_day = make_aware(target_day)
    try:
        wall_time = datetime.time(hour=hour, minute=minute)
    except ValueError:
        return None
    # combine() only keeps the date part of target_day; the time part comes from wall_time
    return datetime.datetime.combine(target_day, wall_time)
def get_slot(day_number, time_table, base_datetime):
    """Build the TimeSlot of an opening period for its next occurrence.

    ``time_table`` provides ``start_hour``, ``start_minute``, ``end_hour`` and
    ``end_minute`` (anything ``int()`` accepts); ``day_number`` is the weekday
    of the opening time (0 is Monday).  A closing time of 24:00 is read as
    00:00 on the following day.
    """
    opening_hour = int(time_table['start_hour'])
    opening_minute = int(time_table['start_minute'])
    closing_hour = int(time_table['end_hour'])
    closing_minute = int(time_table['end_minute'])
    if (closing_hour, closing_minute) == (24, 0):
        closing_hour = 0
    start = openinghours_to_datetime(day_number, opening_hour, opening_minute, base_datetime)

    # the closing time belongs to the next day when it is earlier than the
    # opening time, or when both are 00:00 (open 24h/24)
    closes_before_opening = (closing_hour, closing_minute) < (opening_hour, opening_minute)
    open_all_day = (opening_hour, opening_minute, closing_hour, closing_minute) == (0, 0, 0, 0)
    closing_day_number = day_number
    if closes_before_opening or open_all_day:
        closing_day_number += 1
    end = openinghours_to_datetime(closing_day_number, closing_hour, closing_minute, base_datetime)

    if end < start:
        # The closing time was found this week whereas the opening time was
        # picked on next week: this happens when we are past midnight, within
        # a slot opened the day before.  Look for the opening time starting
        # from yesterday instead.
        day_before = base_datetime - datetime.timedelta(days=1)
        start = openinghours_to_datetime(day_number, opening_hour, opening_minute, day_before)
    return TimeSlot(start, end)
def get_time_table_from_specification(specification):
    """Parse an openinghoursspecification data block.

    Returns a ``(day_number, time_table)`` tuple where ``day_number`` is the
    position of the day in EN_FULL_WEEKDAYS_LIST (0 is Monday) and
    ``time_table`` holds ``start_hour``, ``start_minute``, ``end_hour`` and
    ``end_minute`` as strings.

    Raises ValueError when the block cannot be used: ``dayOfWeek`` missing or
    not a string, unknown day name, or ``opens``/``closes`` without an
    hour:minute part.
    """
    day_of_week = specification.get('dayOfWeek')
    if not isinstance(day_of_week, str):
        # also covers a missing key, so that callers skipping on ValueError
        # are not interrupted by a KeyError
        raise ValueError('missing or invalid dayOfWeek: %r' % (day_of_week,))
    # only the last "/"-separated segment is the day name (presumably the value
    # may be a schema.org URL); index() raises ValueError on unknown names
    day_number = EN_FULL_WEEKDAYS_LIST.index(day_of_week.split('/')[-1])
    # keep hours and minutes only, so that "HH:MM:SS" values are accepted too
    start_hour, start_minute = specification['opens'].split(':')[:2]
    end_hour, end_minute = specification['closes'].split(':')[:2]
    time_table = {
        'start_hour': start_hour,
        'start_minute': start_minute,
        'end_hour': end_hour,
        'end_minute': end_minute,
    }
    return (day_number, time_table)
def get_period_from_data(time_table):
    """Tell in which half of the day an opening period must be displayed.

    Returns a ``(period, all_day_hours)`` tuple:

    * ``('am', False)`` when the period closes after it opens and no later
      than the 12 o'clock hour;
    * ``('am', True)`` when it opens no later than the 12 o'clock hour but
      lasts longer (or closes on the next day);
    * ``('pm', False)`` otherwise.

    A closing time of 24:00 is read as 00:00.  Raises ValueError when the
    hours/minutes are not valid.
    """
    opening_hour = int(time_table['start_hour'])
    opening_minute = int(time_table['start_minute'])
    closing_hour = int(time_table['end_hour'])
    closing_minute = int(time_table['end_minute'])
    if (closing_hour, closing_minute) == (24, 0):
        closing_hour = 0
    closes_at = datetime.time(hour=closing_hour, minute=closing_minute)
    opens_at = datetime.time(hour=opening_hour, minute=opening_minute)

    # closes_at may be smaller than opens_at when the period lasts into the night
    if opens_at < closes_at and closes_at.hour <= 12:
        return ('am', False)
    if opens_at.hour <= 12:
        return ('am', True)
    return ('pm', False)
def get_slots_from_mdr_format(data, base_datetime):
    """Extract opening slots from "Maison du Rhone" geojson data from data.grandlyon.fr
    (/ws/grandlyon/ter_territoire.maison_du_rhone/all.json)

    Opening hours are read from ``<french weekday>_am`` / ``<french weekday>_pm``
    keys holding values like ``8h30-12h00``.  Returns ``(slots, known_format)``:
    ``slots`` lists the TimeSlot objects of the seven days starting at
    base_datetime, in chronological order, and ``known_format`` tells whether
    at least one key of this format exists.
    """
    if 'properties' in data:
        data = data['properties']

    mdr_keys = ['%s_am' % day for day in FR_WEEKDAYS] + ['%s_pm' % day for day in FR_WEEKDAYS]
    key_pattern = '|'.join(mdr_keys)
    known_format = any([re.search(key_pattern, data_key) is not None for data_key in data.keys()])

    slots = []
    for day_offset in range(7):
        current_day = base_datetime + datetime.timedelta(days=day_offset)
        weekday = current_day.weekday()
        for period in ('am', 'pm'):
            hours = data.get('%s_%s' % (FR_WEEKDAYS[weekday], period))
            if not hours:
                continue
            match = re.match(r'(\d?\d)h(\d\d)-(\d?\d)h(\d\d)', hours)
            if match is None:
                # value not in the expected "8h30-12h00" form
                continue
            start_hour, start_minute, end_hour, end_minute = [int(part) for part in match.groups()]
            time_table = {
                'start_hour': start_hour,
                'start_minute': start_minute,
                'end_hour': end_hour,
                'end_minute': end_minute,
            }
            # slots get appended in chronological order, beginning from today
            slots.append(get_slot(weekday, time_table, base_datetime))
    return (slots, known_format)
def parse_opening_hours_data(mairie_data):
    """Parse every known openinghours data formats.

    Each entry of ``mairie_data['openinghours']`` is expected to look like
    ``Mo-Fr 08:30-12:00`` or ``Mo,We,Fr 8:30-12:00``: comma-separated days
    and/or day intervals, followed by an hour range.

    Yields one ``(days_list, time_table)`` tuple per day or interval, where
    ``days_list`` lists lowercase two-letter day abbreviations and
    ``time_table`` holds the matched strings (``start_day``, optionally
    ``end_day``, ``start_hour``, ``start_minute``, ``end_hour``,
    ``end_minute``).  Entries, days or intervals that cannot be parsed are
    skipped.
    """
    # "or []" also covers an explicit None value
    for openinghours in mairie_data.get('openinghours') or []:
        # format is comma-separated days and/or intervals, or only one day
        try:
            groups = re.match(
                r'(\w\w(?:(?:,|\-)?\w\w)*) (\d\d?):(\d\d?)-(\d\d?):(\d\d?)', openinghours
            ).groups()
        except (AttributeError, TypeError):  # invalid input data
            continue
        hours = groups[1:]
        for day in groups[0].split(','):
            try:
                if '-' in day:
                    # interval
                    parts = re.match(r'(\w\w)-(\w\w)', day).groups() + hours
                    time_table = dict(
                        zip(
                            ('start_day', 'end_day', 'start_hour', 'start_minute', 'end_hour', 'end_minute'),
                            parts,
                        )
                    )
                    first_index = EN_ABBREV_WEEKDAYS_LIST.index(time_table['start_day'].lower())
                    last_index = EN_ABBREV_WEEKDAYS_LIST.index(time_table['end_day'].lower())
                    days_list = EN_ABBREV_WEEKDAYS_LIST[first_index : last_index + 1]
                else:
                    # one day
                    time_table = dict(
                        zip(
                            ('start_day', 'start_hour', 'start_minute', 'end_hour', 'end_minute'),
                            (day,) + hours,
                        )
                    )
                    days_list = [
                        EN_ABBREV_WEEKDAYS_LIST[EN_ABBREV_WEEKDAYS_LIST.index(time_table['start_day'].lower())]
                    ]
            except (AttributeError, ValueError):
                # malformed interval (no match) or unknown day abbreviation:
                # ignore it like any other invalid input data
                continue
            yield (days_list, time_table)
def parse_mairie_formats(data, base_datetime, oh_add, ohs_add, ohs_del):
    """Walk through the opening hours of a mairie and report them via callbacks.

    ``openinghoursspecification`` entries are processed first:

    * entries with both ``opens`` and ``closes`` that are valid at
      base_datetime are reported with ``ohs_add(day_number, time_table)``;
    * entries without them are exclusions, reported with
      ``ohs_del(valid_from, valid_through)`` unless already over.

    When no opening period was found that way, ``openinghours`` entries are
    reported with ``oh_add(days_list, time_table)``.

    Returns True when at least one entry of a known format was found.
    """
    if 'properties' in data:
        data = data['properties']

    has_opening_periods = False
    known_format = False
    # validity bounds used when a specification does not provide its own
    one_week = datetime.timedelta(7)
    default_valid_from = base_datetime - one_week
    default_valid_through = base_datetime + one_week

    for spec in data.get('openinghoursspecification', []):
        known_format = True
        valid_from = parse_valid_from(spec) if spec.get('validFrom') else default_valid_from
        valid_through = parse_valid_through(spec) if spec.get('validThrough') else default_valid_through
        if not (valid_from and valid_through):
            continue
        if base_datetime > valid_through:
            # validity period is over
            continue

        if not (spec.get('opens') and spec.get('closes')):
            # case when exclusions are defined
            ohs_del(valid_from, valid_through)
            continue

        # case when opening periods are defined: only consider the
        # specification if its validity period has already started
        if base_datetime < valid_from:
            continue
        has_opening_periods = True
        try:
            day_number, time_table = get_time_table_from_specification(spec)
        except ValueError:
            continue
        ohs_add(day_number, time_table)

    if not has_opening_periods:
        # some mairie may only have opening periods defined into openinghours (e.g. Bron)
        for days_list, time_table in parse_opening_hours_data(data):
            known_format = True
            oh_add(days_list, time_table)
    return known_format
def parse_valid_from(spec):
    """Return the ``validFrom`` value of a specification as an aware datetime.

    The value may be a datetime or a date string; a date is read as the
    beginning of that day.  Returns None when the value cannot be parsed, so
    that callers can skip the specification.
    """
    raw_value = spec.get('validFrom')
    try:
        valid_from = parse_datetime(raw_value) or parse_date(raw_value)
    except (TypeError, ValueError):
        # not a string, or well formatted but not a valid date/time
        return None
    if valid_from is None:
        return None
    if not isinstance(valid_from, datetime.datetime):
        return make_aware(datetime.datetime(valid_from.year, valid_from.month, valid_from.day))
    if is_naive(valid_from):
        # datetime given without UTC offset; make it comparable with aware datetimes
        valid_from = make_aware(valid_from)
    return valid_from
def parse_valid_through(spec):
    """Return the ``validThrough`` value of a specification as an aware datetime.

    The value may be a datetime or a date string; a date is read as 23:59 on
    that day.  Returns None when the value cannot be parsed, so that callers
    can skip the specification.
    """
    raw_value = spec.get('validThrough')
    try:
        valid_through = parse_datetime(raw_value) or parse_date(raw_value)
    except (TypeError, ValueError):
        # not a string, or well formatted but not a valid date/time
        return None
    if valid_through is None:
        return None
    if not isinstance(valid_through, datetime.datetime):
        return make_aware(
            datetime.datetime(valid_through.year, valid_through.month, valid_through.day, 23, 59)
        )
    if is_naive(valid_through):
        # datetime given without UTC offset; make it comparable with aware datetimes
        valid_through = make_aware(valid_through)
    return valid_through
@register.simple_tag
def get_mairie_opening_hours(mairie_data):
    """Process Mairie Geojson to extract data of each day's opening hours.

    Returns:

    * ``''`` when there is no data;
    * None when no opening hours were found and the format is unknown;
    * otherwise a list of ``(french weekday, {'am': ..., 'pm': ...})`` tuples,
      hours being strings like ``8h30-12h00``, None (closed) or ``''``
      (nothing to display).  Saturday and Sunday are omitted when they have
      no opening hours, unless every day is closed.
    """
    if not mairie_data:
        return ''
    base_datetime = now()
    closed_weekdays = set()
    hours_by_weekday = OrderedDict(
        (abbrev, {'am': None, 'pm': None}) for abbrev in EN_ABBREV_WEEKDAYS_LIST
    )

    def record_hours(weekday, time_table):
        period, all_day_hours = get_period_from_data(time_table)
        if all_day_hours and period == 'am':
            # empty string to avoid displaying "fermé" for the afternoon
            hours_by_weekday[weekday]['pm'] = ''
        hours_by_weekday[weekday][period] = "%sh%s-%sh%s" % (
            time_table['start_hour'],
            time_table['start_minute'],
            time_table['end_hour'],
            time_table['end_minute'],
        )

    def add_for_days(days_list, time_table):
        for weekday in days_list:
            record_hours(weekday, time_table)

    def add_for_day_number(day_number, time_table):
        record_hours(EN_ABBREV_WEEKDAYS_LIST[day_number], time_table)

    def exclude_period(valid_from, valid_through):
        # mark as closed the days of the exclusion period, without looking
        # further than seven days after base_datetime
        day = max(base_datetime, valid_from)
        remaining_days = 7 - max(valid_from.toordinal() - base_datetime.toordinal(), 0)
        while remaining_days > 0 and day < valid_through:
            closed_weekdays.add(EN_ABBREV_WEEKDAYS_LIST[day.weekday()])
            day += datetime.timedelta(days=1)
            remaining_days -= 1

    known_format = parse_mairie_formats(
        mairie_data, base_datetime, add_for_days, add_for_day_number, exclude_period
    )
    for weekday in closed_weekdays:
        hours_by_weekday[weekday] = {'am': None, 'pm': None}

    has_opening_hours = any(hours['am'] or hours['pm'] for hours in hours_by_weekday.values())
    if not has_opening_hours:
        # always closed, returns None if the format is unknown so it can be
        # displayed as "unavailable".
        if not known_format:
            return None
        # otherwise returns an array of closed days
        return [(weekday, {'am': None, 'pm': ''}) for weekday in FR_WEEKDAYS]

    return [
        (FR_ABBREV_WEEKDAYS_LIST[weekday], hours)
        for weekday, hours in hours_by_weekday.items()
        if weekday not in ['sa', 'su'] or hours['am'] or hours['pm']
    ]
@register.filter
def as_opening_hours_badge(data):
    """Render the current opening status of a place as an HTML badge.

    ``data`` is either "Maison du Rhone" data or mairie data (see
    get_slots_from_mdr_format and parse_mairie_formats).  Returns ``''`` when
    there is no data or when its format is unknown, otherwise a safe HTML
    ``<div class="badge ...">`` whose class is ``closed``, ``open`` or
    ``soon-to-be-closed`` (less than one hour left).
    """
    if not data:
        return ''
    base_datetime = now()
    exclusion_slots = []
    # NOTE(review): weekdays below are compared using base_datetime as returned
    # by now(); if it is expressed in UTC while slots are in local time, the
    # day label may be off around midnight -- to be confirmed.
    today = base_datetime.date()
    (slots, known_format) = get_slots_from_mdr_format(data, base_datetime)

    def oh_add(days_list, time_table):
        for weekday in days_list:
            day_number = EN_ABBREV_WEEKDAYS_LIST.index(weekday)
            slots.append(get_slot(day_number, time_table, base_datetime))

    def ohs_add(day_number, time_table):
        slots.append(get_slot(day_number, time_table, base_datetime))

    def ohs_del(valid_from, valid_through):
        exclusion_slots.append(TimeSlot(valid_from, valid_through))

    if not known_format:
        # Process mairie json and collect the opening hours slots
        known_format = parse_mairie_formats(data, base_datetime, oh_add, ohs_add, ohs_del)

    # order slots chronologically
    slots.sort(key=operator.attrgetter('start'))
    if not known_format:
        return ''

    def is_excluded(slot):
        # a slot is dropped when it entirely lies within an exclusion period
        return any(
            slot.start >= exclusion.start and slot.end <= exclusion.end for exclusion in exclusion_slots
        )

    # Remove past slots and exclude special timeslots.  A slot ending exactly
    # now is over too; keeping it would match none of the cases below.
    slots = [slot for slot in slots if base_datetime < slot.end and not is_excluded(slot)]

    def format_time(hour, minute):
        time = "%sh%02d" % (hour, minute)
        if time == '0h00':
            time = 'minuit'
        return time

    def is_whole_day(slot):
        # 24h/24 slots go from midnight to midnight
        return slot.start.strftime("%H:%M") == slot.end.strftime("%H:%M") == '00:00'

    if not slots:
        klass = 'closed'
        label = u'Fermé'
    elif base_datetime < slots[0].start:
        # currently closed, tell when it opens next
        next_slot = slots[0]
        klass = 'closed'
        verb = u'Réouvre'
        if next_slot.start.weekday() == today.weekday():
            day_label = ''
            if next_slot.start.hour < 12:
                verb = 'Ouvre'
        elif next_slot.start.weekday() == (today.weekday() + 1) % 7:
            day_label = u'demain'
        else:
            day_label = FR_WEEKDAYS[next_slot.start.weekday()]
        if is_whole_day(next_slot):
            label = u'%s %s 24h/24' % (verb, day_label)
        else:
            time = format_time(next_slot.start.hour, next_slot.start.minute)
            label = u'%s %s à %s' % (verb, day_label, time)
    else:
        # currently open: the filtering above guarantees base_datetime < end
        current_slot = slots[0]
        # total_seconds() rather than .seconds, which ignores whole days
        if (current_slot.end - base_datetime).total_seconds() < 3600:
            klass = 'soon-to-be-closed'
        else:
            klass = 'open'
        if is_whole_day(current_slot):
            label = u"Ouvert 24h/24"
        else:
            time = format_time(current_slot.end.hour, current_slot.end.minute)
            label = u"Ouvert jusqu'à %s" % time
    return mark_safe(u'<div class="badge %s"><span>%s</span></div>' % (klass, label))
@register.filter
def onlymoov_duration(string):
    """Format the hours and minutes of a duration string provided by onlymoov.

    Durations look like "PT1H16M3S" or "PT1M35S"; seconds are ignored.  The
    result is "<hours> <minutes>", e.g. "1 heure 16 min", an absent or zero
    part being left empty.  Returns '?' when the value is not a string, does
    not start with "PT" or has neither hours nor minutes.
    """
    try:
        # hours and minutes are both optional, so that durations without a
        # minutes component ("PT2H", "PT2H30S") are handled too
        match = re.match(r'PT(?:(\d+)H)?(?:(\d+)M)?', string)
    except TypeError:  # not a string (e.g. None)
        return '?'
    if match is None:  # didn't match
        return '?'
    hours_part, minutes_part = match.groups()
    if hours_part is None and minutes_part is None:
        return '?'

    hours = ''
    if hours_part is not None:
        nb_hours = int(hours_part)
        if nb_hours > 1:
            hours = '%s heures' % nb_hours
        else:
            hours = '%s heure' % nb_hours

    minutes = ''
    if minutes_part is not None:
        nb_minutes = int(minutes_part)
        if nb_minutes != 0:
            minutes = '%d min' % nb_minutes
    return '%s %s' % (hours, minutes)
@register.filter
def place_page(cell):
    """Return the page using the 'place' template that holds a cell with the
    same key and parameters as ``cell``, or None when there is none."""
    lookup = {
        'key': cell.key,
        'parameters': cell.parameters,
        'page__template_name': 'place',
    }
    try:
        matching_cell = ConfigJsonCell.objects.get(**lookup)
    except ConfigJsonCell.DoesNotExist:
        return None
    return matching_cell.page
@register.filter
def place_page_url(cell):
    """Return the online URL of the place page matching ``cell`` (see
    place_page), or an empty string when there is no such page."""
    page = place_page(cell)
    return '' if page is None else page.get_online_url()
@register.filter
def is_place_page(page):
    """Tell whether ``page`` uses the 'place' template; False for a missing page."""
    return page.template_name == 'place' if page else False
@register.simple_tag
def get_tile_picture_size(page):
    """Return the picture size specification for tiles displayed on ``page``:
    '1300' on place pages, '300x300' elsewhere."""
    return '1300' if is_place_page(page) else '300x300'
@register.filter
def as_producer(slug, default_slug=None):
    """Resolve a producer as a ``{'slug': ..., 'label': ...}`` dictionary.

    ``slug`` is either a string (a site slug, possibly in the
    ``slug:something`` formdef reference form) or a dictionary describing a
    form, from which ``form_digest``, ``keywords`` and ``site_slug`` are read.

    The producer is looked up, in order, in the known hobo services, in
    settings.PRODUCER_LABELS and in settings.COLLECTIVITY_LABELS (by exact
    then by slugified label).  When nothing matches, ``default_slug`` is
    resolved instead if given, else the 'toodego' producer is returned.
    """
    # collectivity slug -> slugified label, to compare names on their slugified form
    COLLECTIVITY_UNACCENT_LABELS = {x: slugify(y) for x, y in settings.COLLECTIVITY_LABELS.items()}
    if isinstance(slug, dict):
        # actually a form
        if slug.get('form_digest'):
            # the digest may end with a city name between parentheses
            parenthesis = re.match(r'.*\((.*)\)', slug['form_digest'])
            if parenthesis:
                city_name = parenthesis.group(1)
                if city_name in settings.COLLECTIVITY_LABELS.values():
                    collectivity = [x for x in settings.COLLECTIVITY_LABELS.items() if x[1] == city_name][0]
                    return {'slug': collectivity[0], 'label': collectivity[1]}
                city_slug = slugify(city_name)
                if city_slug in COLLECTIVITY_UNACCENT_LABELS.values():
                    # NOTE(review): the returned label is the slugified one, not the
                    # original label from COLLECTIVITY_LABELS -- confirm this is intended
                    collectivity = [x for x in COLLECTIVITY_UNACCENT_LABELS.items() if x[1] == city_slug][0]
                    return {'slug': collectivity[0], 'label': collectivity[1]}
        # otherwise use the first "producer-xxx" keyword...
        for keyword in slug.get('keywords') or []:
            if keyword.startswith('producer-'):
                slug = keyword.split('-', 1)[1]
                break
        else:
            # ... and fall back to the slug of the site when there is none
            # NOTE(review): slug becomes None if 'site_slug' is missing, which
            # would make the string operations below fail -- verify callers
            slug = slug.get('site_slug')
    producer = None
    if ':' in slug:  # formdef_reference
        slug = slug.split(':')[0]
    if slug.startswith('_'):
        # keep the second "_"-separated part, without its "hobo-" prefix
        producer = slug.split('_')[1].replace('hobo-', '')
    else:
        producer = slug
    if slug == 'eservices':
        # handle collectivity sites, they are individually named
        # "eservices" but have the collectivity slug as a template
        # variable.
        producer = settings.TEMPLATE_VARS.get('gnm_commune', 'grandlyon')
        if producer and settings.TEMPLATE_VARS.get('gnm_commune_name'):
            return {'slug': producer, 'label': settings.TEMPLATE_VARS.get('gnm_commune_name')}
    try:
        # extract the producer from a "producer-xxx" token, if there is one
        producer = re.search(r'(^|\W)producer-([\w-]*)(\W|$)', producer).group(2).strip()
    except AttributeError:
        pass
    if producer.startswith('Lyon '):  # assume sth like "Lyon 7eme"
        producer = 'Lyon'
    producer_slug = slugify(producer)
    if settings.KNOWN_SERVICES['hobo'].get('hobo-%s' % producer):
        return {
            'slug': producer,
            'label': settings.KNOWN_SERVICES['hobo'].get('hobo-%s' % producer, {'title': ''})['title'],
        }
    elif settings.KNOWN_SERVICES['hobo'].get('_interco_hobo-%s' % producer):
        return {
            'slug': producer,
            'label': settings.KNOWN_SERVICES['hobo'].get('_interco_hobo-%s' % producer, {'title': ''})[
                'title'
            ],
        }
    elif producer in settings.PRODUCER_LABELS:
        return {'slug': producer, 'label': settings.PRODUCER_LABELS[producer]}
    elif producer in settings.COLLECTIVITY_LABELS.values():
        collectivity = [x for x in settings.COLLECTIVITY_LABELS.items() if x[1] == producer][0]
        return {'slug': collectivity[0], 'label': collectivity[1]}
    elif producer_slug in COLLECTIVITY_UNACCENT_LABELS.values():
        # NOTE(review): same remark as above, the label returned is the slugified one
        collectivity = [x for x in COLLECTIVITY_UNACCENT_LABELS.items() if x[1] == producer_slug][0]
        return {'slug': collectivity[0], 'label': collectivity[1]}
    elif default_slug:
        return as_producer(default_slug)
    else:
        return {'slug': 'toodego', 'label': 'Toodego'}
@register.filter
def as_commune(user_data):
    """Return a dictionary describing the commune matching ``user_data``.

    ``user_data`` is either a (page) slug or a mapping with a ``city`` or
    ``address_city`` entry.  Returns None when nothing matches, else a
    dictionary with ``label`` and ``url`` entries; ``gnm`` is set to True when
    the commune is one of the collectivities given by get_gnm_collectivities,
    and ``slug`` is provided for lookups by city name.
    """
    if not user_data:
        return None

    if isinstance(user_data, six.string_types):
        # user_data is expected to be (page) slug
        for collectivity in get_gnm_collectivities():
            if slugify(collectivity['label']) in user_data:
                collectivity['gnm'] = True
                return collectivity
        return None

    city = user_data.get('city') or user_data.get('address_city')
    if not city:
        return None
    city_slug = slugify(city)

    # first look for known portals
    for collectivity in get_gnm_collectivities():
        if collectivity.get('label') == city:
            return {
                'label': city,
                'slug': city_slug,
                'url': collectivity['url'],
                'gnm': True,
            }

    # if not found look in mairie pages
    mairie_pages = Page.objects.filter(parent__slug='mairie', slug__icontains=city_slug).exclude(
        slug__icontains='annexe'
    )
    if mairie_pages.exists():
        return {
            'label': city,
            'slug': city_slug,
            'url': mairie_pages[0].get_online_url(),
        }
    return None
@register.simple_tag
def get_suggestions(request, user_data, places_data):
    """Fill the dashboard of the current user with suggested tiles.

    The initial dashboard is based on this layout:

    * the mairie tile of the user city;
    * the closest velov/tcl/swimming pool/etc. tiles, in random order;
    * an "Environnement" group: air quality and pollen.

    Cells and tiles are created in database.  Returns the list of tile
    descriptions, or ``['no user']`` when there is no authenticated user, or
    ``['already filled']`` when the user already has tiles.
    """
    user = getattr(request, 'user', None)
    if not user:
        # no user
        return ['no user']
    # is_authenticated is a method in old Django versions and an attribute in
    # newer ones (where calling it fails); support both.
    is_authenticated = user.is_authenticated
    if callable(is_authenticated):
        is_authenticated = is_authenticated()
    if not is_authenticated:
        return ['no user']

    dashboard = DashboardCell.objects.all().filter(page__snapshot__isnull=True)[0]
    if Tile.objects.filter(dashboard=dashboard, user=request.user).exists():
        # dashboard already filled
        return ['already filled']

    mairie_tile = None
    service_tiles = []
    # default air quality tile, replaced below when the user address gets geocoded
    airquality_tile = {
        'key': 'airquality',
        'parameters': {
            'lon': settings.COMBO_MAP_DEFAULT_POSITION['lng'],
            'lat': settings.COMBO_MAP_DEFAULT_POSITION['lat'],
        },
    }
    pollen_tile = {'key': 'pollen'}

    city = user_data.get('city') or user_data.get('address_city')
    zipcode = user_data.get('zipcode') or user_data.get('address_zipcode')
    if city:
        # get commune tile for the user city
        maplayer = MapLayer.objects.get(slug='mairie')
        try:
            data_result = requests.get(
                maplayer.geojson_url, timeout=2, without_user=True, cache_duration=300
            ).json()
        except RequestException:
            pass
        else:
            city_slug = slugify(city)
            if city_slug == 'lyon' and zipcode:
                # turn the zipcode into a district number (69007 -> lyon-7)
                try:
                    city_slug = 'lyon-%s' % (int(zipcode) - 69000)
                except ValueError:
                    # fallback to hotel de ville
                    city_slug = 'ville-de-lyon'
            if data_result.get('features'):
                for feature in data_result['features']:
                    if 'Annexe' in feature['properties']['nom']:
                        continue
                    if city_slug in slugify(feature['properties']['nom']):
                        mairie_tile = {'key': maplayer.slug, 'properties': feature['properties']}
                        break

    # build the address to geocode, from the first place or from the user address
    address = None
    if places_data and places_data.get('data'):
        place_data = places_data['data'][0]
        address = u'%(adresse)s, %(ville)s, France' % place_data['content']
    elif user_data.get('address_street'):
        if not user_data.get('address_number'):
            user_data['address_number'] = ''
        address = u'%(address_number)s %(address_street)s, %(address_city)s, France' % user_data

    coords = None
    if address:
        nominatim_url = settings.COMBO_GEOCODING_SERVICE
        url = '%s/search?q=%s&accept-language=fr&format=json' % (
            nominatim_url,
            quote(address.encode('utf-8')),
        )
        search_result = None
        try:
            search_result = requests.get(url, timeout=2, without_user=True, cache_duration=300).json()
        except RequestException:
            pass
        if search_result:
            coords = {'lon': search_result[0]['lon'], 'lat': search_result[0]['lat']}

    if coords:
        airquality_tile = {'key': 'airquality', 'parameters': coords}
        # bounding box around the address, used to limit the features requested
        lat1, lat2 = float(coords['lat']) - 0.008, float(coords['lat']) + 0.008
        lon1, lon2 = float(coords['lon']) - 0.006, float(coords['lon']) + 0.006
        geod = Geod(ellps='WGS84')
        for maplayer in MapLayer.objects.filter(slug__in=('velov', 'piscine', 'tcl', 'bibliotheque', 'mdr')):
            url = maplayer.geojson_url + '&BBOX=%s,%s,%s,%s' % (lat1, lon1, lat2, lon2)
            try:
                data_result = requests.get(url, timeout=2, without_user=True, cache_duration=300).json()
            except RequestException:
                continue
            features = data_result.get('features')
            if not features:
                continue
            for feature in features:
                # third value returned by geod.inv() is the distance between both points
                feature['distance'] = geod.inv(
                    float(coords['lon']),
                    float(coords['lat']),
                    float(feature['geometry']['coordinates'][0]),
                    float(feature['geometry']['coordinates'][1]),
                )[2]
            features.sort(key=lambda x: x['distance'])
            # take closest feature
            if features:
                service_tiles.append({'key': maplayer.slug, 'properties': features[0]['properties']})

    tiles = []
    if mairie_tile or service_tiles:
        if mairie_tile:
            tiles.append(mairie_tile)
        if service_tiles:
            random.shuffle(service_tiles)
            tiles.extend(service_tiles)
    if airquality_tile or pollen_tile:
        tiles.append({'key': 'group-title', 'parameters': {'text': 'Environnement'}})
        if airquality_tile:
            tiles.append(airquality_tile)
        if pollen_tile:
            tiles.append(pollen_tile)

    for i, tile_data in enumerate(tiles):
        if 'properties' in tile_data:
            # cell parameters are the feature properties declared in the
            # form of the cell type
            cell_form_keys = [
                x['varname'] for x in settings.JSON_CELL_TYPES[tile_data['key']].get('form') or {}
            ]
            tile_data['parameters'] = {}
            for key in cell_form_keys:
                tile_data['parameters'][key] = tile_data['properties'].get(key)
        cell = ConfigJsonCell(
            key=tile_data['key'],
            parameters=tile_data.get('parameters', {}),
            order=0,
            page_id=dashboard.page_id,
            placeholder='_suggested_tile',
        )
        cell.save()
        tile = Tile(dashboard=dashboard, cell=cell, user=request.user, order=i + 1)
        tile.save()
    return tiles
@register.simple_tag
def get_gnm_portal_url():
    """Return the URL of the '_interco_portal' combo service when it is
    declared in settings.KNOWN_SERVICES, else the URL of the 'portal' one."""
    combo_services = settings.KNOWN_SERVICES['combo']
    portal_key = '_interco_portal' if '_interco_portal' in combo_services else 'portal'
    return combo_services[portal_key].get('url')
@register.simple_tag
def get_gnm_collectivities():
    """Return the collectivities having a portal, sorted by label.

    Collectivities are the combo services whose key ends with ``_portal`` and
    for which a hobo service exists under the same key, stripped of its first
    character and of its ``_portal`` suffix.  Each one is described by a
    ``{'url': ..., 'label': ...}`` dictionary, the label being the title of
    the hobo service.
    """
    blacklisted_titles = ('SAU', 'Villeurbanne')
    collectivities = []
    for key, service in settings.KNOWN_SERVICES['combo'].items():
        if not key.endswith('_portal'):
            continue
        hobo_key = key.split('_portal')[0][1:]
        matching_hobo = settings.KNOWN_SERVICES['hobo'].get(hobo_key)
        if matching_hobo and matching_hobo['title'] not in blacklisted_titles:
            collectivities.append({'url': service.get('url'), 'label': matching_hobo['title']})
    return sorted(collectivities, key=operator.itemgetter('label'))
@register.inclusion_tag('combo/gnm/place_map.html')
def gnm_place_map(lat, lng):
    """Return the template context of a map locked at zoom level 17 and
    initially positioned on (lat, lng)."""
    map_cell = Map()
    # same level for all zoom settings
    for zoom_attribute in ('initial_zoom', 'min_zoom', 'max_zoom'):
        setattr(map_cell, zoom_attribute, '17')
    context = map_cell.get_cell_extra_context({})
    context['init_lat'], context['init_lng'] = lat, lng
    return context
@register.inclusion_tag('combo/gnm/airquality_map.html', takes_context=True)
def gnm_airquality_map(context):
    """Push onto ``context`` the variables of a map initially set on the
    device location (zoom 15, allowed zoom levels 10 to 19) and return it."""
    map_cell = Map()
    map_settings = (
        ('initial_state', 'device-location'),
        ('initial_zoom', '15'),
        ('min_zoom', '10'),
        ('max_zoom', '19'),
    )
    for attribute, value in map_settings:
        setattr(map_cell, attribute, value)
    extra_context = map_cell.get_cell_extra_context({})
    context.push(extra_context)
    return context
# Characters replaced by their unicode escape sequence in JSON embedded into
# HTML, keyed by code point.
_json_script_escapes = {
    ord('>'): '\\u003E',
    ord('<'): '\\u003C',
    ord('&'): '\\u0026',
}


@register.filter(is_safe=True)
def json_script(value, element_id):
    """Serialize ``value`` as JSON wrapped into a ``<script>`` tag.

    The ">", "<" and "&" characters of the JSON output are replaced by their
    unicode escape sequence; ``element_id`` is used as the id of the tag.
    """
    json_str = json.dumps(value, cls=DjangoJSONEncoder)
    # replacement texts contain none of the escaped characters, hence the
    # order of replacements does not matter
    for code_point, escape_sequence in _json_script_escapes.items():
        json_str = json_str.replace(chr(code_point), escape_sequence)
    return format_html('<script id="{}" type="application/json">{}</script>', element_id, mark_safe(json_str))
@register.simple_tag
def get_goto_cell(page, request):
    """Return the cell referenced by the ``to`` query string parameter.

    Returns None when the parameter is missing or invalid, or when the cell
    does not exist.  A cell belonging to a page using the 'place' template is
    not returned as is: a copy attached to ``page`` is saved and returned
    instead.
    """
    try:
        cell = ConfigJsonCell.objects.get(id=request.GET['to'])
    except (ConfigJsonCell.DoesNotExist, ValueError, KeyError):
        return None
    if cell.page.template_name == 'place':
        # create an alternate version of cell (saving with no id creates a new object)
        cell.id = None
        cell.placeholder = '_auto_tile'
        cell.page = page
        cell.save()
    return cell
@register.simple_tag
def get_collectivity_slugs():
    """Return the keys of settings.COLLECTIVITY_LABELS as a list."""
    return [collectivity_slug for collectivity_slug in settings.COLLECTIVITY_LABELS.keys()]
@register.filter
def indice_values(indices):
    """Yield the non-empty values of the 'indice_j-1', 'indice_j' and
    'indice_j+1' entries of ``indices``, in that order."""
    for indice_key in ('indice_j-1', 'indice_j', 'indice_j+1'):
        indice = indices.get(indice_key)
        if indice:
            yield indice
@register.filter
def airquality_hack(cell, request):
    """Turn the air quality cell of the dynamic page into a concrete cell.

    Cells that are not 'airquality' cells, or that already have parameters,
    are returned untouched.
    """
    if cell.key != 'airquality' or cell.parameters:
        return cell

    # Cell on airquality dynamic page, it has empty cell.parameters as it
    # gets those from the query string. In order to get un/favorite link
    # to work we need to duplicate the cell into a concrete object with a
    # copy of query parameters within.
    if request.GET.get('ctx'):
        ctx = signing.loads(request.GET['ctx'])
        lon, lat = ctx['q_lon'], ctx['q_lat']
    else:
        # coordinates are the last but one path segment, as "lon,lat"
        lon, lat = request.path.split('/')[-2].split(',')
    cell.parameters = {'lon': lon, 'lat': lat}
    cell.placeholder = '_auto_tile'
    cell.id = None
    cell.save()
    return cell
@register.simple_tag
def get_known_tile_types():
    """Return the keys of settings.JSON_CELL_TYPES as a list."""
    return [tile_type for tile_type in settings.JSON_CELL_TYPES.keys()]