342 lines
11 KiB
Python
342 lines
11 KiB
Python
import os
|
|
import pytest
|
|
import collections
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
import psycopg2
|
|
|
|
from wcs.formdef import FormDef
|
|
from wcs.workflows import Workflow
|
|
from wcs.wf.jump import JumpWorkflowStatusItem
|
|
from wcs.fields import StringField, EmailField
|
|
import wcs.qommon.ctl
|
|
from wcs.qommon.management.commands.collectstatic import Command as CmdCollectStatic
|
|
from wcs.qommon.management.commands.migrate import Command as CmdMigrate
|
|
from wcs.ctl.process_bounce import CmdProcessBounce
|
|
from wcs.ctl.wipe_data import CmdWipeData
|
|
from wcs.ctl.management.commands.trigger_jumps import select_and_jump_formdata
|
|
from wcs.ctl.delete_tenant import CmdDeleteTenant
|
|
from wcs.sql import get_connection_and_cursor, cleanup_connection
|
|
|
|
from utilities import create_temporary_pub, clean_temporary_pub
|
|
|
|
@pytest.fixture
def pub():
    """Provide a temporary publisher created with the default settings."""
    temporary_pub = create_temporary_pub()
    return temporary_pub
|
|
|
|
def pytest_generate_tests(metafunc):
    """Parametrize tests requesting ``two_pubs`` with both storage modes.

    Tests that do not list ``two_pubs`` among their fixtures are left alone.
    The parameter is passed indirectly, i.e. handed to the fixture itself.
    """
    if 'two_pubs' not in metafunc.fixturenames:
        return
    metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
|
|
|
|
@pytest.fixture
def two_pubs(request):
    """Provide a temporary publisher for the storage mode in ``request.param``.

    SQL mode is requested only when the parameter equals ``'sql'``. The
    language configuration is set to English and ``write_cfg()`` is called
    before the publisher is returned.
    """
    use_sql = request.param == 'sql'
    publisher = create_temporary_pub(sql_mode=use_sql)
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()
    return publisher
|
|
|
|
def teardown_module(module):
    """Module-level teardown hook: clean up the temporary publisher."""
    clean_temporary_pub()
|
|
|
|
def test_loading():
    """Load every ``wcs.ctl`` command and check each one can be instantiated."""
    controller = wcs.qommon.ctl.Ctl(cmd_prefixes=['wcs.ctl'])
    # errors are not ignored, so a command failing to load fails the test
    controller.load_all_commands(ignore_errors=False)
    assert 'export_settings' in controller.get_commands()

    # call all __init__() methods
    for command_class in controller.get_commands().values():
        command_class()
|
|
|
|
def test_collectstatic(pub):
    """Check collectstatic populates the ``collectstatic`` directory.

    A first run must produce the expected CSS and JavaScript files; a second
    run with ``clear=True, link=True`` must leave ``wcs.css`` as a symlink.
    """
    static_root = os.path.join(pub.app_dir, 'collectstatic')
    expected_files = (
        ('css', 'wcs.css'),
        ('css', 'qommon.css'),
        ('css', 'gadjo.css'),
        ('xstatic', 'jquery.js'),
    )

    CmdCollectStatic.collectstatic(pub)
    for path_parts in expected_files:
        assert os.path.exists(os.path.join(static_root, *path_parts))

    CmdCollectStatic.collectstatic(pub, clear=True, link=True)
    assert os.path.islink(os.path.join(static_root, 'css', 'wcs.css'))
|
|
|
|
def test_migrate(two_pubs):
    """Run the migrate command once for each publisher storage mode."""
    migrate_command = CmdMigrate()
    migrate_command.handle()
