1118 lines
38 KiB
Python
1118 lines
38 KiB
Python
import io
|
|
import json
|
|
import os
|
|
import pickle
|
|
import shutil
|
|
import tempfile
|
|
import zipfile
|
|
from unittest import mock
|
|
|
|
import django
|
|
import psycopg2
|
|
import pytest
|
|
import responses
|
|
from django.core.management import CommandError, call_command
|
|
|
|
from wcs.blocks import BlockDef
|
|
from wcs.carddef import CardDef
|
|
from wcs.ctl.management.commands.trigger_jumps import select_and_jump_formdata
|
|
from wcs.fields import EmailField, ItemField, PageField, StringField
|
|
from wcs.formdef import FormDef
|
|
from wcs.mail_templates import MailTemplate
|
|
from wcs.qommon.afterjobs import AfterJob
|
|
from wcs.qommon.management.commands.collectstatic import Command as CmdCollectStatic
|
|
from wcs.qommon.management.commands.migrate import Command as CmdMigrate
|
|
from wcs.qommon.management.commands.migrate_schemas import Command as CmdMigrateSchemas
|
|
from wcs.sql import cleanup_connection, get_connection_and_cursor
|
|
from wcs.wf.create_formdata import Mapping
|
|
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
|
|
from wcs.wscalls import NamedWsCall
|
|
|
|
from .utilities import clean_temporary_pub, create_temporary_pub
|
|
|
|
|
|
@pytest.fixture
def pub():
    """Yield a temporary publisher whose language is set to 'en'.

    The temporary publisher is cleaned once the test using it is over.
    """
    publisher = create_temporary_pub()
    cleanup_connection()
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()
    yield publisher
    clean_temporary_pub()
|
|
|
|
|
|
def teardown_module(module):
    """Module-level teardown hook: clean the temporary publisher."""
    clean_temporary_pub()
|
|
|
|
|
|
@pytest.fixture
def alt_tempdir():
    """Yield the path of a new temporary directory, removed after the test."""
    directory = tempfile.mkdtemp()
    yield directory
    shutil.rmtree(directory)
|
|
|
|
|
|
def test_collectstatic(pub, tmp_path):
    """Check collectstatic in copy and link modes, including over a broken link."""
    static_dir = os.path.join(pub.app_dir, 'collectstatic')
    required_link = os.path.join(static_dir, 'css', 'required.png')

    # plain run: a sample of the expected assets must be present
    CmdCollectStatic.collectstatic(pub)
    for parts in (
        ('css', 'required.png'),
        ('js', 'qommon.forms.js'),
        ('css', 'gadjo.css'),
        ('xstatic', 'jquery.js'),
    ):
        assert os.path.exists(os.path.join(static_dir, *parts))

    # link mode: the asset is now a symbolic link
    CmdCollectStatic.collectstatic(pub, clear=True, link=True)
    assert os.path.islink(required_link)

    # create a broken link: point the link to a copy, then delete the copy
    required_tmp = os.path.join(tmp_path, 'required.png')
    shutil.copy2(required_link, required_tmp)
    os.unlink(required_link)
    os.symlink(required_tmp, required_link)
    os.unlink(required_tmp)

    # check that we have a broken link
    assert os.path.islink(required_link)
    assert not os.path.exists(required_link)

    # still works if broken link exists
    CmdCollectStatic.collectstatic(pub, link=True)

    # link not broken any more
    assert os.path.islink(required_link)
    assert os.path.exists(required_link)
|
|
|
|
|
|
def test_migrate(pub):
    """Run the migrate command handler after cleaning up the publisher."""
    pub.cleanup()
    command = CmdMigrate()
    command.handle()
|
|
|
|
|
|
def test_migrate_schemas(pub):
    """Run the migrate_schemas command handler after cleaning up the publisher."""
    pub.cleanup()
    command = CmdMigrateSchemas()
    command.handle()
|
|
|
|
|
|
def test_wipe_formdata(pub):
    """Check wipe_data in simulation mode and with its form selection options."""
    first_form = FormDef()
    first_form.name = 'example'
    first_form.fields = [StringField(id='0', label='Your Name'), EmailField(id='1', label='Email')]
    first_form.store()
    first_form.data_class().wipe()
    first_data = first_form.data_class()()
    first_data.data = {'0': 'John Doe', '1': 'john@example.net'}
    first_data.store()
    assert first_form.data_class().count() == 1

    second_form = FormDef()
    second_form.name = 'example2'
    second_form.fields = [StringField(id='0', label='First Name'), StringField(id='1', label='Last Name')]
    second_form.store()
    second_form.data_class().wipe()
    second_data = second_form.data_class()()
    second_data.data = {'0': 'John', '1': 'Doe'}
    second_data.store()
    assert second_form.data_class().count() == 1

    def assert_counts(expected_first, expected_second):
        # number of stored formdata for each of the two forms
        assert first_form.data_class().count() == expected_first
        assert second_form.data_class().count() == expected_second

    # no support for --all-tenants
    with pytest.raises(CommandError):
        call_command('wipe_data', '--all-tenants')

    # dry-run mode
    expected_report = '''SIMULATION MODE: no actual wiping will happen.
(use --no-simulate after checking results)

example: 1
example2: 1
'''
    output = io.StringIO()
    call_command('wipe_data', '--domain=example.net', '--all', stdout=output)
    assert_counts(1, 1)
    assert output.getvalue() == expected_report

    real_wipe = ('wipe_data', '--domain=example.net', '--no-simulate')

    # test with no options
    call_command(*real_wipe)
    assert_counts(1, 1)

    # wipe one form formdatas
    call_command(*real_wipe, f'--forms={first_form.url_name}')
    assert_counts(0, 1)

    # wipe all formdatas
    call_command(*real_wipe, '--all')
    assert_counts(0, 0)

    # exclude some forms
    first_data.store()
    second_data.store()
    call_command(*real_wipe, '--all', f'--exclude-forms={second_form.url_name}')
    assert_counts(0, 1)
