# Source: wcs/tests/test_ctl.py (Python; listed as 327 lines, 11 KiB)

import os
import pytest
import collections
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import psycopg2
from wcs.formdef import FormDef
from wcs.workflows import Workflow
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.fields import StringField, EmailField
import wcs.qommon.ctl
from wcs.ctl.collectstatic import CmdCollectStatic
from wcs.ctl.process_bounce import CmdProcessBounce
from wcs.ctl.wipe_data import CmdWipeData
from wcs.ctl.trigger_jumps import select_and_jump_formdata
from wcs.ctl.delete_tenant import CmdDeleteTenant
from wcs.sql import get_connection_and_cursor, cleanup_connection
from utilities import create_temporary_pub, clean_temporary_pub
@pytest.fixture
def pub():
    """Fixture handing the result of ``create_temporary_pub()`` to a test."""
    publisher = create_temporary_pub()
    return publisher
def teardown_module(module):
    """Module-level teardown hook: call ``clean_temporary_pub()``.

    The ``module`` argument is received but not used.
    """
    clean_temporary_pub()
def test_loading():
    """Load all commands under the ``wcs.ctl`` prefix and instantiate each.

    Loading is done with ``ignore_errors=False``, and ``export_settings`` is
    expected to be among the resulting commands.
    """
    controller = wcs.qommon.ctl.Ctl(cmd_prefixes=['wcs.ctl'])
    controller.load_all_commands(ignore_errors=False)

    command_names = controller.get_commands().keys()
    assert 'export_settings' in command_names

    # instantiating each command runs its __init__() method
    for command_class in controller.get_commands().values():
        command_class()
def test_collectstatic(pub):
    """Run collectstatic and check the files it leaves under ``pub.app_dir``.

    The first run must produce a set of known files in the ``collectstatic``
    directory; a second run with ``clear=True, link=True`` must leave
    ``css/wcs.css`` as a symbolic link.
    """
    expected_files = (
        ('css', 'wcs.css'),
        ('css', 'qommon.css'),
        ('css', 'gadjo.css'),
        ('xstatic', 'jquery.js'),
    )

    CmdCollectStatic.collectstatic(pub)
    for relative_parts in expected_files:
        target = os.path.join(pub.app_dir, 'collectstatic', *relative_parts)
        assert os.path.exists(target)

    CmdCollectStatic.collectstatic(pub, clear=True, link=True)
    linked_css = os.path.join(pub.app_dir, 'collectstatic', 'css', 'wcs.css')
    assert os.path.islink(linked_css)
def test_get_bounce_addrs():
    """Check ``CmdProcessBounce.get_bounce_addrs`` on four sample messages.

    Two ordinary messages (plain text, and multipart text + HTML) must give
    ``None``; a message with an ``x-failed-recipients`` header and a message
    whose body lists the failed address must both give that address.
    """
    # ordinary plain text message
    plain = MIMEText('Hello world')
    assert CmdProcessBounce.get_bounce_addrs(plain) is None

    # ordinary multipart message with a text part and an HTML part
    multipart = MIMEMultipart(_subtype='mixed')
    for part in (MIMEText('Hello world'),
                 MIMEText('<p>Hello world</p>', _subtype='html')):
        multipart.attach(part)
    assert CmdProcessBounce.get_bounce_addrs(multipart) is None

    # failed recipient given in a header
    with_header = MIMEText('Hello world')
    with_header['x-failed-recipients'] = 'foobar@localhost'
    assert CmdProcessBounce.get_bounce_addrs(with_header) == ['foobar@localhost']

    # failed recipient given in the message body
    with_body = MIMEText('''failed addresses follow:
foobar@localhost
message text follows:''')
    assert CmdProcessBounce.get_bounce_addrs(with_body) == ['foobar@localhost']
def test_wipe_formdata(pub):
    """Check ``CmdWipeData`` option parsing and its ``wipe()`` behaviour.

    Two forms holding one formdata each are created, then ``wipe()`` is
    called with no selection, with one form selected, and with ``all`` set.
    """

    def build_form_with_one_entry(name, fields, data):
        # store a form definition, empty its data, then store a single entry
        formdef = FormDef()
        formdef.name = name
        formdef.fields = fields
        formdef.store()
        formdef.data_class().wipe()
        entry = formdef.data_class()()
        entry.data = data
        entry.store()
        assert formdef.data_class().count() == 1
        return formdef

    first_form = build_form_with_one_entry(
        'example',
        [StringField(id='0', label='Your Name'),
         EmailField(id='1', label='Email')],
        {'0': 'John Doe', '1': 'john@example.net'})
    second_form = build_form_with_one_entry(
        'example2',
        [StringField(id='0', label='First Name'),
         StringField(id='1', label='Last Name')],
        {'0': 'John', '1': 'Doe'})

    def entry_counts():
        return (first_form.data_class().count(),
                second_form.data_class().count())

    command = CmdWipeData()

    # command line parsing: --all flag, then form names as positional args
    parsed, positional = command.parse_args(['--all'])
    assert parsed.all
    parsed, positional = command.parse_args(
        [first_form.url_name, second_form.url_name])
    assert first_form.url_name in positional
    assert second_form.url_name in positional

    options_type = collections.namedtuple('Options', ['all'])
    keep_unselected = options_type(False)

    # neither --all nor form names: nothing is removed
    command.wipe(pub, keep_unselected, [])
    assert entry_counts() == (1, 1)

    # a single form name: only that form is emptied
    command.wipe(pub, keep_unselected, [first_form.url_name])
    assert entry_counts() == (0, 1)

    # all=True: every form is emptied
    command.wipe(pub, options_type(True), [])
    assert entry_counts() == (0, 0)
