misc: skip missing roles when exporting to XML (#24219) #729

Merged
fpeters merged 1 commits from wip/24219-skip-missing-roles-from-workflow-exports into main 2023-10-02 19:11:10 +02:00
8 changed files with 127 additions and 36 deletions

View File

@ -600,6 +600,7 @@ def test_workflows_copy_status_item(pub):
assert 'items/1/' in resp.text
assert 'items/2/' in resp.text
# check invalid role references are not copied
commentable = st1.add_action('commentable', id='_commentable')
commentable.by = ['unknown']
workflow.store()
@ -610,7 +611,8 @@ def test_workflows_copy_status_item(pub):
resp = app.get('/backoffice/workflows/%s/status/%s/' % (workflow.id, st1.id))
resp = resp.click(href='items/_commentable/copy')
resp = resp.form.submit('submit')
assert '<ul><li>Unknown roles: unknown</li></ul>' in resp
workflow.refresh_from_storage()
assert workflow.possible_status[0].items[-1].by == []
def test_workflows_copy_status_item_create_document(pub):
@ -957,6 +959,7 @@ def test_workflows_duplicate(pub):
assert Workflow.get(3).name == 'other copy'
assert Workflow.get(3).slug == 'other-copy'
# check invalid role references are not copied
commentable = st1.add_action('commentable', '_commentable')
commentable.by = ['unknown']
workflow.store()
@ -966,8 +969,10 @@ def test_workflows_duplicate(pub):
resp = app.get('/backoffice/workflows/1/')
resp = resp.click('Duplicate')
resp = resp.form.submit('submit')
assert '<ul><li>Unknown roles: unknown</li></ul>' in resp
resp = resp.form.submit('submit').follow()
workflow_id = resp.request.url.split('/')[-2]
duplicated_workflow = Workflow.get(workflow_id)
assert duplicated_workflow.possible_status[0].items[0].by == []
def test_workflows_add_all_actions(pub):
@ -3754,6 +3759,45 @@ def test_workflows_inspect_view(pub):
)
def test_workflows_inspect_view_dispatch_action(pub):
create_superuser(pub)
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
Workflow.wipe()
workflow = Workflow(name='foo')
status = workflow.add_status('plop')
action = status.add_action('dispatch')
action.dispatch_type = 'manual'
action.role_id = role.id
workflow.store()
app = login(get_app(pub))
resp = app.get('/backoffice/workflows/%s/inspect' % workflow.id)
assert resp.pyquery('.parameter-role_id').text() == 'Role: foobar'
action.dispatch_type = 'automatic'
action.rules = [
{'role_id': role.id, 'value': 'foo'},
]
workflow.store()
resp = app.get('/backoffice/workflows/%s/inspect' % workflow.id)
assert resp.pyquery('.rules').text() == 'foo → foobar'
# missing role
pub.role_class.wipe()
action.dispatch_type = 'manual'
workflow.store()
resp = app.get('/backoffice/workflows/%s/inspect' % workflow.id)
assert resp.pyquery('.parameter-role_id').text() == 'Role: Unknown role (%s)' % role.id
action.dispatch_type = 'automatic'
workflow.store()
resp = app.get('/backoffice/workflows/%s/inspect' % workflow.id)
assert resp.pyquery('.rules').text() == f'foo → Unknown role ({role.id})'

1, c'est l'id du rôle ?

1, c'est l'id du rôle ?

Oui, je vais appliquer ce changement avant de merger, pour que ça soit plus clair :

-    assert resp.pyquery('.rules').text() == 'foo → Unknown role (1)'
+    assert resp.pyquery('.rules').text() == f'foo → Unknown role ({role.id})'
Oui, je vais appliquer ce changement avant de merger, pour que ça soit plus clair : ``` - assert resp.pyquery('.rules').text() == 'foo → Unknown role (1)' + assert resp.pyquery('.rules').text() == f'foo → Unknown role ({role.id})' ```
def test_workflows_unused(pub):
create_superuser(pub)
FormDef.wipe()

View File

