# Source: wcs/tests/workflow/test_email.py (671 lines, 23 KiB, Python)

import base64
import json
import os
import pytest
from django.utils.encoding import force_bytes, force_text
from quixote import cleanup, get_response
from wcs import sessions
from wcs.fields import FileField, StringField
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.substitution import CompatibilityNamesDict
from wcs.qommon.upload_storage import PicklableUpload
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
from wcs.wf.sendmail import SendmailWorkflowStatusItem
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
from ..utilities import MockSubstitutionVariables, clean_temporary_pub, create_temporary_pub
def setup_module(module):
    """Module-level setup hook: call quixote's ``cleanup()`` before this module's tests.

    ``module`` is the argument passed to the hook; it is not used here.
    """
    # NOTE(review): presumably drops any publisher left behind by a previously
    # run test module -- confirm against quixote's cleanup().
    cleanup()
def teardown_module(module):
    """Module-level teardown hook: call ``clean_temporary_pub()`` from the test utilities.

    ``module`` is the argument passed to the hook; it is not used here.
    """
    # NOTE(review): presumably removes what create_temporary_pub() set up for
    # the ``pub`` fixture -- confirm in ..utilities.
    clean_temporary_pub()
@pytest.fixture
def pub(request):
    """Return a temporary publisher set to English with a request and session attached.

    The ``request`` argument is accepted but not used by the fixture body.
    """
    publisher = create_temporary_pub()
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    # build a bare HTTP request for example.net and bind it to the publisher
    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request._user = None
    publisher._set_request(http_request)

    # the session is attached after the request has been set on the publisher
    http_request.session = sessions.BasicSession(id=1)
    publisher.set_config(http_request)
    return publisher
def test_email(pub, emails):
    """Run the sendmail action through incomplete settings, templates, recipients and senders.

    After each run the mails captured by the ``emails`` fixture are checked
    (count, recipients, payload or sender, depending on the scenario).
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    pub.role_class.wipe()
    role1 = pub.role_class(name='foo')
    role1.emails = ['foo@localhost']
    role1.store()
    role2 = pub.role_class(name='bar')
    role2.emails = ['bar@localhost', 'baz@localhost']
    role2.store()

    item = SendmailWorkflowStatusItem()

    def deliver():
        # run the action, then run the jobs queued on the response
        item.perform(formdata)
        get_response().process_after_jobs()

    # send using an uncompleted element
    deliver()  # nothing
    assert emails.count() == 0

    item.to = [role1.id]
    deliver()  # no subject nor body
    assert emails.count() == 0

    item.subject = 'foobar'
    deliver()  # no body
    assert emails.count() == 0

    # send for real
    item.body = 'baz'
    deliver()
    assert emails.count() == 1
    sent = emails.get('foobar')
    assert sent
    assert sent['email_rcpt'] == ['foo@localhost']
    assert 'baz' in sent['payload']

    # template for subject or body: Django syntax first, then ezt syntax;
    # both are expected to render the subject as 'Foobar' and the body with '1 < 3'
    for subject_template, body_template in (('{{ bar }}', '{{ foo }}'), ('[bar]', '[foo]')):
        emails.empty()
        item.subject = subject_template
        item.body = body_template
        deliver()
        assert emails.count() == 1
        rendered = emails.get('Foobar')
        assert rendered
        assert '1 < 3' in rendered['payload']

    # two recipients
    emails.empty()
    item.subject = 'foobar'
    item.to = [role1.id, role2.id]
    deliver()
    assert emails.count() == 1
    sent = emails.get('foobar')
    assert sent['to'] == 'Undisclosed recipients:;'
    assert set(sent['email_rcpt']) == {'foo@localhost', 'bar@localhost', 'baz@localhost'}

    # submitter as recipient, no known email address
    emails.empty()
    item.to = ['_submitter']
    deliver()
    assert emails.count() == 0

    # submitter as recipient, known email address
    emails.empty()
    formdata.user_id = user.id
    formdata.store()
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']

    # computed recipient
    emails.empty()
    item.to = ['=email']
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['sub@localhost']

    # computed list of recipients
    emails.empty()
    item.to = ['=["foo@localhost", "bar@localhost"]']
    deliver()
    assert emails.count() == 1
    assert set(emails.get('foobar')['email_rcpt']) == {'foo@localhost', 'bar@localhost'}

    # multiple recipients in a single computed string
    emails.empty()
    item.to = ['="foo@localhost, bar@localhost"']
    deliver()
    assert emails.count() == 1
    assert set(emails.get('foobar')['email_rcpt']) == {'foo@localhost', 'bar@localhost'}

    # string as recipient
    emails.empty()
    item.to = 'xyz@localhost'
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']

    # string as recipient (but correctly set in a list)
    emails.empty()
    item.to = ['xyz@localhost']
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']

    # multiple recipients in a static string
    emails.empty()
    item.to = ['foo@localhost, bar@localhost']
    deliver()
    assert emails.count() == 1
    assert set(emails.get('foobar')['email_rcpt']) == {'foo@localhost', 'bar@localhost'}

    # invalid recipient, then empty recipient: nothing gets sent
    for unusable_recipient in ('=foobar', '=None'):
        emails.empty()
        item.to = [unusable_recipient]
        deliver()
        assert emails.count() == 0

    # custom from email, given literally and then as a computed expression
    for custom_from in ('foobar@localhost', '="foobar@localhost"'):
        emails.empty()
        item.to = [role1.id]
        item.custom_from = custom_from
        deliver()
        assert emails.count() == 1
        assert emails.get('foobar')['from'] == 'foobar@localhost'

    # custom sender name defined from site-options variable
    pub.load_site_options()
    if not pub.site_options.has_section('variables'):
        pub.site_options.add_section('variables')
    pub.site_options.set('variables', 'email_sender_name', 'SENDER NAME')

    emails.empty()
    item.to = [role1.id]
    item.custom_from = '="foobar@localhost"'
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['msg']['From'] == 'SENDER NAME <foobar@localhost>'
def test_email_django_escaping(pub, emails):
    """Check when a Django-templated form value is HTML-escaped in the mail.

    The value '1 < 3' must come out verbatim in plain-text bodies and in the
    subject, and escaped ('&lt;') when the body is HTML.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = SendmailWorkflowStatusItem()
    item.to = ['foo@localhost']
    item.subject = 'Foobar'

    def deliver():
        # run the action, then run the jobs queued on the response
        item.perform(formdata)
        get_response().process_after_jobs()

    # explicit safe string first, then automatic no-escaping (because text/plain)
    for plain_body in ('{{ form_var_foo|safe }}', '{{ form_var_foo }}'):
        emails.empty()
        formdata.data = {'1': '1 < 3'}
        item.body = plain_body
        deliver()
        assert emails.count() == 1
        assert emails.get('Foobar')['payload'].strip() == '1 < 3'

    # automatic escaping (because mail body is HTML)
    emails.empty()
    formdata.data = {'1': '1 < 3'}
    item.body = '<p>{{ form_var_foo }}</p>'
    deliver()
    assert emails.count() == 1
    html_mail = emails.get('Foobar')
    assert html_mail
    assert '<p>1 &lt; 3</p>' in html_mail['payload'].strip()

    # no automatic escaping for subject (even if mail body is HTML)
    emails.empty()
    formdata.data = {'1': '1 < 3'}
    item.subject = '{{ form_var_foo }}'
    item.body = '<p>{{ form_var_foo }}</p>'
    deliver()
    assert emails.count() == 1
    assert emails.get('1 < 3')
