# -*- coding: utf-8 -*-
|
|
# combo-plugin-gnm - Combo GNM plugin
|
|
# Copyright (C) 2017 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from collections import OrderedDict
|
|
import datetime
|
|
import math
|
|
import operator
|
|
import random
|
|
import re
|
|
import urllib2
|
|
|
|
from dateutil.parser import parse as dateutil_parse
|
|
from requests import RequestException
|
|
|
|
from django import template
|
|
from django.conf import settings
|
|
from django.utils.dateparse import parse_datetime
|
|
from django.utils.text import slugify
|
|
from django.utils.timezone import is_naive, make_aware
|
|
from django.utils.safestring import mark_safe
|
|
|
|
from combo.apps.maps.models import MapLayer
|
|
from combo.data.models import Page, ConfigJsonCell
|
|
from combo.public.views import render_cell
|
|
from combo.utils import requests
|
|
|
|
register = template.Library()

# French weekday names, indexed by datetime.weekday() (0 = Monday).
FR_WEEKDAYS = ['lundi', 'mardi', 'mercredi', 'jeudi', 'vendredi', 'samedi', 'dimanche']
# Two-letter weekday abbreviations (as used in the mairie "openinghours"
# payload, e.g. "mo-fr 08:30-12:00") mapped to full English names that
# dateutil can parse; kept ordered Monday-first so day ranges can be
# expanded by index.
EN_ABBREV_WEEKDAYS = OrderedDict([
    ('mo', 'Monday'),
    ('tu', 'Tuesday'),
    ('we', 'Wednesday'),
    ('th', 'Thursday'),
    ('fr', 'Friday'),
    ('sa', 'Saturday'),
    ('su', 'Sunday')
])
|
|
|
|
|
|
class TimeSlot(object):
    """A single opening period with timezone-aware start/end datetimes."""

    def __init__(self, start, end):
        # Normalise both bounds to aware datetimes so comparisons with
        # other aware datetimes never raise.
        self.start = make_aware(start) if is_naive(start) else start
        self.end = make_aware(end) if is_naive(end) else end

    def __repr__(self):
        return '<TimeSlot start=%s - end=%s>' % (
            self.start.strftime('%c'), self.end.strftime('%c'))
|
|
|
|
|
|
def openinghours_to_datetime(codename, hour, minute, default=None):
    """Build a datetime from an abbreviated weekday code and a time.

    ``codename`` is a two-letter English weekday abbreviation ('mo'..'su').
    Missing datetime components are filled from ``default``; when default is
    None, dateutil uses the current date, yielding the next matching date
    and time.  Returns None when parsing fails (e.g. unknown code).
    """
    weekday_name = EN_ABBREV_WEEKDAYS.get(codename)
    try:
        return dateutil_parse(
            '%s %d:%d:00' % (weekday_name, hour, minute), default=default)
    except ValueError:
        return None
|
|
|
|
|
|
def get_slots_from_mdr_format(data, today):
    """Process data from /ws/grandlyon/ter_territoire.maison_du_rhone/all.json

    Returns a (slots, known_format) tuple: ``slots`` holds the next opening
    hours as TimeSlot objects in chronological order beginning from today;
    ``known_format`` tells whether the payload used the expected
    '<weekday>_am'/'<weekday>_pm' keys.
    """
    slots = []
    known_format = False
    mdr_weekdays_format = ['%s_am' % day for day in FR_WEEKDAYS] + ['%s_pm' % day for day in FR_WEEKDAYS]
    # raw strings: '\d' in a plain literal is an invalid escape sequence;
    # compile once instead of re-matching the pattern in every iteration
    hours_re = re.compile(r'(\d?\d)h(\d\d)-(\d?\d)h(\d\d)')
    weekdays_re = re.compile('|'.join(mdr_weekdays_format))
    if any(weekdays_re.search(data_key) is not None for data_key in data.keys()):
        known_format = True
        for i in range(7):
            for period in ('am', 'pm'):
                hours = data.get('%s_%s' % (FR_WEEKDAYS[today.weekday()], period))
                if not hours:
                    continue
                match = hours_re.match(hours)
                if match is None:
                    # value not in the expected '9h00-12h30' shape
                    continue
                parts = match.groups()
                # add to slots the opening hours in chronological order,
                # walking one day forward per outer iteration
                slots.append(TimeSlot(
                    datetime.datetime(today.year, today.month, today.day, int(parts[0]), int(parts[1])),
                    datetime.datetime(today.year, today.month, today.day, int(parts[2]), int(parts[3]))
                ))

            today = today + datetime.timedelta(days=1)

    return (slots, known_format)