def test_trigger_jumps(pub):
    """Check ``select_and_jump_formdata`` with various row selections.

    A two-status workflow is built with a ``goto2`` trigger jumping from
    ``st1`` to ``st2``.  For each scenario two formdatas (Alice, Bob) are
    recreated in ``st1``, the trigger is fired, and the resulting statuses
    and workflow data are checked.
    """
    Workflow.wipe()
    workflow = Workflow(name='test')
    st1 = workflow.add_status('Status1', 'st1')
    goto2_jump = JumpWorkflowStatusItem()
    goto2_jump.trigger = 'goto2'
    goto2_jump.status = 'st2'
    goto2_jump.parent = st1
    st1.items.append(goto2_jump)
    st2 = workflow.add_status('Status2', 'st2')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        StringField(id='0', label='Your Name', varname='name'),
        EmailField(id='1', label='Email', varname='email'),
    ]
    formdef.workflow_id = workflow.id
    formdef.store()

    def status_name(status):
        return 'wf-%s' % status.id

    def run_trigger(trigger, rows):
        # start from scratch: two formdatas, both sitting in the first status
        formdef.data_class().wipe()
        stored_ids = []
        for payload in ({'0': 'Alice', '1': 'alice@example.net'},
                        {'0': 'Bob', '1': 'bob@example.net'}):
            formdata = formdef.data_class()()
            formdata.data = payload
            formdata.status = status_name(st1)
            formdata.store()
            stored_ids.append(formdata.id)
        select_and_jump_formdata(formdef, trigger, rows)
        # reload both formdatas so the caller sees their stored state
        alice_id, bob_id = stored_ids
        return formdef.data_class().get(alice_id), formdef.data_class().get(bob_id)

    # every formdata is selected
    alice, bob = run_trigger('goto2', '__all__')
    assert alice.status == bob.status == status_name(st2)

    # publisher substitution sources once the last jump is done (#13964)
    sources = pub.substitutions.sources
    assert pub in sources
    assert formdef in sources
    # which formdata was handled last is unknown, so cover both cases
    if alice in pub.substitutions.sources:
        assert bob not in pub.substitutions.sources
    if bob in pub.substitutions.sources:
        assert alice not in pub.substitutions.sources

    # an empty selector matches both
    alice, bob = run_trigger('goto2', [{'select': {}}])
    assert alice.status == bob.status == status_name(st2)

    # selection on the form number
    alice, bob = run_trigger('goto2', [{'select': {'form_number_raw': '1'}}])
    assert alice.status == status_name(st2)
    assert bob.status == status_name(st1)

    # selection on a field variable
    alice, bob = run_trigger(
        'goto2', [{'select': {'form_var_email': 'bob@example.net'}}])
    assert alice.status == status_name(st1)
    assert bob.status == status_name(st2)

    # data given with the row ends up in workflow_data
    alice, bob = run_trigger(
        'goto2', [{'select': {}, 'data': {'foo': 'bar'}}])
    assert alice.status == bob.status == status_name(st2)
    assert alice.workflow_data['foo'] == bob.workflow_data['foo'] == 'bar'

    # data is only attached to the selected formdata
    alice, bob = run_trigger(
        'goto2', [{'select': {'form_number_raw': '1'}, 'data': {'foo': 'bar'}}])
    assert alice.status == status_name(st2)
    assert alice.workflow_data['foo'] == 'bar'
    assert bob.status == status_name(st1)
    assert not bob.workflow_data

    # a trigger name no jump uses leaves everything as it was
    alice, bob = run_trigger('badtrigger', '__all__')
    assert alice.status == bob.status == status_name(st1)
    assert not alice.workflow_data
    assert not bob.workflow_data
