# wcs/tests/test_ctl.py
#
# 966 lines
# 32 KiB
# Python

import collections
import os
import shutil
import sys
from unittest import mock
import django
import psycopg2
import pytest
from django.core.management import CommandError, call_command
import wcs.qommon.ctl
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.ctl.delete_tenant import CmdDeleteTenant
from wcs.ctl.management.commands.trigger_jumps import select_and_jump_formdata
from wcs.ctl.rebuild_indexes import rebuild_vhost_indexes
from wcs.ctl.wipe_data import CmdWipeData
from wcs.fields import EmailField, ItemField, PageField, StringField
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.management.commands.collectstatic import Command as CmdCollectStatic
from wcs.qommon.management.commands.migrate import Command as CmdMigrate
from wcs.qommon.management.commands.migrate_schemas import Command as CmdMigrateSchemas
from wcs.sql import cleanup_connection, get_connection_and_cursor
from wcs.wf.create_formdata import Mapping
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
from wcs.wscalls import NamedWsCall
from .utilities import clean_temporary_pub, create_temporary_pub
@pytest.fixture
def pub():
    """Yield a temporary publisher whose configured language is English.

    The temporary publisher is cleaned once the fixture is torn down.
    """
    publisher = create_temporary_pub()
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()
    yield publisher
    clean_temporary_pub()
def teardown_module(module):
    """Module-level teardown hook: clean the temporary publisher."""
    clean_temporary_pub()
def test_loading():
    """Load all wcs.ctl commands and instantiate every one of them."""
    controller = wcs.qommon.ctl.Ctl(cmd_prefixes=['wcs.ctl'])
    controller.load_all_commands(ignore_errors=False)
    # noqa pylint: disable=consider-iterating-dictionary
    command_names = controller.get_commands().keys()
    assert 'shell' in command_names
    # instantiating each command runs all the __init__() methods
    for command_class in controller.get_commands().values():
        command_class()
def test_collectstatic(pub, tmp_path):
    """Check collectstatic in copy mode, in link mode and with a broken link."""
    static_dir = os.path.join(pub.app_dir, 'collectstatic')
    required_link = os.path.join(static_dir, 'css', 'required.png')

    CmdCollectStatic.collectstatic(pub)
    expected_files = [
        ('css', 'required.png'),
        ('js', 'qommon.forms.js'),
        ('css', 'gadjo.css'),
        ('xstatic', 'jquery.js'),
    ]
    for path_parts in expected_files:
        assert os.path.exists(os.path.join(static_dir, *path_parts))

    CmdCollectStatic.collectstatic(pub, clear=True, link=True)
    assert os.path.islink(required_link)

    # turn required.png into a link to a copy, then remove that copy
    required_tmp = os.path.join(tmp_path, 'required.png')
    shutil.copy2(required_link, required_tmp)
    os.unlink(required_link)
    os.symlink(required_tmp, required_link)
    os.unlink(required_tmp)
    # the link is now dangling
    assert os.path.islink(required_link)
    assert not os.path.exists(required_link)

    # the command still works when a broken link exists
    CmdCollectStatic.collectstatic(pub, link=True)
    # and afterwards the link resolves again
    assert os.path.islink(required_link)
    assert os.path.exists(required_link)
def test_migrate(pub):
    """Run the migrate command handler after cleaning up the publisher."""
    pub.cleanup()
    command = CmdMigrate()
    command.handle()
def test_migrate_schemas(pub):
    """Run the migrate_schemas command handler after cleaning up the publisher."""
    pub.cleanup()
    command = CmdMigrateSchemas()
    command.handle()
