general: store structured data associated to list items (#7466)

This commit is contained in:
Frédéric Péters 2015-06-04 22:45:53 +02:00
parent adbb8a9d43
commit be261fe9a5
7 changed files with 185 additions and 14 deletions

View File

@ -52,9 +52,11 @@ def test_item_field_python_datasource():
assert widget.parse() == '1'
def test_python_datasource():
plain_list = repr([('1', 'foo'), ('2', 'bar')])
plain_list = [('1', 'foo'), ('2', 'bar')]
datasource = {'type': 'formula', 'value': repr(plain_list)}
assert data_sources.get_items(datasource) == plain_list
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
def test_json_datasource():
datasource = {'type': 'json', 'value': ''}
@ -93,4 +95,16 @@ def test_json_datasource():
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == [('1', 'foo', '1'), ('2', 'bar', '2')]
assert data_sources.get_items(datasource) == [('1', 'foo'), ('2', 'bar')]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
# a json file with additional keys
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'},
{'id': '2', 'text': 'bar', 'more': 'yyy'}]}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == [('1', 'foo'), ('2', 'bar')]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'more': 'xxx'},
{'id': '2', 'text': 'bar', 'more': 'yyy'}]

View File

@ -759,3 +759,76 @@ def test_preview_form(pub):
assert next_page.status_int == 302
assert next_page.location == 'http://example.net/preview/test/'
assert formdef.data_class().count() == 0
def test_form_item_data_source_field_submit(pub):
def submit_item_data_source_field(ds):
formdef = create_formdef()
formdef.fields = [fields.ItemField(id='0', label='string', data_source=ds)]
formdef.store()
resp = get_app(pub).get('/test/')
formdef.data_class().wipe()
resp.forms[0]['f0'] = 'un'
resp = resp.forms[0].submit('submit')
assert 'Check values then click submit.' in resp.body
resp = resp.forms[0].submit('submit')
assert resp.status_int == 302
resp = resp.follow()
assert 'The form has been recorded' in resp.body
assert formdef.data_class().count() == 1
data_id = formdef.data_class().select()[0].id
return formdef.data_class().get(data_id).data
ds = {
'type': 'formula',
'value': repr([('1', 'un'), ('2', 'deux')]),
}
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
ds['value'] = repr([{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}])
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
ds['value'] = repr([
{'id': '1', 'text': 'un', 'more': 'foo'},
{'id': '2', 'text': 'deux', 'more': 'bar'}])
assert submit_item_data_source_field(ds) == {
'0': '1', '0_display': 'un', '0_structured': {'id': '1', 'text': 'un', 'more': 'foo'}}
def test_form_items_data_source_field_submit(pub):
def submit_items_data_source_field(ds):
formdef = create_formdef()
formdef.fields = [fields.ItemsField(id='0', label='string', data_source=ds)]
formdef.store()
resp = get_app(pub).get('/test/')
formdef.data_class().wipe()
resp.forms[0]['f0$element1'].checked = True
resp.forms[0]['f0$element3'].checked = True
resp = resp.forms[0].submit('submit')
assert 'Check values then click submit.' in resp.body
resp = resp.forms[0].submit('submit')
assert resp.status_int == 302
resp = resp.follow()
assert 'The form has been recorded' in resp.body
assert formdef.data_class().count() == 1
data_id = formdef.data_class().select()[0].id
return formdef.data_class().get(data_id).data
ds = {
'type': 'formula',
'value': repr([('1', 'un'), ('2', 'deux'), ('3', 'trois')]),
}
assert submit_items_data_source_field(ds) == {'0': ['1', '3'], '0_display': 'un, trois'}
ds['value'] = repr([{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'},
{'id': '3', 'text': 'trois'}])
assert submit_items_data_source_field(ds) == {'0': ['1', '3'], '0_display': 'un, trois'}
ds['value'] = repr([
{'id': '1', 'text': 'un', 'more': 'foo'},
{'id': '2', 'text': 'deux', 'more': 'bar'},
{'id': '3', 'text': 'trois', 'more': 'baz'}])
assert submit_items_data_source_field(ds) == {
'0': ['1', '3'],
'0_display': 'un, trois',
'0_structured': [
{'id': '1', 'more': 'foo', 'text': 'un'},
{'id': '3', 'more': 'baz', 'text': 'trois'}]}

View File

@ -20,7 +20,7 @@ import urllib2
from quixote.html import TemplateIO
from qommon.form import *
from qommon.misc import get_variadic_url
from qommon.misc import get_variadic_url, site_encode
from qommon import get_logger
@ -75,9 +75,25 @@ class DataSourceSelectionWidget(CompositeWidget):
r += widget.render_content()
return r.getvalue()
def get_items(data_source):
structured_items = get_structured_items(data_source)
tupled_items = [(site_encode(x.get('id')),
site_encode(x.get('text')),
site_encode(x.get('key'))) for x in structured_items]
if tupled_items and tupled_items[0][2] is None: # no key
tupled_items = [x[:2] for x in tupled_items]
return tupled_items
def get_structured_items(data_source):
if data_source.get('type') == 'formula':
# the result of a python expression, it must be a list.
# - of strings
# - of dictionaries, in which case it has to have both a "id" and a
# "text" keys
# - of lists or tuples, in which case it may have up to three elements:
# - three elements, (id, text, key)
# - two elements, (id, text)
# - a single element, (id,)
vars = get_publisher().substitutions.get_context_variables()
try:
value = eval(data_source.get('value'), vars, data_source_functions)
@ -85,11 +101,26 @@ def get_items(data_source):
get_logger().warn('Python data source (%r) gave a non-iterable result' % \
data_source.get('value'))
return []
if len(value) == 0:
return []
if isinstance(value[0], list) or isinstance(value[0], tuple):
if len(value[0]) >= 3:
return [{'id': x[0], 'text': x[1], 'key': x[2]} for x in value]
elif len(value[0]) == 2:
return [{'id': x[0], 'text': x[1]} for x in value]
elif len(value[0]) == 1:
return [{'id': x[0], 'text': x[0]} for x in value]
return value
elif isinstance(value[0], basestring):
return [{'id': x, 'text': x} for x in value]
return value
except:
get_logger().warn('Failed to eval() Python data source (%r)' % data_source.get('value'))
return []
elif data_source.get('type') == 'json':
# the content available at a json URL, it must answer with a dict with
# a 'data' key holding the list of items, each of them being a dict
# with at least both an "id" and a "text" key.
url = data_source.get('value')
if not url:
get_logger().warn('Empty URL in JSON data source')
@ -105,15 +136,7 @@ def get_items(data_source):
raise ValueError('not a json dict')
if type(entries.get('data')) is not list:
raise ValueError('not a json dict with a data list attribute')
for entry in entries.get('data'):
id = entry.get('id')
text = entry.get('text')
if type(id) is unicode:
id = id.encode(charset)
if type(text) is unicode:
text = text.encode(charset)
results.append((id, text, id))
return results
return entries.get('data')
except urllib2.HTTPError as e:
get_logger().warn('Error loading JSON data source (%s)' % str(e))
except urllib2.URLError as e:

View File

@ -123,6 +123,7 @@ class Field(object):
in_listing = False
prefill = None
store_display_value = None
store_structured_value = None
anonymise = True
stats = None
@ -941,6 +942,20 @@ class ItemField(WidgetField):
return option_value
return str(value)
def store_structured_value(self, data, field_id):
value = data.get(field_id)
if not self.data_source:
return
structured_options = data_sources.get_structured_items(self.data_source)
if not structured_options:
return
if not set(structured_options[0].keys()) != set(['id', 'text']):
return
for structured_option in structured_options:
if str(structured_option.get('id')) == str(value):
return structured_option
return None
def fill_admin_form(self, form):
WidgetField.fill_admin_form(self, form)
form.add(CheckboxWidget, 'show_as_radio', title = _('Show as radio buttons'),
@ -1106,6 +1121,23 @@ class ItemsField(WidgetField):
choices.append(option_value)
break
return ', '.join(choices)
def store_structured_value(self, data, field_id):
value = data.get(field_id)
if not self.data_source:
return
structured_options = data_sources.get_structured_items(self.data_source)
if not structured_options:
return
if not set(structured_options[0].keys()) != set(['id', 'text']):
return
structured_value = []
for structured_option in structured_options:
for choice in data.get(field_id) or []:
if str(structured_option.get('id')) == str(choice):
structured_value.append(structured_option)
return structured_value
register_field_class(ItemsField)

View File

@ -424,6 +424,12 @@ class FormDef(StorableObject):
d['%s_display' % field.id] = display_value
elif d.has_key('%s_display' % field.id):
del d['%s_display' % field.id]
if d.get(field.id) and field.store_structured_value:
structured_value = field.store_structured_value(d, field.id)
if structured_value:
d['%s_structured' % field.id] = structured_value
elif '%s_structured' % field.id in d:
del d['%s_structured' % field.id]
if widget and widget.cleanup:
widget.cleanup()

View File

@ -179,7 +179,14 @@ def get_as_datetime(s):
pass
raise ValueError()
def site_encode(s):
if s is None:
return None
if isinstance(s, str):
return s
if not isinstance(s, unicode):
s = unicode(s, get_publisher().site_charset, 'replace')
return s.encode(get_publisher().site_charset)
def ellipsize(s, length = 30):
if type(s) is not unicode:

View File

@ -347,6 +347,11 @@ def do_formdef_tables(formdef, conn=None, cur=None, rebuild_views=False, rebuild
if 'f%s_display' % field.id not in existing_fields:
cur.execute('''ALTER TABLE %s ADD COLUMN %s varchar''' % (
table_name, 'f%s_display' % field.id))
if field.store_structured_value:
needed_fields.add('f%s_structured' % field.id)
if 'f%s_structured' % field.id not in existing_fields:
cur.execute('''ALTER TABLE %s ADD COLUMN %s bytea''' % (
table_name, 'f%s_structured' % field.id))
# delete obsolete fields
for field in (existing_fields - needed_fields):
@ -795,6 +800,9 @@ class SqlMixin(object):
sql_dict['f%s' % field.id] = value
if field.store_display_value:
sql_dict['f%s_display' % field.id] = data.get('%s_display' % field.id)
if field.store_structured_value:
sql_dict['f%s_structured' % field.id] = bytearray(
cPickle.dumps(data.get('%s_structured' % field.id)))
return sql_dict
def _row2obdata(cls, row, formdef):
@ -826,6 +834,12 @@ class SqlMixin(object):
value = row[i]
obdata['%s_display' % field.id] = value
i += 1
if field.store_structured_value:
value = row[i]
obdata['%s_structured' % field.id] = cPickle.loads(str(value))
if obdata['%s_structured' % field.id] is None:
del obdata['%s_structured' % field.id]
i += 1
return obdata
_row2obdata = classmethod(_row2obdata)
@ -1101,6 +1115,8 @@ class SqlFormData(SqlMixin, wcs.formdata.FormData):
data_fields.append('f%s' % field.id)
if field.store_display_value:
data_fields.append('f%s_display' % field.id)
if field.store_structured_value:
data_fields.append('f%s_structured' % field.id)
return data_fields
get_data_fields = classmethod(get_data_fields)