# -*- coding: utf-8 -*-
# combo-plugin-gnm - Combo GNM plugin
# Copyright (C) 2017  Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from collections import OrderedDict
import datetime
import json
import math
import operator
import random
import re

from dateutil.parser import parse as dateutil_parse
from requests import RequestException

from django import template
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.core import signing
from django.utils.dateparse import parse_datetime
from django.utils.http import quote
from django.utils.html import format_html
from django.utils.text import slugify
from django.utils.timezone import is_naive, make_aware
from django.utils.safestring import mark_safe

from combo.apps.dashboard.models import DashboardCell
from combo.apps.maps.models import Map, MapLayer
from combo.data.models import Page, CellBase, ConfigJsonCell
from combo.public.views import render_cell
from combo.utils import requests

register = template.Library()

try:
    _STRING_TYPES = basestring  # Python 2
except NameError:
    _STRING_TYPES = str  # Python 3

# French weekday names, indexed like datetime.weekday() (0 is Monday).
FR_WEEKDAYS = ['lundi', 'mardi', 'mercredi', 'jeudi', 'vendredi', 'samedi', 'dimanche']

# Two-letter day codes used by the "openinghours" strings, mapped to the
# English day names used by "openinghoursspecification" entries.
EN_ABBREV_WEEKDAYS = OrderedDict([
    ('mo', 'Monday'),
    ('tu', 'Tuesday'),
    ('we', 'Wednesday'),
    ('th', 'Thursday'),
    ('fr', 'Friday'),
    ('sa', 'Saturday'),
    ('su', 'Sunday')
])
EN_ABBREV_WEEKDAYS_LIST = list(EN_ABBREV_WEEKDAYS.keys())
EN_FULL_WEEKDAYS_LIST = list(EN_ABBREV_WEEKDAYS.values())
# Despite its name this is a mapping: two-letter day code -> French day name.
FR_ABBREV_WEEKDAYS_LIST = OrderedDict(zip(EN_ABBREV_WEEKDAYS_LIST, FR_WEEKDAYS))


class TimeSlot(object):
    """An opening period delimited by two timezone-aware datetimes."""

    def __init__(self, start, end):
        # naive datetimes are made aware so that slots can always be
        # compared with each other and with an aware "now".
        if is_naive(start):
            start = make_aware(start)
        if is_naive(end):
            end = make_aware(end)
        self.start = start
        self.end = end

    def __repr__(self):
        return '<TimeSlot start=%s - end=%s>' % (
            self.start.strftime('%c'), self.end.strftime('%c'))


def _get_validity_period(specification, default_from, default_through):
    """Return the (validFrom, validThrough) datetimes of a specification.

    A missing or empty bound is replaced by the matching default; a bound
    that parse_datetime() cannot read comes back as None.
    """
    valid_from = specification.get('validFrom')
    valid_through = specification.get('validThrough')
    valid_from = parse_datetime(valid_from) if valid_from else default_from
    valid_through = parse_datetime(valid_through) if valid_through else default_through
    return (valid_from, valid_through)


def get_open_close_from_specification(specification, valid_from, base_datetime):
    """Parse geojson 'openinghoursspecification' fields data.

    The 'opens' and 'closes' times are combined with the date of
    base_datetime, given the timezone of valid_from, then moved forward
    (by 0 to 6 days) to the weekday named by the last '/'-separated part of
    'dayOfWeek'.

    Returns a (opening_time, closing_time, day_number) tuple, day_number
    being the index of the day in EN_FULL_WEEKDAYS_LIST.
    """
    opening_time = datetime.datetime.combine(
        base_datetime, dateutil_parse(specification['opens']).time())
    closing_time = datetime.datetime.combine(
        base_datetime, dateutil_parse(specification['closes']).time())
    opening_time = opening_time.replace(tzinfo=valid_from.tzinfo)
    closing_time = closing_time.replace(tzinfo=valid_from.tzinfo)
    day_number = EN_FULL_WEEKDAYS_LIST.index(specification['dayOfWeek'].split('/')[-1])
    opening_time = opening_time + datetime.timedelta(
        days=(7 + (day_number - opening_time.weekday())) % 7)
    closing_time = closing_time + datetime.timedelta(
        days=(7 + (day_number - closing_time.weekday())) % 7)
    return (opening_time, closing_time, day_number)


