959 lines
30 KiB
Python
959 lines
30 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2023 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import uuid
|
|
|
|
from quixote import get_publisher, get_session
|
|
|
|
from wcs import wf
|
|
from wcs.qommon import _
|
|
from wcs.qommon.form import (
|
|
EmailWidget,
|
|
IntWidget,
|
|
JsonpSingleSelectWidget,
|
|
RadiobuttonsWidget,
|
|
SingleSelectWidget,
|
|
StringWidget,
|
|
TextWidget,
|
|
WidgetList,
|
|
)
|
|
from wcs.qommon.humantime import humanduration2seconds, seconds2humanduration, timewords
|
|
from wcs.qommon.xml_storage import XmlStorableObject
|
|
from wcs.testdef import TestError
|
|
from wcs.wf.backoffice_fields import SetBackofficeFieldRowWidget, SetBackofficeFieldsTableWidget
|
|
from wcs.wf.profile import FieldNode
|
|
|
|
|
|
class WorkflowTestError(TestError):
    """Error raised by a workflow test action when it cannot be performed or its check fails."""
|
|
|
|
|
|
def get_test_action_options():
    """Return the selectable test action types as (key, label, key) tuples.

    Direct subclasses of WorkflowTestAction are sorted by label; assertions
    come first, then an empty separator entry, then the other actions.
    """
    assertion_options = []
    other_options = []

    for action_class in sorted(WorkflowTestAction.__subclasses__(), key=lambda klass: klass.label):
        option = (action_class.key, action_class.label, action_class.key)
        if action_class.is_assertion:
            assertion_options.append(option)
        else:
            other_options.append(option)

    return [*assertion_options, ('', '—', ''), *other_options]
|
|
|
|
|
|
def get_test_action_class_by_type(action_type):
    """Return the direct WorkflowTestAction subclass whose ``key`` is action_type.

    Raises:
        KeyError: no subclass declares this key; the exception carries the
            unknown key to ease debugging.
    """
    for action_class in WorkflowTestAction.__subclasses__():
        if action_class.key == action_type:
            return action_class

    raise KeyError(action_type)
