datasources: users datasource (#52941)

This commit is contained in:
Lauréline Guérin 2021-04-13 14:40:14 +02:00
parent 85834b9244
commit c79e88fd02
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
8 changed files with 396 additions and 110 deletions

View File

@ -143,6 +143,30 @@ def test_data_sources_agenda(pub, chrono_url):
)
def test_data_sources_users(pub):
create_superuser(pub)
NamedDataSource.wipe()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/data-sources/')
if not pub.is_using_postgresql():
assert 'Users Data Sources' not in resp
assert 'new-users' not in resp
return
assert 'Users Data Sources' in resp
assert 'There are no users data sources defined.' in resp
assert 'new-users' in resp
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'wcs:users'}
data_source.store()
resp = app.get('/backoffice/settings/data-sources/')
assert 'There are no users data sources defined.' not in resp
assert '<li><a href="%s/">foobar (foobar)</a></li>' % data_source.id in resp
def test_data_sources_new(pub):
create_superuser(pub)
NamedDataSource.wipe()

View File

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
import pytest
from wcs import data_sources
from wcs.data_sources import NamedDataSource
from wcs.qommon.http_request import HTTPRequest
from .utilities import clean_temporary_pub, create_temporary_pub
@pytest.fixture
def pub():
pub = create_temporary_pub(sql_mode=True)
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
return pub
def teardown_module(module):
clean_temporary_pub()
def test_datasource_users(pub):
pub.role_class.wipe()
role1 = pub.role_class(name='role')
role1.store()
role2 = pub.role_class(name='role2')
role2.store()
pub.user_class.wipe()
NamedDataSource.wipe()
datasource = NamedDataSource(name='foo')
datasource.data_source = {'type': 'wcs:users'}
datasource.store()
assert data_sources.get_items({'type': datasource.slug}) == []
assert data_sources.get_items(datasource.extended_data_source) == []
assert data_sources.get_structured_items({'type': datasource.slug}) == []
assert data_sources.get_structured_items(datasource.extended_data_source) == []
users = []
for i in range(2):
user = pub.user_class(name='John Doe %s' % i)
user.roles = [role1.id]
user.store()
users.append(user)
assert data_sources.get_items({'type': datasource.slug}) == [
('1', 'John Doe 0', '1', {'id': 1, 'text': 'John Doe 0'}),
('2', 'John Doe 1', '2', {'id': 2, 'text': 'John Doe 1'}),
]
assert data_sources.get_items(datasource.extended_data_source) == [
('1', 'John Doe 0', '1', {'id': 1, 'text': 'John Doe 0'}),
('2', 'John Doe 1', '2', {'id': 2, 'text': 'John Doe 1'}),
]
assert data_sources.get_structured_items({'type': datasource.slug}) == [
{'id': 1, 'text': 'John Doe 0'},
{'id': 2, 'text': 'John Doe 1'},
]
assert data_sources.get_structured_items(datasource.extended_data_source) == [
{'id': 1, 'text': 'John Doe 0'},
{'id': 2, 'text': 'John Doe 1'},
]
datasource.users_included_roles = [role1.id]
datasource.store()
assert data_sources.get_structured_items({'type': datasource.slug}) == [
{'id': 1, 'text': 'John Doe 0'},
{'id': 2, 'text': 'John Doe 1'},
]
assert data_sources.get_structured_items(datasource.extended_data_source) == [
{'id': 1, 'text': 'John Doe 0'},
{'id': 2, 'text': 'John Doe 1'},
]
datasource.users_included_roles = [role1.id, role2.id]
datasource.store()
assert data_sources.get_structured_items({'type': datasource.slug}) == []
assert data_sources.get_structured_items(datasource.extended_data_source) == []
users[0].roles = [role1.id, role2.id]
users[0].store()
assert data_sources.get_structured_items({'type': datasource.slug}) == [{'id': 1, 'text': 'John Doe 0'}]
assert data_sources.get_structured_items(datasource.extended_data_source) == [
{'id': 1, 'text': 'John Doe 0'}
]
users[0].roles = [role2.id]
users[0].store()
datasource.users_included_roles = []
datasource.users_excluded_roles = [role1.id]
datasource.store()
assert data_sources.get_structured_items({'type': datasource.slug}) == [{'id': 1, 'text': 'John Doe 0'}]
assert data_sources.get_structured_items(datasource.extended_data_source) == [
{'id': 1, 'text': 'John Doe 0'}
]
datasource.users_excluded_roles = [role1.id, role2.id]
datasource.store()
assert data_sources.get_structured_items({'type': datasource.slug}) == []
assert data_sources.get_structured_items(datasource.extended_data_source) == []