def openinghours_to_datetime(codename, hour, minute):
    """Return an aware datetime for the given day code and time.

    The string "<English day name> <hour>:<minute>:00" is handed to
    dateutil, which resolves the day name relatively to the current date.
    Returns None when dateutil rejects the string (e.g. unknown day code).
    """
    try:
        weekday = EN_ABBREV_WEEKDAYS.get(codename, None)
        datetime_obj = dateutil_parse('%s %d:%d:00' % (weekday, hour, minute))
        if is_naive(datetime_obj):
            datetime_obj = make_aware(datetime_obj)
        return datetime_obj
    except ValueError:
        return None


def get_period_from_data(weekday, open_close_time_string=None, opening_time=None,
                         closing_time=None):
    """Return am or pm and all_day_hours from opening_time and closing_time.

    Either open_close_time_string, a (start_hour, start_minute, end_hour,
    end_minute) sequence, or both opening_time and closing_time must be
    given.

    Returns a (period, all_day_hours) tuple: period is 'am' when the slot
    closes at or before 12h or when it opens at or before 12h; in the
    latter case all_day_hours is True as the slot also covers the
    afternoon.
    """
    if open_close_time_string is not None:
        (start_hour, start_minute, end_hour, end_minute) = open_close_time_string
    if closing_time is None:
        closing_time = openinghours_to_datetime(weekday, int(end_hour), int(end_minute))
    if opening_time is None:
        opening_time = openinghours_to_datetime(weekday, int(start_hour), int(start_minute))
    all_day_hours = False
    if closing_time.hour <= 12:
        period = 'am'
    elif opening_time.hour <= 12:
        period = 'am'
        all_day_hours = True
    else:
        period = 'pm'
    return (period, all_day_hours)


def get_slots_from_mdr_format(data, today):
    """Process data from Maison du rhone geojson data from data.grandlyon.fr
    (/ws/grandlyon/ter_territoire.maison_du_rhone/all.json).

    Opening hours are read from "<french day>_am" / "<french day>_pm" keys
    holding values such as "8h30-12h00"; values that do not match that
    shape are ignored.

    Returns a (slots, known_format) tuple: slots are the TimeSlot objects
    for the seven days starting at today, in chronological order, and
    known_format tells whether any key of data looks like this format.
    """
    if 'properties' in data:
        data = data['properties']
    slots = []
    known_format = False
    mdr_weekdays_format = ['%s_am' % day for day in FR_WEEKDAYS] + \
                          ['%s_pm' % day for day in FR_WEEKDAYS]
    mdr_key_re = re.compile('|'.join(mdr_weekdays_format))
    if any(mdr_key_re.search(data_key) is not None for data_key in data.keys()):
        known_format = True

    for i in range(7):
        for period in ('am', 'pm'):
            hours = data.get('%s_%s' % (FR_WEEKDAYS[today.weekday()], period))
            if not hours:
                continue
            try:
                parts = re.match(r'(\d?\d)h(\d\d)-(\d?\d)h(\d\d)', hours).groups()
            except AttributeError:
                # value is not formatted as expected
                continue
            # add to slots the opening hours in chronological order beginning from today
            slots.append(TimeSlot(
                datetime.datetime(today.year, today.month, today.day,
                                  int(parts[0]), int(parts[1]), tzinfo=today.tzinfo),
                datetime.datetime(today.year, today.month, today.day,
                                  int(parts[2]), int(parts[3]), tzinfo=today.tzinfo)
            ))
        today = today + datetime.timedelta(days=1)
    return (slots, known_format)


