chrono/chrono/agendas/management/commands/send_email_notifications.py

76 lines
3.1 KiB
Python

# chrono - agendas system
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
from urllib.parse import urljoin
from django.conf import settings
from django.core.mail import send_mail
from django.core.management.base import BaseCommand
from django.db.transaction import atomic
from django.template.loader import render_to_string
from django.utils import translation
from django.utils.translation import gettext_lazy as _
from chrono.agendas.models import Agenda
from chrono.utils import timezone
class Command(BaseCommand):
    """Send email alerts for events whose status flag is set and not yet notified.

    For every agenda that has notification settings, each configured
    notification type is checked: events whose status field is true and whose
    matching ``<status>_notification_timestamp`` is still null get one email
    sent to the recipients of that notification type.
    """

    # Subject templates, looked up by the status name given to
    # send_notification(); "%s" is filled with the event and "%%" renders as a
    # literal percent sign.
    EMAIL_SUBJECTS = {
        'almost_full': _('Alert: event "%s" is almost full (90%%)'),
        'full': _('Alert: event "%s" is full'),
        'cancelled': _('Alert: event "%s" is cancelled'),
    }
    help = 'Send email notifications'

    def handle(self, **options):
        """Find events awaiting a notification and send one email per event.

        ``options`` is accepted for the management command interface and is
        not used here.
        """
        # Subjects are lazy translations; activate the configured language so
        # they (and the templates) are rendered in it.
        translation.activate(settings.LANGUAGE_CODE)

        agendas = Agenda.objects.filter(notifications_settings__isnull=False).select_related(
            'notifications_settings'
        )
        for agenda in agendas:
            for notification_type in agenda.notifications_settings.get_notification_types():
                recipients = notification_type.get_recipients()
                if not recipients:
                    # Nobody to notify for this type: leave the events untouched.
                    continue

                status = notification_type.related_field
                # Status flag is set and no notification has been recorded yet.
                filter_kwargs = {status: True, status + '_notification_timestamp__isnull': True}
                events = agenda.event_set.filter(**filter_kwargs)
                for event in events:
                    self.send_notification(event, status, recipients)

    def send_notification(self, event, status, recipients):
        """Render and send the notification email for one event.

        Args:
            event: the event the alert is about; it is stamped with the
                notification timestamp and saved.
            status: status name, used both as the ``EMAIL_SUBJECTS`` key and as
                the prefix of the ``<status>_notification_timestamp`` attribute.
            recipients: recipient list handed to ``send_mail``.

        Raises:
            KeyError: if ``status`` has no entry in ``EMAIL_SUBJECTS``.
        """
        # TEMPLATE_VARS is optional: read it once with a fallback so a missing
        # setting no longer raises AttributeError.
        template_vars = getattr(settings, 'TEMPLATE_VARS', {})

        ctx = {
            'event': event,
            'event_url': urljoin(settings.SITE_BASE_URL, event.get_absolute_view_url()),
            'subject': self.EMAIL_SUBJECTS[status] % event,
        }
        # Site-wide variables are applied last, so on a key clash they still
        # take precedence over the per-event values, as before.
        ctx.update(template_vars)

        subject = render_to_string('agendas/emails_subject.txt', ctx).strip()
        body = render_to_string('agendas/event_notification_body.txt', ctx)
        html_body = render_to_string('agendas/event_notification_body.html', ctx)

        timestamp = timezone.now()
        # Sending inside the transaction means an exception raised by
        # send_mail rolls back the timestamp, so the event is retried on the
        # next run instead of being marked as notified.
        with atomic():
            setattr(event, status + '_notification_timestamp', timestamp)
            event.save()
            send_mail(subject, body, settings.DEFAULT_FROM_EMAIL, recipients, html_message=html_body)