workflow: perform action only on defined target (#45653)

This commit is contained in:
Lauréline Guérin 2020-12-08 11:55:38 +01:00 committed by Frédéric Péters
parent a7ba34aaec
commit f5e0ee46ec
2 changed files with 366 additions and 34 deletions

View File

@ -460,9 +460,12 @@ def test_dispatch(pub):
assert formdata.workflow_roles == {'_receiver': role.id}
def test_dispatch_auto(pub):
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
def test_dispatch_auto(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires sql')
return
two_pubs.loggederror_class.wipe()
formdef = FormDef()
formdef.name = 'baz'
@ -476,8 +479,8 @@ def test_dispatch_auto(pub):
item.dispatch_type = 'automatic'
formdata = formdef.data_class()()
pub.substitutions.reset()
pub.substitutions.feed(formdata)
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
@ -496,29 +499,29 @@ def test_dispatch_auto(pub):
{'role_id': role2.id, 'value': 'bar'},
]
pub.substitutions.reset()
pub.substitutions.feed(formdata)
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
# no match
formdata.data = {'1': 'XXX'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
# match
formdata.data = {'1': 'foo'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role1.id}
# other match
formdata.data = {'1': 'bar'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role2.id}
@ -529,23 +532,25 @@ def test_dispatch_auto(pub):
item.rules = [
{'role_id': 'foobar', 'value': 'foo'},
]
pub.substitutions.reset()
pub.substitutions.feed(formdata)
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
error = pub.loggederror_class.select()[0]
assert error.tech_id == '%s-_default-error-in-dispatch-missing-role-foobar' % formdef.id
assert error.formdef_id == formdef.id
assert error.workflow_id == '_default'
assert error.summary == 'error in dispatch, missing role (foobar)'
assert error.occurences_count == 1
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.tech_id == '%s-_default-error-in-dispatch-missing-role-foobar' % formdef.id
assert error.formdef_id == formdef.id
assert error.workflow_id == '_default'
assert error.summary == 'error in dispatch, missing role (foobar)'
assert error.occurences_count == 1
def test_dispatch_computed(pub):
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
def test_dispatch_computed(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires sql')
return
two_pubs.loggederror_class.wipe()
formdef = FormDef()
formdef.name = 'baz'
@ -580,14 +585,13 @@ def test_dispatch_computed(pub):
item.role_id = '="foobar"'
item.perform(formdata)
assert not formdata.workflow_roles
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
error = pub.loggederror_class.select()[0]
assert error.tech_id == '%s-_default-error-in-dispatch-missing-role-foobar' % formdef.id
assert error.formdef_id == formdef.id
assert error.workflow_id == '_default'
assert error.summary == 'error in dispatch, missing role (="foobar")'
assert error.occurences_count == 1
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.tech_id == '%s-_default-error-in-dispatch-missing-role-foobar' % formdef.id
assert error.formdef_id == formdef.id
assert error.workflow_id == '_default'
assert error.summary == 'error in dispatch, missing role (="foobar")'
assert error.occurences_count == 1
def test_roles(pub):
@ -5386,6 +5390,140 @@ def test_call_external_workflow_use_caller_variable(pub):
assert card.data['bo0'] == 'foo@example.com' # got called
def test_call_external_workflow_manual_targeting(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
CardDef.wipe()
two_pubs.loggederror_class.wipe()
# carddef workflow, with global action to increment a counter in its
# backoffice fields.
carddef_wf = Workflow(name='Carddef Workflow')
carddef_wf.add_status(name='New')
carddef_wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(carddef_wf)
carddef_wf.backoffice_fields_formdef.fields = [
StringField(id='bo0', varname='bo', type='string', label='bo variable'),
]
global_action = carddef_wf.add_global_action('Update')
global_action.append_item('set-backoffice-fields')
setbo = global_action.items[0]
setbo.fields = [{'field_id': 'bo0',
'value': '{{ form_var_bo|default:"0"|add:1 }}'}]
trigger = global_action.append_trigger('webservice')
trigger.identifier = 'update'
carddef_wf.store()
# associated carddef
carddef = CardDef()
carddef.name = 'Data'
carddef.fields = [
StringField(id='0', label='string', varname='card_string'),
]
carddef.workflow = carddef_wf
carddef.store()
carddef.data_class().wipe()
# and sample carddatas
for i in range(1, 4):
carddata = carddef.data_class()()
carddata.data = {'0': 'Text %s' % i}
carddata.store()
# formdef workflow that will trigger the global action
wf = Workflow(name='External actions')
st1 = wf.add_status('Action')
update_global_action = wf.add_global_action('Update linked object data')
update_action = update_global_action.append_item('external_workflow_global_action')
update_action.slug = 'carddef:%s' % carddef.url_name
update_action.target_mode = 'manual'
update_action.target_id = None # not configured
update_action.trigger_id = 'action:update'
# and create carddata
create_carddata = CreateCarddataWorkflowStatusItem()
create_carddata.label = 'create linked card'
create_carddata.formdef_slug = carddef.url_name
create_carddata.varname = 'created_card'
create_carddata.id = '_create_card'
create_carddata.mappings = [
Mapping(field_id='0', expression='{{ form_var_string }}')
]
create_carddata.parent = st1
st1.items.append(create_carddata)
wf.store()
# associated formdef
datasource = {'type': 'carddef:%s' % carddef.url_name}
formdef = FormDef()
formdef.name = 'External action form'
formdef.fields = [
ItemField(
id='0', label='Card',
type='item', varname='card',
data_source=datasource),
StringField(id='1', label='string', varname='string'),
]
formdef.workflow = wf
formdef.store()
# and formdata
formdata = formdef.data_class()()
formdata.data = {
'0': '3', # set from datasource
'1': '1',
}
# set parent
formdata.submission_context = {
'orig_object_type': 'carddef',
'orig_formdata_id': '2',
'orig_formdef_id': str(carddef.id),
}
formdata.store()
formdata.just_created()
formdata.perform_workflow()
assert carddef.data_class().count() == 4
assert carddef.data_class().get(1).data['bo0'] is None
assert carddef.data_class().get(2).data['bo0'] is None
assert carddef.data_class().get(3).data['bo0'] is None
assert carddef.data_class().get(4).data['bo0'] is None
# linked carddata
assert carddef.data_class().get(4).data['0'] == '1'
# target not configured
perform_items([update_action], formdata)
assert carddef.data_class().get(1).data['bo0'] is None
assert carddef.data_class().get(2).data['bo0'] is None
assert carddef.data_class().get(3).data['bo0'] is None
assert carddef.data_class().get(4).data['bo0'] is None
assert two_pubs.loggederror_class.count() == 0
# configure target
update_action.target_id = '{{ form_var_string }}' # == '1'
wf.store()
perform_items([update_action], formdata)
assert carddef.data_class().get(1).data['bo0'] == '1'
assert carddef.data_class().get(2).data['bo0'] is None
assert carddef.data_class().get(3).data['bo0'] is None
assert carddef.data_class().get(4).data['bo0'] is None
# target not found
update_action.target_id = '42{{ form_var_string }}' # == '421'
wf.store()
perform_items([update_action], formdata)
assert carddef.data_class().get(1).data['bo0'] == '1'
assert carddef.data_class().get(2).data['bo0'] is None
assert carddef.data_class().get(3).data['bo0'] is None
assert carddef.data_class().get(4).data['bo0'] is None
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'Could not find targeted "Data" object by id 421'
def test_edit_carddata_with_data_sourced_object(pub):
FormDef.wipe()
CardDef.wipe()
@ -5566,3 +5704,173 @@ def test_edit_carddata_with_linked_object(pub):
assert carddef.data_class().count() == 1
card_data = carddef.data_class().select()[0]
assert card_data.data['2'] == '3'
def test_edit_carddata_manual_targeting(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
CardDef.wipe()
two_pubs.loggederror_class.wipe()
# carddef
carddef = CardDef()
carddef.name = 'Parent'
carddef.fields = [
StringField(id='0', label='First Name', varname='first_name'),
StringField(id='1', label='Last Name', varname='last_name'),
StringField(id='2', label='Kids number', varname='kids_number'),
]
carddef.store()
carddef.data_class().wipe()
# and sample carddatas
for i in range(1, 4):
carddata = carddef.data_class()()
carddata.data = {
'0': 'First name %s' % i,
'1': 'Last name %s' % i,
'2': '0',
}
carddata.store()
# formdef workflow that will update carddata
wf = Workflow(name='Card create and update')
st1 = wf.add_status('Create card', 'st1')
# create linked carddata
edit = CreateCarddataWorkflowStatusItem()
edit.formdef_slug = carddef.url_name
edit.mappings = [
Mapping(field_id='0', expression='{{ form_var_first_name }}'),
Mapping(field_id='1', expression='{{ form_var_last_name }}'),
Mapping(field_id='2', expression='{{ form_var_kids_number|default:"0" }}'),
]
st1.items.append(edit)
edit.parent = st1
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
st2 = wf.add_status('Update card', 'st2')
edit = EditCarddataWorkflowStatusItem()
edit.formdef_slug = carddef.url_name
edit.target_mode = 'manual' # not configured
edit.mappings = [
Mapping(field_id='2', expression='{{ form_var_kids_number|add:"1" }}'),
]
edit.id = 'edit'
st2.items.append(edit)
edit.parent = st2
wf.store()
# associated formdef
formdef = FormDef()
formdef.name = 'Parents'
datasource = {'type': 'carddef:%s' % carddef.url_name}
formdef.fields = [
StringField(id='0', label='First Name',
varname='first_name'),
StringField(id='1', label='Last Name',
varname='last_name'),
StringField(id='2', label='Number of kids',
varname='kids_number'),
ItemField(
id='3', label='Card',
type='item', varname='card',
data_source=datasource),
StringField(id='4', label='string', varname='string'),
]
formdef.workflow = wf
formdef.store()
# create formdatas
# target not configured
formdata = formdef.data_class()()
formdata.data = {
'0': 'Parent',
'1': 'Foo',
'2': '2',
'3': '3', # set from datasource
'4': '1',
}
# set parent
formdata.submission_context = {
'orig_object_type': 'carddef',
'orig_formdata_id': '2',
'orig_formdef_id': str(carddef.id),
}
formdata.store()
formdata.just_created()
formdata.perform_workflow()
assert carddef.data_class().count() == 4
assert carddef.data_class().get(1).data['2'] == '0'
assert carddef.data_class().get(2).data['2'] == '0'
assert carddef.data_class().get(3).data['2'] == '0'
assert carddef.data_class().get(4).data['2'] == '2'
assert two_pubs.loggederror_class.count() == 0
# configure target
edit.target_id = '{{ form_var_string }}' # == '1'
wf.store()
formdata = formdef.data_class()()
formdata.data = {
'0': 'Parent',
'1': 'Foo',
'2': '2',
'3': '3', # set from datasource
'4': '1',
}
# set parent
formdata.submission_context = {
'orig_object_type': 'carddef',
'orig_formdata_id': '2',
'orig_formdef_id': str(carddef.id),
}
formdata.store()
formdata.just_created()
formdata.perform_workflow()
assert carddef.data_class().count() == 5
assert carddef.data_class().get(1).data['2'] == '3' # 2 + 1
assert carddef.data_class().get(2).data['2'] == '0'
assert carddef.data_class().get(3).data['2'] == '0'
assert carddef.data_class().get(4).data['2'] == '2'
assert carddef.data_class().get(5).data['2'] == '2'
assert two_pubs.loggederror_class.count() == 0
# target not found
edit.target_id = '42{{ form_var_string }}' # == '421'
wf.store()
formdata = formdef.data_class()()
formdata.data = {
'0': 'Parent',
'1': 'Foo',
'2': '2',
'3': '3', # set from datasource
'4': '1',
}
# set parent
formdata.submission_context = {
'orig_object_type': 'carddef',
'orig_formdata_id': '2',
'orig_formdef_id': str(carddef.id),
}
formdata.store()
formdata.just_created()
formdata.perform_workflow()
assert carddef.data_class().count() == 6
assert carddef.data_class().get(1).data['2'] == '3' # not changed
assert carddef.data_class().get(2).data['2'] == '0'
assert carddef.data_class().get(3).data['2'] == '0'
assert carddef.data_class().get(4).data['2'] == '2'
assert carddef.data_class().get(5).data['2'] == '2'
assert carddef.data_class().get(6).data['2'] == '2'
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'Could not find targeted "Parent" object by id 421'

View File

@ -129,7 +129,31 @@ class ExternalWorkflowGlobalAction(WorkflowStatusItem):
'object_name': objectdef.name}
return _('not completed')
def get_manual_target(self, formdata):
if self.target_mode != 'manual':
return
objectdef = self.get_object_def()
target_id = self.compute(self.target_id, formdata=formdata, status_item=self)
if not target_id:
return
try:
return objectdef.data_class().get(target_id)
except KeyError as e:
# use custom error message depending on target type
get_publisher().record_error(_('Could not find targeted "%(object_name)s" object by id %(object_id)s') % {
'object_name': objectdef.name, 'object_id': target_id},
formdata=formdata, status_item=self, exception=e)
def iter_target_datas(self, formdata, objectdef):
if self.target_mode == 'manual':
# return only target
target = self.get_manual_target(formdata)
if target:
yield target
return
yield from formdata.iter_target_datas(objectdef=objectdef, object_type=self.slug, status_item=self)
def get_parameters(self):