def parse_opening_hours_data(mairie_data):
    """Parse every known openinghours data formats.

    Each entry of mairie_data['openinghours'] is expected to look like
    "Mo-Fr 08:30-12:00" or "Mo,We 14:00-17:00"; entries that do not match
    are skipped.

    Yields (days_list, time_table) tuples, one per comma-separated day or
    interval: days_list holds the two-letter day codes covered and
    time_table the matched strings (start_day, optionally end_day,
    start_hour, start_minute, end_hour, end_minute).
    """
    for openinghours in mairie_data.get('openinghours', []):
        # format is comma-separated days and/or intervals, or only one day
        try:
            groups = re.match(
                r'(\w\w(?:(?:,|\-)?\w\w)*) (\d\d?):(\d\d?)-(\d\d?):(\d\d?)',
                openinghours).groups()
        except AttributeError:
            # invalid input data
            continue
        for day in groups[0].split(','):
            if '-' in day:
                # interval
                parts = re.match(r'(\w\w)-(\w\w)', day).groups() + groups[1:]
                time_table = dict(zip(
                    ('start_day', 'end_day', 'start_hour', 'start_minute',
                     'end_hour', 'end_minute'),
                    parts))
                days_list = EN_ABBREV_WEEKDAYS_LIST[
                    EN_ABBREV_WEEKDAYS_LIST.index(time_table['start_day'].lower()):
                    EN_ABBREV_WEEKDAYS_LIST.index(time_table['end_day'].lower()) + 1]
            else:
                # one day
                time_table = dict(zip(
                    ('start_day', 'start_hour', 'start_minute', 'end_hour', 'end_minute'),
                    (day,) + groups[1:]))
                days_list = [EN_ABBREV_WEEKDAYS_LIST[
                    EN_ABBREV_WEEKDAYS_LIST.index(time_table['start_day'].lower())]]
            yield (days_list, time_table)


def get_slots_from_mairie_format(data, base_datetime):
    """Process mairie json and return slots the opening hours in chronological
    order beginning today.

    Returns a (slots, exclusion_slots, known_format) tuple: slots are built
    from both 'openinghoursspecification' entries valid at base_datetime
    and 'openinghours' strings; exclusion_slots are the validity periods of
    the specifications that have no 'opens'/'closes'; known_format tells
    whether data has any of those two keys filled.
    """
    if 'properties' in data:
        data = data['properties']
    known_format = False
    slots = []
    exclusion_slots = []
    previous_week = base_datetime - datetime.timedelta(7)
    next_week = base_datetime + datetime.timedelta(7)
    if data.get('openinghours') or data.get('openinghoursspecification'):
        known_format = True

    # prepare annual opening exclusions
    for specification in data.get('openinghoursspecification', []):
        (valid_from, valid_through) = _get_validity_period(
            specification, previous_week, next_week)
        if not valid_from or not valid_through:
            continue
        if 'opens' in specification and 'closes' in specification:
            # case when opening periods are defined
            if base_datetime >= valid_from and base_datetime < valid_through:
                (opening_time, closing_time, day_number) = get_open_close_from_specification(
                    specification, valid_from, base_datetime)
                slots.append(TimeSlot(opening_time, closing_time))
        else:
            # case when exclusions are defined
            exclusion_slots.append(TimeSlot(valid_from, valid_through))

    for days_list, time_table in parse_opening_hours_data(data):
        for weekday in days_list:
            timeslot = TimeSlot(
                openinghours_to_datetime(
                    weekday, int(time_table['start_hour']), int(time_table['start_minute'])),
                openinghours_to_datetime(
                    weekday, int(time_table['end_hour']), int(time_table['end_minute']))
            )
            # add to slots the opening hours in chronological order beginning from today
            slots.append(timeslot)

    # order slots and cycle the list beginning with 'base_datetime'
    slots = sorted(slots, key=operator.attrgetter('start'))
    if len(slots):
        def timedelta_key_func(slot):
            return slot.start - base_datetime
        # NOTE(review): the minimum of a signed difference is the earliest
        # slot, which is already first after sorting, so this rotation looks
        # like a no-op; kept as is since callers drop past slots themselves.
        nearest_slot_index = slots.index(min(slots, key=timedelta_key_func))
        slots = slots[nearest_slot_index:] + slots[:nearest_slot_index]
    return (slots, exclusion_slots, known_format)