|
|
|
|
|
|
def test_trigger_jumps(pub):
    """Check select_and_jump_formdata with various row selections and extra data."""
    Workflow.wipe()
    workflow = Workflow(name='test')
    st1 = workflow.add_status('Status1', 'st1')
    jump = st1.add_action('jump')
    jump.trigger = 'goto2'
    jump.status = 'st2'
    st2 = workflow.add_status('Status2', 'st2')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        StringField(id='0', label='Your Name', varname='name'),
        EmailField(id='1', label='Email', varname='email'),
    ]
    formdef.workflow_id = workflow.id
    formdef.store()

    in_st1 = f'wf-{st1.id}'
    in_st2 = f'wf-{st2.id}'
    people = [('Alice', 'alice@example.net'), ('Bob', 'bob@example.net')]

    def run_trigger(trigger, rows):
        # start from two formdata (ids 1 and 2) in the first status, run the
        # trigger, then return both formdata reloaded from storage
        formdef.data_class().wipe()
        stored_ids = []
        for number, (name, email) in enumerate(people, start=1):
            formdata = formdef.data_class()()
            formdata.id = number
            formdata.data = {'0': name, '1': email}
            formdata.status = in_st1
            formdata.store()
            stored_ids.append(formdata.id)
        select_and_jump_formdata(formdef, trigger, rows)
        return tuple(formdef.data_class().get(stored_id) for stored_id in stored_ids)

    f1, f2 = run_trigger('goto2', '__all__')
    assert f1.status == f2.status == in_st2

    # check publisher substitutions vars after the last jump_and_perform (#13964)
    sources = pub.substitutions.sources
    assert pub in sources
    assert formdef in sources
    # we cannot know which formdata is the last one, but both cannot be there
    assert not (f1 in sources and f2 in sources)

    f1, f2 = run_trigger('goto2', [{'select': {}}])
    assert f1.status == f2.status == in_st2

    f1, f2 = run_trigger('goto2', [{'select': {'form_number_raw': '1'}}])
    assert f1.status == in_st2
    assert f2.status == in_st1

    f1, f2 = run_trigger('goto2', [{'select': {'form_var_email': 'bob@example.net'}}])
    assert f1.status == in_st1
    assert f2.status == in_st2

    f1, f2 = run_trigger('goto2', [{'select': {}, 'data': {'foo': 'bar'}}])
    assert f1.status == f2.status == in_st2
    assert f1.workflow_data['foo'] == f2.workflow_data['foo'] == 'bar'

    f1, f2 = run_trigger('goto2', [{'select': {'form_number_raw': '1'}, 'data': {'foo': 'bar'}}])
    assert f1.status == in_st2
    assert f1.workflow_data['foo'] == 'bar'
    assert f2.status == in_st1
    assert not f2.workflow_data

    f1, f2 = run_trigger('badtrigger', '__all__')
    assert f1.status == f2.status == in_st1
    assert not f1.workflow_data
    assert not f2.workflow_data
