# -*- coding: utf-8 -*-
|
|
|
|
import sys
|
|
import subprocess
|
|
import time
|
|
import os
|
|
import shutil
|
|
import random
|
|
import socket
|
|
from contextlib import closing
|
|
from collections import namedtuple
|
|
|
|
import psycopg2
|
|
|
|
import pytest
|
|
|
|
# Handle returned by the ``wcs`` fixture: base URL of the running server,
# its application directory and the PID of the server process.
Wcs = namedtuple('Wcs', 'url appdir pid')
|
|
|
|
|
|
class Database(object):
    '''Throw-away PostgreSQL database with a randomly generated name.

    The database is created as soon as the object is instantiated and is
    removed by calling :meth:`delete`.

    Attributes:
        db_name: name of the database, ``db`` followed by a random integer.
        dsn: connection string selecting that database.
    '''

    def __init__(self):
        self.db_name = 'db%s' % random.getrandbits(20)
        self.dsn = 'dbname=%s' % self.db_name
        self._run_admin_statement('CREATE DATABASE %s' % self.db_name)

    @staticmethod
    def _run_admin_statement(statement):
        '''Execute *statement* on a connection opened with an empty DSN.

        Isolation level 0 is psycopg2's autocommit mode, so the statement
        is not wrapped in a transaction; the connection is closed afterwards.
        '''
        with closing(psycopg2.connect('')) as admin_conn:
            admin_conn.set_isolation_level(0)
            with admin_conn.cursor() as cursor:
                cursor.execute(statement)

    def conn(self):
        '''Return a new connection to the database, wrapped in ``closing``.

        Intended to be used as ``with db.conn() as conn:`` so the connection
        is closed when the block exits.
        '''
        return closing(psycopg2.connect(self.dsn))

    def delete(self):
        '''Drop the database if it still exists.'''
        self._run_admin_statement('DROP DATABASE IF EXISTS %s' % self.db_name)

    def __repr__(self):
        return '<Postgres Database %r>' % self.db_name
|
|
|
|
|
|
@pytest.fixture
def postgres_db():
    '''Provide a freshly created ``Database`` and drop it after the test.

    The ``finally`` clause guarantees ``delete()`` is called when the
    generator is finalized, whatever the outcome of the test.
    '''
    database = Database()
    try:
        yield database
    finally:
        database.delete()
|
|
|
|
|
|
# Python sources executed inside the w.c.s. environment through
# ``wcsctl runscript``.  Each one is kept in its own constant and registered
# under its script name in WCS_SCRIPTS below.

# Enable password identification and plain-text exception display.
_SETUP_AUTH_SCRIPT = u"""
from quixote import get_publisher

get_publisher().cfg['identification'] = {'methods': ['password']}
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
get_publisher().write_cfg()
"""

# Create one user together with a password account ('user' / 'user').
_CREATE_USER_SCRIPT = u"""
from quixote import get_publisher
from qommon.ident.password_accounts import PasswordAccount

user = get_publisher().user_class()
user.name = 'foo bar'
user.email = 'foo@example.net'
user.store()
account = PasswordAccount(id='user')
account.set_password('user')
account.user_id = user.id
account.store()
"""

# Create a category, a form definition with three fields and 50 form data
# whose values, status and owner vary with the loop index.
_CREATE_DATA_SCRIPT = u"""
import datetime
import random
from quixote import get_publisher

from wcs.categories import Category
from wcs.formdef import FormDef
from wcs.roles import Role
from wcs import fields

cat = Category()
cat.name = 'Catégorie'
cat.description = ''
cat.store()

formdef = FormDef()
formdef.name = 'Demande'
formdef.category_id = cat.id
formdef.fields = [
    fields.StringField(id='1', label='1st field', type='string', anonymise=False, varname='field_string'),
    fields.ItemField(id='2', label='2nd field', type='item',
                     items=['foo', 'bar', 'baz'], varname='field_item'),
    fields.BoolField(id='3', label='3rd field', type='bool', varname='field_bool'),
]
formdef.store()

user = get_publisher().user_class.select()[0]

for i in range(50):
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple()
    formdata.data = {'1': 'FOO BAR %d' % i}
    if i%4 == 0:
        formdata.data['2'] = 'foo'
        formdata.data['2_display'] = 'foo'
    elif i%4 == 1:
        formdata.data['2'] = 'bar'
        formdata.data['2_display'] = 'bar'
    else:
        formdata.data['2'] = 'baz'
        formdata.data['2_display'] = 'baz'
    formdata.data['3'] = bool(i % 2)
    if i%3 == 0:
        formdata.jump_status('new')
    else:
        formdata.jump_status('finished')
    if i%7 == 0:
        formdata.user_id = user.id
    formdata.store()
"""