|
|
|
|
|
|
class WorkflowTests(XmlStorableObject):
    """Ordered collection of workflow test actions, stored as XML.

    Actions are run in sequence against a formdata by :meth:`run`.
    NOTE(review): ``self.testdef`` is read by this class but never assigned
    here; it is presumably set by the owning test definition - confirm.
    """

    _names = 'workflow_tests'
    xml_root_node = 'workflow_tests'
    testdef_id = None
    _actions = None

    XML_NODES = [
        ('testdef_id', 'int'),
        ('actions', 'actions'),
    ]

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._actions = []

    @property
    def actions(self):
        """List of test actions, in execution order."""
        return self._actions

    @actions.setter
    def actions(self, actions):
        self._actions = actions
        # actions reach the test definition through their parent
        for action in actions:
            action.parent = self

    def run(self, formdata):
        """Run the workflow on formdata, then perform every configured action.

        Raises:
            WorkflowTestError: as soon as an action fails; the error is
                annotated with the failing action uuid and with the name of
                the status the form was in before that action was performed.
        """
        self.mock_formdata_methods(formdata)

        # mark formdata as running workflow tests
        formdata.workflow_test = True

        formdata.frozen_receipt_time = formdata.receipt_time
        self.reset_formdata_test_attributes(formdata)

        formdata.perform_workflow()
        for action in self.actions:
            # captured before the action runs, for the error report
            status = formdata.get_status()

            if not action.is_configured:
                continue

            # assertions consume what was recorded since the last non-assertion
            # action, so recordings are only cleared before non-assertions
            if not action.is_assertion:
                self.reset_formdata_test_attributes(formdata)

            try:
                action.perform(formdata)
            except WorkflowTestError as e:
                e.action_uuid = action.uuid
                e.details.append(_('Form status when error occured: %s') % status.name)
                raise

    def mock_formdata_methods(self, formdata):
        """Replace trace recording and storage methods of formdata with in-memory versions."""
        from wcs.workflow_traces import WorkflowTrace

        def record_workflow_event(event, **kwargs):
            formdata.workflow_traces.append(WorkflowTrace(formdata=formdata, event=event, event_args=kwargs))

        def record_workflow_action(action):
            formdata.workflow_traces.append(WorkflowTrace(formdata=formdata, action=action))

        formdata.record_workflow_event = record_workflow_event
        formdata.record_workflow_action = record_workflow_action

        # storing is turned into a no-op
        formdata.store = lambda *args, **kwargs: None

    def reset_formdata_test_attributes(self, formdata):
        """Reset the attributes of formdata that assertions inspect."""
        formdata.sent_sms = []
        formdata.sent_emails = []
        # the same list object is shared between formdata and the test definition
        formdata.used_webservice_responses = self.testdef.used_webservice_responses = []
        formdata.anonymisation_performed = False
        formdata.redirect_to_url = None
        formdata.history_messages = []

    def get_new_action_id(self):
        """Return a new action id, as a string: highest existing id plus one, or '1'."""
        if not self.actions:
            return '1'

        return str(max(int(x.id) for x in self.actions) + 1)

    def add_action(self, action_class):
        """Instantiate action_class with a new id, append it to the actions and return it."""
        action = action_class(id=self.get_new_action_id())
        # same link as the one set by the ``actions`` setter, so that properties
        # relying on ``parent`` also work on a freshly added action
        action.parent = self
        self.actions.append(action)
        return action

    def add_actions_from_formdata(self, formdata):
        """Create test actions mirroring the workflow traces of formdata.

        Each known trace gives one action of the matching class; traces that
        are events are preceded by an AssertStatus, and a final AssertStatus
        is added from the last trace.
        """
        test_action_class_by_trace_id = {
            'sendmail': AssertEmail,
            'sendsms': AssertSMS,
            'webservice_call': AssertWebserviceCall,
            'set-backoffice-fields': AssertBackofficeFieldValues,
            'button': ButtonClick,
            'global-action-button': ButtonClick,
            'timeout-jump': SkipTime,
            'anonymise': AssertAnonymise,
            'redirect_to_url': AssertRedirect,
            'register-comment': AssertHistoryMessage,
            'modify_criticality': AssertCriticality,
        }

        previous_trace = None
        workflow_traces = formdata.get_workflow_traces()
        for trace in workflow_traces:
            trace_id = trace.event or trace.action_item_key
            action_class = test_action_class_by_trace_id.get(trace_id)

            if action_class is not None:
                if trace.event:
                    # assert the status recorded on the trace before replaying the event
                    action = self.add_action(AssertStatus)
                    action.set_attributes_from_trace(formdata.formdef, trace)

                action = self.add_action(action_class)
                action.set_attributes_from_trace(formdata.formdef, trace, previous_trace)

            # unknown traces still count as "previous" for the next one
            previous_trace = trace

        if workflow_traces:
            action = self.add_action(AssertStatus)
            action.set_attributes_from_trace(formdata.formdef, workflow_traces[-1])

    def export_actions_to_xml(self, element, attribute_name, **kwargs):
        """Append the XML export of every action to element."""
        for action in self.actions:
            element.append(action.export_to_xml())

    def import_actions_from_xml(self, element, **kwargs):
        """Return the actions built from the 'test-action' children of element.

        Children whose key matches no known action class are skipped.
        """
        actions = []
        for sub in element.findall('test-action'):
            key = sub.findtext('key')

            try:
                klass = get_test_action_class_by_type(key)
            except KeyError:
                continue

            actions.append(klass.import_from_xml_tree(sub))

        return actions
|
|
|
|
|
|
class WorkflowTestAction(XmlStorableObject):
    """Base class of workflow test actions.

    Subclasses declare a ``key``, a ``label``, their own XML_NODES and a
    ``perform`` method; ``optional_fields`` lists the attributes that may be
    left empty on a configured action.
    """

    _names = 'test-action'
    xml_root_node = 'test-action'
    uuid = None

    editable = True
    is_assertion = True
    optional_fields = []

    XML_NODES = [
        ('id', 'str'),
        ('uuid', 'str'),
        ('key', 'str'),
    ]

    def __init__(self, **kwargs):
        # default identifier, an explicit uuid keyword argument replaces it below
        self.uuid = str(uuid.uuid4())

        # only attributes declared in XML_NODES are accepted, others are ignored
        known_attributes = {name for name, _kind in self.XML_NODES}
        for name, value in kwargs.items():
            if name in known_attributes:
                setattr(self, name, value)

    def __str__(self):
        return str(self.label)

    @property
    def is_configured(self):
        """True when every declared attribute, except id and optional ones, has a truthy value."""
        for field, _kind in self.XML_NODES:
            if field == 'id' or field in self.optional_fields:
                continue
            if not getattr(self, field):
                return False
        return True

    def set_attributes_from_trace(self, *args, **kwargs):
        """Hook to initialise the action from a workflow trace; does nothing by default."""

    def render_as_line(self):
        """Return the one-line description of the action, or a 'not configured' notice."""
        if self.is_configured:
            return self.details_label

        return _('not configured')