@ -846,18 +846,21 @@ def test_workflow_snapshot_browse_with_missing_role(pub):
assert pub.role_class.count() == 0
assert pub.snapshot_class.count() == 1
snapshot = pub.snapshot_class.select_object_history(wf)[0]
assert 'foobar' not in snapshot.get_serialization() # missing role is not saved
app.get('/backoffice/workflows/%s/history/%s/view/' % (wf.id, snapshot.id), status=200)
assert pub.role_class.count() == 1 # missing role was created
assert pub.role_class.count() == 0 # missing role was created
# check behaviour is identical with idp-managed roles.
pub.cfg['sp'] = {'idp-manage-roles': True}
pub.write_cfg()
trigger.allow_as_mass_action = False # so there's a change in snapshot
trigger.roles = ['foobarbaz']
wf.store()
assert pub.role_class.count() == 1
assert pub.snapshot_class.count() == 2
snapshot = pub.snapshot_class.select_object_history(wf)[0]
assert 'foobarbaz' not in snapshot.get_serialization() # missing role is not saved
app.get('/backoffice/workflows/%s/history/%s/view/' % (wf.id, snapshot.id), status=200)
assert pub.role_class.count() == 1 # missing role was not created
assert pub.role_class.count() == 0 # missing role was not created
def test_workflow_snapshot_browse_with_import_error(pub):
@ -940,38 +943,45 @@ def test_workflow_snapshot_restore_with_missing_role(pub):
pub.role_class.wipe()
Workflow.wipe()
role = pub.role_class(name='foobar')
role.store()
wf = Workflow(name='status')
ac1 = wf.add_global_action('Action', 'ac1')
trigger = ac1.triggers[0]
assert trigger.key == 'manual'
trigger.roles = ['foobar']
trigger.roles = [role.id]
wf.store()
assert pub.role_class.count() == 0
assert pub.snapshot_class.count() == 1
snapshot = pub.snapshot_class.select_object_history(wf)[0]
pub.role_class.wipe()
resp = app.get('/backoffice/workflows/%s/history/%s/restore' % (wf.id, snapshot.id), status=200)
resp.form['action'].value = 'overwrite'
resp = resp.form.submit('submit')
assert pub.role_class.count() == 1 # missing role was created
wf = Workflow.get(resp.location.split('/')[-2])
assert wf.global_actions[0].triggers[0].roles == ['1']
assert pub.snapshot_class.count() == 2
assert pub.snapshot_class.count() == 1
pub.role_class.wipe()
role = pub.role_class(name='foobarbaz')
role.id = '123'
role.store()
pub.cfg['sp'] = {'idp-manage-roles': True}
pub.write_cfg()
wf.global_actions[0].triggers[0].roles = ['foobarbaz']
wf.global_actions[0].triggers[0].roles = [role.id]
wf.store()
assert pub.role_class.count() == 1
assert pub.snapshot_class.count() == 3
assert pub.snapshot_class.count() == 2
snapshot = pub.snapshot_class.select_object_history(wf)[0]
pub.role_class.wipe()
resp = app.get('/backoffice/workflows/%s/history/%s/restore' % (wf.id, snapshot.id), status=200)
resp.form['action'].value = 'overwrite'
resp = resp.form.submit('submit')
assert pub.role_class.count() == 1 # missing role was not created
assert pub.role_class.count() == 0 # missing role was not created
wf = Workflow.get(resp.location.split('/')[-2])
assert wf.global_actions[0].triggers[0].roles == ['foobarbaz']
assert wf.global_actions[0].triggers[0].roles == ['123'] # kept
# no error raised due to unknown role
app.get(
'/backoffice/workflows/%s/global-actions/ac1/triggers/%s/'

View File

@ -114,12 +114,17 @@ def test_action_dispatch(pub):
pub.cfg['sp'] = {'idp-manage-roles': True}
# now roles are managed: cannot create them
dispatch.role_id = 'unknown'
xml_export_orig = (
ET.tostring(export_to_indented_xml(wf, include_id=True))
.replace(b'slug="test-role"', b'slug="unknown-role"')
.replace(b'role_id="5"', b'role_id="23"')
.replace(b'>Test Role<', b'>unknown<')
)
with pytest.raises(WorkflowImportError) as excinfo:
wf2 = assert_import_export_works(wf)
wf2 = Workflow.import_from_xml_tree(ET.fromstring(xml_export_orig), include_id=True)
assert excinfo.value.msg == 'Unknown referenced objects'
assert excinfo.value.details == 'Unknown roles: unknown'
# but allow computed roles
# allow computed roles
dispatch.role_id = '=form_var_bar'
wf2 = assert_import_export_works(wf)
assert wf2.possible_status[0].items[0].role_id == '=form_var_bar'

View File

@ -262,7 +262,11 @@ class CustomView(StorableObject):
if self.visibility == 'role' and self.role_id:
from wcs.workflows import get_role_name_and_slug
role_name, role_slug = get_role_name_and_slug(self.role_id)
try:
role_name, role_slug = get_role_name_and_slug(self.role_id)
except KeyError:
# skip broken/missing roles
return None
sub = ET.SubElement(root, 'role')
if role_slug:
sub.attrib['slug'] = role_slug

View File

@ -1268,7 +1268,11 @@ class FormDef(StorableObject):
def add_role_element(roles_root, role_id):
if not role_id:
return
role_name, role_slug = get_role_name_and_slug(role_id)
try:
role_name, role_slug = get_role_name_and_slug(role_id)
except KeyError:
# skip broken/missing roles
return
sub = ET.SubElement(roles_root, 'role')
if role_slug:
sub.attrib['slug'] = role_slug
@ -1330,7 +1334,9 @@ class FormDef(StorableObject):
for view in get_publisher().custom_view_class.select_shared_for_formdef(formdef=self):
custom_views.append(view)
for view in custom_views:
custom_views_element.append(view.export_to_xml(charset=charset, include_id=include_id))
custom_view_node = view.export_to_xml(charset=charset, include_id=include_id)
if custom_view_node is not None:
custom_views_element.append(custom_view_node)
geolocations = ET.SubElement(root, 'geolocations')
for geoloc_key, geoloc_label in (self.geolocations or {}).items():

View File

@ -86,7 +86,7 @@ class RuleNode(XmlSerialisable):
return ('role_id', 'value')
def role_id_export_to_xml(self, item, charset, include_id=False):
self._role_export_to_xml('role_id', item, charset, include_id=include_id)
self._role_export_to_xml('role_id', item, charset, include_id=include_id, include_missing=True)
def role_id_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self._role_init_with_xml('role_id', elem, charset, include_id=include_id, snapshot=snapshot)
@ -243,15 +243,24 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
)
def get_role_id_parameter_view_value(self):
return get_role_name_and_slug(self.role_id)[0]
try:
return get_role_name_and_slug(self.role_id)[0]
except KeyError:
return _('Unknown role (%s)') % self.role_id
def get_rules_parameter_view_value(self):
result = []
for rule in self.rules or []:
result.append(
htmltext('<li>%s%s</li>')
% (rule.get('value'), get_role_name_and_slug(rule.get('role_id'))[0])
)
try:
result.append(
htmltext('<li>%s%s</li>')
% (rule.get('value'), get_role_name_and_slug(rule.get('role_id'))[0])
)
except KeyError:
result.append(
htmltext('<li>%s%s</li>')
% (rule.get('value'), _('Unknown role (%s)') % rule.get('role_id'))
)
return htmltext('<ul class="rules">%s</ul>') % htmltext('').join(result)
def get_computed_user_id(self, user_identifier):

