wcs/wcs/data_sources.py

241 lines
8.6 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2012 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import json
import urllib2
import xml.etree.ElementTree as ET
from quixote import get_publisher
from quixote.html import TemplateIO
from qommon.form import *
from qommon.misc import get_variadic_url, site_encode
import qommon.misc
from qommon import get_logger
from qommon.storage import StorableObject
from qommon.xml_storage import XmlStorableObject
# Registry of the callables exposed to "formula" data sources, by name.
data_source_functions = {}


def register_data_source_function(function, function_name=None):
    """Register ``function`` in ``data_source_functions``.

    The function is stored under ``function_name`` or, when no name is
    given, under its own ``__name__``.  A name that is already registered
    is left untouched: the first registration wins.
    """
    key = function_name or function.__name__
    # setdefault() only stores the function if the key is not there yet
    data_source_functions.setdefault(key, function)
class DataSourceSelectionWidget(CompositeWidget):
    """Composite widget used to select a data source.

    It is made of a 'type' select (none, Python formula, JSON URL and,
    optionally, JSONP URL and the named data sources), a 'value' string
    field that is only added for the formula/json/jsonp types, and an
    'apply' submit button.

    Once parsed, its value is None or a dictionary with a 'type' key and,
    when filled, a 'value' key.
    """

    def __init__(self, name, value=None, allow_jsonp=True,
                 allow_named_sources=True, **kwargs):
        CompositeWidget.__init__(self, name, value, **kwargs)
        initial = value or {}

        choices = [('none', _('None')),
                   ('formula', _('Formula (Python)')),
                   ('json', _('JSON URL'))]
        if allow_jsonp:
            choices.append(('jsonp', _('JSONP URL')))
        if allow_named_sources:
            # named data sources are offered using their slug as type
            for source in NamedDataSource.select(order_by='name'):
                choices.append((source.slug, source.name))
        self.add(SingleSelectWidget, 'type', options=choices,
                 value=initial.get('type'))

        # parse right away: the selected type decides whether the 'value'
        # field has to be added
        self.parse()
        if not self.value:
            self.value = {}
        if self.value.get('type') in ('formula', 'json', 'jsonp'):
            self.add(StringWidget, 'value', value=initial.get('value'),
                     size=80)
        self.add(SubmitWidget, 'apply', value=_('Apply'))
        # NOTE(review): presumably reset so the widget is parsed again now
        # that all the sub-widgets exist -- confirm against CompositeWidget
        self._parsed = False

    def _parse(self, request):
        """Set self.value from the 'type' and 'value' sub-widgets."""
        pairs = [(key, self.get(key)) for key in ('type', 'value')]
        collected = dict((key, val) for key, val in pairs if val)
        if not collected or collected.get('type') == 'none':
            # nothing filled, or the explicit "none" type: no data source
            self.value = None
        else:
            self.value = collected

    def render_content(self):
        """Return the concatenated rendering of all the sub-widgets."""
        output = TemplateIO(html=True)
        for child in self.get_widgets():
            output += child.render_content()
        return output.getvalue()
def get_items(data_source):
    """Return the items of ``data_source`` as a list of 3-tuples.

    Each tuple is (id, text, key), every element being passed through
    site_encode().  When the first item has no key, the id is used as key
    for all the items.
    """
    rows = []
    for entry in get_structured_items(data_source):
        rows.append((site_encode(entry.get('id')),
                     site_encode(entry.get('text')),
                     site_encode(entry.get('key'))))
    if rows and rows[0][2] is None:  # no key
        rows = [(item_id, text, item_id) for item_id, text, _key in rows]
    return rows
def _get_formula_items(expression):
    """Evaluate a Python ``expression`` and return its result as items.

    The result of the expression must be a list:
     - of strings
     - of dictionaries, in which case it has to have both a "id" and a
       "text" keys
     - of lists or tuples, in which case it may have up to three elements:
       - three elements, (id, text, key)
       - two elements, (id, text)
       - a single element, (id,)

    Any failure is logged and gives an empty list.
    """
    variables = get_publisher().substitutions.get_context_variables()
    try:
        # NOTE(review): eval() runs arbitrary Python code; the expression
        # must only ever come from trusted configuration, never from
        # end-user input.
        value = eval(expression, variables, data_source_functions)
        if not isinstance(value, collections.Iterable):
            get_logger().warn(
                'Python data source (%r) gave a non-iterable result' % (
                    expression,))
            return []
        if len(value) == 0:
            return []
        # the shape of the first element is taken as the shape of all
        first = value[0]
        if isinstance(first, (list, tuple)):
            if len(first) >= 3:
                return [{'id': x[0], 'text': x[1], 'key': x[2]}
                        for x in value]
            elif len(first) == 2:
                return [{'id': x[0], 'text': x[1]} for x in value]
            elif len(first) == 1:
                return [{'id': x[0], 'text': x[0]} for x in value]
            return value
        elif isinstance(first, basestring):
            return [{'id': x, 'text': x} for x in value]
        return value
    except Exception:
        # best effort: a broken formula must not break the caller, but
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        get_logger().warn(
            'Failed to eval() Python data source (%r)' % (expression,))
        return []