|
|
|
|
|
|
class ButtonClick(WorkflowTestAction):
    """Test action simulating a click on a workflow button by a given user.

    ``who`` is 'receiver', 'submitter' or anything else; in the last case
    the user is looked up from ``who_id``.
    """

    label = _('Simulate click on action button')
    empty_form_error = _('Workflow has no action that displays a button.')

    key = 'button-click'
    button_name = None
    who = 'receiver'
    who_id = None

    optional_fields = ['who_id']

    is_assertion = False

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('button_name', 'str'),
        ('who', 'str'),
        ('who_id', 'int'),
    ]

    @property
    def details_label(self):
        """One-line summary naming the button and the user clicking on it."""
        if self.who == 'receiver':
            user = _('backoffice user')
        elif self.who == 'submitter':
            user = _('submitter')
        else:
            try:
                user = get_publisher().user_class.get(self.who_id)
            except KeyError:
                user = _('missing user')

        return _('Click on "%(button_name)s" by %(user)s') % {'button_name': self.button_name, 'user': user}

    def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
        """Set button_name from the action referenced by the trace event arguments.

        The action is left untouched when the trace has no usable event
        arguments or references an action that is not found in the workflow.
        """
        event_args = trace.event_args or {}

        if 'action_item_id' in event_args:
            button_names = [
                x.label for x in self.get_all_choice_actions(formdef) if x.id == event_args['action_item_id']
            ]
        elif 'global_action_id' in event_args:
            button_names = [
                x.name for x in self.get_all_global_actions(formdef) if x.id == event_args['global_action_id']
            ]
        else:
            # neither a status button nor a global action: nothing to record
            return

        if not button_names:
            return

        self.button_name = button_names[0]

    def perform(self, formdata):
        """Submit the action form of the current status as if the button was clicked.

        Raises:
            WorkflowTestError: the configured user does not exist, or no
                submit widget labelled button_name is in the action form.
        """
        if self.who == 'receiver':
            user = get_publisher().user_class.get(self.parent.testdef.agent_id)
        elif self.who == 'submitter':
            if formdata.user_id:
                user = get_publisher().user_class.get(formdata.user_id)
            else:
                get_session().mark_anonymous_formdata(formdata)
                user = None
        else:
            try:
                user = get_publisher().user_class.get(self.who_id)
            except KeyError:
                raise WorkflowTestError(_('Broken, missing user'))

        status = formdata.get_status()
        form = status.get_action_form(formdata, user)

        button_widget = None
        if form:
            button_widget = next((x for x in form.submit_widgets if x.label == self.button_name), None)

        if button_widget is None:
            raise WorkflowTestError(_('Button "%s" is not displayed.') % self.button_name)

        # make the form report the wanted button as the submitted one
        form.get_submit = lambda: button_widget.name
        status.handle_form(form, formdata, user, check_replay=False)

    @staticmethod
    def get_all_choice_actions(formdef):
        """Yield the ChoiceWorkflowStatusItem items of the workflow that have a status set."""
        for item in formdef.workflow.get_all_items():
            if isinstance(item, wf.choice.ChoiceWorkflowStatusItem) and item.status:
                yield item

    @staticmethod
    def get_all_global_actions(formdef):
        """Yield the global actions of the workflow that are not interactive."""
        for action in formdef.workflow.global_actions or []:
            if not action.is_interactive():
                yield action

    def fill_admin_form(self, form, formdef):
        """Add button and user selection widgets to form.

        Nothing is added when the workflow offers no button at all.
        """
        possible_button_names = {x.label for x in self.get_all_choice_actions(formdef)}
        possible_button_names.update(action.name for action in self.get_all_global_actions(formdef))

        if not possible_button_names:
            return

        possible_button_names = sorted(possible_button_names)

        value = self.button_name
        if value and value not in possible_button_names:
            # keep the stale value selectable, flagged as unavailable
            value = '%s (%s)' % (value, _('not available'))
            possible_button_names.append(value)

        form.add(
            SingleSelectWidget,
            'button_name',
            title=_('Button name'),
            options=possible_button_names,
            required=True,
            value=value,
        )

        form.add(
            RadiobuttonsWidget,
            'who',
            title=_('User who clicks on button'),
            options=[
                ('receiver', _('Backoffice user'), 'receiver'),
                ('submitter', _('Submitter'), 'submitter'),
                ('other', _('Other user'), 'other'),
            ],
            value=self.who,
            attrs={'data-dynamic-display-parent': 'true'},
        )

        form.attrs['data-enable-select2'] = 'on'
        form.add(
            JsonpSingleSelectWidget,
            'who_id',
            url='/api/users/',
            value=self.who_id,
            attrs={
                'data-dynamic-display-child-of': 'who',
                'data-dynamic-display-value-in': 'other',
            },
        )
