general: allow adding users to functions (#55097)

This commit is contained in:
Frédéric Péters 2021-06-21 21:36:05 +02:00
parent bb97514701
commit bebfa9d81f
6 changed files with 164 additions and 7 deletions

View File

@ -6404,3 +6404,75 @@ def test_backoffice_dispatch_multi(pub):
resp = resp.form.submit('button_add_function').follow(status=200) # access to role1 is still ok
formdata.refresh_from_storage()
assert formdata.workflow_roles == {'_receiver': [role1.id, role2.id]}
@pytest.mark.parametrize(
'user_template',
[
'{{ session_user }}',
'{{ session_user_email }}',
'{{ session_user_nameid }}',
'{{ session_user_name }}',
'foobar', # a role, not an user
],
)
def test_backoffice_dispatch_single_user(pub, user_template):
pub.user_class.wipe()
user = create_user(pub)
user.name_identifiers = ['0123456789']
user.store()
create_environment(pub)
formdef = FormDef()
formdef.name = 'test dispatch user'
formdef.fields = []
wf = Workflow(name='dispatch')
wf.roles['_foobar'] = 'Foobar'
st1 = wf.add_status('Status1')
dispatch = DispatchWorkflowStatusItem()
dispatch.id = '_dispatch'
dispatch.role_key = '_foobar'
dispatch.role_id = user_template
st1.items.append(dispatch)
dispatch.parent = st1
add_function = ChoiceWorkflowStatusItem()
add_function.id = '_add_function'
add_function.label = 'Add function'
add_function.by = ['_receiver']
add_function.status = st1.id
st1.items.append(add_function)
add_function.parent = st1
a_button = ChoiceWorkflowStatusItem()
a_button.id = '_a_button'
a_button.label = 'A button'
a_button.by = ['_foobar']
a_button.status = st1.id
st1.items.append(a_button)
a_button.parent = st1
wf.store()
formdef.workflow_id = wf.id
formdef.workflow_roles = {'_receiver': user.roles[0]}
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
app = login(get_app(pub))
resp = app.get('/backoffice/management/%s/%s/' % (formdef.url_name, formdata.id))
assert 'button_a_button' not in resp.text
resp = resp.form.submit('button_add_function').follow(status=200)
formdata.refresh_from_storage()
if user_template != 'foobar':
assert formdata.workflow_roles == {'_foobar': ['_user:%s' % user.id]}
else:
# check role are still dispatched by name
assert formdata.workflow_roles == {'_foobar': [user.roles[0]]}
assert 'button_a_button' in resp.text

View File

@ -683,6 +683,52 @@ def test_dispatch_computed(two_pubs):
assert error.occurences_count == 1
def test_dispatch_user(pub):
user = pub.user_class(name='foo')
user.email = 'foo@localhost'
user.name_identifiers = ['0123456789']
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
item = DispatchWorkflowStatusItem()
formdata = formdef.data_class()()
formdata.user_id = user.id
pub.substitutions.feed(formdata)
item.role_key = '_receiver'
item.role_id = '{{ form_user }}'
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': ['_user:%s' % user.id]}
formdata.workflow_roles = {}
item.role_id = '{{ form_user_nameid }}'
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': ['_user:%s' % user.id]}
formdata.workflow_roles = {}
item.role_id = '{{ form_user_email }}'
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': ['_user:%s' % user.id]}
formdata.workflow_roles = {}
item.role_id = '{{ form_user_name }}'
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': ['_user:%s' % user.id]}
formdata.workflow_roles = {}
item.role_id = 'xyz'
item.perform(formdata)
assert formdata.workflow_roles == {}
def test_roles(pub):
user = pub.user_class()
user.store()

View File

@ -3419,7 +3419,10 @@ class FormBackOfficeStatusPage(FormStatusPage):
acting_role_names = []
for acting_role_id in acting_role_ids:
try:
acting_role = get_publisher().role_class.get(acting_role_id)
if acting_role_id.startswith('_user:'):
acting_role = get_publisher().user_class.get(acting_role_id.split(':')[1])
else:
acting_role = get_publisher().role_class.get(acting_role_id)
acting_role_names.append(acting_role.name)
except KeyError:
acting_role_names.append('%s (%s)' % (acting_role_id, _('deleted')))

View File

@ -117,7 +117,7 @@ class User(StorableObject):
return self.name_identifiers[0] if self.name_identifiers else None
def get_roles(self):
return self.roles or []
return (self.roles or []) + ['_user:%s' % self.id]
def set_attributes_from_formdata(self, formdata):
users_cfg = get_cfg('users', {})
@ -210,6 +210,10 @@ class User(StorableObject):
def get_users_with_email(cls, email):
return cls.select([st.Null('deleted_timestamp'), st.Equal('email', email)])
@classmethod
def get_users_with_name(cls, name):
return cls.select([st.Null('deleted_timestamp'), st.Equal('name', name)])
def get_substitution_variables(self, prefix='session_'):
d = {
prefix + 'user': self,

View File

@ -1142,6 +1142,9 @@ class LazyUser:
def nameid(self):
return self._user.nameid
def get_value(self):
return self._user
def __getitem__(self, key):
return getattr(self, key)

View File

@ -33,6 +33,7 @@ from ..qommon.form import (
WidgetListAsTable,
)
from ..qommon.template import Template
from ..qommon.templatetags.qommon import unlazy
class AutomaticDispatchRowWidget(CompositeWidget):
@ -216,6 +217,28 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
)
return htmltext('<ul class="rules">%s</ul>') % htmltext('').join(result)
def get_computed_user_id(self, user_identifier):
# look for a user matching "user_identifier", "user_identifier" can be
# an actual user object, a nameid, an email, or a full name.
with get_publisher().complex_data():
maybe_user = self.compute(str(user_identifier), allow_complex=True)
if maybe_user:
maybe_user = unlazy(get_publisher().get_cached_complex_data(maybe_user))
if not maybe_user:
return None
if isinstance(maybe_user, get_publisher().user_class):
return maybe_user.id
users_by_nameid = get_publisher().user_class.get_users_with_name_identifier(maybe_user)
if users_by_nameid:
return users_by_nameid[0].id
users_by_email = get_publisher().user_class.get_users_with_email(maybe_user)
if users_by_email:
return users_by_email[0].id
users_by_name = get_publisher().user_class.get_users_with_name(maybe_user)
if users_by_name:
return users_by_name[0].id
return None
def perform(self, formdata):
if not formdata.workflow_roles:
formdata.workflow_roles = {}
@ -226,6 +249,10 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
if not (self.role_id and self.role_key):
return
new_role_id = self.get_computed_role_id(self.role_id)
if not new_role_id:
user_id = self.get_computed_user_id(self.role_id)
if user_id:
new_role_id = '_user:%s' % user_id
if not new_role_id:
get_publisher().record_error(
_('error in dispatch, missing role (%s)') % self.role_id, formdata=formdata
@ -254,11 +281,13 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
new_role_id = rule.get('role_id')
break
if new_role_id and not get_publisher().role_class.get(new_role_id, ignore_errors=True):
get_publisher().record_error(
_('error in dispatch, missing role (%s)') % new_role_id, formdata=formdata
)
elif new_role_id:
if new_role_id and not get_publisher().role_class.get(new_role_id, ignore_errors=True):
get_publisher().record_error(
_('error in dispatch, missing role (%s)') % new_role_id, formdata=formdata
)
new_role_id = None
if new_role_id:
new_role_id = str(new_role_id)
if not formdata.workflow_roles.get(self.role_key):
formdef_workflow_roles = formdata.formdef.workflow_roles or {}