emails: fix tests to allow simple hostname fetch in sendmail (#89284)
gitea/wcs/pipeline/head This commit looks good Details

This commit is contained in:
Yann Weber 2024-04-11 09:28:26 +02:00
parent b443ea2e5d
commit 836b7a96b5
7 changed files with 194 additions and 93 deletions

View File

@ -1,4 +1,3 @@
import datetime
import hashlib
import json
import os
@ -24,6 +23,7 @@ from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.forms.root import PublicFormStatusPage
from wcs.qommon.emails import docutils
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.qommon.template import Template
from wcs.roles import logged_users_role
@ -71,6 +71,11 @@ def pub(request):
}
pub.write_cfg()
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''})
req.response.filter = {}
req._user = None
pub.set_app_dir(req)
formdef = UserFieldsFormDef(pub)
formdef.fields = [
fields.StringField(id='_phone', label='phone', varname='phone', validation={'type': 'phone'})
@ -4134,13 +4139,12 @@ def test_card_email_actions(pub, emails):
assert action_url in force_str(email_data['payloads'][1])
headers = email_data.email.extra_headers
assert 'Message-Id' in headers
assert 'Message-ID' in headers
assert 'References' not in headers
assert 'In-Reply-To' not in headers
ts = datetime.datetime.now().strftime('%Y%m%d.%H%M%S.')
hostname = 'example.net'
expt_id = f'wcs-carddata-{carddef.id}-{carddata.id}.{ts}[^@]+@{hostname}'
assert re.match(expt_id, headers['Message-Id'])
expt_id = fr'wcs-carddata-{carddef.id}-{carddata.id}.[0-9]{{8}}\.[0-9]{{6}}\.[^@]+@{hostname}'
assert re.match(expt_id, headers['Message-ID'])
app = get_app(pub)
resp = app.get(action_url)

View File

@ -62,6 +62,7 @@ def pub():
req.response.filter = {}
req._user = None
pub._set_request(req)
pub.set_app_dir(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub

View File

@ -38,6 +38,7 @@ def pub(request):
req.response.filter = {}
req._user = None
pub._set_request(req)
pub.set_app_dir(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub

View File

@ -37,6 +37,7 @@ def pub(request):
req.response.filter = {}
req._user = None
pub._set_request(req)
pub.set_app_dir(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub

View File

@ -1,11 +1,11 @@
import base64
import datetime
import json
import os
import re
from unittest import mock
import pytest
from django.core import mail
from django.utils.encoding import force_bytes, force_str
from quixote import cleanup, get_response
@ -42,16 +42,13 @@ def pub(request):
req.response.filter = {}
req._user = None
pub._set_request(req)
pub.set_app_dir(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
TestDef.wipe()
return pub
def _get_message_id_re(msg_id_fmt):
return re.compile(msg_id_fmt % {'ts': datetime.datetime.now().strftime(r'%Y%m%d\.%H%M%S\.')})
def test_email(pub, emails):
pub.substitutions.feed(MockSubstitutionVariables())
@ -82,25 +79,22 @@ def test_email(pub, emails):
item.perform(formdata) # nothing
get_response().process_after_jobs()
assert emails.count() == 0
for part in formdata.evolution[-1].parts:
assert not isinstance(part, EmailEvolutionPart)
assert not any(isinstance(part, EmailEvolutionPart) for part in formdata.evolution[-1].parts)
item.to = [role1.id]
item.perform(formdata) # no subject nor body
get_response().process_after_jobs()
assert emails.count() == 0
for part in formdata.evolution[-1].parts:
assert not isinstance(part, EmailEvolutionPart)
assert not any(isinstance(part, EmailEvolutionPart) for part in formdata.evolution[-1].parts)
item.subject = 'foobar'
item.perform(formdata) # no body
get_response().process_after_jobs()
assert emails.count() == 0
for part in formdata.evolution[-1].parts:
assert not isinstance(part, EmailEvolutionPart)
assert not any(isinstance(part, EmailEvolutionPart) for part in formdata.evolution[-1].parts)
hostname = 'example.net'
expt_id = fr'wcs-formdata-{formdef.id}-{formdata.id}\.%(ts)s[^@]+@{hostname}'
expt_id = fr'wcs-formdata-{formdef.id}-{formdata.id}\.[0-9]{{8}}\.[0-9]{{6}}[^@]+@{hostname}'
# send for real
item.body = 'baz'
@ -111,16 +105,12 @@ def test_email(pub, emails):
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost']
assert 'baz' in emails.get('foobar')['payload']
headers = emails.get('foobar').email.extra_headers
assert 'Message-Id' in headers
assert 'Message-ID' in headers
assert 'In-Reply-To' not in headers
assert 'References' not in headers
assert _get_message_id_re(expt_id).match(headers['Message-Id'])
first_message_id = headers['Message-Id']
for part in formdata.evolution[-1].parts:
if isinstance(part, EmailEvolutionPart):
break
else:
assert False
assert re.match(expt_id, headers['Message-ID'])
first_message_id = headers['Message-ID']
assert any(isinstance(part, EmailEvolutionPart) for part in formdata.evolution[-1].parts)
# template for subject or body (Django)
emails.empty()
@ -133,12 +123,12 @@ def test_email(pub, emails):
assert '1 < 3' in emails.get('Foobar')['payload']
headers = emails.get('Foobar').email.extra_headers
assert 'Message-Id' in headers
assert 'Message-ID' in headers
assert 'In-Reply-To' in headers
assert 'References' in headers
assert first_message_id == headers['In-Reply-To']
assert first_message_id == headers['References']
assert _get_message_id_re(expt_id).match(headers['Message-Id'])
assert re.match(expt_id, headers['Message-ID'])
# template for subject or body (ezt)
emails.empty()
@ -150,12 +140,12 @@ def test_email(pub, emails):
assert emails.get('Foobar')
assert '1 < 3' in emails.get('Foobar')['payload']
headers = emails.get('Foobar').email.extra_headers
assert 'Message-Id' in headers
assert 'Message-ID' in headers
assert 'In-Reply-To' in headers
assert 'References' in headers
assert first_message_id == headers['In-Reply-To']
assert first_message_id == headers['References']
assert _get_message_id_re(expt_id).match(headers['Message-Id'])
assert re.match(expt_id, headers['Message-ID'])
# two recipients
emails.empty()
@ -163,15 +153,7 @@ def test_email(pub, emails):
item.to = [role1.id, role2.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
assert set(emails.get('foobar')['email_rcpt']) == {'foo@localhost', 'bar@localhost', 'baz@localhost'}
# recipients changed, the email is not a reply-to the previous one
headers = emails.get('foobar').email.extra_headers
assert 'Message-Id' in headers
assert 'In-Reply-To' not in headers
assert 'References' not in headers
assert _get_message_id_re(expt_id).match(headers['Message-Id'])
assert emails.count() == 2 # reply to role1 and new thread to role2
# submitter as recipient, no known email address
emails.empty()
@ -202,16 +184,16 @@ def test_email(pub, emails):
item.to = ['=["foo@localhost", "bar@localhost"]']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert set(emails.get('foobar')['email_rcpt']) == {'foo@localhost', 'bar@localhost'}
assert emails.count() == 2 # reply to role1 (foo) new mail to bar
assert set().union(*[m.recipients() for m in mail.outbox]) == {'foo@localhost', 'bar@localhost'}
# multiple recipients in a single computed string
emails.empty()
item.to = ['="foo@localhost, bar@localhost"']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert set(emails.get('foobar')['email_rcpt']) == {'foo@localhost', 'bar@localhost'}
assert emails.count() == 2
assert set().union(*[m.recipients() for m in mail.outbox]) == {'foo@localhost', 'bar@localhost'}
# string as recipient
emails.empty()
@ -234,8 +216,8 @@ def test_email(pub, emails):
item.to = ['foo@localhost, bar@localhost']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert set(emails.get('foobar')['email_rcpt']) == {'foo@localhost', 'bar@localhost'}
assert emails.count() == 2
assert set().union(*[m.recipients() for m in mail.outbox]) == {'foo@localhost', 'bar@localhost'}
# invalid recipient
emails.empty()
@ -259,14 +241,6 @@ def test_email(pub, emails):
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['from'] == 'foobar@localhost'
# role1 as recipient again, should be a reply-to previous emails
headers = emails.get('foobar').email.extra_headers
assert 'Message-Id' in headers
assert 'In-Reply-To' in headers
assert 'References' in headers
assert _get_message_id_re(expt_id).match(headers['Message-Id'])
assert first_message_id == headers['In-Reply-To']
assert first_message_id == headers['References']
# custom from email (computed)
emails.empty()
@ -291,6 +265,117 @@ def test_email(pub, emails):
assert emails.get('foobar')['msg']['From'] == 'SENDER NAME <foobar@localhost>'
def test_email_threading(pub, emails):
pub.substitutions.feed(MockSubstitutionVariables())
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
pub.role_class.wipe()
role1 = pub.role_class(name='a1')
role1.emails = ['a1@localhost']
role1.store()
role2 = pub.role_class(name='a2')
role2.emails = ['a2@localhost']
role2.store()
role3 = pub.role_class(name='a2')
role3.emails = ['a3@localhost']
role3.store()
# New thread to a1 & a2
emails.empty()
item = SendmailWorkflowStatusItem()
item.body = 'foobar'
item.subject = 'foobar'
item.to = [role1.id, role2.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert set(emails.get('foobar')['bcc']) == {'a1@localhost', 'a2@localhost'}
assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
headers = emails.get('foobar').email.extra_headers
assert 'Message-ID' in headers
assert 'In-Reply-To' not in headers
assert 'References' not in headers
message_id = headers['Message-ID']
# In-Reply-To a1
emails.empty()
item.to = [role1.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['to'] == 'a1@localhost'
headers = emails.get('foobar').email.extra_headers
assert 'Message-ID' in headers
assert 'In-Reply-To' in headers
assert 'References' in headers
assert headers['In-Reply-To'] == headers['References']
assert headers['In-Reply-To'] == message_id
# New thread to a3 & In-Reply-To a1
emails.empty()
item.to = [role1.id, role3.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 2
assert mail.outbox[0].extra_headers['Message-ID'] != mail.outbox[1].extra_headers['Message-ID']
reply_ids = {
email.extra_headers.get('In-Reply-To', None): set(email.recipients()) for email in mail.outbox
}
expt_ids = {message_id: {'a1@localhost'}, None: {'a3@localhost'}}
assert reply_ids == expt_ids
for email in mail.outbox:
if 'a3@localhost' in email.recipients():
message_id3 = email.extra_headers['Message-ID']
# In-Reply-To a1 & a2, In-Reply-To a3
emails.empty()
item.to = [role1.id, role2.id, role3.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 2
assert mail.outbox[0].extra_headers['Message-ID'] != mail.outbox[1].extra_headers['Message-ID']
reply_ids = {
email.extra_headers.get('In-Reply-To', None): set(email.recipients()) for email in mail.outbox
}
expt_ids = {message_id: {'a1@localhost', 'a2@localhost'}, message_id3: {'a3@localhost'}}
assert reply_ids == expt_ids
# New thread to a4, In-Reply-To a1 & a2, In-Reply-To a3
emails.empty()
role1.emails = ['a1@localhost', 'a4@localhost']
role1.store()
item.to = [role1.id, role2.id, role3.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 3
assert mail.outbox[0].extra_headers['Message-ID'] != mail.outbox[1].extra_headers['Message-ID']
assert mail.outbox[0].extra_headers['Message-ID'] != mail.outbox[2].extra_headers['Message-ID']
assert mail.outbox[1].extra_headers['Message-ID'] != mail.outbox[2].extra_headers['Message-ID']
reply_ids = {
email.extra_headers.get('In-Reply-To', None): set(email.recipients()) for email in mail.outbox
}
expt_ids = {
message_id: {'a1@localhost', 'a2@localhost'},
message_id3: {'a3@localhost'},
None: {'a4@localhost'},
}
assert reply_ids == expt_ids
def test_email_django_escaping(pub, emails):
formdef = FormDef()
formdef.name = 'baz'

View File

@ -42,6 +42,7 @@ def pub():
req.response.filter = {}
req._user = None
pub._set_request(req)
pub.set_app_dir(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub

View File

@ -47,12 +47,12 @@ from wcs.workflows import (
class EmailEvolutionPart(EvolutionPart):
def __init__(self, varname, addresses, mail_subject, mail_body, message_id):
def __init__(self, varname, addresses, mail_subject, mail_body, messages_id):
self.varname = varname
self.addresses = addresses
self.mail_subject = mail_subject
self.mail_body = mail_body
self.message_id = message_id
self.messages_id = messages_id
self.datetime = now()
@ -232,29 +232,31 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
)
def get_message_id(self, formdata):
for tenant in get_publisher().get_tenants():
if tenant.directory == get_publisher().app_dir:
hostname = tenant.hostname
break
else:
raise EmailError('Unable to find tenant')
message_id = 'wcs-%(type)s-%(formdef_id)s-%(formdata_id)s.%(ts)s@%(hostname)s'
message_id %= {
hostname = get_publisher().tenant.hostname
return 'wcs-%(type)s-%(formdef_id)s-%(formdata_id)s.%(ts)s@%(hostname)s' % {
'type': formdata.formdef.data_sql_prefix,
'formdef_id': formdata.formdef.id,
'formdata_id': formdata.id,
'ts': datetime.datetime.now().strftime('%Y%m%d.%H%M%S.%f'),
'hostname': hostname,
}
return message_id
def get_first_message_id(self, formdata, addresses):
for part in list(formdata.iter_evolution_parts(klass=EmailEvolutionPart)):
if part.addresses == addresses:
return part.message_id
return None
def get_threads_headers(self, formdata, addresses):
remaining = set(addresses)
result = {}
for part in formdata.iter_evolution_parts(klass=EmailEvolutionPart):
for msg_id, addrs in part.messages_id.items():
concerned = {addr for addr in remaining if addr in addrs}
remaining -= concerned
if concerned:
result[msg_id] = (self.get_message_id(formdata), list(concerned))
if len(remaining) == 0:
break
if len(remaining) == 0:
break
else:
result[None] = (self.get_message_id(formdata), list(remaining))
return result
def get_body_parameter_view_value(self):
return htmltext('<pre class="wrapping-pre">%s</pre>') % self.body
@ -417,8 +419,12 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
attachments = self.convert_attachments_to_uploads(extra_attachments)
message_id = self.get_message_id(formdata)
in_reply_to = self.get_first_message_id(formdata, addresses)
common_kwargs = {
'email_from': email_from,
'attachments': attachments,
}
threads_headers = self.get_threads_headers(formdata, addresses)
formdata.evolution[-1].add_part(
EmailEvolutionPart(
@ -426,34 +432,36 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
addresses=addresses,
mail_subject=mail_subject,
mail_body=mail_body,
message_id=message_id,
messages_id={msg_id: addrs for dummy, (msg_id, addrs) in threads_headers.items()},
)
)
formdata.store()
extra_headers = {'Message-Id': message_id}
if in_reply_to:
extra_headers['In-Reply-To'] = in_reply_to
extra_headers['References'] = in_reply_to
for first_id, (message_id, addrs) in threads_headers.items():
if first_id:
reply_headers = {'In-Reply-To': first_id, 'References': first_id}
else:
reply_headers = {}
kwargs = {
'email_from': email_from,
'attachments': attachments,
'extra_headers': extra_headers,
}
if len(addresses) > 1:
kwargs['email_rcpt'] = None
kwargs['bcc'] = addresses
else:
kwargs['email_rcpt'] = addresses
if len(addresses) > 1:
dest_kwargs = {'email_rcpt': None, 'bcc': addrs}
else:
dest_kwargs = {'email_rcpt': addresses}
try:
email = emails.get_email(mail_subject, mail_body, **kwargs)
email_kwargs = {
**common_kwargs,
**dest_kwargs,
'extra_headers': {'Message-ID': message_id, **reply_headers},
}
try:
email = emails.get_email(mail_subject, mail_body, **email_kwargs)
if email:
self.send_email(email)
except TooBigEmailError:
get_publisher().record_error(_('Email too big to be sent'), formdata=formdata, status_item=self)
if email:
self.send_email(email)
except TooBigEmailError:
get_publisher().record_error(
_('Email too big to be sent'), formdata=formdata, status_item=self
)
def send_email(self, email):
emails.send_email(email, fire_and_forget=True)