welco/welco/utils.py

186 lines
6.8 KiB
Python

# welco - multichannel request processing
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import hmac
import hashlib
import json
import random
import re
import requests
from django.conf import settings
from django.core.cache import cache
from django.http import HttpResponse, HttpResponseBadRequest
from django.utils.encoding import smart_bytes
from django.utils.http import urlencode, quote
from django.utils.six.moves.urllib import parse as urlparse
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urlparse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:]
new_query = query
if new_query:
new_query += '&'
new_query += urlencode((
('algo', algo),
('timestamp', timestamp),
('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + quote(signature)
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
return hash.digest()
def get_wcs_services():
return settings.KNOWN_SERVICES.get('wcs')
def get_wcs_json(wcs_url, path, wcs_site, params={}):
if not wcs_url.endswith('/'):
wcs_url += '/'
url = wcs_url + path + '?orig=%s' % wcs_site.get('orig')
if params:
url += '&' + urlencode(params)
response_json = cache.get(url)
if response_json is None:
signed_url = sign_url(url, wcs_site.get('secret'))
response_json = requests.get(signed_url, headers={'accept': 'application/json'},
timeout=10).json()
if not isinstance(response_json, dict):
response_json = {'data': response_json}
cache.set(url, response_json)
return response_json
def get_wcs_options(url, condition=None, params={}):
categories = {}
for wcs_key, wcs_site in get_wcs_services().items():
site_title = wcs_site.get('title')
response_json = get_wcs_json(wcs_site.get('url'), url, wcs_site, params)
if type(response_json) is dict:
response_json = response_json.get('data')
for element in response_json:
if condition and not condition(element):
continue
slug = element.get('slug')
category = element.get('category') or '-'
title = element.get('title')
if len(get_wcs_services()) == 1:
category_key = category
else:
category_key = '%s : %s' % (site_title, category)
if not category_key in categories:
categories[category_key] = []
reference = '%s:%s' % (wcs_key, slug)
categories[category_key].append((reference, title))
options = []
for category in sorted(categories.keys()):
options.append((category, sorted(categories[category], key=lambda x: x[1])))
return options
def get_wcs_formdef_details(formdef_reference):
wcs_key, form_slug = formdef_reference.split(':')
wcs_site = get_wcs_services()[wcs_key]
forms_response_json = get_wcs_json(wcs_site.get('url'), 'json', wcs_site)
for form in forms_response_json.get('data') or []:
slug = form.get('slug')
if slug == form_slug:
return form
return None
def push_wcs_formdata(request, formdef_reference, context=None):
wcs_key, form_slug = formdef_reference.split(':')
wcs_site = get_wcs_services()[wcs_key]
wcs_site_url = get_wcs_services()[wcs_key]['url']
if not wcs_site_url.endswith('/'):
wcs_site_url += '/'
url = wcs_site_url + 'api/formdefs/%s/schema' % form_slug
response = requests.get(url)
create_draft = not(bool('welco-direct' in (response.json().get('keywords') or '')))
url = wcs_site_url + 'api/formdefs/%s/submit?' % form_slug
data = {
'meta': {'draft': create_draft, 'backoffice-submission': True},
'data': {},
}
if context:
data['context'] = context
url += 'orig=%s' % wcs_site.get('orig')
if request.session.get('mellon_session'):
mellon = request.session['mellon_session']
nameid = mellon['name_id_content']
url += '&NameID=' + quote(nameid)
url = sign_url(url, wcs_site.get('secret'))
response = requests.post(url, data=json.dumps(data),
headers={'Content-type': 'application/json'})
if response.json().get('err') != 0:
raise Exception('error %r' % response.content)
data = response.json()['data']
return data['id'], data.get('backoffice_url')
def get_wcs_data(endpoint, params=None):
wcs_site = list(get_wcs_services().values())[0]
wcs_site_url = wcs_site['url']
if not wcs_site_url.endswith('/'):
wcs_site_url += '/'
url = wcs_site_url + endpoint
if not params:
params = {}
params['orig'] = wcs_site.get('orig')
if params:
url += '?' + urlencode(params.items())
url = sign_url(url, wcs_site.get('secret'))
response = requests.get(url)
response.raise_for_status()
json_response = response.json()
if not isinstance(json_response, dict):
json_response = {'data': json_response}
return json_response
def response_for_json(request, data):
json_str = json.dumps(data)
for variable in ('jsonpCallback', 'callback'):
if variable in request.GET:
identifier = request.GET[variable]
if not re.match(r'^[$A-Za-z_][0-9A-Za-z_$]*$', identifier):
return HttpResponseBadRequest('invalid JSONP callback name')
json_str = '%s(%s);' % (identifier, json_str)
response = HttpResponse(content_type='application/javascript')
break
else:
response = HttpResponse(content_type='application/json')
response.write(json_str)
return response