|
|
|
|
|
|
def test_delete_tenant_with_sql(freezer):
    """Check what delete_tenant leaves behind on disk and in PostgreSQL.

    Four scenarios run in sequence, each on a fresh temporary publisher:
    default deletion, --force-drop, --force-drop with
    createdb-connection-params, and default deletion with
    createdb-connection-params.

    The frozen date (2018-12-01) is the one expected in the
    "removed_20181201_" names checked below.
    """
    pub = create_temporary_pub()

    assert os.path.isdir(pub.app_dir)

    freezer.move_to('2018-12-01T00:00:00')
    call_command('delete_tenant', '--vhost=example.net')

    assert not os.path.isdir(pub.app_dir)
    parent_dir = os.path.dirname(pub.app_dir)
    # an entry with "removed" in its name must exist next to the old app dir
    assert any('removed' in filename for filename in os.listdir(parent_dir))

    _, cur = get_connection_and_cursor()
    cur.execute(
        """SELECT schema_name
           FROM information_schema.schemata
           WHERE schema_name like 'removed_20181201_%%%s'"""
        % pub.cfg['postgresql']['database']
    )

    assert len(cur.fetchall()) == 1

    clean_temporary_pub()
    pub = create_temporary_pub()

    call_command('delete_tenant', '--vhost=example.net', '--force-drop')

    conn, cur = get_connection_and_cursor(new=True)

    assert not os.path.isdir(pub.app_dir)
    cur.execute(
        """SELECT table_name
           FROM information_schema.tables
           WHERE table_schema = 'public'
           AND table_type = 'BASE TABLE'"""
    )

    # no table left in the public schema...
    assert not cur.fetchall()

    cur.execute(
        """SELECT datname
           FROM pg_database
           WHERE datname = '%s'"""
        % pub.cfg['postgresql']['database']
    )

    # ...but the database itself is still there
    assert cur.fetchall()

    clean_temporary_pub()
    pub = create_temporary_pub()

    pub.cfg['postgresql']['createdb-connection-params'] = {
        'user': pub.cfg['postgresql']['user'],
        'database': 'postgres',
    }
    pub.write_cfg()
    pub.cleanup()
    call_command('delete_tenant', '--vhost=example.net', '--force-drop')

    connect_kwargs = {'dbname': 'postgres', 'user': pub.cfg['postgresql']['user']}
    pgconn = psycopg2.connect(**connect_kwargs)
    cur = pgconn.cursor()

    cur.execute(
        """SELECT datname
           FROM pg_database
           WHERE datname = '%s'"""
        % pub.cfg['postgresql']['database']
    )
    # with createdb-connection-params and --force-drop the database is gone
    assert not cur.fetchall()
    cur.close()
    pgconn.close()

    clean_temporary_pub()
    pub = create_temporary_pub()
    cleanup_connection()

    pub.cfg['postgresql']['createdb-connection-params'] = {
        'user': pub.cfg['postgresql']['user'],
        'database': 'postgres',
    }
    pub.write_cfg()
    call_command('delete_tenant', '--vhost=example.net')
    cleanup_connection()

    pgconn = psycopg2.connect(**connect_kwargs)
    # autocommit is required: DROP DATABASE cannot run inside a transaction
    pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = pgconn.cursor()

    cur.execute(
        """SELECT datname
           FROM pg_database
           WHERE datname like 'removed_20181201_%%%s'"""
        % pub.cfg['postgresql']['database']
    )

    result = cur.fetchall()
    assert len(result) == 1

    # clean this db after test
    cur.execute("""DROP DATABASE %s""" % result[0][0])

    cur.execute(
        """SELECT datname
           FROM pg_database
           WHERE datname = '%s'"""
        % pub.cfg['postgresql']['database']
    )

    assert not cur.fetchall()
    cur.close()
    # close the psycopg2 connection opened for this last scenario; it was
    # previously left open because only `conn` was closed here
    pgconn.close()
    # connection obtained earlier from get_connection_and_cursor(new=True)
    conn.close()

    clean_temporary_pub()
|
|
|
|
|
|
def test_rebuild_indexes(pub):
    """Check rebuild_indexes restores a missing index entry and --destroy drops a stray one."""
    form = FormDef()
    form.name = 'example'
    form.store()

    index_dir = os.path.join(pub.app_dir, 'formdefs-url_name')
    assert os.listdir(index_dir) == ['example']

    # remove the legitimate entry and add an unexpected one
    os.unlink(os.path.join(index_dir, 'example'))
    os.symlink('../formdefs/1', os.path.join(index_dir, 'XXX'))

    call_command('rebuild_indexes', '--all-tenants')
    entries = os.listdir(index_dir)
    assert 'example' in entries
    assert 'XXX' in entries

    call_command('rebuild_indexes', '--all-tenants', '--destroy')
    assert os.listdir(index_dir) == ['example']
|
|
|
|
|
|
def test_runscript(pub):
    """Check runscript argument validation and its tenant selection options.

    The script that is run creates a 'runscript.test' file in the app
    directory; the presence of that file tells whether the script ran.
    """
    with pytest.raises(CommandError):
        call_command('runscript')
    with pytest.raises(CommandError):
        call_command('runscript', '--domain=a', '--all-tenants')

    script_path = os.path.join(pub.app_dir, 'test2.py')
    marker_path = os.path.join(pub.app_dir, 'runscript.test')
    script_body = '''
import os
from quixote import get_publisher
open(os.path.join(get_publisher().app_dir, 'runscript.test'), 'w').close()
'''
    with open(script_path, 'w') as fd:
        fd.write(script_body)

    call_command('runscript', '--domain=example.net', script_path)
    assert os.path.exists(marker_path)
    os.unlink(marker_path)

    call_command('runscript', '--all-tenants', script_path)
    assert os.path.exists(marker_path)
    os.unlink(marker_path)

    # the only tenant is excluded: the script must not run
    call_command('runscript', '--all-tenants', '--exclude-tenants=example.net', script_path)
    assert not os.path.exists(marker_path)

    # excluding another tenant name has no effect on example.net
    call_command('runscript', '--all-tenants', '--exclude-tenants=example2.net', script_path)
    assert os.path.exists(marker_path)
|
|
|
|
|
|
def test_import_site():
    """Check import_site: plain import, --if-empty and a missing archive."""
    with pytest.raises(CommandError):
        call_command('import_site')

    pub = create_temporary_pub()
    FormDef.wipe()
    assert FormDef.count() == 0
    assert 'workflows' not in os.listdir(pub.app_dir)

    tests_dir = os.path.dirname(__file__)
    archive_path = os.path.join(tests_dir, 'site.zip')
    call_command('import_site', '--domain=example.net', archive_path)
    assert FormDef.count() == 1
    assert Workflow.count() == 1
    assert 'workflows' in os.listdir(pub.app_dir)

    # a workflow is still stored, so --if-empty must not import anything
    FormDef.wipe()
    assert FormDef.count() == 0
    assert Workflow.count() == 1
    call_command('import_site', '--domain=example.net', '--if-empty', archive_path)
    assert FormDef.count() == 0
    assert Workflow.count() == 1

    missing_path = os.path.join(tests_dir, 'missing_file.zip')
    with pytest.raises(CommandError, match='missing file:'):
        call_command('import_site', '--domain=example.net', missing_path)
