From 90cc6318b9c534078b20cb9c5fffa5495f19b184 Mon Sep 17 00:00:00 2001 From: Emmanuel Cazenave Date: Fri, 1 Mar 2019 14:11:15 +0100 Subject: [PATCH] re-use dimension tables from previous run (#30752) --- tests/conftest.py | 62 +++++++++++------------ tests/olap.model | 17 +++++++ tests/test_wcs.py | 81 +++++++++++++++++++++++++++++ tests/utils.py | 17 +++++++ wcs_olap/feeder.py | 124 ++++++++++++++++++++++++++++++++------------- 5 files changed, 234 insertions(+), 67 deletions(-) create mode 100644 tests/utils.py diff --git a/tests/conftest.py b/tests/conftest.py index 20431cc..2c5246f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- import sys -import subprocess import time import os import shutil @@ -11,9 +10,11 @@ from contextlib import closing from collections import namedtuple import psycopg2 - import pytest +import utils + + Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid']) @@ -92,6 +93,7 @@ formdef.fields = [ fields.ItemField(id='2', label='2nd field', type='item', items=['foo', 'bar', 'baz'], varname='item'), fields.BoolField(id='3', label='3rd field', type='bool', varname='bool'), + fields.ItemField(id='4', label='4rth field', type='item', varname='item_open'), ] formdef.store() @@ -105,12 +107,19 @@ for i in range(50): if i%4 == 0: formdata.data['2'] = 'foo' formdata.data['2_display'] = 'foo' + formdata.data['4'] = 'open_one' + formdata.data['4_display'] = 'open_one' elif i%4 == 1: formdata.data['2'] = 'bar' formdata.data['2_display'] = 'bar' + formdata.data['4'] = 'open_two' + formdata.data['4_display'] = 'open_two' else: formdata.data['2'] = 'baz' formdata.data['2_display'] = 'baz' + formdata.data['4'] = "open'three" + formdata.data['4_display'] = "open'three" + formdata.data['3'] = bool(i % 2) if i%3 == 0: formdata.jump_status('new') @@ -123,58 +132,47 @@ for i in range(50): } -@pytest.fixture(scope='session') -def wcs(tmp_path_factory): +@pytest.fixture +def wcs_dir(tmp_path_factory): + return tmp_path_factory.mktemp('wcs') + + +@pytest.fixture +def wcs(tmp_path_factory, wcs_dir): '''Session scoped wcs fixture, so read-only.''' - if 'WCSCTL' not in os.environ or not os.path.exists(os.environ['WCSCTL']): - pytest.skip('WCSCTL not defined in environment') - WCSCTL = os.environ.get('WCSCTL') - WCS_DIR = tmp_path_factory.mktemp('wcs') - HOSTNAME = '127.0.0.1' PORT = 8899 ADDRESS = '0.0.0.0' WCS_PID = None - def run_wcs_script(script, hostname): - '''Run python script inside w.c.s. environment''' - - script_path = WCS_DIR / (script + '.py') - with script_path.open('w') as fd: - fd.write(WCS_SCRIPTS[script]) - - subprocess.check_call( - [WCSCTL, 'runscript', '--app-dir', str(WCS_DIR), '--vhost', hostname, - str(script_path)]) - - tenant_dir = WCS_DIR / HOSTNAME + tenant_dir = wcs_dir / utils.HOSTNAME tenant_dir.mkdir() - run_wcs_script('setup-auth', HOSTNAME) - run_wcs_script('create-user', HOSTNAME) - run_wcs_script('create-data', HOSTNAME) + utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth') + utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user') + utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data') with (tenant_dir / 'site-options.cfg').open('w') as fd: fd.write(u'''[api-secrets] olap = olap ''') - with (WCS_DIR / 'wcs.cfg').open('w') as fd: + with (wcs_dir / 'wcs.cfg').open('w') as fd: fd.write(u'''[main] -app_dir = %s\n''' % WCS_DIR) +app_dir = %s\n''' % wcs_dir) - with (WCS_DIR / 'local_settings.py').open('w') as fd: + with (wcs_dir / 'local_settings.py').open('w') as fd: fd.write(u''' WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg' THEMES_DIRECTORY = '/' ALLOWED_HOSTS = ['%s'] -''' % (WCS_DIR, HOSTNAME)) +''' % (wcs_dir, utils.HOSTNAME)) # launch a Django worker for running w.c.s. WCS_PID = os.fork() if not WCS_PID: - os.chdir(os.path.dirname(WCSCTL)) + os.chdir(os.path.dirname(utils.WCSCTL)) os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings' - os.environ['WCS_SETTINGS_FILE'] = str(WCS_DIR / 'local_settings.py') + os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py') os.execvp('python', ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)]) sys.exit(0) @@ -197,9 +195,9 @@ ALLOWED_HOSTS = ['%s'] if pid: assert False, 'w.c.s. stopped with exit-code %s' % exit_code - yield Wcs(url='http://%s:%s/' % (HOSTNAME, PORT), appdir=WCS_DIR, pid=WCS_PID) + yield Wcs(url='http://%s:%s/' % (utils.HOSTNAME, PORT), appdir=wcs_dir, pid=WCS_PID) os.kill(WCS_PID, 9) - shutil.rmtree(str(WCS_DIR)) + shutil.rmtree(str(wcs_dir)) @pytest.fixture diff --git a/tests/olap.model b/tests/olap.model index 8f33ab8..d34f177 100644 --- a/tests/olap.model +++ b/tests/olap.model @@ -264,6 +264,17 @@ "type": "bool", "value": "\"field_bool\"", "value_label": "(CASE WHEN \"field_bool\" IS NULL THEN NULL WHEN \"field_bool\" THEN 'Oui' ELSE 'Non' END)" + }, + { + "filter" : true, + "join" : [ + "item_open" + ], + "label" : "4rth field", + "name" : "item_open", + "type" : "integer", + "value" : "\"item_open\".id", + "value_label" : "\"item_open\".label" } ], "fact_table" : "formdata_demande", @@ -329,6 +340,12 @@ "master" : "field_item", "name" : "item", "table" : "formdata_demande_field_item" + }, + { + "detail" : "id", + "master" : "field_item_open", + "name" : "item_open", + "table" : "formdata_demande_field_item_open" } ], "key" : "id", diff --git a/tests/test_wcs.py b/tests/test_wcs.py index d9d2109..b19c6ad 100644 --- a/tests/test_wcs.py +++ b/tests/test_wcs.py @@ -6,6 +6,8 @@ import requests import pathlib2 import mock +import utils + def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): olap_cmd() @@ -55,9 +57,12 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): ('formdata_demande', 'field_string'), ('formdata_demande', 'field_item'), ('formdata_demande', 'field_bool'), + ('formdata_demande', 'field_item_open'), ('formdata_demande', 'function__receiver'), ('formdata_demande_field_item', 'id'), ('formdata_demande_field_item', 'label'), + ('formdata_demande_field_item_open', 'id'), + ('formdata_demande_field_item_open', 'label'), ('formdef', 'id'), ('formdef', 'category_id'), ('formdef', 'label'), @@ -113,3 +118,79 @@ def test_requests_not_json(wcs, postgres_db, tmpdir, olap_cmd, caplog): with pytest.raises(SystemExit): olap_cmd(no_log_errors=False) assert 'Invalid JSON content' in caplog.text + + +def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog): + + olap_cmd() + + with postgres_db.conn() as conn: + with conn.cursor() as c: + c.execute('SET search_path TO \'olap\'') + c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id') + refs = c.fetchall() + assert len(refs) == 3 + c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id') + open_refs = c.fetchall() + assert len(open_refs) == 3 + + # Change an item of the field + script = u""" +import datetime +import random +from quixote import get_publisher +from wcs.formdef import FormDef +formdef = FormDef.get_by_urlname('demande') + +for field in formdef.fields: + if field.label == '2nd field': + ref_field = field + break + +ref_field.items = ['foo', 'bar', 'bazouka'] +formdef.store() + +user = get_publisher().user_class.select()[0] + +formdata = formdef.data_class()() +formdata.just_created() +formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple() +formdata.data = {'1': 'FOO BAR 1'} +formdata.data['2'] = 'bazouka' +formdata.data['2_display'] = 'bazouka' +formdata.data['4'] = 'open_new_value' +formdata.data['4_display'] = 'open_new_value' +formdata.jump_status('new') +formdata.store() +""" + utils.run_wcs_script(wcs_dir, script, 'toto') + olap_cmd() + + # We expect to find in the new dimension table + # the same records as before (including the one of the item that disappeared) + # plus the new item + with postgres_db.conn() as conn: + with conn.cursor() as c: + + c.execute('SET search_path TO \'olap\'') + c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id') + new_refs = c.fetchall() + assert len(new_refs) == 4 + for ref in refs: + assert ref in new_refs + assert new_refs[-1][1] == 'bazouka' + bazouka_id = new_refs[-1][0] + + c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id') + new_open_refs = c.fetchall() + assert len(new_open_refs) == 4 + for ref in open_refs: + assert ref in new_open_refs + assert new_open_refs[-1][1] == 'open_new_value' + open_new_id = new_open_refs[-1][0] + + c.execute('''SELECT field_item, field_item_open + FROM formdata_demande ORDER BY id''') + formdata = c.fetchone() + assert formdata[0] == bazouka_id + assert formdata[1] == open_new_id diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..373eda5 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,17 @@ +import os +import subprocess + + +HOSTNAME = '127.0.0.1' +WCSCTL = os.environ.get('WCSCTL') + + +def run_wcs_script(wcs_dir, script, script_name): + '''Run python script inside w.c.s. environment''' + script_path = wcs_dir / (script_name + '.py') + with script_path.open('w') as fd: + fd.write(script) + + subprocess.check_call( + [WCSCTL, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME, + str(script_path)]) diff --git a/wcs_olap/feeder.py b/wcs_olap/feeder.py index cfecf9f..aad7816 100644 --- a/wcs_olap/feeder.py +++ b/wcs_olap/feeder.py @@ -2,6 +2,7 @@ import six import copy +import itertools import os import json import hashlib @@ -342,20 +343,66 @@ CREATE TABLE public.dates AS (SELECT if comment: self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,)) - def create_labeled_table(self, name, labels, serial=False, comment=None): - if serial: - id_type = 'serial primary key' - else: - id_type = 'smallint primary key' - self.create_table(name, - [ - ['id', id_type], - ['label', 'varchar'] - ], comment=comment) - values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels) - if not values: - return - self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values)) + def prev_table_exists(self, name): + query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables + WHERE table_schema = '{schema}' AND table_name = %s)""" + self.ex(query, vars=(name,)) + return self.cur.fetchone()[0] + + def create_labeled_table_serial(self, name, comment): + self.create_table( + name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment) + + if self.prev_table_exists(name): + # Insert data from previous table + self.ex( + 'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}', + ctx={'name': name} + ) + # Update sequence + self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'), + (SELECT MAX(id) FROM {name}))""", ctx={'name': name}) + + def create_labeled_table(self, name, labels, comment=None): + self.create_table( + name, + [ + ['id', 'smallint primary key'], + ['label', 'varchar'] + ], comment=comment) + + if self.prev_table_exists(name): + # Insert data from previous table + self.ex( + 'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}', + ctx={'name': name} + ) + # Find what is missing + to_insert = [] + for _id, _label in labels: + self.ex( + 'SELECT * FROM {name} WHERE label = %s', ctx={'name': name}, vars=(_label,)) + if self.cur.fetchone() is None: + to_insert.append(_label) + + labels = None + if to_insert: + self.ex('SELECT MAX(id) FROM {name}', ctx={'name': name}) + next_id = self.cur.fetchone()[0] + 1 + ids = range(next_id, next_id + len(to_insert)) + labels = zip(ids, to_insert) + + if labels: + labels = list(labels) + tmpl = ', '.join(['(%s, %s)'] * len(labels)) + query_str = 'INSERT INTO {name} (id, label) VALUES %s' % tmpl + self.ex(query_str, ctx={'name': name}, vars=list(itertools.chain(*labels))) + + res = {} + self.ex("SELECT id, label FROM %s" % str(name)) + for id_, label in self.cur.fetchall(): + res[label] = id_ + return res def tpl(self, o, ctx=None): ctx = ctx or {} @@ -379,30 +426,31 @@ CREATE TABLE public.dates AS (SELECT def do_base_table(self): # channels - self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels], + self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels], comment=u'canal') # roles roles = dict((i, role.name) for i, role in enumerate(self.roles)) - self.create_labeled_table('{role_table}', roles.items(), comment=u'role') - self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles)) + tmp_role_map = self.create_labeled_table('role', roles.items(), comment=u'role') + self.role_mapping = dict( + (role.id, tmp_role_map[role.name]) for role in self.roles) # categories - self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories), - comment=u'catégorie') - self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories)) + tmp_cat_map = self.create_labeled_table( + 'category', enumerate(c.name for c in self.categories), comment=u'catégorie') + self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories) - self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))), + self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))), comment=u'heures') - self.create_labeled_table('{generic_status_table}', self.status, + self.create_labeled_table('status', self.status, comment=u'statuts simplifiés') self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,' ' category_id integer REFERENCES {category_table} (id),' ' label varchar)') self.ex('COMMENT ON TABLE {form_table} IS %s', vars=(u'types de formulaire',)) # agents - self.create_labeled_table('{agent_table}', [], serial=True, comment=u'agents') + self.create_labeled_table_serial('agent', comment=u'agents') self.columns = [ ['id', 'serial primary key'], @@ -476,7 +524,11 @@ CREATE TABLE public.dates AS (SELECT self.connection.close() def insert_agent(self, name): - self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=[name]) + self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,)) + res = self.cur.fetchone() + if res: + return res[0] + self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=(name,)) return self.cur.fetchone()[0] def get_agent(self, user): @@ -523,11 +575,10 @@ class WcsFormdefFeeder(object): def do_statuses(self): statuses = self.formdef.schema.workflow.statuses - self.olap_feeder.create_labeled_table(self.status_table_name, - enumerate([s.name for s in statuses]), - comment=u'statuts du formulaire « %s »' % - self.formdef.schema.name) - self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses)) + tmp_status_map = self.olap_feeder.create_labeled_table( + self.status_table_name, enumerate([s.name for s in statuses]), + comment=u'statuts du formulaire « %s »' % self.formdef.schema.name) + self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses) def do_data_table(self): self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)', @@ -560,13 +611,11 @@ class WcsFormdefFeeder(object): table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname) # create table and mapping if field.items: - self.create_labeled_table(table_name, enumerate(field.items), - comment=comment) - self.items_mappings[field.varname] = dict( - (item, i) for i, item in enumerate(field.items)) + self.items_mappings[field.varname] = self.create_labeled_table( + table_name, enumerate(field.items), comment=comment) else: # open item field, from data sources... - self.create_labeled_table(table_name, [], serial=True, comment=comment) + self.create_labeled_table_serial(table_name, comment=comment) field_def = 'smallint REFERENCES %s (id)' % table_name elif field.type == 'bool': field_def = 'boolean' @@ -629,7 +678,12 @@ class WcsFormdefFeeder(object): def insert_item_value(self, field, value): table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname) - self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=[value], + self.ex("SELECT id FROM {item_table} WHERE label = %s", + ctx={'item_table': table_name}, vars=(value,)) + res = self.cur.fetchone() + if res: + return res[0] + self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,), ctx={'item_table': table_name}) return self.cur.fetchone()[0]