wcs-olap/tests/test_wcs.py

279 lines
9.8 KiB
Python

# wcs_olap
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import pytest
import pathlib
import requests
import httmock
import utils
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
# create temp schema remnant to see if it's cleaned
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('CREATE SCHEMA olap_temp')
c.execute('CREATE TABLE olap_temp.coin()')
c.execute('COMMIT')
olap_cmd()
expected_schema = [
('agent', 'id'),
('agent', 'label'),
('category', 'id'),
('category', 'ref'),
('category', 'label'),
('channel', 'id'),
('channel', 'label'),
('evolution', 'id'),
('evolution', 'generic_status_id'),
('evolution', 'formdata_id'),
('evolution', 'time'),
('evolution', 'date'),
('evolution', 'hour_id'),
('evolution_demande', 'id'),
('evolution_demande', 'status_id'),
('evolution_demande', 'formdata_id'),
('evolution_demande', 'time'),
('evolution_demande', 'date'),
('evolution_demande', 'hour_id'),
('formdata', 'id'),
('formdata', 'formdef_id'),
('formdata', 'receipt_time'),
('formdata', 'hour_id'),
('formdata', 'channel_id'),
('formdata', 'backoffice'),
('formdata', 'generic_status_id'),
('formdata', 'endpoint_delay'),
('formdata', 'first_agent_id'),
('formdata', 'geolocation_base'),
('formdata', 'json_data'),
('formdata_demande', 'id'),
('formdata_demande', 'formdef_id'),
('formdata_demande', 'receipt_time'),
('formdata_demande', 'hour_id'),
('formdata_demande', 'channel_id'),
('formdata_demande', 'backoffice'),
('formdata_demande', 'generic_status_id'),
('formdata_demande', 'endpoint_delay'),
('formdata_demande', 'first_agent_id'),
('formdata_demande', 'geolocation_base'),
('formdata_demande', 'json_data'),
('formdata_demande', 'status_id'),
('formdata_demande', 'field_string'),
('formdata_demande', 'field_item'),
('formdata_demande', 'field_bool'),
('formdata_demande', 'field_itemOpen'),
('formdata_demande', 'field_stringCaseSensitive-\xe9'),
('formdata_demande', 'field_integer'),
('formdata_demande', 'function__receiver'),
('formdata_demande_field_item', 'id'),
('formdata_demande_field_item', 'label'),
('formdata_demande_field_itemOpen', 'id'),
('formdata_demande_field_itemOpen', 'label'),
('formdef', 'id'),
('formdef', 'ref'),
('formdef', 'category_id'),
('formdef', 'label'),
('hour', 'id'),
('hour', 'label'),
('role', 'id'),
('role', 'label'),
('status', 'id'),
('status', 'label'),
('status_demande', 'id'),
('status_demande', 'label')
]
# verify SQL schema
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SELECT table_name, column_name '
'FROM information_schema.columns '
'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position')
assert list(c.fetchall()) == expected_schema
# verify dates table
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SELECT MIN(date) FROM public.dates')
assert c.fetchone()[0] == datetime.date(2010, 1, 1)
# delete some dates
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute("DELETE FROM public.dates WHERE date < '2011-01-01' OR date > '2017-01-01'")
c.execute('COMMIT')
c.execute('SELECT MIN(date), MAX(date) FROM public.dates')
assert c.fetchone()[0:2] == (datetime.date(2011, 1, 1), datetime.date(2017, 1, 1))
# recreate missing dates
olap_cmd()
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SELECT MIN(date), MAX(date) FROM public.dates')
last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31)
assert c.fetchone()[0:2] == (datetime.date(2011, 1, 1), last_date)
c.execute('''SELECT COUNT(*) FROM
GENERATE_SERIES(%s::date, %s::date, '1 day'::interval) AS series(date)
FULL OUTER JOIN public.dates AS dates ON series.date = dates.date WHERE dates.date IS NULL OR series.date IS NULL''',
vars=[datetime.date(2011, 1, 1), last_date])
assert c.fetchone()[0] == 0
# verify JSON schema
with (olap_cmd.model_dir / 'olap.model').open() as fd, \
(pathlib.Path(__file__).parent / 'olap.model').open() as fd2:
json_schema = json.load(fd)
expected_json_schema = json.load(fd2)
expected_json_schema['pg_dsn'] = postgres_db.dsn
assert json_schema == expected_json_schema
def test_requests_exception(wcs, postgres_db, tmpdir, olap_cmd, caplog):
@httmock.urlmatch()
def requests_raise(url, request):
raise requests.RequestException('wat!')
with httmock.HTTMock(requests_raise):
with pytest.raises(SystemExit):
olap_cmd(no_log_errors=False)
assert 'wat!' in caplog.text
def test_requests_not_ok(wcs, postgres_db, tmpdir, olap_cmd, caplog):
@httmock.urlmatch()
def return_401(url, request):
return {'status_code': 401, 'content': {"err": 1, "err_desc": "invalid signature"}}
with httmock.HTTMock(return_401):
with pytest.raises(SystemExit):
olap_cmd(no_log_errors=False)
assert 'invalid signature' in caplog.text
def test_requests_not_json(wcs, postgres_db, tmpdir, olap_cmd, caplog):
@httmock.urlmatch()
def return_invalid_json(url, request):
return 'x'
with httmock.HTTMock(return_invalid_json):
with pytest.raises(SystemExit):
olap_cmd(no_log_errors=False)
assert 'Invalid JSON content' in caplog.text
def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog):
olap_cmd()
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SET search_path TO \'olap\'')
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id')
refs = c.fetchall()
assert len(refs) == 3
c.execute('SELECT * FROM "formdata_demande_field_itemOpen" ORDER BY id')
open_refs = c.fetchall()
assert len(open_refs) == 3
# Change an item of the field
script = u"""
import datetime
import random
from quixote import get_publisher
from wcs.formdef import FormDef
formdef = FormDef.get_by_urlname('demande')
for field in formdef.fields:
if field.label == '2nd field':
ref_field = field
break
ref_field.items = ['foo', 'bar', 'bazouka']
formdef.store()
user = get_publisher().user_class.select()[0]
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple()
formdata.data = {'1': 'FOO BAR 1'}
formdata.data['2'] = 'bazouka'
formdata.data['2_display'] = 'bazouka'
formdata.data['4'] = 'open_new_value'
formdata.data['4_display'] = 'open_new_value'
formdata.jump_status('new')
formdata.store()
"""
utils.run_wcs_script(wcs_dir, script, 'toto')
olap_cmd()
# We expect to find in the new dimension table
# the same records as before (including the one of the item that disappeared)
# plus the new item
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SET search_path TO \'olap\'')
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id')
new_refs = c.fetchall()
assert len(new_refs) == 4
for ref in refs:
assert ref in new_refs
assert new_refs[-1][1] == 'bazouka'
bazouka_id = new_refs[-1][0]
c.execute('SELECT * FROM "formdata_demande_field_itemOpen" ORDER BY id')
new_open_refs = c.fetchall()
assert len(new_open_refs) == 4
for ref in open_refs:
assert ref in new_open_refs
assert new_open_refs[-1][1] == 'open_new_value'
open_new_id = new_open_refs[-1][0]
c.execute('''SELECT field_item, "field_itemOpen"
FROM formdata_demande ORDER BY id''')
formdata = c.fetchone()
assert formdata[0] == bazouka_id
assert formdata[1] == open_new_id
def test_create_reference_column(postgres_db, olap_cmd):
olap_cmd()
conn = postgres_db.conn()
with postgres_db.conn() as conn:
with conn.cursor() as c:
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
c.execute('ALTER TABLE olap.category DROP COLUMN ref')
c.execute('ALTER TABLE olap.formdef DROP COLUMN ref')
olap_cmd()
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SELECT * FROM olap.category')
for line in c.fetchall():
assert len(line) == 3