# w.c.s. - web application for online forms
# Copyright (C) 2005-2012 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
import collections
|
|
import hashlib
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from django.utils import six
|
|
from django.utils.encoding import force_text, force_bytes
|
|
from django.utils.six.moves.urllib import parse as urllib
|
|
|
|
from quixote import get_publisher, get_request, get_session
|
|
from quixote.html import TemplateIO
|
|
|
|
from .qommon import _, force_str
|
|
from .qommon.form import *
|
|
from .qommon.humantime import seconds2humanduration
|
|
from .qommon.misc import get_variadic_url
|
|
from .qommon import misc
|
|
from .qommon import get_logger
|
|
|
|
from .qommon.storage import StorableObject
|
|
from .qommon.template import Template
|
|
from .qommon.xml_storage import XmlStorableObject
|
|
|
|
from .api_utils import sign_url_auto_orig
|
|
|
|
|
|
# Registry of extra callables, keyed by the name they are exposed under.
# get_structured_items() adds them to the globals used to evaluate
# Python-expression ("formula") data sources.
data_source_functions = {}


def register_data_source_function(function, function_name=None):
    """Add ``function`` to the ``data_source_functions`` registry.

    The callable is stored under ``function_name``, or under
    ``function.__name__`` when no (or an empty) name is given.  A name that
    is already registered is left untouched, so the first registration wins.
    """
    key = function_name or function.__name__
    data_source_functions.setdefault(key, function)
|
|
|
|
|
|
class DataSourceSelectionWidget(CompositeWidget):
    """Composite widget used to pick a data source.

    It is made of a ``type`` select and a ``value`` string input.  The
    select offers "None", optionally the named data sources and the card
    models that have a digest template, a JSON URL, optionally a JSONP URL,
    and a Python expression.

    Once parsed, the widget value is a dict holding the non-empty ``type``
    and ``value`` entries, or ``None`` when no type was selected.
    """

    def __init__(self, name, value=None, allow_jsonp=True,
                 allow_named_sources=True, **kwargs):
        CompositeWidget.__init__(self, name, value, **kwargs)

        initial = value or {}

        choices = []
        if allow_named_sources:
            for source in NamedDataSource.select():
                choices.append((source.slug, source.name, source.slug))
            from wcs.carddef import CardDef
            for carddef in CardDef.select(lightweight=True, ignore_errors=True):
                # card models without a digest template are not offered
                if not carddef.digest_template:
                    continue
                card_type = 'carddef:%s' % carddef.url_name
                choices.append((card_type, carddef.name, card_type))
            # named sources and cards are listed together, sorted by label
            choices.sort(key=lambda choice: misc.simplify(choice[1]))

        # fixed entries: "None" always comes first, inline types come last
        choices.insert(0, (None, _('None'), None))
        choices.append(('json', _('JSON URL'), 'json'))
        if allow_jsonp:
            choices.append(('jsonp', _('JSONP URL'), 'jsonp'))
        choices.append(('formula', _('Python Expression'), 'python'))

        type_attrs = {'data-dynamic-display-parent': 'true'}
        self.add(SingleSelectWidget, 'type', options=choices,
                 value=initial.get('type'), attrs=type_attrs)

        self.parse()
        if not self.value:
            self.value = {}

        value_attrs = {
            'data-dynamic-display-child-of': 'data_source$type',
            'data-dynamic-display-value-in': 'json|jsonp|python',
        }
        self.add(StringWidget, 'value', value=initial.get('value'), size=80,
                 attrs=value_attrs)

        # NOTE(review): presumably clears the flag set by the parse() call
        # above so that the next parse also sees the 'value' sub-widget;
        # confirm against the CompositeWidget implementation.
        self._parsed = False

    def _parse(self, request):
        """Build ``self.value`` from the ``type`` and ``value`` sub-widgets."""
        collected = {}
        for key in ('type', 'value'):
            sub_value = self.get(key)
            if sub_value:
                collected[key] = sub_value
        if collected.get('type', '') in ('none', ''):
            # no type selected: the widget has no value at all
            self.value = None
        else:
            self.value = collected

    def render_content(self):
        """Return the concatenated rendering of all sub-widgets."""
        output = TemplateIO(html=True)
        for sub_widget in self.get_widgets():
            output += sub_widget.render_content()
        return output.getvalue()