def test_wipe_formdata(pub):
    """Check wipe-data option parsing, then wiping one form and all forms."""

    def make_form_with_one_formdata(name, fields, data):
        # store a form definition that ends up holding exactly one formdata
        formdef = FormDef()
        formdef.name = name
        formdef.fields = fields
        formdef.store()
        formdef.data_class().wipe()
        formdata = formdef.data_class()()
        formdata.data = data
        formdata.store()
        assert formdef.data_class().count() == 1
        return formdef

    form_1 = make_form_with_one_formdata(
        'example',
        [StringField(id='0', label='Your Name'), EmailField(id='1', label='Email')],
        {'0': 'John Doe', '1': 'john@example.net'},
    )
    form_2 = make_form_with_one_formdata(
        'example2',
        [StringField(id='0', label='First Name'), StringField(id='1', label='Last Name')],
        {'0': 'John', '1': 'Doe'},
    )

    def formdata_counts():
        # number of stored formdata for (form_1, form_2)
        return form_1.data_class().count(), form_2.data_class().count()

    wipe_cmd = CmdWipeData()

    # command line parsing
    options, args = wipe_cmd.parse_args(['--all'])
    assert options.all
    options, args = wipe_cmd.parse_args([form_1.url_name, form_2.url_name])
    assert form_1.url_name in args
    assert form_2.url_name in args

    options_class = collections.namedtuple('Options', ['all'])

    # neither "all" nor form names: both forms keep their data
    wipe_cmd.wipe(pub, options_class(all=False), [])
    assert formdata_counts() == (1, 1)

    # only the first form is named: only its data is removed
    wipe_cmd.wipe(pub, options_class(all=False), [form_1.url_name])
    assert formdata_counts() == (0, 1)

    # "all": the data of every form is removed
    wipe_cmd.wipe(pub, options_class(all=True), [])
    assert formdata_counts() == (0, 0)
def test_trigger_jumps(pub):
    """Check select_and_jump_formdata with various row selections and data."""
    Workflow.wipe()
    workflow = Workflow(name='test')
    st1 = workflow.add_status('Status1', 'st1')
    jump = st1.add_action('jump')
    jump.trigger = 'goto2'
    jump.status = 'st2'
    st2 = workflow.add_status('Status2', 'st2')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        StringField(id='0', label='Your Name', varname='name'),
        EmailField(id='1', label='Email', varname='email'),
    ]
    formdef.workflow_id = workflow.id
    formdef.store()

    initial_status = 'wf-%s' % st1.id
    jumped_status = 'wf-%s' % st2.id

    def run_trigger(trigger, rows):
        # recreate two formdata in the first status, run the trigger, then
        # return both formdata as reloaded from storage
        formdef.data_class().wipe()
        stored_ids = []
        fixtures = (
            (1, {'0': 'Alice', '1': 'alice@example.net'}),
            (2, {'0': 'Bob', '1': 'bob@example.net'}),
        )
        for number, data in fixtures:
            formdata = formdef.data_class()()
            formdata.id = number
            formdata.data = data
            formdata.status = 'wf-%s' % st1.id
            formdata.store()
            stored_ids.append(formdata.id)
        select_and_jump_formdata(formdef, trigger, rows)
        return tuple(formdef.data_class().get(formdata_id) for formdata_id in stored_ids)

    f1, f2 = run_trigger('goto2', '__all__')
    assert f1.status == f2.status == jumped_status

    # check publisher substitutions vars after the last jump_and_perform (#13964)
    sources = pub.substitutions.sources
    assert pub in sources
    assert formdef in sources
    # we cannot know which formdata is the last one, test each possibility
    if f1 in pub.substitutions.sources:
        assert f2 not in pub.substitutions.sources
    if f2 in pub.substitutions.sources:
        assert f1 not in pub.substitutions.sources

    # empty selection
    f1, f2 = run_trigger('goto2', [{'select': {}}])
    assert f1.status == f2.status == jumped_status

    # selection on the form number
    f1, f2 = run_trigger('goto2', [{'select': {'form_number_raw': '1'}}])
    assert f1.status == jumped_status
    assert f2.status == initial_status

    # selection on a field value
    f1, f2 = run_trigger('goto2', [{'select': {'form_var_email': 'bob@example.net'}}])
    assert f1.status == initial_status
    assert f2.status == jumped_status

    # empty selection, with data
    f1, f2 = run_trigger('goto2', [{'select': {}, 'data': {'foo': 'bar'}}])
    assert f1.status == f2.status == jumped_status
    assert f1.workflow_data['foo'] == f2.workflow_data['foo'] == 'bar'

    # selection on the form number, with data
    f1, f2 = run_trigger('goto2', [{'select': {'form_number_raw': '1'}, 'data': {'foo': 'bar'}}])
    assert f1.status == jumped_status
    assert f1.workflow_data['foo'] == 'bar'
    assert f2.status == initial_status
    assert not f2.workflow_data

    # a trigger that is not the one set on the jump changes nothing
    f1, f2 = run_trigger('badtrigger', '__all__')
    assert f1.status == f2.status == initial_status
    assert not f1.workflow_data
    assert not f2.workflow_data