|
|
|
|
|
|
def test_export_site(tmp_path):
    """Check export_site archive content; 'postgresql' must not be in the exported config."""
    pub = create_temporary_pub()
    Workflow.wipe()
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.store()

    archive_path = os.path.join(tmp_path, 'site.zip')
    call_command('export_site', '--domain=example.net', f'--output={archive_path}')
    with zipfile.ZipFile(archive_path, mode='r') as archive:
        member_names = set(archive.namelist())
        assert member_names == {'formdefs_xml/1', 'config.pck'}
        assert 'postgresql' in pub.cfg
        exported_cfg = pickle.loads(archive.read('config.pck'))
        assert 'postgresql' not in exported_cfg
|
|
|
|
|
|
def test_shell():
    """Check the shell command raises CommandError when no tenant is given."""
    expected_error = pytest.raises(CommandError)
    with expected_error:
        call_command('shell')  # missing tenant name
|
|
|
|
|
|
class AfterJobForTest(AfterJob):
    """After job storing the results of two computed templates on itself."""

    def execute(self):
        # site variable with a fallback, and a localized month name
        title_template = '{{ global_title|default:"FAIL" }}'
        month_template = '{{ "10/10/2010"|date:"F" }}'
        self.test_result = WorkflowStatusItem().compute(title_template)
        self.l10n_month = WorkflowStatusItem().compute(month_template)
|
|
|
|
|
|
class AfterJobForTestWithException(AfterJob):
    """After job whose execution always raises ZeroDivisionError."""

    def execute(self):
        error = ZeroDivisionError()
        raise error
|
|
|
|
|
|
def test_runjob(pub):
    """Check runjob: validation, execution, localization, replay and failures."""
    with pytest.raises(CommandError):
        call_command('runjob')
    with pytest.raises(CommandError):
        call_command('runjob', '--domain=example.net', '--job-id=invalid')

    def run(job_to_run, *extra_args):
        # run the given job on the example.net tenant
        call_command('runjob', '--domain=example.net', f'--job-id={job_to_run.id}', *extra_args)

    # define the site variable read by AfterJobForTest
    pub.load_site_options()
    if not pub.site_options.has_section('variables'):
        pub.site_options.add_section('variables')
    pub.site_options.set('variables', 'global_title', 'HELLO')
    options_path = os.path.join(pub.app_dir, 'site-options.cfg')
    with open(options_path, 'w') as fd:
        pub.site_options.write(fd)

    job = AfterJobForTest(label='test')
    job.store()
    assert AfterJob.get(job.id).status == 'registered'
    run(job)
    assert AfterJob.get(job.id).status == 'completed'
    assert AfterJob.get(job.id).test_result == 'HELLO'
    assert AfterJob.get(job.id).l10n_month == 'October'

    # same job class with the site language switched to French
    pub.cfg['language'] = {'language': 'fr'}
    pub.write_cfg()
    job = AfterJobForTest(label='test2')
    job.store()
    assert AfterJob.get(job.id).status == 'registered'
    run(job)
    assert AfterJob.get(job.id).status == 'completed'
    assert AfterJob.get(job.id).l10n_month == 'octobre'
    completion_time = AfterJob.get(job.id).completion_time

    # running again the job will skip it
    run(job)
    assert AfterJob.get(job.id).completion_time == completion_time

    # --force-replay will force the job to run again
    run(job, '--force-replay')
    assert AfterJob.get(job.id).completion_time != completion_time

    # test exception handling
    job = AfterJobForTestWithException(label='test3')
    job.store()
    assert AfterJob.get(job.id).status == 'registered'
    run(job)
    assert AfterJob.get(job.id).status == 'failed'
    assert 'ZeroDivisionError' in AfterJob.get(job.id).exception

    # check --raise
    with pytest.raises(ZeroDivisionError):
        run(job, '--force-replay', '--raise')
|
|
|
|
|
|
def test_dbshell(pub):
    """Check dbshell requires a tenant and invokes psql on the tenant database."""
    with pytest.raises(CommandError):
        call_command('dbshell')  # missing tenant name

    # the subprocess function to patch depends on the Django version
    if django.VERSION < (3, 2):
        patched_name = 'subprocess.call'
    else:
        patched_name = 'subprocess.run'

    with mock.patch(patched_name) as call:
        call.side_effect = lambda *args, **kwargs: 0
        call_command('dbshell', '--domain', 'example.net')
        command_line = call.call_args[0][-1]
        assert command_line[0] == 'psql'
        assert command_line[-1] == pub.cfg['postgresql']['database']
|
|
|
|
|
|
def test_makemessages(pub):
    """Check the makemessages command can be loaded."""
    # just make sure it loads correctly
    expected_exit = pytest.raises(SystemExit)
    with expected_exit:
        call_command('makemessages', '--help')
