workflows: allow attachments in emails (#8274)
This commit is contained in:
parent
05f0d8e0d0
commit
786343b28b
|
@ -1921,7 +1921,7 @@ def test_workflows_edit_email_action(pub):
|
|||
role = create_role()
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
workflow.add_status(name='baz')
|
||||
st1 = workflow.add_status(name='baz')
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
@ -1959,6 +1959,78 @@ def test_workflows_edit_email_action(pub):
|
|||
resp = resp.form.submit('submit')
|
||||
assert 'error in template' in resp.body and 'unmatched [end]' in resp.body
|
||||
|
||||
# attachments without backoffice fields: python expressions
|
||||
resp = app.get(item_url)
|
||||
assert "Attachments (Python expressions)" in resp.body
|
||||
resp.form['attachments$element0'] = 'form_var_upload_raw'
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location
|
||||
resp = app.get(item_url)
|
||||
assert "Attachments (Python expressions)" in resp.body
|
||||
assert resp.form['attachments$element0'].value == 'form_var_upload_raw'
|
||||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||||
assert sendmail.attachments == ['form_var_upload_raw']
|
||||
|
||||
# attachments with backoffice fields: select-with-other inputs
|
||||
workflow = Workflow.get(workflow.id)
|
||||
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
||||
workflow.backoffice_fields_formdef.fields = [
|
||||
fields.FileField(id='bo1', label='bo field 1', type='file', varname='upload'),
|
||||
fields.FileField(id='bo2', label='bo field 2', type='file', varname='upload2'),
|
||||
fields.FileField(id='bo3', label='bo field varnameless', type='file'),
|
||||
]
|
||||
workflow.store()
|
||||
resp = app.get(item_url)
|
||||
assert "Attachments" in resp.body
|
||||
assert "Attachments (Python expressions)" not in resp.body
|
||||
assert resp.form['attachments$element0$choice'].value == 'form_var_upload_raw'
|
||||
assert len(resp.form['attachments$element0$choice'].options) == 5
|
||||
resp.form['attachments$element1$choice'] = 'form_var_upload2_raw'
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location
|
||||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||||
assert sendmail.attachments == ['form_var_upload_raw', 'form_var_upload2_raw']
|
||||
|
||||
resp = app.get(item_url)
|
||||
resp.form['attachments$element2$choice'] = 'form_fbo3'
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location
|
||||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||||
assert sendmail.attachments == ['form_var_upload_raw', 'form_var_upload2_raw', 'form_fbo3']
|
||||
|
||||
resp = app.get(item_url)
|
||||
resp.form['attachments$element3$choice'] = '__other'
|
||||
resp.form['attachments$element3$other'] = '{"content":"foo", "filename":"bar.txt"}'
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location
|
||||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||||
assert sendmail.attachments == ['form_var_upload_raw', 'form_var_upload2_raw', 'form_fbo3',
|
||||
'{"content":"foo", "filename":"bar.txt"}']
|
||||
|
||||
# remove some backoffice fields: varnameless fbo3 disapear
|
||||
workflow = Workflow.get(workflow.id)
|
||||
workflow.backoffice_fields_formdef.fields = [
|
||||
fields.FileField(id='bo2', label='bo field 2', type='file', varname='upload2'),
|
||||
]
|
||||
workflow.store()
|
||||
resp = app.get(item_url)
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location
|
||||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||||
assert sendmail.attachments == ['form_var_upload_raw', 'form_var_upload2_raw',
|
||||
'{"content":"foo", "filename":"bar.txt"}']
|
||||
# remove all backoffice fields
|
||||
workflow = Workflow.get(workflow.id)
|
||||
workflow.backoffice_fields_formdef.fields = []
|
||||
workflow.store()
|
||||
resp = app.get(item_url)
|
||||
assert "Attachments (Python expressions)" in resp.body
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location
|
||||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||||
assert sendmail.attachments == ['form_var_upload_raw', 'form_var_upload2_raw',
|
||||
'{"content":"foo", "filename":"bar.txt"}']
|
||||
|
||||
def test_workflows_edit_sms_action(pub):
|
||||
create_superuser(pub)
|
||||
role = create_role()
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import datetime
|
||||
import os
|
||||
import pytest
|
||||
import shutil
|
||||
import StringIO
|
||||
|
@ -765,6 +766,116 @@ def test_email(pub, emails):
|
|||
assert emails.count() == 1
|
||||
assert emails.get('foobar').get('from') == 'foobar@localhost'
|
||||
|
||||
|
||||
def test_email_attachments(pub, emails):
|
||||
formdef = FormDef()
|
||||
formdef.name = 'baz'
|
||||
formdef.fields = [
|
||||
FileField(id='3', label='File', type='file', varname='file'),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
upload = PicklableUpload('test.jpeg', 'image/jpeg')
|
||||
jpg = open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg')).read()
|
||||
upload.receive([jpg])
|
||||
formdata = formdef.data_class()()
|
||||
formdata.data = {'3': upload}
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
pub.substitutions.feed(formdata)
|
||||
|
||||
sendmail = SendmailWorkflowStatusItem()
|
||||
sendmail.subject = 'foobar'
|
||||
sendmail.body = '<p>force html</p>'
|
||||
sendmail.to = ['to@example.net']
|
||||
sendmail.attachments = ['form_var_file_raw']
|
||||
sendmail.perform(formdata)
|
||||
get_response().process_after_jobs()
|
||||
assert emails.count() == 1
|
||||
assert emails.emails['foobar']['msg'].is_multipart()
|
||||
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
|
||||
|
||||
# build a backoffice field
|
||||
Workflow.wipe()
|
||||
FormDef.wipe()
|
||||
wf = Workflow(name='email with attachments')
|
||||
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
|
||||
wf.backoffice_fields_formdef.fields = [
|
||||
FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
|
||||
FileField(id='bo2', label='bo field 2', type='file', varname='backoffice_file2'),
|
||||
]
|
||||
st1 = wf.add_status('Status1')
|
||||
wf.store()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'baz'
|
||||
formdef.fields = [
|
||||
FileField(id='1', label='File', type='file', varname='frontoffice_file'),
|
||||
]
|
||||
formdef.workflow_id = wf.id
|
||||
formdef.store()
|
||||
formdata = formdef.data_class()()
|
||||
formdata.data = {'1': upload}
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
pub.substitutions.feed(formdata)
|
||||
# store file in backoffice field form_fbo1 / form_var_backoffice_file_raw
|
||||
setbo = SetBackofficeFieldsWorkflowStatusItem()
|
||||
setbo.parent = st1
|
||||
setbo.fields = [{'field_id': 'bo1', 'value': '=form_var_frontoffice_file_raw'}]
|
||||
setbo.perform(formdata)
|
||||
|
||||
emails.empty()
|
||||
sendmail.attachments = ['form_fbo1']
|
||||
sendmail.perform(formdata)
|
||||
get_response().process_after_jobs()
|
||||
assert emails.count() == 1
|
||||
assert emails.emails['foobar']['msg'].is_multipart()
|
||||
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
|
||||
|
||||
emails.empty()
|
||||
sendmail.attachments = ['form_var_backoffice_file1_raw']
|
||||
sendmail.perform(formdata)
|
||||
get_response().process_after_jobs()
|
||||
assert emails.count() == 1
|
||||
assert emails.emails['foobar']['msg'].is_multipart()
|
||||
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
|
||||
|
||||
emails.empty()
|
||||
sendmail.attachments = ['form_var_backoffice_file1_raw', 'form_var_backoffice_file2_raw']
|
||||
sendmail.perform(formdata)
|
||||
get_response().process_after_jobs()
|
||||
assert emails.count() == 1
|
||||
assert emails.emails['foobar']['msg'].is_multipart()
|
||||
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
|
||||
# backoffice_file2 is unset, no more parts :
|
||||
assert len(emails.emails['foobar']['msg'].get_payload()) == 2
|
||||
|
||||
# set backoffice_file2 and retry
|
||||
setbo.fields = [{'field_id': 'bo2',
|
||||
'value': '={"content": "blah", "filename": "hello.txt", '
|
||||
'"content_type": "text/plain"}'}]
|
||||
setbo.perform(formdata)
|
||||
emails.empty()
|
||||
sendmail.perform(formdata)
|
||||
get_response().process_after_jobs()
|
||||
assert emails.count() == 1
|
||||
assert emails.emails['foobar']['msg'].is_multipart()
|
||||
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[2].get_content_type() == 'text/plain'
|
||||
assert emails.emails['foobar']['msg'].get_payload()[2].get_payload() == 'blah'
|
||||
assert len(emails.emails['foobar']['msg'].get_payload()) == 3
|
||||
|
||||
|
||||
def test_webservice_call(pub):
|
||||
pub.substitutions.feed(MockSubstitutionVariables())
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ from quixote.html import htmltext
|
|||
import qommon.errors
|
||||
|
||||
from wcs.roles import Role, logged_users_role, get_user_roles
|
||||
from wcs.fields import FileField
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.formdata import Evolution
|
||||
|
||||
|
@ -2015,6 +2016,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
|
|||
subject = None
|
||||
body = None
|
||||
custom_from = None
|
||||
attachments = None
|
||||
|
||||
comment = None
|
||||
|
||||
|
@ -2053,11 +2055,29 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
|
|||
return _('Send mail (not completed)')
|
||||
|
||||
def get_parameters(self):
|
||||
return ('to', 'subject', 'body', 'custom_from')
|
||||
return ('to', 'subject', 'body', 'attachments', 'custom_from')
|
||||
|
||||
def fill_admin_form(self, form):
|
||||
self.add_parameters_widgets(form, self.get_parameters())
|
||||
|
||||
def get_attachments_options(self):
|
||||
attachments_options = [(None, '---', None)]
|
||||
varnameless = []
|
||||
for field in self.parent.parent.get_backoffice_fields():
|
||||
if field.key != 'file':
|
||||
continue
|
||||
if field.varname:
|
||||
codename = 'form_var_%s_raw' % field.varname
|
||||
else:
|
||||
codename = 'form_f%s' % field.id # = form_fbo<n>
|
||||
varnameless.append(codename)
|
||||
attachments_options.append((codename, field.label, codename))
|
||||
# filter: do not consider removed fields without varname
|
||||
attachments = [attachment for attachment in self.attachments or []
|
||||
if ((not attachment.startswith('form_fbo')) or
|
||||
(attachment in varnameless))]
|
||||
return attachments_options, attachments
|
||||
|
||||
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
|
||||
if 'to' in parameters:
|
||||
form.add(WidgetList, '%sto' % prefix, title=_('To'),
|
||||
|
@ -2076,6 +2096,24 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
|
|||
value=self.body, cols=80, rows=10,
|
||||
validation_function=ComputedExpressionWidget.validate_ezt,
|
||||
hint=_('Available variables: url, url_status, details, name, number, comment, field_NAME'))
|
||||
|
||||
if 'attachments' in parameters:
|
||||
attachments_options, attachments = self.get_attachments_options()
|
||||
if len(attachments_options) > 1:
|
||||
form.add(WidgetList, '%sattachments' % prefix, title=_('Attachments'),
|
||||
element_type=SingleSelectWidgetWithOther,
|
||||
value=attachments,
|
||||
add_element_label=_('Add attachment'),
|
||||
element_kwargs={'render_br': False, 'options': attachments_options})
|
||||
else:
|
||||
form.add(WidgetList, '%sattachments' % prefix,
|
||||
title=_('Attachments (Python expressions)'),
|
||||
element_type=StringWidget,
|
||||
value=attachments,
|
||||
add_element_label=_('Add attachment'),
|
||||
element_kwargs={'render_br': False, 'size': 50},
|
||||
advanced=not(bool(attachments)))
|
||||
|
||||
if 'custom_from' in parameters:
|
||||
form.add(ComputedExpressionWidget, '%scustom_from' % prefix,
|
||||
title=_('Custom From Address'), value=self.custom_from,
|
||||
|
@ -2152,14 +2190,37 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
|
|||
if self.custom_from:
|
||||
email_from = self.compute(self.custom_from)
|
||||
|
||||
attachments = []
|
||||
if self.attachments:
|
||||
global_eval_dict = get_publisher().get_global_eval_dict()
|
||||
local_eval_dict = get_publisher().substitutions.get_context_variables()
|
||||
for attachment in self.attachments:
|
||||
try:
|
||||
picklableupload = eval(attachment, global_eval_dict, local_eval_dict)
|
||||
except:
|
||||
get_publisher().notify_of_exception(sys.exc_info(),
|
||||
context='[Sendmail/attachments]')
|
||||
continue
|
||||
if not picklableupload:
|
||||
continue
|
||||
try:
|
||||
picklableupload = FileField.convert_value_from_anything(picklableupload)
|
||||
except ValueError:
|
||||
get_publisher().notify_of_exception(sys.exc_info(),
|
||||
context='[Sendmail/attachments]')
|
||||
continue
|
||||
attachments.append(picklableupload)
|
||||
|
||||
if len(addresses) > 1:
|
||||
emails.email(mail_subject, mail_body, email_rcpt=None,
|
||||
bcc=addresses, email_from=email_from,
|
||||
exclude_current_user=False,
|
||||
attachments=attachments,
|
||||
fire_and_forget=True)
|
||||
else:
|
||||
emails.email(mail_subject, mail_body, email_rcpt=addresses,
|
||||
email_from=email_from, exclude_current_user=False,
|
||||
attachments=attachments,
|
||||
fire_and_forget=True)
|
||||
register_item_class(SendmailWorkflowStatusItem)
|
||||
|
||||
|
|
Loading…
Reference in New Issue