|
|
|
|
|
|
def get_items(data_source, include_disabled=False, mode=None):
    """Return the items of ``data_source`` as a list of 4-tuples.

    Each tuple is ``(id, text, key, item)``: the first three members are
    ``str()`` conversions, ``key`` falls back to the item id when the item
    has no ``key`` entry, and ``item`` is the structured item itself.

    Items with a true ``disabled`` entry are left out unless
    ``include_disabled`` is true.  ``mode`` is passed on to
    ``get_structured_items()``.
    """
    return [
        (str(entry['id']),
         str(entry['text']),
         str(entry.get('key', entry['id'])),
         entry)
        for entry in get_structured_items(data_source, mode=mode)
        if not entry.get('disabled') or include_disabled
    ]
|
|
|
|
|
|
def request_json_items(url):
    """Fetch a JSON data source and return its list of items.

    The URL is signed with ``sign_url_auto_orig()`` before being fetched.
    The response must be a JSON dict whose ``err`` entry is absent, ``0`` or
    ``"0"``, and whose ``data`` entry is a list.

    Returns the entries of ``data`` that are dicts with a non-empty ``id``.
    An entry without ``text`` gets its ``id`` as text (the entry is updated
    in place).  Returns ``None``, after logging a warning, when loading
    fails with ``misc.ConnectionError`` or when the response does not have
    the expected structure.
    """
    url = sign_url_auto_orig(url)
    try:
        entries = misc.json_loads(misc.urlopen(url).read())
        if not isinstance(entries, dict):
            raise ValueError('not a json dict')
        if entries.get('err') not in (None, 0, "0"):
            raise ValueError('err %s' % entries['err'])
        if not isinstance(entries.get('data'), list):
            raise ValueError('not a json dict with a data list attribute')
    except misc.ConnectionError as e:
        get_logger().warn('Error loading JSON data source (%s)' % str(e))
        return None
    except (ValueError, TypeError) as e:
        get_logger().warn('Error reading JSON data source output (%s)' % str(e))
        return None
    items = []
    for item in entries['data']:
        # skip malformed items; the list comes from a remote service and may
        # hold something other than dicts, on which .get() would raise
        if not isinstance(item, dict):
            continue
        item_id = item.get('id')
        if item_id is None or item_id == '':
            continue
        if 'text' not in item:
            item['text'] = item['id']
        items.append(item)
    return items