@register.assignment_tag
def get_mairie_opening_hours(mairie_data):
    """Process Mairie Geojson to extract data of each day's opening hours.

    Returns '' when there is no data, None when no opening hour at all was
    found, and otherwise a list of (french day name, {'am': ..., 'pm': ...})
    tuples from Monday to Sunday, Saturday and Sunday being left out when
    they have no opening hours.
    """
    if not mairie_data:
        return ''
    if 'properties' in mairie_data:
        mairie_data = mairie_data['properties']
    base_datetime = make_aware(datetime.datetime.now())
    days_list = []
    opening_hours_dict = OrderedDict(zip(EN_ABBREV_WEEKDAYS_LIST, [{
        'am': None, 'pm': None
    } for i in range(7)]))
    known_format = False

    for days_list, time_table in parse_opening_hours_data(mairie_data):
        known_format = True
        for weekday in days_list:
            (period, all_day_hours) = get_period_from_data(
                weekday,
                open_close_time_string=(
                    time_table['start_hour'], time_table['start_minute'],
                    time_table['end_hour'], time_table['end_minute']))
            if all_day_hours and period == 'am':
                # empty string so the afternoon is not displayed as closed
                opening_hours_dict[weekday]['pm'] = ''
            opening_hours_dict[weekday][period] = "%sh%s-%sh%s" % (
                time_table['start_hour'], time_table['start_minute'],
                time_table['end_hour'], time_table['end_minute'])

    if not known_format:
        # some mairie only have openinghoursspecification (e.g. Jonage)
        previous_week = base_datetime - datetime.timedelta(7)
        next_week = base_datetime + datetime.timedelta(7)
        for specification in mairie_data.get('openinghoursspecification', []):
            (valid_from, valid_through) = _get_validity_period(
                specification, previous_week, next_week)
            if not valid_from or not valid_through:
                continue
            # case when opening periods are defined
            if 'opens' in specification and 'closes' in specification:
                # parse specification only for the period containing the current time
                if base_datetime >= valid_from and base_datetime < valid_through:
                    (opening_time, closing_time, day_number) = \
                        get_open_close_from_specification(
                            specification, valid_from, base_datetime)
                    abbr_day_of_week = EN_ABBREV_WEEKDAYS_LIST[day_number]
                    (period, all_day_hours) = get_period_from_data(
                        abbr_day_of_week,
                        opening_time=opening_time, closing_time=closing_time)
                    if all_day_hours and period == 'am':
                        # empty string so the afternoon is not displayed as closed
                        opening_hours_dict[abbr_day_of_week]['pm'] = ''
                    opening_hours_dict[abbr_day_of_week][period] = "%sh%s-%sh%s" % (
                        opening_time.strftime('%H'), opening_time.strftime('%M'),
                        closing_time.strftime('%H'), closing_time.strftime('%M'))

    if not any(x['am'] or x['pm'] for x in opening_hours_dict.values()):
        # always closed, return None to mark unavailability
        return None

    return [
        (FR_ABBREV_WEEKDAYS_LIST[weekday], hours)
        for weekday, hours in opening_hours_dict.items()
        if not (weekday in ['sa', 'su'] and not hours['am'] and not hours['pm'])
    ]


@register.filter
def as_opening_hours_badge(data):
    """Return an HTML badge telling whether the place is currently open.

    data is first read as "Maison du Rhone" data then as mairie data; ''
    is returned when there is no data or when neither format matches.
    The badge class is 'closed', 'open' or 'soon-to-be-closed' (less than
    an hour left) and its label is in French.
    """
    if not data:
        return ''
    base_datetime = make_aware(datetime.datetime.now())
    exclusion_slots = []
    today = base_datetime.date()
    (slots, known_format) = get_slots_from_mdr_format(data, base_datetime)
    if not known_format:
        (slots, exclusion_slots, known_format) = get_slots_from_mairie_format(
            data, base_datetime)
    if not known_format:
        return ''

    def is_excluded(slot):
        return any(
            slot.start >= exclusion.start and slot.end <= exclusion.end
            for exclusion in exclusion_slots)

    # remove past slots and exclude special timeslots; a slot ending right
    # now counts as past, so one of the branches below always applies.
    slots = [slot for slot in slots
             if base_datetime < slot.end and not is_excluded(slot)]

    # parse slots to return the right html
    if not slots:
        klass = 'closed'
        label = u'Fermé'
    elif base_datetime < slots[0].start:
        klass = 'closed'
        verb = u'Réouvre'
        if slots[0].start.weekday() == today.weekday():
            day_label = ''
            if slots[0].start.hour < 12:
                # first opening of the day, not a reopening
                verb = 'Ouvre'
        elif slots[0].start.weekday() == (today.weekday() + 1) % 7:
            day_label = u'demain'
        else:
            day_label = FR_WEEKDAYS[slots[0].start.weekday()]
        label = u'%s %s à %sh%02d' % (
            verb, day_label, slots[0].start.hour, slots[0].start.minute)
    else:
        # total_seconds() so that whole days are not ignored
        if (slots[0].end - base_datetime).total_seconds() < 3600:
            klass = 'soon-to-be-closed'
        else:
            klass = 'open'
        label = u"Ouvert jusqu'à %sh%02d" % (slots[0].end.hour, slots[0].end.minute)
    return mark_safe(u'<div class="badge %s"><span>%s</span></div>' % (klass, label))