def test_delete_tenant_with_sql():
    """Check ``CmdDeleteTenant.delete_tenant`` on SQL-mode temporary publishers.

    Four scenarios are run, each on a new ``create_temporary_pub(sql_mode=True)``:

    1. ``force_drop=False``: the application directory is gone, a directory
       entry containing ``removed`` exists next to it and exactly one schema
       name matches ``%removed%``.
    2. ``force_drop=True``: no base table is left in the ``public`` schema
       while the database itself is still listed in ``pg_database``.
    3. ``force_drop=True`` with ``createdb-connection-params`` configured: the
       database is no longer listed.
    4. ``force_drop=False`` with ``createdb-connection-params`` configured:
       exactly one database name matches ``%removed%``; it is dropped by the
       test and the original database name is no longer listed.
    """
    options_class = collections.namedtuple('Options', ['force_drop'])
    # database name is passed as a bound parameter instead of being
    # interpolated into the SQL text
    database_listed_query = """SELECT datname
                               FROM pg_database
                               WHERE datname = %s"""

    def removed_entries(directory):
        # directory entries whose name contains 'removed'
        return [name for name in os.listdir(directory) if 'removed' in name]

    # scenario 1: force_drop=False, default connection settings
    pub = create_temporary_pub(sql_mode=True)
    delete_cmd = CmdDeleteTenant()

    assert os.path.isdir(pub.app_dir)

    delete_cmd.delete_tenant(pub, options_class(False), [])

    assert not os.path.isdir(pub.app_dir)
    parent_dir = os.path.dirname(pub.app_dir)
    assert removed_entries(parent_dir), \
        "no 'removed' entry found in %s" % parent_dir

    conn, cur = get_connection_and_cursor()
    cur.execute("""SELECT schema_name
FROM information_schema.schemata
WHERE schema_name like '%removed%'""")
    assert len(cur.fetchall()) == 1

    clean_temporary_pub()

    # scenario 2: force_drop=True, default connection settings
    pub = create_temporary_pub(sql_mode=True)
    delete_cmd.delete_tenant(pub, options_class(True), [])

    conn, cur = get_connection_and_cursor(new=True)

    assert not os.path.isdir(pub.app_dir)
    cur.execute("""SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'""")
    assert not cur.fetchall()

    cur.execute(database_listed_query, (pub.cfg['postgresql']['database'],))
    assert cur.fetchall()

    clean_temporary_pub()

    # scenario 3: force_drop=True with createdb-connection-params set
    pub = create_temporary_pub(sql_mode=True)
    cleanup_connection()
    pub.cfg['postgresql']['createdb-connection-params'] = {
        'user': pub.cfg['postgresql']['user'],
        'database': 'postgres'
    }
    delete_cmd.delete_tenant(pub, options_class(True), [])

    pgconn = psycopg2.connect(**pub.cfg['postgresql']['createdb-connection-params'])
    try:
        cur = pgconn.cursor()
        cur.execute(database_listed_query, (pub.cfg['postgresql']['database'],))
        assert not cur.fetchall()
        cur.close()
    finally:
        # release the admin connection even when an assertion fails
        pgconn.close()

    clean_temporary_pub()

    # scenario 4: force_drop=False with createdb-connection-params set
    pub = create_temporary_pub(sql_mode=True)
    cleanup_connection()
    pub.cfg['postgresql']['createdb-connection-params'] = {
        'user': pub.cfg['postgresql']['user'],
        'database': 'postgres'
    }
    delete_cmd.delete_tenant(pub, options_class(False), [])

    pgconn = psycopg2.connect(**pub.cfg['postgresql']['createdb-connection-params'])
    try:
        pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cur = pgconn.cursor()
        cur.execute("""SELECT datname
FROM pg_database
WHERE datname like '%removed%'""")
        result = cur.fetchall()
        assert len(result) == 1

        # clean this db after test; the name is an identifier, which cannot be
        # sent as a bound parameter, so it is interpolated as before
        cur.execute("""DROP DATABASE %s""" % result[0][0])

        cur.execute(database_listed_query, (pub.cfg['postgresql']['database'],))
        assert not cur.fetchall()
        cur.close()
    finally:
        # The connection opened by this scenario is pgconn; it used to be left
        # open while only the older `conn` handle was closed.
        pgconn.close()
        # NOTE(review): `conn` comes from scenario 2 and may already be closed
        # by cleanup_connection(); the call is kept in case it is not.
        conn.close()

    clean_temporary_pub()
def test_delete_tenant_without_sql():
    """Check ``CmdDeleteTenant.delete_tenant`` on non-SQL temporary publishers.

    With ``force_drop=False`` the application directory must be gone and a
    sibling entry whose name contains ``removed`` must exist; with
    ``force_drop=True`` the directory must be gone and no such entry may exist.
    """

    def removed_siblings(app_dir):
        # entries next to app_dir whose name contains 'removed'
        parent_dir = os.path.dirname(app_dir)
        return [entry for entry in os.listdir(parent_dir) if 'removed' in entry]

    options_type = collections.namedtuple('Options', ['force_drop'])

    # force_drop=False
    pub = create_temporary_pub()
    command = CmdDeleteTenant()
    assert os.path.isdir(pub.app_dir)

    command.delete_tenant(pub, options_type(False), [])

    assert not os.path.isdir(pub.app_dir)
    assert removed_siblings(pub.app_dir)

    clean_temporary_pub()

    # force_drop=True
    pub = create_temporary_pub()
    assert os.path.isdir(pub.app_dir)

    command.delete_tenant(pub, options_type(True), [])

    assert not os.path.isdir(pub.app_dir)
    assert not removed_siblings(pub.app_dir)

    clean_temporary_pub()