|
|
|
|
def test_get_bounce_addrs():
    """Check the result of ``CmdProcessBounce.get_bounce_addrs()`` on sample messages."""
    # plain text message without any bounce information
    plain_message = MIMEText('Hello world')
    assert CmdProcessBounce.get_bounce_addrs(plain_message) is None

    # multipart message (text + HTML parts), still no bounce information
    multipart_message = MIMEMultipart(_subtype='mixed')
    for part in (MIMEText('Hello world'),
                 MIMEText('<p>Hello world</p>', _subtype='html')):
        multipart_message.attach(part)
    assert CmdProcessBounce.get_bounce_addrs(multipart_message) is None

    # failed recipient given in a message header
    header_message = MIMEText('Hello world')
    header_message['x-failed-recipients'] = 'foobar@localhost'
    assert CmdProcessBounce.get_bounce_addrs(header_message) == ['foobar@localhost']

    # failed recipient given in the message body
    body_message = MIMEText('''failed addresses follow:
foobar@localhost
message text follows:''')
    assert CmdProcessBounce.get_bounce_addrs(body_message) == ['foobar@localhost']
|
|
|
|
def test_wipe_formdata(pub):
    """Check CmdWipeData option parsing and the effect of ``wipe()``.

    Two forms holding one formdata each are created. Wiping with no form
    name and ``all`` unset removes nothing, wiping by url name removes only
    that form's data, and wiping with ``all`` set removes everything.
    """

    def make_form_with_one_formdata(name, fields, data):
        # store the form, start from an empty data set, then add one formdata
        formdef = FormDef()
        formdef.name = name
        formdef.fields = fields
        formdef.store()
        formdef.data_class().wipe()
        formdata = formdef.data_class()()
        formdata.data = data
        formdata.store()
        assert formdef.data_class().count() == 1
        return formdef

    first_form = make_form_with_one_formdata(
        'example',
        [StringField(id='0', label='Your Name'),
         EmailField(id='1', label='Email')],
        {'0': 'John Doe', '1': 'john@example.net'},
    )
    second_form = make_form_with_one_formdata(
        'example2',
        [StringField(id='0', label='First Name'),
         StringField(id='1', label='Last Name')],
        {'0': 'John', '1': 'Doe'},
    )

    def formdata_counts():
        return (first_form.data_class().count(), second_form.data_class().count())

    wipe_cmd = CmdWipeData()

    # check command options
    options, args = wipe_cmd.parse_args(['--all'])
    assert options.all

    options, args = wipe_cmd.parse_args([first_form.url_name, second_form.url_name])
    for formdef in (first_form, second_form):
        assert formdef.url_name in args

    options_class = collections.namedtuple('Options', ['all'])

    # test with no options
    wipe_cmd.wipe(pub, options_class(False), [])
    assert formdata_counts() == (1, 1)

    # wipe one form formdatas
    wipe_cmd.wipe(pub, options_class(False), [first_form.url_name])
    assert formdata_counts() == (0, 1)

    # wipe all formdatas
    wipe_cmd.wipe(pub, options_class(True), [])
    assert formdata_counts() == (0, 0)
