backoffice: add storage/UI to store/assign roles to API accesses (#48752)
This commit is contained in:
parent
06b7412dd6
commit
27756287a0
|
@ -161,3 +161,25 @@ def test_api_access_delete(pub, api_access):
|
|||
resp = resp.form.submit('submit')
|
||||
assert resp.location == 'http://example.net/backoffice/settings/api-access/'
|
||||
assert ApiAccess.count() == 0
|
||||
|
||||
|
||||
def test_api_access_roles(pub, api_access):
|
||||
create_superuser(pub)
|
||||
|
||||
pub.role_class.wipe()
|
||||
role_a = pub.role_class(name='a')
|
||||
role_a.store()
|
||||
role_b = pub.role_class(name='b')
|
||||
role_b.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
resp = app.get('/backoffice/settings/api-access/1/')
|
||||
resp = resp.click(href='edit')
|
||||
resp.form['roles$element0'] = role_a.id
|
||||
resp = resp.form.submit('roles$add_element')
|
||||
resp.form['roles$element1'] = role_b.id
|
||||
resp = resp.form.submit('submit')
|
||||
|
||||
api_access = ApiAccess.get(api_access.id)
|
||||
assert set(x.id for x in api_access.get_roles()) == {role_a.id, role_b.id}
|
||||
|
|
|
@ -16,14 +16,22 @@
|
|||
|
||||
import uuid
|
||||
|
||||
from quixote import get_response, redirect
|
||||
from quixote import get_publisher, get_response, redirect
|
||||
from quixote.directory import Directory
|
||||
from quixote.html import TemplateIO, htmltext
|
||||
|
||||
from wcs.api_access import ApiAccess
|
||||
from wcs.qommon import _, errors, template
|
||||
from wcs.qommon.backoffice.menu import html_top
|
||||
from wcs.qommon.form import CheckboxWidget, Form, HtmlWidget, StringWidget, TextWidget
|
||||
from wcs.qommon.form import (
|
||||
CheckboxWidget,
|
||||
Form,
|
||||
HtmlWidget,
|
||||
SingleSelectWidget,
|
||||
StringWidget,
|
||||
TextWidget,
|
||||
WidgetList,
|
||||
)
|
||||
|
||||
|
||||
class ApiAccessUI:
|
||||
|
@ -65,6 +73,21 @@ class ApiAccessUI:
|
|||
title=_('Restrict to anonymised data'),
|
||||
value=self.api_access.restrict_to_anonymised_data,
|
||||
)
|
||||
roles = list(get_publisher().role_class.select(order_by='name'))
|
||||
form.add(
|
||||
WidgetList,
|
||||
'roles',
|
||||
title=_('Roles'),
|
||||
element_type=SingleSelectWidget,
|
||||
value=self.api_access.roles,
|
||||
add_element_label=_('Add Role'),
|
||||
element_kwargs={
|
||||
'render_br': False,
|
||||
'options': [(None, '---', None)] + [(x, x.name, x.id) for x in roles if not x.is_internal()],
|
||||
},
|
||||
hint=_('Roles given with this access'),
|
||||
)
|
||||
|
||||
if not self.api_access.is_readonly():
|
||||
form.add_submit('submit', _('Submit'))
|
||||
form.add_submit('cancel', _('Cancel'))
|
||||
|
@ -86,7 +109,7 @@ class ApiAccessUI:
|
|||
|
||||
self.api_access.name = name
|
||||
self.api_access.access_identifier = access_identifier
|
||||
for attribute in ('description', 'access_key', 'restrict_to_anonymised_data'):
|
||||
for attribute in ('description', 'access_key', 'restrict_to_anonymised_data', 'roles'):
|
||||
setattr(self.api_access, attribute, form.get_widget(attribute).parse())
|
||||
self.api_access.store()
|
||||
|
||||
|
|
|
@ -14,6 +14,12 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from quixote import get_publisher
|
||||
|
||||
from wcs.qommon.misc import xml_node_text
|
||||
from wcs.qommon.storage import Equal, Or
|
||||
from wcs.qommon.xml_storage import XmlStorableObject
|
||||
|
||||
|
||||
|
@ -25,6 +31,7 @@ class ApiAccess(XmlStorableObject):
|
|||
access_key = None
|
||||
description = None
|
||||
restrict_to_anonymised_data = False
|
||||
roles = None
|
||||
|
||||
# declarations for serialization
|
||||
XML_NODES = [
|
||||
|
@ -33,6 +40,7 @@ class ApiAccess(XmlStorableObject):
|
|||
('access_identifier', 'str'),
|
||||
('access_key', 'str'),
|
||||
('restrict_to_anonymised_data', 'bool'),
|
||||
('roles', 'roles'),
|
||||
]
|
||||
|
||||
@classmethod
|
||||
|
@ -48,3 +56,27 @@ class ApiAccess(XmlStorableObject):
|
|||
if api_access:
|
||||
return api_access.access_key
|
||||
return None
|
||||
|
||||
def get_roles(self):
|
||||
return self.roles or []
|
||||
|
||||
def export_roles_to_xml(self, element, attribute_name, include_id=False, **kwargs):
|
||||
for role in self.get_roles():
|
||||
sub = ET.SubElement(element, 'role')
|
||||
if include_id:
|
||||
sub.attrib['role-id'] = role.id
|
||||
sub.attrib['role-slug'] = role.slug
|
||||
sub.text = role.name
|
||||
|
||||
def import_roles_from_xml(self, element, include_id=False, **kwargs):
|
||||
criterias = []
|
||||
for sub in element:
|
||||
if include_id and 'role-id' in sub.attrib:
|
||||
criterias.append(Equal('id', sub.attrib['role-id']))
|
||||
elif 'role-slug' in sub.attrib:
|
||||
criterias.append(Equal('slug', sub.attrib['role-slug']))
|
||||
else:
|
||||
role_name = xml_node_text(sub)
|
||||
if role_name:
|
||||
criterias.append(Equal('name', role_name))
|
||||
return get_publisher().role_class.select([Or(criterias)], order_by='name')
|
||||
|
|
|
@ -554,12 +554,12 @@ class NamedDataSource(XmlStorableObject):
|
|||
section = 'settings'
|
||||
return '%s/%s/data-sources/%s/' % (base_url, section, self.id)
|
||||
|
||||
def export_data_source_to_xml(self, element, attribute_name, charset):
|
||||
def export_data_source_to_xml(self, element, attribute_name, charset, **kwargs):
|
||||
data_source = getattr(self, attribute_name)
|
||||
ET.SubElement(element, 'type').text = data_source.get('type')
|
||||
ET.SubElement(element, 'value').text = force_text(data_source.get('value') or '', charset)
|
||||
|
||||
def import_data_source_from_xml(self, element, charset):
|
||||
def import_data_source_from_xml(self, element, **kwargs):
|
||||
return {
|
||||
'type': force_str(element.find('type').text),
|
||||
'value': force_str(element.find('value').text or ''),
|
||||
|
|
|
@ -50,22 +50,23 @@ class XmlStorableObject(StorableObject):
|
|||
if not getattr(self, attribute_name, None):
|
||||
continue
|
||||
element = ET.SubElement(root, attribute_name)
|
||||
getattr(self, 'export_%s_to_xml' % attribute_type)(element, attribute_name, charset=charset)
|
||||
export_method = getattr(self, 'export_%s_to_xml' % attribute_type)
|
||||
export_method(element, attribute_name, charset=charset, include_id=include_id)
|
||||
return root
|
||||
|
||||
def export_str_to_xml(self, element, attribute_name, charset):
|
||||
def export_str_to_xml(self, element, attribute_name, charset, **kwargs):
|
||||
element.text = force_text(getattr(self, attribute_name), charset)
|
||||
|
||||
def export_int_to_xml(self, element, attribute_name, charset):
|
||||
def export_int_to_xml(self, element, attribute_name, **kwargs):
|
||||
element.text = str(getattr(self, attribute_name))
|
||||
|
||||
def export_bool_to_xml(self, element, attribute_name, charset):
|
||||
def export_bool_to_xml(self, element, attribute_name, **kwargs):
|
||||
element.text = 'true' if getattr(self, attribute_name) else 'false'
|
||||
|
||||
def export_datetime_to_xml(self, element, attribute_name, charset):
|
||||
def export_datetime_to_xml(self, element, attribute_name, **kwargs):
|
||||
element.text = getattr(self, attribute_name).isoformat()
|
||||
|
||||
def export_str_list_to_xml(self, element, attribute_name, charset):
|
||||
def export_str_list_to_xml(self, element, attribute_name, **kwargs):
|
||||
for item in getattr(self, attribute_name, None) or []:
|
||||
ET.SubElement(element, 'item').text = item
|
||||
|
||||
|
@ -101,26 +102,27 @@ class XmlStorableObject(StorableObject):
|
|||
element = tree.find(attribute_name)
|
||||
if element is None:
|
||||
continue
|
||||
import_method = getattr(obj, 'import_%s_from_xml' % attribute_type)
|
||||
setattr(
|
||||
obj,
|
||||
attribute_name,
|
||||
getattr(obj, 'import_%s_from_xml' % attribute_type)(element, charset=charset),
|
||||
import_method(element, charset=charset, include_id=include_id),
|
||||
)
|
||||
return obj
|
||||
|
||||
def import_str_from_xml(self, element, charset):
|
||||
def import_str_from_xml(self, element, **kwargs):
|
||||
return xml_node_text(element)
|
||||
|
||||
def import_int_from_xml(self, element, charset):
|
||||
def import_int_from_xml(self, element, **kwargs):
|
||||
return int(element.text)
|
||||
|
||||
def import_bool_from_xml(self, element, charset):
|
||||
def import_bool_from_xml(self, element, **kwargs):
|
||||
return bool(element.text == 'true')
|
||||
|
||||
def import_datetime_from_xml(self, element, charset):
|
||||
def import_datetime_from_xml(self, element, **kwargs):
|
||||
return datetime.datetime.strptime(element.text[:19], '%Y-%m-%dT%H:%M:%S')
|
||||
|
||||
def import_str_list_from_xml(self, element, charset):
|
||||
def import_str_list_from_xml(self, element, **kwargs):
|
||||
value = []
|
||||
for item in element.findall('item'):
|
||||
value.append(item.text)
|
||||
|
|
|
@ -44,6 +44,9 @@ class Role(StorableObject):
|
|||
StorableObject.__init__(self, id=id)
|
||||
self.name = name
|
||||
|
||||
def __eq__(self, other):
|
||||
return bool(self.__class__ is other.__class__ and self.id == other.id)
|
||||
|
||||
def migrate(self):
|
||||
changed = False
|
||||
if not self.slug:
|
||||
|
|
|
@ -19,6 +19,13 @@
|
|||
<li>{% trans "Access identifier:" %} {{ api_access.access_identifier }}</li>
|
||||
<li>{% trans "Access key:" %} {{ api_access.access_key }}</li>
|
||||
{% if api_access.restrict_to_anonymised_data %}<li>{% trans "Restricted to anonymised data" %}</li>{% endif %}
|
||||
{% if api_access.get_roles %}
|
||||
<li>{% trans "Roles:" %}
|
||||
<ul>
|
||||
{% for role in api_access.get_roles %}<li>{{ role.name }}</li>{% endfor %}
|
||||
</ul>
|
||||
</li>
|
||||
{% endif %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endblock %}
|
||||
|
|
|
@ -201,7 +201,7 @@ class NamedWsCall(XmlStorableObject):
|
|||
base_url = get_publisher().get_backoffice_url()
|
||||
return '%s/settings/wscalls/%s/' % (base_url, self.slug)
|
||||
|
||||
def export_request_to_xml(self, element, attribute_name, charset):
|
||||
def export_request_to_xml(self, element, attribute_name, charset, **kwargs):
|
||||
request = getattr(self, attribute_name)
|
||||
for attr in ('url', 'request_signature_key', 'method'):
|
||||
ET.SubElement(element, attr).text = force_text(request.get(attr) or '', charset)
|
||||
|
@ -214,7 +214,7 @@ class NamedWsCall(XmlStorableObject):
|
|||
if request.get('post_formdata'):
|
||||
ET.SubElement(element, 'post_formdata')
|
||||
|
||||
def import_request_from_xml(self, element, charset):
|
||||
def import_request_from_xml(self, element, **kwargs):
|
||||
request = {}
|
||||
for attr in ('url', 'request_signature_key', 'method'):
|
||||
request[attr] = ''
|
||||
|
|
Loading…
Reference in New Issue