invoice emailing by fetching user data from idp

This commit is contained in:
Serghei Mihai 2014-10-06 17:02:55 +02:00
parent 44e0786840
commit 4e82a97a2e
4 changed files with 122 additions and 13 deletions

View File

@ -2,21 +2,33 @@ from datetime import datetime, timedelta
import json
import shutil
import os
import requests
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from django.utils.timezone import make_aware, get_current_timezone
from django.db.models import Q
from django.template.loader import get_template
from django.template import Context
from django.core.mail import EmailMultiAlternatives
from django.contrib.sites.models import Site
from synchro_orleans.data.models import Facture, InvoiceNotificationEmail
from synchro_orleans import signature
tmp_json_location = settings.INVOICES_MAIL_DIR
json_location = settings.INVOICES_MAIL_LOCATION_PATTERN
notification_timeout = settings.INVOICES_NOTIFICATION_TIMEOUT
invoice_view_url_base = settings.INVOICE_VIEW_URL_BASE
class Command(BaseCommand):
help = """Checks for new active invoices and creates emails to send to the users"""
help = """
Sends email notifications about the invoices
"""
email_subject = 'Nouvelle facture'
email_template = 'invoice_mail.txt'
email_from = settings.DEFAULT_FROM_EMAIL
def handle(self, *args, **options):
@ -31,11 +43,24 @@ class Command(BaseCommand):
InvoiceNotificationEmail.objects.create(invoice_number=invoice.id)
nameid = invoice.famille.liaisonnameidfamille_set.all()[0]
tmp_json_file = os.path.join(tmp_json_location, '%s.json' % invoice.id)
with open(tmp_json_file, 'w') as json_output:
json.dump({'nameid': nameid.name_id, 'invoice_id': invoice.id}, json_output)
invoice.date_envoi_dernier_mail = make_aware(datetime.now(), get_current_timezone())
InvoiceNotificationEmail.objects.create(invoice_number=invoice.id)
invoice.save()
shutil.move(tmp_json_file, json_location.format(invoice_id=invoice.id))
signed_query = signature.sign_query('orig=synchro-orleans')
url = '{url}/{nameid}?{query}'.format(url=settings.AUTHENTIC2_USER_INFO_URL,
nameid=nameid.name_id,
query=signed_query)
response = requests.get(url)
data = response.json()
attachment = os.path.join(settings.INVOICES_DIR, 'facture_%s.pdf' % invoice.id)
context = Context({'invoice': invoice,
'site': Site.objects.get_current(),
'invoice_view_url_base': invoice_view_url_base},)
context.update(data)
text_body = get_template(self.email_template).render(context)
message = EmailMultiAlternatives(self.email_subject, text_body, self.email_from, [result.email])
message.attach_file(os.path.join(settings.INVOICES_DIR, 'facture_%s.pdf'% invoice.id))
message.send()
invoice.date_envoi_dernier_mail = make_aware(datetime.now(), get_current_timezone())
InvoiceNotificationEmail.objects.create(invoice_number=invoice.id)
invoice.save()

View File

@ -0,0 +1,9 @@
Bonjour {{ first_name }} {{ last_name }},
La facture nr. {{ invoice.id }} d'un montant de {{ invoice.amount }} {{ invoice.devise }} a ete emise le {{ invoice.creation_date }}.
Vous pouvez la visualiser a l'adresse {{ invoice_view_url_base }}/{{ invoice.id }}/{{ invoice.hash }}
Cordialement,
La mairie d'Orleans

View File

@ -160,12 +160,16 @@ LOGGING = {
}
INVOICES_DIR = os.environ.get('INVOICES_DIR', os.path.dirname(__file__))
INVOICES_MAIL_DIR = os.environ.get('INVOICES_MAIL_DIR', os.path.dirname(__file__))
INVOICES_LOCATION_PATTERN = os.path.join(INVOICES_DIR, 'facture_{invoice_id}.pdf')
INVOICES_MAIL_LOCATION_PATTERN = os.path.join(INVOICES_MAIL_DIR, 'tipi', 'facture_{invoice_id}.json')
INVOICES_NOTIFICATION_TIMEOUT = os.environ.get('INVOICES_NOTIFICATION_TIMEOUT', 10) # in days
AUTHENTIC2_USER_INFO_URL = os.environ.get('AUTHENTIC2_USER_INFO_URL','')
AUTHENTIC2_URL = os.environ.get('AUTHENTIC2_URL', '')
PORTAIL_CITOYEN_URL = os.environ.get('PORTAIl_CITOYEN_URL', '')
EMAILING_APIKEY = os.environ.get('EMAILING_APIKEY', '12345')
INVOICE_VIEW_URL_BASE = PORTAIL_CITOYEN_URL + '/facture/simple/tipi/'
AUTHENTIC2_USER_INFO_URL = '%s/userinfo' % AUTHENTIC2_URL
try:
from local_settings import *

View File

@ -0,0 +1,71 @@
import datetime
import base64
import hmac
import hashlib
import urllib
import random
import urlparse
'''Simple signature scheme for query strings'''
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urlparse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:]
new_query = query
if new_query:
new_query += '&'
new_query += urllib.urlencode((
('algo', algo),
('timestamp', timestamp),
('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + urllib.quote(signature)
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(key, digestmod=digestmod, msg=s)
return hash.digest()
def check_url(url, key, known_nonce=None, timedelta=30):
parsed = urlparse.urlparse(url, 'https')
return check_query(parsed.query, key)
def check_query(query, key, known_nonce=None, timedelta=30):
parsed = urlparse.parse_qs(query)
signature = base64.b64decode(parsed['signature'][0])
algo = parsed['algo'][0]
timestamp = parsed['timestamp'][0]
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
nonce = parsed['nonce']
unsigned_query = query.split('&signature=')[0]
if known_nonce is not None and known_nonce(nonce):
return False
print 'timedelta', datetime.datetime.utcnow() - timestamp
if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta):
return False
return check_string(unsigned_query, signature, key, algo=algo)
def check_string(s, signature, key, algo='sha256'):
# constant time compare
signature2 = sign_string(s, key, algo=algo)
if len(signature2) != len(signature):
return False
res = 0
for a, b in zip(signature, signature2):
res |= ord(a) ^ ord(b)
return res == 0
if __name__ == '__main__':
test_key = '12345'
signed_query = sign_query('NameId=_12345&orig=montpellier', test_key)
assert check_query(signed_query, test_key, timedelta=0) is False
assert check_query(signed_query, test_key) is True