workflow: check datasource existence on import (#48164)

This commit is contained in:
Lauréline Guérin 2020-11-09 15:49:55 +01:00
parent f8eead3dc2
commit abbc8aacdc
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
8 changed files with 177 additions and 78 deletions

View File

@ -3,8 +3,11 @@
import pytest
import xml.etree.ElementTree as ET
from quixote.http_request import Upload
from django.utils.six import BytesIO
from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
from wcs.workflows import (
@ -21,8 +24,13 @@ from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
from wcs.wf.external_workflow import ExternalWorkflowGlobalAction
from wcs.roles import Role
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.fields import StringField, FileField
from wcs.qommon.form import UploadedFile
from wcs.roles import Role
from wcs.workflows import ExportToModel, WorkflowVariablesFieldsFormDef, DisplayMessageWorkflowStatusItem
from wcs.qommon.misc import indent_xml as indent
@ -110,9 +118,6 @@ def test_action_dispatch(pub):
role.name = 'Test Role'
role.store()
from wcs.wf.dispatch import DispatchWorkflowStatusItem
dispatch = DispatchWorkflowStatusItem()
dispatch.id = '_x'
dispatch.role_id = 5
@ -250,8 +255,6 @@ def test_display_form_action(pub):
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
display_form = FormWorkflowStatusItem()
display_form.id = '_x'
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
@ -270,10 +273,6 @@ def test_export_to_model_action(pub):
wf.store()
st1 = wf.add_status('Status1', 'st1')
from quixote.http_request import Upload
from wcs.qommon.form import UploadedFile
from wcs.workflows import ExportToModel
export_to = ExportToModel()
export_to.label = 'test'
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
@ -326,7 +325,6 @@ def test_jump_action(pub):
st1 = wf.add_status('Status1', 'st1')
st2 = wf.add_status('Status2', 'st2')
from wcs.wf.jump import JumpWorkflowStatusItem
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
@ -391,7 +389,6 @@ def test_commentable_action(pub):
def test_variables_formdef(pub):
wf = Workflow(name='variables')
from wcs.workflows import WorkflowVariablesFieldsFormDef
wf.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf)
wf.variables_formdef.fields.append(StringField(label='Test', type='string'))
wf2 = assert_import_export_works(wf)
@ -535,8 +532,6 @@ def test_display_message_action(pub):
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
from wcs.workflows import DisplayMessageWorkflowStatusItem
display = DisplayMessageWorkflowStatusItem()
display.message = 'hey'
display.to = ['_submitter', '1']
@ -848,3 +843,83 @@ def test_worklow_with_mail_template(pub):
export = ET.tostring(wf.export_to_xml(include_id=True))
with pytest.raises(WorkflowImportError, match='Unknown referenced mail template'):
Workflow.import_from_xml_tree(ET.fromstring(export), include_id=True)
def test_unknown_data_source(pub):
wf1 = Workflow(name='status')
st1 = wf1.add_status('Status1', 'st1')
display_form = FormWorkflowStatusItem()
display_form.id = '_x'
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields = [StringField(label='Test', type='string', data_source={'type': 'foobar'})]
st1.items.append(display_form)
display_form.parent = st1
wf2 = Workflow(name='variables')
wf2.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf2)
wf2.variables_formdef.fields = [StringField(label='Test', type='string', data_source={'type': 'foobar'})]
wf3 = Workflow(name='bo fields')
wf3.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf3)
wf3.backoffice_fields_formdef.fields = [
StringField(
id='bo1', label='1st backoffice field',
type='string', varname='backoffice_blah',
data_source={'type': 'foobar'})
]
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
Workflow.import_from_xml(BytesIO(export))
# carddef as datasource
CardDef.wipe()
carddef = CardDef()
carddef.name = 'foo'
carddef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
carddef.store()
display_form.formdef.fields[0].data_source = {'type': 'carddef:foo'}
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo'}
wf3.backoffice_fields_formdef.fields[0].data_source = {'type': 'carddef:foo'}
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
Workflow.import_from_xml(BytesIO(export))
display_form.formdef.fields[0].data_source = {'type': 'carddef:unknown'}
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:unknown'}
wf3.backoffice_fields_formdef.fields[0].data_source = {'type': 'carddef:unknown'}
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
Workflow.import_from_xml(BytesIO(export))
# carddef custom view as datasource
pub.custom_view_class.wipe()
custom_view = pub.custom_view_class()
custom_view.title = 'card view'
custom_view.formdef = carddef
custom_view.columns = {'list': [{'id': 'id'}]}
custom_view.filters = {}
custom_view.visibility = 'datasource'
custom_view.store()
display_form.formdef.fields[0].data_source = {'type': 'carddef:foo:card-view'}
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo:card-view'}
wf3.backoffice_fields_formdef.fields[0].data_source = {'type': 'carddef:foo:card-view'}
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
Workflow.import_from_xml(BytesIO(export))
display_form.formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
wf3.backoffice_fields_formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
Workflow.import_from_xml(BytesIO(export))

View File

