datasource: duplicate and configure agenda datasource (#63173)
gitea-wip/wcs/pipeline/head Build started... Details

This commit is contained in:
Lauréline Guérin 2022-04-08 16:04:25 +02:00
parent 40d08443dd
commit acbd328678
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
7 changed files with 463 additions and 79 deletions

View File

@ -301,6 +301,37 @@ def test_data_sources_type_options_jsonp(pub):
assert 'jsonp' in [x[0] for x in resp.form['data_source$type'].options]
def test_data_sources_agenda_manual_qs_data_type_options(pub):
create_superuser(pub)
data_source = NamedDataSource(name='foobar')
data_source.external = 'agenda_manual'
data_source.store()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'disable-python-expressions', 'false')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id)
assert resp.form['qs_data$element0value$type'].options == [
('text', False, 'Text'),
('template', False, 'Template'),
]
pub.site_options.set('options', 'disable-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id)
assert resp.form['qs_data$element0value$type'].options == [
('text', False, 'Text'),
('template', False, 'Template'),
]
def test_data_sources_category(pub):
create_superuser(pub)
@ -387,6 +418,10 @@ def test_data_sources_view(pub):
data_source.data_source = {'type': 'formula', 'value': '[]'}
data_source.store()
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
assert 'This data source is readonly.' not in resp
assert 'href="edit"' in resp
assert 'href="delete"' in resp
assert 'href="duplicate"' in resp
assert 'Type of source: Python Expression' in resp.text
assert 'Python Expression' in resp.text
assert 'Preview' not in resp.text
@ -571,6 +606,25 @@ def test_data_sources_view_with_exception_in_preview(pub):
assert 'Unexpected fatal error getting items for preview' in resp.text
def test_data_sources_duplicate(pub):
create_superuser(pub)
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': '{{data_source.foobar}}'}
data_source.store()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
resp = resp.click(href='duplicate')
resp = resp.forms[0].submit()
assert NamedDataSource.count() == 2
new_data_source = NamedDataSource.select(order_by='id')[1]
assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id
assert new_data_source.data_source == {'type': 'json', 'value': '{{data_source.foobar}}'}
assert new_data_source.external is None
def test_data_sources_agenda_view(pub):
create_superuser(pub)
NamedDataSource.wipe()
@ -583,8 +637,131 @@ def test_data_sources_agenda_view(pub):
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
assert 'This data source is readonly.' in resp
assert '/backoffice/settings/data-sources/%s/edit' % data_source.id not in resp
assert '/backoffice/settings/data-sources/%s/delete' % data_source.id not in resp
assert 'href="edit"' not in resp
assert 'href="delete"' not in resp
assert 'href="duplicate"' in resp
def test_data_sources_agenda_duplicate(pub):
create_superuser(pub)
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': 'http://some.url'}
data_source.external = 'agenda'
data_source.store()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
resp = resp.forms[0].submit('cancel')
assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s/' % data_source.id
assert NamedDataSource.count() == 1
resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
resp = resp.click(href='duplicate')
resp = resp.forms[0].submit()
assert NamedDataSource.count() == 2
new_data_source = NamedDataSource.select(order_by='id')[1]
assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id
assert new_data_source.data_source == {'type': 'json', 'value': 'http://some.url'}
assert new_data_source.external == 'agenda_manual'
assert new_data_source.qs_data is None
def test_data_sources_agenda_manual_view(pub):
create_superuser(pub)
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': 'http://some.url'}
data_source.external = 'agenda_manual'
data_source.qs_data = {'var1': 'value1', 'var2': 'value2'}
data_source.store()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
assert 'This data source is readonly.' not in resp
assert 'href="edit"' in resp
assert 'href="delete"' in resp
assert 'href="duplicate"' in resp
assert 'Type of source: Agenda data' in resp
assert 'Copy of' not in resp
assert 'Extra query string data' in resp
assert '<li>var1: value1</li>' in resp
assert '<li>var2: value2</li>' in resp
data_source.qs_data = None
data_source.store()
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
assert 'Extra Query string data' not in resp
data_source2 = NamedDataSource(name='foobar')
data_source2.data_source = {'type': 'json', 'value': 'http://some.url'}
data_source2.external = 'agenda'
data_source2.store()
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
assert (
'Copy of: <a href="http://example.net/backoffice/settings/data-sources/%s/">foobar</a>'
% data_source2.id
in resp
)
def test_data_sources_agenda_manual_duplicate(pub):
create_superuser(pub)
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': 'http://some.url'}
data_source.external = 'agenda_manual'
data_source.qs_data = {'var1': 'value1', 'var2': 'value2'}
data_source.store()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
resp = resp.click(href='duplicate')
resp = resp.forms[0].submit()
assert NamedDataSource.count() == 2
new_data_source = NamedDataSource.select(order_by='id')[1]
assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id
assert new_data_source.data_source == {'type': 'json', 'value': 'http://some.url'}
assert new_data_source.external == 'agenda_manual'
assert new_data_source.qs_data == {'var1': 'value1', 'var2': 'value2'}
def test_data_sources_user_view(pub):
create_superuser(pub)
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'wcs:users'}
data_source.store()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
assert 'This data source is readonly.' not in resp
assert 'href="edit"' in resp
assert 'href="delete"' in resp
assert 'href="duplicate"' in resp
def test_data_sources_user_duplicate(pub):
create_superuser(pub)
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'wcs:users'}
data_source.store()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
resp = resp.click(href='duplicate')
resp = resp.forms[0].submit()
assert NamedDataSource.count() == 2
new_data_source = NamedDataSource.select(order_by='id')[1]
assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id
assert new_data_source.data_source == {'type': 'wcs:users', 'value': ''}
assert new_data_source.external is None
def test_data_sources_edit(pub):
@ -635,6 +812,25 @@ def test_data_sources_edit_duplicate_name(pub):
assert resp.location == 'http://example.net/backoffice/settings/data-sources/1/'
def test_data_sources_agenda_manual_edit(pub):
create_superuser(pub)
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.external = 'agenda_manual'
data_source.store()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id)
resp.forms[0]['qs_data$element0key'] = 'arg1'
resp.forms[0]['qs_data$element0value$value_template'] = '{{ foobar }}'
resp.forms[0]['qs_data$element0value$type'] = 'template'
resp = resp.forms[0].submit('submit')
data_source = NamedDataSource.get(data_source.id)
assert data_source.qs_data == {'arg1': '{{ foobar }}'}
def test_data_sources_delete(pub):
create_superuser(pub)
NamedDataSource.wipe()