View File

@ -37,11 +37,14 @@ from wcs.qommon.form import (
FileWidget,
Form,
HtmlWidget,
SingleSelectWidget,
StringWidget,
TextWidget,
WidgetList,
get_response,
get_session,
)
from wcs.roles import get_user_roles
class NamedDataSourceUI:
@ -61,15 +64,16 @@ class NamedDataSourceUI:
rows=5,
value=self.datasource.description,
)
form.add(
DataSourceSelectionWidget,
'data_source',
value=self.datasource.data_source,
title=_('Data Source'),
allow_geojson=True,
allow_named_sources=False,
required=True,
)
if not self.datasource or self.datasource.type != 'wcs:users':
form.add(
DataSourceSelectionWidget,
'data_source',
value=self.datasource.data_source,
title=_('Data Source'),
allow_geojson=True,
allow_named_sources=False,
required=True,
)
form.add(
DurationWidget,
'cache_duration',
@ -87,59 +91,81 @@ class NamedDataSourceUI:
'data-dynamic-display-value-in': 'json|geojson',
},
)
form.add(
StringWidget,
'query_parameter',
value=self.datasource.query_parameter,
title=_('Query Parameter'),
hint=_('Name of the parameter to use for querying source (typically, q)'),
required=False,
advanced=False,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
StringWidget,
'id_parameter',
value=self.datasource.id_parameter,
title=_('Id Parameter'),
hint=_('Name of the parameter to use to get a given entry from data source (typically, id)'),
required=False,
advanced=False,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
StringWidget,
'id_property',
value=self.datasource.id_property,
title=_('Id Property'),
hint=_('Name of the property to use to get a given entry from data source (default: id)'),
required=False,
advanced=False,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'geojson',
},
)
form.add(
StringWidget,
'label_template_property',
value=self.datasource.label_template_property,
title=_('Label template'),
hint=_('Django expression to build label of each value (default: {{ text }})'),
required=False,
advanced=False,
size=80,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'geojson',
},
)
if not self.datasource or self.datasource.type != 'wcs:users':
form.add(
StringWidget,
'query_parameter',
value=self.datasource.query_parameter,
title=_('Query Parameter'),
hint=_('Name of the parameter to use for querying source (typically, q)'),
required=False,
advanced=False,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
StringWidget,
'id_parameter',
value=self.datasource.id_parameter,
title=_('Id Parameter'),
hint=_('Name of the parameter to use to get a given entry from data source (typically, id)'),
required=False,
advanced=False,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
StringWidget,
'id_property',
value=self.datasource.id_property,
title=_('Id Property'),
hint=_('Name of the property to use to get a given entry from data source (default: id)'),
required=False,
advanced=False,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'geojson',
},
)
form.add(
StringWidget,
'label_template_property',
value=self.datasource.label_template_property,
title=_('Label template'),
hint=_('Django expression to build label of each value (default: {{ text }})'),
required=False,
advanced=False,
size=80,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'geojson',
},
)
if self.datasource and self.datasource.type == 'wcs:users':
options = [(None, '---', None)]
options += get_user_roles()
form.add(
WidgetList,
'users_included_roles',
element_type=SingleSelectWidget,
value=self.datasource.users_included_roles,
title=_('Users with roles'),
add_element_label=_('Add Role'),
element_kwargs={'render_br': False, 'options': options},
)
form.add(
WidgetList,
'users_excluded_roles',
element_type=SingleSelectWidget,
value=self.datasource.users_excluded_roles,
title=_('Users without roles'),
add_element_label=_('Add Role'),
element_kwargs={'render_br': False, 'options': options},
)
if self.datasource.slug and not self.datasource.is_used():
form.add(
StringWidget,
@ -149,45 +175,46 @@ class NamedDataSourceUI:
required=True,
advanced=True,
)
form.add(
StringWidget,
'data_attribute',
value=self.datasource.data_attribute,
title=_('Data Attribute'),
hint=_('Name of the attribute containing the list of results (default: data)'),
required=False,
advanced=True,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
StringWidget,
'id_attribute',
value=self.datasource.id_attribute,
title=_('Id Attribute'),
hint=_('Name of the attribute containing the identifier of an entry (default: id)'),
required=False,
advanced=True,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
StringWidget,
'text_attribute',
value=self.datasource.text_attribute,
title=_('Text Attribute'),
hint=_('Name of the attribute containing the label of an entry (default: text)'),
required=False,
advanced=True,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
if not self.datasource or self.datasource.type != 'wcs:users':
form.add(
StringWidget,
'data_attribute',
value=self.datasource.data_attribute,
title=_('Data Attribute'),
hint=_('Name of the attribute containing the list of results (default: data)'),
required=False,
advanced=True,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
StringWidget,
'id_attribute',
value=self.datasource.id_attribute,
title=_('Id Attribute'),
hint=_('Name of the attribute containing the identifier of an entry (default: id)'),
required=False,
advanced=True,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
StringWidget,
'text_attribute',
value=self.datasource.text_attribute,
title=_('Text Attribute'),
hint=_('Name of the attribute containing the label of an entry (default: text)'),
required=False,
advanced=True,
attrs={
'data-dynamic-display-child-of': 'data_source$type',
'data-dynamic-display-value': 'json',
},
)
form.add(
CheckboxWidget,
'notify_on_errors',
@ -276,7 +303,7 @@ class NamedDataSourcePage(Directory):
get_response().filter['sidebar'] = self.get_sidebar()
return template.QommonTemplateResponse(
templates=['wcs/backoffice/data-source.html'],
context={'view': self, 'datasource': self.datasource},
context={'view': self, 'datasource': self.datasource, 'roles': get_user_roles()},
)
def usage_in_formdefs(self):
@ -289,7 +316,7 @@ class NamedDataSourcePage(Directory):
def preview_block(self):
data_source = self.datasource.extended_data_source
if data_source.get('type') not in ('json', 'geojson', 'formula'):
if data_source.get('type') not in ('json', 'geojson', 'formula', 'wcs:users'):
return ''
items = get_structured_items(data_source)
if not items:
@ -377,7 +404,13 @@ class NamedDataSourcePage(Directory):
class NamedDataSourcesDirectory(Directory):
_q_exports = ['', 'new', ('import', 'p_import'), ('sync-agendas', 'sync_agendas')]
_q_exports = [
'',
'new',
('new-users', 'new_users'),
('import', 'p_import'),
('sync-agendas', 'sync_agendas'),
]
def _q_traverse(self, path):
get_response().breadcrumb.append(('data-sources/', _('Data Sources')))
@ -386,10 +419,13 @@ class NamedDataSourcesDirectory(Directory):
def _q_index(self):
html_top('datasources', title=_('Data Sources'))
data_sources = []
user_data_sources = []
agenda_data_sources = []
for ds in NamedDataSource.select(order_by='name'):
if ds.external == 'agenda':
agenda_data_sources.append(ds)
elif ds.type == 'wcs:users':
user_data_sources.append(ds)
else:
data_sources.append(ds)
generated_data_sources = list(CardDef.get_carddefs_as_data_source())
@ -398,15 +434,20 @@ class NamedDataSourcesDirectory(Directory):
templates=['wcs/backoffice/data-sources.html'],
context={
'data_sources': data_sources,
'user_data_sources': user_data_sources,
'has_chrono': has_chrono(get_publisher()),
'has_users': get_publisher().is_using_postgresql(),
'agenda_data_sources': agenda_data_sources,
'generated_data_sources': generated_data_sources,
},
)
def new(self):
get_response().breadcrumb.append(('new', _('New')))
datasource_ui = NamedDataSourceUI(None)
def _new(self, url, breadcrumb, title, ds_type=None):
get_response().breadcrumb.append((url, breadcrumb))
datasource = NamedDataSource()
if ds_type is not None:
datasource.data_source = {'type': ds_type}
datasource_ui = NamedDataSourceUI(datasource)
form = datasource_ui.get_form()
if form.get_widget('cancel').parse():
return redirect('.')
@ -419,12 +460,23 @@ class NamedDataSourcesDirectory(Directory):
else:
return redirect('.')
html_top('datasources', title=_('New Data Source'))
html_top('datasources', title=title)
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Data Source')
r += htmltext('<h2>%s</h2>') % title
r += form.render()
return r.getvalue()
def new(self):
return self._new(url='new', breadcrumb=_('New'), title=_('New Data Source'))
def new_users(self):
return self._new(
url='new-users',
breadcrumb=_('New Users Data Source'),
title=_('New Users Data Source'),
ds_type='wcs:users',
)
def _q_lookup(self, component):
return NamedDataSourcePage(component)

View File

@ -69,6 +69,7 @@ class DataSourceSelectionWidget(CompositeWidget):
nds_options = []
nds_agenda_options = []
nds_users_options = []
for ds in NamedDataSource.select():
option = (
@ -82,6 +83,8 @@ class DataSourceSelectionWidget(CompositeWidget):
)
if ds.external == 'agenda':
nds_agenda_options.append(option)
elif ds.type == 'wcs:users':
nds_users_options.append(option)
else:
nds_options.append(option)
@ -90,6 +93,11 @@ class DataSourceSelectionWidget(CompositeWidget):
options.append(OptGroup(_('Agendas')))
options.extend(nds_agenda_options)
nds_users_options.sort(key=lambda x: misc.simplify(x[1]))
if nds_users_options:
options.append(OptGroup(_('Users')))
options.extend(nds_users_options)
nds_options.sort(key=lambda x: misc.simplify(x[1]))
if nds_options:
options.append(OptGroup(_('Manually Configured Data Sources')))
@ -261,13 +269,21 @@ def get_structured_items(data_source, mode=None):
return CardDef.get_data_source_items(data_source['type'])
if data_source.get('type') not in ('json', 'jsonp', 'geojson', 'formula'):
if data_source.get('type') not in ('json', 'jsonp', 'geojson', 'formula', 'wcs:users'):
# named data source
named_data_source = NamedDataSource.get_by_slug(data_source['type'])
if named_data_source.cache_duration:
cache_duration = int(named_data_source.cache_duration)
data_source = named_data_source.extended_data_source
if data_source.get('type') == 'wcs:users':
users = get_publisher().user_class.get_users_with_roles(
included_roles=data_source.get('included_roles'),
excluded_roles=data_source.get('excluded_roles'),
order_by='name',
)
return [{'id': u.id, 'text': u.name} for u in users]
if data_source.get('type') == 'formula':
# the result of a python expression, it must be a list.
# - of strings
@ -413,6 +429,8 @@ class NamedDataSource(XmlStorableObject):
external_status = None
notify_on_errors = False
record_on_errors = False
users_included_roles = None
users_excluded_roles = None
# declarations for serialization
XML_NODES = [
@ -432,6 +450,8 @@ class NamedDataSource(XmlStorableObject):
('data_source', 'data_source'),
('notify_on_errors', 'bool'),
('record_on_errors', 'bool'),
('users_included_roles', 'str_list'),
('users_excluded_roles', 'str_list'),
]
def __init__(self, name=None):
@ -440,6 +460,8 @@ class NamedDataSource(XmlStorableObject):
@property
def type(self):
if not self.data_source:
return None
return self.data_source.get('type')
@property
@ -467,6 +489,15 @@ class NamedDataSource(XmlStorableObject):
}
)
return data_source
if self.type == 'wcs:users':
data_source = self.data_source.copy()
data_source.update(
{
'included_roles': self.users_included_roles,
'excluded_roles': self.users_excluded_roles,
}
)
return data_source
return self.data_source
def can_jsonp(self):
@ -732,6 +763,7 @@ class NamedDataSource(XmlStorableObject):
def type_label(self):
data_source_labels = {
'wcs:users': _('Users'),
'json': _('JSON'),
'jsonp': _('JSONP'),
'geojson': _('GeoJSON'),

View File

@ -157,6 +157,13 @@ class NotContains(Contains):
return super().as_sql()
class ArrayContains(Contains):
sql_op = '@>'
def as_sql_param(self):
return {'c%s' % id(self.value): self.value}
class NotNull(Criteria):
sql_op = 'IS NOT NULL'
@ -221,6 +228,19 @@ class And(Criteria):
return d
class Not(Criteria):
def __init__(self, criteria, **kwargs):
sql_class = globals().get(criteria.__class__.__name__)
sql_element = sql_class(**criteria.__dict__)
self.criteria = sql_element
def as_sql(self):
return 'NOT ( %s )' % self.criteria.as_sql()
def as_sql_param(self):
return self.criteria.as_sql_param()
class Intersects(Criteria):
def as_sql(self):
if not self.value:

View File

@ -23,6 +23,27 @@
<li>{% trans "URL:" %} <a href="{{ datasource.get_variadic_url }}">{{ datasource.data_source.value }}</a></li>
{% elif datasource.data_source.type == 'formula' %}
<li>{% trans "Python Expression:" %} {{ datasource.data_source.value }}</li>
{% elif datasource.data_source.type == 'users' %}
{% spaceless %}
<li>{% trans "Users with roles:" %}
<ul>
{% for role in roles %}
{% if role.0 in datasource.users_included_roles %}
<li>{{ role.1 }}</li>
{% endif %}
{% endfor %}
</ul>
</li>
<li>{% trans "Users without roles:" %}
<ul>
{% for role in roles %}
{% if role.0 in datasource.users_excluded_roles %}
<li>{{ role.1 }}</li>
{% endif %}
{% endfor %}
</ul>
</li>
{% endspaceless %}
{% endif %}
{% if datasource.cache_duration %}
<li>{% trans "Cache Duration:" %} {{ datasource.humanized_cache_duration }}

View File

@ -8,6 +8,9 @@
<a href="sync-agendas">{% trans "Refresh agendas" %}</a>
{% endif %}
<a data-popup href="import">{% trans "Import" %}</a>
{% if has_users %}
<a href="new-users">{% trans "New User Data Source" %}</a>
{% endif %}
<a href="new">{% trans "New Data Source" %}</a>
{% endblock %}
@ -44,6 +47,23 @@
</div>
{% endif %}
{% if has_users %}
<div class="section">
<h2>{% trans "Users Data Sources" %}</h2>
{% if user_data_sources %}
<ul class="objects-list single-links">
{% for data_source in user_data_sources %}
<li><a href="{{ data_source.id }}/">{{ data_source.name }} ({{ data_source.slug }})</a></li>
{% endfor %}
</ul>
{% else %}
<div>
{% trans "There are no users data sources defined." %}
</div>
{% endif %}
</div>
{% endif %}
<div class="section">
<h2>{% trans "Manually Configured Data Sources" %}</h2>
{% if data_sources %}

View File

@ -185,6 +185,20 @@ class User(StorableObject):
# suitable for Intersects()
return cls.select([st.Null('deleted_timestamp'), st.Intersects('roles', [str(role_id)])])
@classmethod
def get_users_with_roles(cls, included_roles=None, excluded_roles=None, order_by=None):
if not get_publisher().is_using_postgresql():
return []
from wcs import sql
criterias = [sql.Null('deleted_timestamp')]
if included_roles:
criterias.append(sql.ArrayContains('roles', [str(r) for r in included_roles]))
if excluded_roles:
criterias.append(sql.Not(sql.Intersects('roles', [str(r) for r in excluded_roles])))
return cls.select(criterias, order_by=order_by)
@classmethod
def get_users_with_name_identifier(cls, name_identifier):
return cls.select(