re-use dimension tables from previous run (#30752)

This commit is contained in:
Emmanuel Cazenave 2019-03-01 14:11:15 +01:00
parent f53b4bf9d0
commit 90cc6318b9
5 changed files with 234 additions and 67 deletions

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import sys
import subprocess
import time
import os
import shutil
@ -11,9 +10,11 @@ from contextlib import closing
from collections import namedtuple
import psycopg2
import pytest
import utils
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
@ -92,6 +93,7 @@ formdef.fields = [
fields.ItemField(id='2', label='2nd field', type='item',
items=['foo', 'bar', 'baz'], varname='item'),
fields.BoolField(id='3', label='3rd field', type='bool', varname='bool'),
fields.ItemField(id='4', label='4rth field', type='item', varname='item_open'),
]
formdef.store()
@ -105,12 +107,19 @@ for i in range(50):
if i%4 == 0:
formdata.data['2'] = 'foo'
formdata.data['2_display'] = 'foo'
formdata.data['4'] = 'open_one'
formdata.data['4_display'] = 'open_one'
elif i%4 == 1:
formdata.data['2'] = 'bar'
formdata.data['2_display'] = 'bar'
formdata.data['4'] = 'open_two'
formdata.data['4_display'] = 'open_two'
else:
formdata.data['2'] = 'baz'
formdata.data['2_display'] = 'baz'
formdata.data['4'] = "open'three"
formdata.data['4_display'] = "open'three"
formdata.data['3'] = bool(i % 2)
if i%3 == 0:
formdata.jump_status('new')
@ -123,58 +132,47 @@ for i in range(50):
}
@pytest.fixture(scope='session')
def wcs(tmp_path_factory):
@pytest.fixture
def wcs_dir(tmp_path_factory):
return tmp_path_factory.mktemp('wcs')
@pytest.fixture
def wcs(tmp_path_factory, wcs_dir):
'''Session scoped wcs fixture, so read-only.'''
if 'WCSCTL' not in os.environ or not os.path.exists(os.environ['WCSCTL']):
pytest.skip('WCSCTL not defined in environment')
WCSCTL = os.environ.get('WCSCTL')
WCS_DIR = tmp_path_factory.mktemp('wcs')
HOSTNAME = '127.0.0.1'
PORT = 8899
ADDRESS = '0.0.0.0'
WCS_PID = None
def run_wcs_script(script, hostname):
'''Run python script inside w.c.s. environment'''
script_path = WCS_DIR / (script + '.py')
with script_path.open('w') as fd:
fd.write(WCS_SCRIPTS[script])
subprocess.check_call(
[WCSCTL, 'runscript', '--app-dir', str(WCS_DIR), '--vhost', hostname,
str(script_path)])
tenant_dir = WCS_DIR / HOSTNAME
tenant_dir = wcs_dir / utils.HOSTNAME
tenant_dir.mkdir()
run_wcs_script('setup-auth', HOSTNAME)
run_wcs_script('create-user', HOSTNAME)
run_wcs_script('create-data', HOSTNAME)
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')
with (tenant_dir / 'site-options.cfg').open('w') as fd:
fd.write(u'''[api-secrets]
olap = olap
''')
with (WCS_DIR / 'wcs.cfg').open('w') as fd:
with (wcs_dir / 'wcs.cfg').open('w') as fd:
fd.write(u'''[main]
app_dir = %s\n''' % WCS_DIR)
app_dir = %s\n''' % wcs_dir)
with (WCS_DIR / 'local_settings.py').open('w') as fd:
with (wcs_dir / 'local_settings.py').open('w') as fd:
fd.write(u'''
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
THEMES_DIRECTORY = '/'
ALLOWED_HOSTS = ['%s']
''' % (WCS_DIR, HOSTNAME))
''' % (wcs_dir, utils.HOSTNAME))
# launch a Django worker for running w.c.s.
WCS_PID = os.fork()
if not WCS_PID:
os.chdir(os.path.dirname(WCSCTL))
os.chdir(os.path.dirname(utils.WCSCTL))
os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
os.environ['WCS_SETTINGS_FILE'] = str(WCS_DIR / 'local_settings.py')
os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
os.execvp('python', ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)])
sys.exit(0)
@ -197,9 +195,9 @@ ALLOWED_HOSTS = ['%s']
if pid:
assert False, 'w.c.s. stopped with exit-code %s' % exit_code
yield Wcs(url='http://%s:%s/' % (HOSTNAME, PORT), appdir=WCS_DIR, pid=WCS_PID)
yield Wcs(url='http://%s:%s/' % (utils.HOSTNAME, PORT), appdir=wcs_dir, pid=WCS_PID)
os.kill(WCS_PID, 9)
shutil.rmtree(str(WCS_DIR))
shutil.rmtree(str(wcs_dir))
@pytest.fixture

