backoffice: don't include anonymised formdata in listings (#33707)

This commit is contained in:
Frédéric Péters 2019-11-24 13:22:59 +01:00
parent d0afa819bf
commit 2e31a5731f
4 changed files with 78 additions and 11 deletions

View File

@ -475,6 +475,24 @@ def test_backoffice_listing_order(pub):
ids = [x.strip('/') for x in re.findall(r'data-link="(.*?)"', resp.text)]
assert ids == list(reversed(last_update_time_order))
def test_backoffice_listing_anonymised(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_superuser(pub)
create_environment(pub)
app = login(get_app(pub))
resp = app.get('/backoffice/management/form-title/?limit=500')
assert resp.text.count('data-link') == 17
formdef = FormDef.get_by_urlname('form-title')
for i, formdata in enumerate(formdef.data_class().select(order_by='id')):
if i % 2:
formdata.anonymise()
resp = app.get('/backoffice/management/form-title/?limit=500')
assert resp.text.count('data-link') == 9
def test_backoffice_legacy_urls(pub):
create_superuser(pub)
create_environment(pub)
@ -3232,6 +3250,27 @@ def test_datetime_in_global_listing(pub):
resp = resp.forms['listing-settings'].submit()
assert resp.text[resp.text.index('<tbody'):].count('<tr') == 37
def test_global_listing_anonymised(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
create_environment(pub)
app = login(get_app(pub))
resp = app.get('/backoffice/management/listing?limit=500&status=all')
assert resp.text[resp.text.index('<tbody'):].count('<tr') == 70
formdef = FormDef.get_by_urlname('other-form')
for formdata in formdef.data_class().select():
formdata.anonymise()
resp = app.get('/backoffice/management/listing?limit=500&status=all')
assert resp.text[resp.text.index('<tbody'):].count('<tr') == 50
resp = app.get('/backoffice/management/listing?limit=500&status=open')
assert resp.text[resp.text.index('<tbody'):].count('<tr') == 17
def test_global_listing_geojson(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')

View File

@ -840,6 +840,7 @@ class ManagementDirectory(Directory):
get_publisher().get_site_option('default-sort-order') or '-receipt_time')
criterias = self.get_global_listing_criterias()
criterias.append(Null('anonymised')) # exclude anonymised forms
total_count = sql.AnyFormData.count(criterias)
if offset > total_count:
get_request().form['offset'] = '0'

View File

@ -23,6 +23,7 @@ from ..qommon import _
from ..qommon import misc
from ..qommon.form import *
from ..qommon.backoffice.listing import pagination_links
from wcs.qommon.storage import Null
from wcs.roles import logged_users_role
class FormDefUI(object):
@ -35,10 +36,16 @@ class FormDefUI(object):
include_checkboxes=False):
partial_display = False
using_postgresql = get_publisher().is_using_postgresql()
if not items:
if offset and not limit:
limit = int(get_publisher().get_site_option('default-page-size') or 20)
if not criterias:
criterias = []
if using_postgresql:
criterias.append(Null('anonymised'))
items, total_count = self.get_listing_items(
selected_filter, offset, limit, query, order_by,
criterias=criterias)
@ -78,8 +85,6 @@ class FormDefUI(object):
r += htmltext('<col />')
r += htmltext('</colgroup>')
using_postgresql = get_publisher().is_using_postgresql()
r += htmltext('<thead><tr>')
if self.formdef.workflow.criticality_levels and using_postgresql:
r += htmltext('<th class="criticality-level-cell" data-field-sort-key="criticality_level"><span></span></th>')
@ -125,8 +130,12 @@ class FormDefUI(object):
def get_listing_item_ids(self, selected_filter='all', query=None, order_by=None, user=None, criterias=None, anonymise=False):
formdata_class = self.formdef.data_class()
clause_kwargs = {}
if get_publisher().is_using_postgresql():
# pass criterias to all queries
clause_kwargs = {'clause': criterias}
if selected_filter == 'all':
item_ids = formdata_class.keys()
item_ids = formdata_class.keys(**clause_kwargs)
drafts = {x: True for x in formdata_class.get_ids_with_indexed_value('status', 'draft')}
item_ids = [x for x in item_ids if x not in drafts]
elif selected_filter == 'waiting':
@ -145,7 +154,7 @@ class FormDefUI(object):
item_ids = []
for status_id in applied_filters:
item_ids.extend(formdata_class.get_ids_with_indexed_value(
str('status'), status_id))
'status', status_id, **clause_kwargs))
if query:
query_ids = formdata_class.get_ids_from_query(query)
@ -185,7 +194,7 @@ class FormDefUI(object):
# get_sorted_ids is only implemented in the SQL backend
order_by = None
if order_by and not anonymise:
ordered_ids = formdata_class.get_sorted_ids(order_by)
ordered_ids = formdata_class.get_sorted_ids(order_by, clause=criterias)
item_ids_dict = {x: True for x in item_ids}
item_ids = [x for x in ordered_ids if x in item_ids_dict]
else:

View File

@ -951,10 +951,14 @@ class SqlMixin(object):
@classmethod
@guard_postgres
def keys(cls):
def keys(cls, clause=None):
conn, cur = get_connection_and_cursor()
where_clauses, parameters, func_clause = parse_clause(clause)
assert not func_clause
sql_statement = 'SELECT id FROM %s' % cls._table_name
cur.execute(sql_statement)
if where_clauses:
sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
cur.execute(sql_statement, parameters)
ids = [x[0] for x in cur.fetchall()]
conn.commit()
cur.close()
@ -1235,15 +1239,19 @@ class SqlMixin(object):
@classmethod
@guard_postgres
def get_sorted_ids(cls, order_by):
def get_sorted_ids(cls, order_by, clause=None):
conn, cur = get_connection_and_cursor()
sql_statement = 'SELECT id FROM %s' % cls._table_name
where_clauses, parameters, func_clause = parse_clause(clause)
assert not func_clause
if where_clauses:
sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
if order_by.startswith('-'):
order_by = order_by[1:]
sql_statement += ' ORDER BY %s DESC' % order_by.replace('-', '_')
else:
sql_statement += ' ORDER BY %s' % order_by.replace('-', '_')
cur.execute(sql_statement)
cur.execute(sql_statement, parameters)
ids = [x[0] for x in cur.fetchall()]
conn.commit()
cur.close()
@ -1604,9 +1612,12 @@ class SqlDataMixin(SqlMixin):
@classmethod
@guard_postgres
def get_ids_with_indexed_value(cls, index, value, auto_fallback=True):
def get_ids_with_indexed_value(cls, index, value, auto_fallback=True, clause=None):
conn, cur = get_connection_and_cursor()
where_clauses, parameters, func_clause = parse_clause(clause)
assert not func_clause
if type(value) is int:
value = str(value)
@ -1618,7 +1629,14 @@ class SqlDataMixin(SqlMixin):
sql_statement = '''SELECT id FROM %s WHERE %s = %%(value)s''' % (
cls._table_name,
index)
cur.execute(sql_statement, {'value': value})
if where_clauses:
sql_statement += ' AND ' + ' AND '.join(where_clauses)
else:
parameters = {}
parameters.update({'value': value})
cur.execute(sql_statement, parameters)
all_ids = [x[0] for x in cur.fetchall()]
cur.close()
return all_ids