@register.filter
def onlymoov_duration(string):
    """Format an onlymoov duration string as "<n> heure(s) <m> min".

    Only the hours and minutes components of strings such as "PT1H16M3S"
    or "PT1M35S" are used; '?' is returned when the string does not match.
    """
    # take the hours and minutes components of duration strings provided by
    # onlymoov, "PT1H16M3S", "PT1M35S"
    try:
        groups = re.match(r'PT(\d+H)?(\d+M)', string).groups()
    except AttributeError:
        # didn't match
        return '?'
    hours = ''
    if groups[0]:
        nb_hours = int(groups[0][:-1])
        if nb_hours > 1:
            hours = '%s heures' % nb_hours
        else:
            hours = '%s heure' % nb_hours
    nb_minutes = int(groups[1][:-1])
    if nb_minutes == 0:
        minutes = ''
    else:
        minutes = '%d min' % nb_minutes
    return '%s %s' % (hours, minutes)


@register.filter
def place_page(cell):
    """Return the 'place' page holding a cell with the same key and
    parameters as cell, or None if there is none."""
    try:
        fixed_place_cell = ConfigJsonCell.objects.get(
            key=cell.key,
            parameters=cell.parameters,
            page__template_name='place')
    except ConfigJsonCell.DoesNotExist:
        return None
    return fixed_place_cell.page


@register.filter
def place_page_url(cell):
    """Return the URL of the 'place' page matching cell, '' if none."""
    page = place_page(cell)
    if page is None:
        return ''
    return page.get_online_url()


@register.filter
def is_place_page(page):
    """Tell whether page uses the 'place' template."""
    if not page:
        return False
    return page.template_name == 'place'


@register.assignment_tag
def get_tile_picture_size(page):
    """Return the picture size specification to use for tiles of page."""
    if is_place_page(page):
        return '1300'
    return '300x300'