def test_delete_tenant_with_sql(freezer):
    """Check tenant deletion against PostgreSQL, with and without force_drop.

    Four scenarios run in sequence, each on a fresh temporary publisher:

    1. ``force_drop=False``: the application directory is gone, an entry
       containing "removed" exists next to it, and exactly one schema named
       like ``removed_20181201_%<database>`` exists.
    2. ``force_drop=True``: no base table is left in the ``public`` schema
       while the database itself is still listed in ``pg_database``.
    3. ``force_drop=True`` with ``createdb-connection-params``: the database
       is no longer listed in ``pg_database``.
    4. ``force_drop=False`` with ``createdb-connection-params``: exactly one
       database named like ``removed_20181201_%<database>`` exists; it is
       dropped by the test afterwards.
    """
    pub = create_temporary_pub()
    delete_cmd = CmdDeleteTenant()

    assert os.path.isdir(pub.app_dir)

    sub_options_class = collections.namedtuple('Options', ['force_drop'])

    # scenario 1: no force_drop
    sub_options = sub_options_class(False)
    # the date is frozen so that the "removed_20181201_" names can be matched
    freezer.move_to('2018-12-01T00:00:00')
    delete_cmd.delete_tenant(pub, sub_options, [])

    assert not os.path.isdir(pub.app_dir)
    parent_dir = os.path.dirname(pub.app_dir)
    assert any('removed' in filename for filename in os.listdir(parent_dir))

    conn, cur = get_connection_and_cursor()
    cur.execute(
        """SELECT schema_name
FROM information_schema.schemata
WHERE schema_name like 'removed_20181201_%%%s'"""
        % pub.cfg['postgresql']['database']
    )
    assert len(cur.fetchall()) == 1

    clean_temporary_pub()

    # scenario 2: force_drop
    pub = create_temporary_pub()
    sub_options = sub_options_class(True)
    delete_cmd.delete_tenant(pub, sub_options, [])

    conn, cur = get_connection_and_cursor(new=True)

    assert not os.path.isdir(pub.app_dir)
    cur.execute(
        """SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'"""
    )
    assert not cur.fetchall()

    cur.execute(
        """SELECT datname
FROM pg_database
WHERE datname = '%s'"""
        % pub.cfg['postgresql']['database']
    )
    assert cur.fetchall()

    clean_temporary_pub()

    # scenario 3: force_drop with createdb-connection-params
    pub = create_temporary_pub()
    cleanup_connection()
    sub_options = sub_options_class(True)
    pub.cfg['postgresql']['createdb-connection-params'] = {
        'user': pub.cfg['postgresql']['user'],
        'database': 'postgres',
    }
    delete_cmd.delete_tenant(pub, sub_options, [])

    # query pg_database through a direct connection to the "postgres" database
    connect_kwargs = {'dbname': 'postgres', 'user': pub.cfg['postgresql']['user']}
    pgconn = psycopg2.connect(**connect_kwargs)
    cur = pgconn.cursor()
    cur.execute(
        """SELECT datname
FROM pg_database
WHERE datname = '%s'"""
        % pub.cfg['postgresql']['database']
    )
    assert not cur.fetchall()
    cur.close()
    pgconn.close()

    clean_temporary_pub()

    # scenario 4: no force_drop, with createdb-connection-params
    pub = create_temporary_pub()
    cleanup_connection()
    sub_options = sub_options_class(False)
    pub.cfg['postgresql']['createdb-connection-params'] = {
        'user': pub.cfg['postgresql']['user'],
        'database': 'postgres',
    }
    delete_cmd.delete_tenant(pub, sub_options, [])

    pgconn = psycopg2.connect(**connect_kwargs)
    pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = pgconn.cursor()
    cur.execute(
        """SELECT datname
FROM pg_database
WHERE datname like 'removed_20181201_%%%s'"""
        % pub.cfg['postgresql']['database']
    )
    result = cur.fetchall()
    assert len(result) == 1

    # clean this db after test
    cur.execute("""DROP DATABASE %s""" % result[0][0])

    cur.execute(
        """SELECT datname
FROM pg_database
WHERE datname = '%s'"""
        % pub.cfg['postgresql']['database']
    )
    assert not cur.fetchall()
    cur.close()
    # close the connection opened for this scenario; it used to be left open
    # because only the older "conn" was closed here
    pgconn.close()
    conn.close()

    clean_temporary_pub()