|
|
|
|
|
|
def get_structured_items(data_source, mode=None):
    """Return the items of ``data_source`` as a list of structured items.

    ``data_source`` is a dict with a ``type`` entry and, for some types, a
    ``value`` entry:

    - ``carddef:<url_name>``: the non-draft cards of that card model, as
      given by their ``get_data_source_structured_item()``, sorted on their
      simplified ``text``;
    - ``formula``: the result of evaluating ``value`` as a Python expression;
    - ``json``: the items served at the URL in ``value``, which may be a
      template;
    - ``jsonp``: always an empty list;
    - anything else is taken as the slug of a named data source, whose own
      definition and cache duration are then used.

    ``mode`` is passed to ``substitutions.get_context_variables()`` when
    variables are needed to evaluate the expression or the URL template.

    A failing expression, an empty URL and a failing JSON request are logged
    and give an empty list.
    """
    cache_duration = 0

    if data_source.get('type') and data_source.get('type').startswith('carddef:'):
        # cards
        from wcs.carddef import CardDef
        carddef = CardDef.get_by_urlname(data_source['type'][len('carddef:'):])
        items = [x.get_data_source_structured_item()
                 for x in carddef.data_class().select()
                 if not x.is_draft()]
        items.sort(key=lambda x: misc.simplify(x['text']))
        return items

    if data_source.get('type') not in ('json', 'jsonp', 'formula'):
        # named data source
        named_data_source = NamedDataSource.get_by_slug(data_source['type'])
        if named_data_source.cache_duration:
            cache_duration = int(named_data_source.cache_duration)
        data_source = named_data_source.data_source

    if data_source.get('type') == 'formula':
        # the result of a python expression, it must be a list.
        # - of strings
        # - of dictionaries, in which case it has to have both a "id" and a
        #   "text" keys
        # - of lists or tuples, in which case it may have up to three elements:
        #   - three elements, (id, text, key)
        #   - two elements, (id, text)
        #   - a single element, (id,)

        # the ABC aliases were removed from ``collections`` in Python 3.10;
        # fall back to the old location on Python 2 only
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable

        variables = get_publisher().substitutions.get_context_variables(mode=mode)
        global_eval_dict = get_publisher().get_global_eval_dict()
        global_eval_dict.update(data_source_functions)
        try:
            # NOTE(review): eval() runs the configured expression as is; it
            # must only ever come from trusted configuration.
            value = eval(data_source.get('value'), global_eval_dict, variables)
            if not isinstance(value, Iterable):
                get_logger().warn('Python data source (%r) gave a non-iterable result' %
                                  data_source.get('value'))
                return []
            if len(value) == 0:
                return []
            value = misc.json_encode_helper(value, get_publisher().site_charset)
            # the shape of the first element decides how all are converted
            first = value[0]
            if isinstance(first, (list, tuple)):
                if len(first) >= 3:
                    return [{'id': x[0], 'text': x[1], 'key': x[2]} for x in value]
                elif len(first) == 2:
                    return [{'id': x[0], 'text': x[1]} for x in value]
                elif len(first) == 1:
                    return [{'id': x[0], 'text': x[0]} for x in value]
                return value
            elif isinstance(first, six.string_types):
                return [{'id': x, 'text': x} for x in value]
            return value
        except Exception:
            # best effort: any error in the expression gives an empty list,
            # but SystemExit and KeyboardInterrupt are no longer swallowed
            get_logger().warn('Failed to eval() Python data source (%r)' % data_source.get('value'))
            return []
    elif data_source.get('type') == 'json':
        # the content available at a json URL, it must answer with a dict with
        # a 'data' key holding the list of items, each of them being a dict
        # with at least both an "id" and a "text" key.
        url = data_source.get('value')
        if not url:
            get_logger().warn('Empty URL in JSON data source')
            return []
        url = url.strip()
        if Template.is_template_string(url):
            context = get_publisher().substitutions.get_context_variables(mode=mode)
            url = get_variadic_url(url, context)

        # per-request cache, keyed on the final URL
        request = get_request()
        if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
            return request.datasources_cache[url]

        # shared cache, only for named data sources with a cache duration
        if cache_duration:
            cache_key = 'data-source-%s' % force_str(hashlib.md5(force_bytes(url)).hexdigest())
            from django.core.cache import cache
            items = cache.get(cache_key)
            if items is not None:
                return items

        items = request_json_items(url)
        if items is None:
            # failures are not cached
            return []
        if hasattr(request, 'datasources_cache'):
            request.datasources_cache[url] = items
        if cache_duration:
            cache.set(cache_key, items, cache_duration)
        return items
    return []
|
|
|
|
|
|
def get_real(data_source):
    """Return the effective data source definition for ``data_source``.

    Empty input gives ``None``.  Inline definitions (``json``, ``jsonp``,
    ``formula``) and card references (``carddef:...``) are returned as they
    are.  Any other type is taken as the slug of a named data source, and
    the ``data_source`` dict of that object is returned.
    """
    if not data_source:
        return None
    kind = data_source.get('type')
    if kind in ('json', 'jsonp', 'formula') or (kind and kind.startswith('carddef:')):
        return data_source
    return NamedDataSource.get_by_slug(kind).data_source
|
|
|
|
|
|
def get_object(data_source):
    """Return a ``NamedDataSource`` object for ``data_source``.

    Empty input gives ``None``.  Inline definitions (``json``, ``jsonp``,
    ``formula``) and card references (``carddef:...``) are wrapped in a new,
    unnamed ``NamedDataSource``.  Any other type is taken as a slug and
    looked up with ``NamedDataSource.get_by_slug()``.
    """
    if not data_source:
        return None
    ds_type = data_source.get('type')
    # the "ds_type and" guard mirrors get_real(): a definition without a type
    # goes through the slug lookup instead of raising AttributeError on None
    is_card = bool(ds_type and ds_type.startswith('carddef:'))
    if is_card or ds_type in ('json', 'jsonp', 'formula'):
        named_data_source = NamedDataSource()
        named_data_source.data_source = data_source
        return named_data_source
    return NamedDataSource.get_by_slug(ds_type)