|
|
|
|
|
|
def test_grep(pub):
    """Check grep argument validation and hits in forms, workflows, wscalls and mail templates."""
    for klass in (FormDef, Workflow, NamedWsCall, MailTemplate):
        klass.wipe()

    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [StringField(id='1', label='Your Name'), EmailField(id='2', label='Email')]
    formdef.options = {'x': 'Name'}
    formdef.store()

    workflow = Workflow()
    workflow.name = 'test'
    status = workflow.add_status('status')
    status.add_action('aggregationemail')
    workflow.store()

    wscall = NamedWsCall()
    wscall.name = 'Hello'
    wscall.request = {'url': 'http://example.org/api/test', 'qs_data': {'a': 'b'}}
    wscall.store()

    mail_template = MailTemplate(name='test mail template')
    mail_template.subject = 'test subject'
    mail_template.body = 'test body'
    mail_template.attachments = ['form_var_file1_raw']
    mail_template.store()

    # invalid invocations: no pattern, no tenant, both tenant options, unknown type
    invalid_invocations = (
        (),
        ('xxx',),
        ('--all-tenants', '--domain', 'example.net', 'xxx'),
        ('--domain', 'example.net', '--type', 'foo', 'xxx'),
    )
    for bad_args in invalid_invocations:
        with pytest.raises(CommandError):
            call_command('grep', *bad_args)

    def grep(*args):
        call_command('grep', '--domain', 'example.net', *args)

    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        grep('--type', 'action-types', 'email')
        assert print_hit.call_args.args == ('http://example.net/backoffice/workflows/1/status/1/items/1/',)
        print_hit.reset_mock()

        grep('--type', 'field-types', 'email')
        assert print_hit.call_args.args == ('http://example.net/backoffice/forms/1/fields/2/',)
        print_hit.reset_mock()

        grep('Name')
        assert print_hit.call_count == 2
        first_hit, second_hit = print_hit.call_args_list
        assert first_hit.args == (
            'http://example.net/backoffice/forms/1/fields/1/',
            'Your Name',
        )
        assert second_hit.args == (
            'http://example.net/backoffice/forms/1/workflow-variables',
            'Name',
        )
        print_hit.reset_mock()

        grep('/api/test')
        assert print_hit.call_args.args == (
            'http://example.net/backoffice/settings/wscalls/hello/',
            'http://example.org/api/test',
        )
        print_hit.reset_mock()

        grep('form_var_file1_raw')
        assert print_hit.call_args.args == (
            'http://example.net/backoffice/workflows/mail-templates/1/',
            'form_var_file1_raw',
        )
        print_hit.reset_mock()

        grep('xxx')
        assert print_hit.call_count == 0
        print_hit.reset_mock()
|
|
|
|
|
|
def test_grep_prefill(pub):
    """Check grep reports field prefill values, for template and formula prefills."""
    for klass in (FormDef, Workflow, NamedWsCall, MailTemplate):
        klass.wipe()

    print_hit_path = 'wcs.ctl.management.commands.grep.Command.print_hit'
    field_url = 'http://example.net/backoffice/forms/1/fields/1/'

    formdef = FormDef()
    formdef.name = 'test'

    # template prefill
    template_prefill = {'type': 'string', 'value': 'a{{foo.prefill_string}}b'}
    formdef.fields = [StringField(id='1', label='Your Name', prefill=template_prefill)]
    formdef.store()

    with mock.patch(print_hit_path) as print_hit:
        call_command('grep', '--domain', 'example.net', 'prefill_string')
        assert print_hit.call_args.args == (field_url, 'a{{foo.prefill_string}}b')

    # formula prefill
    formula_prefill = {'type': 'formula', 'value': 'form_var_foo'}
    formdef.fields = [StringField(id='1', label='Your Name', prefill=formula_prefill)]
    formdef.store()

    with mock.patch(print_hit_path) as print_hit:
        call_command('grep', '--domain', 'example.net', 'form_var_foo')
        assert print_hit.call_args.args == (field_url, 'form_var_foo')
|
|
|
|
|
|
@pytest.mark.parametrize('data_source_type', ['json', 'jsonp', 'python'])
def test_grep_data_source(pub, data_source_type):
    """Check grep reports the data source value of a field, for each data source type."""
    for klass in (FormDef, Workflow, NamedWsCall, MailTemplate):
        klass.wipe()

    # item field whose data source value contains the searched string
    data_source = {'type': data_source_type, 'value': '{{ machin_url }}/data-source/x/'}
    item_field = ItemField(id='1', label='Your Name', data_source=data_source)

    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [item_field]
    formdef.store()

    expected_hit = (
        'http://example.net/backoffice/forms/1/fields/1/',
        '{{ machin_url }}/data-source/x/',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'data-source/x')
        assert print_hit.call_args.args == expected_hit
|
|
|
|
|
|
def test_grep_create_carddata(pub):
    """Check grep reports a mapping expression of a create_carddata action."""
    for klass in (CardDef, FormDef, Workflow):
        klass.wipe()

    carddef = CardDef()
    carddef.name = 'My card'
    carddef.fields = [StringField(id='1', label='string')]
    carddef.store()

    wf = Workflow(name='create-carddata')
    wf.possible_status = Workflow.get_default_workflow().possible_status[:]
    target_status = wf.possible_status[1]
    action = target_status.add_action('create_carddata', id='_create', prepend=True)
    action.label = 'Create CardDef'
    action.varname = 'mycard'
    action.formdef_slug = carddef.url_name
    action.mappings = [Mapping(field_id='1', expression='{{ foo_bar }}')]
    wf.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/1/status/new/items/_create/',
        '{{ foo_bar }}',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args.args == expected_hit
