workflows: optimise timeout jump queries to exclude recent formdatas (#54238)

This commit is contained in:
Frédéric Péters 2021-05-24 14:20:00 +02:00
parent 671d651784
commit 948eb64360
2 changed files with 89 additions and 23 deletions

View File

@ -2518,6 +2518,15 @@ def test_webservice_with_complex_data(http_requests, pub):
assert payload['items_raw'] == repr(['a', 'b'])
def rewind(formdata, seconds):
    """Move *formdata* back in time by ``seconds`` seconds.

    Test utility: shifts ``formdata.receipt_time`` and the ``time`` of the
    last entry of ``formdata.evolution`` into the past, in place.  Both
    values are read and written back as ``time.struct_time`` local times.
    Nothing is stored here; callers call ``formdata.store()`` themselves.
    """

    def rewind_time(timetuple):
        # struct_time -> epoch seconds -> shifted datetime -> struct_time
        shifted = datetime.datetime.fromtimestamp(time.mktime(timetuple) - seconds)
        return time.localtime(shifted.timestamp())

    formdata.receipt_time = rewind_time(formdata.receipt_time)
    formdata.evolution[-1].time = rewind_time(formdata.evolution[-1].time)
def test_timeout(two_pubs):
workflow = Workflow(name='timeout')
st1 = workflow.add_status('Status1', 'st1')
@ -2526,7 +2535,7 @@ def test_timeout(two_pubs):
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.timeout = 30 * 60 # 30 minutes
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
@ -2542,14 +2551,44 @@ def test_timeout(two_pubs):
formdata = formdef.data_class()()
formdata.just_created()
rewind(formdata, seconds=40 * 60)
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(two_pubs)
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
if two_pubs.is_using_postgresql():
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
with mock.patch('wcs.wf.jump.JumpWorkflowStatusItem.must_jump') as must_jump:
must_jump.return_value = False
_apply_timeouts(two_pubs)
assert must_jump.call_count == 0 # not enough time has passed
# check a lower than minimal delay is not considered
jump.timeout = 5 * 50 # 5 minutes
workflow.store()
rewind(formdata, seconds=10 * 60)
formdata.store()
_apply_timeouts(two_pubs)
assert must_jump.call_count == 0
# but is executed once delay is reached
rewind(formdata, seconds=10 * 60)
formdata.store()
_apply_timeouts(two_pubs)
assert must_jump.call_count == 1
# check a templated timeout is considered as minimal delay for explicit evaluation
jump.timeout = '{{ "0" }}'
workflow.store()
_apply_timeouts(two_pubs)
assert must_jump.call_count == 2
# check there's no crash on workflow without jumps
formdef = FormDef()
formdef.name = 'xxx'
@ -2564,7 +2603,7 @@ def test_legacy_timeout(pub):
jump = TimeoutWorkflowStatusItem()
jump.id = '_jump'
jump.timeout = 0.1
jump.timeout = 30 * 60 # 30 minutes
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
@ -2580,10 +2619,10 @@ def test_legacy_timeout(pub):
formdata = formdef.data_class()()
formdata.just_created()
rewind(formdata, seconds=40 * 60)
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(pub)
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
@ -2597,7 +2636,7 @@ def test_timeout_then_remove(two_pubs):
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.timeout = 30 * 60 # 30 minutes
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
@ -2617,12 +2656,12 @@ def test_timeout_then_remove(two_pubs):
formdata = formdef.data_class()()
formdata.just_created()
rewind(formdata, seconds=40 * 60)
formdata.store()
formdata_id = formdata.id
assert str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
time.sleep(0.2)
_apply_timeouts(two_pubs)
assert not str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
@ -2636,7 +2675,7 @@ def test_timeout_with_mark(two_pubs):
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.timeout = 30 * 60 # 30 minutes
jump.status = 'st2'
jump.set_marker_on_status = True
st1.items.append(jump)
@ -2653,10 +2692,10 @@ def test_timeout_with_mark(two_pubs):
formdata = formdef.data_class()()
formdata.just_created()
rewind(formdata, seconds=40 * 60)
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(two_pubs)
formdata = formdef.data_class().get(formdata_id)
@ -2675,7 +2714,7 @@ def test_jump_missing_previous_mark(two_pubs):
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.status = '_previous'
jump.timeout = 0.1
jump.timeout = 30 * 60 # 30 minutes
st1.items.append(jump)
jump.parent = st1
@ -2689,9 +2728,9 @@ def test_jump_missing_previous_mark(two_pubs):
formdata = formdef.data_class()()
formdata.just_created()
rewind(formdata, seconds=40 * 60)
formdata.store()
time.sleep(0.3)
two_pubs.loggederror_class.wipe()
_apply_timeouts(two_pubs)
assert two_pubs.loggederror_class.count() == 1

View File

@ -14,8 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
import itertools
import json
import math
import os
import time
@ -32,6 +34,8 @@ from ..qommon.cron import CronJob
from ..qommon.form import ComputedExpressionWidget, SingleSelectWidget, StringWidget, WidgetList
from ..qommon.humantime import humanduration2seconds, seconds2humanduration, timewords
from ..qommon.publisher import get_publisher_class
from ..qommon.storage import Equal, LessOrEqual
from ..qommon.template import Template
JUMP_TIMEOUT_INTERVAL = max((60 // int(os.environ.get('WCS_JUMP_TIMEOUT_CHECKS', '3')), 1))
@ -284,25 +288,25 @@ register_item_class(JumpWorkflowStatusItem)
def workflows_with_timeout():
    """Returns {workflow id: {status id: [jump_item...]}}

    Walks every stored workflow and collects, per status, the items that
    have both a ``status`` (jump target) and a ``timeout`` set and whose
    target status still resolves.  Status ids are keyed with the ``wf-``
    prefix ('wf-<status id>').

    A status (and a workflow) only gets an entry when at least one jump is
    kept, so no list in the result is ever empty.  An empty list would leave
    the minimum-delay computation in _apply_timeouts() at ``math.inf``, which
    ``datetime.timedelta`` cannot represent.
    """
    wfs_status = {}

    for workflow_id in Workflow.keys():
        workflow = Workflow.get(workflow_id, ignore_errors=True)
        if not workflow:
            # unloadable workflow, nothing to collect
            continue
        for status in workflow.possible_status:
            status_str_id = 'wf-%s' % status.id
            for item in status.items:
                # only items with both a target status and a timeout are timeout jumps
                if not (getattr(item, 'status', None) and getattr(item, 'timeout', None)):
                    continue
                if not item.get_target_status():
                    # this will catch status being a removed status
                    continue
                wfs_status.setdefault(workflow_id, {}).setdefault(status_str_id, []).append(item)

    return wfs_status
@ -320,14 +324,37 @@ def _apply_timeouts(publisher):
continue
formdata_class = formdef.data_class()
for status_id in status_ids:
for formdata in formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True):
for x in wfs_status[str(formdef.workflow_id)][formdata.status]:
if publisher.is_using_postgresql():
# get minimum delay for jumps in this status
delay = math.inf
for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
if Template.is_template_string(jump_action.timeout):
delay = 0
break
delay = min(delay, int(jump_action.timeout))
# limit delay to minimal delay
if delay < JUMP_TIMEOUT_INTERVAL * 60:
delay = JUMP_TIMEOUT_INTERVAL * 60
criterias = [
Equal('status', status_id),
LessOrEqual(
'last_update_time',
(datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
),
]
formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
else:
formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
for formdata in formdatas:
for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
get_publisher().substitutions.reset()
get_publisher().substitutions.feed(get_publisher())
get_publisher().substitutions.feed(formdef)
get_publisher().substitutions.feed(formdata)
if x.must_jump(formdata):
jump_and_perform(formdata, x)
if jump_action.must_jump(formdata):
jump_and_perform(formdata, jump_action)
break