|
|
|
|
|
|
class AssertStatus(WorkflowTestAction):
    """Assertion checking the name of the current form status."""

    label = _('Assert form status')

    key = 'assert-status'
    status_name = None

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('status_name', 'str'),
    ]

    @property
    def details_label(self):
        """One-line summary giving the expected status name."""
        return _('Status is "%s"') % self.status_name

    def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
        """Set status_name from the status referenced by trace; unknown statuses are ignored."""
        try:
            status = formdef.workflow.get_status(trace.status_id)
        except KeyError:
            return

        self.status_name = status.name

    def perform(self, formdata):
        """Raise WorkflowTestError when the form status name differs from status_name."""
        status = formdata.get_status()
        if status.name != self.status_name:
            raise WorkflowTestError(
                _('Form should be in status "%(expected_status)s" but is in status "%(status)s".')
                % {'expected_status': self.status_name, 'status': status.name}
            )

    def fill_admin_form(self, form, formdef):
        """Add the status selection widget to form."""
        possible_statuses = [x.name for x in formdef.workflow.possible_status]

        value = self.status_name
        if value and value not in possible_statuses:
            # keep the stale value selectable, flagged as unavailable
            value = '%s (%s)' % (value, _('not available'))
            possible_statuses.append(value)

        form.add(
            SingleSelectWidget,
            'status_name',
            title=_('Status name'),
            options=possible_statuses,
            required=True,
            # select the flagged entry when the stored status no longer exists
            value=value,
        )
|
|
|
|
|
|
class AssertEmail(WorkflowTestAction):
    """Assertion consuming the oldest recorded email and checking its recipients and content."""

    label = _('Assert email is sent')

    key = 'assert-email'
    addresses = None
    subject_strings = None
    body_strings = None

    # every criterion is optional: without any, only the sending itself is checked
    optional_fields = ['addresses', 'subject_strings', 'body_strings']

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('addresses', 'str_list'),
        ('subject_strings', 'str_list'),
        ('body_strings', 'str_list'),
    ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # make sure list attributes are always iterable
        for attribute in ('addresses', 'subject_strings', 'body_strings'):
            setattr(self, attribute, getattr(self, attribute) or [])

    @property
    def details_label(self):
        """One-line summary: first address and count of the others, or '' without addresses."""
        if not self.addresses:
            return ''

        first_address, extra_count = self.addresses[0], len(self.addresses) - 1
        label = _('Email to "%s"') % first_address
        if extra_count > 0:
            label = '%s (+%s)' % (label, extra_count)

        return label

    def perform(self, formdata):
        """Pop the first sent email and check addresses, subject and body.

        Raises:
            WorkflowTestError: no email was recorded, or an expected address
                or string is missing from the email.
        """
        try:
            email = formdata.sent_emails.pop(0)
        except IndexError:
            raise WorkflowTestError(_('No email was sent.'))

        message = email.email_msg

        for address in self.addresses:
            if address not in message.to:
                details = [_('Email addresses: %s') % ', '.join(message.to)]
                raise WorkflowTestError(_('Email was not sent to address "%s".') % address, details=details)

        for subject in self.subject_strings:
            if subject not in message.subject:
                details = [_('Email subject: %s') % message.subject]
                raise WorkflowTestError(_('Email subject does not contain "%s".') % subject, details=details)

        for body in self.body_strings:
            if body not in message.body:
                details = [_('Email body: %s') % message.body]
                raise WorkflowTestError(_('Email body does not contain "%s".') % body, details=details)

    def fill_admin_form(self, form, formdef):
        """Add the address, subject and body list widgets to form."""
        element_kwargs = {'render_br': False, 'size': 50}

        form.add(
            WidgetList,
            'addresses',
            element_type=EmailWidget,
            title=_('Email addresses'),
            value=self.addresses,
            add_element_label=_('Add address'),
            element_kwargs=dict(element_kwargs),
        )
        form.add(
            WidgetList,
            'subject_strings',
            element_type=StringWidget,
            title=_('Subject must contain'),
            value=self.subject_strings,
            add_element_label=_('Add string'),
            element_kwargs=dict(element_kwargs),
        )
        form.add(
            WidgetList,
            'body_strings',
            element_type=StringWidget,
            title=_('Body must contain'),
            value=self.body_strings,
            add_element_label=_('Add string'),
            element_kwargs=dict(element_kwargs),
        )