|
|
|
|
|
|
def get_slots_from_mairie_format(data, base_datetime):
    """Process mairie json and return the opening hours.

    Returns a (slots, exclusion_slots, known_format) tuple: ``slots`` are the
    opening hours in chronological order beginning at ``base_datetime``,
    ``exclusion_slots`` are annual closing periods, ``known_format`` tells
    whether the payload carried both 'openinghours' and
    'openinghoursspecification' entries.
    """
    if 'properties' in data:
        data = data['properties']

    known_format = False
    slots = []
    exclusion_slots = []
    if len(data.get('openinghours', [])) and len(data.get('openinghoursspecification', [])):
        known_format = True
        # prepare annual opening exclusions
        for specification in data.get('openinghoursspecification', []):
            valid_from, valid_through = (
                parse_datetime(specification.get('validFrom')),
                parse_datetime(specification.get('validThrough'))
            )
            if not valid_from or not valid_through:
                continue
            if 'opens' in specification and 'closes' in specification:
                # case when opening periods are defined
                if base_datetime >= valid_from and base_datetime < valid_through:
                    slots.append(TimeSlot(dateutil_parse(specification['opens']), dateutil_parse(specification['closes'])))
            else:
                # case when exclusions are defined
                exclusion_slots.append(TimeSlot(valid_from, valid_through))

        # materialize the ordered weekday codes once; indexing .keys()
        # directly only worked on python 2
        weekday_codes = list(EN_ABBREV_WEEKDAYS)
        # raw string: '\d'/'\w' in a plain literal are invalid escapes
        openinghours_re = re.compile(r'(\w\w)-(\w\w) (\d\d):(\d\d)-(\d\d):(\d\d)')
        for openinghours in data.get('openinghours', []):
            match = openinghours_re.match(openinghours)
            if match is None:
                continue
            parts = match.groups()
            # expand the weekday range, e.g. 'mo'-'fr' -> mo, tu, we, th, fr
            start_index = weekday_codes.index(parts[0].lower())
            end_index = weekday_codes.index(parts[1].lower())
            for weekday in weekday_codes[start_index:end_index + 1]:
                timeslot = TimeSlot(
                    openinghours_to_datetime(weekday, int(parts[2]), int(parts[3]), default=base_datetime),
                    openinghours_to_datetime(weekday, int(parts[4]), int(parts[5]), default=base_datetime)
                )
                # add to slots the opening hours in chronological order
                slots.append(timeslot)

    # order slots and cycle the list beginning with 'base_datetime'
    slots = sorted(slots, key=operator.attrgetter('start'))
    if len(slots):
        def timedelta_key_func(slot):
            return slot.start - base_datetime
        # NOTE(review): slots are already sorted by start, so min() of
        # (start - base_datetime) is always slots[0] and this rotation is a
        # no-op; kept as-is since the caller filters past slots anyway.
        nearest_slot_index = slots.index(min(slots, key=timedelta_key_func))
        slots = slots[nearest_slot_index:] + slots[:nearest_slot_index]
    return (slots, exclusion_slots, known_format)