View File

@ -264,6 +264,17 @@
"type": "bool",
"value": "\"field_bool\"",
"value_label": "(CASE WHEN \"field_bool\" IS NULL THEN NULL WHEN \"field_bool\" THEN 'Oui' ELSE 'Non' END)"
},
{
"filter" : true,
"join" : [
"item_open"
],
"label" : "4rth field",
"name" : "item_open",
"type" : "integer",
"value" : "\"item_open\".id",
"value_label" : "\"item_open\".label"
}
],
"fact_table" : "formdata_demande",
@ -329,6 +340,12 @@
"master" : "field_item",
"name" : "item",
"table" : "formdata_demande_field_item"
},
{
"detail" : "id",
"master" : "field_item_open",
"name" : "item_open",
"table" : "formdata_demande_field_item_open"
}
],
"key" : "id",

View File

@ -6,6 +6,8 @@ import requests
import pathlib2
import mock
import utils
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
olap_cmd()
@ -55,9 +57,12 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
('formdata_demande', 'field_string'),
('formdata_demande', 'field_item'),
('formdata_demande', 'field_bool'),
('formdata_demande', 'field_item_open'),
('formdata_demande', 'function__receiver'),
('formdata_demande_field_item', 'id'),
('formdata_demande_field_item', 'label'),
('formdata_demande_field_item_open', 'id'),
('formdata_demande_field_item_open', 'label'),
('formdef', 'id'),
('formdef', 'category_id'),
('formdef', 'label'),
@ -113,3 +118,79 @@ def test_requests_not_json(wcs, postgres_db, tmpdir, olap_cmd, caplog):
with pytest.raises(SystemExit):
olap_cmd(no_log_errors=False)
assert 'Invalid JSON content' in caplog.text
def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog):
olap_cmd()
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SET search_path TO \'olap\'')
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id')
refs = c.fetchall()
assert len(refs) == 3
c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id')
open_refs = c.fetchall()
assert len(open_refs) == 3
# Change an item of the field
script = u"""
import datetime
import random
from quixote import get_publisher
from wcs.formdef import FormDef
formdef = FormDef.get_by_urlname('demande')
for field in formdef.fields:
if field.label == '2nd field':
ref_field = field
break
ref_field.items = ['foo', 'bar', 'bazouka']
formdef.store()
user = get_publisher().user_class.select()[0]
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple()
formdata.data = {'1': 'FOO BAR 1'}
formdata.data['2'] = 'bazouka'
formdata.data['2_display'] = 'bazouka'
formdata.data['4'] = 'open_new_value'
formdata.data['4_display'] = 'open_new_value'
formdata.jump_status('new')
formdata.store()
"""
utils.run_wcs_script(wcs_dir, script, 'toto')
olap_cmd()
# We expect to find in the new dimension table
# the same records as before (including the one of the item that disappeared)
# plus the new item
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SET search_path TO \'olap\'')
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id')
new_refs = c.fetchall()
assert len(new_refs) == 4
for ref in refs:
assert ref in new_refs
assert new_refs[-1][1] == 'bazouka'
bazouka_id = new_refs[-1][0]
c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id')
new_open_refs = c.fetchall()
assert len(new_open_refs) == 4
for ref in open_refs:
assert ref in new_open_refs
assert new_open_refs[-1][1] == 'open_new_value'
open_new_id = new_open_refs[-1][0]
c.execute('''SELECT field_item, field_item_open
FROM formdata_demande ORDER BY id''')
formdata = c.fetchone()
assert formdata[0] == bazouka_id
assert formdata[1] == open_new_id

17
tests/utils.py Normal file
View File

@ -0,0 +1,17 @@
import os
import subprocess
HOSTNAME = '127.0.0.1'
WCSCTL = os.environ.get('WCSCTL')
def run_wcs_script(wcs_dir, script, script_name):
'''Run python script inside w.c.s. environment'''
script_path = wcs_dir / (script_name + '.py')
with script_path.open('w') as fd:
fd.write(script)
subprocess.check_call(
[WCSCTL, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME,
str(script_path)])

View File