|
|
|
|
|
|
class SkipTime(WorkflowTestAction):
    """Test action moving the form back in time so that timeout jumps can trigger."""

    label = _('Move forward in time')

    key = 'skip-time'
    seconds = None

    is_assertion = False

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('seconds', 'int'),
    ]

    @property
    def details_label(self):
        """Duration to skip, in human readable form."""
        return seconds2humanduration(self.seconds)

    def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
        """Set seconds to the time elapsed between previous_trace and trace, if there is a previous trace."""
        if not previous_trace:
            return

        # NOTE(review): total_seconds() returns a float while the XML node is
        # declared as 'int' - confirm this is handled on export
        elapsed = trace.timestamp - previous_trace.timestamp
        self.seconds = elapsed.total_seconds()

    def rewind(self, formdata):
        """Shift receipt time and last evolution time of formdata back by the configured duration."""
        skipped = datetime.timedelta(seconds=self.seconds)

        formdata.receipt_time = formdata.receipt_time - skipped
        formdata.evolution[-1].time = formdata.evolution[-1].time - skipped

    def perform(self, formdata):
        """Rewind formdata then run the first timeout jump of the current status whose condition holds.

        Nothing is run when the last update of the form is more recent than
        the frozen receipt time minus the smallest jump delay.
        """
        self.rewind(formdata)

        status = formdata.get_status()
        jump_actions = [
            item for item in status.items if hasattr(item, 'has_valid_timeout') and item.has_valid_timeout()
        ]

        delay = wf.jump.get_min_jumps_delay(jump_actions)
        threshold = formdata.frozen_receipt_time - datetime.timedelta(seconds=delay)

        if formdata.last_update_time > threshold:
            return

        for jump_action in jump_actions:
            if not jump_action.check_condition(formdata):
                continue
            # only the first matching jump is performed
            wf.jump.jump_and_perform(formdata, jump_action)
            return

    def fill_admin_form(self, form, formdef):
        """Add the duration widget to form."""
        hint = _('ex.: 1 day 12 hours. Usable units of time: %(variables)s.') % {
            'variables': ','.join(timewords())
        }

        form.add(
            StringWidget,
            'seconds',
            title=_('Value'),
            value=seconds2humanduration(self.seconds),
            hint=hint,
        )

    def seconds_parse(self, value):
        """Convert a human readable duration to seconds.

        Falsy values are returned unchanged, unparsable ones give None.
        """
        if not value:
            return value

        try:
            seconds = humanduration2seconds(value)
        except ValueError:
            seconds = None

        return seconds
|
|
|
|
|
|
class AssertBackofficeFieldRowWidget(SetBackofficeFieldRowWidget):
    """Backoffice field row whose value is entered in a StringWidget, without placeholder."""

    value_placeholder = None
    value_widget = StringWidget
|
|
|
|
|
|
class AssertBackofficeFieldsTableWidget(SetBackofficeFieldsTableWidget):
    """Backoffice fields table made of AssertBackofficeFieldRowWidget rows."""

    element_type = AssertBackofficeFieldRowWidget
