sql: add concerned_roles and is_at_endpoint columns to views (#7925)

This commit is contained in:
Frédéric Péters 2015-07-23 18:11:20 +02:00
parent 01cb0c4300
commit fb32cc5f38
6 changed files with 109 additions and 11 deletions

View File

@ -43,6 +43,7 @@ réservé aux usages internes.
mode est activé</p></item>
<item><p><var>receipt_time</var> : date et heure de réception</p></item>
<item><p><var>status</var> : statut courant</p></item>
<item><p><var>is_at_endpoint</var> : indicateur de fin de traitement</p></item>
<item><p><var>category_id</var> : identifiant de la catégorie</p></item>
<item><p><var>formdef_id</var> : identifiant du type de formulaire</p></item>
</list>

View File

@ -10,6 +10,7 @@ from quixote import cleanup
from wcs import formdef, publisher, fields
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.workflows import Workflow, CommentableWorkflowStatusItem
from wcs import sql
import wcs.qommon.storage as st
@ -772,3 +773,65 @@ def test_migration_2_formdef_id_in_views():
conn.commit()
cur.close()
def drop_formdef_tables():
conn, cur = sql.get_connection_and_cursor()
cur.execute('''SELECT table_name FROM information_schema.tables''')
table_names = []
while True:
row = cur.fetchone()
if row is None:
break
table_names.append(row[0])
for table_name in table_names:
if table_name.startswith('formdata_'):
cur.execute('DROP TABLE %s CASCADE' % table_name)
@postgresql
def test_is_at_endpoint():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
wf = Workflow(name='test endpoint')
st1 = wf.add_status('Status1', 'st1')
st2 = wf.add_status('Status2', 'st2')
commentable = CommentableWorkflowStatusItem()
commentable.id = '_commentable'
commentable.by = ['_submitter', '_receiver']
st1.items.append(commentable)
commentable.parent = st1
wf.store()
assert [x.id for x in wf.get_endpoint_status()] == ['st2']
formdef = FormDef()
formdef.name = 'test endpoint'
formdef.fields = []
formdef.workflow = wf
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.status = 'wf-st1'
formdata.store()
formdata = data_class()
formdata.status = 'wf-st2'
formdata.store()
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''')
assert bool(cur.fetchone()[0] == 2)
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE status = 'wf-st1' ''')
assert bool(cur.fetchone()[0] == 1)
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''')
assert bool(cur.fetchone()[0] == 1)
# check a change to workflow is reflected in the database
st1.forced_endpoint = True
wf.store()
assert [x.id for x in wf.get_endpoint_status()] == ['st1', 'st2']
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''')
assert bool(cur.fetchone()[0] == 2)

View File

@ -265,6 +265,12 @@ class FormDef(StorableObject):
rebuild_global_views=True)
return t
def update_endpoints(self):
if get_publisher().is_using_postgresql():
import sql
sql.do_formdef_tables(self, rebuild_views=True,
rebuild_global_views=True)
def get_category(self):
if self.category_id:
try:

View File

@ -213,9 +213,11 @@ class StorableObject(object):
return len(cls.keys())
count = classmethod(count)
def select(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None):
def select(cls, clause=None, order_by=None, ignore_errors=False,
ignore_migration=False, limit=None, offset=None):
keys = cls.keys()
objects = (cls.get(k, ignore_errors = ignore_errors) for k in keys)
objects = (cls.get(k, ignore_errors=ignore_errors,
ignore_migration=ignore_migration) for k in keys)
if ignore_errors:
objects = (x for x in objects if x is not None)
if clause:

View File

@ -570,6 +570,14 @@ def do_views(formdef, conn, cur, rebuild_global_views=True):
''' ORDER BY %s_evolutions.time)''' % ((table_name,) * 4),
'status_history'))
# add a is_at_endpoint column, dynamically created againt the endpoint status.
endpoint_status = formdef.workflow.get_endpoint_status()
view_fields.append(('''(SELECT status = ANY(ARRAY[[%s]]::text[]))''' % \
', '.join(["'wf-%s'" % x.id for x in endpoint_status]),
'''is_at_endpoint'''))
view_fields.append(('concerned_roles_array', 'concerned_roles_array'))
fields_list = ', '.join(['%s AS %s' % x for x in view_fields])
cur.execute('''CREATE VIEW %s AS SELECT %s FROM %s''' % (
@ -611,6 +619,8 @@ def do_global_views(conn, cur):
from wcs.formdef import FormDef
fake_formdef = FormDef()
common_fields = get_view_fields(fake_formdef)
common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
union = ' UNION '.join(['''SELECT %s FROM %s''' % (
', '.join([y[1] for y in common_fields]), x) for x in view_names])
@ -1584,7 +1594,7 @@ def get_yearly_totals(period_start=None, period_end=None, criterias=None):
return result
SQL_LEVEL = 4
SQL_LEVEL = 5
def migrate_global_views(conn, cur):
cur.execute('''SELECT COUNT(*) FROM information_schema.tables
@ -1601,6 +1611,14 @@ def get_sql_level(conn, cur):
sql_level = int(cur.fetchone()[0])
return sql_level
def migrate_views(conn, cur):
drop_views(None, conn, cur)
from wcs.formdef import FormDef
for formdef in FormDef.select():
# make sure all formdefs have up-to-date views
do_formdef_tables(formdef, conn=conn, cur=cur, rebuild_views=True)
migrate_global_views(conn, cur)
@guard_postgres
def migrate():
conn, cur = get_connection_and_cursor()
@ -1611,16 +1629,13 @@ def migrate():
if sql_level < 1: # 1: introduction of tracking_code table
do_tracking_code_table()
if sql_level < 2: # 2: introduction of formdef_id in views
drop_views(None, conn, cur)
from wcs.formdef import FormDef
for formdef in FormDef.select():
# make sure all formdefs have up-to-date views
do_formdef_tables(formdef, conn=conn, cur=cur, rebuild_views=True)
migrate_global_views(conn, cur)
migrate_views(conn, cur)
if sql_level < 4:
# 3: introduction of _structured for user fields
# 4: removal of identification_token
do_user_table()
if sql_level < 5: # 5: add concerned_roles_array and is_at_endpoint to views
migrate_views(conn, cur)
cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (
str(SQL_LEVEL), 'sql_level'))

View File

@ -187,6 +187,14 @@ class Workflow(StorableObject):
self.store()
def store(self):
must_update_endpoints = False
if self.id:
old_self = self.get(self.id, ignore_errors=True, ignore_migration=True)
if old_self:
old_endpoints = set([x.id for x in old_self.get_endpoint_status()])
if old_endpoints != set([x.id for x in self.get_endpoint_status()]):
must_update_endpoints = True
self.last_modification_time = time.localtime()
if get_request() and get_request().user:
self.last_modification_user_id = str(get_request().user.id)
@ -194,10 +202,13 @@ class Workflow(StorableObject):
self.last_modification_user_id = None
StorableObject.store(self)
# instruct all formdefs to update their security rules
# instruct all related formdefs to update their security rules, and
# their views if endpoints have changed.
from formdef import FormDef
for form in FormDef.select(lambda x: x.workflow_id == self.id):
for form in FormDef.select(lambda x: x.workflow_id == self.id, ignore_migration=True):
form.data_class().rebuild_security()
if must_update_endpoints:
form.update_endpoints()
def get(cls, id, ignore_errors=False, ignore_migration=False):
if id == '_default':