|
|
|
|
|
|
def test_grep_edit_carddata(pub):
    """Check grep reports a mapping expression of an edit_carddata action."""
    for klass in (CardDef, FormDef, Workflow):
        klass.wipe()

    carddef = CardDef()
    carddef.name = 'My card'
    carddef.fields = [StringField(id='1', label='string')]
    carddef.store()

    wf = Workflow(name='edit-carddata')
    wf.possible_status = Workflow.get_default_workflow().possible_status[:]
    target_status = wf.possible_status[1]
    action = target_status.add_action('edit_carddata', id='edit', prepend=True)
    action.label = 'Edit CardDef'
    action.varname = 'mycard'
    action.formdef_slug = carddef.url_name
    action.mappings = [Mapping(field_id='1', expression='{{ foo_bar }}')]
    wf.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/1/status/new/items/edit/',
        '{{ foo_bar }}',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args.args == expected_hit
|
|
|
|
|
|
def test_grep_backoffice_fields(pub):
    """Check grep reports the prefill value of a workflow backoffice field."""
    Workflow.wipe()

    backoffice_field = StringField(
        id='bo1',
        label='field',
        varname='blah',
        prefill={'type': 'string', 'value': 'a{{foo.prefill_string}}b'},
    )
    wf = Workflow(name='test-backoffice-fields')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [backoffice_field]
    wf.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/1/backoffice-fields/fields/bo1/',
        'a{{foo.prefill_string}}b',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'prefill_string')
        assert print_hit.call_args.args == expected_hit
|
|
|
|
|
|
def test_grep_webservice_call(pub):
    """Check grep reports qs_data and post_data values of a webservice_call action."""
    FormDef.wipe()
    Workflow.wipe()

    wf = Workflow(name='webservice-call')
    wf.possible_status = Workflow.get_default_workflow().possible_status[:]
    target_status = wf.possible_status[1]
    action = target_status.add_action('webservice_call', id='webservice-call', prepend=True)
    action.url = 'http://remote.example.net'
    action.qs_data = {'django': '{{ form_number }}'}
    action.post_data = {'django': '{{ foo_bar }}'}
    wf.store()

    print_hit_path = 'wcs.ctl.management.commands.grep.Command.print_hit'
    action_url = 'http://example.net/backoffice/workflows/1/status/new/items/webservice-call/'

    # value from the query string data
    with mock.patch(print_hit_path) as print_hit:
        call_command('grep', '--domain', 'example.net', 'form_number')
        assert print_hit.call_args.args == (action_url, '{{ form_number }}')

    # value from the post data
    with mock.patch(print_hit_path) as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args.args == (action_url, '{{ foo_bar }}')
|
|
|
|
|
|
def test_grep_set_backoffice_fields_action(pub):
    """Check grep reports a field value of a set-backoffice-fields action."""
    FormDef.wipe()
    Workflow.wipe()

    wf = Workflow(name='webservice-call')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [StringField(id='bo1', label='bo field 1', varname='plop')]

    wf.possible_status = Workflow.get_default_workflow().possible_status[:]
    target_status = wf.possible_status[1]
    action = target_status.add_action('set-backoffice-fields', id='set-backoffice-fields', prepend=True)
    action.fields = [{'field_id': 'bo1', 'value': '{{ foo_bar }}'}]
    wf.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/1/status/new/items/set-backoffice-fields/',
        '{{ foo_bar }}',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args.args == expected_hit
|
|
|
|
|
|
def test_grep_action_condition(pub):
    """Check grep reports the condition value of a workflow action."""
    Workflow.wipe()
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.store()

    # put a condition on the third item of the first status
    conditioned_item = workflow.possible_status[0].items[2]
    conditioned_item.condition = {'type': 'django', 'value': 'foo_bar'}
    workflow.store()

    expected_hit = (
        'http://example.net/backoffice/workflows/2/status/just_submitted/items/_jump_to_new/',
        'foo_bar',
    )
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args.args == expected_hit
|
|
|
|
|
|
def test_grep_field_condition(pub):
    """Check grep reports the condition value of a form field."""
    FormDef.wipe()

    conditioned_field = StringField(
        id='1',
        label='Bar',
        size='40',
        required=True,
        condition={'type': 'django', 'value': 'foo_bar'},
    )
    formdef = FormDef()
    formdef.id = '2'
    formdef.name = 'Foo'
    formdef.fields = [conditioned_field]
    formdef.store()

    expected_hit = ('http://example.net/backoffice/forms/2/fields/1/', 'foo_bar')
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args.args == expected_hit
|
|
|
|
|
|
def test_grep_page_condition(pub):
    """Check grep reports both the condition and the post-conditions of a page field."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.store()

    post_condition = {
        'condition': {'type': 'django', 'value': 'form_xx'},
        'error_message': 'You shall not pass.',
    }
    page = PageField(
        id='0',
        label='1st page',
        condition={'type': 'django', 'value': 'foo_bar'},
        post_conditions=[post_condition],
    )
    formdef.fields = [page]
    formdef.id = '2'
    formdef.store()

    print_hit_path = 'wcs.ctl.management.commands.grep.Command.print_hit'
    page_url = 'http://example.net/backoffice/forms/2/fields/0/'

    # page condition
    with mock.patch(print_hit_path) as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args.args == (page_url, 'foo_bar')

    # page post-condition
    with mock.patch(print_hit_path) as print_hit:
        call_command('grep', '--domain', 'example.net', 'form_xx')
        assert print_hit.call_args.args == (page_url, 'form_xx')
|
|
|
|
|
|
def test_grep_workflow_options(pub):
    """Check grep reports a value stored in the workflow options of a form."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.workflow_options = {'a': 'foo_bar'}
    formdef.store()

    expected_hit = ('http://example.net/backoffice/forms/1/workflow-variables', 'foo_bar')
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_hit.call_args.args == expected_hit
|
|
|
|
|
|
def test_grep_block(pub):
    """Check grep reports the label of a field of a block definition."""
    FormDef.wipe()
    BlockDef.wipe()

    blockdef = BlockDef()
    blockdef.name = 'Foo'
    blockdef.fields = [StringField(id='1', label='bar', size='40', required=True)]
    blockdef.store()

    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'bar')
        expected_url = f'http://example.net/backoffice/forms/blocks/{blockdef.id}/1/'
        assert print_hit.call_args.args == (expected_url, 'bar')