|
|
|
|
|
|
class AssertBackofficeFieldValues(WorkflowTestAction):
    """Assertion comparing backoffice field values of the form to expected values.

    ``fields`` is a list of dictionaries with 'field_id' and 'value' keys.
    """

    label = _('Assert backoffice field values')

    key = 'assert-backoffice-field'
    fields = []

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('fields', 'fields'),
    ]

    @property
    def details_label(self):
        """This action has no one-line summary."""
        return ''

    def perform(self, formdata):
        """Raise WorkflowTestError on the first field whose value differs from the expected one."""
        for field_dict in self.fields:
            field_id = field_dict['field_id']
            expected_value = field_dict['value']
            formdata_value = formdata.data.get(field_id)

            if formdata_value == expected_value:
                continue

            # look for the field definition to report its label
            field = next(
                (x for x in formdata.formdef.workflow.get_backoffice_fields() if x.id == field_id),
                None,
            )
            if field is None:
                raise WorkflowTestError(
                    _('Field %(field_id)s not found (expected value "%(value)s").')
                    % {
                        'field_id': field_id,
                        'value': expected_value,
                    }
                )

            error_message = _(
                'Wrong value for backoffice field "%(field)s" (expected "%(expected_value)s", got "%(value)s").'
            )
            raise WorkflowTestError(
                error_message
                % {
                    'field': field.label,
                    'value': formdata_value,
                    'expected_value': expected_value,
                }
            )

    def fill_admin_form(self, form, formdef):
        """Add the table of expected backoffice field values to form."""
        form.add(
            AssertBackofficeFieldsTableWidget,
            'fields',
            value_widget_class=StringWidget,
            value=self.fields,
            workflow=formdef.workflow,
        )

    def export_fields_to_xml(self, element, attribute_name, **kwargs):
        """Append one exported FieldNode, id included, per expected field to element."""
        for field in self.fields:
            node = FieldNode(field)
            element.append(node.export_to_xml(include_id=True))

    def import_fields_from_xml(self, element, **kwargs):
        """Return the list of field dictionaries read from the 'field' children of element."""

        def load(field_xml_node):
            field_node = FieldNode()
            field_node.init_with_xml(field_xml_node, include_id=True, snapshot=None)
            return field_node.as_dict()

        return [load(field_xml_node) for field_xml_node in element.findall('field')]
|
|
|
|
|
|
class AssertWebserviceCall(WorkflowTestAction):
    """Assertion checking how many times a given webservice response was used.

    The webservice response is looked up by uuid among the responses of the
    parent test definition.
    """

    label = _('Assert webservice call')

    key = 'assert-webservice-call'
    webservice_response_uuid = None
    call_count = 1

    optional_fields = ['call_count']

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('webservice_response_uuid', 'str'),
        ('call_count', 'int'),
    ]

    @property
    def details_label(self):
        """Name of the referenced webservice response, or a notice when it is missing."""
        webservice_responses = [
            x
            for x in self.parent.testdef.get_webservice_responses()
            if x.uuid == self.webservice_response_uuid
        ]
        if webservice_responses:
            return webservice_responses[0].name

        return _('Broken, missing webservice response')

    @property
    def empty_form_error(self):
        """HTML message shown when no webservice response exists, with a link to add one."""
        r = '<p>%s</p>' % _(
            'In order to assert a webservice is called, you must define corresponding webservice response.'
        )
        # second paragraph is properly closed (it used to end with an opening <p>)
        r += '<p><a href="%swebservice-responses/">%s</a></p>' % (
            self.parent.testdef.get_admin_url(),
            _('Add webservice response'),
        )
        return r

    def perform(self, formdata):
        """Count and consume the recorded uses of the webservice response.

        Raises:
            WorkflowTestError: the referenced webservice response does not
                exist, or it was not used exactly call_count times.
        """
        try:
            response = [
                x
                for x in self.parent.testdef.get_webservice_responses()
                if x.uuid == self.webservice_response_uuid
            ][0]
        except IndexError:
            raise WorkflowTestError(_('Broken, missing webservice response'))

        call_count = 0
        # iterate over a copy as matching entries are removed from the list itself
        for used_response in formdata.used_webservice_responses.copy():
            if used_response.uuid == self.webservice_response_uuid:
                formdata.used_webservice_responses.remove(used_response)
                call_count += 1

        if call_count != self.call_count:
            raise WorkflowTestError(
                _('Webservice response %(name)s was used %(count)s times (instead of %(expected_count)s).')
                % {'name': response.name, 'count': call_count, 'expected_count': self.call_count}
            )

    def fill_admin_form(self, form, formdef):
        """Add response selection and call count widgets to form.

        Nothing is added when the test definition has no webservice response.
        """
        webservice_response_options = [
            (response.uuid, response.name, response.uuid)
            for response in self.parent.testdef.get_webservice_responses()
        ]

        if not webservice_response_options:
            return

        form.add(
            SingleSelectWidget,
            'webservice_response_uuid',
            title=_('Webservice response'),
            options=webservice_response_options,
            required=True,
            value=self.webservice_response_uuid,
        )
        form.add(IntWidget, 'call_count', title=_('Call count'), required=True, value=self.call_count)
