wcs-olap/tests/conftest.py

358 lines
10 KiB
Python

import configparser
import os
import random
import shutil
import socket
import sys
import time
from collections import namedtuple
from contextlib import ExitStack, closing, contextmanager
from unittest import mock
import psycopg2
import pytest
import utils
# Value yielded by the ``wcs`` fixture: ``url`` is the base URL of the running
# w.c.s. server, ``appdir`` its application directory and ``pid`` the process
# id of the forked server process.
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
class Database:
    """Throwaway PostgreSQL database with a randomly generated name.

    Instantiating the class immediately issues ``CREATE DATABASE``; call
    :meth:`delete` to drop the database again.  ``db_name`` holds the
    generated name and ``dsn`` a ``dbname=...`` connection string for it.
    """

    def __init__(self):
        self.db_name = 'db%s' % random.getrandbits(20)
        self.dsn = 'dbname=%s' % self.db_name
        self._run_admin_statement('CREATE DATABASE %s' % self.db_name)

    def _run_admin_statement(self, statement):
        """Execute *statement* on a connection opened with an empty DSN."""
        admin = psycopg2.connect('')
        with closing(admin):
            # Level 0 is psycopg2's autocommit mode; CREATE/DROP DATABASE
            # cannot be executed inside a transaction block.
            admin.set_isolation_level(0)
            with admin.cursor() as cur:
                cur.execute(statement)

    def conn(self):
        """Return a new connection to this database wrapped in ``closing``."""
        connection = psycopg2.connect(self.dsn)
        return closing(connection)

    def delete(self):
        """Drop the database if it still exists."""
        self._run_admin_statement('DROP DATABASE IF EXISTS %s' % self.db_name)

    def __repr__(self):
        return '<Postgres Database {!r}>'.format(self.db_name)
@pytest.fixture
def postgres_db():
    """Yield a freshly created :class:`Database` and drop it on teardown."""
    database = Database()
    try:
        yield database
    finally:
        # Runs even if the generator is closed early, so the database never
        # outlives the fixture.
        database.delete()
@pytest.fixture
def wcs_db():
    """Yield a freshly created :class:`Database` for w.c.s. and drop it on teardown."""
    database = Database()
    try:
        yield database
    finally:
        # Always drop the database, whatever happened while it was in use.
        database.delete()