@ -2,6 +2,7 @@
import six
import copy
import itertools
import os
import json
import hashlib
@ -342,20 +343,66 @@ CREATE TABLE public.dates AS (SELECT
if comment:
self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
def create_labeled_table(self, name, labels, serial=False, comment=None):
if serial:
id_type = 'serial primary key'
else:
id_type = 'smallint primary key'
self.create_table(name,
[
['id', id_type],
['label', 'varchar']
], comment=comment)
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
if not values:
return
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
def prev_table_exists(self, name):
query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables
WHERE table_schema = '{schema}' AND table_name = %s)"""
self.ex(query, vars=(name,))
return self.cur.fetchone()[0]
def create_labeled_table_serial(self, name, comment):
self.create_table(
name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment)
if self.prev_table_exists(name):
# Insert data from previous table
self.ex(
'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
ctx={'name': name}
)
# Update sequence
self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
(SELECT MAX(id) FROM {name}))""", ctx={'name': name})
def create_labeled_table(self, name, labels, comment=None):
self.create_table(
name,
[
['id', 'smallint primary key'],
['label', 'varchar']
], comment=comment)
if self.prev_table_exists(name):
# Insert data from previous table
self.ex(
'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}',
ctx={'name': name}
)
# Find what is missing
to_insert = []
for _id, _label in labels:
self.ex(
'SELECT * FROM {name} WHERE label = %s', ctx={'name': name}, vars=(_label,))
if self.cur.fetchone() is None:
to_insert.append(_label)
labels = None
if to_insert:
self.ex('SELECT MAX(id) FROM {name}', ctx={'name': name})
next_id = self.cur.fetchone()[0] + 1
ids = range(next_id, next_id + len(to_insert))
labels = zip(ids, to_insert)
if labels:
labels = list(labels)
tmpl = ', '.join(['(%s, %s)'] * len(labels))
query_str = 'INSERT INTO {name} (id, label) VALUES %s' % tmpl
self.ex(query_str, ctx={'name': name}, vars=list(itertools.chain(*labels)))
res = {}
self.ex("SELECT id, label FROM %s" % str(name))
for id_, label in self.cur.fetchall():
res[label] = id_
return res
def tpl(self, o, ctx=None):
ctx = ctx or {}
@ -379,30 +426,31 @@ CREATE TABLE public.dates AS (SELECT
def do_base_table(self):
# channels
self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels],
self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
comment=u'canal')
# roles
roles = dict((i, role.name) for i, role in enumerate(self.roles))
self.create_labeled_table('{role_table}', roles.items(), comment=u'role')
self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles))
tmp_role_map = self.create_labeled_table('role', roles.items(), comment=u'role')
self.role_mapping = dict(
(role.id, tmp_role_map[role.name]) for role in self.roles)
# categories
self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories),
comment=u'catégorie')
self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
tmp_cat_map = self.create_labeled_table(
'category', enumerate(c.name for c in self.categories), comment=u'catégorie')
self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories)
self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))),
self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
comment=u'heures')
self.create_labeled_table('{generic_status_table}', self.status,
self.create_labeled_table('status', self.status,
comment=u'statuts simplifiés')
self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
' category_id integer REFERENCES {category_table} (id),'
' label varchar)')
self.ex('COMMENT ON TABLE {form_table} IS %s', vars=(u'types de formulaire',))
# agents
self.create_labeled_table('{agent_table}', [], serial=True, comment=u'agents')
self.create_labeled_table_serial('agent', comment=u'agents')
self.columns = [
['id', 'serial primary key'],
@ -476,7 +524,11 @@ CREATE TABLE public.dates AS (SELECT
self.connection.close()
def insert_agent(self, name):
self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=[name])
self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
res = self.cur.fetchone()
if res:
return res[0]
self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=(name,))
return self.cur.fetchone()[0]
def get_agent(self, user):
@ -523,11 +575,10 @@ class WcsFormdefFeeder(object):
def do_statuses(self):
statuses = self.formdef.schema.workflow.statuses
self.olap_feeder.create_labeled_table(self.status_table_name,
enumerate([s.name for s in statuses]),
comment=u'statuts du formulaire « %s »' %
self.formdef.schema.name)
self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses))
tmp_status_map = self.olap_feeder.create_labeled_table(
self.status_table_name, enumerate([s.name for s in statuses]),
comment=u'statuts du formulaire « %s »' % self.formdef.schema.name)
self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)
def do_data_table(self):
self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
@ -560,13 +611,11 @@ class WcsFormdefFeeder(object):
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
# create table and mapping
if field.items:
self.create_labeled_table(table_name, enumerate(field.items),
comment=comment)
self.items_mappings[field.varname] = dict(
(item, i) for i, item in enumerate(field.items))
self.items_mappings[field.varname] = self.create_labeled_table(
table_name, enumerate(field.items), comment=comment)
else:
# open item field, from data sources...
self.create_labeled_table(table_name, [], serial=True, comment=comment)
self.create_labeled_table_serial(table_name, comment=comment)
field_def = 'smallint REFERENCES %s (id)' % table_name
elif field.type == 'bool':
field_def = 'boolean'
@ -629,7 +678,12 @@ class WcsFormdefFeeder(object):
def insert_item_value(self, field, value):
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=[value],
self.ex("SELECT id FROM {item_table} WHERE label = %s",
ctx={'item_table': table_name}, vars=(value,))
res = self.cur.fetchone()
if res:
return res[0]
self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,),
ctx={'item_table': table_name})
return self.cur.fetchone()[0]