|
|
|
|
|
|
class AssertSMS(WorkflowTestAction):
    """Assertion consuming the oldest recorded SMS and checking its recipients and body.

    Both criteria are optional: phone numbers and body are only checked when
    they are set on the action.
    """

    label = _('Assert SMS is sent')

    key = 'assert-sms'
    phone_numbers = None
    body = None

    optional_fields = ['phone_numbers', 'body']

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('phone_numbers', 'str_list'),
        ('body', 'str'),
    ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # make sure the list attribute is always iterable
        self.phone_numbers = self.phone_numbers or []

    @property
    def details_label(self):
        """One-line summary: first phone number and count of the others, or '' without numbers."""
        if not self.phone_numbers:
            return ''

        label = _('SMS to %s') % self.phone_numbers[0]

        if len(self.phone_numbers) > 1:
            label = '%s (+%s)' % (label, len(self.phone_numbers) - 1)

        return label

    def perform(self, formdata):
        """Pop the first sent SMS and check its phone numbers and body.

        Raises:
            WorkflowTestError: no SMS was recorded, an expected phone number
                is missing, or the configured body differs from the SMS body.
        """
        try:
            sms = formdata.sent_sms.pop(0)
        except IndexError:
            raise WorkflowTestError(_('No SMS was sent.'))

        for recipient in self.phone_numbers:
            if recipient not in sms['phone_numbers']:
                details = [_('SMS phone numbers: %s') % ', '.join(sms['phone_numbers'])]
                raise WorkflowTestError(_('SMS was not sent to %s.') % recipient, details=details)

        # body is an optional criterion: an unset body must not fail the assertion
        if self.body and self.body != sms['body']:
            details = [_('SMS body: "%s"') % sms['body']]
            raise WorkflowTestError(_('SMS body mismatch.'), details=details)

    def fill_admin_form(self, form, formdef):
        """Add the phone numbers and body widgets to form."""
        form.add(
            WidgetList,
            'phone_numbers',
            title=_('Phone numbers'),
            value=self.phone_numbers,
            add_element_label=_('Add phone number'),
            element_kwargs={'render_br': False, 'size': 50},
        )
        form.add(
            StringWidget,
            'body',
            title=_('Body'),
            value=self.body,
        )
|
|
|
|
|
|
class AssertAnonymise(WorkflowTestAction):
    """Assertion checking the anonymisation_performed flag of the form; it has no setting."""

    label = _('Assert anonymisation is performed')

    key = 'assert-anonymise'

    details_label = ''
    editable = False

    def perform(self, formdata):
        """Raise WorkflowTestError unless anonymisation was recorded on formdata."""
        if formdata.anonymisation_performed:
            return

        raise WorkflowTestError(_('Form was not anonymised.'))
|
|
|
|
|
|
class AssertRedirect(WorkflowTestAction):
    """Assertion checking the URL recorded in redirect_to_url on the form."""

    label = _('Assert redirect is performed')

    key = 'assert-redirect'
    url = None

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('url', 'str'),
    ]

    @property
    def details_label(self):
        """Expected URL."""
        return self.url

    def perform(self, formdata):
        """Raise WorkflowTestError when no redirection was recorded or its URL is not the expected one."""
        redirect_url = formdata.redirect_to_url

        if not redirect_url:
            raise WorkflowTestError(_('No redirection occured.'))

        if redirect_url == self.url:
            return

        raise WorkflowTestError(
            _('Expected redirection to %(expected_url)s but was redirected to %(url)s.')
            % {'expected_url': self.url, 'url': redirect_url}
        )

    def fill_admin_form(self, form, formdef):
        """Add the URL widget to form."""
        form.add(StringWidget, 'url', title=_('URL'), value=self.url)