def test_delete_tenant_without_sql():
    """Check what tenant deletion leaves on disk, with and without force_drop.

    Without ``force_drop`` an entry containing "removed" must exist next to
    the deleted application directory; with ``force_drop`` there must be none.
    """
    pub = create_temporary_pub()
    delete_cmd = CmdDeleteTenant()
    assert os.path.isdir(pub.app_dir)

    sub_options_class = collections.namedtuple('Options', ['force_drop'])

    # no force_drop
    sub_options = sub_options_class(False)
    delete_cmd.delete_tenant(pub, sub_options, [])

    assert not os.path.isdir(pub.app_dir)
    parent_dir = os.path.dirname(pub.app_dir)
    assert any('removed' in filename for filename in os.listdir(parent_dir))

    clean_temporary_pub()

    # force_drop
    pub = create_temporary_pub()
    assert os.path.isdir(pub.app_dir)

    sub_options = sub_options_class(True)
    delete_cmd.delete_tenant(pub, sub_options, [])

    assert not os.path.isdir(pub.app_dir)
    parent_dir = os.path.dirname(pub.app_dir)
    assert not any('removed' in filename for filename in os.listdir(parent_dir))

    clean_temporary_pub()
def test_rebuild_indexes(pub):
    """Check that rebuilding indexes restores entries, and drops stale ones on destroy."""
    index_dir = os.path.join(pub.app_dir, 'formdefs-url_name')

    form = FormDef()
    form.name = 'example'
    form.store()
    assert os.listdir(index_dir) == ['example']

    # replace the valid index entry with a bogus one
    os.unlink(os.path.join(index_dir, 'example'))
    os.symlink('../formdefs/1', os.path.join(index_dir, 'XXX'))

    # without destroy the bogus entry is kept next to the rebuilt one
    rebuild_vhost_indexes(pub, destroy=False)
    entries = os.listdir(index_dir)
    assert 'example' in entries
    assert 'XXX' in entries

    # with destroy only the rebuilt entry is left
    rebuild_vhost_indexes(pub, destroy=True)
    assert os.listdir(index_dir) == ['example']
def test_runscript(pub):
    """Check runscript argument errors, then a run for one domain and for all tenants."""
    script_path = os.path.join(pub.app_dir, 'test2.py')
    marker_path = os.path.join(pub.app_dir, 'runscript.test')

    # a call without any argument is rejected
    with pytest.raises(CommandError):
        call_command('runscript')
    # and so is this combination of options
    with pytest.raises(CommandError):
        call_command('runscript', '--domain=a', '--all-tenants')

    # the script creates an empty marker file in the publisher app directory
    script_source = '''
import os
from quixote import get_publisher
open(os.path.join(get_publisher().app_dir, 'runscript.test'), 'w').close()
'''
    with open(script_path, 'w') as fd:
        fd.write(script_source)

    call_command('runscript', '--domain=example.net', script_path)
    assert os.path.exists(marker_path)
    os.unlink(marker_path)

    call_command('runscript', '--all-tenants', script_path)
    assert os.path.exists(marker_path)
def test_import_site():
    """Check import_site: argument error, import, --if-empty and a missing file."""
    # a call without any argument is rejected
    with pytest.raises(CommandError):
        call_command('import_site')

    pub = create_temporary_pub()
    tests_dir = os.path.dirname(__file__)
    domain_option = '--domain=example.net'

    FormDef.wipe()
    assert FormDef.count() == 0
    assert 'workflows' not in os.listdir(pub.app_dir)

    site_zip_path = os.path.join(tests_dir, 'site.zip')
    call_command('import_site', domain_option, site_zip_path)
    assert FormDef.count() == 1
    assert Workflow.count() == 1
    assert 'workflows' in os.listdir(pub.app_dir)

    # forms are wiped, the workflow stays
    FormDef.wipe()
    assert FormDef.count() == 0
    assert Workflow.count() == 1

    # with --if-empty the form count is expected to stay at zero
    call_command('import_site', domain_option, '--if-empty', site_zip_path)
    assert FormDef.count() == 0
    assert Workflow.count() == 1

    site_zip_path = os.path.join(tests_dir, 'missing_file.zip')
    with pytest.raises(CommandError, match='missing file:'):
        call_command('import_site', domain_option, site_zip_path)
def test_shell():
    """The shell command raises CommandError when called without a tenant name."""
    # missing tenant name
    with pytest.raises(CommandError):
        call_command('shell')