|
|
|
|
def test_trigger_jumps(pub):
    """Check ``select_and_jump_formdata()`` with various row selections.

    A two-status workflow is built with a jump from the first status to the
    second one, fired by the ``goto2`` trigger. Each scenario recreates two
    formdatas (Alice, then Bob) in the first status, runs the trigger and
    inspects the resulting statuses and workflow data.
    """
    Workflow.wipe()
    workflow = Workflow(name='test')
    source_status = workflow.add_status('Status1', 'st1')
    jump_item = JumpWorkflowStatusItem()
    jump_item.trigger = 'goto2'
    jump_item.status = 'st2'
    source_status.items.append(jump_item)
    jump_item.parent = source_status
    target_status = workflow.add_status('Status2', 'st2')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        StringField(id='0', label='Your Name', varname='name'),
        EmailField(id='1', label='Email', varname='email'),
    ]
    formdef.workflow_id = workflow.id
    formdef.store()

    not_jumped = 'wf-%s' % source_status.id
    jumped = 'wf-%s' % target_status.id

    def run_trigger(trigger, rows):
        # start from a clean data set holding two formdatas in the first status
        formdef.data_class().wipe()
        stored_ids = []
        for values in ({'0': 'Alice', '1': 'alice@example.net'},
                       {'0': 'Bob', '1': 'bob@example.net'}):
            formdata = formdef.data_class()()
            formdata.data = values
            formdata.status = not_jumped
            formdata.store()
            stored_ids.append(formdata.id)
        select_and_jump_formdata(formdef, trigger, rows)
        # reload both formdatas to observe what the trigger changed
        return tuple(formdef.data_class().get(formdata_id) for formdata_id in stored_ids)

    alice, bob = run_trigger('goto2', '__all__')
    assert alice.status == bob.status == jumped

    # check publisher substitutions vars after the last jump_and_perform (#13964)
    sources = pub.substitutions.sources
    assert pub in sources
    assert formdef in sources
    # we cannot know which formdata is the last one, only that both
    # formdatas are never registered at the same time
    assert not (alice in sources and bob in sources)

    alice, bob = run_trigger('goto2', [{'select': {}}])
    assert alice.status == bob.status == jumped

    alice, bob = run_trigger('goto2', [{'select': {'form_number_raw': '1'}}])
    assert alice.status == jumped
    assert bob.status == not_jumped

    alice, bob = run_trigger('goto2', [{'select': {'form_var_email': 'bob@example.net'}}])
    assert alice.status == not_jumped
    assert bob.status == jumped

    alice, bob = run_trigger('goto2', [{'select': {}, 'data': {'foo': 'bar'}}])
    assert alice.status == bob.status == jumped
    assert alice.workflow_data['foo'] == bob.workflow_data['foo'] == 'bar'

    alice, bob = run_trigger(
        'goto2',
        [{'select': {'form_number_raw': '1'}, 'data': {'foo': 'bar'}}],
    )
    assert alice.status == jumped
    assert alice.workflow_data['foo'] == 'bar'
    assert bob.status == not_jumped
    assert not bob.workflow_data

    alice, bob = run_trigger('badtrigger', '__all__')
    assert alice.status == bob.status == not_jumped
    assert not alice.workflow_data
    assert not bob.workflow_data
