workflows: handle concerned/actions roles with conditions on actions (#23839)

When a workflow is changed security details of all related have to be
recomputed (rebuild_security).  With the introduction of conditions on
actions this process now requires to have objects available in the
context for evaluating conditions.
This commit is contained in:
Frédéric Péters 2018-05-15 12:58:49 +02:00
parent 01e009142d
commit 95668f69e5
4 changed files with 87 additions and 7 deletions

View File

@ -3429,3 +3429,63 @@ def test_workflow_jump_condition_migration(pub):
reloaded_workflow = Workflow.get(workflow.id)
assert reloaded_workflow.possible_status[0].items[0].condition == {
'type': 'python', 'value': 'foobar'}
def test_workflow_action_condition(two_pubs):
pub = two_pubs
workflow = Workflow(name='jump condition migration')
st1 = workflow.add_status('Status1', 'st1')
workflow.store()
role = Role(name='bar1')
role.store()
user = pub.user_class()
user.roles = [role.id]
user.store()
choice = ChoiceWorkflowStatusItem()
choice.by = [role.id]
choice.id = '_x'
st1.items.append(choice)
choice.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.workflow_id = workflow.id
formdef.store()
formdata1 = formdef.data_class()()
formdata1.data = {'1': 'foo'}
formdata1.just_created()
formdata1.store()
formdata2 = formdef.data_class()()
formdata2.data = {'2': 'bar'}
formdata2.just_created()
formdata2.store()
assert formdata1.get_actions_roles() == set([role.id])
assert formdata2.get_actions_roles() == set([role.id])
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2
choice.condition = {'type': 'python', 'value': 'form_var_foo == "foo"'}
workflow.store()
with pub.substitutions.temporary_feed(formdata1):
assert FormDef.get(formdef.id).data_class().get(formdata1.id).get_actions_roles() == set([role.id])
with pub.substitutions.temporary_feed(formdata2):
assert FormDef.get(formdef.id).data_class().get(formdata2.id).get_actions_roles() == set()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 1
# check with a formdef condition
choice.condition = {'type': 'python', 'value': 'form_name == "test"'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0
choice.condition = {'type': 'python', 'value': 'form_name == "baz"'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2

View File

@ -784,7 +784,8 @@ class FormData(StorableObject):
@classmethod
def rebuild_security(cls):
cls.rebuild_indexes(indexes=['concerned_roles', 'actions_roles'])
with get_publisher().substitutions.temporary_feed(cls._formdef):
cls.rebuild_indexes(indexes=['concerned_roles', 'actions_roles'])
def is_submitter(self, user):
if self.user_id and user and str(self.user_id) == str(user.id):
@ -836,8 +837,9 @@ class FormData(StorableObject):
for item in wf_status.items or []:
if not hasattr(item, 'by') or not item.by:
continue
if not item.check_condition(self):
continue
with get_publisher().substitutions.temporary_feed(self):
if not item.check_condition(self):
continue
for role in item.by:
if role == '_submitter':
status_action_roles.add(role)

View File

@ -14,6 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from contextlib import contextmanager
from quixote import get_request
from quixote.html import htmltext, TemplateIO
@ -66,6 +68,18 @@ class Substitutions(object):
self.sources.append(source)
self.invalidate_cache()
@contextmanager
def temporary_feed(self, source):
if source is None or source in self.sources:
yield
return
self.sources.append(source)
self.invalidate_cache()
yield
self.sources.remove(source)
self.invalidate_cache()
@classmethod
def invalidate_cache(cls):
request = get_request()

View File

@ -1544,10 +1544,14 @@ class SqlFormData(SqlMixin, wcs.formdata.FormData):
SET concerned_roles_array = %%(roles)s,
actions_roles_array = %%(actions_roles)s
WHERE id = %%(id)s''' % cls._table_name
cur.execute(sql_statement, {
'id': formdata.id,
'roles': [str(x) for x in formdata.concerned_roles if x],
'actions_roles': [str(x) for x in formdata.actions_roles if x]})
with get_publisher().substitutions.temporary_feed(formdata):
# formdata is already added to sources list in individual
# {concerned,actions}_roles but adding it first here will
# allow cached values to be reused between the properties.
cur.execute(sql_statement, {
'id': formdata.id,
'roles': [str(x) for x in formdata.concerned_roles if x],
'actions_roles': [str(x) for x in formdata.actions_roles if x]})
conn.commit()
cur.close()