@ -1070,7 +1070,7 @@ class FormDefPage(Directory):
new_formdef = self.formdef_class.import_from_xml(fp, include_id=True)
except FormdefImportError as e:
error = True
reason = _(e.msg)
reason = _(e.msg) % e.msg_args
if e.details:
reason += ' [%s]' % e.details
except ValueError:
@ -1738,7 +1738,7 @@ class FormsDirectory(AccessControlled, Directory):
get_session().message = ('info', _(self.import_error_message))
except FormdefImportError as e:
error = True
reason = _(e.msg)
reason = _(e.msg) % e.msg_args
if e.details:
reason += ' [%s]' % e.details
except ValueError:

View File

@ -1015,7 +1015,10 @@ class SettingsDirectory(QommonSettingsDirectory):
reason = _('Not a valid export file')
except WorkflowImportError as e:
results = None
reason = _('Failed to import a workflow (%s); site import did not complete.') % (_(e) % e.msg_args)
msg = _(e.msg) % e.msg_args
if e.details:
msg += ' [%s]' % e.details
reason = _('Failed to import a workflow (%s); site import did not complete.') % (msg)
html_top('settings', title = _('Import'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Import')

View File

@ -1973,7 +1973,9 @@ class WorkflowsDirectory(Directory):
workflow = Workflow.import_from_xml(fp)
except WorkflowImportError as e:
error = True
reason = _(e) % e.msg_args
reason = _(e.msg) % e.msg_args
if e.details:
reason += ' [%s]' % e.details
except ValueError:
error = True

View File

@ -54,8 +54,9 @@ if not hasattr(types, 'ClassType'):
class FormdefImportError(Exception):
def __init__(self, msg, details=None):
def __init__(self, msg, msg_args=None, details=None):
self.msg = msg
self.msg_args = msg_args or ()
self.details = details
@ -1030,13 +1031,14 @@ class FormDef(StorableObject):
@classmethod
def import_from_xml(cls, fd, charset=None, include_id=False,
fix_on_error=False, check_datasources=True):
from wcs.carddef import CardDef
try:
tree = ET.parse(fd)
except:
raise ValueError()
formdef = cls.import_from_xml_tree(tree, charset=charset,
include_id=include_id, fix_on_error=fix_on_error)
formdef = cls.import_from_xml_tree(
tree, charset=charset,
include_id=include_id, fix_on_error=fix_on_error,
check_datasources=check_datasources)
if formdef.url_name:
try:
@ -1052,46 +1054,6 @@ class FormDef(StorableObject):
if formdef.max_field_id < max_field_id:
formdef.max_field_id = max_field_id
if check_datasources:
# check if datasources are defined
unknown_datasources = set()
for field in formdef.fields:
data_source = getattr(field, 'data_source', None)
if data_source:
data_source_id = data_source.get('type')
if isinstance(data_sources.get_object(data_source),
data_sources.StubNamedDataSource):
unknown_datasources.add(data_source_id)
elif data_source_id and data_source_id.startswith('carddef:'):
parts = data_source_id.split(':')
# check if carddef exists
url_name = parts[1]
if formdef.xml_root_node == 'carddef' and formdef.url_name == url_name:
# reference to itself, it's ok
continue
try:
CardDef.get_by_urlname(url_name)
except KeyError:
unknown_datasources.add(data_source_id)
continue
if len(parts) == 2:
continue
lookup_criterias = [
Equal('formdef_type', 'carddef'),
Equal('visibility', 'datasource'),
Equal('slug', parts[2]),
]
try:
get_publisher().custom_view_class.select(lookup_criterias)[0]
except IndexError:
unknown_datasources.add(data_source_id)
if unknown_datasources:
raise FormdefImportError(N_('Unknown datasources'),
details=', '.join(sorted(unknown_datasources)))
# check if all field id are unique
known_field_ids = set()
for field in formdef.fields:
@ -1103,7 +1065,9 @@ class FormDef(StorableObject):
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, charset=None,
fix_on_error=False, snapshot=False):
fix_on_error=False, snapshot=False, check_datasources=True):
from wcs.carddef import CardDef
if charset is None:
charset = get_publisher().site_charset
assert charset == 'utf-8'
@ -1262,6 +1226,47 @@ class FormDef(StorableObject):
for child in node:
formdef.required_authentication_contexts.append(str(child.text))
if check_datasources:
# check if datasources are defined
unknown_datasources = set()
for field in formdef.fields:
data_source = getattr(field, 'data_source', None)
if data_source:
data_source_id = data_source.get('type')
if isinstance(data_sources.get_object(data_source),
data_sources.StubNamedDataSource):
unknown_datasources.add(data_source_id)
elif data_source_id and data_source_id.startswith('carddef:'):
parts = data_source_id.split(':')
# check if carddef exists
url_name = parts[1]
if formdef.xml_root_node == 'carddef' and formdef.url_name == url_name:
# reference to itself, it's ok
continue
try:
CardDef.get_by_urlname(url_name)
except KeyError:
unknown_datasources.add(data_source_id)
continue
if len(parts) == 2:
continue
lookup_criterias = [
Equal('formdef_type', 'carddef'),
Equal('visibility', 'datasource'),
Equal('slug', parts[2]),
]
try:
get_publisher().custom_view_class.select(lookup_criterias)[0]
except IndexError:
unknown_datasources.add(data_source_id)
if unknown_datasources:
raise FormdefImportError(
N_('Unknown datasources'),
details=', '.join(sorted(unknown_datasources)))
return formdef
def get_detailed_email_form(self, formdata, url):