View File

@ -85,6 +85,9 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
return super()._get_role_id_from_xml(elem, charset, include_id=include_id, snapshot=snapshot)
def to_export_to_xml(self, item, charset, include_id=False):
self._roles_export_to_xml('to', item, charset, include_id=include_id, include_missing=True)
def mail_template_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self.mail_template = None
if elem is None:

View File

@ -1494,7 +1494,7 @@ class XmlSerialisable:
else:
setattr(self, attribute, xml_node_text(el))
def _roles_export_to_xml(self, attribute, item, charset, include_id=False):
def _roles_export_to_xml(self, attribute, item, charset, include_id=False, include_missing=False):
if not hasattr(self, attribute) or not getattr(self, attribute):
return
el = ET.SubElement(item, attribute)
@ -1502,7 +1502,14 @@ class XmlSerialisable:
if role_id is None:
continue
role_id = str(role_id)
role_name, role_slug = get_role_name_and_slug(role_id)
try:
role_name, role_slug = get_role_name_and_slug(role_id)
except KeyError:
if not include_missing:
# skip broken/missing roles
continue
role_name = role_id
role_slug = None
sub = ET.SubElement(el, 'item')
if role_slug:
sub.attrib['slug'] = role_slug
@ -1520,11 +1527,17 @@ class XmlSerialisable:
)
setattr(self, attribute, imported_roles)
def _role_export_to_xml(self, attribute, item, charset, include_id=False):
def _role_export_to_xml(self, attribute, item, charset, include_id=False, include_missing=False):
if not hasattr(self, attribute) or not getattr(self, attribute):
return
role_id = str(getattr(self, attribute))
role_name, role_slug = get_role_name_and_slug(role_id)
try:
role_name, role_slug = get_role_name_and_slug(role_id)
except KeyError:
if include_missing:
# skip broken/missing roles
return
role_name, role_slug = role_id, role_id
sub = ET.SubElement(item, attribute)
if role_slug:
sub.attrib['slug'] = role_slug
@ -1564,7 +1577,7 @@ class XmlSerialisable:
# if the roles are managed by the idp, don't try further.
if get_publisher() and get_cfg('sp', {}).get('idp-manage-roles') is True:
if snapshot:
return value
return elem.attrib['role_id'] # snaphots always store the id
raise WorkflowImportUnknownReferencedError(
_('Unknown referenced role'), details={_('Unknown roles'): {value}}
)
@ -3292,11 +3305,8 @@ def get_role_name_and_slug(role_id):
role_id = str(role_id)
if role_id.startswith('_') or role_id == 'logged-users':
return (str(role_id), None)
try:
role = get_publisher().role_class.get(role_id)
return (role.name, role.slug)
except KeyError:
return (str(role_id), None)
role = get_publisher().role_class.get(role_id)
return (role.name, role.slug)
def render_list_of_roles(workflow, roles):