@register.filter
def as_producer(slug, default_slug=None):
    """Return a {'slug': ..., 'label': ...} dictionary describing a producer.

    slug is either a string (site slug, formdef reference, producer
    keyword, collectivity label) or a form dictionary, whose 'form_digest',
    'keywords' and 'site_slug' entries are then looked at.  When nothing
    matches default_slug is tried, then Toodego is returned.
    """
    COLLECTIVITY_UNACCENT_LABELS = {
        x: slugify(y) for x, y in settings.COLLECTIVITY_LABELS.items()}
    # NOTE(review): matches on COLLECTIVITY_UNACCENT_LABELS return the
    # slugified label, not the original one; confirm this is intended.
    if isinstance(slug, dict):  # actually a form
        if slug.get('form_digest'):
            # a digest ending with "(city name)" designates the collectivity
            parenthesis = re.match(r'.*\((.*)\)', slug['form_digest'])
            if parenthesis:
                city_name = parenthesis.group(1)
                if city_name in settings.COLLECTIVITY_LABELS.values():
                    collectivity = [x for x in settings.COLLECTIVITY_LABELS.items()
                                    if x[1] == city_name][0]
                    return {'slug': collectivity[0], 'label': collectivity[1]}
                city_slug = slugify(city_name)
                if city_slug in COLLECTIVITY_UNACCENT_LABELS.values():
                    collectivity = [x for x in COLLECTIVITY_UNACCENT_LABELS.items()
                                    if x[1] == city_slug][0]
                    return {'slug': collectivity[0], 'label': collectivity[1]}
        for keyword in slug.get('keywords') or []:
            if keyword.startswith('producer-'):
                slug = keyword.split('-', 1)[1]
                break
        else:
            # no producer keyword, fall back on the site slug
            slug = slug.get('site_slug')

    producer = None
    if ':' in slug:  # formdef_reference
        slug = slug.split(':')[0]

    if slug.startswith('_'):
        producer = slug.split('_')[1].replace('hobo-', '')
    else:
        producer = slug

    if slug == 'eservices':
        # handle collectivity sites, they are individually named
        # "eservices" but have the collectivity slug as a template
        # variable.
        producer = settings.TEMPLATE_VARS.get('gnm_commune', 'grandlyon')
        if producer and settings.TEMPLATE_VARS.get('gnm_commune_name'):
            return {'slug': producer,
                    'label': settings.TEMPLATE_VARS.get('gnm_commune_name')}

    try:
        producer = re.search(r'(^|\W)producer-([\w-]*)(\W|$)', producer).group(2).strip()
    except AttributeError:
        # no "producer-xxx" part, keep the value as is
        pass

    if producer.startswith('Lyon '):
        # assume sth like "Lyon 7eme"
        producer = 'Lyon'

    producer_slug = slugify(producer)

    if settings.KNOWN_SERVICES['hobo'].get('hobo-%s' % producer):
        return {'slug': producer,
                'label': settings.KNOWN_SERVICES['hobo'].get(
                    'hobo-%s' % producer, {'title': ''})['title']}
    elif settings.KNOWN_SERVICES['hobo'].get('_interco_hobo-%s' % producer):
        return {'slug': producer,
                'label': settings.KNOWN_SERVICES['hobo'].get(
                    '_interco_hobo-%s' % producer, {'title': ''})['title']}
    elif producer in settings.PRODUCER_LABELS:
        return {'slug': producer, 'label': settings.PRODUCER_LABELS[producer]}
    elif producer in settings.COLLECTIVITY_LABELS.values():
        collectivity = [x for x in settings.COLLECTIVITY_LABELS.items()
                        if x[1] == producer][0]
        return {'slug': collectivity[0], 'label': collectivity[1]}
    elif producer_slug in COLLECTIVITY_UNACCENT_LABELS.values():
        collectivity = [x for x in COLLECTIVITY_UNACCENT_LABELS.items()
                        if x[1] == producer_slug][0]
        return {'slug': collectivity[0], 'label': collectivity[1]}
    elif default_slug:
        return as_producer(default_slug)
    else:
        return {'slug': 'toodego', 'label': 'Toodego'}


@register.filter
def as_commune(user_data):
    """Return a dictionary describing the commune matching user_data.

    user_data is either a (page) slug or a mapping with a 'city' or
    'address_city' entry.  Known collectivity portals are looked at first
    (the result then has a True 'gnm' entry), then pages under 'mairie'.
    Returns None when nothing matches.
    """
    if not user_data:
        return None
    if isinstance(user_data, _STRING_TYPES):
        # user_data is expected to be (page) slug
        collectivities = get_gnm_collectivities()
        for collectivity in collectivities:
            if slugify(collectivity['label']) in user_data:
                collectivity['gnm'] = True
                return collectivity
        return None
    city = user_data.get('city') or user_data.get('address_city')
    if city:
        # first look for known portals
        collectivities = get_gnm_collectivities()
        for collectivity in collectivities:
            if collectivity.get('label') == city:
                return {
                    'label': city,
                    'slug': slugify(city),
                    'url': collectivity['url'],
                    'gnm': True,
                }
        # if not found look in mairie pages
        pages = Page.objects.filter(
            parent__slug='mairie',
            slug__icontains=slugify(city)).exclude(slug__icontains='annexe')
        if pages.exists():
            return {
                'label': city,
                'slug': slugify(city),
                'url': pages[0].get_online_url(),
            }
    return None