def test_email_attachments(pub, emails):
    """Check the MIME parts produced for the different ways of naming attachments.

    Attachments are given as form variables, backoffice field identifiers,
    ``utils.attachment(...)`` expressions and Django templates; each time the
    resulting message must be multipart/mixed with the HTML body as first part.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='3', label='File', type='file', varname='file'),
    ]
    formdef.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    image_path = os.path.join(os.path.dirname(__file__), '..', 'image-with-gps-data.jpeg')
    with open(image_path, 'rb') as image_file:
        upload.receive([image_file.read()])

    formdata = formdef.data_class()()
    formdata.data = {'3': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    sendmail = SendmailWorkflowStatusItem()
    sendmail.subject = 'foobar'
    sendmail.body = '<p>force html</p>'
    sendmail.to = ['to@example.net']

    def mail_out():
        # Run the action on the current formdata, run the queued jobs, and
        # return the single 'foobar' message after the checks every scenario shares.
        sendmail.perform(formdata)
        get_response().process_after_jobs()
        assert emails.count() == 1
        message = emails.emails['foobar']['msg']
        assert message.is_multipart()
        assert message.get_content_subtype() == 'mixed'
        return message

    sendmail.attachments = ['form_var_file_raw']
    message = mail_out()
    assert message.get_payload()[0].get_content_type() == 'text/html'
    assert message.get_payload()[1].get_content_type() == 'image/jpeg'

    # build a backoffice field
    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='email with attachments')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1-1x', label='bo field 1', type='file', varname='backoffice_file1'),
        FileField(id='bo2', label='bo field 2', type='file', varname='backoffice_file2'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='1', label='File', type='file', varname='frontoffice_file'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    # store file in backoffice field form_fbo1_1x / form_var_backoffice_file1_raw
    setbo = SetBackofficeFieldsWorkflowStatusItem()
    setbo.parent = st1
    setbo.fields = [{'field_id': 'bo1-1x', 'value': '=form_var_frontoffice_file_raw'}]
    setbo.perform(formdata)

    for single_attachment in (
        'form_fbo1-1x',  # compatibility with actions defined before #33366 was fixed
        'form_fbo1_1x',  # correct varname-less field
        '{{form_fbo1_1x}}',  # template with varname-less field
        'form_var_backoffice_file1_raw',  # variable
    ):
        emails.empty()
        sendmail.attachments = [single_attachment]
        message = mail_out()
        assert message.get_payload()[0].get_content_type() == 'text/html'
        assert message.get_payload()[1].get_content_type() == 'image/jpeg'

    emails.empty()
    sendmail.attachments = ['form_var_backoffice_file1_raw', 'form_var_backoffice_file2_raw']
    message = mail_out()
    assert message.get_payload()[0].get_content_type() == 'text/html'
    assert message.get_payload()[1].get_content_type() == 'image/jpeg'
    # backoffice_file2 is unset, no more parts :
    assert len(message.get_payload()) == 2

    # set backoffice_file2 and retry
    setbo.fields = [
        {
            'field_id': 'bo2',
            'value': '={"content": "blah", "filename": "hello.txt", "content_type": "text/plain"}',
        }
    ]
    setbo.perform(formdata)
    emails.empty()
    message = mail_out()
    assert message.get_payload()[0].get_content_type() == 'text/html'
    assert message.get_payload()[1].get_content_type() == 'image/jpeg'
    assert message.get_payload()[2].get_content_type() == 'text/plain'
    encoded_text = message.get_payload()[2].get_payload()
    assert base64.decodebytes(force_bytes(encoded_text)) == b'blah'
    assert len(message.get_payload()) == 3

    # attachments built from expressions
    emails.empty()
    sendmail.attachments = [
        'utils.attachment("Hello world")',
        'utils.attachment(\'{"hello": "world"}\', content_type=\'application/json\')',
    ]
    message = mail_out()
    assert message.get_payload(0).get_content_type() == 'text/html'
    assert message.get_payload(1).get_content_type() == 'application/octet-stream'
    assert message.get_payload(2).get_content_type() == 'application/json'
    payload1 = message.get_payload(1)
    payload2 = message.get_payload(2)
    assert payload1.get_payload(decode=True) == b"Hello world"
    assert json.loads(force_text(payload2.get_payload(decode=True))) == {'hello': 'world'}

    # check with templates
    for template, attached_type in (
        ('{{form_var_backoffice_file1_raw}}', 'image/jpeg'),
        ('{{form_var_backoffice_file2}}', 'text/plain'),
        ('{% firstof form_var_frontoffice_file form_var_backoffice_file2 %}', 'image/jpeg'),
    ):
        emails.empty()
        sendmail.attachments = [template]
        message = mail_out()
        assert message.get_payload()[0].get_content_type() == 'text/html'
        assert message.get_payload()[1].get_content_type() == attached_type
def test_workflow_email_line_details(pub):
    """Check the summary returned by ``get_line_details()`` for each kind of recipient."""
    workflow = Workflow(name='email')
    status = workflow.add_status('Status1', 'st1')
    sendmail = SendmailWorkflowStatusItem()
    sendmail.parent = status

    # no recipient configured yet
    assert sendmail.get_line_details() == 'not completed'

    role = pub.role_class(name='foorole')
    role.store()

    # a role, a literal address, then a template
    for recipients, expected_details in (
        ([role.id], 'to foorole'),
        (['test@example.net'], 'to test@example.net'),
        (['{{ foobar }}'], 'to computed value'),
    ):
        sendmail.to = recipients
        assert sendmail.get_line_details() == expected_details
def test_workflow_email_to_user_function(pub, emails):
    """Check that mail sent to a workflow function reaches the user dispatched to it.

    A dispatch action assigns the form user to ``_receiver``; the sendmail
    action addressed to ``_receiver`` must then use that user's email.
    """
    user = pub.user_class(name='foo')
    user.email = 'foobar@localhost'
    user.name_identifiers = ['0123456789']
    user.store()

    workflow = Workflow(name='wf roles')
    status = workflow.add_status('Status1', 'st1')

    dispatch = status.add_action('dispatch')
    dispatch.role_key = '_receiver'
    dispatch.role_id = '{{ form_user }}'

    sendmail = status.add_action('sendmail')
    sendmail.to = ['_receiver']
    sendmail.subject = 'Foobar'
    sendmail.body = 'Hello'
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    # dispatching records the user under the '_receiver' function
    dispatch.perform(formdata)
    expected_roles = {'_receiver': ['_user:%s' % user.id]}
    assert formdata.workflow_roles == expected_roles

    emails.empty()
    sendmail.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    delivered = emails.get('Foobar')
    assert delivered
    assert delivered['email_rcpt'] == ['foobar@localhost']
def test_email_part(pub, emails):
    """Check the ``form_workflow_email_<varname>_*`` variables exposed after sending.

    The action carries the varname 'zzz' and is run twice, so both the
    non-indexed and the indexed (``_0_``, ``_1_``) variable names are checked.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    role1 = pub.role_class(name='bar')
    role1.emails = ['bar@localhost', 'baz@localhost']
    role1.store()

    item = SendmailWorkflowStatusItem()
    item.to = [role1.id]
    item.subject = 'foobar'
    item.body = 'baz'
    item.varname = 'zzz'

    def deliver():
        # run the action, then run the jobs queued on the response
        item.perform(formdata)
        get_response().process_after_jobs()

    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')

    substvars = CompatibilityNamesDict()
    substvars.update(formdata.get_substitution_variables())
    keys = substvars.get_flat_keys()
    for flat_key in keys:
        # noqa pylint: disable=unused-variable
        var = substvars[flat_key]  # check it doesn't raise, ignore the value

    assert substvars['form_workflow_email_zzz_subject'] == 'foobar'
    for populated_key in (
        'form_workflow_email_zzz_body',
        'form_workflow_email_zzz_datetime',
        'form_workflow_email_zzz_addresses',
    ):
        assert substvars[populated_key]

    # check indexed access is not advertised but does work
    assert 'form_workflow_email_zzz_0_subject' not in keys
    assert substvars['form_workflow_email_zzz_0_subject'] == 'foobar'

    # run a second time
    item.subject = 'foobar2'
    deliver()
    keys = substvars.get_flat_keys()

    # check indexed access is now advertised
    assert 'form_workflow_email_zzz_0_subject' in keys
    assert 'form_workflow_email_zzz_1_subject' in keys

    # check indexed access
    assert substvars['form_workflow_email_zzz_0_subject'] == 'foobar'
    assert substvars['form_workflow_email_zzz_1_subject'] == 'foobar2'

    # check non-indexed access gives the latest value
    assert substvars['form_workflow_email_zzz_subject'] == 'foobar2'