|
|
|
|
|
|
def test_grep_workflow_multiple(pub):
    """Check grep calls print_unique_hit exactly once, with and without --urls."""
    for klass in (FormDef, BlockDef, Workflow):
        klass.wipe()

    workflow = Workflow(name='test')
    status = workflow.add_status('Status1', 'st1')
    choice = status.add_action('choice')
    choice.label = '{{foo_bar}}'
    workflow.store()

    unique_hit_path = 'wcs.ctl.management.commands.grep.Command.print_unique_hit'
    action_url = 'http://example.net/backoffice/workflows/1/status/st1/items/1/'

    # default output: URL and matching text
    with mock.patch(unique_hit_path) as print_unique_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')
        assert print_unique_hit.call_args.args == (action_url, '{{foo_bar}}')
        assert print_unique_hit.call_count == 1

    # --urls: only the URL is given
    with mock.patch(unique_hit_path) as print_unique_hit:
        call_command('grep', '--domain', 'example.net', '--urls', 'foo_bar')
        assert print_unique_hit.call_args.args == (action_url,)
        assert print_unique_hit.call_count == 1
|
|
|
|
|
|
def test_grep_workflow_global_action_trigger(pub):
    """A match inside a global action's timeout trigger is reported with the trigger URL."""
    for klass in (FormDef, BlockDef, Workflow):
        klass.wipe()

    workflow = Workflow(name='test')
    action = workflow.add_global_action('FOOBAR')

    # Timeout trigger whose timeout template contains the searched string.
    trigger1 = action.append_trigger('timeout')
    trigger1.anchor = 'creation'
    trigger1.timeout = '{{ form_var_foo_bar }}'

    # Second trigger that does not contain the searched string.
    webservice_trigger = action.append_trigger('webservice')
    webservice_trigger.identifier = 'delete'

    workflow.store()

    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
        call_command('grep', '--domain', 'example.net', 'foo_bar')

    expected_hit = (
        f'http://example.net/backoffice/workflows/{workflow.id}/global-actions/{action.id}/triggers/{trigger1.id}/',
        '{{ form_var_foo_bar }}',
    )
    assert print_hit.call_args[0] == expected_hit
|
|
|
|
|
|
def test_configdb(pub):
    """Exercise ``configdb``: plain run, ``--info``, then restoring a blanked database name."""
    base_args = ('configdb', '--domain', 'example.net')

    # Smoke runs: no option, then --info.
    call_command(*base_args)
    call_command(*base_args, '--info')

    # Remember the current settings, then blank the database name on disk.
    pg_settings = pub.cfg['postgresql']
    saved_database = pg_settings['database']
    saved_user = pg_settings['user']
    pub.cfg['postgresql']['database'] = ''
    pub.write_cfg()

    # The command must write the database name back to the configuration.
    call_command(*base_args, '--database', saved_database, '--user', saved_user)
    pub.reload_cfg()
    assert pub.cfg['postgresql']['database'] == saved_database