WCS_SCRIPTS = {
    'setup-auth': _SETUP_AUTH_SCRIPT,
    'create-user': _CREATE_USER_SCRIPT,
    'create-data': _CREATE_DATA_SCRIPT,
}
|
|
|
|
|
|
def _wait_for_wcs(proc, host, port, attempts=50, delay=0.1):
    '''Wait until the server started as *proc* accepts TCP connections.

    A fresh socket is used for every attempt (a socket whose ``connect()``
    failed cannot be portably reused) and the process is polled on each
    round so an early crash is reported at once instead of after the full
    timeout.

    Args:
        proc: ``subprocess.Popen`` object of the server process.
        host: address to connect to.
        port: TCP port to connect to.
        attempts: maximum number of connection attempts.
        delay: pause in seconds after a failed attempt.

    Fails the calling test/fixture through ``pytest.fail`` when the process
    exits or when no connection succeeds within the allowed attempts.
    '''
    for _ in range(attempts):
        returncode = proc.poll()
        if returncode is not None:
            pytest.fail('w.c.s. stopped with exit-code %s' % returncode)
        try:
            probe = socket.create_connection((host, port), timeout=1)
        except socket.error:
            time.sleep(delay)
        else:
            probe.close()
            return
    pytest.fail('no connection found after %g seconds' % (attempts * delay))


@pytest.fixture(scope='session')
def wcs(tmp_path_factory):
    '''Session scoped wcs fixture, so read-only.

    Builds a w.c.s. application directory in a temporary folder, populates
    it by running the ``WCS_SCRIPTS`` through ``wcsctl runscript``, then
    starts a Django development server on port 8899 and yields a ``Wcs``
    tuple (url, appdir, pid).

    The fixture is skipped when the ``WCSCTL`` environment variable is unset
    or does not point to an existing path.  On teardown, or if start-up
    fails, the server process is killed and reaped and the application
    directory is removed.
    '''
    wcsctl = os.environ.get('WCSCTL')
    if not wcsctl or not os.path.exists(wcsctl):
        pytest.skip('WCSCTL not defined in environment')

    wcs_dir = tmp_path_factory.mktemp('wcs')
    hostname = '127.0.0.1'
    port = 8899
    # address given to runserver: listen on every interface
    address = '0.0.0.0'

    def run_wcs_script(script, hostname):
        '''Run python script inside w.c.s. environment'''
        script_path = wcs_dir / (script + '.py')
        # the scripts contain non-ASCII text: do not depend on the locale
        with script_path.open('w', encoding='utf-8') as fd:
            fd.write(WCS_SCRIPTS[script])

        subprocess.check_call(
            [wcsctl, 'runscript', '--app-dir', str(wcs_dir), '--vhost', hostname,
             str(script_path)])

    tenant_dir = wcs_dir / hostname
    tenant_dir.mkdir()

    run_wcs_script('setup-auth', hostname)
    run_wcs_script('create-user', hostname)
    run_wcs_script('create-data', hostname)

    with (tenant_dir / 'site-options.cfg').open('w', encoding='utf-8') as fd:
        fd.write(u'''[api-secrets]
olap = olap
''')

    with (wcs_dir / 'wcs.cfg').open('w', encoding='utf-8') as fd:
        fd.write(u'''[main]
app_dir = %s\n''' % wcs_dir)

    with (wcs_dir / 'local_settings.py').open('w', encoding='utf-8') as fd:
        fd.write(u'''
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
THEMES_DIRECTORY = '/'
ALLOWED_HOSTS = ['%s']
''' % (wcs_dir, hostname))

    # launch a Django worker for running w.c.s.; Popen reports a failed
    # exec in this process instead of leaving a stray forked child behind
    env = dict(os.environ)
    env['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
    env['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
    proc = subprocess.Popen(
        ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (address, port)],
        cwd=os.path.dirname(os.path.abspath(wcsctl)),
        env=env)

    try:
        # verify w.c.s. is launched (probe the loopback address, 0.0.0.0 is
        # only meaningful as a listening address)
        _wait_for_wcs(proc, hostname, port)

        # verify w.c.s. is still running
        returncode = proc.poll()
        if returncode is not None:
            pytest.fail('w.c.s. stopped with exit-code %s' % returncode)

        yield Wcs(url='http://%s:%s/' % (hostname, port), appdir=wcs_dir, pid=proc.pid)
    finally:
        # always stop and reap the server, even when start-up failed
        if proc.poll() is None:
            proc.kill()
        proc.wait()
        shutil.rmtree(str(wcs_dir))
|