Compare commits

..

9 Commits

Author SHA1 Message Date
Pierre Ducroquet 17ac728f78 tests: add a test for new FTS on formdefs (#86527)
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-05 10:41:25 +01:00
Frédéric Péters ab38410f34 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-04 17:49:49 +01:00
Frédéric Péters 2eb29089a1 templatetags: make sure |get_preference gets a user (#87741)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-04 16:50:09 +01:00
Valentin Deniaud f418c515f5 testdef: allow setting status code of webservice response (#87542)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-04 16:06:35 +01:00
Valentin Deniaud d784acba07 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-04 15:33:45 +01:00
Valentin Deniaud d9267a79ca workflow_tests: allow testing recipient email address (#87566)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-04 14:33:00 +01:00
Frédéric Péters 9cf2aae477 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-04 13:47:43 +01:00
Frédéric Péters d10392f0fe workflows: reword webservice error recording options (#54931)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-04 13:45:52 +01:00
Valentin Deniaud 7dfc90a1e5 testdef: do not run workflow if no test actions (#87696)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-04 12:45:07 +01:00
14 changed files with 208 additions and 200 deletions

View File

@ -2,6 +2,7 @@ import os
import pytest
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
@ -86,6 +87,10 @@ def test_admin_for_all(pub):
user = create_superuser(pub)
role = create_role(pub)
formdef = FormDef()
formdef.name = 'test'
formdef.store()
try:
with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w'):
pass # create empty file

View File

@ -1369,6 +1369,7 @@ def test_tests_webservice_response(pub):
resp = resp.click('Test response')
resp.form['url'] = 'http://example.com/'
resp.form['payload'] = '{"a": "b"}'
resp.form['status_code'] = '400'
resp.form['qs_data$element0key'] = 'foo'
resp.form['method'] = 'POST (JSON)'
resp.form['post_data$element0key'] = 'bar'
@ -1381,6 +1382,7 @@ def test_tests_webservice_response(pub):
assert response.name == 'Test response'
assert response.url == 'http://example.com/'
assert response.payload == '{"a": "b"}'
assert response.status_code == 400
assert response.qs_data == {'foo': ''}
assert response.method == 'POST'
assert response.post_data == {'bar': ''}

View File

@ -3789,8 +3789,16 @@ def test_workflows_wscall_options(pub, value):
baz_status.add_action('webservice_call')
workflow.store()
pub.cfg['debug'] = {}
pub.write_cfg()
app = login(get_app(pub))
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
assert 'notify_on_errors' not in resp.form.fields
pub.cfg['debug'] = {'error_email': 'test@localhost'}
pub.write_cfg()
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
assert resp.form['notify_on_errors'].value is None
assert resp.form['record_on_errors'].value == 'yes'
resp.form['notify_on_errors'] = value

View File

@ -64,6 +64,32 @@ def test_workflow_tests_ignore_unsupported_items(pub, monkeypatch):
assert str(excinfo.value) == 'Form should be in status "End status" but is in status "New status".'
def test_workflow_tests_no_actions(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = []
with mock.patch('wcs.workflow_tests.WorkflowTests.run') as mocked_run:
testdef.run(formdef)
mocked_run.assert_not_called()
def test_workflow_tests_button_click(pub):
role = pub.role_class(name='test role')
role.store()
@ -316,7 +342,9 @@ def test_workflow_tests_sendmail(mocked_send_email, pub):
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertEmail(subject_strings=['In new status'], body_strings=['xxx']),
workflow_tests.AssertEmail(
addresses=['test@example.org'], subject_strings=['In new status'], body_strings=['xxx']
),
workflow_tests.ButtonClick(button_name='Go to end status'),
workflow_tests.AssertStatus(status_name='End status'),
workflow_tests.AssertEmail(subject_strings=['end status'], body_strings=['yyy']),
@ -347,6 +375,15 @@ def test_workflow_tests_sendmail(mocked_send_email, pub):
testdef.run(formdef)
assert str(excinfo.value) == 'Email body does not contain "bli".'
testdef.workflow_tests.actions = [
workflow_tests.AssertEmail(addresses=['test@example.org', 'other@example.org']),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Email was not sent to address "other@example.org".'
assert 'Email addresses: test@example.org' in excinfo.value.details
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to end status'),
workflow_tests.AssertEmail(subject_strings=['In new status'], body_strings=['xxx']),
@ -506,3 +543,53 @@ def test_workflow_tests_webservice(pub):
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Webservice response Fake response was used 0 times (instead of 1).'
def test_workflow_tests_webservice_status_jump(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
end_status = workflow.add_status(name='Error status')
wscall = new_status.add_action('webservice_call')
wscall.url = 'http://example.com/json'
wscall.varname = 'test_webservice'
wscall.action_on_4xx = end_status.id
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.store()
response = WebserviceResponse()
response.testdef_id = testdef.id
response.name = 'Fake response'
response.url = 'http://example.com/json'
response.payload = '{"foo": "foo"}'
response.store()
testdef.workflow_tests.actions = [
workflow_tests.AssertStatus(status_name='New status'),
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
]
testdef.run(formdef)
response.status_code = 400
response.store()
testdef.workflow_tests.actions = [
workflow_tests.AssertStatus(status_name='End status'),
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
]

View File

@ -754,6 +754,16 @@ class WebserviceResponsePage(Directory):
validation_function=validate_json,
)
form.add(
RadiobuttonsWidget,
'status_code',
title=_('Response status code'),
required=True,
options=[200, 204, 400, 401, 403, 404, 500, 502, 503],
value=self.webservice_response.status_code,
extra_css_class='widget-inline-radio',
)
form.add(
WidgetDict,
'qs_data',
@ -820,6 +830,7 @@ class WebserviceResponsePage(Directory):
self.webservice_response.name = form.get_widget('name').parse()
self.webservice_response.payload = form.get_widget('payload').parse()
self.webservice_response.url = form.get_widget('url').parse()
self.webservice_response.status_code = form.get_widget('status_code').parse()
self.webservice_response.qs_data = form.get_widget('qs_data').parse()
self.webservice_response.method = form.get_widget('method').parse()
self.webservice_response.post_data = form.get_widget('post_data').parse()

View File

@ -4,8 +4,8 @@ msgid ""
msgstr ""
"Project-Id-Version: wcs 0\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-03-01 17:03+0100\n"
"PO-Revision-Date: 2024-03-01 17:03+0100\n"
"POT-Creation-Date: 2024-03-04 17:49+0100\n"
"PO-Revision-Date: 2024-03-04 17:49+0100\n"
"Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n"
"Language-Team: french\n"
"Language: fr\n"
@ -725,11 +725,11 @@ msgstr "Attribut du texte (text)"
msgid "Name of the attribute containing the label of an entry (default: text)"
msgstr "Nom de lattribut contenant le texte dune donnée (par défaut : text)"
#: admin/data_sources.py admin/wscalls.py wf/wscall.py
#: admin/data_sources.py admin/wscalls.py
msgid "Notify on errors"
msgstr "Notifier en cas derreur"
#: admin/data_sources.py admin/wscalls.py wf/wscall.py
#: admin/data_sources.py admin/wscalls.py
msgid "Record on errors"
msgstr "Enregistrer les erreurs"
@ -2628,6 +2628,10 @@ msgstr "JSON invalide : %s"
msgid "Response payload (JSON)"
msgstr "Contenu de la réponse (JSON)"
#: admin/tests.py
msgid "Response status code"
msgstr "Code de statut de la réponse"
#: admin/tests.py
msgid "Restrict to query string data"
msgstr "Limiter aux paramètres de lURL"
@ -11936,8 +11940,26 @@ msgid "Action on network errors"
msgstr "Action en cas derreur réseau"
#: wf/wscall.py
msgid "Record errors in the log"
msgstr "Enregistrer les erreurs dans lhistorique"
msgid "Notify errors by email"
msgstr "Notifier les erreurs par courriel "
#: wf/wscall.py
#, python-format
msgid "Error traces will be sent to %s"
msgstr "Les traces des erreurs seront envoyées à %s."
#: wf/wscall.py
msgid ""
"Record errors in the central error screen, for management by administrators"
msgstr ""
" Enregistrer les erreurs dans lécran de centralisation des erreurs, pour "
"traitement par léquipe dadministration"
#: wf/wscall.py
msgid "Record errors in card/form history log, for agents"
msgstr ""
"Enregistrer les erreurs dans lhistorique de la demande ou fiche, pour "
"visualisation par les agents"
#: wf/wscall.py
msgctxt "wscall-parameter"
@ -12029,10 +12051,25 @@ msgstr "Nom du statut"
msgid "Assert email is sent"
msgstr "Vérifier lenvoi dun courriel"
#: workflow_tests.py
#, python-format
msgid "Email to \"%s\""
msgstr "Courriel vers « %s »"
#: workflow_tests.py
msgid "No email was sent."
msgstr "Aucun courriel envoyé."
#: workflow_tests.py
#, python-format
msgid "Email addresses: %s"
msgstr "Adresses de courriel : %s"
#: workflow_tests.py
#, python-format
msgid "Email was not sent to address \"%s\"."
msgstr "Le courriel na pas été envoyé vers ladresse « %s »."
#: workflow_tests.py
#, python-format
msgid "Email subject: %s"
@ -12053,6 +12090,14 @@ msgstr "Corps du courriel : %s"
msgid "Email body does not contain \"%s\"."
msgstr "Le corps du courriel ne contient pas « %s »."
#: workflow_tests.py
msgid "Email addresses"
msgstr "Adresses de courriel"
#: workflow_tests.py
msgid "Add address"
msgstr "Ajouter une adresse"
#: workflow_tests.py
msgid "Subject must contain"
msgstr "Le sujet doit contenir"

View File

@ -455,7 +455,6 @@ class WcsPublisher(QommonPublisher):
for _formdef in FormDef.select() + CardDef.select():
sql.do_formdef_tables(_formdef)
sql.migrate_global_views(conn, cur)
sql.init_search_tokens()
cur.close()
def record_deprecated_usage(self, *args, **kwargs):

View File

@ -690,11 +690,6 @@ class QommonPublisher(Publisher):
for error in self.loggederror_class.select(clause=clauses):
self.loggederror_class.remove_object(error.id)
def clean_search_tokens(self, **kwargs):
from wcs import sql
sql.purge_obsolete_search_tokens()
@classmethod
def register_cronjobs(cls):
cls.register_cronjob(CronJob(cls.clean_sessions, minutes=[0], name='clean_sessions'))
@ -707,9 +702,6 @@ class QommonPublisher(Publisher):
cls.register_cronjob(
CronJob(cls.clean_loggederrors, hours=[3], minutes=[0], name='clean_loggederrors')
)
cls.register_cronjob(
CronJob(cls.clean_search_tokens, weekdays=[0], hours=[1], minutes=[0], name='clean_tokens')
)
_initialized = False

View File

@ -1281,7 +1281,7 @@ def intcomma(value):
@register.filter
def get_preference(user, pref_name):
return user.get_preference(pref_name)
return user.get_preference(pref_name) if user else None
@register.simple_tag(takes_context=True)

View File

@ -96,20 +96,6 @@ SQL_TYPE_MAPPING = {
}
def _table_exists(cur, table_name):
cur.execute('SELECT 1 FROM pg_class WHERE relname = %s;', (table_name,))
rows = cur.fetchall()
return len(rows) > 0
def _trigger_exists(cur, table_name, trigger_name):
cur.execute(
'SELECT 1 FROM pg_trigger WHERE tgrelid = %s::regclass AND tgname = %s;', (table_name, trigger_name)
)
rows = cur.fetchall()
return len(rows) > 0
class WcsPgConnection(psycopg2.extensions.connection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -1593,8 +1579,6 @@ def do_global_views(conn, cur):
% (name, category.id)
)
init_search_tokens_triggers(cur)
def clean_global_views(conn, cur):
# Purge of any dead data
@ -1687,154 +1671,11 @@ def init_global_table(conn=None, cur=None):
endpoint_status=endpoint_status_filter,
)
)
init_search_tokens_data(cur)
if own_conn:
cur.close()
def init_search_tokens(conn=None, cur=None):
own_cur = False
if cur is None:
own_cur = True
conn, cur = get_connection_and_cursor()
# Create table
cur.execute('CREATE TABLE IF NOT EXISTS wcs_search_tokens(token TEXT PRIMARY KEY);')
# Create triggers
init_search_tokens_triggers(cur)
# Fill table
init_search_tokens_data(cur)
# Index at the end, small performance trick... not that useful, but it's free...
cur.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm;')
cur.execute(
'CREATE INDEX IF NOT EXISTS wcs_search_tokens_trgm ON wcs_search_tokens USING gin(token gin_trgm_ops);'
)
# And last: functions to use this brand new table
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_or (tsquery) (sfunc=tsquery_or, stype=tsquery);')
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_and (tsquery) (sfunc=tsquery_and, stype=tsquery);')
cur.execute(
r"""CREATE OR REPLACE FUNCTION public.wcs_tsquery(text)
RETURNS tsquery
LANGUAGE sql
STABLE
AS $function$
with
tokenized as (select unnest(regexp_split_to_array($1, '\s+')) w),
super_tokenized as (
select w,
coalesce((select plainto_tsquery(perfect.token) from wcs_search_tokens perfect where perfect.token = plainto_tsquery(w)::text),
tsquery_agg_or(plainto_tsquery(partial.token) order by partial.token <-> w desc),
plainto_tsquery(w)) tokens
from tokenized
left join wcs_search_tokens partial on partial.token % w and w not similar to '%[0-9]{2,}%'
group by w)
select tsquery_agg_and(tokens) from super_tokenized;
$function$;"""
)
if own_cur:
cur.close()
def init_search_tokens_triggers(cur):
# We define only appending triggers, ie on INSERT and UPDATE.
# It would be far heavier to maintain deletions here, and having extra data has
# no or marginal side effect on search performances, and absolutely no impact
# on search results.
# Instead, a weekly cron job will delete obsolete entries, thus making it sure no
# personal data is kept uselessly.
# First part: the appending function
cur.execute(
"""CREATE OR REPLACE FUNCTION wcs_search_tokens_trigger_fn ()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
INSERT INTO wcs_search_tokens SELECT unnest(tsvector_to_array(NEW.fts)) ON CONFLICT(token) DO NOTHING;
RETURN NEW;
END;
$function$;"""
)
if not (_table_exists(cur, 'wcs_search_tokens')):
# abort trigger creation if tokens table doesn't exist yet
return
if _table_exists(cur, 'wcs_all_forms') and not _trigger_exists(
cur, 'wcs_all_forms', 'wcs_all_forms_fts_trg_upd'
):
# Second part: insert and update triggers for wcs_all_forms
cur.execute(
"""CREATE TRIGGER wcs_all_forms_fts_trg_ins
AFTER INSERT ON wcs_all_forms
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
cur.execute(
"""CREATE TRIGGER wcs_all_forms_fts_trg_upd
AFTER UPDATE OF fts ON wcs_all_forms
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
if _table_exists(cur, 'searchable_formdefs') and not _trigger_exists(
cur, 'searchable_formdefs', 'searchable_formdefs_fts_trg_upd'
):
# Third part: insert and update triggers for searchable_formdefs
cur.execute(
"""CREATE TRIGGER searchable_formdefs_fts_trg_ins
AFTER INSERT ON searchable_formdefs
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
cur.execute(
"""CREATE TRIGGER searchable_formdefs_fts_trg_upd
AFTER UPDATE OF fts ON searchable_formdefs
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
def init_search_tokens_data(cur):
if not (_table_exists(cur, 'wcs_search_tokens')):
# abort table data initialization if tokens table doesn't exist yet
return
if _table_exists(cur, 'wcs_all_forms'):
cur.execute(
"""INSERT INTO wcs_search_tokens
SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms
ON CONFLICT(token) DO NOTHING;"""
)
if _table_exists(cur, 'searchable_formdefs'):
cur.execute(
"""INSERT INTO wcs_search_tokens
SELECT unnest(tsvector_to_array(fts)) FROM searchable_formdefs
ON CONFLICT(token) DO NOTHING;"""
)
def purge_obsolete_search_tokens(cur=None):
own_cur = False
if cur is None:
own_cur = True
_, cur = get_connection_and_cursor()
cur.execute(
"""DELETE FROM wcs_search_tokens
WHERE token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms)
AND token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms);"""
)
if own_cur:
cur.close()
class SqlMixin:
_table_name = None
_numerical_id = True
@ -4948,6 +4789,7 @@ class SearchableFormDef(SqlMixin):
% (cls._table_name, cls._table_name)
)
cls.do_indexes(cur)
cur.close()
from wcs.carddef import CardDef
from wcs.formdef import FormDef
@ -4956,8 +4798,6 @@ class SearchableFormDef(SqlMixin):
CardDef.select(ignore_errors=True), FormDef.select(ignore_errors=True)
):
cls.update(obj=objectdef)
init_search_tokens(cur)
cur.close()
@classmethod
def update(cls, obj=None, removed_obj_type=None, removed_obj_id=None):
@ -4995,7 +4835,7 @@ class SearchableFormDef(SqlMixin):
def search(cls, obj_type, string):
_, cur = get_connection_and_cursor()
cur.execute(
'SELECT object_id FROM searchable_formdefs WHERE fts @@ wcs_tsquery(%s)',
'SELECT object_id FROM searchable_formdefs WHERE fts @@ plainto_tsquery(%s)',
(FtsMatch.get_fts_value(string),),
)
ids = [x[0] for x in cur.fetchall()]
@ -5260,7 +5100,7 @@ def get_period_total(
# latest migration, number + description (description is not used
# programmaticaly but will make sure git conflicts if two migrations are
# separately added with the same number)
SQL_LEVEL = (106, 'improved fts method')
SQL_LEVEL = (105, 'change test result json structure')
def migrate_global_views(conn, cur):
@ -5593,10 +5433,6 @@ def migrate():
for formdef in FormDef.select() + CardDef.select():
do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False)
if sql_level < 106:
# 106: new fts mechanism with tokens table
init_search_tokens()
if sql_level != SQL_LEVEL[0]:
cur.execute(
'''UPDATE wcs_meta SET value = %s, updated_at=NOW() WHERE key = %s''',

View File

@ -373,11 +373,6 @@ class FtsMatch(Criteria):
return 'fts @@ plainto_tsquery(%%(c%s)s)' % id(self.value)
class WcsFtsMatch(FtsMatch):
def as_sql(self):
return 'fts @@ wcs_tsquery(%%(c%s)s)' % id(self.value)
class ElementEqual(Criteria):
def __init__(self, attribute, key, value, **kwargs):
super().__init__(attribute, value)

View File

@ -287,7 +287,7 @@ class TestDef(sql.TestDef):
def _run(self, objectdef):
formdata = self.run_form_fill(objectdef)
if self.agent_id:
if self.agent_id and self.workflow_tests.actions:
agent_user = get_publisher().user_class.get(self.agent_id)
self.workflow_tests.run(formdata, agent_user)
@ -640,7 +640,7 @@ class MockWebserviceResponseAdapter(requests.adapters.HTTPAdapter):
}
raw_response = HTTPResponse(
status=200,
status=response.status_code,
body=io.BytesIO(response.payload.encode()),
headers=headers,
original_response=self.make_original_response(headers),
@ -669,6 +669,7 @@ class WebserviceResponse(XmlStorableObject):
name = ''
payload = None
url = None
status_code = 200
qs_data = None
method = ''
post_data = None
@ -678,6 +679,7 @@ class WebserviceResponse(XmlStorableObject):
('name', 'str'),
('payload', 'str'),
('url', 'str'),
('status_code', 'int'),
('qs_data', 'kv_data'),
('method', 'str'),
('post_data', 'kv_data'),

View File

@ -185,9 +185,9 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
'action_on_5xx',
'action_on_bad_data',
'action_on_network_errors',
'notify_on_errors',
'record_on_errors',
'record_errors',
'record_on_errors',
'notify_on_errors',
'condition',
'set_marker_on_status',
)
@ -378,11 +378,12 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
default_value=getattr(self.__class__, attribute),
)
if 'notify_on_errors' in parameters:
if 'notify_on_errors' in parameters and get_publisher().logger.error_email:
form.add(
CheckboxWidget,
'%snotify_on_errors' % prefix,
title=_('Notify on errors'),
title=_('Notify errors by email'),
hint=_('Error traces will be sent to %s') % get_publisher().logger.error_email,
value=self.notify_on_errors,
tab=('error', _('Error Handling')),
)
@ -391,7 +392,7 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
form.add(
CheckboxWidget,
'%srecord_on_errors' % prefix,
title=_('Record on errors'),
title=_('Record errors in the central error screen, for management by administrators'),
value=self.record_on_errors,
tab=('error', _('Error Handling')),
default_value=self.__class__.record_on_errors,
@ -401,7 +402,7 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
form.add(
CheckboxWidget,
'%srecord_errors' % prefix,
title=_('Record errors in the log'),
title=_('Record errors in card/form history log, for agents'),
value=self.record_errors,
tab=('error', _('Error Handling')),
)

View File

@ -19,7 +19,7 @@ import uuid
from wcs import wf
from wcs.qommon import _
from wcs.qommon.form import IntWidget, SingleSelectWidget, StringWidget, WidgetList
from wcs.qommon.form import EmailWidget, IntWidget, SingleSelectWidget, StringWidget, WidgetList
from wcs.qommon.humantime import humanduration2seconds, seconds2humanduration, timewords
from wcs.qommon.xml_storage import XmlStorableObject
from wcs.testdef import TestError, WebserviceResponse
@ -268,24 +268,35 @@ class AssertEmail(WorkflowTestAction):
label = _('Assert email is sent')
key = 'assert-email'
addresses = None
subject_strings = None
body_strings = None
optional_fields = ['subject_strings', 'body_strings']
optional_fields = ['addresses', 'subject_strings', 'body_strings']
XML_NODES = WorkflowTestAction.XML_NODES + [
('addresses', 'str_list'),
('subject_strings', 'str_list'),
('body_strings', 'str_list'),
]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.addresses = self.addresses or []
self.subject_strings = self.subject_strings or []
self.body_strings = self.body_strings or []
@property
def details_label(self):
return ''
if not self.addresses:
return ''
label = _('Email to "%s"') % self.addresses[0]
if len(self.addresses) > 1:
label = '%s (+%s)' % (label, len(self.addresses) - 1)
return label
def perform(self, formdata, user):
try:
@ -293,6 +304,11 @@ class AssertEmail(WorkflowTestAction):
except IndexError:
raise WorkflowTestError(_('No email was sent.'))
for address in self.addresses:
details = [_('Email addresses: %s') % ', '.join(email.email_msg.to)]
if address not in email.email_msg.to:
raise WorkflowTestError(_('Email was not sent to address "%s".') % address, details=details)
for subject in self.subject_strings:
details = [_('Email subject: %s') % email.email_msg.subject]
if subject not in email.email_msg.subject:
@ -304,6 +320,15 @@ class AssertEmail(WorkflowTestAction):
raise WorkflowTestError(_('Email body does not contain "%s".') % body, details=details)
def fill_admin_form(self, form, formdef):
form.add(
WidgetList,
'addresses',
element_type=EmailWidget,
title=_('Email addresses'),
value=self.addresses,
add_element_label=_('Add address'),
element_kwargs={'render_br': False, 'size': 50},
)
form.add(
WidgetList,
'subject_strings',