# Python snippets executed inside w.c.s. through ``utils.run_wcs_script`` by
# the ``wcs`` fixture, keyed by script name:
#
# * ``setup-auth``: enable password identification and text exception display;
# * ``setup-storage``: %-formatted template (keys ``dbname``, ``port``,
#   ``host``, ``user``, ``password``) configuring and initializing PostgreSQL;
# * ``create-user``: create one user with a password account ``user``/``user``;
# * ``create-data``: create a category, two formdefs, an agent and 50 formdatas.
#
# Each value must be valid standalone Python source, so block bodies inside
# the strings have to be indented.
# NOTE(review): the nesting of the ``for``/``if`` bodies in ``create-data``
# was reconstructed from the statement order -- confirm against the upstream
# file, in particular the placement of the last statement of each branch.
WCS_SCRIPTS = {
    'setup-auth': """
from quixote import get_publisher
get_publisher().cfg['identification'] = {'methods': ['password']}
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
get_publisher().write_cfg()
""",
    'setup-storage': """
from quixote import get_publisher
get_publisher().cfg['postgresql'] = {'database': %(dbname)r, 'port': %(port)r, 'host': %(host)r, 'user': %(user)r, 'password': %(password)r}
get_publisher().write_cfg()
get_publisher().initialize_sql()
""",
    'create-user': """
from quixote import get_publisher
from qommon.ident.password_accounts import PasswordAccount
user = get_publisher().user_class()
user.name = 'foo bar'
user.email = 'foo@example.net'
user.store()
account = PasswordAccount(id='user')
account.set_password('user')
account.user_id = user.id
account.store()
""",
    'create-data': """
import datetime
import random
from quixote import get_publisher
from wcs.categories import Category
from wcs.formdef import FormDef
from wcs.roles import Role
from wcs import fields
cat = Category()
cat.name = 'Catégorie'
cat.description = ''
cat.store()
formdef = FormDef()
formdef.name = 'Empty'
formdef.category_id = cat.id
formdef.fields = []
formdef.store()
formdef = FormDef()
formdef.name = 'Demande'
formdef.category_id = cat.id
formdef.fields = [
    fields.StringField(id='1', label='1st field', type='string', anonymise=False, varname='string', display_locations=['statistics']),
    fields.ItemField(id='2', label='2nd field', type='item',
                     items=['foo', 'bar', 'baz'], varname='item',
                     display_locations=['statistics']),
    fields.BoolField(id='3', label='3rd field', type='bool', varname='bool', display_locations=['statistics']),
    fields.ItemField(id='4', label='4rth field', type='item', varname='itemOpen', display_locations=['statistics']),
    fields.ItemField(id='5', label='5th field', type='item', anonymise=False, varname='itemCaseSensitive-é', display_locations=['statistics']),
    fields.BoolField(id='6', label='6th field bad duplicate', type='bool', varname='duplicate', display_locations=['statistics']),
    fields.ItemField(id='7', label='7th field bad duplicate', type='item', anonymise=False, varname='duplicate', display_locations=['statistics']),
    fields.StringField(id='8', label='8th field integer', type='string', anonymise=False, varname='integer',
                       validation={'type': 'digits'}, display_locations=['statistics']),
    fields.ItemField(id='9', label='9th field good duplicate', type='item', anonymise=False,
                     required=False, varname='good_duplicate', display_locations=['statistics'], items=['a', 'b']),
    fields.ItemField(id='10', label='10th field good duplicate', type='item', anonymise=False,
                     required=False, varname='good_duplicate', display_locations=['statistics'], items=['a', 'b']),
    fields.ItemField(id='11', label='11th field third bad duplicate', type='item',
                     anonymise=False, varname='duplicate', display_locations=['statistics']),
    fields.BoolField(id='12', label='12th field', type='bool', varname='bool-other-no-stats', display_locations=[]),
]
formdef.store()
user = get_publisher().user_class.select()[0]
agent = get_publisher().user_class()
agent.name = 'agent1'
agent.email = 'agent@example.net'
agent.store()
for i in range(50):
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple()
    formdata.data = {'1': 'FOO BAR é %d' % i}
    if i%4 == 0:
        formdata.data['2'] = 'foo'
        formdata.data['2_display'] = 'foo'
        formdata.data['4'] = 'open_one'
        formdata.data['4_display'] = 'open_one'
        formdata.data['9'] = 'a'
        formdata.data['9_display'] = 'a'
    elif i%4 == 1:
        formdata.data['2'] = 'bar'
        formdata.data['2_display'] = 'bar'
        formdata.data['4'] = 'open_two'
        formdata.data['4_display'] = 'open_two'
        formdata.data['10'] = 'b'
        formdata.data['10_display'] = 'b'
        formdata.data['8'] = str(i)
    else:
        formdata.data['2'] = 'baz'
        formdata.data['2_display'] = 'baz'
        formdata.data['4'] = "open'three"
        formdata.data['4_display'] = "open'three"
        formdata.data['9'] = 'a'
        formdata.data['9_display'] = 'a'
        formdata.data['10'] = 'b'
        formdata.data['10_display'] = 'b'
        formdata.data['8'] = '11111111111111111111111'
    formdata.data['3'] = bool(i % 2)
    formdata.data['12'] = bool(i % 2)
    if i%3 == 0:
        formdata.jump_status('new')
    else:
        formdata.jump_status('finished')
    if i%7 == 0:
        formdata.user_id = user.id
    formdata.evolution[-1].who = agent.id
    formdata.store()
""",
}
@pytest.fixture
def wcs_dir(tmp_path_factory):
    """Return a new temporary directory created with the base name ``wcs``."""
    app_dir = tmp_path_factory.mktemp('wcs')
    return app_dir