class AfterJobForTest(AfterJob):
    """After job that stores the results of two computed templates on itself."""

    def execute(self):
        """Compute both templates and keep the results as job attributes."""
        title_template = '{{ global_title|default:"FAIL" }}'
        month_template = '{{ "10/10/2010"|date:"F" }}'
        self.test_result = WorkflowStatusItem().compute(title_template)
        self.l10n_month = WorkflowStatusItem().compute(month_template)
class AfterJobForTestWithException(AfterJob):
    """After job whose execution always raises."""

    def execute(self):
        """Raise ZeroDivisionError unconditionally."""
        raise ZeroDivisionError()
def test_runjob(pub):
    """Check runjob: argument errors, execution, localisation, replay and failures."""

    def run_job(job_id, *extra_args):
        # run the command for the given job id on the example.net domain
        call_command('runjob', '--domain=example.net', '--job-id=%s' % job_id, *extra_args)

    with pytest.raises(CommandError):
        call_command('runjob')
    with pytest.raises(CommandError):
        run_job('invalid')

    # write a "global_title" variable to the site options file
    pub.load_site_options()
    if not pub.site_options.has_section('variables'):
        pub.site_options.add_section('variables')
    pub.site_options.set('variables', 'global_title', 'HELLO')
    options_path = os.path.join(pub.app_dir, 'site-options.cfg')
    with open(options_path, 'w') as fd:
        pub.site_options.write(fd)

    job = AfterJobForTest(label='test')
    job.store()
    assert AfterJob.get(job.id).status == 'registered'
    run_job(job.id)
    finished_job = AfterJob.get(job.id)
    assert finished_job.status == 'completed'
    assert finished_job.test_result == 'HELLO'
    assert finished_job.l10n_month == 'October'

    # same job class once the configured language is French
    pub.cfg['language'] = {'language': 'fr'}
    pub.write_cfg()
    job = AfterJobForTest(label='test2')
    job.store()
    assert AfterJob.get(job.id).status == 'registered'
    run_job(job.id)
    finished_job = AfterJob.get(job.id)
    assert finished_job.status == 'completed'
    assert finished_job.l10n_month == 'octobre'

    completion_time = AfterJob.get(job.id).completion_time
    # running again the job will skip it
    run_job(job.id)
    assert AfterJob.get(job.id).completion_time == completion_time

    # --force-replay will force the job to run again
    run_job(job.id, '--force-replay')
    assert AfterJob.get(job.id).completion_time != completion_time

    # test exception handling
    job = AfterJobForTestWithException(label='test3')
    job.store()
    assert AfterJob.get(job.id).status == 'registered'
    run_job(job.id)
    failed_job = AfterJob.get(job.id)
    assert failed_job.status == 'failed'
    assert 'ZeroDivisionError' in failed_job.exception

    # check --raise
    with pytest.raises(ZeroDivisionError):
        run_job(job.id, '--force-replay', '--raise')
def test_ctl_print_help(capsys):
    """print_help() exits and its standard output mentions "runscript"."""
    controller = wcs.qommon.ctl.Ctl(cmd_prefixes=['wcs.ctl'])
    with pytest.raises(SystemExit):
        controller.print_help()
    help_output = capsys.readouterr().out
    assert 'runscript' in help_output
def test_ctl_no_command(capsys):
    """Running ctl with only a program name exits with an error on stderr."""
    controller = wcs.qommon.ctl.Ctl(cmd_prefixes=['wcs.ctl'])
    # sys.argv is replaced for the duration of the block and restored on exit
    with mock.patch.object(sys, 'argv', ['wcsctl']):
        with pytest.raises(SystemExit):
            controller.run(None)
        captured = capsys.readouterr()
        assert 'error: You must use a command' in captured.err
def test_dbshell(pub):
    """Check that dbshell requires a tenant and calls psql on the tenant database."""
    # missing tenant name
    with pytest.raises(CommandError):
        call_command('dbshell')

    # the subprocess function to patch depends on the Django version
    if django.VERSION < (3, 2):
        patched_function = 'subprocess.call'
    else:
        patched_function = 'subprocess.run'

    with mock.patch(patched_function) as call:
        call.side_effect = lambda *args, **kwargs: 0
        call_command('dbshell', '--domain', 'example.net')
        # last positional argument given to the patched function
        command_line = call.call_args[0][-1]
        assert command_line[0] == 'psql'
        assert command_line[-1] == pub.cfg['postgresql']['database']