|
|
|
|
|
|
@register.filter
def as_opening_hours_badge(data, base_datetime=None):
    """Render an opening-status badge ('Ouvert ...', 'Fermé', ...) as HTML.

    ``data`` is a place payload in either the maison-du-rhone or the mairie
    format; ``base_datetime`` defaults to now.  Returns '' when the payload
    format is not recognized.
    """
    if not data:
        return ''

    if base_datetime is None:
        base_datetime = make_aware(datetime.datetime.now())

    # defaults
    exclusion_slots = []
    today = base_datetime.date()

    (slots, known_format) = get_slots_from_mdr_format(data, today)
    if not known_format:
        (slots, exclusion_slots, known_format) = get_slots_from_mairie_format(data, base_datetime)

    if not known_format:
        return ''

    # remove past slots and exclude special timeslots
    for i, slot in enumerate(slots):
        if base_datetime > slot.end:
            slots[i] = None
        else:
            for exclusion in exclusion_slots:
                if slot.start >= exclusion.start and slot.end <= exclusion.end:
                    slots[i] = None

    # parse slots to return the right html
    slots = [x for x in slots if x]
    if not slots:
        klass = 'closed'
        label = u'Fermé'
    elif base_datetime < slots[0].start:
        # next slot has not started yet
        klass = 'closed'
        verb = u'Réouvre'
        if slots[0].start.weekday() == today.weekday():
            day_label = ''
            if slots[0].start.hour < 12:
                verb = 'Ouvre'
        elif slots[0].start.weekday() == (today.weekday() + 1) % 7:
            day_label = u'demain'
        else:
            day_label = FR_WEEKDAYS[slots[0].start.weekday()]
        label = u'%s %s à %sh%02d' % (verb, day_label, slots[0].start.hour, slots[0].start.minute)
    else:
        # currently open: past slots were removed above, so
        # base_datetime <= slots[0].end here.  An "else" (rather than
        # "elif base_datetime < slots[0].end") also covers the exact
        # base_datetime == slots[0].end boundary, which previously left
        # klass/label undefined (NameError).
        # total_seconds() instead of .seconds: .seconds discards the days
        # component and wraps around for deltas of a day or more.
        if (slots[0].end - base_datetime).total_seconds() < 3600:
            klass = 'soon-to-be-closed'
        else:
            klass = 'open'
        label = u"Ouvert jusqu'à %sh%02d" % (slots[0].end.hour, slots[0].end.minute)

    return mark_safe(u'<div class="badge %s"><span>%s</span></div>' % (klass, label))
|
|
|
|
|
|
@register.filter
def onlymoov_duration(string):
    """Turn an onlymoov duration string ("PT1H16M3S", "PT1M35S") into a
    short french label ("1 heure 16 min").  Seconds are ignored.

    Returns '?' when the string cannot be parsed.  Both the hours and the
    minutes components are optional (the previous pattern required minutes,
    so durations like "PT2H" or "PT1H3S" came out as '?').
    """
    try:
        groups = re.match(r'PT(?:(\d+)H)?(?:(\d+)M)?', string).groups()
    except (AttributeError, TypeError):  # didn't match / not a string
        return '?'
    if groups[0] is None and groups[1] is None:
        # neither hours nor minutes (e.g. "PT35S"): nothing displayable
        return '?'
    parts = []
    if groups[0] is not None:
        nb_hours = int(groups[0])
        if nb_hours > 1:
            parts.append('%d heures' % nb_hours)
        else:
            parts.append('%d heure' % nb_hours)
    nb_minutes = int(groups[1]) if groups[1] is not None else 0
    if nb_minutes != 0:
        parts.append('%d min' % nb_minutes)
    # join only the non-empty components so there is no stray whitespace
    return ' '.join(parts)
|
|
|
|
|
|
@register.filter
def place_page_url(cell):
    """Return the online URL of the 'place' page carrying an equivalent
    cell, or '' when no such page exists."""
    lookup = {
        'key': cell.key,
        'parameters': cell.parameters,
        'page__template_name': 'place',
    }
    try:
        fixed_place_cell = ConfigJsonCell.objects.get(**lookup)
    except ConfigJsonCell.DoesNotExist:
        return ''
    return fixed_place_cell.page.get_online_url()
|
|
|
|
|
|
@register.filter
def is_place_page(page):
    """Tell whether the given page (possibly None) uses the 'place' template."""
    return bool(page) and page.template_name == 'place'
