misc: replace get_role_translation() with a get_function_roles() method (#.....)
gitea-wip/wcs/pipeline/head Build started... Details
gitea/wcs/pipeline/head Something is wrong with the build of this commit Details

It removes the unobvious "_translation()" that has nothing to do with
languages and returns a set() instead of a single role identifier; this
may allow multiple roles filling a single function, at a later stage.
This commit is contained in:
Frédéric Péters 2021-04-25 14:36:33 +02:00
parent 14c2aa6f9f
commit 8ec27fd255
6 changed files with 50 additions and 51 deletions

View File

@ -3304,18 +3304,20 @@ class FormBackOfficeStatusPage(FormStatusPage):
r += htmltext('<ul class="form-inspector biglist">')
for key, label in (workflow.roles or {}).items():
r += htmltext('<li><span class="label">%s</span>') % label
acting_role_id = self.filled.get_role_translation(key)
if acting_role_id:
r += htmltext('<div class="value">')
acting_role_ids = self.filled.get_function_roles(key)
for acting_role_id in acting_role_ids:
try:
acting_role = get_publisher().role_class.get(acting_role_id)
r += htmltext('<div class="value"><span>%s</span></div>') % acting_role.name
r += htmltext('<span>%s</span> ') % acting_role.name
except KeyError:
r += htmltext('<div class="value"><span>%s %s</span></div>') % (
r += htmltext('<span>%s %s</span> ') % (
acting_role_id,
_('(deleted)'),
)
else:
r += htmltext('<div class="value"><span class="unset">%s</span></div>') % _('unset')
if not acting_role_ids:
r += htmltext('<span class="unset">%s</span>') % _('unset')
r += htmltext('</div>')
r += htmltext('</li>\n')
r += htmltext('</ul>')
r += htmltext('</div>')

View File

@ -672,7 +672,8 @@ class FormData(StorableObject):
def get_display_id(self):
return str(self.id_display or self.id)
def get_role_translation(self, role_name):
def get_function_roles(self, role_name):
# get a function name or role identifier and return a set of role identifiers
if role_name == '_submitter':
raise Exception('_submitter is not a valid role')
if str(role_name).startswith('_'):
@ -682,14 +683,15 @@ class FormData(StorableObject):
if not role_id and self.formdef.workflow_roles:
role_id = self.formdef.workflow_roles.get(role_name)
if role_id is None:
return role_id
return str(role_id)
return str(role_name)
return set()
return set([str(role_id)])
return set([str(role_name)])
def get_handling_role_id(self):
# TODO: look at current status and return the role(s) actually
# concerned by the handling of the formdata
return self.get_role_translation('_receiver')
for role_id in self.get_function_roles('_receiver'):
return role_id
def get_handling_role(self):
try:
@ -989,8 +991,7 @@ class FormData(StorableObject):
if self.is_submitter(user):
return True
elif user:
role = self.get_role_translation(role)
if role in user.get_roles():
if self.get_function_roles(role).intersection(user.get_roles()):
return True
return False
@ -1010,8 +1011,7 @@ class FormData(StorableObject):
# the very end (where it may be that there is no workflow status item
# at all).
for function_key in self.formdef.workflow.roles.keys():
handling_role = self.get_role_translation(function_key)
if handling_role:
for handling_role in self.get_function_roles(function_key):
status_action_roles.add(handling_role)
wf_status = self.get_status()
@ -1042,8 +1042,7 @@ class FormData(StorableObject):
if role == '_submitter':
status_action_roles.add(role)
else:
real_role = self.get_role_translation(role)
if real_role:
for real_role in self.get_function_roles(role):
status_action_roles.add(real_role)
return status_action_roles

View File

@ -49,7 +49,7 @@ class HookDirectory(Directory):
break
if not user:
continue
if self.formdata.get_role_translation(role) in user.get_roles():
if self.formdata.get_function_roles(role).intersection(user.get_roles()):
break
else:
raise errors.AccessForbiddenError('insufficient roles')

View File

@ -63,16 +63,16 @@ class AggregationEmailWorkflowStatusItem(WorkflowStatusItem):
return
for dest in self.to:
dest = formdata.get_role_translation(dest)
try:
aggregate = AggregationEmail.get(dest)
except KeyError:
aggregate = AggregationEmail(id=dest)
for dest_id in formdata.get_function_roles(dest):
try:
aggregate = AggregationEmail.get(dest_id)
except KeyError:
aggregate = AggregationEmail(id=dest_id)
aggregate.append(
{'formdef': formdata.formdef.id, 'formdata': formdata.id, 'formurl': formdata.get_url()}
)
aggregate.store()
aggregate.append(
{'formdef': formdata.formdef.id, 'formdata': formdata.id, 'formurl': formdata.get_url()}
)
aggregate.store()
register_item_class(AggregationEmailWorkflowStatusItem)

View File

@ -152,12 +152,12 @@ class SendNotificationWorkflowStatusItem(WebserviceCallStatusItem):
users.append(formdata.get_user())
continue
dest = formdata.get_role_translation(dest)
try:
role = get_publisher().role_class.get(dest)
except KeyError:
continue
users.extend(get_publisher().user_class.get_users_with_role(role.id))
for dest_id in formdata.get_function_roles(dest):
try:
role = get_publisher().role_class.get(dest_id)
except KeyError:
continue
users.extend(get_publisher().user_class.get_users_with_role(role.id))
for user in users:
if not user or not user.is_active:

View File

@ -531,10 +531,12 @@ class Workflow(StorableObject):
if '_submitter' in (trigger.roles or []) and formdata.is_submitter(user):
actions.append(action)
break
roles = [
formdata.get_role_translation(x) for x in (trigger.roles or []) if x != '_submitter'
]
if set(roles).intersection(user.get_roles()):
roles = set()
for role_id in trigger.roles or []:
if role_id == '_submitter':
continue
roles |= formdata.get_function_roles(role_id)
if roles.intersection(user.get_roles()):
actions.append(action)
break
return actions
@ -1642,8 +1644,7 @@ class WorkflowStatus:
continue
if user is None:
continue
role = filled.get_role_translation(role)
if role in user.get_roles():
if filled.get_function_roles(role).intersection(user.get_roles()):
break
else:
continue
@ -1738,9 +1739,8 @@ class WorkflowStatus:
for role in visibility_roles:
if role != '_submitter':
role = formdata.get_role_translation(role)
if role in user_roles:
return True
if formdata.get_function_roles(role).intersection(user_roles):
return True
return False
def is_endpoint(self):
@ -1940,8 +1940,7 @@ class WorkflowStatusItem(XmlSerialisable):
continue
if not user:
continue
role = formdata.get_role_translation(role)
if role in user.get_roles():
if formdata.get_function_roles(role).intersection(user.get_roles()):
return True
return False
@ -2955,13 +2954,12 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
addresses.append(submitter_email)
continue
dest = formdata.get_role_translation(dest)
try:
role = get_publisher().role_class.get(dest)
except KeyError:
continue
addresses.extend(role.get_emails())
for real_dest in formdata.get_function_roles(dest):
try:
role = get_publisher().role_class.get(real_dest)
except KeyError:
continue
addresses.extend(role.get_emails())
if not addresses:
return