# w.c.s. - web application for online forms
# Copyright (C) 2005-2012 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

import collections
import collections.abc
import hashlib
import urllib.parse
import xml.etree.ElementTree as ET

from django.template import TemplateSyntaxError, VariableDoesNotExist
from django.utils.encoding import force_bytes, force_text
from quixote import get_publisher, get_request, get_session
from quixote.html import TemplateIO

from .api_utils import sign_url_auto_orig
from .categories import DataSourceCategory
from .qommon import _, force_str, get_logger, misc
from .qommon.afterjobs import AfterJob
from .qommon.cron import CronJob
from .qommon.form import CompositeWidget, OptGroup, SingleSelectWidget, StringWidget
from .qommon.humantime import seconds2humanduration
from .qommon.misc import get_variadic_url
from .qommon.publisher import get_publisher_class
from .qommon.storage import StorableObject
from .qommon.template import Template
from .qommon.templatetags.qommon import unlazy
from .qommon.xml_storage import XmlStorableObject

data_source_functions = {}


class DataSourceError(Exception):
    pass


def register_data_source_function(function, function_name=None):
    if not function_name:
        function_name = function.__name__
    if function_name not in data_source_functions:
        data_source_functions[function_name] = function
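
# Illustrative sketch (not part of the module): a function registered this way
# becomes available to Python expression ("formula") data sources, because
# data_source_functions is merged into the global evaluation namespace in
# _get_structured_items() below.  The function name and body are arbitrary
# examples.
#
#   def alphabet(letters='abc'):
#       return [{'id': letter, 'text': letter.upper()} for letter in letters]
#
#   register_data_source_function(alphabet)
#
# A "formula" data source could then use the expression: alphabet('xyz')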


class DataSourceSelectionWidget(CompositeWidget):
    def __init__(
        self, name, value=None, allow_jsonp=True, allow_geojson=False, allow_named_sources=True, **kwargs
    ):
        CompositeWidget.__init__(self, name, value, **kwargs)

        if not value:
            value = {}

        options = [(None, _('None'), None)]
        if allow_named_sources:
            from wcs.carddef import CardDef

            cards_options = [
                (t[2], t[1], t[2], {'data-goto-url': t[0].get_admin_url()})
                for t in CardDef.get_carddefs_as_data_source()
            ]
            if not get_publisher().get_backoffice_root().is_accessible('cards'):
                cards_options = [x[:3] for x in cards_options]
            cards_options.sort(key=lambda x: misc.simplify(x[1]))
            if cards_options:
                options.append(OptGroup(_('Cards')))
                options.extend(cards_options)

            admin_accessible = NamedDataSource.is_admin_accessible()
            nds_options = []
            nds_agenda_options = []
            nds_users_options = []
            for ds in NamedDataSource.select():
                option = [
                    ds.slug,
                    ds.name,
                    ds.slug,
                    {
                        'data-type': ds.type,
                        'data-maybe-datetimes': 'true' if ds.maybe_datetimes() else 'false',
                    },
                ]
                if admin_accessible:
                    option[-1]['data-goto-url'] = ds.get_admin_url()
                if ds.external == 'agenda':
                    nds_agenda_options.append(option)
                elif ds.type == 'wcs:users':
                    nds_users_options.append(option)
                else:
                    option.append(ds.category)
                    nds_options.append(option)

            nds_agenda_options.sort(key=lambda x: misc.simplify(x[1]))
            if nds_agenda_options:
                options.append(OptGroup(_('Agendas')))
                options.extend(nds_agenda_options)

            nds_users_options.sort(key=lambda x: misc.simplify(x[1]))
            if nds_users_options:
                options.append(OptGroup(_('Users')))
                options.extend(nds_users_options)

            nds_options.sort(key=lambda x: misc.simplify(x[1]))
            if nds_options:
                nds_by_category_names = collections.defaultdict(list)
                for nds in nds_options:
                    name = ''
                    if nds[-1]:
                        name = nds[-1].name
                    nds_by_category_names[name].append(nds[:-1])
                category_names = list(nds_by_category_names.keys())
                if len(category_names) == 1 and category_names[0] == '':
                    # no category found
                    options.append(OptGroup(_('Manually Configured Data Sources')))
                    options.extend(nds_options)
                else:
                    # sort categories
                    category_names = sorted(category_names)
                    # datasources without categories at the end
                    if category_names[0] == '':
                        category_names = category_names[1:] + ['']
                    # group by category name
                    for name in category_names:
                        options.append(OptGroup(name or _('Without category')))
                        options.extend(nds_by_category_names[name])

        if len(options) > 1:
            options.append(OptGroup(_('Generic Data Sources')))
        options.append(('json', _('JSON URL'), 'json', {'data-maybe-datetimes': 'true'}))
        if allow_jsonp and (
            value.get('type') == 'jsonp' or not get_publisher().has_site_option('disable-jsonp-sources')
        ):
            options.append(('jsonp', _('JSONP URL'), 'jsonp'))
        if allow_geojson:
            options.append(('geojson', _('GeoJSON URL'), 'geojson'))
        if not get_publisher().has_site_option('disable-python-expressions'):
            options.append(('formula', _('Python Expression'), 'python'))

        self.add(
            SingleSelectWidget,
            'type',
            options=options,
            value=value.get('type'),
            attrs={'data-dynamic-display-parent': 'true'},
        )
        self.parse()
        if not self.value:
            self.value = {}

        self.add(
            StringWidget,
            'value',
            value=value.get('value'),
            size=80,
            attrs={
                'data-dynamic-display-child-of': 'data_source$type',
                'data-dynamic-display-value-in': 'json|jsonp|geojson|python',
            },
        )

        self._parsed = False

    def _parse(self, request):
        values = {}
        for name in ('type', 'value'):
            value = self.get(name)
            if value:
                values[name] = value
        if values.get('type') in ('json', 'jsonp', 'geojson'):
            url = values.get('value') or ''
            if url and not Template.is_template_string(url):
                parsed = urllib.parse.urlparse(url)
                if not (parsed.scheme and parsed.netloc):
                    self.error = _('Value must be a full URL.')
        if values.get('type', '') in ('none', ''):
            values = None
        self.value = values or None

    def render_content(self):
        r = TemplateIO(html=True)
        for widget in self.get_widgets():
            r += widget.render_content()
        return r.getvalue()


def get_items(data_source, include_disabled=False, mode=None):
    structured_items = get_structured_items(data_source, mode=mode, include_disabled=include_disabled)
    tupled_items = []
    for item in structured_items:
        tupled_items.append((str(item['id']), str(item['text']), str(item.get('key', item['id'])), item))
    return tupled_items


def get_id_by_option_text(data_source, text_value):
    data_source = get_object(data_source)
    if data_source:
        if data_source.data_source.get('type') == 'json' and data_source.query_parameter:
            url = data_source.get_json_query_url()
            url += urllib.parse.quote(text_value)
            items = request_json_items(url, data_source.extended_data_source)
        else:
            items = get_structured_items(data_source.extended_data_source, include_disabled=False)
        # fallback to iterating on all options
        for option in items:
            # get raw value from display value
            if option['text'] == text_value:
                return str(option['id'])


def get_json_from_url(url, data_source=None, log_message_part='JSON data source'):
    url = sign_url_auto_orig(url)
    data_source = data_source or {}
    data_key = data_source.get('data_attribute') or 'data'
    geojson = data_source.get('type') == 'geojson'
    error_summary = None

    try:
        entries = misc.json_loads(misc.urlopen(url).read())
        if not isinstance(entries, dict):
            raise ValueError('not a json dict')
        if entries.get('err') not in (None, 0, "0"):
            raise ValueError('err %s' % entries['err'])
        if geojson:
            if not isinstance(entries.get('features'), list):
                raise ValueError('bad geojson format')
        else:
            # data_key can be "data.foo.bar.results"
            keys = data_key.split('.')
            data = entries
            for key in keys[:-1]:
                if not isinstance(data.get(key), dict):
                    raise ValueError('not a json dict with a %s list attribute' % data_key)
                data = data[key]
            if not isinstance(data.get(keys[-1]), list):
                raise ValueError('not a json dict with a %s list attribute' % data_key)
        return entries
    except misc.ConnectionError as e:
        error_summary = 'Error loading %s (%s)' % (log_message_part, str(e))
    except (ValueError, TypeError) as e:
        error_summary = 'Error reading %s output (%s)' % (log_message_part, str(e))

    if data_source:
        get_publisher().record_error(
            error_summary,
            context='[DATASOURCE]',
            notify=data_source.get('notify_on_errors'),
            record=data_source.get('record_on_errors'),
        )
    return None


def request_json_items(url, data_source):
    entries = get_json_from_url(url, data_source)
    if entries is None:
        return None
    data_key = data_source.get('data_attribute') or 'data'
    id_attribute = data_source.get('id_attribute') or 'id'
    text_attribute = data_source.get('text_attribute') or 'text'
    # data_key can be "data.foo.bar.results"
    keys = data_key.split('.')
    for key in keys:
        entries = entries[key]
    items = []
    for item in entries:
        # skip malformed items
        if not isinstance(item, dict):
            continue
        if item.get(id_attribute) is None or item.get(id_attribute) == '':
            continue
        item['id'] = item[id_attribute]
        if text_attribute not in item:
            item['text'] = str(item['id'])
        else:
            item['text'] = item[text_attribute]
        if not isinstance(item['text'], str):
            continue
        items.append(item)
    return items
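
# Illustrative example (the payload values are assumptions, not from any real
# service): with the default attributes (data_attribute='data',
# id_attribute='id', text_attribute='text'), request_json_items() expects the
# endpoint to answer with something like:
#   {"err": 0, "data": [{"id": "1", "text": "first item"}, {"id": "2", "text": "second item"}]}
# With data_attribute set to "results.items", the list would be read from
# entries['results']['items'] instead.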


def request_geojson_items(url, data_source):
    entries = get_json_from_url(url, data_source)
    if entries is None:
        return None
    items = []
    id_property = data_source.get('id_property') or 'id'
    for item in entries.get('features'):
        if id_property == 'id' and 'id' in item:
            # If a Feature has a commonly used identifier, that identifier
            # SHOULD be included as a member of the Feature object with the
            # name "id", and the value of this member is either a JSON string
            # or number.
            #   -- https://tools.ietf.org/html/rfc7946#section-3.2
            pass
        elif item.get('properties', {}).get(id_property):
            item['id'] = item['properties'][id_property]
        else:
            # missing id property, skip entry
            continue
        try:
            item['text'] = Template(data_source.get('label_template_property') or '{{ text }}').render(
                item['properties']
            )
        except (TemplateSyntaxError, VariableDoesNotExist):
            pass
        if not item.get('text'):
            item['text'] = item['id']
        items.append(item)
    return items


def get_structured_items(data_source, mode=None, include_disabled=True, raise_on_error=False):
    items = _get_structured_items(data_source, mode=mode, raise_on_error=raise_on_error)
    if not include_disabled:
        items = [i for i in items if not i.get('disabled')]
    return items


def _get_structured_items(data_source, mode=None, raise_on_error=False):
    cache_duration = 0
    if data_source.get('type') and data_source.get('type').startswith('carddef:'):
        # cards
        from wcs.carddef import CardDef

        return CardDef.get_data_source_items(data_source['type'])

    if data_source.get('type') not in ('json', 'jsonp', 'geojson', 'formula', 'wcs:users'):
        # named data source
        named_data_source = NamedDataSource.get_by_slug(data_source['type'])
        if named_data_source.cache_duration:
            cache_duration = int(named_data_source.cache_duration)
        data_source = named_data_source.extended_data_source

    if data_source.get('type') == 'wcs:users':
        users = get_publisher().user_class.get_users_with_roles(
            included_roles=data_source.get('included_roles'),
            excluded_roles=data_source.get('excluded_roles'),
            order_by='name',
        )
        include_disabled_users = data_source.get('include_disabled_users')
        return [{'id': u.id, 'text': u.name} for u in users if u.is_active or include_disabled_users]

    if data_source.get('type') == 'formula':
        # the result of a python expression, it must be a list,
        #  - of strings
        #  - of dictionaries, in which case it has to have both a "id" and a
        #    "text" keys
        #  - of lists or tuples, in which case it may have up to three elements:
        #    - three elements, (id, text, key)
        #    - two elements, (id, text)
        #    - a single element, (id,)
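        # Illustrative examples (not part of the module): each of the
        # following expression results would be accepted,
        #   ['apple', 'banana']
        #   [('1', 'One'), ('2', 'Two')]
        #   [{'id': '1', 'text': 'One'}, {'id': '2', 'text': 'Two'}]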
        variables = get_publisher().substitutions.get_context_variables(mode=mode)
        global_eval_dict = get_publisher().get_global_eval_dict()
        global_eval_dict.update(data_source_functions)
        try:
            value = misc.eval_python(data_source.get('value'), global_eval_dict, variables)
            if not isinstance(value, collections.abc.Iterable):
                get_publisher().record_error(
                    'Python data source (%r) gave a non-iterable result' % data_source.get('value'),
                    context='[DATASOURCE]',
                    notify=data_source.get('notify_on_errors'),
                    record=data_source.get('record_on_errors'),
                )
                return []
            if len(value) == 0:
                return []
            value = misc.json_encode_helper(value, get_publisher().site_charset)
            if isinstance(value[0], (list, tuple)):
                if len(value[0]) >= 3:
                    return [{'id': x[0], 'text': x[1], 'key': x[2]} for x in value]
                elif len(value[0]) == 2:
                    return [{'id': x[0], 'text': x[1]} for x in value]
                elif len(value[0]) == 1:
                    return [{'id': x[0], 'text': x[0]} for x in value]
                return value
            elif isinstance(unlazy(value[0]), str):
                return [{'id': x, 'text': x} for x in value]
            elif isinstance(value[0], dict):
                if all(str(x.get('id', '')) and x.get('text') for x in value):
                    return value
            get_publisher().record_error(
                'Python data source (%r) gave a non usable result' % data_source.get('value'),
                context='[DATASOURCE]',
                notify=data_source.get('notify_on_errors'),
                record=data_source.get('record_on_errors'),
            )
            return []
        except Exception as exc:
            get_publisher().record_error(
                'Failed to eval() Python data source (%r)' % data_source.get('value'),
                exception=exc,
                context='[DATASOURCE]',
                notify=data_source.get('notify_on_errors'),
                record=data_source.get('record_on_errors'),
            )
            return []

    elif data_source.get('type') in ['json', 'geojson']:
        # the content available at a json URL, it must answer with a dict with
        # a 'data' key holding the list of items, each of them being a dict
        # with at least both an "id" and a "text" key.
        geojson = data_source.get('type') == 'geojson'
        url = data_source.get('value')
        if not url:
            return []
        url = url.strip()
        if Template.is_template_string(url):
            vars = get_publisher().substitutions.get_context_variables(mode='lazy')
            url = get_variadic_url(url, vars)
        if data_source.get('qs_data'):
            # merge qs_data into url
            from wcs.workflows import WorkflowStatusItem

            parsed = urllib.parse.urlparse(url)
            qs = list(urllib.parse.parse_qsl(parsed.query))
            for key, value in data_source['qs_data'].items():
                try:
                    value = WorkflowStatusItem.compute(value, raises=True, record_errors=False)
                    value = str(value) if value is not None else ''
                except Exception as e:
                    get_publisher().record_error(
                        _(
                            'Failed to compute value "%(value)s" for "%(query)s" query parameter'
                            % {'value': value, 'query': key}
                        ),
                        context='[DATASOURCE]',
                        exception=e,
                        notify=data_source.get('notify_on_errors'),
                        record=data_source.get('record_on_errors'),
                    )
                else:
                    key = force_str(key)
                    value = force_str(value)
                    qs.append((key, value))
            qs = urllib.parse.urlencode(qs)
            url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])

        request = get_request()
        if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
            return request.datasources_cache[url]

        if cache_duration:
            cache_key = 'data-source-%s' % force_str(hashlib.md5(force_bytes(url)).hexdigest())
            from django.core.cache import cache

            items = cache.get(cache_key)
            if items is not None:
                return items

        if geojson:
            items = request_geojson_items(url, data_source)
        else:
            items = request_json_items(url, data_source)
        if items is None:
            if raise_on_error:
                raise DataSourceError('datasource %s is unavailable' % url)
            return []

        if hasattr(request, 'datasources_cache'):
            request.datasources_cache[url] = items
        if cache_duration:
            cache.set(cache_key, items, cache_duration)
        return items

    return []


def get_real(data_source):
    if not data_source:
        return None
    ds_type = data_source.get('type')
    if ds_type in ('json', 'jsonp', 'geojson', 'formula'):
        return data_source
    if ds_type and ds_type.startswith('carddef:'):
        return data_source
    return NamedDataSource.get_by_slug(ds_type).data_source


def get_object(data_source, ignore_errors=True):
    if not data_source:
        return None
    ds_type = data_source.get('type')
    if ds_type is None:
        return None
    if ds_type in ('json', 'jsonp', 'geojson', 'formula'):
        named_data_source = NamedDataSource()
        named_data_source.data_source = data_source
        return named_data_source
    if ds_type.startswith('carddef:'):
        named_data_source = NamedDataSource()
        named_data_source.data_source = data_source
        return named_data_source
    return NamedDataSource.get_by_slug(ds_type, ignore_errors=ignore_errors)


class NamedDataSource(XmlStorableObject):
    _names = 'datasources'
    _indexes = ['slug']
    xml_root_node = 'datasource'
    backoffice_class = 'wcs.admin.data_sources.NamedDataSourcePage'
    verbose_name = _('Data source')
    verbose_name_plural = _('Data sources')

    name = None
    slug = None
    description = None
    data_source = None
    cache_duration = None
    query_parameter = None
    id_parameter = None
    data_attribute = None
    id_attribute = None
    text_attribute = None
    id_property = None
    qs_data = None
    label_template_property = None
    external = None
    external_status = None
    notify_on_errors = False
    record_on_errors = False
    users_included_roles = None
    users_excluded_roles = None
    category_id = None
    include_disabled_users = False

    SLUG_DASH = '_'

    # declarations for serialization
    XML_NODES = [
        ('name', 'str'),
        ('slug', 'str'),
        ('description', 'str'),
        ('cache_duration', 'str'),
        ('query_parameter', 'str'),
        ('id_parameter', 'str'),
        ('data_attribute', 'str'),
        ('id_attribute', 'str'),
        ('text_attribute', 'str'),
        ('id_property', 'str'),
        ('qs_data', 'qs_data'),
        ('label_template_property', 'str'),
        ('external', 'str'),
        ('external_status', 'str'),
        ('data_source', 'data_source'),
        ('notify_on_errors', 'bool'),
        ('record_on_errors', 'bool'),
        ('users_included_roles', 'str_list'),
        ('users_excluded_roles', 'str_list'),
        ('include_disabled_users', 'bool'),
    ]

    def __init__(self, name=None):
        StorableObject.__init__(self)
        self.name = name

    @property
    def category(self):
        return DataSourceCategory.get(self.category_id, ignore_errors=True)

    @category.setter
    def category(self, category):
        if category:
            self.category_id = category.id
        elif self.category_id:
            self.category_id = None

    @property
    def type(self):
        if not self.data_source:
            return None
        return self.data_source.get('type')

    @property
    def extended_data_source(self):
        notify_on_errors = self.notify_on_errors
        record_on_errors = self.record_on_errors
        if getattr(get_request(), 'disable_error_notifications', None) is True:
            notify_on_errors = False
            record_on_errors = False
        if self.type == 'geojson':
            data_source = self.data_source.copy()
            data_source.update(
                {
                    'id_property': self.id_property,
                    'label_template_property': self.label_template_property,
                    'notify_on_errors': notify_on_errors,
                    'record_on_errors': record_on_errors,
                }
            )
            return data_source
        if self.type == 'json':
            data_source = self.data_source.copy()
            data_source.update(
                {
                    'data_attribute': self.data_attribute,
                    'id_attribute': self.id_attribute,
                    'text_attribute': self.text_attribute,
                    'qs_data': self.qs_data,
                    'notify_on_errors': notify_on_errors,
                    'record_on_errors': record_on_errors,
                }
            )
            return data_source
        if self.type == 'wcs:users':
            data_source = self.data_source.copy()
            data_source.update(
                {
                    'included_roles': self.users_included_roles,
                    'excluded_roles': self.users_excluded_roles,
                    'include_disabled_users': self.include_disabled_users,
                }
            )
            return data_source
        return self.data_source

    def can_jsonp(self):
        if self.type == 'jsonp':
            return True
        if self.type == 'json' and self.query_parameter:
            return True
        if self.type and self.type.startswith('carddef:'):
            return True
        return False

    def maybe_datetimes(self):
        return self.type == 'json' and 'datetimes' in (self.data_source.get('value') or '')

    @property
    def agenda_ds(self):
        return self.external in ['agenda', 'agenda_manual']

    @property
    def agenda_ds_origin(self):
        if self.external != 'agenda_manual':
            return
        for datasource in NamedDataSource.select():
            if datasource.external != 'agenda':
                continue
            if datasource.data_source.get('value') == self.data_source.get('value'):
                return datasource

    def migrate(self):
        changed = False
        if not self.slug:
            # .store() will take care of setting the slug
            changed = True
        if changed:
            self.store(comment=_('Automatic update'), snapshot_store_user=False)

    def store(self, comment=None, snapshot_store_user=True, *args, **kwargs):
        assert not self.is_readonly()
        if self.slug is None:
            # set slug if it's not yet there
            self.slug = self.get_new_slug()
        super().store(*args, **kwargs)
        if get_publisher().snapshot_class:
            get_publisher().snapshot_class.snap(
                instance=self, comment=comment, store_user=snapshot_store_user
            )

    @classmethod
    def is_admin_accessible(cls):
        for section in ('settings', 'forms', 'workflows'):
            if get_publisher().get_backoffice_root().is_accessible(section):
                return True
        return False

    def get_admin_url(self):
        base_url = get_publisher().get_backoffice_url()
        if get_request():
            for section in ('settings', 'forms', 'workflows'):
                if get_publisher().get_backoffice_root().is_accessible(section):
                    return '%s/%s/data-sources/%s/' % (base_url, section, self.id)
        # fallback to settings section
        section = 'settings'
        return '%s/%s/data-sources/%s/' % (base_url, section, self.id)

    def export_data_source_to_xml(self, element, attribute_name, charset, **kwargs):
        data_source = getattr(self, attribute_name)
        ET.SubElement(element, 'type').text = data_source.get('type')
        ET.SubElement(element, 'value').text = force_text(data_source.get('value') or '', charset)

    def import_data_source_from_xml(self, element, **kwargs):
        return {
            'type': force_str(element.find('type').text),
            'value': force_str(element.find('value').text or ''),
        }

    def export_qs_data_to_xml(self, element, attribute_name, *args, **kwargs):
        if not self.qs_data:
            return
        for (key, value) in self.qs_data.items():
            item = ET.SubElement(element, 'item')
            if isinstance(key, str):
                ET.SubElement(item, 'name').text = force_text(key)
            else:
                raise AssertionError('unknown type for key (%r)' % key)
            if isinstance(value, str):
                ET.SubElement(item, 'value').text = force_text(value)
            else:
                raise AssertionError('unknown type for value (%r)' % key)

    def import_qs_data_from_xml(self, element, **kwargs):
        if element is None:
            return
        qs_data = {}
        for item in element.findall('item'):
            key = force_str(item.find('name').text)
            value = force_str(item.find('value').text or '')
            qs_data[key] = value
        return qs_data

    def get_dependencies(self):
        yield self.category

    def export_to_xml(self, include_id=False):
        root = super().export_to_xml(include_id=include_id)
        DataSourceCategory.object_category_xml_export(self, root, include_id=include_id)
        return root

    @classmethod
    def import_from_xml_tree(cls, tree, include_id=False, **kwargs):
        data_source = super().import_from_xml_tree(tree, include_id=include_id, **kwargs)
        DataSourceCategory.object_category_xml_import(data_source, tree, include_id=include_id)
        return data_source

    @classmethod
    def get_by_slug(cls, slug, ignore_errors=True):
        data_source = super().get_by_slug(slug, ignore_errors=ignore_errors)
        if data_source is None:
            get_logger().warning("data source '%s' does not exist" % slug)
            return StubNamedDataSource(name=slug)
        return data_source

    def get_json_query_url(self):
        url = self.get_variadic_url()
        if not url:
            return ''
        if '?' not in url:
            url += '?' + self.query_parameter + '='
        else:
            url += '&' + self.query_parameter + '='
        return url

    def get_jsonp_url(self):
        if self.type == 'jsonp':
            return self.data_source.get('value')
        token_context = {}
        if self.type == 'json' and self.query_parameter:
            json_url = self.get_json_query_url()
            token_context = {'url': json_url, 'data_source': self.id}
        elif self.type and self.type.startswith('carddef:'):
            token_context = {'carddef_ref': self.type}
            parts = self.type.split(':')
            if len(parts) > 2:
                # custom view, check if it's dynamic
                from wcs.carddef import CardDef
                from wcs.workflows import WorkflowStatusItem

                custom_view = CardDef.get_data_source_custom_view(self.type)
                if custom_view is None:
                    get_publisher().record_error(
                        _('Unknown custom view "%s" for CardDef "%s"') % (parts[2], parts[1]),
                        context='[DATASOURCE]',
                        notify=True,
                        record=True,
                    )
                else:
                    had_template = False
                    for filter_key, filter_value in custom_view.filters.items():
                        if not Template.is_template_string(filter_value):
                            continue
                        custom_view.filters[filter_key] = WorkflowStatusItem.compute(filter_value)
                        had_template = True
                    if had_template:
                        # keep altered custom view in token
                        token_context = {
                            'carddef_ref': self.type,
                            'dynamic_custom_view': custom_view.id,
                            'dynamic_custom_view_filters': custom_view.filters,
                        }
        if token_context:
            token_context['session_id'] = get_session().id
            token, created = get_publisher().token_class.get_or_create(
                type='autocomplete', context=token_context
            )
            if created:
                token.store()
            return '/api/autocomplete/%s' % token.id
        return None

    def get_geojson_url(self):
        assert self.type == 'geojson'
        url = self.data_source.get('value').strip()
        new_url = self.get_variadic_url()
        if new_url != url:
            token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug}
            token, created = get_publisher().token_class.get_or_create(
                type='autocomplete', context=token_context
            )
            if created:
                token.store()
            return '/api/geojson/%s' % token.id
        return '/api/geojson/%s' % self.slug

    def get_geojson_data(self, force_url=None):
        if force_url:
            url = force_url
        else:
            url = self.get_variadic_url()

        request = get_request()
        if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
            return request.datasources_cache[url]

        cache_duration = 0
        if self.cache_duration:
            cache_duration = int(self.cache_duration)
        if cache_duration:
            cache_key = 'geojson-data-source-%s' % force_str(hashlib.md5(force_bytes(url)).hexdigest())
            from django.core.cache import cache

            data = cache.get(cache_key)
            if data is not None:
                return data

        data = get_json_from_url(url, self.data_source)
        id_property = self.id_property or 'id'
        label_template_property = self.label_template_property or '{{ text }}'
        for feature in data['features']:
            feature['properties']['_id'] = feature['properties'][id_property]
            try:
                feature['properties']['_text'] = Template(label_template_property).render(
                    feature['properties']
                )
            except (TemplateSyntaxError, VariableDoesNotExist):
                pass
            if not feature['properties'].get('_text'):
                feature['properties']['_text'] = feature['properties']['_id']

        if hasattr(request, 'datasources_cache'):
            request.datasources_cache[url] = data
        if cache_duration:
            cache.set(cache_key, data, cache_duration)
        return data
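
    # Illustrative note (the feature values are assumptions): with id_property
    # 'code' and label_template_property '{{ name }}', a feature whose
    # properties are {'code': 'A1', 'name': 'Town hall'} comes back from
    # get_geojson_data() with '_id': 'A1' and '_text': 'Town hall' added to its
    # properties.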

    def get_value_by_id(self, param_name, param_value):
        url = self.get_variadic_url()
        if '?' not in url:
            url += '?'
        else:
            url += '&'
        url += param_name + '=' + urllib.parse.quote(param_value)

        def find_item(items, name, value):
            for item in items:
                if str(item.get(name)) == str(value):
                    return item
            # not found
            get_publisher().record_error(_('Could not find element by id "%s"') % value)
            return None

        request = get_request()
        if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
            items = request.datasources_cache[url]
            if not items:
                # cache may contains empty list from get_structured_items
                return None
            return find_item(items, param_name, param_value)

        items = request_json_items(url, self.data_source)
        if not items:
            # None or empty list are not valid
            return None
        if hasattr(request, 'datasources_cache'):
            request.datasources_cache[url] = items
        return find_item(items, param_name, param_value)

    def get_card_structured_value_by_id(self, option_id):
        from wcs.carddef import CardDef

        values = CardDef.get_data_source_items(self.type, get_by_id=option_id)
        if not values:
            values = CardDef.get_data_source_items(self.type, get_by_text=option_id)
            if not values:
                return None
        return values[0]

    def get_display_value(self, option_id):
        value = self.get_structured_value(option_id)
        if value:
            return value.get('text')
        return None

    def get_structured_value(self, option_id):
        value = None
        if self.type and self.type.startswith('carddef:'):
            value = self.get_card_structured_value_by_id(option_id)
        elif self.type == 'json' and self.id_parameter:
            value = self.get_value_by_id(self.id_parameter, option_id)
        else:
            structured_items = get_structured_items(self.extended_data_source, mode='lazy')
            for item in structured_items:
                if str(item['id']) == str(option_id):
                    value = item
                    break
            else:
                # recheck in case option label was given instead of option id.
                for item in structured_items:
                    if str(item['text']) == str(option_id):
                        value = item
                        break
        if value is None:
            return None
        return value

    @classmethod
    def get_substitution_variables(cls):
        return {'data_source': DataSourcesSubstitutionProxy()}

    def type_label(self):
        data_source_labels = {
            'wcs:users': _('Users'),
            'json': _('JSON'),
            'jsonp': _('JSONP'),
            'geojson': _('GeoJSON'),
            'formula': _('Python Expression'),
        }
        data_source_type = self.data_source.get('type')
        return data_source_labels.get(data_source_type)

    def humanized_cache_duration(self):
        return seconds2humanduration(int(self.cache_duration))

    def get_referenced_varnames(self, formdef):
        from .fields import Field

        if self.type in ('json', 'geojson'):
            return Field.get_referenced_varnames(formdef, self.data_source.get('value'))
        # else: carddef
        assert self.type.startswith('carddef:'), 'data source must be carddef'
        from wcs.carddef import CardDef

        return CardDef.get_data_source_referenced_varnames(self.type, formdef=formdef)

    def get_variadic_url(self):
        url = self.data_source.get('value').strip()
        if url and Template.is_template_string(url):
            vars = get_publisher().substitutions.get_context_variables(mode='lazy')
            url = get_variadic_url(url, vars)
        return url

    def is_used(self):
        from wcs.formdef import get_formdefs_of_all_kinds

        for formdef in get_formdefs_of_all_kinds():
            if self.is_used_in_formdef(formdef):
                return True
        return False

    def is_used_in_formdef(self, formdef):
        for field in formdef.fields or []:
            data_source = getattr(field, 'data_source', None)
            if not data_source:
                continue
            if data_source.get('type') == self.slug:
                return True
        return False


class StubNamedDataSource(NamedDataSource):
    type = 'formula'
    data_source = {'type': 'formula', 'value': []}
    cache_duration = None

    def __init__(self, name=None):
        self.name = name

    def store(self):
        pass

    def __repr__(self):
        return '<StubNamedDataSource %r>' % self.name


class DataSourcesSubstitutionProxy:
    def __getattr__(self, attr):
        return get_structured_items(NamedDataSource.get_by_slug(attr).extended_data_source)

    def inspect_keys(self):
        return []
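
# Illustrative note: NamedDataSource.get_substitution_variables() exposes the
# proxy above as the "data_source" substitution variable, so an expression such
# as data_source.foobar (where "foobar" is a hypothetical data source slug)
# evaluates to the list of structured items of that named data source.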


def has_chrono(publisher):
    return publisher.get_site_option('chrono_url') is not None


def chrono_url(publisher, url):
    chrono_url = publisher.get_site_option('chrono_url')
    return urllib.parse.urljoin(chrono_url, url)


def collect_agenda_data(publisher):
    agenda_url = chrono_url(publisher, 'api/agenda/')
    result = get_json_from_url(agenda_url, log_message_part='agenda')
    if result is None:
        return

    # build datasources from chrono
    agenda_data = []
    for agenda in result.get('data') or []:
        if agenda['kind'] == 'events':
            agenda_data.append(
                {
                    'slug': 'agenda-%s-%s' % (agenda['kind'], agenda['id']),
                    'text': agenda['text'],
                    'url': agenda['api']['datetimes_url'],
                }
            )
        elif agenda['kind'] in ['meetings', 'virtual']:
            agenda_data.append(
                {
                    'slug': 'agenda-%s-%s-meetingtypes' % (agenda['kind'], agenda['id']),
                    'text': _('%s - Meeting types') % agenda['text'],
                    'url': agenda['api']['meetings_url'],
                }
            )
            # get also meeting types
            mt_url = chrono_url(publisher, 'api/agenda/%s/meetings/' % agenda['id'])
            mt_results = get_json_from_url(mt_url, log_message_part='agenda')
            if mt_results is None:
                return
            for meetingtype in mt_results.get('data') or []:
                agenda_data.append(
                    {
                        'slug': 'agenda-%s-%s-mt-%s' % (agenda['kind'], agenda['id'], meetingtype['id']),
                        'text': _('%s - Slots of type %s (%s minutes)')
                        % (agenda['text'], meetingtype['text'], meetingtype['duration']),
                        'url': meetingtype['api']['datetimes_url'],
                    }
                )
    return agenda_data


def build_agenda_datasources(publisher, **kwargs):
    if not has_chrono(publisher):
        return

    agenda_data = collect_agenda_data(publisher)
    if agenda_data is None:
        return

    # fetch existing datasources
    existing_datasources = {}
    for datasource in NamedDataSource.select():
        if datasource.external != 'agenda':
            continue
        existing_datasources[datasource.data_source['value']] = datasource
    seen_datasources = []

    # build datasources from chrono
    for agenda in agenda_data:
        url = agenda['url']
        datasource = existing_datasources.get(url)
        if datasource is None:
            datasource = NamedDataSource()
            datasource.slug = datasource.get_new_slug('chrono_ds_%s' % agenda['slug'])
            datasource.external = 'agenda'
            datasource.data_source = {'type': 'json', 'value': url}
        datasource.external_status = None  # reset
        datasource.name = agenda['text']
        datasource.store()
        # maintain caches
        existing_datasources[url] = datasource
        seen_datasources.append(url)

    # now check outdated agenda datasources
    for url, datasource in existing_datasources.items():
        if url in seen_datasources:
            continue
        if datasource.is_used():
            datasource.external_status = 'not-found'
            datasource.store()
            continue
        datasource.remove_self()


class RefreshAgendas(AfterJob):
    label = _('Refreshing agendas')

    def execute(self):
        build_agenda_datasources(get_publisher())


def register_cronjob():
    # every hour: check for agenda datasources
    get_publisher_class().register_cronjob(
        CronJob(build_agenda_datasources, name='build_agenda_datasources', minutes=[0])
    )