|
|
|
|
|
|
# Display labels for data producers that are not resolved through the hobo
# service registry; keyed by producer slug (see as_producer below).
PRODUCER_LABELS = {
    'atmo': u'Air Rhône-Alpes',
    'onlymoov': u'ONLY MOOV',
    'tcl': u'TCL',
    'toodego': u'Toodego',
    'velov': u"Velo'v",
    'bron': u'Bron',
    'dardilly': u'Dardilly',
    'oullins': u'Oullins',
    'vaulx-en-velin': u'Vaulx-en-Velin',
    'villeurbanne': u'Villeurbanne',
}
|
|
|
|
@register.filter
def as_producer(slug):
    """Map a service or formdef slug to a producer dict ({'slug', 'label'}).

    Resolution order: collectivity template vars, hobo-declared services
    (direct then interco), then the static PRODUCER_LABELS table; falls back
    to the Grand Lyon producer when nothing matches.
    """
    producer = None
    if ':' in slug:  # formdef_reference
        slug = slug.split(':')[0]
        if slug.startswith('_'):
            # e.g. '_hobo-bron' -> 'bron'
            producer = slug.split('_')[1].replace('hobo-', '')
        if slug == 'eservices':
            # handle collectivity sites, they are individually named
            # "eservices" but have the collectivity slug as a template
            # variable.
            producer = settings.TEMPLATE_VARS.get('gnm_commune')
            if producer and settings.TEMPLATE_VARS.get('gnm_commune_name'):
                return {'slug': producer,
                        'label': settings.TEMPLATE_VARS.get('gnm_commune_name')}
    else:
        producer = slugify(slug)
        try:
            # extract an explicit 'producer-<name>' marker from the slug
            producer = re.search(r'(^|\W)producer-(\w*)(\W|$)', producer).group(2).strip()
        except AttributeError:
            # no marker present: keep the slugified value
            pass

    if settings.KNOWN_SERVICES['hobo'].get('hobo-%s' % producer):
        return {'slug': producer,
                'label': settings.KNOWN_SERVICES['hobo'].get('hobo-%s' % producer, {'title': ''})['title']}
    elif settings.KNOWN_SERVICES['hobo'].get('_interco_hobo-%s' % producer):
        return {'slug': producer,
                'label': settings.KNOWN_SERVICES['hobo'].get('_interco_hobo-%s' % producer, {'title': ''})['title']}
    elif producer in PRODUCER_LABELS:
        return {'slug': producer, 'label': PRODUCER_LABELS[producer]}
    else:
        return {'slug': 'grandlyon', 'label': 'Grand Lyon'}
|
|
|
|
@register.filter
def as_commune(user_data):
    """Return {'label', 'slug', 'url'} for the user's commune, or None.

    The commune is looked up first among known collectivity portals, then
    among the 'mairie' sub-pages (annex town halls excluded).
    """
    if not user_data:
        return None
    city = user_data.get('city') or user_data.get('address_city')
    if not city:
        return None

    city_slug = slugify(city)
    # first look for known portals
    for collectivity in get_gnm_collectivities():
        if collectivity.get('label') == city:
            return {
                'label': city,
                'slug': city_slug,
                'url': collectivity['url'],
            }

    # if not found look in mairie pages
    pages = Page.objects.filter(
        parent__slug='mairie',
        slug__icontains=city_slug).exclude(slug__icontains='annexe')
    if pages.exists():
        return {
            'label': city,
            'slug': city_slug,
            'url': pages[0].get_online_url(),
        }
    return None