View File

@ -439,7 +439,7 @@ def test_json_datasource(pub, requests_pub, http_requests):
]
def test_json_datasource_bad_url(pub, error_email, http_requests, emails, caplog):
def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
datasource = {'type': 'json', 'value': 'http://remote.example.net/404'}
assert data_sources.get_items(datasource) == []
assert emails.count() == 0
@ -545,6 +545,36 @@ def test_json_datasource_bad_url_scheme(pub, error_email, emails):
)
@pytest.mark.parametrize('notify', [True, False])
@pytest.mark.parametrize('record', [True, False])
def test_json_datasource_bad_qs_data(pub, error_email, emails, notify, record):
datasource = {
'type': 'json',
'value': "https://whatever.com/json",
'qs_data': {'foo': '{% for invalid %}', 'bar': '{{ valid }}'},
'notify_on_errors': notify,
'record_on_errors': record,
}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': [{'id': '1', 'text': 'foo'}]}))
assert data_sources.get_items(datasource) == [('1', 'foo', '1', {'id': '1', 'text': 'foo'})]
url = urlopen.call_args[0][0]
assert url == 'https://whatever.com/json?bar='
message = '[DATASOURCE] Failed to compute value "{% for invalid %}" for "foo" query parameter'
if notify:
assert emails.count() == 1
assert message in emails.get_latest('subject')
else:
assert emails.count() == 0
if pub.is_using_postgresql():
if record:
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == message
else:
assert pub.loggederror_class.count() == 0
def test_geojson_datasource(pub, requests_pub, http_requests):
get_request()
get_request().datasources_cache = {}
@ -1080,10 +1110,12 @@ def test_data_source_unicode(pub):
]
def test_data_source_signed(no_request_pub):
@pytest.mark.parametrize('qs_data', [{}, {'arg1': 'val1', 'arg2': 'val2'}])
def test_data_source_signed(no_request_pub, qs_data):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json"}
data_source.qs_data = qs_data
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
@ -1099,6 +1131,9 @@ def test_data_source_signed(no_request_pub):
assert querystring['nonce'][0]
assert querystring['timestamp'][0]
assert querystring['signature'][0]
if qs_data:
assert querystring['arg1'][0] == 'val1'
assert querystring['arg2'][0] == 'val2'
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"}
data_source.store()
@ -1115,6 +1150,9 @@ def test_data_source_signed(no_request_pub):
assert querystring['timestamp'][0]
assert querystring['signature'][0]
assert querystring['foo'][0] == 'bar'
if qs_data:
assert querystring['arg1'][0] == 'val1'
assert querystring['arg2'][0] == 'val2'
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"}
data_source.store()
@ -1122,7 +1160,21 @@ def test_data_source_signed(no_request_pub):
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
unsigned_url = urlopen.call_args[0][0]
assert unsigned_url == 'https://no-secret.example.com/json'
if qs_data:
assert unsigned_url == 'https://no-secret.example.com/json?arg1=val1&arg2=val2'
else:
assert unsigned_url == 'https://no-secret.example.com/json'
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json?foo=bar"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
unsigned_url = urlopen.call_args[0][0]
if qs_data:
assert unsigned_url == 'https://no-secret.example.com/json?foo=bar&arg1=val1&arg2=val2'
else:
assert unsigned_url == 'https://no-secret.example.com/json?foo=bar'
def test_named_datasource_json_cache(requests_pub):
@ -1341,3 +1393,11 @@ def test_data_source_with_category(pub):
export = ET.tostring(data_source.export_to_xml(include_id=True))
data_source3 = NamedDataSource.import_from_xml_tree(ET.fromstring(export), include_id=True)
assert data_source3.category_id is None
def test_data_source_with_qs_data(pub):
data_source = NamedDataSource(name='test')
data_source.qs_data = {'arg1': 'val1', 'arg2': 'val2'}
data_source.store()
data_source2 = assert_import_export_works(data_source, include_id=True)
assert data_source2.qs_data == {'arg1': 'val1', 'arg2': 'val2'}