View File

@ -240,7 +240,8 @@ class WcsPublisher(StubWcsPublisher):
from wcs.workflows import Workflow
for f in z.namelist():
if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
workflow = Workflow.import_from_xml(z.open(f), include_id=True)
workflow = Workflow.import_from_xml(
z.open(f), include_id=True, check_datasources=False)
workflow.store()
results['workflows'] += 1

View File

@ -139,14 +139,14 @@ class FormWorkflowStatusItem(WorkflowStatusItem):
fields.append(field.export_to_xml(charset=charset, include_id=include_id))
return item
def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
WorkflowStatusItem.init_with_xml(self, elem, charset)
el = elem.find('formdef')
if el is None:
return
# we can always include id in the formdef export as it lives in
# a different space, isolated from other formdefs.
imported_formdef = FormDef.import_from_xml_tree(el, include_id=True, snapshot=snapshot)
imported_formdef = FormDef.import_from_xml_tree(el, include_id=True, snapshot=snapshot, check_datasources=check_datasources)
self.formdef = WorkflowFormFieldsFormDef(item=self)
self.formdef.fields = imported_formdef.fields
if self.formdef.max_field_id is None and self.formdef.fields:

View File

@ -44,7 +44,7 @@ from .qommon.upload_storage import PicklableUpload, get_storage_object
from .conditions import Condition
from .roles import Role, logged_users_role, get_user_roles
from .fields import FileField
from .formdef import FormDef
from .formdef import FormDef, FormdefImportError
from .carddef import CardDef
from .formdata import Evolution
from .mail_templates import MailTemplate
@ -94,9 +94,10 @@ def perform_items(items, formdata, depth=20):
class WorkflowImportError(Exception):
def __init__(self, msg, msg_args=None):
super(WorkflowImportError, self).__init__(msg)
def __init__(self, msg, msg_args=None, details=None):
self.msg = msg
self.msg_args = msg_args or ()
self.details = details
class AbortActionException(Exception):
@ -624,15 +625,15 @@ class Workflow(StorableObject):
return root
@classmethod
def import_from_xml(cls, fd, include_id=False):
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
try:
tree = ET.parse(fd)
except:
raise ValueError()
return cls.import_from_xml_tree(tree, include_id=include_id)
return cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, snapshot=False):
def import_from_xml_tree(cls, tree, include_id=False, snapshot=False, check_datasources=True):
charset = get_publisher().site_charset
workflow = cls()
if tree.find('name') is None or not tree.find('name').text:
@ -666,7 +667,11 @@ class Workflow(StorableObject):
for status in tree.find('possible_status'):
status_o = WorkflowStatus()
status_o.parent = workflow
status_o.init_with_xml(status, charset, include_id=include_id, snapshot=snapshot)
try:
status_o.init_with_xml(
status, charset, include_id=include_id, snapshot=snapshot, check_datasources=check_datasources)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.possible_status.append(status_o)
workflow.global_actions = []
@ -689,14 +694,22 @@ class Workflow(StorableObject):
variables = tree.find('variables')
if variables is not None:
formdef = variables.find('formdef')
imported_formdef = FormDef.import_from_xml_tree(formdef, include_id=True, snapshot=snapshot)
try:
imported_formdef = FormDef.import_from_xml_tree(
formdef, include_id=True, snapshot=snapshot, check_datasources=check_datasources)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
workflow.variables_formdef.fields = imported_formdef.fields
variables = tree.find('backoffice-fields')
if variables is not None:
formdef = variables.find('formdef')
imported_formdef = FormDef.import_from_xml_tree(formdef, include_id=True, snapshot=snapshot)
try:
imported_formdef = FormDef.import_from_xml_tree(
formdef, include_id=True, snapshot=snapshot, check_datasources=check_datasources)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow=workflow)
workflow.backoffice_fields_formdef.fields = imported_formdef.fields
@ -930,7 +943,7 @@ class XmlSerialisable(object):
el.text = str(val)
return node
def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
if include_id and elem.attrib.get('id'):
self.id = elem.attrib.get('id')
for attribute in self.get_parameters():
@ -1757,7 +1770,7 @@ class WorkflowStatus(object):
include_id=include_id))
return status
def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
self.id = xml_node_text(elem.find('id'))
self.name = xml_node_text(elem.find('name'))
if elem.find('colour') is not None:
@ -1779,7 +1792,7 @@ class WorkflowStatus(object):
self.append_item(item_type)
item_o = self.items[-1]
item_o.parent = self
item_o.init_with_xml(item, charset, include_id=include_id, snapshot=snapshot)
item_o.init_with_xml(item, charset, include_id=include_id, snapshot=snapshot, check_datasources=check_datasources)
def __repr__(self):
return '<%s %s %r>' % (self.__class__.__name__, self.id, self.name)