def test_makemessages(pub):
    """Make sure the makemessages command loads; asking for help exits."""
    # just make sure it loads correctly
    help_invocation = ('makemessages', '--help')
    with pytest.raises(SystemExit):
        call_command(*help_invocation)
def test_grep(pub):
    """Check grep argument errors and hits on forms, workflows, wscalls and mail templates."""
    for object_class in (FormDef, Workflow, NamedWsCall, MailTemplate):
        object_class.wipe()

    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [StringField(id='1', label='Your Name'), EmailField(id='2', label='Email')]
    formdef.options = {'x': 'Name'}
    formdef.store()

    workflow = Workflow()
    workflow.name = 'test'
    status = workflow.add_status('status')
    status.add_action('aggregationemail')
    workflow.store()

    wscall = NamedWsCall()
    wscall.name = 'Hello'
    wscall.request = {'url': 'http://example.org/api/test', 'qs_data': {'a': 'b'}}
    wscall.store()

    mail_template = MailTemplate(name='test mail template')
    mail_template.subject = 'test subject'
    mail_template.body = 'test body'
    mail_template.attachments = ['form_var_file1_raw']
    mail_template.store()

    # invocations that must be rejected
    invalid_invocations = [
        (),
        ('xxx',),
        ('--all-tenants', '--domain', 'example.net', 'xxx'),
        ('--domain', 'example.net', '--type', 'foo', 'xxx'),
    ]
    for arguments in invalid_invocations:
        with pytest.raises(CommandError):
            call_command('grep', *arguments)

    domain = ('--domain', 'example.net')
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        # search on action types
        call_command('grep', *domain, '--type', 'action-types', 'email')
        assert print_hit.call_args[0] == ('http://example.net/backoffice/workflows/1/status/1/items/1/',)
        print_hit.reset_mock()

        # search on field types
        call_command('grep', *domain, '--type', 'field-types', 'email')
        assert print_hit.call_args[0] == ('http://example.net/backoffice/forms/1/fields/2/',)
        print_hit.reset_mock()

        # plain search, two hits expected
        call_command('grep', *domain, 'Name')
        assert print_hit.call_count == 2
        first_hit, second_hit = print_hit.call_args_list[0], print_hit.call_args_list[1]
        assert first_hit.args == (
            'http://example.net/backoffice/forms/1/fields/1/',
            'Your Name',
        )
        assert second_hit.args == (
            'http://example.net/backoffice/forms/1/workflow-variables',
            'Name',
        )
        print_hit.reset_mock()

        # hit in a named webservice call
        call_command('grep', *domain, '/api/test')
        assert print_hit.call_args[0] == (
            'http://example.net/backoffice/settings/wscalls/hello/',
            'http://example.org/api/test',
        )
        print_hit.reset_mock()

        # hit in a mail template
        call_command('grep', *domain, 'form_var_file1_raw')
        assert print_hit.call_args[0] == (
            'http://example.net/backoffice/workflows/mail-templates/1/',
            'form_var_file1_raw',
        )
        print_hit.reset_mock()

        # no hit at all
        call_command('grep', *domain, 'xxx')
        assert print_hit.call_count == 0
        print_hit.reset_mock()
def test_grep_prefill(pub):
    """Check that grep finds values of template and formula prefills."""
    for object_class in (FormDef, Workflow, NamedWsCall, MailTemplate):
        object_class.wipe()

    patch_target = 'wcs.ctl.management.commands.grep.Command.print_hit'
    field_url = 'http://example.net/backoffice/forms/1/fields/1/'

    formdef = FormDef()
    formdef.name = 'test'

    # template prefill
    template_prefill = {'type': 'string', 'value': 'a{{foo.prefill_string}}b'}
    formdef.fields = [StringField(id='1', label='Your Name', prefill=template_prefill)]
    formdef.store()
    with mock.patch(patch_target) as print_hit:
        call_command('grep', '--domain', 'example.net', 'prefill_string')
        assert print_hit.call_args[0] == (field_url, 'a{{foo.prefill_string}}b')

    # formula prefill
    formula_prefill = {'type': 'formula', 'value': 'form_var_foo'}
    formdef.fields = [StringField(id='1', label='Your Name', prefill=formula_prefill)]
    formdef.store()
    with mock.patch(patch_target) as print_hit:
        call_command('grep', '--domain', 'example.net', 'form_var_foo')
        assert print_hit.call_args[0] == (field_url, 'form_var_foo')