View File

@ -208,23 +208,31 @@ def test_build_agenda_datasources(mock_collect, pub, chrono_url):
pub.load_site_options()
NamedDataSource.wipe()
# create some datasource, with same urls, but not external
# create some datasource, with same urls, but external != 'agenda'
ds = NamedDataSource(name='Foo A')
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-A/datetimes/'}
ds.store()
ds = NamedDataSource(name='Foo B')
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-B/datetimes/'}
ds.store()
ds = NamedDataSource(name='Foo A')
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-A/datetimes/'}
ds.external = 'agenda_manual'
ds.store()
ds = NamedDataSource(name='Foo B')
ds.external = 'agenda_manual'
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-B/datetimes/'}
ds.store()
# error during collect
mock_collect.return_value = None
build_agenda_datasources(pub)
assert NamedDataSource.count() == 2 # no changes
assert NamedDataSource.count() == 4 # no changes
# no agenda datasource found in chrono
mock_collect.return_value = []
build_agenda_datasources(pub)
assert NamedDataSource.count() == 2 # no changes
assert NamedDataSource.count() == 4 # no changes
# 2 agenda datasources found
mock_collect.return_value = [
@ -242,9 +250,9 @@ def test_build_agenda_datasources(mock_collect, pub, chrono_url):
# agenda datasources does not exist, create them
build_agenda_datasources(pub)
assert NamedDataSource.count() == 2 + 2
datasource1 = NamedDataSource.get(2 + 1)
datasource2 = NamedDataSource.get(2 + 2)
assert NamedDataSource.count() == 4 + 2
datasource1 = NamedDataSource.get(4 + 1)
datasource2 = NamedDataSource.get(4 + 2)
assert datasource1.name == 'Events A'
assert datasource1.slug == 'chrono_ds_sluga'
assert datasource1.external == 'agenda'
@ -270,9 +278,9 @@ def test_build_agenda_datasources(mock_collect, pub, chrono_url):
datasource2.slug = 'wrong_again'
datasource2.store()
build_agenda_datasources(pub)
assert NamedDataSource.count() == 2 + 2
datasource1 = NamedDataSource.get(2 + 1)
datasource2 = NamedDataSource.get(2 + 2)
assert NamedDataSource.count() == 4 + 2
datasource1 = NamedDataSource.get(4 + 1)
datasource2 = NamedDataSource.get(4 + 2)
assert datasource1.name == 'Events A'
assert datasource1.slug == 'wrong'
assert datasource2.name == 'Events B'
@ -283,10 +291,10 @@ def test_build_agenda_datasources(mock_collect, pub, chrono_url):
datasource1.store()
build_agenda_datasources(pub)
assert NamedDataSource.count() == 2 + 2
assert NamedDataSource.count() == 4 + 2
# first datasource was deleted, because not found and not used
datasource2 = NamedDataSource.get(2 + 2)
datasource3 = NamedDataSource.get(2 + 3)
datasource2 = NamedDataSource.get(4 + 2)
datasource3 = NamedDataSource.get(4 + 3)
assert datasource2.name == 'Events B'
assert datasource2.external == 'agenda'
assert datasource2.external_status is None
@ -314,10 +322,10 @@ def test_build_agenda_datasources(mock_collect, pub, chrono_url):
datasource3.data_source['value'] = 'http://chrono.example.net/api/agenda/events-FOOBAR/datetimes/'
datasource3.store()
build_agenda_datasources(pub)
assert NamedDataSource.count() == 2 + 3
datasource2 = NamedDataSource.get(2 + 2)
datasource3 = NamedDataSource.get(2 + 3)
datasource4 = NamedDataSource.get(2 + 4)
assert NamedDataSource.count() == 4 + 3
datasource2 = NamedDataSource.get(4 + 2)
datasource3 = NamedDataSource.get(4 + 3)
datasource4 = NamedDataSource.get(4 + 4)
assert datasource2.name == 'Events B'
assert datasource2.slug == 'wrong_again'
assert datasource2.external == 'agenda'
@ -347,6 +355,6 @@ def test_build_agenda_datasources(mock_collect, pub, chrono_url):
datasource4.external_status = 'not-found'
datasource4.store()
build_agenda_datasources(pub)
assert NamedDataSource.count() == 2 + 3
datasource4 = NamedDataSource.get(2 + 4)
assert NamedDataSource.count() == 4 + 3
datasource4 = NamedDataSource.get(4 + 4)
assert datasource4.external_status is None

View File

@ -496,6 +496,8 @@ def test_datasource_snapshot_browse(pub):
assert '<p>%s</p>' % localstrftime(snapshot.timestamp) in resp.text
with pytest.raises(IndexError):
resp = resp.click('Edit')
with pytest.raises(IndexError):
resp = resp.click('Duplicate')
def test_form_snapshot_browse(pub, formdef_with_history):

View File

@ -36,6 +36,7 @@ from wcs.qommon.backoffice.menu import html_top
from wcs.qommon.errors import AccessForbiddenError
from wcs.qommon.form import (
CheckboxWidget,
ComputedExpressionWidget,
DurationWidget,
FileWidget,
Form,
@ -44,6 +45,7 @@ from wcs.qommon.form import (
SlugWidget,
StringWidget,
TextWidget,
WidgetDict,
WidgetList,
get_response,
get_session,
@ -78,7 +80,9 @@ class NamedDataSourceUI:
rows=5,
value=self.datasource.description,
)
if not self.datasource or self.datasource.type != 'wcs:users':
if not self.datasource or (
self.datasource.type != 'wcs:users' and self.datasource.external != 'agenda_manual'
):
form.add(
DataSourceSelectionWidget,
'data_source',
@ -105,7 +109,9 @@ class NamedDataSourceUI:
'data-dynamic-display-value-in': 'json|geojson',
},
)
if not self.datasource or self.datasource.type != 'wcs:users':
if not self.datasource or (
self.datasource.type != 'wcs:users' and self.datasource.external != 'agenda_manual'
):
form.add(
StringWidget,
'query_parameter',
@ -249,6 +255,17 @@ class NamedDataSourceUI:
title=_('Record on errors'),
value=self.datasource.record_on_errors,
)
if self.datasource.external == 'agenda_manual':
form.add(
WidgetDict,
'qs_data',
title=_('Query string data'),
value=self.datasource.qs_data or {},
element_value_type=ComputedExpressionWidget,
element_value_kwargs={'allow_python': False},
allow_empty_values=True,
value_for_empty_value='',
)
if not self.datasource.is_readonly():
form.add_submit('submit', _('Submit'))
@ -289,6 +306,7 @@ class NamedDataSourcePage(Directory):
'edit',
'delete',
'export',
'duplicate',
('history', 'snapshots_dir'),
]
do_not_call_in_templates = True
@ -311,6 +329,8 @@ class NamedDataSourcePage(Directory):
if hasattr(self.datasource, 'snapshot_object'):
r += utils.snapshot_info_block(snapshot=self.datasource.snapshot_object)
r += htmltext('<ul id="sidebar-actions">')
if self.datasource.external == 'agenda' or not self.datasource.is_readonly():
r += htmltext('<li><a href="duplicate" rel="popup">%s</a></li>') % _('Duplicate')
if not self.datasource.is_readonly():
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete')
r += htmltext('<li><a href="export">%s</a></li>') % _('Export')
@ -444,6 +464,32 @@ class NamedDataSourcePage(Directory):
content_type='application/x-wcs-datasource',
)
def duplicate(self):
if hasattr(self.datasource, 'snapshot_object'):
return redirect('.')
form = Form(enctype='multipart/form-data')
form.add_submit('duplicate', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('duplicate', _('Duplicate')))
html_top('datasources', title=_('Duplicate Data Source'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Duplicating Data Source:'), self.datasource.name)
r += form.render()
return r.getvalue()
tree = self.datasource.export_to_xml(include_id=True)
new_datasource = NamedDataSource.import_from_xml_tree(tree)
new_datasource.name = _('Copy of %s' % new_datasource.name)
new_datasource.slug = new_datasource.get_new_slug(new_datasource.slug)
if self.datasource.agenda_ds:
new_datasource.external = 'agenda_manual'
new_datasource.store()
return redirect('../%s' % new_datasource.id)
class NamedDataSourcesDirectory(Directory):
_q_exports = [

View File

@ -417,6 +417,32 @@ def _get_structured_items(data_source, mode=None, raise_on_error=False):
if Template.is_template_string(url):
vars = get_publisher().substitutions.get_context_variables(mode='lazy')
url = get_variadic_url(url, vars)
if data_source.get('qs_data'): # merge qs_data into url
from wcs.workflows import WorkflowStatusItem
parsed = urllib.parse.urlparse(url)
qs = list(urllib.parse.parse_qsl(parsed.query))
for key, value in data_source['qs_data'].items():
try:
value = WorkflowStatusItem.compute(value, raises=True, record_errors=False)
value = str(value) if value is not None else ''
except Exception as e:
get_publisher().record_error(
_(
'Failed to compute value "%(value)s" for "%(query)s" query parameter'
% {'value': value, 'query': key}
),
context='[DATASOURCE]',
exception=e,
notify=data_source.get('notify_on_errors'),
record=data_source.get('record_on_errors'),
)
else:
key = force_str(key)
value = force_str(value)
qs.append((key, value))
qs = urllib.parse.urlencode(qs)
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
request = get_request()
if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
@ -493,6 +519,7 @@ class NamedDataSource(XmlStorableObject):
id_attribute = None
text_attribute = None
id_property = None
qs_data = None
label_template_property = None
external = None
external_status = None
@ -517,6 +544,7 @@ class NamedDataSource(XmlStorableObject):
('id_attribute', 'str'),
('text_attribute', 'str'),
('id_property', 'str'),
('qs_data', 'qs_data'),
('label_template_property', 'str'),
('external', 'str'),
('external_status', 'str'),
@ -574,6 +602,7 @@ class NamedDataSource(XmlStorableObject):
'data_attribute': self.data_attribute,
'id_attribute': self.id_attribute,
'text_attribute': self.text_attribute,
'qs_data': self.qs_data,
'notify_on_errors': notify_on_errors,
'record_on_errors': record_on_errors,
}
@ -603,6 +632,20 @@ class NamedDataSource(XmlStorableObject):
def maybe_datetimes(self):
return self.type == 'json' and 'datetimes' in (self.data_source.get('value') or '')
@property
def agenda_ds(self):
return self.external in ['agenda', 'agenda_manual']
@property
def agenda_ds_origin(self):
if self.external != 'agenda_manual':
return
for datasource in NamedDataSource.select():
if datasource.external != 'agenda':
continue
if datasource.data_source.get('value') == self.data_source.get('value'):
return datasource
def migrate(self):
changed = False
@ -651,6 +694,30 @@ class NamedDataSource(XmlStorableObject):
'value': force_str(element.find('value').text or ''),
}
def export_qs_data_to_xml(self, element, attribute_name, *args, **kwargs):
if not self.qs_data:
return
for (key, value) in self.qs_data.items():
item = ET.SubElement(element, 'item')
if isinstance(key, str):
ET.SubElement(item, 'name').text = force_text(key)
else:
raise AssertionError('unknown type for key (%r)' % key)
if isinstance(value, str):
ET.SubElement(item, 'value').text = force_text(value)
else:
raise AssertionError('unknown type for value (%r)' % key)
def import_qs_data_from_xml(self, element, **kwargs):
if element is None:
return
qs_data = {}
for item in element.findall('item'):
key = force_str(item.find('name').text)
value = force_str(item.find('value').text or '')
qs_data[key] = value
return qs_data
def get_dependencies(self):
yield self.category
@ -674,10 +741,7 @@ class NamedDataSource(XmlStorableObject):
return data_source
def get_json_query_url(self):
url = self.data_source.get('value').strip()
if Template.is_template_string(url):
vars = get_publisher().substitutions.get_context_variables(mode='lazy')
url = get_variadic_url(url, vars)
url = self.get_variadic_url()
if not url:
return ''
if '?' not in url:
@ -741,27 +805,22 @@ class NamedDataSource(XmlStorableObject):
def get_geojson_url(self):
assert self.type == 'geojson'
url = self.data_source.get('value').strip()
if Template.is_template_string(url):
context = get_publisher().substitutions.get_context_variables(mode='lazy')
new_url = get_variadic_url(url, context)
if new_url != url:
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug}
token, created = get_publisher().token_class.get_or_create(
type='autocomplete', context=token_context
)
if created:
token.store()
return '/api/geojson/%s' % token.id
new_url = self.get_variadic_url()
if new_url != url:
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug}
token, created = get_publisher().token_class.get_or_create(
type='autocomplete', context=token_context
)
if created:
token.store()
return '/api/geojson/%s' % token.id
return '/api/geojson/%s' % self.slug
def get_geojson_data(self, force_url=None):
if force_url:
url = force_url
else:
url = self.data_source.get('value').strip()
if Template.is_template_string(url):
context = get_publisher().substitutions.get_context_variables(mode='lazy')
url = get_variadic_url(url, context)
url = self.get_variadic_url()
request = get_request()
if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
@ -802,10 +861,7 @@ class NamedDataSource(XmlStorableObject):
return data
def get_value_by_id(self, param_name, param_value):
url = self.data_source.get('value').strip()
if Template.is_template_string(url):
vars = get_publisher().substitutions.get_context_variables(mode='lazy')
url = get_variadic_url(url, vars)
url = self.get_variadic_url()
if '?' not in url:
url += '?'