def test_email_computed_recipients(pub, emails):
    """Check that templated recipient expressions resolve to the expected addresses.

    Two forms of a 'foo' formdef belong to two users; each recipient notation
    must produce exactly one mail whose recorded addresses are both users'
    emails. A final run with ``{{form_user}}`` must address only the form's user.
    """
    pub.user_class.wipe()
    FormDef.wipe()

    user1 = pub.user_class(name='userA')
    user1.name_identifiers = ['xxy1']
    user1.email = 'user1@example.com'
    user1.store()

    user2 = pub.user_class(name='userB')
    user2.name_identifiers = ['xxy2']
    user2.email = 'user2@example.com'
    user2.store()

    formdef = FormDef()
    formdef.name = 'foo'
    formdef.fields = []
    formdef.store()
    formdef.data_class().wipe()

    # one 'foo' form per user: create both, assign owners, then store them
    formdatas = [formdef.data_class()() for _ in range(2)]
    for owned_form, owner in zip(formdatas, (user1, user2)):
        owned_form.user_id = owner.id
    for owned_form in formdatas:
        owned_form.just_created()
        owned_form.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()
    formdef.data_class().wipe()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    item = SendmailWorkflowStatusItem()
    item.varname = 'test'
    item.to = []
    item.subject = 'xxx'
    item.body = 'XXX'

    def deliver():
        # start from an empty mailbox, run the action, then run the queued jobs
        emails.empty()
        item.perform(formdata)
        get_response().process_after_jobs()

    recipient_notations = [
        'user1@example.com,user2@example.com',
        '{% for obj in forms|objects:"foo" %}{{ obj|get:"form_user_email" }},{% endfor %}',
        '{{ forms|objects:"foo"|getlist:"form_user_email" }}',
        '{{ forms|objects:"foo"|getlist:"form_user_email"|list }}',
        '{{ forms|objects:"foo"|getlist:"form_user" }}',
        '{{ forms|objects:"foo"|getlist:"form_user"|list }}',
    ]
    for recipient in recipient_notations:
        item.to = [recipient]
        deliver()
        assert emails.count() == 1
        recorded = formdata.evolution[-1].parts[-1].addresses
        assert set(recorded) == {'user1@example.com', 'user2@example.com'}
        # reset the recorded parts so the next notation starts clean
        formdata.evolution[-1].parts = []

    formdata.user_id = user1.id
    pub.substitutions.feed(formdata)
    item.to = ['{{form_user}}']
    deliver()
    assert emails.count() == 1
    assert set(formdata.evolution[-1].parts[-1].addresses) == {'user1@example.com'}
    formdata.evolution[-1].parts = []