formdata: add support for multiple roles for a single function (#55019)
This commit is contained in:
parent
454d63dcd2
commit
bb97514701
|
@ -6352,3 +6352,55 @@ def test_backoffice_dispatch_lose_access(pub):
|
|||
resp = app.get('/backoffice/management/%s/%s/' % (formdef.url_name, formdata.id))
|
||||
resp = resp.form.submit('button_add_function')
|
||||
assert resp.location == '..' # no access -> to listing
|
||||
|
||||
|
||||
def test_backoffice_dispatch_multi(pub):
|
||||
user = create_user(pub)
|
||||
create_environment(pub)
|
||||
|
||||
role1 = pub.role_class(name='xxx1')
|
||||
role1.store()
|
||||
role2 = pub.role_class(name='xxx2')
|
||||
role2.store()
|
||||
user.roles.append(role1.id)
|
||||
user.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test dispatch multi'
|
||||
formdef.fields = []
|
||||
|
||||
wf = Workflow(name='dispatch')
|
||||
wf.roles['_foobar'] = 'Foobar'
|
||||
|
||||
st1 = wf.add_status('Status1')
|
||||
dispatch = DispatchWorkflowStatusItem()
|
||||
dispatch.id = '_dispatch'
|
||||
dispatch.role_key = '_receiver'
|
||||
dispatch.role_id = role2.id
|
||||
dispatch.operation_mode = 'add'
|
||||
st1.items.append(dispatch)
|
||||
dispatch.parent = st1
|
||||
|
||||
add_function = ChoiceWorkflowStatusItem()
|
||||
add_function.id = '_add_function'
|
||||
add_function.label = 'Add function'
|
||||
add_function.by = ['_receiver']
|
||||
add_function.status = st1.id
|
||||
st1.items.append(add_function)
|
||||
add_function.parent = st1
|
||||
|
||||
wf.store()
|
||||
|
||||
formdef.workflow_id = wf.id
|
||||
formdef.workflow_roles = {'_receiver': role1.id}
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('/backoffice/management/%s/%s/' % (formdef.url_name, formdata.id))
|
||||
resp = resp.form.submit('button_add_function').follow(status=200) # access to role1 is still ok
|
||||
formdata.refresh_from_storage()
|
||||
assert formdata.workflow_roles == {'_receiver': [role1.id, role2.id]}
|
||||
|
|
|
@ -485,7 +485,68 @@ def test_dispatch(pub):
|
|||
item.role_key = '_receiver'
|
||||
item.role_id = role.id
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': role.id}
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id]}
|
||||
|
||||
|
||||
def test_dispatch_multi(pub):
|
||||
formdef = FormDef()
|
||||
formdef.name = 'baz'
|
||||
formdef.store()
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='xxx')
|
||||
role.store()
|
||||
role2 = pub.role_class(name='xxx2')
|
||||
role2.store()
|
||||
role3 = pub.role_class(name='xxx3')
|
||||
role3.store()
|
||||
|
||||
item = DispatchWorkflowStatusItem()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
item.perform(formdata)
|
||||
assert not formdata.workflow_roles
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
item.role_key = '_receiver'
|
||||
item.role_id = role.id
|
||||
item.operation_mode = 'add'
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id]}
|
||||
|
||||
item.role_id = role2.id
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id, role2.id]}
|
||||
|
||||
item.operation_mode = 'set'
|
||||
item.role_id = role3.id
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': [role3.id]}
|
||||
|
||||
# test adding to function defined at the formdef level
|
||||
formdef.workflow_roles = {'_receiver': role.id}
|
||||
formdef.store()
|
||||
formdata.workflow_roles = {}
|
||||
formdata.store()
|
||||
|
||||
item.operation_mode = 'add'
|
||||
item.role_id = role2.id
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id, role2.id]}
|
||||
|
||||
# test adding a second time doesn't change anything
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id, role2.id]}
|
||||
|
||||
# test removing
|
||||
item.operation_mode = 'remove'
|
||||
item.role_id = role2.id
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id]}
|
||||
|
||||
# test removing a second time doesn't change anything
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id]}
|
||||
|
||||
|
||||
def test_dispatch_auto(two_pubs):
|
||||
|
@ -544,14 +605,14 @@ def test_dispatch_auto(two_pubs):
|
|||
two_pubs.substitutions.reset()
|
||||
two_pubs.substitutions.feed(formdata)
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': role1.id}
|
||||
assert formdata.workflow_roles == {'_receiver': [role1.id]}
|
||||
|
||||
# other match
|
||||
formdata.data = {'1': 'bar'}
|
||||
two_pubs.substitutions.reset()
|
||||
two_pubs.substitutions.feed(formdata)
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': role2.id}
|
||||
assert formdata.workflow_roles == {'_receiver': [role2.id]}
|
||||
|
||||
# unknown role
|
||||
formdata.data = {'1': 'foo'}
|
||||
|
@ -599,13 +660,13 @@ def test_dispatch_computed(two_pubs):
|
|||
item.role_key = '_receiver'
|
||||
item.role_id = '="yyy"' # slug
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': role.id}
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id]}
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
item.role_key = '_receiver'
|
||||
item.role_id = '="xxx"' # name
|
||||
item.perform(formdata)
|
||||
assert formdata.workflow_roles == {'_receiver': role.id}
|
||||
assert formdata.workflow_roles == {'_receiver': [role.id]}
|
||||
|
||||
# unknown role
|
||||
formdata = formdef.data_class()()
|
||||
|
|
|
@ -3416,16 +3416,17 @@ class FormBackOfficeStatusPage(FormStatusPage):
|
|||
r += htmltext('<li><span class="label">%s</span>') % label
|
||||
r += htmltext('<div class="value">')
|
||||
acting_role_ids = self.filled.get_function_roles(key)
|
||||
acting_role_names = []
|
||||
for acting_role_id in acting_role_ids:
|
||||
try:
|
||||
acting_role = get_publisher().role_class.get(acting_role_id)
|
||||
r += htmltext('<span>%s</span> ') % acting_role.name
|
||||
acting_role_names.append(acting_role.name)
|
||||
except KeyError:
|
||||
r += htmltext('<span>%s %s</span> ') % (
|
||||
acting_role_id,
|
||||
_('(deleted)'),
|
||||
)
|
||||
if not acting_role_ids:
|
||||
acting_role_names.append('%s (%s)' % (acting_role_id, _('deleted')))
|
||||
if acting_role_names:
|
||||
acting_role_names.sort()
|
||||
r += ', '.join(acting_role_names)
|
||||
else:
|
||||
r += htmltext('<span class="unset">%s</span>') % _('unset')
|
||||
r += htmltext('</div>')
|
||||
r += htmltext('</li>\n')
|
||||
|
|
|
@ -689,6 +689,8 @@ class FormData(StorableObject):
|
|||
role_id = self.formdef.workflow_roles.get(role_name)
|
||||
if role_id is None:
|
||||
return set()
|
||||
if isinstance(role_id, list):
|
||||
return set(role_id)
|
||||
return set([str(role_id)])
|
||||
return set([str(role_name)])
|
||||
|
||||
|
@ -844,6 +846,9 @@ class FormData(StorableObject):
|
|||
workflow_roles.update(self.workflow_roles)
|
||||
|
||||
for role_type, role_id in workflow_roles.items():
|
||||
if isinstance(role_id, list):
|
||||
# only return first role
|
||||
role_id = role_id[0]
|
||||
prefix = 'form_role_%s_' % role_type.replace('-', '_').strip('_')
|
||||
try:
|
||||
d.update(get_publisher().role_class.get(role_id).get_substitution_variables(prefix))
|
||||
|
@ -1196,7 +1201,10 @@ class FormData(StorableObject):
|
|||
if self.workflow_roles:
|
||||
workflow_roles.update(self.workflow_roles)
|
||||
for workflow_role in workflow_roles:
|
||||
data['roles'][workflow_role] = [workflow_roles.get(workflow_role)]
|
||||
value = workflow_roles.get(workflow_role)
|
||||
if not isinstance(value, list):
|
||||
value = [value]
|
||||
data['roles'][workflow_role] = value
|
||||
data['roles']['concerned'] = self.get_concerned_roles()
|
||||
data['roles']['actions'] = self.get_actions_roles()
|
||||
|
||||
|
|
|
@ -1445,17 +1445,25 @@ class FormDef(StorableObject):
|
|||
if not self.workflow_roles:
|
||||
self.workflow_roles = {}
|
||||
|
||||
user_roles = set(user.get_roles())
|
||||
|
||||
# if the formdef itself has some function attributed to the user, grant
|
||||
# access.
|
||||
for role_id in self.workflow_roles.values():
|
||||
if role_id in user.get_roles():
|
||||
if role_id in user_roles:
|
||||
return True
|
||||
|
||||
# if there was some redispatching of function, values will be different
|
||||
# in formdata, check them.
|
||||
if formdata and formdata.workflow_roles:
|
||||
for role_id in formdata.workflow_roles.values():
|
||||
if role_id in user.get_roles():
|
||||
if role_id is None:
|
||||
continue
|
||||
if isinstance(role_id, list):
|
||||
role_ids = set(role_id)
|
||||
else:
|
||||
role_ids = set([role_id])
|
||||
if user_roles.intersection(role_ids):
|
||||
return True
|
||||
|
||||
# if no formdata was given, lookup if there are some existing formdata
|
||||
|
@ -1510,7 +1518,11 @@ class FormDef(StorableObject):
|
|||
self.workflow_roles = {}
|
||||
form_roles = [x for x in self.workflow_roles.values() if x]
|
||||
if formdata and formdata.workflow_roles:
|
||||
form_roles.extend([x for x in formdata.workflow_roles.values() if x])
|
||||
for x in formdata.workflow_roles.values():
|
||||
if isinstance(x, list):
|
||||
form_roles.extend(x)
|
||||
elif x:
|
||||
form_roles.append(x)
|
||||
return self.is_user_allowed_read(user, formdata=formdata)
|
||||
|
||||
def is_disabled(self):
|
||||
|
|
|
@ -553,6 +553,8 @@ def do_formdef_tables(formdef, conn=None, cur=None, rebuild_views=False, rebuild
|
|||
'page_no',
|
||||
'anonymised',
|
||||
'workflow_roles',
|
||||
# workflow_roles_array is created from workflow_roles to be used in
|
||||
# get_ids_with_indexed_value
|
||||
'workflow_roles_array',
|
||||
'concerned_roles_array',
|
||||
'tracking_code',
|
||||
|
@ -1980,7 +1982,12 @@ class SqlDataMixin(SqlMixin):
|
|||
else:
|
||||
sql_dict['receipt_time'] = None
|
||||
if self.workflow_roles:
|
||||
sql_dict['workflow_roles_array'] = [str(x) for x in self.workflow_roles.values() if x is not None]
|
||||
sql_dict['workflow_roles_array'] = []
|
||||
for x in self.workflow_roles.values():
|
||||
if isinstance(x, list):
|
||||
sql_dict['workflow_roles_array'].extend(x)
|
||||
elif x:
|
||||
sql_dict['workflow_roles_array'].append(str(x))
|
||||
else:
|
||||
sql_dict['workflow_roles_array'] = None
|
||||
for attr in ('workflow_data', 'workflow_roles', 'submission_context', 'prefilling_data'):
|
||||
|
|
|
@ -509,6 +509,9 @@ class LazyFormData(LazyFormDef):
|
|||
d = {}
|
||||
for role_type, role_id in workflow_roles.items():
|
||||
prefix = '%s_' % role_type.replace('-', '_').strip('_')
|
||||
if isinstance(role_id, list):
|
||||
# only return first role
|
||||
role_id = role_id[0]
|
||||
try:
|
||||
d.update(get_publisher().role_class.get(role_id).get_substitution_variables(prefix))
|
||||
except KeyError:
|
||||
|
|
|
@ -94,9 +94,10 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
|
|||
dispatch_type = 'manual'
|
||||
variable = None
|
||||
rules = None
|
||||
operation_mode = 'set'
|
||||
|
||||
def get_parameters(self):
|
||||
return ('role_key', 'dispatch_type', 'role_id', 'variable', 'rules', 'condition')
|
||||
return ('role_key', 'dispatch_type', 'role_id', 'variable', 'rules', 'operation_mode', 'condition')
|
||||
|
||||
def role_id_export_to_xml(self, item, charset, include_id=False):
|
||||
self._role_export_to_xml('role_id', item, charset, include_id=include_id)
|
||||
|
@ -189,6 +190,20 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
|
|||
'data-dynamic-display-value': dispatch_types.get('automatic'),
|
||||
},
|
||||
)
|
||||
if 'operation_mode' in parameters:
|
||||
form.add(
|
||||
RadiobuttonsWidget,
|
||||
'%soperation_mode' % prefix,
|
||||
title=_('Operation Mode'),
|
||||
options=[
|
||||
('set', _('Set role to function')),
|
||||
('add', _('Add role to function')),
|
||||
('remove', _('Remove role from function')),
|
||||
],
|
||||
value=self.operation_mode,
|
||||
required=True,
|
||||
extra_css_class='widget-inline-radio',
|
||||
)
|
||||
|
||||
def get_role_id_parameter_view_value(self):
|
||||
return get_role_name(self.role_id)
|
||||
|
@ -239,14 +254,29 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
|
|||
new_role_id = rule.get('role_id')
|
||||
break
|
||||
|
||||
if new_role_id:
|
||||
if not get_publisher().role_class.get(new_role_id, ignore_errors=True):
|
||||
get_publisher().record_error(
|
||||
_('error in dispatch, missing role (%s)') % new_role_id, formdata=formdata
|
||||
)
|
||||
if new_role_id and not get_publisher().role_class.get(new_role_id, ignore_errors=True):
|
||||
get_publisher().record_error(
|
||||
_('error in dispatch, missing role (%s)') % new_role_id, formdata=formdata
|
||||
)
|
||||
elif new_role_id:
|
||||
new_role_id = str(new_role_id)
|
||||
if not formdata.workflow_roles.get(self.role_key):
|
||||
formdef_workflow_roles = formdata.formdef.workflow_roles or {}
|
||||
formdata.workflow_roles[self.role_key] = formdef_workflow_roles.get(self.role_key)
|
||||
if self.operation_mode == 'set':
|
||||
formdata.workflow_roles[self.role_key] = [new_role_id]
|
||||
else:
|
||||
formdata.workflow_roles[self.role_key] = str(new_role_id)
|
||||
formdata.store()
|
||||
if not isinstance(formdata.workflow_roles[self.role_key], list):
|
||||
if formdata.workflow_roles[self.role_key]:
|
||||
formdata.workflow_roles[self.role_key] = [formdata.workflow_roles[self.role_key]]
|
||||
else:
|
||||
formdata.workflow_roles[self.role_key] = []
|
||||
roles = formdata.workflow_roles[self.role_key]
|
||||
if self.operation_mode == 'add' and new_role_id not in roles:
|
||||
roles.append(new_role_id)
|
||||
elif self.operation_mode == 'remove' and new_role_id in roles:
|
||||
roles.remove(new_role_id)
|
||||
formdata.store()
|
||||
|
||||
|
||||
register_item_class(DispatchWorkflowStatusItem)
|
||||
|
|
Loading…
Reference in New Issue