misc: adapt formdata/formdef functions dictionary on workflow change (#53766) #588
|
@ -220,6 +220,9 @@ def test_card_workflow_change(pub):
|
|||
resp = resp.click(href='workflow', index=1)
|
||||
assert resp.form['workflow_id'].options[0][2] == 'Default (cards)'
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert resp.pyquery('.afterjob').text() == 'completed'
|
||||
AfterJob.wipe()
|
||||
resp = resp.click('Back')
|
||||
assert CardDef.select()[0].workflow_id is None
|
||||
|
||||
|
||||
carddata = CardDef.select()[0].data_class()()
|
||||
|
|
|
@ -779,6 +779,42 @@ def test_dispatch_user(pub):
|
|||
)
|
||||
|
||||
|
||||
def test_workflow_roles_on_workflow_change(pub):
|
||||
Workflow.wipe()
|
||||
FormDef.wipe()
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='xxx')
|
||||
role.store()
|
||||
|
||||
wf1 = Workflow(name='wf1')
|
||||
st_wf1 = wf1.add_status('status')
|
||||
wf1.roles['_other'] = 'Other Function'
|
||||
wf1.roles['_yet_another'] = 'Yet another Function'
|
||||
wf1.store()
|
||||
|
||||
wf2 = Workflow(name='wf2')
|
||||
st_wf2 = wf2.add_status('status')
|
||||
wf2.roles['_other'] = 'Other Function'
|
||||
wf2.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'baz'
|
||||
formdef.workflow = wf1
|
||||
formdef.workflow_roles = {'_other': role.id}
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.workflow_roles = {'_other': [role.id], '_yet_another': [role.id]}
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
formdef.change_workflow(wf2, status_mapping={st_wf1.id: st_wf2.id})
|
||||
|
||||
formdata.refresh_from_storage()
|
||||
assert formdata.workflow_roles == {'_other': [role.id]}
|
||||
|
||||
|
||||
def test_roles(pub):
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='xxx')
|
||||
|
|
|
@ -1114,8 +1114,11 @@ class FormDefPage(Directory):
|
|||
if self.formdef.data_class().count():
|
||||
# there are existing formdata, status will have to be mapped
|
||||
return redirect('workflow-status-remapping?new=%s' % workflow_id)
|
||||
self.formdef.change_workflow(Workflow.get(workflow_id))
|
||||
return redirect('.')
|
||||
|
||||
job = WorkflowChangeJob(formdef=self.formdef, new_workflow_id=workflow_id, status_mapping={})
|
||||
job.store()
|
||||
get_response().add_after_job(job)
|
||||
return redirect(job.get_processing_url())
|
||||
|
||||
def has_remapping_jobs(self):
|
||||
return bool(
|
||||
|
|
|
@ -1883,10 +1883,24 @@ class FormDef(StorableObject):
|
|||
self.workflow = new_workflow
|
||||
if new_workflow.has_action('geolocate') and not self.geolocations:
|
||||
self.geolocations = {'base': str(_('Geolocation'))}
|
||||
removed_functions = set(old_workflow.roles.keys()).difference(set(new_workflow.roles.keys()))
|
||||
for function_key in removed_functions:
|
||||
if function_key in (self.roles or {}):
|
||||
del self.roles[function_key]
|
||||
self.store(comment=_('Workflow change'))
|
||||
if formdata_count:
|
||||
# instruct formdef to update its security rules
|
||||
self.data_class().rebuild_security()
|
||||
if removed_functions:
|
||||
# change in functions, must update all formdatas to remove old keys
|
||||
for formdata in self.data_class().select_iterator(ignore_errors=True, itersize=200):
|
||||
changed = False
|
||||
for function_key in removed_functions:
|
||||
if function_key in (formdata.workflow_roles or {}):
|
||||
del formdata.workflow_roles[function_key]
|
||||
changed = True
|
||||
if changed:
|
||||
formdata.store()
|
||||
|
||||
def i18n_scan(self):
|
||||
location = '%s/%s/' % (self.backoffice_section, self.id)
|
||||
|
|
Loading…
Reference in New Issue
Adaptation à ce test parce qu'on passe désormais de manière systématique par un afterjob.