|
|
|
|
|
|
def test_delete_tenant_with_sql():
    """Exercise ``CmdDeleteTenant.delete_tenant()`` on SQL-mode publishers.

    Four scenarios run in sequence, each on a fresh temporary publisher:

    1. ``force_drop`` off, no ``createdb-connection-params``: the application
       directory is gone, a ``removed`` entry exists next to it, and exactly
       one schema has ``removed`` in its name.
    2. ``force_drop`` on, no ``createdb-connection-params``: the ``public``
       schema has no base table left but the database is still listed.
    3. ``force_drop`` on, with ``createdb-connection-params``: the database
       is no longer listed.
    4. ``force_drop`` off, with ``createdb-connection-params``: exactly one
       database has ``removed`` in its name (it is dropped as cleanup) and
       the original database name is no longer listed.
    """
    pub = create_temporary_pub(sql_mode=True)
    delete_cmd = CmdDeleteTenant()

    assert os.path.isdir(pub.app_dir)

    sub_options_class = collections.namedtuple('Options', ['force_drop'])
    sub_options = sub_options_class(False)

    delete_cmd.delete_tenant(pub, sub_options, [])

    assert not os.path.isdir(pub.app_dir)
    parent_dir = os.path.dirname(pub.app_dir)
    assert any('removed' in filename for filename in os.listdir(parent_dir))

    conn, cur = get_connection_and_cursor()
    cur.execute("""SELECT schema_name
                   FROM information_schema.schemata
                   WHERE schema_name like '%removed%'""")

    assert len(cur.fetchall()) == 1

    clean_temporary_pub()
    pub = create_temporary_pub(sql_mode=True)

    sub_options = sub_options_class(True)
    delete_cmd.delete_tenant(pub, sub_options, [])

    conn, cur = get_connection_and_cursor(new=True)

    assert not os.path.isdir(pub.app_dir)
    cur.execute("""SELECT table_name
                   FROM information_schema.tables
                   WHERE table_schema = 'public'
                   AND table_type = 'BASE TABLE'""")

    assert not cur.fetchall()

    cur.execute("""SELECT datname
                   FROM pg_database
                   WHERE datname = '%s'""" % pub.cfg['postgresql']['database'])

    assert cur.fetchall()

    clean_temporary_pub()
    pub = create_temporary_pub(sql_mode=True)

    cleanup_connection()
    sub_options = sub_options_class(True)
    pub.cfg['postgresql']['createdb-connection-params'] = {
        'user': pub.cfg['postgresql']['user'],
        'database': 'postgres'
    }
    delete_cmd.delete_tenant(pub, sub_options, [])

    pgconn = psycopg2.connect(**pub.cfg['postgresql']['createdb-connection-params'])
    cur = pgconn.cursor()

    cur.execute("""SELECT datname
                   FROM pg_database
                   WHERE datname = '%s'""" % pub.cfg['postgresql']['database'])
    assert not cur.fetchall()
    cur.close()
    pgconn.close()

    clean_temporary_pub()
    pub = create_temporary_pub(sql_mode=True)
    cleanup_connection()

    sub_options = sub_options_class(False)
    pub.cfg['postgresql']['createdb-connection-params'] = {
        'user': pub.cfg['postgresql']['user'],
        'database': 'postgres'
    }
    delete_cmd.delete_tenant(pub, sub_options, [])

    pgconn = psycopg2.connect(**pub.cfg['postgresql']['createdb-connection-params'])
    # autocommit is set before issuing DROP DATABASE below
    pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = pgconn.cursor()

    cur.execute("""SELECT datname
                   FROM pg_database
                   WHERE datname like '%removed%'""")

    result = cur.fetchall()
    assert len(result) == 1

    # clean this db after test
    cur.execute("""DROP DATABASE %s""" % result[0][0])

    cur.execute("""SELECT datname
                   FROM pg_database
                   WHERE datname = '%s'""" % pub.cfg['postgresql']['database'])

    assert not cur.fetchall()
    cur.close()
    # close the connection opened for this scenario; closing the older
    # ``conn`` handle instead would leave ``pgconn`` open
    pgconn.close()

    clean_temporary_pub()
|
|
|
|
|
|
def test_delete_tenant_without_sql():
    """Exercise ``CmdDeleteTenant.delete_tenant()`` on a publisher created without SQL mode.

    With ``force_drop`` off the application directory disappears and a
    ``removed`` entry is present in its parent directory; with ``force_drop``
    on the directory disappears and no ``removed`` entry is present.
    """

    def has_removed_entry(app_dir):
        # look for a 'removed' entry next to the application directory
        siblings = os.listdir(os.path.dirname(app_dir))
        return any('removed' in entry for entry in siblings)

    options_class = collections.namedtuple('Options', ['force_drop'])
    delete_cmd_publisher = create_temporary_pub()
    delete_cmd = CmdDeleteTenant()

    assert os.path.isdir(delete_cmd_publisher.app_dir)

    delete_cmd.delete_tenant(delete_cmd_publisher, options_class(False), [])

    assert not os.path.isdir(delete_cmd_publisher.app_dir)
    assert has_removed_entry(delete_cmd_publisher.app_dir)

    clean_temporary_pub()

    delete_cmd_publisher = create_temporary_pub()
    assert os.path.isdir(delete_cmd_publisher.app_dir)

    delete_cmd.delete_tenant(delete_cmd_publisher, options_class(True), [])

    assert not os.path.isdir(delete_cmd_publisher.app_dir)
    assert not has_removed_entry(delete_cmd_publisher.app_dir)

    clean_temporary_pub()
|