@pytest.fixture
def wcs(tmp_path_factory, wcs_dir, wcs_db):
    """Configure a w.c.s. instance, serve it with ``manage.py runserver`` and yield it.

    The fixture is function scoped (``@pytest.fixture`` without a ``scope``):
    every test requesting it gets a fresh application directory, a fresh
    database and a new server process listening on the fixed port 8899.

    Setup runs the ``WCS_SCRIPTS`` snippets through ``utils.run_wcs_script``,
    writes ``site-options.cfg``, ``wcs.cfg`` and ``local_settings.py``, forks
    a child that execs the Django development server, then waits up to about
    5 seconds (50 attempts, 0.1 s apart) for the port to accept connections.

    Yields:
        Wcs: ``url`` of the server, ``appdir`` (``wcs_dir``) and ``pid`` of
        the server process.

    Raises:
        AssertionError: if the server exits during startup or does not accept
            connections in time.
    """
    port = 8899
    address = '0.0.0.0'

    tenant_dir = wcs_dir / utils.HOSTNAME
    tenant_dir.mkdir()

    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
    utils.run_wcs_script(
        wcs_dir,
        WCS_SCRIPTS['setup-storage']
        % {
            'dbname': wcs_db.db_name,
            'port': os.environ.get('PGPORT'),
            'host': os.environ.get('PGHOST'),
            'user': os.environ.get('PGUSER'),
            'password': os.environ.get('PGPASSWORD'),
        },
        'setup-storage',
    )
    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')

    with (tenant_dir / 'site-options.cfg').open('w') as fd:
        fd.write(
            '''[api-secrets]
olap = olap
'''
        )

    with (wcs_dir / 'wcs.cfg').open('w') as fd:
        fd.write(
            '''[main]
app_dir = %s\n'''
            % (str(wcs_dir).replace('%', '%%'))
        )

    with (wcs_dir / 'local_settings.py').open('w') as fd:
        fd.write(
            '''
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
THEMES_DIRECTORY = '/'
ALLOWED_HOSTS = ['%s']
'''
            % (wcs_dir, utils.HOSTNAME)
        )

    # launch a Django worker for running w.c.s.
    wcs_pid = os.fork()
    if not wcs_pid:
        # Child process: it must never return into pytest's code, so any
        # failure before/while exec'ing ends in os._exit().
        try:
            os.chdir(os.path.dirname(utils.WCS_MANAGE))
            os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
            os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
            # Exec the interpreter running the tests rather than whatever
            # "python" happens to be first on PATH.
            os.execvp(
                sys.executable,
                [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (address, port)],
            )
        except BaseException:
            import traceback

            traceback.print_exc()
        finally:
            # Only reached when the exec did not happen.
            os._exit(127)

    child_reaped = False
    try:
        # verify w.c.s. is launched
        for _attempt in range(50):
            try:
                # A new socket per attempt: reusing a socket after a failed
                # connect() is not portable, and the context manager closes it.
                with socket.socket() as probe:
                    probe.connect((address, port))
            except OSError:
                # Fail fast when the server already died instead of waiting
                # for the whole timeout.  ``status`` is the raw wait status.
                pid, status = os.waitpid(wcs_pid, os.WNOHANG)
                if pid:
                    child_reaped = True
                    raise AssertionError('w.c.s. stopped with exit-code %s' % status)
                time.sleep(0.1)
            else:
                break
        else:
            raise AssertionError('no connection found after 5 seconds')

        # verify w.c.s. is still running
        pid, status = os.waitpid(wcs_pid, os.WNOHANG)
        if pid:
            child_reaped = True
            raise AssertionError('w.c.s. stopped with exit-code %s' % status)

        yield Wcs(url='http://%s:%s/' % (utils.HOSTNAME, port), appdir=wcs_dir, pid=wcs_pid)
    finally:
        if not child_reaped:
            try:
                os.kill(wcs_pid, 9)  # 9 == SIGKILL
                # Reap the child so it does not linger as a zombie.
                os.waitpid(wcs_pid, 0)
            except (ProcessLookupError, ChildProcessError):
                # The process is already gone; nothing left to clean up.
                pass
        shutil.rmtree(str(wcs_dir))