@register.assignment_tag
def get_suggestions(request, cell, user_data, places_data):
    """Return up to five rendered tiles suggested to the user.

    Candidate tiles are the mairie of the user city, randomly the
    airquality and pollen tiles, and the two closest velov, piscine and tcl
    features around the user address and around each address of
    places_data.  Every distinct tile is saved as a ConfigJsonCell on the
    page of cell, in the '_auto_tile' placeholder, and rendered; the
    rendered contents are then shuffled.
    """
    tile_data = []
    addresses = []
    city = user_data.get('city') or user_data.get('address_city')
    if city:
        # get commune tile for the user city
        maplayer = MapLayer.objects.get(slug='mairie')
        try:
            data_result = requests.get(
                maplayer.geojson_url, timeout=2,
                without_user=True, cache_duration=300).json()
        except RequestException:
            pass
        else:
            city_slug = slugify(city)
            if data_result.get('features'):
                for feature in data_result['features']:
                    if 'Annexe' in feature['properties']['nom']:
                        continue
                    if city_slug in slugify(feature['properties']['nom']):
                        tile_data.append({'key': maplayer.slug,
                                          'properties': feature['properties']})
                        break

    if random.random() < 0.3:
        tile_data.append({'key': 'airquality'})

    if random.random() < 0.1:
        tile_data.append({'key': 'pollen'})

    if user_data.get('address_street'):
        if not user_data.get('address_number'):
            user_data['address_number'] = ''
        addresses.append(
            u'%(address_number)s %(address_street)s, %(address_city)s, France' % user_data)

    if places_data:
        for place_data in places_data.get('data') or []:
            addresses.append(u'%(adresse)s, %(ville)s, France' % place_data['content'])

    # geocode addresses, those that cannot be resolved are ignored
    coords = []
    nominatim_url = 'https://nominatim.entrouvert.org'
    for address in addresses:
        url = '%s/search?q=%s&accept-language=fr&format=json' % (
            nominatim_url, quote(address.encode('utf-8')))
        try:
            search_result = requests.get(
                url, timeout=2, without_user=True, cache_duration=300).json()
        except RequestException:
            continue
        if not search_result:
            continue
        coords.append({'lon': search_result[0]['lon'], 'lat': search_result[0]['lat']})

    for coord in coords:
        lat1, lat2 = float(coord['lat']) - 0.008, float(coord['lat']) + 0.008
        lon1, lon2 = float(coord['lon']) - 0.006, float(coord['lon']) + 0.006
        for maplayer in MapLayer.objects.filter(slug__in=('velov', 'piscine', 'tcl')):
            url = maplayer.geojson_url + '&BBOX=%s,%s,%s,%s' % (lat1, lon1, lat2, lon2)
            try:
                data_result = requests.get(
                    url, timeout=2, without_user=True, cache_duration=300).json()
            except RequestException:
                continue
            features = data_result.get('features')
            if not features:
                continue
            for feature in features:
                # thanks to the flat earth society
                feature['distance'] = math.sqrt(
                    (float(coord['lon']) - feature['geometry']['coordinates'][0])**2 +
                    (float(coord['lat']) - feature['geometry']['coordinates'][1])**2)
            features.sort(key=lambda x: x['distance'])
            # take two closest features
            for feature in features[:2]:
                tile_data.append({'key': maplayer.slug,
                                  'properties': feature['properties']})

    # NOTE(review): the result is not used but the lookup raises IndexError
    # when no dashboard cell exists; kept in case callers rely on it.
    dashboard = DashboardCell.objects.all()[0]
    page_id = cell.page_id
    cells = []
    seen = {}
    for data in tile_data:
        tile_cell = ConfigJsonCell(
            key=data['key'], order=1, page_id=page_id, placeholder='_auto_tile')
        cell_form_keys = [
            x['varname']
            for x in settings.JSON_CELL_TYPES[tile_cell.key].get('form') or {}]
        # airquality and pollen tiles have no properties
        properties = data.get('properties') or {}
        tile_cell.parameters = {}
        for key in cell_form_keys:
            tile_cell.parameters[key] = properties.get(key)
        # do not create the same tile twice
        cell_uid = repr((data['key'], tile_cell.parameters))
        if cell_uid in seen:
            continue
        seen[cell_uid] = True
        tile_cell.save()
        cells.append(render_cell(request, cell=tile_cell).content)
    random.shuffle(cells)
    return cells[:5]


@register.assignment_tag
def get_gnm_portal_url():
    """Return the URL of the '_interco_portal' combo service, or of the
    'portal' one when the former is not known."""
    if '_interco_portal' in settings.KNOWN_SERVICES['combo']:
        return settings.KNOWN_SERVICES['combo']['_interco_portal'].get('url')
    return settings.KNOWN_SERVICES['combo']['portal'].get('url')