View File

@ -18,39 +18,55 @@
<div class="section">
<h3>{% trans "Configuration" %}</h3>
<ul>
<li>{% trans "Type of source:" %} {{ datasource.type_label }}</li>
{% if datasource.data_source.type == 'json' or datasource.data_source.type == 'jsonp' or datasource.data_source.type == 'geojson' %}
<li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li>
{% elif datasource.data_source.type == 'formula' %}
<li>{% trans "Python Expression:" %} {{ datasource.data_source.value }}</li>
{% elif datasource.data_source.type == 'wcs:users' %}
{% spaceless %}
<li>{% trans "Users with roles:" %}
<ul>
{% for role in roles %}
{% if role.0 in datasource.users_included_roles %}
<li>{{ role.1 }}</li>
{% endif %}
{% if datasource.agenda_ds %}
{% with datasource.agenda_ds_origin as origin %}
<li>{% trans "Type of source:" %} {% trans "Agenda data" %}{% if origin %} ({% trans "Copy of:" %} <a href="{{ origin.get_admin_url }}">{{ origin.name }}</a>){% endif %}</li>
<li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li>
{% if datasource.qs_data %}
<li>{% trans "Extra query string data:" %}
<ul>
{% for k, v in datasource.qs_data.items %}
<li>{% blocktrans %}{{ k }}:{% endblocktrans %} {{ v }}</li>
{% endfor %}
</ul>
</li>
<li>{% trans "Users without roles:" %}
<ul>
{% for role in roles %}
{% if role.0 in datasource.users_excluded_roles %}
<li>{{ role.1 }}</li>
{% endif %}
{% endfor %}
</ul>
</li>
<li>{% trans "Include disabled users:" %}
{% if datasource.include_disabled_users %}
{% trans "Yes" %}
{% else %}
{% trans "No" %}
</ul>
</li>
{% endif %}
{% endwith %}
{% else %}
<li>{% trans "Type of source:" %} {{ datasource.type_label }}</li>
{% if datasource.data_source.type == 'json' or datasource.data_source.type == 'jsonp' or datasource.data_source.type == 'geojson' %}
<li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li>
{% elif datasource.data_source.type == 'formula' %}
<li>{% trans "Python Expression:" %} {{ datasource.data_source.value }}</li>
{% elif datasource.data_source.type == 'wcs:users' %}
{% spaceless %}
<li>{% trans "Users with roles:" %}
<ul>
{% for role in roles %}
{% if role.0 in datasource.users_included_roles %}
<li>{{ role.1 }}</li>
{% endif %}
{% endfor %}
</ul>
</li>
<li>{% trans "Users without roles:" %}
<ul>
{% for role in roles %}
{% if role.0 in datasource.users_excluded_roles %}
<li>{{ role.1 }}</li>
{% endif %}
{% endfor %}
</ul>
</li>
<li>{% trans "Include disabled users:" %}
{% if datasource.include_disabled_users %}
{% trans "Yes" %}
{% else %}
{% trans "No" %}
{% endif %}
</li>
{% endspaceless %}
{% endif %}
</li>
{% endspaceless %}
{% endif %}
{% if datasource.cache_duration %}
<li>{% trans "Cache Duration:" %} {{ datasource.humanized_cache_duration }}