feeder: preseve categories and form names order (#36930)
This commit is contained in:
parent
3afc79b42e
commit
d11d196fa8
|
@ -29,8 +29,8 @@
|
|||
"label" : "catégorie",
|
||||
"name" : "category",
|
||||
"order_by" : "category.label",
|
||||
"type" : "integer",
|
||||
"value" : "category.id",
|
||||
"type" : "string",
|
||||
"value" : "category.ref",
|
||||
"value_label" : "category.label"
|
||||
},
|
||||
{
|
||||
|
@ -40,8 +40,8 @@
|
|||
"label": "formulaire",
|
||||
"name": "formdef",
|
||||
"order_by": "formdef.label",
|
||||
"type": "integer",
|
||||
"value": "formdef.id",
|
||||
"type": "string",
|
||||
"value": "formdef.ref",
|
||||
"value_label": "formdef.label"
|
||||
},
|
||||
{
|
||||
|
|
|
@ -7,6 +7,8 @@ import httmock
|
|||
|
||||
import utils
|
||||
|
||||
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
||||
|
||||
|
||||
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
||||
# create temp schema remnant to see if it's cleaned
|
||||
|
@ -22,6 +24,7 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
|||
('agent', 'id'),
|
||||
('agent', 'label'),
|
||||
('category', 'id'),
|
||||
('category', 'ref'),
|
||||
('category', 'label'),
|
||||
('channel', 'id'),
|
||||
('channel', 'label'),
|
||||
|
@ -71,6 +74,7 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
|||
('formdata_demande_field_itemOpen', 'id'),
|
||||
('formdata_demande_field_itemOpen', 'label'),
|
||||
('formdef', 'id'),
|
||||
('formdef', 'ref'),
|
||||
('formdef', 'category_id'),
|
||||
('formdef', 'label'),
|
||||
('hour', 'id'),
|
||||
|
@ -122,7 +126,6 @@ def test_requests_not_ok(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
|||
olap_cmd(no_log_errors=False)
|
||||
assert 'invalid signature' in caplog.text
|
||||
|
||||
|
||||
def test_requests_not_json(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
||||
@httmock.urlmatch()
|
||||
def return_invalid_json(url, request):
|
||||
|
@ -208,3 +211,22 @@ formdata.store()
|
|||
formdata = c.fetchone()
|
||||
assert formdata[0] == bazouka_id
|
||||
assert formdata[1] == open_new_id
|
||||
|
||||
|
||||
def test_create_reference_column(postgres_db, olap_cmd):
|
||||
|
||||
olap_cmd()
|
||||
conn = postgres_db.conn()
|
||||
with postgres_db.conn() as conn:
|
||||
with conn.cursor() as c:
|
||||
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
c.execute('ALTER TABLE olap.category DROP COLUMN ref')
|
||||
c.execute('ALTER TABLE olap.formdef DROP COLUMN ref')
|
||||
|
||||
olap_cmd()
|
||||
|
||||
with postgres_db.conn() as conn:
|
||||
with conn.cursor() as c:
|
||||
c.execute('SELECT * FROM olap.category')
|
||||
for line in c.fetchall():
|
||||
assert len(line) == 3
|
||||
|
|
|
@ -178,8 +178,8 @@ class WcsOlapFeeder(object):
|
|||
'name': 'category',
|
||||
'label': 'catégorie',
|
||||
'join': ['formdef', 'category'],
|
||||
'type': 'integer',
|
||||
'value': 'category.id',
|
||||
'type': 'string',
|
||||
'value': 'category.ref',
|
||||
'value_label': 'category.label',
|
||||
'order_by': 'category.label',
|
||||
},
|
||||
|
@ -187,8 +187,8 @@ class WcsOlapFeeder(object):
|
|||
'name': 'formdef',
|
||||
'label': 'formulaire',
|
||||
'join': ['formdef'],
|
||||
'type': 'integer',
|
||||
'value': 'formdef.id',
|
||||
'type': 'string',
|
||||
'value': 'formdef.ref',
|
||||
'value_label': 'formdef.label',
|
||||
'order_by': 'formdef.label',
|
||||
},
|
||||
|
@ -358,6 +358,10 @@ CREATE TABLE public.dates AS (SELECT
|
|||
self.ex(query, vars=(name,))
|
||||
return self.cur.fetchone()[0]
|
||||
|
||||
def update_table_sequence_number(self, name):
|
||||
self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
|
||||
(SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)})
|
||||
|
||||
def create_labeled_table_serial(self, name, comment):
|
||||
self.create_table(
|
||||
name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment)
|
||||
|
@ -368,9 +372,45 @@ CREATE TABLE public.dates AS (SELECT
|
|||
'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
|
||||
ctx={'name': quote(name)}
|
||||
)
|
||||
# Update sequence
|
||||
self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
|
||||
(SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)})
|
||||
self.update_table_sequence_number(name)
|
||||
|
||||
def create_referenced_table(self, name, fields, comment):
|
||||
# add primary key and reference fields
|
||||
new_fields = [['id', 'serial primary key'], ['ref', 'varchar UNIQUE']] + fields
|
||||
self.create_table(name, new_fields, comment=comment)
|
||||
|
||||
|
||||
def do_referenced_data(self, name, data, result_column, update_column='label'):
|
||||
to_insert = []
|
||||
|
||||
for item in data:
|
||||
ref = item[0]
|
||||
self.ex(
|
||||
'SELECT ref, {column} FROM {name} WHERE ref = %s',
|
||||
ctx={'name': quote(name), 'column': quote(update_column)},
|
||||
vars=(ref,))
|
||||
if self.cur.fetchall():
|
||||
for item in self.cur.fetchall():
|
||||
self.ex('UPDATE {name} SET {column}=%s WHERE ref=%s',
|
||||
ctx={'name': quote(name), 'column': quote(update_column)},
|
||||
vars=[item[1], ref])
|
||||
else:
|
||||
to_insert.append(item)
|
||||
if to_insert:
|
||||
columns_values = ', '.join(['%s' for x in range(len(item))])
|
||||
tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(data))
|
||||
|
||||
query = 'INSERT INTO {name} VALUES %s' % tmpl
|
||||
self.ex(query, ctx={'name': quote(name)}, # 'column': quote(update_column)},
|
||||
vars=list(itertools.chain(*to_insert)))
|
||||
|
||||
result = {}
|
||||
self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name),
|
||||
'column': result_column})
|
||||
for _id, column in self.cur.fetchall():
|
||||
result[column] = _id
|
||||
return result
|
||||
|
||||
|
||||
def create_labeled_table(self, name, labels, comment=None):
|
||||
self.create_table(
|
||||
|
@ -416,6 +456,30 @@ CREATE TABLE public.dates AS (SELECT
|
|||
res[label] = id_
|
||||
return res
|
||||
|
||||
def do_category_table(self):
|
||||
fields = [['label', 'varchar']]
|
||||
table_name = self.default_ctx['category_table']
|
||||
self.create_referenced_table(table_name, fields, 'catégorie')
|
||||
categories_data = [(c.slug, c.title) for c in self.categories]
|
||||
tmp_cat_map = self.do_referenced_data(table_name, categories_data, 'label')
|
||||
self.update_table_sequence_number(table_name)
|
||||
# remap categories ids to ids in the table
|
||||
return dict((c.title, tmp_cat_map[c.title]) for c in self.categories)
|
||||
|
||||
def do_formdef_table(self):
|
||||
categories_mapping = self.do_category_table()
|
||||
|
||||
formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'],
|
||||
['label', 'varchar']
|
||||
]
|
||||
table_name = self.default_ctx['form_table']
|
||||
self.create_referenced_table(table_name, formdef_fields, 'types de formulaire')
|
||||
|
||||
formdefs = [(form.slug, categories_mapping.get(form.schema.category),
|
||||
form.schema.name) for form in self.formdefs if form.count]
|
||||
self.formdefs_mapping = self.do_referenced_data(table_name, formdefs, 'ref')
|
||||
self.update_table_sequence_number(table_name)
|
||||
|
||||
def do_base_table(self):
|
||||
# channels
|
||||
self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
|
||||
|
@ -427,20 +491,15 @@ CREATE TABLE public.dates AS (SELECT
|
|||
self.role_mapping = dict(
|
||||
(role.id, tmp_role_map[role.name]) for role in self.roles)
|
||||
|
||||
# categories
|
||||
tmp_cat_map = self.create_labeled_table(
|
||||
'category', enumerate(c.title for c in self.categories), comment='catégorie')
|
||||
self.categories_mapping = dict((c.slug, tmp_cat_map[c.title]) for c in self.categories)
|
||||
# forms
|
||||
self.do_formdef_table()
|
||||
|
||||
self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
|
||||
comment='heures')
|
||||
|
||||
self.create_labeled_table('status', self.status,
|
||||
comment='statuts simplifiés')
|
||||
self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
|
||||
' category_id integer REFERENCES {category_table} (id),'
|
||||
' label varchar)')
|
||||
self.ex('COMMENT ON TABLE {form_table} IS %s', vars=('types de formulaire',))
|
||||
|
||||
# agents
|
||||
self.create_labeled_table_serial('agent', comment='agents')
|
||||
|
||||
|
@ -576,11 +635,6 @@ class WcsFormdefFeeder(object):
|
|||
self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)
|
||||
|
||||
def do_data_table(self):
|
||||
self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
|
||||
vars=[self.categories_mapping.get(self.formdef.schema.category_id),
|
||||
self.formdef.schema.name])
|
||||
self.formdef_sql_id = self.cur.fetchone()[0]
|
||||
|
||||
columns = OrderedDict()
|
||||
columns['status_id'] = {'sql_col_name': 'status_id', 'sql_col_def': 'smallint REFERENCES {status_table} (id)'}
|
||||
|
||||
|
@ -736,7 +790,7 @@ class WcsFormdefFeeder(object):
|
|||
if channel == 'web' and data.submission.backoffice:
|
||||
channel = 'backoffice'
|
||||
row = {
|
||||
'formdef_id': self.formdef_sql_id,
|
||||
'formdef_id': self.formdefs_mapping[self.formdef.slug],
|
||||
'receipt_time': data.receipt_time,
|
||||
'hour_id': data.receipt_time.hour,
|
||||
'channel_id': self.channel_to_id[channel],
|
||||
|
|
Loading…
Reference in New Issue