|
|
|
|
|
|
@register.assignment_tag
def get_suggestions(request, cell, user_data, places_data):
    """Render up to five suggestion tiles for the user's dashboard.

    Builds candidate tiles from the user's commune, random air-quality and
    pollen tiles, then nearby features (velov/piscine/tcl) around the user's
    address and the addresses found in ``places_data``; deduplicates them,
    renders each as a ConfigJsonCell and returns a shuffled sample of the
    rendered HTML fragments.
    """
    tile_data = []
    addresses = []
    city = user_data.get('city') or user_data.get('address_city')
    if city:
        # get commune tile for the user city
        maplayer = MapLayer.objects.get(slug='mairie')
        try:
            data_result = requests.get(maplayer.geojson_url, timeout=2,
                                       without_user=True, cache_duration=300).json()
        except RequestException:
            # best effort: skip the commune tile when the service is down
            pass
        else:
            city_slug = slugify(city)
            if data_result.get('features'):
                for feature in data_result['features']:
                    if 'Annexe' in feature['properties']['nom']:
                        continue
                    if city_slug in slugify(feature['properties']['nom']):
                        tile_data.append({'key': maplayer.slug,
                                          'properties': feature['properties']})
                        break

    # sprinkle in random informational tiles
    if random.random() < 0.3:
        tile_data.append({'key': 'airquality'})
    if random.random() < 0.1:
        tile_data.append({'key': 'pollen'})

    if user_data.get('address_street'):
        if not user_data.get('address_number'):
            user_data['address_number'] = ''
        addresses.append(u'%(address_number)s %(address_street)s, %(address_city)s, France' % user_data)

    if places_data:
        for place_data in places_data.get('data'):
            addresses.append(u'%(adresse)s, %(ville)s, France' % place_data['content'])

    # geocode the collected addresses
    coords = []
    nominatim_url = 'https://nominatim.entrouvert.org'
    for address in addresses:
        url = '%s/search?q=%s&accept-language=fr&format=json' % (
            nominatim_url, urllib2.quote(address.encode('utf-8')))
        try:
            search_result = requests.get(url, timeout=2, without_user=True,
                                         cache_duration=300).json()
        except RequestException:
            continue
        if not search_result:
            continue
        coords.append({'lon': search_result[0]['lon'], 'lat': search_result[0]['lat']})

    # for each coordinate, add the two closest features of each layer
    for coord in coords:
        lat1, lat2 = float(coord['lat']) - 0.008, float(coord['lat']) + 0.008
        lon1, lon2 = float(coord['lon']) - 0.006, float(coord['lon']) + 0.006
        for maplayer in MapLayer.objects.filter(slug__in=('velov', 'piscine', 'tcl')):
            url = maplayer.geojson_url + '&BBOX=%s,%s,%s,%s' % (lat1, lon1, lat2, lon2)
            try:
                data_result = requests.get(url, timeout=2, without_user=True,
                                           cache_duration=300).json()
            except RequestException:
                continue
            features = data_result.get('features')
            if not features:
                continue
            for feature in features:
                # thanks to the flat earth society
                feature['distance'] = math.sqrt(
                    (float(coord['lon']) - feature['geometry']['coordinates'][0])**2 +
                    (float(coord['lat']) - feature['geometry']['coordinates'][1])**2)
            features.sort(key=lambda x: x['distance'])
            # take two closest features
            for feature in features[:2]:
                tile_data.append({'key': maplayer.slug, 'properties': feature['properties']})

    # bugfix: removed "dashboard = DashboardCell.objects.all()[0]" —
    # DashboardCell is not imported anywhere in this module (NameError)
    # and the variable was never used.
    page_id = cell.page_id
    cells = []
    seen = {}
    for data in tile_data:
        # new variable name: the original rebound the 'cell' parameter here
        tile_cell = ConfigJsonCell(key=data['key'], order=1,
                                   page_id=page_id, placeholder='_auto_tile')
        cell_form_keys = [x['varname'] for x in settings.JSON_CELL_TYPES[tile_cell.key].get('form') or {}]
        tile_cell.parameters = {}
        for key in cell_form_keys:
            tile_cell.parameters[key] = data['properties'].get(key)
        # deduplicate tiles on (key, parameters)
        cell_uid = repr((data['key'], tile_cell.parameters))
        if cell_uid in seen:
            continue
        seen[cell_uid] = True
        tile_cell.save()
        cells.append(render_cell(request, cell=tile_cell).content)

    random.shuffle(cells)
    return cells[:5]
|
|
|
|
@register.assignment_tag
def get_gnm_portal_url():
    """Return the portal URL, preferring the interco portal when declared."""
    combo_services = settings.KNOWN_SERVICES['combo']
    key = '_interco_portal' if '_interco_portal' in combo_services else 'portal'
    return combo_services[key].get('url')
|
|
|
|
@register.assignment_tag
def get_gnm_collectivities():
    """List collectivity portals as {'url', 'label'} dicts, sorted by label.

    A combo service '<x>_portal' is included when a matching hobo service
    exists and its title is not blacklisted.
    """
    # bugfix: the blacklist was written ('SAU'), a plain string, so the
    # membership test matched any substring of 'SAU' ('S', 'A', 'AU', ...);
    # it must be a one-element tuple.
    blacklist = ('SAU',)
    collectivities = []
    for key in settings.KNOWN_SERVICES['combo']:
        if not key.endswith('_portal'):
            continue
        # '_foo_portal' -> hobo service key 'foo'
        matching_hobo = settings.KNOWN_SERVICES['hobo'].get(key.split('_portal')[0][1:])
        if not matching_hobo:
            continue
        if matching_hobo['title'] in blacklist:
            continue
        service = settings.KNOWN_SERVICES['combo'][key]
        collectivities.append({'url': service.get('url'), 'label': matching_hobo['title']})
    collectivities.sort(key=lambda x: x['label'])
    return collectivities
|