misc: split workflow tests

This commit is contained in:
Lauréline Guérin 2021-12-09 16:53:24 +01:00
parent 47e1a757e2
commit a64a01ed97
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
2 changed files with 280 additions and 228 deletions

View File

@ -5857,234 +5857,6 @@ def test_aggregation_email(pub, emails):
)
def test_create_formdata(two_pubs):
FormDef.wipe()
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.tracking_code_class.wipe()
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.fields = [
StringField(id='0', label='string', varname='foo_string'),
]
target_formdef.store()
wf = Workflow(name='create-formdata')
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
create = CreateFormdataWorkflowStatusItem()
create.label = 'create a new linked form'
create.varname = 'resubmitted'
create.id = '_create'
create.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
Mapping(field_id='2', expression='=form_var_toto_item_raw'),
]
create.parent = wf.possible_status[1]
wf.possible_status[1].items.insert(0, create)
wf.store()
source_formdef = FormDef()
source_formdef.name = 'source form'
source_formdef.fields = []
source_formdef.workflow_id = wf.id
source_formdef.store()
formdata = source_formdef.data_class()()
formdata.data = {}
formdata.just_created()
assert target_formdef.data_class().count() == 0
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 0
# check unconfigure action do nothing
formdata.perform_workflow()
assert target_formdef.data_class().count() == 0
create.formdef_slug = target_formdef.url_name
wf.store()
del source_formdef._workflow
formdata.perform_workflow()
assert target_formdef.data_class().count() == 1
if two_pubs.is_using_postgresql():
errors = two_pubs.loggederror_class.select()
assert len(errors) == 2
assert 'form_var_toto_string' in errors[0].exception_message
assert 'Missing field' in errors[1].summary
assert errors[0].formdata_id == str(target_formdef.data_class().select()[0].id)
assert errors[1].formdata_id == str(target_formdef.data_class().select()[0].id)
# no tracking code has been created
created_formdata = target_formdef.data_class().select()[0]
assert created_formdata.tracking_code is None
assert two_pubs.tracking_code_class.count() == 0
# now we want one
target_formdef.enable_tracking_codes = True
target_formdef.store()
target_formdef.data_class().wipe()
formdata.perform_workflow()
# and a tracking code is created
assert target_formdef.data_class().count() == 1
created_formdata = target_formdef.data_class().select()[0]
assert created_formdata.tracking_code is not None
assert two_pubs.tracking_code_class.count() == 1
assert two_pubs.tracking_code_class.select()[0].formdef_id == target_formdef.id
assert two_pubs.tracking_code_class.select()[0].formdata_id == str(created_formdata.id)
create.condition = {'type': 'python', 'value': '1 == 2'}
wf.store()
del source_formdef._workflow
target_formdef.data_class().wipe()
assert target_formdef.data_class().count() == 0
formdata.perform_workflow()
assert target_formdef.data_class().count() == 0
def test_create_formdata_migration(pub):
wf = Workflow(name='create-formdata')
st1 = wf.add_status('Status1', 'st1')
create = CreateFormdataWorkflowStatusItem()
create.id = '_create'
create.label = 'create a new linked form'
create.varname = 'resubmitted'
create.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
Mapping(field_id='2', expression='=form_var_toto_item_raw'),
]
create.parent = st1
create.keep_user = True
st1.items.append(create)
wf.store()
wf = Workflow.get(wf.id)
assert wf.possible_status[0].items[0].user_association_mode == 'keep-user'
assert not hasattr(wf.possible_status[0].items[0], 'keep_user')
def test_create_formdata_tracking_code(two_pubs, emails):
FormDef.wipe()
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.tracking_code_class.wipe()
target_wf = Workflow(name='send-mail')
st1 = target_wf.add_status('Status1', 'st1')
item = SendmailWorkflowStatusItem()
item.to = ['bar@localhost']
item.subject = 'Foobar'
item.body = '{{ form_tracking_code }}'
st1.items.append(item)
item.parent = st1
target_wf.store()
target_formdef = FormDef()
target_formdef.name = 'target-form'
target_formdef.fields = [
EmailField(id='0', label='Email', type='email'),
]
target_formdef.workflow_id = target_wf.id
target_formdef.enable_tracking_codes = True
target_formdef.store()
wf = Workflow(name='create-formdata')
st1 = wf.add_status('Status1', 'st1')
create = CreateFormdataWorkflowStatusItem()
create.formdef_slug = target_formdef.url_name
create.mappings = [
Mapping(field_id='0', expression='=form_var_email_string'),
]
st1.items.append(create)
item.parent = st1
wf.store()
formdef = FormDef()
formdef.name = 'source form'
formdef.fields = [
EmailField(id='0', label='Email', type='email'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
assert target_formdef.data_class().count() == 0
assert emails.count() == 0
formdata.perform_workflow()
get_response().process_after_jobs()
assert target_formdef.data_class().count() == 1
assert emails.count() == 1
tracking_code = target_formdef.data_class().select()[0].tracking_code
assert tracking_code in emails.get('Foobar')['payload']
def test_create_formdata_attach_to_history(two_pubs):
FormDef.wipe()
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.tracking_code_class.wipe()
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.fields = [
StringField(id='0', label='string', varname='foo_string'),
]
target_formdef.store()
target_formdef.data_class().wipe()
wf = Workflow(name='create-formdata')
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
create = CreateFormdataWorkflowStatusItem()
create.label = 'create a new linked form'
create.varname = 'resubmitted'
create.id = '_create'
create.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
Mapping(field_id='2', expression='=form_var_toto_item_raw'),
]
create.parent = wf.possible_status[1]
wf.possible_status[1].items.insert(0, create)
wf.store()
source_formdef = FormDef()
source_formdef.name = 'source form'
source_formdef.fields = []
source_formdef.workflow_id = wf.id
source_formdef.store()
formdata = source_formdef.data_class()()
formdata.data = {}
formdata.just_created()
create.formdef_slug = target_formdef.url_name
create.attach_to_history = True
wf.store()
del source_formdef._workflow
formdata.perform_workflow()
assert target_formdef.data_class().count() == 1
assert formdata.evolution[-1].parts[0].attach_to_history is True
assert 'Created new form' in str(formdata.evolution[-1].parts[0].view())
assert target_formdef.get_url() in str(formdata.evolution[-1].parts[0].view())
# don't crash in case target formdata is removed
formdata.refresh_from_storage()
target_formdef.data_class().wipe()
assert 'deleted' in str(formdata.evolution[-1].parts[0].view())
# don't crash in case target formdef is removed
target_formdef.remove_self()
formdata.refresh_from_storage()
assert 'deleted' in str(formdata.evolution[-1].parts[0].view())
def test_call_external_workflow_with_evolution_linked_object(two_pubs):
FormDef.wipe()
CardDef.wipe()

View File

@ -0,0 +1,280 @@
import pytest
from quixote import cleanup, get_response
from wcs import sessions
from wcs.fields import EmailField, StringField
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
from wcs.workflows import SendmailWorkflowStatusItem, Workflow
from ..utilities import clean_temporary_pub, create_temporary_pub
def setup_module(module):
cleanup()
def teardown_module(module):
clean_temporary_pub()
def pytest_generate_tests(metafunc):
if 'two_pubs' in metafunc.fixturenames:
metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def pub(request):
pub = create_temporary_pub()
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''})
req.response.filter = {}
req._user = None
pub._set_request(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub
@pytest.fixture
def two_pubs(request):
pub = create_temporary_pub(sql_mode=(request.param == 'sql'))
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''})
req.response.filter = {}
req._user = None
pub._set_request(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub
def test_create_formdata(two_pubs):
FormDef.wipe()
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.tracking_code_class.wipe()
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.fields = [
StringField(id='0', label='string', varname='foo_string'),
]
target_formdef.store()
wf = Workflow(name='create-formdata')
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
create = CreateFormdataWorkflowStatusItem()
create.label = 'create a new linked form'
create.varname = 'resubmitted'
create.id = '_create'
create.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
Mapping(field_id='2', expression='=form_var_toto_item_raw'),
]
create.parent = wf.possible_status[1]
wf.possible_status[1].items.insert(0, create)
wf.store()
source_formdef = FormDef()
source_formdef.name = 'source form'
source_formdef.fields = []
source_formdef.workflow_id = wf.id
source_formdef.store()
formdata = source_formdef.data_class()()
formdata.data = {}
formdata.just_created()
assert target_formdef.data_class().count() == 0
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 0
# check unconfigure action do nothing
formdata.perform_workflow()
assert target_formdef.data_class().count() == 0
create.formdef_slug = target_formdef.url_name
wf.store()
del source_formdef._workflow
formdata.perform_workflow()
assert target_formdef.data_class().count() == 1
if two_pubs.is_using_postgresql():
errors = two_pubs.loggederror_class.select()
assert len(errors) == 2
assert 'form_var_toto_string' in errors[0].exception_message
assert 'Missing field' in errors[1].summary
assert errors[0].formdata_id == str(target_formdef.data_class().select()[0].id)
assert errors[1].formdata_id == str(target_formdef.data_class().select()[0].id)
# no tracking code has been created
created_formdata = target_formdef.data_class().select()[0]
assert created_formdata.tracking_code is None
assert two_pubs.tracking_code_class.count() == 0
# now we want one
target_formdef.enable_tracking_codes = True
target_formdef.store()
target_formdef.data_class().wipe()
formdata.perform_workflow()
# and a tracking code is created
assert target_formdef.data_class().count() == 1
created_formdata = target_formdef.data_class().select()[0]
assert created_formdata.tracking_code is not None
assert two_pubs.tracking_code_class.count() == 1
assert two_pubs.tracking_code_class.select()[0].formdef_id == target_formdef.id
assert two_pubs.tracking_code_class.select()[0].formdata_id == str(created_formdata.id)
create.condition = {'type': 'python', 'value': '1 == 2'}
wf.store()
del source_formdef._workflow
target_formdef.data_class().wipe()
assert target_formdef.data_class().count() == 0
formdata.perform_workflow()
assert target_formdef.data_class().count() == 0
def test_create_formdata_migration(pub):
wf = Workflow(name='create-formdata')
st1 = wf.add_status('Status1', 'st1')
create = CreateFormdataWorkflowStatusItem()
create.id = '_create'
create.label = 'create a new linked form'
create.varname = 'resubmitted'
create.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
Mapping(field_id='2', expression='=form_var_toto_item_raw'),
]
create.parent = st1
create.keep_user = True
st1.items.append(create)
wf.store()
wf = Workflow.get(wf.id)
assert wf.possible_status[0].items[0].user_association_mode == 'keep-user'
assert not hasattr(wf.possible_status[0].items[0], 'keep_user')
def test_create_formdata_tracking_code(two_pubs, emails):
FormDef.wipe()
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.tracking_code_class.wipe()
target_wf = Workflow(name='send-mail')
st1 = target_wf.add_status('Status1', 'st1')
item = SendmailWorkflowStatusItem()
item.to = ['bar@localhost']
item.subject = 'Foobar'
item.body = '{{ form_tracking_code }}'
st1.items.append(item)
item.parent = st1
target_wf.store()
target_formdef = FormDef()
target_formdef.name = 'target-form'
target_formdef.fields = [
EmailField(id='0', label='Email', type='email'),
]
target_formdef.workflow_id = target_wf.id
target_formdef.enable_tracking_codes = True
target_formdef.store()
wf = Workflow(name='create-formdata')
st1 = wf.add_status('Status1', 'st1')
create = CreateFormdataWorkflowStatusItem()
create.formdef_slug = target_formdef.url_name
create.mappings = [
Mapping(field_id='0', expression='=form_var_email_string'),
]
st1.items.append(create)
item.parent = st1
wf.store()
formdef = FormDef()
formdef.name = 'source form'
formdef.fields = [
EmailField(id='0', label='Email', type='email'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
assert target_formdef.data_class().count() == 0
assert emails.count() == 0
formdata.perform_workflow()
get_response().process_after_jobs()
assert target_formdef.data_class().count() == 1
assert emails.count() == 1
tracking_code = target_formdef.data_class().select()[0].tracking_code
assert tracking_code in emails.get('Foobar')['payload']
def test_create_formdata_attach_to_history(two_pubs):
FormDef.wipe()
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.tracking_code_class.wipe()
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.fields = [
StringField(id='0', label='string', varname='foo_string'),
]
target_formdef.store()
target_formdef.data_class().wipe()
wf = Workflow(name='create-formdata')
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
create = CreateFormdataWorkflowStatusItem()
create.label = 'create a new linked form'
create.varname = 'resubmitted'
create.id = '_create'
create.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
Mapping(field_id='2', expression='=form_var_toto_item_raw'),
]
create.parent = wf.possible_status[1]
wf.possible_status[1].items.insert(0, create)
wf.store()
source_formdef = FormDef()
source_formdef.name = 'source form'
source_formdef.fields = []
source_formdef.workflow_id = wf.id
source_formdef.store()
formdata = source_formdef.data_class()()
formdata.data = {}
formdata.just_created()
create.formdef_slug = target_formdef.url_name
create.attach_to_history = True
wf.store()
del source_formdef._workflow
formdata.perform_workflow()
assert target_formdef.data_class().count() == 1
assert formdata.evolution[-1].parts[0].attach_to_history is True
assert 'Created new form' in str(formdata.evolution[-1].parts[0].view())
assert target_formdef.get_url() in str(formdata.evolution[-1].parts[0].view())
# don't crash in case target formdata is removed
formdata.refresh_from_storage()
target_formdef.data_class().wipe()
assert 'deleted' in str(formdata.evolution[-1].parts[0].view())
# don't crash in case target formdef is removed
target_formdef.remove_self()
formdata.refresh_from_storage()
assert 'deleted' in str(formdata.evolution[-1].parts[0].view())