|
|
|
|
|
|
@pytest.mark.parametrize('source_type', ['local-file', 'http'])
def test_replace_python(pub, alt_tempdir, source_type):
    """Run ``replace_python`` and check which Python expressions get rewritten.

    The replacement table is supplied either as a local JSON file
    (``--filename``) or through a mocked HTTP URL (``--url``), depending on
    ``source_type``. Expressions listed in the table must be converted in the
    stored form, workflow and mail template; unlisted ones must stay as is.
    """
    for klass in (FormDef, Workflow, MailTemplate):
        klass.wipe()

    # --- form: page conditions and field prefills -------------------------
    formdef = FormDef()
    formdef.name = 'Foo'
    page_field = PageField(
        id='1',
        label='Bar',
        size='40',
        required=True,
        condition={'type': 'python', 'value': 'python condition'},
        post_conditions=[
            {
                'condition': {'type': 'python', 'value': 'python condition'},
                'error_message': 'You shall not pass.',
            },
        ],
    )
    listed_prefill_field = StringField(
        id='2', label='Foo', prefill={'type': 'formula', 'value': 'form_var_foo'}
    )
    unlisted_prefill_field = StringField(
        id='3', label='Bar', prefill={'type': 'formula', 'value': 'form_var_bar'}
    )
    formdef.fields = [page_field, listed_prefill_field, unlisted_prefill_field]
    formdef.store()

    # --- workflow: status actions and global action triggers --------------
    workflow = Workflow(name='test')
    status = workflow.add_status('Status1', 'st1')

    untouched_choice = status.add_action('choice')
    untouched_choice.label = 'label'
    untouched_choice.condition = {'type': 'python', 'value': 'no replacement'}

    converted_choice = status.add_action('choice')
    converted_choice.label = 'label2'
    converted_choice.condition = {'type': 'python', 'value': 'python condition'}

    backoffice_fields = status.add_action('set-backoffice-fields')
    backoffice_fields.fields = [{'field_id': 'bo1', 'value': '=form_var_foo'}]

    create_formdata = status.add_action('create_formdata')
    create_formdata.varname = 'resubmitted'
    create_formdata.mappings = [
        Mapping(field_id='0', expression='=form_var_foo'),
    ]

    sendmail = status.add_action('sendmail')
    sendmail.to = ['=form_var_foo']
    sendmail.subject = '=form_var_foo'
    sendmail.attachments = ['{{ form_var_xxx }}', 'getattr(form_attachments, "form_var_bar", None)']

    wscall = status.add_action('webservice_call')
    wscall.varname = 'xxx'
    wscall.post_data = {'str': 'abcd', 'expr': '=form_var_foo'}
    wscall.qs_data = {'str': 'abcd', 'expr': '=form_var_foo', 'expr2': '=form_var_bar'}

    global_action = workflow.add_global_action('foobar')
    listed_trigger = global_action.append_trigger('timeout')
    listed_trigger.anchor = 'python'
    listed_trigger.anchor_expression = 'datetime.date(2023, 12, 27)'
    unlisted_trigger = global_action.append_trigger('timeout')
    unlisted_trigger.anchor = 'python'
    unlisted_trigger.anchor_expression = 'nope'

    workflow.store()

    # --- mail template ----------------------------------------------------
    mail_template = MailTemplate(name='test mail template')
    mail_template.subject = '=form_var_foo'
    mail_template.attachments = ['{{ form_var_xxx }}', 'getattr(form_attachments, "form_var_bar", None)']
    mail_template.store()

    # --- replacement table ------------------------------------------------
    replacements = {
        'conditions': {
            'python condition': 'django condition',
        },
        'templates': {
            'form_var_foo': '{{ form_var_foo }}',
            'datetime.date(2023, 12, 27)': '2023-12-27',
            'getattr(form_attachments, "form_var_bar", None)': '{{ form_var_bar }}',
        },
    }
    replacements_path = os.path.join(alt_tempdir, 'replacements.json')
    # The file is written for both source types; only 'local-file' reads it.
    with open(replacements_path, 'w') as fp:
        json.dump(replacements, fp)

    if source_type == 'local-file':
        call_command(
            'replace_python',
            '--domain',
            'example.net',
            '--filename',
            replacements_path,
        )
    else:
        source = 'https://test/replacements.json'
        with responses.RequestsMock() as rsps:
            rsps.get(source, json=replacements)
            call_command('replace_python', '--domain', 'example.net', '--url', source)

    # --- form checks ------------------------------------------------------
    formdef.refresh_from_storage()
    converted_condition = {'type': 'django', 'value': 'django condition'}
    stored_page = formdef.fields[0]
    assert stored_page.condition == converted_condition
    assert stored_page.post_conditions == [
        {
            'condition': converted_condition,
            'error_message': 'You shall not pass.',
        },
    ]
    assert formdef.fields[1].prefill == {'type': 'string', 'value': '{{ form_var_foo }}'}
    assert formdef.fields[2].prefill == {'type': 'formula', 'value': 'form_var_bar'}  # no replacement

    # --- workflow checks --------------------------------------------------
    workflow.refresh_from_storage()
    items = workflow.possible_status[0].items
    assert items[0].condition == {'type': 'python', 'value': 'no replacement'}
    assert items[1].condition == converted_condition
    assert items[2].fields == [{'field_id': 'bo1', 'value': '{{ form_var_foo }}'}]
    assert items[3].mappings[0].expression == '{{ form_var_foo }}'
    assert items[4].subject == '{{ form_var_foo }}'
    assert items[4].to == ['{{ form_var_foo }}']
    assert items[4].attachments == ['{{ form_var_xxx }}', '{{ form_var_bar }}']
    assert items[5].post_data == {'str': 'abcd', 'expr': '{{ form_var_foo }}'}
    assert items[5].qs_data == {
        'str': 'abcd',
        'expr': '{{ form_var_foo }}',
        'expr2': '=form_var_bar',
    }

    # The trigger whose expression is listed switches to a template anchor;
    # the other one keeps its python anchor.
    triggers = workflow.global_actions[0].triggers
    assert triggers[1].anchor == 'template'
    assert triggers[1].anchor_expression is None
    assert triggers[1].anchor_template == '2023-12-27'
    assert triggers[2].anchor == 'python'
    assert triggers[2].anchor_expression == 'nope'

    # --- mail template checks ---------------------------------------------
    mail_template.refresh_from_storage()
    assert mail_template.subject == '{{ form_var_foo }}'
    assert mail_template.attachments == ['{{ form_var_xxx }}', '{{ form_var_bar }}']
|