|
|
|
|
|
|
class AssertHistoryMessage(WorkflowTestAction):
    """Assertion consuming the oldest recorded history message and checking it contains a text."""

    label = _('Assert history message is displayed')
    details_label = ''

    key = 'assert-history-message'
    message = None

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('message', 'str'),
    ]

    def perform(self, formdata):
        """Pop the first history message and check the expected text is part of it.

        Raises:
            WorkflowTestError: no history message was recorded, or the
                expected text is not contained in it.
        """
        try:
            displayed_message = formdata.history_messages.pop(0)
        except IndexError:
            raise WorkflowTestError(_('No history message.'))

        if self.message in displayed_message:
            return

        details = [
            _('Displayed history message: %s') % displayed_message,
            _('Expected history message: %s') % self.message,
        ]
        raise WorkflowTestError(_('Wrong history message content.'), details=details)

    def fill_admin_form(self, form, formdef):
        """Add the message widget to form."""
        hint = _('Assertion will pass if the text is contained in history message.')
        form.add(TextWidget, 'message', title=_('Message'), value=self.message, hint=hint)
|
|
|
|
|
|
class AssertAlert(WorkflowTestAction):
    """Assertion checking that one of the workflow messages of the form contains a text."""

    label = _('Assert alert is displayed')
    details_label = ''

    key = 'assert-alert'
    message = None

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('message', 'str'),
    ]

    def perform(self, formdata):
        """Raise WorkflowTestError when no workflow message contains the expected text."""
        messages = formdata.get_workflow_messages()

        if any(self.message in message for message in messages):
            return

        displayed_alerts = ', '.join(messages) if messages else _('None')
        details = [
            _('Displayed alerts: %s') % displayed_alerts,
            _('Expected alert: %s') % self.message,
        ]
        raise WorkflowTestError(_('No alert matching message.'), details=details)

    def fill_admin_form(self, form, formdef):
        """Add the message widget to form."""
        hint = _('Assertion will pass if the text is contained in alert message.')
        form.add(TextWidget, 'message', title=_('Message'), value=self.message, hint=hint)
|
|
|
|
|
|
class AssertCriticality(WorkflowTestAction):
    """Assertion checking the criticality level of the form against a workflow level id."""

    label = _('Assert criticality level')
    empty_form_error = _('Workflow has no criticality levels.')

    key = 'assert-criticality'
    level_id = None

    XML_NODES = WorkflowTestAction.XML_NODES + [
        ('level_id', 'str'),
    ]

    @property
    def details_label(self):
        """One-line summary naming the expected level, or a notice when it no longer exists."""
        workflow = self.parent.testdef.formdef.workflow
        for level in workflow.criticality_levels or []:
            if level.id == self.level_id:
                return _('Criticality is "%s"') % level.name

        return _('Broken, missing criticality level')

    def perform(self, formdata):
        """Compare the current criticality level of formdata with the expected one.

        Raises:
            WorkflowTestError: the expected level is not defined in the
                workflow, or the form has another level.
        """
        available_levels = formdata.formdef.workflow.criticality_levels or []
        expected_level = next((x for x in available_levels if x.id == self.level_id), None)
        if expected_level is None:
            raise WorkflowTestError(_('Broken, missing criticality level'))

        current_level = formdata.get_criticality_level_object()
        if current_level.id == self.level_id:
            return

        raise WorkflowTestError(
            _('Form should have criticality level "%(expected_level)s" but has level "%(level)s".')
            % {'expected_level': expected_level.name, 'level': current_level.name}
        )

    def fill_admin_form(self, form, formdef):
        """Add the level selection widget to form, unless the workflow has no criticality level."""
        levels = formdef.workflow.criticality_levels
        if not levels:
            return

        form.add(
            SingleSelectWidget,
            'level_id',
            title=_('Name'),
            value=self.level_id,
            options=[(x.id, x.name, x.id) for x in formdef.workflow.criticality_levels],
        )
|