@register.assignment_tag
def get_gnm_collectivities():
    """Return the {'url': ..., 'label': ...} dictionaries of collectivity
    portals, sorted by label.

    Collectivities are the combo services whose key ends with '_portal'
    and that have a matching hobo service; the label is the title of the
    hobo service.
    """
    collectivities = []
    for key in settings.KNOWN_SERVICES['combo']:
        if not key.endswith('_portal'):
            continue
        # hobo key is the combo key without its first character and
        # without the _portal suffix
        matching_hobo = settings.KNOWN_SERVICES['hobo'].get(key.split('_portal')[0][1:])
        if not matching_hobo:
            continue
        if matching_hobo['title'] in ('SAU', 'Villeurbanne'):  # blacklist
            continue
        service = settings.KNOWN_SERVICES['combo'][key]
        collectivities.append({'url': service.get('url'), 'label': matching_hobo['title']})
    collectivities.sort(key=lambda x: x['label'])
    return collectivities


@register.inclusion_tag('combo/gnm/place_map.html')
def gnm_place_map(lat, lng):
    """Return the context of a map at a fixed zoom level (17), initially
    positioned on lat, lng."""
    map_cell = Map()
    map_cell.initial_zoom = '17'
    map_cell.min_zoom = '17'
    map_cell.max_zoom = '17'
    context = map_cell.get_cell_extra_context({})
    context['init_lat'] = lat
    context['init_lng'] = lng
    return context


@register.inclusion_tag('combo/gnm/airquality_map.html', takes_context=True)
def gnm_airquality_map(context):
    """Push onto context the settings of the air quality map, which starts
    in the 'device-location' state."""
    map_cell = Map()
    map_cell.initial_state = 'device-location'
    map_cell.initial_zoom = '15'
    map_cell.min_zoom = '10'
    map_cell.max_zoom = '19'
    context.push(map_cell.get_cell_extra_context({}))
    return context


_json_script_escapes = {
    ord('>'): '\\u003E',
    ord('<'): '\\u003C',
    ord('&'): '\\u0026',
}


@register.filter(is_safe=True)
def json_script(value, element_id):
    """Serialize value as JSON wrapped in a <script> tag with the given id.

    The characters >, < and & are replaced by their unicode escapes so the
    content cannot close the tag.
    """
    json_str = json.dumps(value, cls=DjangoJSONEncoder)
    json_str = json_str.replace('>', '\\u003E').replace('<', '\\u003C').replace('&', '\\u0026')
    return format_html(
        '<script id="{}" type="application/json">{}</script>',
        element_id, mark_safe(json_str)
    )


@register.assignment_tag
def get_goto_cell(page, request):
    """Return the cell designated by the 'to' query string parameter.

    A cell from a 'place' page is copied to page, in the '_auto_tile'
    placeholder, and the copy is returned.  Returns None when the
    parameter is missing or when the cell does not exist.
    """
    try:
        cell = ConfigJsonCell.objects.get(id=request.GET['to'])
    except (ConfigJsonCell.DoesNotExist, KeyError):
        return None
    if cell.page.template_name != 'place':
        return cell
    # create an alternate version of cell
    cell.id = None
    cell.placeholder = '_auto_tile'
    cell.page = page
    cell.save()
    return cell


@register.assignment_tag
def get_collectivity_slugs():
    """Return the keys of settings.COLLECTIVITY_LABELS."""
    return settings.COLLECTIVITY_LABELS.keys()


@register.filter
def indice_values(indices):
    """Yield the non-empty values of indices for the previous, the current
    and the next day, in that order."""
    for key in ('indice_j-1', 'indice_j', 'indice_j+1'):
        if indices.get(key):
            yield indices.get(key)


@register.filter
def airquality_hack(cell, request):
    """Turn the parameter-less airquality cell into a concrete saved cell.

    Coordinates are read from the signed 'ctx' query string parameter when
    present, and otherwise from the last but one part of the request path,
    expected to be "<lon>,<lat>".  Other cells are returned unchanged.
    """
    if cell.key == 'airquality' and not cell.parameters:
        # Cell on airquality dynamic page, it has empty cell.parameters as it
        # gets those from the query string.  In order to get un/favorite link
        # to work we need to duplicate the cell into a concrete object with a
        # copy of query parameters within.
        if request.GET.get('ctx'):
            ctx = signing.loads(request.GET['ctx'])
            lon, lat = ctx['q_lon'], ctx['q_lat']
        else:
            lon, lat = request.path.split('/')[-2].split(',')
        cell.parameters = {'lon': lon, 'lat': lat}
        cell.placeholder = '_auto_tile'
        cell.id = None
        cell.save()
    return cell