@contextmanager
def config_manager(filename):
    """Context manager to edit the INI file *filename* in place.

    The file is parsed into a ``ConfigParser`` which is yielded to the
    ``with`` body; when the body finishes normally the parser content is
    written back to *filename*.  If the body raises, nothing is written.
    """
    parser = configparser.ConfigParser()
    parser.read([filename])
    yield parser
    # Deliberately not in a ``finally``: an exception raised by the body
    # propagates from the ``yield`` above and skips the write.
    with open(filename, 'w') as output:
        parser.write(output)
@pytest.fixture
def olap_cmd(wcs, tmpdir, postgres_db):
    """Return a callable running ``wcs_olap.cmd.main2`` on a generated ``config.ini``.

    The configuration file is written to ``tmpdir`` and points to the ``wcs``
    fixture URL, to ``postgres_db.dsn`` and to a new ``model_dir`` directory.

    The returned callable takes ``no_log_errors`` (default ``True``, which
    adds ``--no-log-errors`` to the command line) and returns ``0`` when
    ``main2`` returns normally, or the ``SystemExit`` code otherwise.  It also
    carries two attributes: ``model_dir`` and ``config``, a function returning
    a ``config_manager`` context manager for the configuration file.
    """
    config_ini = tmpdir / 'config.ini'
    model_dir = tmpdir / 'model_dir'
    model_dir.mkdir()

    ini_template = '''
[wcs-olap]
cubes_model_dirs = {model_dir}
pg_dsn = {dsn}
[{wcs.url}]
orig = olap
key = olap
schema = olap
cubes_slug = olap-slug
'''
    ini_content = ini_template.format(
        wcs=wcs,
        # "%" is doubled because the value is read back through configparser
        model_dir=str(model_dir).replace('%', '%%'),
        dsn=postgres_db.dsn,
    )
    with config_ini.open('w') as fd:
        fd.write(ini_content)

    from wcs_olap import cmd

    def run_olap(no_log_errors=True):
        argv = ['', str(config_ini)]
        if no_log_errors:
            argv.insert(1, '--no-log-errors')
        saved_argv = sys.argv
        try:
            sys.argv = argv
            cmd.main2()
        except SystemExit as exc:
            return exc.code
        else:
            return 0
        finally:
            # always restore the real command line
            sys.argv = saved_argv

    def open_config():
        return config_manager(str(config_ini))

    run_olap.model_dir = model_dir
    run_olap.config = open_config
    return run_olap
@pytest.fixture
def mock_cursor_execute():
    """Yield a context manager factory that instruments ``cursor.execute``.

    Inside ``with mock_cursor_execute(**kwargs):`` the function
    ``psycopg2.connect`` is patched so that each connection it returns is a
    ``Mock`` wrapping the real connection, whose ``cursor()`` returns a
    ``Mock`` wrapping the real cursor.  The ``execute`` attribute of that
    cursor is a ``Mock`` wrapping the real ``execute`` and built with the
    given keyword arguments (passed as-is to ``mock.Mock``).
    """

    @contextmanager
    def do(**execute_mock_kwargs):
        import psycopg2

        real_connect = psycopg2.connect

        def connect(*args, **kwargs):
            real_conn = real_connect(*args, **kwargs)
            real_cursor = real_conn.cursor

            def cursor(*args, **kwargs):
                real_cur = real_cursor(*args, **kwargs)
                cur_proxy = mock.Mock(wraps=real_cur)
                cur_proxy.execute = mock.Mock(wraps=real_cur.execute, **execute_mock_kwargs)
                return cur_proxy

            conn_proxy = mock.Mock(wraps=real_conn)
            conn_proxy.cursor = cursor
            return conn_proxy

        # The patch is active only for the duration of the with-body.
        with mock.patch.object(psycopg2, 'connect', connect):
            yield

    yield do