|
|
|
|
|
|
class NamedDataSource(XmlStorableObject):
    """A data source definition stored under a name and looked up by slug."""

    _names = 'datasources'
    _xml_tagname = 'datasources'

    name = None
    # identifier used to reference the data source; set by store() if missing
    slug = None
    description = None
    # dict with 'type' and 'value' entries
    data_source = None
    # read with int() wherever it is used
    cache_duration = None
    # name of the query string parameter appended by get_json_query_url()
    query_parameter = None
    # name of the query string parameter used to look up a single item by id
    id_parameter = None

    # declarations for serialization
    XML_NODES = [
        ('name', 'str'), ('slug', 'str'), ('description', 'str'),
        ('cache_duration', 'str'),
        ('query_parameter', 'str'),
        ('id_parameter', 'str'),
        ('data_source', 'data_source'),
    ]

    def __init__(self, name=None):
        StorableObject.__init__(self)
        self.name = name

    @property
    def type(self):
        """The ``type`` entry of the data source definition."""
        return self.data_source.get('type')

    def can_jsonp(self):
        """Tell whether get_jsonp_url() has a URL to give for this source."""
        if self.type == 'jsonp':
            return True
        return bool(self.type == 'json' and self.query_parameter)

    def migrate(self):
        """Store the object again when it has no slug yet."""
        if not self.slug:
            # .store() will take care of setting the slug
            self.store()

    def store(self):
        """Store the object, giving it a slug first if it has none."""
        if not self.slug:
            # migrate() relies on this for any empty slug, so an empty string
            # is handled like None
            self.slug = self.get_new_slug()
        super(NamedDataSource, self).store()

    def get_new_slug(self):
        """Return a slug derived from the name that no other object uses.

        The simplified name is tried first, then ``<slug>-1``, ``<slug>-2``
        and so on, until the slug is free or belongs to this object.
        """
        base_slug = misc.simplify(self.name, space='_')
        candidate = base_slug
        suffix_no = 0
        while True:
            try:
                obj = self.get_on_index(candidate, 'slug', ignore_migration=True)
            except KeyError:
                # nobody uses this slug
                return candidate
            if obj.id == self.id:
                return candidate
            suffix_no += 1
            candidate = '%s-%s' % (base_slug, suffix_no)

    def export_data_source_to_xml(self, element, attribute_name, charset):
        """Write the data source dict as ``type``/``value`` children of ``element``."""
        data_source = getattr(self, attribute_name)
        ET.SubElement(element, 'type').text = data_source.get('type')
        ET.SubElement(element, 'value').text = force_text(data_source.get('value') or '', charset)

    def import_data_source_from_xml(self, element, charset):
        """Build a data source dict from the ``type``/``value`` children of ``element``."""
        return {
            'type': force_str(element.find('type').text),
            'value': force_str(element.find('value').text or ''),
        }

    @classmethod
    def get_by_slug(cls, slug):
        """Return the stored object with this slug.

        When there is none, a warning is logged and a
        ``StubNamedDataSource`` named after the slug is returned.
        """
        for obj in cls.select():
            if obj.slug == slug:
                return obj
        get_logger().warn("data source '%s' does not exist" % slug)
        return StubNamedDataSource(name=slug)

    def get_json_query_url(self):
        """Return the URL ending with ``<query_parameter>=``.

        An empty URL gives an empty string.
        """
        url = self.get_variadic_url()
        if not url:
            return ''
        separator = '&' if '?' in url else '?'
        return url + separator + self.query_parameter + '='

    def get_jsonp_url(self):
        """Return the URL to use for JSONP queries, or ``None``.

        A ``jsonp`` source gives its own URL; a ``json`` source with a query
        parameter gives an ``/api/autocomplete/`` URL built on a token
        obtained from the session.
        """
        if self.type == 'jsonp':
            return self.data_source.get('value')
        if self.type == 'json' and self.query_parameter:
            token = get_session().get_data_source_query_url_token(self.get_json_query_url())
            return '/api/autocomplete/%s' % token
        return None

    def get_value_by_id(self, param_name, param_value):
        """Query the URL with ``param_name=param_value`` and return the first item.

        Results are kept in the request's ``datasources_cache`` when it
        exists.  Returns ``None`` when there is no item.
        """
        url = self.get_variadic_url()
        url += '&' if '?' in url else '?'
        url += param_name + '=' + urllib.quote(param_value)

        request = get_request()
        if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
            items = request.datasources_cache[url]
            if not items:  # cache may contains empty list from get_structured_items
                return None
            return items[0]

        items = request_json_items(url)
        if not items:  # None or empty list are not valid
            return None
        if hasattr(request, 'datasources_cache'):
            request.datasources_cache[url] = items
        return items[0]

    def get_display_value(self, option_id):
        """Return the ``text`` of the item matching ``option_id``, or ``None``."""
        value = self.get_structured_value(option_id)
        if value:
            return value.get('text')
        return None

    def get_structured_value(self, option_id):
        """Return the structured item matching ``option_id``, or ``None``.

        A ``json`` source with an id parameter is queried for that single
        item.  Otherwise all items are loaded and matched on their ``id``,
        then on their ``text``, both compared as strings.
        """
        if self.type == 'json' and self.id_parameter:
            return self.get_value_by_id(self.id_parameter, option_id)

        structured_items = get_structured_items(self.data_source, mode='lazy')
        wanted = str(option_id)
        for item in structured_items:
            if str(item['id']) == wanted:
                return item
        # recheck in case option label was given instead of option id.
        for item in structured_items:
            if str(item['text']) == wanted:
                return item
        return None

    @classmethod
    def get_substitution_variables(cls):
        return {'data_source': DataSourcesSubstitutionProxy()}

    def type_label(self):
        """Return the label of the source type, or ``None`` for other types."""
        data_source_labels = {
            'json': _('JSON'),
            'jsonp': _('JSONP'),
            'formula': _('Python Expression'),
        }
        return data_source_labels.get(self.data_source.get('type'))

    def humanized_cache_duration(self):
        """Return the cache duration as text; ``cache_duration`` must be set."""
        return seconds2humanduration(int(self.cache_duration))

    def get_variadic_url(self):
        """Return the stripped URL, with template variables substituted."""
        url = self.data_source.get('value').strip()
        if url and Template.is_template_string(url):
            context = get_publisher().substitutions.get_context_variables(mode='lazy')
            # this is the module-level get_variadic_url() function: names of
            # the class body are not in scope inside a method
            url = get_variadic_url(url, context)
        return url

    def is_used_in_formdef(self, formdef):
        """Tell whether a field of ``formdef`` references this source by slug."""
        for field in formdef.fields or []:
            data_source = getattr(field, 'data_source', None)
            if not data_source:
                continue
            if data_source.get('type') == self.slug:
                return True
        return False
|
|
|
|
|
|
class StubNamedDataSource(NamedDataSource):
    """Stand-in for a named data source that does not exist.

    ``NamedDataSource.get_by_slug()`` returns one, named after the slug that
    was requested, when no stored object matches.  It is presented as a
    ``formula`` source and storing it does nothing.
    """

    cache_duration = None
    data_source = {'type': 'formula', 'value': []}
    # plain attribute replacing the parent class's ``type`` property
    type = 'formula'

    def __init__(self, name=None):
        # the parent initialiser is not called; only the name is kept
        self.name = name

    def store(self):
        """Do nothing; a stub is not written to storage."""

    def __repr__(self):
        return '<StubNamedDataSource %r>' % self.name
|
|
|
|
|
|
class DataSourcesSubstitutionProxy(object):
    """Proxy giving access to named data sources as attributes.

    Reading an attribute that is not otherwise defined looks up the named
    data source with that slug and returns its structured items.
    """

    def inspect_keys(self):
        """Return an empty list: no key is advertised for inspection."""
        return []

    def __getattr__(self, attr):
        source = NamedDataSource.get_by_slug(attr)
        return get_structured_items(source.data_source)
|