def _get_json_items(url):
    """Load the items available at a JSON ``url``.

    The URL must answer with a dict with a 'data' key holding the list of
    items, each of them being a dict with at least both an "id" and a
    "text" key.  Loading and decoding errors are logged and give an empty
    list.
    """
    if not url:
        get_logger().warn('Empty URL in JSON data source')
        return []
    if '[' in url:
        # the URL contains variable parts, substitute them
        variables = get_publisher().substitutions.get_context_variables()
        url = get_variadic_url(url, variables)
    try:
        response = urllib2.urlopen(url)
        try:
            entries = json.load(response)
        finally:
            # always release the connection, even on invalid JSON
            response.close()
        if not isinstance(entries, dict):
            raise ValueError('not a json dict')
        if not isinstance(entries.get('data'), list):
            raise ValueError('not a json dict with a data list attribute')
        return entries.get('data')
    except urllib2.URLError as e:
        # HTTPError is a subclass of URLError, both are reported here
        get_logger().warn('Error loading JSON data source (%s)' % str(e))
    except ValueError as e:
        get_logger().warn(
            'Error reading JSON data source output (%s)' % str(e))
    return []


def get_structured_items(data_source):
    """Return the items of ``data_source`` as a list of dictionaries.

    ``data_source`` is a data source description (a dictionary with a
    'type' key and a 'value' key); it is first resolved with get_real().
    'formula' sources are evaluated as Python expressions and 'json'
    sources are loaded from their URL.

    An empty list is returned when there is no data source, when its type
    is not handled here (for example 'jsonp') or when the items could not
    be obtained.
    """
    data_source = get_real(data_source)
    if not data_source:
        # get_real() gives None for an empty description
        return []
    ds_type = data_source.get('type')
    if ds_type == 'formula':
        return _get_formula_items(data_source.get('value'))
    if ds_type == 'json':
        return _get_json_items(data_source.get('value'))
    return []
def get_real(data_source):
    """Resolve a data source description to an actual data source.

    Returns None for an empty description and the description itself for
    the 'json', 'jsonp' and 'formula' types.  Any other type is taken as
    the slug of a named data source, whose data source is returned.
    """
    if data_source:
        kind = data_source.get('type')
        if kind not in ('json', 'jsonp', 'formula'):
            # not a builtin type: look the named data source up by slug
            return NamedDataSource.get_by_slug(kind).data_source
        return data_source
    return None
class NamedDataSource(XmlStorableObject):
    """A data source stored under a name and referenced by its slug."""
    _names = 'datasources'
    _xml_tagname = 'datasources'

    name = None
    slug = None
    description = None
    # dictionary with a 'type' key and a 'value' key
    data_source = None

    # declarations for serialization
    XML_NODES = [('name', 'str'), ('slug', 'str'), ('description', 'str'),
                 ('data_source', 'data_source')]

    def __init__(self, name=None):
        StorableObject.__init__(self)
        self.name = name

    def migrate(self):
        """Store the object again if it has no slug yet."""
        if not self.slug:
            # .store() will take care of setting the slug
            self.store()

    def store(self):
        """Store the object, computing its slug first if it is not set."""
        if self.slug is None:
            # set slug if it's not yet there
            self.slug = self.get_new_slug()
        super(NamedDataSource, self).store()

    def get_new_slug(self):
        """Return a slug derived from the name and not used by another object.

        A numeric suffix ("-1", "-2", ...) is appended to the simplified
        name until the slug index has no entry for it, or the entry found
        is this very object.
        """
        new_slug = qommon.misc.simplify(self.name, space='_')
        base_new_slug = new_slug
        suffix_no = 0
        while True:
            try:
                obj = self.get_on_index(new_slug, 'slug',
                                        ignore_migration=True)
            except KeyError:
                # no object is using this slug
                break
            if obj.id == self.id:
                # the slug is already ours
                break
            suffix_no += 1
            new_slug = '%s-%s' % (base_new_slug, suffix_no)
        return new_slug

    def export_data_source_to_xml(self, element, attribute_name, charset):
        """Add 'type' and 'value' sub-elements to ``element``.

        They are filled from the data source dictionary found in the
        ``attribute_name`` attribute; a missing value is written as an
        empty string.
        """
        data_source = getattr(self, attribute_name)
        ET.SubElement(element, 'type').text = data_source.get('type')
        ET.SubElement(element, 'value').text = data_source.get('value') or ''

    def import_data_source_from_xml(self, element, charset):
        """Return the data source dictionary described by ``element``."""
        return {
            'type': str(element.find('type').text),
            # the text of an empty element is None; without the fallback
            # an empty value would be imported as the string 'None'.
            'value': str(element.find('value').text or ''),
        }

    @classmethod
    def get_by_slug(cls, slug):
        """Return the first stored object whose slug is ``slug``.

        Raises KeyError if there is none.
        """
        for obj in cls.select():
            if obj.slug == slug:
                return obj
        raise KeyError(slug)

    @classmethod
    def get_substitution_variables(cls):
        """Return the substitution variables giving access to data sources."""
        return {'data_source': DataSourcesSubstitutionProxy()}
class DataSourcesSubstitutionProxy(object):
    """Give access to the items of named data sources as attributes.

    Reading the attribute ``foo`` returns the structured items of the
    named data source whose slug is ``foo``; the KeyError raised by
    NamedDataSource.get_by_slug() for an unknown slug is not caught.
    """

    def __getattr__(self, attr):
        named_source = NamedDataSource.get_by_slug(attr)
        return get_structured_items(named_source.data_source)