@pytest.mark.parametrize('data_source_type', ['json', 'jsonp', 'python'])
def test_grep_data_source(pub, data_source_type):
    """Check that grep finds the value of an item field data source."""
    for object_class in (FormDef, Workflow, NamedWsCall, MailTemplate):
        object_class.wipe()

    # item field whose data source has the parametrized type
    data_source = {'type': data_source_type, 'value': '{{ machin_url }}/data-source/x/'}
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [ItemField(id='1', label='Your Name', data_source=data_source)]
    formdef.store()

    expected_hit = (
        'http://example.net/backoffice/forms/1/fields/1/',
        '{{ machin_url }}/data-source/x/',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'data-source/x')
        assert print_hit.call_args[0] == expected_hit
def test_grep_create_carddata(pub):
    """Check that grep finds a mapping expression of a create_carddata action."""
    for object_class in (CardDef, FormDef, Workflow):
        object_class.wipe()

    carddef = CardDef()
    carddef.name = 'My card'
    carddef.fields = [StringField(id='1', label='string')]
    carddef.store()

    workflow = Workflow(name='create-carddata')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    # the action is prepended to the second status of the copied list
    action = workflow.possible_status[1].add_action('create_carddata', id='_create', prepend=True)
    action.label = 'Create CardDef'
    action.varname = 'mycard'
    action.formdef_slug = carddef.url_name
    action.mappings = [Mapping(field_id='1', expression='{{ foo_bar }}')]
    workflow.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/1/status/new/items/_create/',
        '{{ foo_bar }}',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args[0] == expected_hit
def test_grep_edit_carddata(pub):
    """Check that grep finds a mapping expression of an edit_carddata action."""
    for object_class in (CardDef, FormDef, Workflow):
        object_class.wipe()

    carddef = CardDef()
    carddef.name = 'My card'
    carddef.fields = [StringField(id='1', label='string')]
    carddef.store()

    workflow = Workflow(name='edit-carddata')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    # the action is prepended to the second status of the copied list
    action = workflow.possible_status[1].add_action('edit_carddata', id='edit', prepend=True)
    action.label = 'Edit CardDef'
    action.varname = 'mycard'
    action.formdef_slug = carddef.url_name
    action.mappings = [Mapping(field_id='1', expression='{{ foo_bar }}')]
    workflow.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/1/status/new/items/edit/',
        '{{ foo_bar }}',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args[0] == expected_hit
def test_grep_backoffice_fields(pub):
    """Check that grep finds the prefill value of a workflow backoffice field."""
    Workflow.wipe()

    backoffice_field = StringField(
        id='bo1',
        label='field',
        type='string',
        varname='blah',
        prefill={'type': 'string', 'value': 'a{{foo.prefill_string}}b'},
    )
    workflow = Workflow(name='test-backoffice-fields')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [backoffice_field]
    workflow.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/1/backoffice-fields/fields/bo1/',
        'a{{foo.prefill_string}}b',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'prefill_string')
        assert print_hit.call_args[0] == expected_hit
def test_grep_webservice_call(pub):
    """Check that grep finds query string and post data values of a webservice call action."""
    FormDef.wipe()
    Workflow.wipe()

    workflow = Workflow(name='webservice-call')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    action = workflow.possible_status[1].add_action('webservice_call', id='webservice-call', prepend=True)
    action.url = 'http://remote.example.net'
    action.qs_data = {'django': '{{ form_number }}'}
    action.post_data = {'django': '{{ foo_bar }}'}
    workflow.store()

    patch_target = 'wcs.ctl.management.commands.grep.Command.print_hit'
    action_url = 'http://example.net/backoffice/workflows/1/status/new/items/webservice-call/'

    # value from the query string data
    with mock.patch(patch_target) as print_hit:
        call_command('grep', '--domain', 'example.net', 'form_number')
        assert print_hit.call_args[0] == (action_url, '{{ form_number }}')

    # value from the post data
    with mock.patch(patch_target) as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args[0] == (action_url, '{{ foo_bar }}')
