admin: allow creating workflow tests from formdata (#87545)
gitea/wcs/pipeline/head This commit looks good
Details
gitea/wcs/pipeline/head This commit looks good
Details
This commit is contained in:
parent
bf442ecf44
commit
16e844a049
|
@ -1,7 +1,9 @@
|
|||
import datetime
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from django.utils.html import escape
|
||||
from django.utils.timezone import make_aware
|
||||
|
||||
from wcs import workflow_tests
|
||||
from wcs.formdef import FormDef, fields
|
||||
|
@ -582,3 +584,41 @@ def test_workflow_tests_run(pub):
|
|||
assert 'Form status when error occured: New status' in resp.text
|
||||
assert 'Email body: \nabc' in resp.text
|
||||
assert resp.pyquery('li#test-action').text() == 'Test action: Assert email is sent'
|
||||
|
||||
|
||||
def test_workfow_tests_creation_from_formdata(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
new_status = workflow.add_status(name='New status')
|
||||
end_status = workflow.add_status(name='End status')
|
||||
|
||||
jump = new_status.add_action('jump')
|
||||
jump.status = end_status.id
|
||||
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.name = 'test title'
|
||||
formdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.receipt_time = make_aware(datetime.datetime(2022, 1, 1, 0, 0))
|
||||
formdata.store()
|
||||
formdata.perform_workflow()
|
||||
formdata.store()
|
||||
|
||||
resp = app.get('/backoffice/forms/%s/tests/new' % formdef.id)
|
||||
resp.form['name'] = 'First test'
|
||||
resp.form['creation_mode'] = 'formdata-wf'
|
||||
resp.form['formdata'].select(text='1-1 - Unknown User - 2022-01-01 00:00')
|
||||
resp = resp.form.submit().follow()
|
||||
|
||||
testdef = TestDef.select()[0]
|
||||
assert len(testdef.workflow_tests.actions) == 1
|
||||
assert testdef.workflow_tests.actions[0].key == 'assert-status'
|
||||
assert testdef.workflow_tests.actions[0].status_name == 'End status'
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import datetime
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
@ -6,11 +7,12 @@ from wcs import workflow_tests
|
|||
from wcs.formdef import FormDef, fields
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.testdef import TestDef, WebserviceResponse
|
||||
from wcs.wf.jump import JumpWorkflowStatusItem
|
||||
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
|
||||
from wcs.workflow_tests import WorkflowTestError
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
|
||||
|
||||
from .utilities import create_temporary_pub
|
||||
from .backoffice_pages.test_all import create_user
|
||||
from .utilities import create_temporary_pub, get_app, login
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -19,6 +21,7 @@ def pub():
|
|||
|
||||
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
|
||||
pub.set_app_dir(req)
|
||||
pub.cfg['identification'] = {'methods': ['password']}
|
||||
pub.write_cfg()
|
||||
|
||||
pub.user_class.wipe()
|
||||
|
@ -645,3 +648,98 @@ def test_workflow_tests_webservice_status_jump(pub):
|
|||
workflow_tests.AssertStatus(status_name='End status'),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
|
||||
]
|
||||
|
||||
|
||||
def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
|
||||
role = pub.role_class(name='test role')
|
||||
role.store()
|
||||
user = create_user(pub, is_admin=True)
|
||||
user.roles = [role.id]
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
||||
workflow.backoffice_fields_formdef.fields = [fields.StringField(id='bo1', label='Text')]
|
||||
|
||||
new_status = workflow.add_status('New status', 'new-status')
|
||||
status_with_timeout_jump = workflow.add_status('Status with timeout jump', 'status-with-timeout-jump')
|
||||
status_with_button = workflow.add_status('Status with button', 'status-with-button')
|
||||
transition_status = workflow.add_status('Transition status', 'transition-status')
|
||||
end_status = workflow.add_status('End status', 'end-status')
|
||||
|
||||
jump = new_status.add_action('jump')
|
||||
jump.status = status_with_timeout_jump.id
|
||||
|
||||
jump = status_with_timeout_jump.add_action('jump')
|
||||
jump.status = status_with_button.id
|
||||
jump.timeout = '{{ 1 }} day'
|
||||
|
||||
choice = status_with_button.add_action('choice')
|
||||
choice.label = 'Accept'
|
||||
choice.status = transition_status.id
|
||||
choice.by = [role.id]
|
||||
|
||||
wscall = transition_status.add_action('webservice_call')
|
||||
wscall.url = 'http://remote.example.net/json'
|
||||
wscall.varname = 'test_webservice'
|
||||
|
||||
sendmail = transition_status.add_action('sendmail')
|
||||
sendmail.to = ['test@example.org']
|
||||
sendmail.subject = 'In new status'
|
||||
sendmail.body = 'xxx'
|
||||
|
||||
set_backoffice_fields = transition_status.add_action('set-backoffice-fields')
|
||||
set_backoffice_fields.fields = [{'field_id': 'bo1', 'value': 'xxx'}]
|
||||
|
||||
jump = transition_status.add_action('jump')
|
||||
jump.status = end_status.id
|
||||
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.user_id = user.id
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
formdata.record_workflow_event('frontoffice-created')
|
||||
formdata.perform_workflow()
|
||||
formdata.store()
|
||||
|
||||
freezer.tick(datetime.timedelta(days=2))
|
||||
_apply_timeouts(pub)
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get(formdata.get_url())
|
||||
resp.form.submit('button1').follow()
|
||||
formdata.refresh_from_storage()
|
||||
assert formdata.status == 'wf-end-status'
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata, add_workflow_tests=True)
|
||||
testdef.run(formdef)
|
||||
|
||||
actions = testdef.workflow_tests.actions
|
||||
assert len(actions) == 8
|
||||
|
||||
assert actions[0].key == 'assert-status'
|
||||
assert actions[0].status_name == 'Status with timeout jump'
|
||||
|
||||
assert actions[1].key == 'skip-time'
|
||||
assert actions[1].seconds == 172800
|
||||
|
||||
assert actions[2].key == 'assert-status'
|
||||
assert actions[2].status_name == 'Status with button'
|
||||
|
||||
assert actions[3].key == 'button-click'
|
||||
assert actions[3].button_name == 'Accept'
|
||||
|
||||
assert actions[4].key == 'assert-webservice-call'
|
||||
assert actions[5].key == 'assert-email'
|
||||
assert actions[6].key == 'assert-backoffice-field'
|
||||
|
||||
assert actions[-1].key == 'assert-status'
|
||||
assert actions[-1].status_name == 'End status'
|
||||
|
|
|
@ -344,13 +344,18 @@ class TestsDirectory(Directory):
|
|||
]
|
||||
|
||||
if formdata_options:
|
||||
creation_options = [
|
||||
('empty', _('Fill data manually'), 'empty'),
|
||||
('formdata', _('Import data from form'), 'formdata'),
|
||||
]
|
||||
if get_publisher().has_site_option('enable-workflow-tests'):
|
||||
creation_options.append(
|
||||
('formdata-wf', _('Import data from form (and initialise workflow tests)'), 'formdata-wf')
|
||||
)
|
||||
form.add(
|
||||
RadiobuttonsWidget,
|
||||
'creation_mode',
|
||||
options=[
|
||||
('empty', _('Fill data manually'), 'empty'),
|
||||
('formdata', _('Import data from form'), 'formdata'),
|
||||
],
|
||||
options=creation_options,
|
||||
value='empty',
|
||||
attrs={'data-dynamic-display-parent': 'true'},
|
||||
)
|
||||
|
@ -362,7 +367,7 @@ class TestsDirectory(Directory):
|
|||
hint=_('Form is only used for initial data alimentation, no link is kept with created test.'),
|
||||
attrs={
|
||||
'data-dynamic-display-child-of': 'creation_mode',
|
||||
'data-dynamic-display-value-in': 'formdata',
|
||||
'data-dynamic-display-value-in': 'formdata|formdata-wf',
|
||||
},
|
||||
**{'data-autocomplete': 'true'},
|
||||
)
|
||||
|
@ -392,7 +397,11 @@ class TestsDirectory(Directory):
|
|||
formdata_id = form.get_widget('formdata').parse()
|
||||
formdata = self.objectdef.data_class().get(formdata_id)
|
||||
|
||||
testdef = TestDef.create_from_formdata(self.objectdef, formdata)
|
||||
testdef = TestDef.create_from_formdata(
|
||||
self.objectdef,
|
||||
formdata,
|
||||
add_workflow_tests=bool(creation_mode_widget.parse() == 'formdata-wf'),
|
||||
)
|
||||
testdef.name = form.get_widget('name').parse()
|
||||
testdef.agent_id = get_session().user
|
||||
testdef.store()
|
||||
|
|
|
@ -185,7 +185,7 @@ class TestDef(sql.TestDef):
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def create_from_formdata(cls, formdef, formdata):
|
||||
def create_from_formdata(cls, formdef, formdata, add_workflow_tests=False):
|
||||
testdef = cls()
|
||||
testdef.object_type = formdef.get_table_name()
|
||||
testdef.object_id = formdef.id
|
||||
|
@ -213,6 +213,10 @@ class TestDef(sql.TestDef):
|
|||
'fields': field_data,
|
||||
'user': formdata.user.get_json_export_dict() if formdata.user else None,
|
||||
}
|
||||
|
||||
if add_workflow_tests:
|
||||
testdef.workflow_tests.add_actions_from_formdata(formdata)
|
||||
|
||||
return testdef
|
||||
|
||||
def build_formdata(self, objectdef, include_fields=False):
|
||||
|
|
|
@ -112,7 +112,40 @@ class WorkflowTests(XmlStorableObject):
|
|||
return str(int(max(x.id for x in self.actions)) + 1)
|
||||
|
||||
def add_action(self, action_class):
|
||||
self.actions.append(action_class(id=self.get_new_action_id()))
|
||||
action = action_class(id=self.get_new_action_id())
|
||||
self.actions.append(action)
|
||||
return action
|
||||
|
||||
def add_actions_from_formdata(self, formdata):
|
||||
test_action_class_by_trace_id = {
|
||||
'sendmail': AssertEmail,
|
||||
'webservice_call': AssertWebserviceCall,
|
||||
'set-backoffice-fields': AssertBackofficeFieldValues,
|
||||
'button': ButtonClick,
|
||||
'timeout-jump': SkipTime,
|
||||
}
|
||||
|
||||
previous_trace = None
|
||||
workflow_traces = formdata.get_workflow_traces()
|
||||
for trace in workflow_traces:
|
||||
trace_id = trace.event or trace.action_item_key
|
||||
|
||||
if trace_id not in test_action_class_by_trace_id:
|
||||
previous_trace = trace
|
||||
continue
|
||||
|
||||
if trace.event:
|
||||
action = self.add_action(AssertStatus)
|
||||
action.set_attributes_from_trace(formdata.formdef, trace)
|
||||
|
||||
action = self.add_action(test_action_class_by_trace_id[trace_id])
|
||||
action.set_attributes_from_trace(formdata.formdef, trace, previous_trace)
|
||||
|
||||
previous_trace = trace
|
||||
|
||||
if workflow_traces:
|
||||
action = self.add_action(AssertStatus)
|
||||
action.set_attributes_from_trace(formdata.formdef, workflow_traces[-1])
|
||||
|
||||
def store(self, *args, **kwargs):
|
||||
super().store(*args, **kwargs)
|
||||
|
@ -173,6 +206,9 @@ class WorkflowTestAction(XmlStorableObject):
|
|||
if field != 'id' and field not in self.optional_fields and not getattr(self, field)
|
||||
)
|
||||
|
||||
def set_attributes_from_trace(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def render_as_line(self):
|
||||
if not self.is_configured:
|
||||
return _('not configured')
|
||||
|
@ -197,6 +233,16 @@ class ButtonClick(WorkflowTestAction):
|
|||
def details_label(self):
|
||||
return _('Click on "%s"') % self.button_name
|
||||
|
||||
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
|
||||
try:
|
||||
item = [
|
||||
x for x in self.get_all_choice_actions(formdef) if x.id == trace.event_args['action_item_id']
|
||||
][0]
|
||||
except IndexError:
|
||||
return
|
||||
|
||||
self.button_name = item.label
|
||||
|
||||
def perform(self, formdata, user):
|
||||
status = formdata.get_status()
|
||||
form = status.get_action_form(formdata, user)
|
||||
|
@ -208,11 +254,14 @@ class ButtonClick(WorkflowTestAction):
|
|||
form.get_submit = lambda: button_widget.name
|
||||
status.handle_form(form, formdata, user, check_replay=False)
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
possible_button_names = set()
|
||||
@staticmethod
|
||||
def get_all_choice_actions(formdef):
|
||||
for item in formdef.workflow.get_all_items():
|
||||
if isinstance(item, wf.choice.ChoiceWorkflowStatusItem) and item.status:
|
||||
possible_button_names.add(item.label)
|
||||
yield item
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
possible_button_names = {x.label for x in self.get_all_choice_actions(formdef)}
|
||||
|
||||
if not possible_button_names:
|
||||
return
|
||||
|
@ -248,6 +297,14 @@ class AssertStatus(WorkflowTestAction):
|
|||
def details_label(self):
|
||||
return _('Status is "%s"') % self.status_name
|
||||
|
||||
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
|
||||
try:
|
||||
status = formdef.workflow.get_status(trace.status_id)
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
self.status_name = status.name
|
||||
|
||||
def perform(self, formdata, user):
|
||||
status = formdata.get_status()
|
||||
if status.name != self.status_name:
|
||||
|
@ -375,6 +432,10 @@ class SkipTime(WorkflowTestAction):
|
|||
def details_label(self):
|
||||
return seconds2humanduration(self.seconds)
|
||||
|
||||
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
|
||||
if previous_trace:
|
||||
self.seconds = (trace.timestamp - previous_trace.timestamp).total_seconds()
|
||||
|
||||
def rewind(self, formdata):
|
||||
def rewind_time(timestamp):
|
||||
return timestamp - datetime.timedelta(seconds=self.seconds)
|
||||
|
|
Loading…
Reference in New Issue