misc: give a unique key to (card/form)data (#42330)

And use it for advisory locking and tracking codes.
This commit is contained in:
Frédéric Péters 2020-04-30 14:17:01 +02:00
parent 797e21218e
commit 3c93c6cb82
8 changed files with 56 additions and 60 deletions

View File

@ -4362,7 +4362,7 @@ def test_backoffice_advisory_lock_related_formdatas(pub):
resp = app.get('/backoffice/management/form-title/%s/' % formdatas[0].id)
app.get(re.findall('data-async-url="(.*/user-pending-forms)"', resp.text)[0])
session = pub.session_manager.session_class.select(lambda x: x.user == user.id)[0]
assert 'formdata-other-form-%d' % other_formdata.id in session.visiting_objects.keys()
assert 'formdef-other-form-%d' % other_formdata.id in session.visiting_objects.keys()
def test_backoffice_resubmit(pub):

View File

@ -92,68 +92,75 @@ def test_session_expire(pub, user, app):
def test_sessions_visiting_objects(pub, http_request):
# check it starts with nothing
assert len(pub.get_visited_objects()) == 0
assert len(pub.session_class.get_visited_objects()) == 0
class MockFormData():
def __init__(self, id):
self.id = id
def get_object_key(self):
return 'formdata-foobar-%s' % self.id
# mark two visits
session1 = pub.session_class(id='session1')
session1.user = 'FOO'
session1.mark_visited_object('formdata-foobar-1')
session1.mark_visited_object('formdata-foobar-2')
session1.mark_visited_object(MockFormData(1))
session1.mark_visited_object(MockFormData(2))
session1.store()
assert len(pub.get_visited_objects()) == 2
assert set([x[0] for x in pub.get_object_visitors('formdata-foobar-2')]) == set(['FOO'])
assert len(pub.session_class.get_visited_objects()) == 2
assert set([x[0] for x in pub.session_class.get_object_visitors(MockFormData(2))]) == set(['FOO'])
# mark a visit as being in the past
session1.visiting_objects['formdata-foobar-1'] = time.time() - 35*60
session1.store()
assert len(pub.get_visited_objects()) == 1
assert len(pub.session_class.get_visited_objects()) == 1
# check older visits are automatically removed
session1 = pub.session_class.get('session1')
assert len(session1.visiting_objects.keys()) == 2
session1.mark_visited_object('formdata-foobar-2')
session1.mark_visited_object(MockFormData(2))
assert len(session1.visiting_objects.keys()) == 1
session1.store()
assert len(pub.get_visited_objects()) == 1
assert list(pub.get_visited_objects()) == ['formdata-foobar-2']
assert len(pub.session_class.get_visited_objects()) == 1
assert list(pub.session_class.get_visited_objects()) == ['formdata-foobar-2']
# check with a second session
session1.mark_visited_object('formdata-foobar-1')
session1.mark_visited_object('formdata-foobar-2')
session1.mark_visited_object(MockFormData(1))
session1.mark_visited_object(MockFormData(2))
session1.store()
assert len(pub.get_visited_objects()) == 2
assert len(pub.session_class.get_visited_objects()) == 2
# mark a visit as being in the past
session1.visiting_objects['formdata-foobar-1'] = time.time() - 35*60
session1.store()
assert len(pub.get_visited_objects()) == 1
assert len(pub.session_class.get_visited_objects()) == 1
# check older visits are automatically removed
session1 = pub.session_class.get('session1')
assert len(session1.visiting_objects.keys()) == 2
session1.mark_visited_object('formdata-foobar-2')
session1.mark_visited_object(MockFormData(2))
assert len(session1.visiting_objects.keys()) == 1
session1.store()
assert len(pub.get_visited_objects()) == 1
assert list(pub.get_visited_objects()) == ['formdata-foobar-2']
assert len(pub.session_class.get_visited_objects()) == 1
assert list(pub.session_class.get_visited_objects()) == ['formdata-foobar-2']
# check with a second session
session2 = pub.session_class(id='session2')
session2.user = 'BAR'
session2.store()
assert len(pub.get_visited_objects()) == 1
session2.mark_visited_object('formdata-foobar-2')
assert len(pub.session_class.get_visited_objects()) == 1
session2.mark_visited_object(MockFormData(2))
session2.store()
assert len(pub.get_visited_objects()) == 1
session2.mark_visited_object('formdata-foobar-3')
assert len(pub.session_class.get_visited_objects()) == 1
session2.mark_visited_object(MockFormData(3))
session2.store()
assert len(pub.get_visited_objects()) == 2
assert len(pub.session_class.get_visited_objects()) == 2
assert list(pub.get_visited_objects(exclude_user='BAR')) == ['formdata-foobar-2']
assert list(pub.session_class.get_visited_objects(exclude_user='BAR')) == ['formdata-foobar-2']
# check visitors
assert set([x[0] for x in pub.get_object_visitors('formdata-foobar-2')]) == set(['FOO', 'BAR'])
assert set([x[0] for x in pub.get_object_visitors('formdata-foobar-1')]) == set([])
assert set([x[0] for x in pub.session_class.get_object_visitors(MockFormData(2))]) == set(['FOO', 'BAR'])
assert set([x[0] for x in pub.session_class.get_object_visitors(MockFormData(1))]) == set([])
def test_session_do_not_reuse_id(pub, user, app):

View File

@ -877,14 +877,14 @@ class ManagementDirectory(Directory):
r += htmltext('</tr></thead>')
r += htmltext('<tbody>')
workflows = {}
visited_objects = get_publisher().get_visited_objects(exclude_user=get_session().user)
session = get_session()
visited_objects = session.get_visited_objects(exclude_user=session.user)
for formdata in formdatas:
if not formdata.formdef.workflow_id in workflows:
workflows[formdata.formdef.workflow_id] = formdata.formdef.workflow
classes = ['status-%s-%s' % (formdata.formdef.workflow.id, formdata.status)]
object_key = 'formdata-%s-%s' % (formdata.formdef.url_name, formdata.id)
if object_key in visited_objects:
if formdata.get_object_key() in visited_objects:
classes.append('advisory-lock')
if formdata.backoffice_submission:
classes.append('backoffice-submission')
@ -2500,17 +2500,17 @@ class FormBackOfficeStatusPage(FormStatusPage):
response = self.get_user_pending_forms()
# preemptive locking of forms
object_key = 'formdata-%s-%s' % (self.formdef.url_name, self.filled.id)
all_visitors = get_publisher().get_object_visitors(object_key)
all_visitors = get_session().get_object_visitors(self.filled)
visitors = [x for x in all_visitors if x[0] != get_session().user]
me_in_visitors = bool(get_session().user in [x[0] for x in all_visitors])
if not visitors or me_in_visitors:
related_user_forms = getattr(self.filled, 'related_user_forms', None) or []
user_roles = set(get_request().user.get_roles())
session = get_session()
for user_formdata in related_user_forms:
if user_roles.intersection(user_formdata.actions_roles):
user_formdata.mark_as_being_visited()
session.mark_visited_object(user_formdata)
return response

View File

@ -1096,9 +1096,8 @@ class FormData(StorableObject):
data = self.get_json_export_dict(include_files=include_files, anonymise=anonymise)
return json.dumps(data, cls=misc.JSONEncoder)
def mark_as_being_visited(self):
object_key = 'formdata-%s-%s' % (self.formdef.url_name, self.id)
get_session().mark_visited_object(object_key)
def get_object_key(self):
return '%s-%s-%s' % (self.formdef.xml_root_node, self.formdef.url_name, self.id)
def feed_session(self):
# this gives a chance to fields to initialize things that would rely on

View File

@ -16,7 +16,7 @@
from django.utils.six.moves.urllib import parse as urllib
from quixote import get_request, get_publisher, redirect
from quixote import get_request, get_publisher, get_session, redirect
from quixote.html import htmltext, TemplateIO
from ..qommon import _
@ -224,7 +224,7 @@ class FormDefUI(object):
url_action = ''
root_url = get_publisher().get_root_url()
user = get_request().user
visited_objects = get_publisher().get_visited_objects(exclude_user=user.id)
visited_objects = get_session().get_visited_objects(exclude_user=user.id)
include_criticality_level = bool(self.formdef.workflow.criticality_levels)
for i, filled in enumerate(items):
classes = ['status-%s-%s' % (filled.formdef.workflow.id, filled.status)]
@ -233,8 +233,7 @@ class FormDefUI(object):
else:
classes.append('odd')
object_key = 'formdata-%s-%s' % (filled.formdef.url_name, filled.id)
if object_key in visited_objects:
if filled.get_object_key() in visited_objects:
classes.append('advisory-lock')
if filled.backoffice_submission:
classes.append('backoffice-submission')

View File

@ -517,19 +517,17 @@ class FormStatusPage(Directory, FormTemplateMixin):
return r.getvalue()
def status(self):
object_key = 'formdata-%s-%s' % (self.formdef.url_name, self.filled.id)
if get_request().get_query() == 'unlock':
# mark user as active visitor of the object, then redirect to self,
# the unlocked form will appear.
self.filled.mark_as_being_visited()
get_session().mark_visited_object(self.filled)
return redirect('./#lock-notice')
user = self.check_receiver()
form = self.get_workflow_form(user)
response = self.check_submitted_form(form)
if response:
get_session().unmark_visited_object(object_key)
get_session().unmark_visited_object(self.filled)
return response
get_logger().info('form %s - id: %s - view status' % (self.formdef.name, self.filled.id))
@ -551,7 +549,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
locked = False
if form:
all_visitors = get_publisher().get_object_visitors(object_key)
all_visitors = get_session().get_object_visitors(self.filled)
visitors = [x for x in all_visitors if x[0] != get_session().user]
me_in_visitors = bool(get_session().user in [x[0] for x in all_visitors])
if visitors:
@ -582,7 +580,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
if not visitors or me_in_visitors:
r += htmltext(self.actions_workflow_messages())
r += form.render()
self.filled.mark_as_being_visited()
get_session().mark_visited_object(self.filled)
if not locked:
if (self.filled.get_status() and self.filled.get_status().backoffice_info_text) or (
@ -764,7 +762,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
f.edit_action_id = action_id
f.action_url = 'wfedit-%s' % action_id
if get_request().is_in_backoffice():
self.filled.mark_as_being_visited()
get_session().mark_visited_object(self.filled)
get_response().breadcrumb = get_response().breadcrumb[:-1]
get_response().breadcrumb.append((f.action_url, _('Edit')))
return f._q_index()

View File

@ -286,15 +286,6 @@ class WcsPublisher(StubWcsPublisher):
z.close()
return results
def get_object_visitors(self, object_key):
session_manager = self.session_manager_class(session_class=self.session_class)
return session_manager.session_class.get_object_visitors(object_key)
def get_visited_objects(self, exclude_user=None):
session_manager = self.session_manager_class(session_class=self.session_class)
return session_manager.session_class.get_visited_objects(
exclude_user=exclude_user)
def initialize_sql(self):
from . import sql
sql.get_connection(new=True)

View File

@ -56,15 +56,15 @@ class BasicSession(Session):
def mark_anonymous_formdata(self, formdata):
if not self.anonymous_formdata_keys:
self.anonymous_formdata_keys = {}
self.anonymous_formdata_keys['%s-%s' % (formdata.formdef.id, formdata.id)] = True
self.anonymous_formdata_keys[formdata.get_object_key()] = True
def is_anonymous_submitter(self, formdata):
if not self.anonymous_formdata_keys:
return False
formdata_key = '%s-%s' % (formdata.formdef.id, formdata.id)
return formdata_key in self.anonymous_formdata_keys
return formdata.get_object_key() in self.anonymous_formdata_keys
def mark_visited_object(self, key):
def mark_visited_object(self, formdata):
key = formdata.get_object_key()
if not self.visiting_objects:
self.visiting_objects = {}
# first clean older objects
@ -115,8 +115,9 @@ class BasicSession(Session):
yield session
@classmethod
def get_object_visitors(cls, object_key):
def get_object_visitors(cls, formdata):
'''return tuples of (user_id, last_visit_timestamp)'''
object_key = formdata.get_object_key()
current_timestamp = time.time()
visitors = {}
for session in cls.get_sessions_with_visited_object(object_key):
@ -125,7 +126,8 @@ class BasicSession(Session):
visitors[session.user] = max(object_timestamp, visitors.get(session.user, 0))
return visitors.items()
def unmark_visited_object(self, object_key):
def unmark_visited_object(self, formdata):
object_key = formdata.get_object_key()
# remove from current session
if object_key in (getattr(self, 'visiting_objects', None) or {}):
del self.visiting_objects[object_key]