def test_grep_set_backoffice_fields_action(pub):
    """Check that grep finds a value set by a set-backoffice-fields action."""
    FormDef.wipe()
    Workflow.wipe()

    workflow = Workflow(name='webservice-call')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string', varname='plop'),
    ]
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    target_status = workflow.possible_status[1]
    action = target_status.add_action('set-backoffice-fields', id='set-backoffice-fields', prepend=True)
    action.fields = [{'field_id': 'bo1', 'value': '{{ foo_bar }}'}]
    workflow.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/1/status/new/items/set-backoffice-fields/',
        '{{ foo_bar }}',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args[0] == expected_hit
def test_grep_action_condition(pub):
    """Check that grep finds the value of a workflow action condition."""
    Workflow.wipe()
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.store()

    # put a condition on the third item of the first status
    conditioned_action = workflow.possible_status[0].items[2]
    conditioned_action.condition = {'type': 'django', 'value': 'foo_bar'}
    workflow.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/2/status/just_submitted/items/_jump_to_new/',
        'foo_bar',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args[0] == expected_hit
def test_grep_field_condition(pub):
    """Check that grep finds the value of a form field condition."""
    FormDef.wipe()

    conditioned_field = StringField(
        type='string',
        id='1',
        label='Bar',
        size='40',
        required=True,
        condition={'type': 'django', 'value': 'foo_bar'},
    )
    formdef = FormDef()
    formdef.id = '2'
    formdef.name = 'Foo'
    formdef.fields = [conditioned_field]
    formdef.store()

    expected_hit = ('http://example.net/backoffice/forms/2/fields/1/', 'foo_bar')
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args[0] == expected_hit
def test_grep_page_condition(pub):
    """Check that grep finds a page condition and a page post-condition."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.store()

    post_condition = {
        'condition': {'type': 'django', 'value': 'form_xx'},
        'error_message': 'You shall not pass.',
    }
    page = PageField(
        id='0',
        label='1st page',
        type='page',
        condition={'type': 'django', 'value': 'foo_bar'},
        post_conditions=[post_condition],
    )
    formdef.fields = [page]
    formdef.id = '2'
    formdef.store()

    patch_target = 'wcs.ctl.management.commands.grep.Command.print_hit'
    page_url = 'http://example.net/backoffice/forms/2/fields/0/'

    # value of the page condition
    with mock.patch(patch_target) as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args[0] == (page_url, 'foo_bar')

    # value of the post-condition
    with mock.patch(patch_target) as print_hit:
        call_command('grep', '--domain', 'example.net', 'form_xx')
        assert print_hit.call_args[0] == (page_url, 'form_xx')
def test_grep_workflow_options(pub):
    """Check that grep finds a value stored in the workflow options of a form."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.workflow_options = {'a': 'foo_bar'}
    formdef.store()

    expected_hit = (
        'http://example.net/backoffice/forms/1/workflow-variables',
        'foo_bar',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args[0] == expected_hit
def test_grep_block(pub):
    """Check that grep finds the label of a field inside a block definition."""
    FormDef.wipe()
    BlockDef.wipe()

    blockdef = BlockDef()
    blockdef.name = 'Foo'
    block_field = StringField(type='string', id='1', label='bar', size='40', required=True)
    blockdef.fields = [block_field]
    blockdef.store()

    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'bar')
        # the expected URL embeds the id given to the stored block
        field_url = 'http://example.net/backoffice/forms/blocks/%s/1/' % blockdef.id
        assert print_hit.call_args[0] == (field_url, 'bar')
def test_grep_workflow_multiple(pub):
    """Check that a workflow action hit is printed once, with and without --urls."""
    for object_class in (FormDef, BlockDef, Workflow):
        object_class.wipe()

    workflow = Workflow(name='test')
    status = workflow.add_status('Status1', 'st1')
    choice = status.add_action('choice')
    choice.label = '{{foo_bar}}'
    workflow.store()

    patch_target = 'wcs.ctl.management.commands.grep.Command.print_unique_hit'
    action_url = 'http://example.net/backoffice/workflows/1/status/st1/items/1/'

    # default output: URL and matched value
    with mock.patch(patch_target) as print_unique_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_unique_hit.call_args[0] == (action_url, '{{foo_bar}}')
        assert print_unique_hit.call_count == 1

    # --urls: URL only
    with mock.patch(patch_target) as print_unique_hit:
        call_command('grep', '--domain', 'example.net', '--urls', 'foo_bar')
        assert print_unique_hit.call_args[0] == (action_url,)
        assert print_unique_hit.call_count == 1