wcs/tests/test_ctl.py

1138 lines
39 KiB
Python

import io
import json
import os
import pickle
import shutil
import tempfile
import zipfile
from unittest import mock
import django
import psycopg2
import pytest
import responses
from django.core.management import CommandError, call_command
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.ctl.management.commands.trigger_jumps import select_and_jump_formdata
from wcs.fields import EmailField, ItemField, PageField, StringField
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.management.commands.collectstatic import Command as CmdCollectStatic
from wcs.qommon.management.commands.migrate import Command as CmdMigrate
from wcs.qommon.management.commands.migrate_schemas import Command as CmdMigrateSchemas
from wcs.sql import cleanup_connection, get_connection_and_cursor
from wcs.wf.create_formdata import Mapping
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
from wcs.wscalls import NamedWsCall
from .utilities import clean_temporary_pub, create_temporary_pub
@pytest.fixture
def pub():
pub = create_temporary_pub()
cleanup_connection()
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
yield pub
clean_temporary_pub()
def teardown_module(module):
clean_temporary_pub()
@pytest.fixture
def alt_tempdir():
alt_tempdir = tempfile.mkdtemp()
yield alt_tempdir
shutil.rmtree(alt_tempdir)
def test_collectstatic(pub, tmp_path):
CmdCollectStatic.collectstatic(pub)
assert os.path.exists(os.path.join(pub.app_dir, 'collectstatic', 'css', 'required.png'))
assert os.path.exists(os.path.join(pub.app_dir, 'collectstatic', 'js', 'qommon.forms.js'))
assert os.path.exists(os.path.join(pub.app_dir, 'collectstatic', 'css', 'gadjo.css'))
assert os.path.exists(os.path.join(pub.app_dir, 'collectstatic', 'xstatic', 'jquery.js'))
CmdCollectStatic.collectstatic(pub, clear=True, link=True)
assert os.path.islink(os.path.join(pub.app_dir, 'collectstatic', 'css', 'required.png'))
# create a broken link
required_tmp = os.path.join(tmp_path, 'required.png')
required_link = os.path.join(pub.app_dir, 'collectstatic', 'css', 'required.png')
shutil.copy2(os.path.join(pub.app_dir, 'collectstatic', 'css', 'required.png'), required_tmp)
os.unlink(required_link)
os.symlink(required_tmp, required_link)
os.unlink(required_tmp)
# check that we have a broken link
assert os.path.islink(required_link) and not os.path.exists(required_link)
# still works if broken link exists
CmdCollectStatic.collectstatic(pub, link=True)
# link not broken any more
assert os.path.islink(required_link) and os.path.exists(required_link)
def test_migrate(pub):
pub.cleanup()
CmdMigrate().handle()
def test_migrate_schemas(pub):
pub.cleanup()
CmdMigrateSchemas().handle()
def test_wipe_formdata(pub):
form_1 = FormDef()
form_1.name = 'example'
form_1.fields = [StringField(id='0', label='Your Name'), EmailField(id='1', label='Email')]
form_1.store()
form_1.data_class().wipe()
formdata_1 = form_1.data_class()()
formdata_1.data = {'0': 'John Doe', '1': 'john@example.net'}
formdata_1.store()
assert form_1.data_class().count() == 1
form_2 = FormDef()
form_2.name = 'example2'
form_2.fields = [StringField(id='0', label='First Name'), StringField(id='1', label='Last Name')]
form_2.store()
form_2.data_class().wipe()
formdata_2 = form_2.data_class()()
formdata_2.data = {'0': 'John', '1': 'Doe'}
formdata_2.store()
assert form_2.data_class().count() == 1
# no support for --all-tenants
with pytest.raises(CommandError):
call_command('wipe_data', '--all-tenants')
# dry-run mode
output = io.StringIO()
call_command('wipe_data', '--domain=example.net', '--all', stdout=output)
assert form_1.data_class().count() == 1
assert form_2.data_class().count() == 1
assert (
output.getvalue()
== '''SIMULATION MODE: no actual wiping will happen.
(use --no-simulate after checking results)
example: 1
example2: 1
'''
)
# test with no options
call_command('wipe_data', '--domain=example.net', '--no-simulate')
assert form_1.data_class().count() == 1
assert form_2.data_class().count() == 1
# wipe one form formdatas
call_command('wipe_data', '--domain=example.net', '--no-simulate', '--forms=%s' % form_1.url_name)
assert form_1.data_class().count() == 0
assert form_2.data_class().count() == 1
# wipe all formdatas
call_command('wipe_data', '--domain=example.net', '--no-simulate', '--all')
assert form_1.data_class().count() == 0
assert form_2.data_class().count() == 0
# exclude some forms
formdata_1.store()
formdata_2.store()
call_command(
'wipe_data', '--domain=example.net', '--no-simulate', '--all', '--exclude-forms=%s' % form_2.url_name
)
assert form_1.data_class().count() == 0
assert form_2.data_class().count() == 1
def test_trigger_jumps(pub):
Workflow.wipe()
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'goto2'
jump.mode = 'trigger'
jump.status = 'st2'
st2 = workflow.add_status('Status2', 'st2')
workflow.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
StringField(id='0', label='Your Name', varname='name'),
EmailField(id='1', label='Email', varname='email'),
]
formdef.workflow_id = workflow.id
formdef.store()
def run_trigger(trigger, rows):
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.id = 1
formdata.data = {'0': 'Alice', '1': 'alice@example.net'}
formdata.status = 'wf-%s' % st1.id
formdata.store()
id1 = formdata.id
formdata = formdef.data_class()()
formdata.id = 2
formdata.data = {'0': 'Bob', '1': 'bob@example.net'}
formdata.status = 'wf-%s' % st1.id
formdata.store()
id2 = formdata.id
select_and_jump_formdata(formdef, trigger, rows)
return formdef.data_class().get(id1), formdef.data_class().get(id2)
f1, f2 = run_trigger('goto2', '__all__')
assert f1.status == f2.status == 'wf-%s' % st2.id
# check publisher substitutions vars after the last jump_and_perform (#13964)
assert pub in pub.substitutions.sources
assert formdef in pub.substitutions.sources
# we cannot know which formdata is the last one, test each possibility
if f1 in pub.substitutions.sources:
assert f2 not in pub.substitutions.sources
if f2 in pub.substitutions.sources:
assert f1 not in pub.substitutions.sources
f1, f2 = run_trigger('goto2', [{'select': {}}])
assert f1.status == f2.status == 'wf-%s' % st2.id
f1, f2 = run_trigger('goto2', [{'select': {'form_number_raw': '1'}}])
assert f1.status == 'wf-%s' % st2.id
assert f2.status == 'wf-%s' % st1.id
f1, f2 = run_trigger('goto2', [{'select': {'form_var_email': 'bob@example.net'}}])
assert f1.status == 'wf-%s' % st1.id
assert f2.status == 'wf-%s' % st2.id
f1, f2 = run_trigger('goto2', [{'select': {}, 'data': {'foo': 'bar'}}])
assert f1.status == f2.status == 'wf-%s' % st2.id
assert f1.workflow_data['foo'] == f2.workflow_data['foo'] == 'bar'
f1, f2 = run_trigger('goto2', [{'select': {'form_number_raw': '1'}, 'data': {'foo': 'bar'}}])
assert f1.status == 'wf-%s' % st2.id
assert f1.workflow_data['foo'] == 'bar'
assert f2.status == 'wf-%s' % st1.id
assert not f2.workflow_data
f1, f2 = run_trigger('badtrigger', '__all__')
assert f1.status == f2.status == 'wf-%s' % st1.id
assert not f1.workflow_data
assert not f2.workflow_data
def test_delete_tenant_with_sql(freezer):
pub = create_temporary_pub()
assert os.path.isdir(pub.app_dir)
freezer.move_to('2018-12-01T00:00:00')
call_command('delete_tenant', '--vhost=example.net')
assert not os.path.isdir(pub.app_dir)
parent_dir = os.path.dirname(pub.app_dir)
if not [filename for filename in os.listdir(parent_dir) if 'removed' in filename]:
assert False
conn, cur = get_connection_and_cursor()
cur.execute(
"""SELECT schema_name
FROM information_schema.schemata
WHERE schema_name like 'removed_20181201_%%%s'"""
% pub.cfg['postgresql']['database']
)
assert len(cur.fetchall()) == 1
clean_temporary_pub()
pub = create_temporary_pub()
call_command('delete_tenant', '--vhost=example.net', '--force-drop')
conn, cur = get_connection_and_cursor(new=True)
assert not os.path.isdir(pub.app_dir)
cur.execute(
"""SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'"""
)
assert not cur.fetchall()
cur.execute(
"""SELECT datname
FROM pg_database
WHERE datname = '%s'"""
% pub.cfg['postgresql']['database']
)
assert cur.fetchall()
clean_temporary_pub()
pub = create_temporary_pub()
pub.cfg['postgresql']['createdb-connection-params'] = {
'user': pub.cfg['postgresql']['user'],
'database': 'postgres',
}
pub.write_cfg()
pub.cleanup()
call_command('delete_tenant', '--vhost=example.net', '--force-drop')
connect_kwargs = {'dbname': 'postgres', 'user': pub.cfg['postgresql']['user']}
pgconn = psycopg2.connect(**connect_kwargs)
cur = pgconn.cursor()
cur.execute(
"""SELECT datname
FROM pg_database
WHERE datname = '%s'"""
% pub.cfg['postgresql']['database']
)
assert not cur.fetchall()
cur.close()
pgconn.close()
clean_temporary_pub()
pub = create_temporary_pub()
cleanup_connection()
pub.cfg['postgresql']['createdb-connection-params'] = {
'user': pub.cfg['postgresql']['user'],
'database': 'postgres',
}
pub.write_cfg()
call_command('delete_tenant', '--vhost=example.net')
cleanup_connection()
pgconn = psycopg2.connect(**connect_kwargs)
pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pgconn.cursor()
cur.execute(
"""SELECT datname
FROM pg_database
WHERE datname like 'removed_20181201_%%%s'"""
% pub.cfg['postgresql']['database']
)
result = cur.fetchall()
assert len(result) == 1
# clean this db after test
cur.execute("""DROP DATABASE %s""" % result[0][0])
cur.execute(
"""SELECT datname
FROM pg_database
WHERE datname = '%s'"""
% pub.cfg['postgresql']['database']
)
assert not cur.fetchall()
cur.close()
conn.close()
clean_temporary_pub()
def test_rebuild_indexes(pub):
form = FormDef()
form.name = 'example'
form.store()
assert os.listdir(os.path.join(pub.app_dir, 'formdefs-url_name')) == ['example']
os.unlink(os.path.join(pub.app_dir, 'formdefs-url_name', 'example'))
os.symlink('../formdefs/1', os.path.join(pub.app_dir, 'formdefs-url_name', 'XXX'))
call_command('rebuild_indexes', '--all-tenants')
assert 'example' in os.listdir(os.path.join(pub.app_dir, 'formdefs-url_name'))
assert 'XXX' in os.listdir(os.path.join(pub.app_dir, 'formdefs-url_name'))
call_command('rebuild_indexes', '--all-tenants', '--destroy')
assert os.listdir(os.path.join(pub.app_dir, 'formdefs-url_name')) == ['example']
def test_runscript(pub):
with pytest.raises(CommandError):
call_command('runscript')
with pytest.raises(CommandError):
call_command('runscript', '--domain=a', '--all-tenants')
with open(os.path.join(pub.app_dir, 'test2.py'), 'w') as fd:
fd.write(
'''
import os
from quixote import get_publisher
open(os.path.join(get_publisher().app_dir, 'runscript.test'), 'w').close()
'''
)
call_command('runscript', '--domain=example.net', os.path.join(pub.app_dir, 'test2.py'))
assert os.path.exists(os.path.join(pub.app_dir, 'runscript.test'))
os.unlink(os.path.join(pub.app_dir, 'runscript.test'))
call_command('runscript', '--all-tenants', os.path.join(pub.app_dir, 'test2.py'))
assert os.path.exists(os.path.join(pub.app_dir, 'runscript.test'))
os.unlink(os.path.join(pub.app_dir, 'runscript.test'))
call_command(
'runscript', '--all-tenants', '--exclude-tenants=example.net', os.path.join(pub.app_dir, 'test2.py')
)
assert not os.path.exists(os.path.join(pub.app_dir, 'runscript.test'))
call_command(
'runscript', '--all-tenants', '--exclude-tenants=example2.net', os.path.join(pub.app_dir, 'test2.py')
)
assert os.path.exists(os.path.join(pub.app_dir, 'runscript.test'))
def test_import_site():
with pytest.raises(CommandError):
call_command('import_site')
pub = create_temporary_pub()
FormDef.wipe()
assert FormDef.count() == 0
assert 'workflows' not in os.listdir(pub.app_dir)
site_zip_path = os.path.join(os.path.dirname(__file__), 'site.zip')
call_command('import_site', '--domain=example.net', site_zip_path)
assert FormDef.count() == 1
assert Workflow.count() == 1
assert 'workflows' in os.listdir(pub.app_dir)
FormDef.wipe()
assert FormDef.count() == 0
assert Workflow.count() == 1
call_command('import_site', '--domain=example.net', '--if-empty', site_zip_path)
assert FormDef.count() == 0
assert Workflow.count() == 1
site_zip_path = os.path.join(os.path.dirname(__file__), 'missing_file.zip')
with pytest.raises(CommandError, match='missing file:'):
call_command('import_site', '--domain=example.net', site_zip_path)
def test_export_site(tmp_path):
pub = create_temporary_pub()
Workflow.wipe()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
formdef.store()
site_zip_path = os.path.join(tmp_path, 'site.zip')
call_command('export_site', '--domain=example.net', f'--output={site_zip_path}')
with zipfile.ZipFile(site_zip_path, mode='r') as zfile:
assert set(zfile.namelist()) == {'formdefs_xml/1', 'config.pck'}
assert 'postgresql' in pub.cfg
assert 'postgresql' not in pickle.loads(zfile.read('config.pck'))
def test_shell():
with pytest.raises(CommandError):
call_command('shell') # missing tenant name
class AfterJobForTest(AfterJob):
def execute(self):
self.test_result = WorkflowStatusItem().compute('{{ global_title|default:"FAIL" }}')
self.l10n_month = WorkflowStatusItem().compute('{{ "10/10/2010"|date:"F" }}')
class AfterJobForTestWithException(AfterJob):
def execute(self):
raise ZeroDivisionError()
def test_runjob(pub):
with pytest.raises(CommandError):
call_command('runjob')
with pytest.raises(CommandError):
call_command('runjob', '--domain=example.net', '--job-id=%s' % 'invalid')
pub.load_site_options()
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'global_title', 'HELLO')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
job = AfterJobForTest(label='test')
job.store()
assert AfterJob.get(job.id).status == 'registered'
call_command('runjob', '--domain=example.net', '--job-id=%s' % job.id)
assert AfterJob.get(job.id).status == 'completed'
assert AfterJob.get(job.id).test_result == 'HELLO'
assert AfterJob.get(job.id).l10n_month == 'October'
pub.cfg['language'] = {'language': 'fr'}
pub.write_cfg()
job = AfterJobForTest(label='test2')
job.store()
assert AfterJob.get(job.id).status == 'registered'
call_command('runjob', '--domain=example.net', '--job-id=%s' % job.id)
assert AfterJob.get(job.id).status == 'completed'
assert AfterJob.get(job.id).l10n_month == 'octobre'
completion_time = AfterJob.get(job.id).completion_time
# running again the job will skip it
call_command('runjob', '--domain=example.net', '--job-id=%s' % job.id)
assert AfterJob.get(job.id).completion_time == completion_time
# --force-replay will force the job to run again
call_command('runjob', '--domain=example.net', '--job-id=%s' % job.id, '--force-replay')
assert AfterJob.get(job.id).completion_time != completion_time
# test exception handling
job = AfterJobForTestWithException(label='test3')
job.store()
assert AfterJob.get(job.id).status == 'registered'
call_command('runjob', '--domain=example.net', '--job-id=%s' % job.id)
assert AfterJob.get(job.id).status == 'failed'
assert 'ZeroDivisionError' in AfterJob.get(job.id).exception
# check --raise
with pytest.raises(ZeroDivisionError):
call_command('runjob', '--domain=example.net', '--job-id=%s' % job.id, '--force-replay', '--raise')
def test_dbshell(pub):
with pytest.raises(CommandError):
call_command('dbshell') # missing tenant name
with mock.patch('subprocess.call' if django.VERSION < (3, 2) else 'subprocess.run') as call:
call.side_effect = lambda *args, **kwargs: 0
call_command('dbshell', '--domain', 'example.net')
assert call.call_args[0][-1][0] == 'psql'
assert call.call_args[0][-1][-1] == pub.cfg['postgresql']['database']
def test_makemessages(pub):
# just make sure it loads correctly
with pytest.raises(SystemExit):
call_command('makemessages', '--help')
def test_grep(pub):
FormDef.wipe()
Workflow.wipe()
NamedWsCall.wipe()
MailTemplate.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [StringField(id='1', label='Your Name'), EmailField(id='2', label='Email')]
formdef.options = {'x': 'Name'}
formdef.store()
workflow = Workflow()
workflow.name = 'test'
st = workflow.add_status('status')
st.add_action('aggregationemail')
workflow.store()
wscall = NamedWsCall()
wscall.name = 'Hello'
wscall.request = {'url': 'http://example.org/api/test', 'qs_data': {'a': 'b'}}
wscall.store()
mail_template = MailTemplate(name='test mail template')
mail_template.subject = 'test subject'
mail_template.body = 'test body'
mail_template.attachments = ['form_var_file1_raw']
mail_template.store()
with pytest.raises(CommandError):
call_command('grep')
with pytest.raises(CommandError):
call_command('grep', 'xxx')
with pytest.raises(CommandError):
call_command('grep', '--all-tenants', '--domain', 'example.net', 'xxx')
with pytest.raises(CommandError):
call_command('grep', '--domain', 'example.net', '--type', 'foo', 'xxx')
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', '--type', 'action-types', 'email')
assert print_hit.call_args[0] == ('http://example.net/backoffice/workflows/1/status/1/items/1/',)
print_hit.reset_mock()
call_command('grep', '--domain', 'example.net', '--type', 'field-types', 'email')
assert print_hit.call_args[0] == ('http://example.net/backoffice/forms/1/fields/2/',)
print_hit.reset_mock()
call_command('grep', '--domain', 'example.net', 'Name')
assert print_hit.call_count == 2
assert print_hit.call_args_list[0].args == (
'http://example.net/backoffice/forms/1/fields/1/',
'Your Name',
)
assert print_hit.call_args_list[1].args == (
'http://example.net/backoffice/forms/1/workflow-variables',
'Name',
)
print_hit.reset_mock()
call_command('grep', '--domain', 'example.net', '/api/test')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/settings/wscalls/hello/',
'http://example.org/api/test',
)
print_hit.reset_mock()
call_command('grep', '--domain', 'example.net', 'form_var_file1_raw')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/workflows/mail-templates/1/',
'form_var_file1_raw',
)
print_hit.reset_mock()
call_command('grep', '--domain', 'example.net', 'xxx')
assert print_hit.call_count == 0
print_hit.reset_mock()
def test_grep_prefill(pub):
FormDef.wipe()
Workflow.wipe()
NamedWsCall.wipe()
MailTemplate.wipe()
formdef = FormDef()
formdef.name = 'test'
# template prefill
formdef.fields = [
StringField(
id='1', label='Your Name', prefill={'type': 'string', 'value': 'a{{foo.prefill_string}}b'}
)
]
formdef.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'prefill_string')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/1/fields/1/',
'a{{foo.prefill_string}}b',
)
# formula prefill
formdef.fields = [
StringField(id='1', label='Your Name', prefill={'type': 'formula', 'value': 'form_var_foo'})
]
formdef.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'form_var_foo')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/1/fields/1/',
'form_var_foo',
)
@pytest.mark.parametrize('data_source_type', ['json', 'jsonp', 'python'])
def test_grep_data_source(pub, data_source_type):
FormDef.wipe()
Workflow.wipe()
NamedWsCall.wipe()
MailTemplate.wipe()
formdef = FormDef()
formdef.name = 'test'
# template prefill
formdef.fields = [
ItemField(
id='1',
label='Your Name',
data_source={'type': data_source_type, 'value': '{{ machin_url }}/data-source/x/'},
)
]
formdef.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'data-source/x')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/1/fields/1/',
'{{ machin_url }}/data-source/x/',
)
def test_grep_create_carddata(pub):
CardDef.wipe()
FormDef.wipe()
Workflow.wipe()
carddef = CardDef()
carddef.name = 'My card'
carddef.fields = [
StringField(id='1', label='string'),
]
carddef.store()
wf = Workflow(name='create-carddata')
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
create = wf.possible_status[1].add_action('create_carddata', id='_create', prepend=True)
create.label = 'Create CardDef'
create.varname = 'mycard'
create.formdef_slug = carddef.url_name
create.mappings = [
Mapping(field_id='1', expression='{{ foo_bar }}'),
]
wf.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/workflows/1/status/new/items/_create/',
'{{ foo_bar }}',
)
def test_grep_edit_carddata(pub):
CardDef.wipe()
FormDef.wipe()
Workflow.wipe()
carddef = CardDef()
carddef.name = 'My card'
carddef.fields = [
StringField(id='1', label='string'),
]
carddef.store()
wf = Workflow(name='edit-carddata')
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
edit = wf.possible_status[1].add_action('edit_carddata', id='edit', prepend=True)
edit.label = 'Edit CardDef'
edit.varname = 'mycard'
edit.formdef_slug = carddef.url_name
edit.mappings = [
Mapping(field_id='1', expression='{{ foo_bar }}'),
]
wf.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/workflows/1/status/new/items/edit/',
'{{ foo_bar }}',
)
def test_grep_backoffice_fields(pub):
Workflow.wipe()
wf = Workflow(name='test-backoffice-fields')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(
id='bo1',
label='field',
varname='blah',
prefill={'type': 'string', 'value': 'a{{foo.prefill_string}}b'},
),
]
wf.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'prefill_string')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/workflows/1/backoffice-fields/fields/bo1/',
'a{{foo.prefill_string}}b',
)
def test_grep_webservice_call(pub):
FormDef.wipe()
Workflow.wipe()
wf = Workflow(name='webservice-call')
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
webservice_call = wf.possible_status[1].add_action('webservice_call', id='webservice-call', prepend=True)
webservice_call.url = 'http://remote.example.net'
webservice_call.qs_data = {
'django': '{{ form_number }}',
}
webservice_call.post_data = {
'django': '{{ foo_bar }}',
}
wf.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'form_number')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/workflows/1/status/new/items/webservice-call/',
'{{ form_number }}',
)
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/workflows/1/status/new/items/webservice-call/',
'{{ foo_bar }}',
)
def test_grep_set_backoffice_fields_action(pub):
FormDef.wipe()
Workflow.wipe()
wf = Workflow(name='webservice-call')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='bo field 1', varname='plop'),
]
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
set_backoffice_fields = wf.possible_status[1].add_action(
'set-backoffice-fields', id='set-backoffice-fields', prepend=True
)
set_backoffice_fields.fields = [{'field_id': 'bo1', 'value': '{{ foo_bar }}'}]
wf.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/workflows/1/status/new/items/set-backoffice-fields/',
'{{ foo_bar }}',
)
def test_grep_action_condition(pub):
Workflow.wipe()
workflow = Workflow.get_default_workflow()
workflow.id = '2'
workflow.store()
workflow.possible_status[0].items[2].condition = {'type': 'django', 'value': 'foo_bar'}
workflow.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/workflows/2/status/just_submitted/items/_jump_to_new/',
'foo_bar',
)
def test_grep_field_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.id = '2'
formdef.name = 'Foo'
formdef.fields = [
StringField(
id='1',
label='Bar',
size='40',
required=True,
condition={'type': 'django', 'value': 'foo_bar'},
)
]
formdef.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/2/fields/1/',
'foo_bar',
)
def test_grep_page_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
formdef.store()
formdef.fields = [
PageField(
id='0',
label='1st page',
condition={'type': 'django', 'value': 'foo_bar'},
post_conditions=[
{
'condition': {'type': 'django', 'value': 'form_xx'},
'error_message': 'You shall not pass.',
}
],
),
]
formdef.id = '2'
formdef.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/2/fields/0/',
'foo_bar',
)
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'form_xx')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/2/fields/0/',
'form_xx',
)
def test_grep_workflow_options(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
formdef.workflow_options = {'a': 'foo_bar'}
formdef.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/1/workflow-variables',
'foo_bar',
)
def test_grep_block(pub):
FormDef.wipe()
BlockDef.wipe()
blockdef = BlockDef()
blockdef.name = 'Foo'
blockdef.fields = [
StringField(
id='1',
label='bar',
size='40',
required=True,
post_conditions=[
{
'condition': {'type': 'django', 'value': 'webservice.world == "test"'},
'error_message': 'error',
},
],
)
]
blockdef.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'bar')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/blocks/%s/1/' % blockdef.id,
'bar',
)
call_command('grep', '--domain', 'example.net', 'webservice.world')
assert print_hit.call_args[0] == (
'http://example.net/backoffice/forms/blocks/%s/1/' % blockdef.id,
'bar',
)
def test_grep_workflow_multiple(pub):
FormDef.wipe()
BlockDef.wipe()
Workflow.wipe()
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
choice = st1.add_action('choice')
choice.label = '{{foo_bar}}'
workflow.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_unique_hit') as print_unique_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_unique_hit.call_args[0] == (
'http://example.net/backoffice/workflows/1/status/st1/items/1/',
'{{foo_bar}}',
)
assert print_unique_hit.call_count == 1
with mock.patch('wcs.ctl.management.commands.grep.Command.print_unique_hit') as print_unique_hit:
call_command('grep', '--domain', 'example.net', '--urls', 'foo_bar')
assert print_unique_hit.call_args[0] == (
'http://example.net/backoffice/workflows/1/status/st1/items/1/',
)
assert print_unique_hit.call_count == 1
def test_grep_workflow_global_action_trigger(pub):
FormDef.wipe()
BlockDef.wipe()
Workflow.wipe()
workflow = Workflow(name='test')
action = workflow.add_global_action('FOOBAR')
trigger1 = action.append_trigger('timeout')
trigger1.anchor = 'creation'
trigger1.timeout = '{{ form_var_foo_bar }}'
trigger2 = action.append_trigger('webservice')
trigger2.identifier = 'delete'
workflow.store()
with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
call_command('grep', '--domain', 'example.net', 'foo_bar')
assert print_hit.call_args[0] == (
f'http://example.net/backoffice/workflows/{workflow.id}/global-actions/{action.id}/triggers/{trigger1.id}/',
'{{ form_var_foo_bar }}',
)
def test_configdb(pub):
call_command('configdb', '--domain', 'example.net')
call_command('configdb', '--domain', 'example.net', '--info')
database = pub.cfg['postgresql']['database']
user = pub.cfg['postgresql']['user']
pub.cfg['postgresql']['database'] = ''
pub.write_cfg()
call_command('configdb', '--domain', 'example.net', '--database', database, '--user', user)
pub.reload_cfg()
assert pub.cfg['postgresql']['database'] == database
@pytest.mark.parametrize('source_type', ['local-file', 'http'])
def test_replace_python(pub, alt_tempdir, source_type):
FormDef.wipe()
Workflow.wipe()
MailTemplate.wipe()
formdef = FormDef()
formdef.name = 'Foo'
formdef.fields = [
PageField(
id='1',
label='Bar',
size='40',
required=True,
condition={'type': 'python', 'value': 'python condition'},
post_conditions=[
{
'condition': {'type': 'python', 'value': 'python condition'},
'error_message': 'You shall not pass.',
},
],
),
StringField(id='2', label='Foo', prefill={'type': 'formula', 'value': 'form_var_foo'}),
StringField(id='3', label='Bar', prefill={'type': 'formula', 'value': 'form_var_bar'}),
]
formdef.store()
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
item = st1.add_action('choice')
item.label = 'label'
item.condition = {'type': 'python', 'value': 'no replacement'}
item = st1.add_action('choice')
item.label = 'label2'
item.condition = {'type': 'python', 'value': 'python condition'}
item = st1.add_action('set-backoffice-fields')
item.fields = [{'field_id': 'bo1', 'value': '=form_var_foo'}]
item = st1.add_action('create_formdata')
item.varname = 'resubmitted'
item.mappings = [
Mapping(field_id='0', expression='=form_var_foo'),
]
item = st1.add_action('sendmail')
item.to = ['=form_var_foo']
item.subject = '=form_var_foo'
item.attachments = ['{{ form_var_xxx }}', 'getattr(form_attachments, "form_var_bar", None)']
item = st1.add_action('webservice_call')
item.varname = 'xxx'
item.post_data = {'str': 'abcd', 'expr': '=form_var_foo'}
item.qs_data = {'str': 'abcd', 'expr': '=form_var_foo', 'expr2': '=form_var_bar'}
global_action = workflow.add_global_action('foobar')
trigger1 = global_action.append_trigger('timeout')
trigger1.anchor = 'python'
trigger1.anchor_expression = 'datetime.date(2023, 12, 27)'
trigger2 = global_action.append_trigger('timeout')
trigger2.anchor = 'python'
trigger2.anchor_expression = 'nope'
workflow.store()
mail_template = MailTemplate(name='test mail template')
mail_template.subject = '=form_var_foo'
mail_template.attachments = ['{{ form_var_xxx }}', 'getattr(form_attachments, "form_var_bar", None)']
mail_template.store()
with open(os.path.join(alt_tempdir, 'replacements.json'), 'w') as fp:
replacements = {
'conditions': {
'python condition': 'django condition',
},
'templates': {
'form_var_foo': '{{ form_var_foo }}',
'datetime.date(2023, 12, 27)': '2023-12-27',
'getattr(form_attachments, "form_var_bar", None)': '{{ form_var_bar }}',
},
}
json.dump(replacements, fp)
if source_type == 'local-file':
call_command(
'replace_python',
'--domain',
'example.net',
'--filename',
os.path.join(alt_tempdir, 'replacements.json'),
)
else:
source = 'https://test/replacements.json'
with responses.RequestsMock() as rsps:
rsps.get(source, json=replacements)
call_command('replace_python', '--domain', 'example.net', '--url', source)
formdef.refresh_from_storage()
assert formdef.fields[0].condition == {'type': 'django', 'value': 'django condition'}
assert formdef.fields[0].post_conditions == [
{
'condition': {'type': 'django', 'value': 'django condition'},
'error_message': 'You shall not pass.',
},
]
assert formdef.fields[1].prefill == {'type': 'string', 'value': '{{ form_var_foo }}'}
assert formdef.fields[2].prefill == {'type': 'formula', 'value': 'form_var_bar'} # no replacement
workflow.refresh_from_storage()
assert workflow.possible_status[0].items[0].condition == {'type': 'python', 'value': 'no replacement'}
assert workflow.possible_status[0].items[1].condition == {'type': 'django', 'value': 'django condition'}
assert workflow.possible_status[0].items[2].fields == [{'field_id': 'bo1', 'value': '{{ form_var_foo }}'}]
assert workflow.possible_status[0].items[3].mappings[0].expression == '{{ form_var_foo }}'
assert workflow.possible_status[0].items[4].subject == '{{ form_var_foo }}'
assert workflow.possible_status[0].items[4].to == ['{{ form_var_foo }}']
assert workflow.possible_status[0].items[4].attachments == ['{{ form_var_xxx }}', '{{ form_var_bar }}']
assert workflow.possible_status[0].items[5].post_data == {'str': 'abcd', 'expr': '{{ form_var_foo }}'}
assert workflow.possible_status[0].items[5].qs_data == {
'str': 'abcd',
'expr': '{{ form_var_foo }}',
'expr2': '=form_var_bar',
}
assert workflow.global_actions[0].triggers[1].anchor == 'template'
assert workflow.global_actions[0].triggers[1].anchor_expression is None
assert workflow.global_actions[0].triggers[1].anchor_template == '2023-12-27'
assert workflow.global_actions[0].triggers[2].anchor == 'python'
assert workflow.global_actions[0].triggers[2].anchor_expression == 'nope'
mail_template.refresh_from_storage()
assert mail_template.subject == '{{ form_var_foo }}'
assert mail_template.attachments == ['{{ form_var_xxx }}', '{{ form_var_bar }}']