welco/welco/utils.py

188 lines
6.8 KiB
Python
Raw Normal View History

# welco - multichannel request processing
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2015-09-21 13:57:33 +02:00
import base64
import datetime
import hmac
import hashlib
import json
import random
import re
import requests
2015-09-21 13:57:33 +02:00
import urllib
import urlparse
from django.conf import settings
from django.core.cache import cache
from django.http import HttpResponse, HttpResponseBadRequest
from django.utils.http import urlencode
2015-09-21 13:57:33 +02:00
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
    """Return ``url`` with its query string replaced by a signed one.

    The query component of ``url`` is handed to ``sign_query`` together
    with ``key``, ``algo``, ``timestamp`` and ``nonce``; every other
    component of the URL is kept as parsed.
    """
    scheme, netloc, path, url_params, query, fragment = urlparse.urlparse(url)
    signed_query = sign_query(query, key, algo, timestamp, nonce)
    return urlparse.urlunparse(
        (scheme, netloc, path, url_params, signed_query, fragment))
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
    """Return ``query`` extended with signature parameters.

    ``algo``, ``timestamp`` and ``nonce`` parameters are appended to the
    query string, the resulting string is signed with ``sign_string`` and
    the base64-encoded signature is appended as a ``signature`` parameter.

    :param query: existing query string (may be empty).
    :param key: secret passed to ``sign_string``.
    :param algo: name of the ``hashlib`` digest used for the signature.
    :param timestamp: datetime to advertise; defaults to
        ``datetime.datetime.utcnow()``.  It is formatted as
        ``%Y-%m-%dT%H:%M:%SZ``.
    :param nonce: value of the ``nonce`` parameter; a random 128-bit
        hexadecimal string is generated when omitted.
    :returns: the signed query string.
    """
    if timestamp is None:
        timestamp = datetime.datetime.utcnow()
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
    if nonce is None:
        # The nonce is part of a signed request, so draw it from the
        # operating system's CSPRNG rather than the predictable default
        # generator.  '%x' also avoids the trailing 'L' that hex() adds to
        # long integers on Python 2.
        nonce = '%x' % random.SystemRandom().getrandbits(128)
    signature_params = urllib.urlencode((
        ('algo', algo),
        ('timestamp', timestamp),
        ('nonce', nonce)))
    if query:
        new_query = query + '&' + signature_params
    else:
        new_query = signature_params
    # The signature covers everything before the signature parameter itself.
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
    new_query += '&signature=' + urllib.quote(signature)
    return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
    """Return the raw HMAC digest of ``s`` computed with ``key``.

    :param s: message to sign; text is encoded as UTF-8 first.
    :param key: secret key; text is encoded as UTF-8, and any other
        non-bytes value goes through ``str()`` as it historically did.
    :param algo: name of the ``hashlib`` constructor used as digest.
    :param timedelta: unused here; kept so existing callers keep working.
    :returns: the binary digest (not hex- or base64-encoded).
    """
    digestmod = getattr(hashlib, algo)
    # hmac operates on bytes: encode explicitly instead of relying on
    # str(), which fails on non-ASCII text keys (Python 2) or produces
    # text that hmac rejects (Python 3).
    if not isinstance(key, bytes):
        if not hasattr(key, 'encode'):
            key = str(key)
        key = key.encode('utf-8')
    if not isinstance(s, bytes):
        s = s.encode('utf-8')
    return hmac.new(key, msg=s, digestmod=digestmod).digest()
def get_wcs_services():
    """Return the ``'wcs'`` entry of ``settings.KNOWN_SERVICES``.

    ``None`` is returned when the setting has no such entry.
    """
    known_services = settings.KNOWN_SERVICES
    return known_services.get('wcs')
def get_wcs_json(wcs_url, path, wcs_site, params=None):
    """Fetch a JSON document from a w.c.s. site, going through the cache.

    The request URL is ``wcs_url`` + ``path`` with an ``orig`` parameter
    taken from ``wcs_site`` and, optionally, the extra ``params``.  The
    unsigned URL is the cache key; on a cache miss the URL is signed with
    the site's ``secret``, fetched (10 second timeout) and the decoded
    result is stored in the cache.

    :param wcs_url: base URL of the site (a trailing slash is added if
        missing).
    :param path: path appended to the base URL.
    :param wcs_site: site description; its ``orig`` and ``secret`` entries
        are read.
    :param params: optional extra query parameters.
    :returns: a dict; a non-dict JSON payload is wrapped as
        ``{'data': payload}``.
    """
    if not wcs_url.endswith('/'):
        wcs_url += '/'
    url = wcs_url + path + '?orig=%s' % wcs_site.get('orig')
    if params:
        url += '&' + urlencode(params)
    response_json = cache.get(url)
    if response_json is None:
        signed_url = sign_url(url, wcs_site.get('secret'))
        response_json = requests.get(
            signed_url,
            headers={'accept': 'application/json'},
            timeout=10).json()
        if not isinstance(response_json, dict):
            # Normalise bare lists/values so callers can always use .get().
            response_json = {'data': response_json}
        cache.set(url, response_json)
    return response_json
def get_wcs_options(url, condition=None, params=None):
    """Build grouped choices from the ``url`` endpoint of every w.c.s. site.

    Each configured site is queried through ``get_wcs_json`` and the
    elements of its ``data`` list are grouped by their ``category``
    (``'-'`` when empty).  When several sites are configured the category
    is prefixed with the site title.

    :param url: endpoint path queried on each site.
    :param condition: optional callable; elements for which it returns a
        false value are skipped.
    :param params: optional extra query parameters passed to
        ``get_wcs_json``.
    :returns: a list of ``(category, [(reference, title), ...])`` tuples
        sorted by category, each inner list sorted by title; a reference
        is ``'<site key>:<element slug>'``.
    """
    services = get_wcs_services()
    # Whether to prefix categories is a property of the configuration, not
    # of each element, so it is computed once.
    single_site = len(services) == 1
    categories = {}
    for wcs_key, wcs_site in services.items():
        site_title = wcs_site.get('title')
        response_json = get_wcs_json(wcs_site.get('url'), url, wcs_site, params)
        if isinstance(response_json, dict):
            response_json = response_json.get('data')
        # A response without 'data' contributes nothing instead of crashing.
        for element in response_json or []:
            if condition and not condition(element):
                continue
            category = element.get('category') or '-'
            if single_site:
                category_key = category
            else:
                category_key = '%s : %s' % (site_title, category)
            reference = '%s:%s' % (wcs_key, element.get('slug'))
            categories.setdefault(category_key, []).append(
                (reference, element.get('title')))
    return [
        (category, sorted(categories[category], key=lambda option: option[1]))
        for category in sorted(categories)
    ]
def get_wcs_formdef_details(formdef_reference):
    """Return the form description matching ``formdef_reference``.

    ``formdef_reference`` has the form ``'<site key>:<form slug>'``.  The
    ``json`` endpoint of the designated site is fetched through
    ``get_wcs_json`` and the first entry of its ``data`` list whose
    ``slug`` equals the form slug is returned, or ``None`` when no entry
    matches.
    """
    wcs_key, form_slug = formdef_reference.split(':')
    wcs_site = get_wcs_services()[wcs_key]
    listing = get_wcs_json(wcs_site.get('url'), 'json', wcs_site)
    candidates = listing.get('data') or []
    return next(
        (form for form in candidates if form.get('slug') == form_slug),
        None)
2015-09-21 13:57:33 +02:00
def push_wcs_formdata(request, formdef_reference, context=None):
    """Submit an empty form to a w.c.s. site and return the new form id.

    ``formdef_reference`` has the form ``'<site key>:<form slug>'``.  The
    form schema is fetched first: the submission is created as a draft
    unless ``'welco-direct'`` is found in the schema's ``keywords``.  The
    submission URL carries the site's ``orig``, plus a ``NameID`` taken
    from the ``mellon_session`` stored in the request session when there
    is one, and is signed with the site's ``secret``.

    :param request: current request; only its session is read.
    :param formdef_reference: reference of the form to submit.
    :param context: optional value sent as the ``context`` of the payload.
    :returns: the ``id`` found in the ``data`` of the JSON response.
    :raises Exception: when the ``err`` entry of the response is not 0.
    """
    wcs_key, form_slug = formdef_reference.split(':')
    wcs_site = get_wcs_services()[wcs_key]
    base_url = wcs_site['url']
    if not base_url.endswith('/'):
        base_url += '/'

    # Look at the form keywords to decide between a draft and a direct
    # submission.
    schema = requests.get(base_url + 'api/formdefs/%s/schema' % form_slug).json()
    keywords = schema.get('keywords') or ''
    create_draft = 'welco-direct' not in keywords

    payload = {
        'meta': {'draft': create_draft, 'backoffice-submission': True},
        'data': {},
    }
    if context:
        payload['context'] = context

    submit_url = base_url + 'api/formdefs/%s/submit?' % form_slug
    submit_url += 'orig=%s' % wcs_site.get('orig')
    mellon_session = request.session.get('mellon_session')
    if mellon_session:
        submit_url += '&NameID=' + urllib.quote(mellon_session['name_id_content'])
    submit_url = sign_url(submit_url, wcs_site.get('secret'))

    response = requests.post(
        submit_url,
        data=json.dumps(payload),
        headers={'Content-type': 'application/json'})
    result = response.json()
    if result.get('err') != 0:
        raise Exception('error %r' % response.content)
    return result['data']['id']
2015-09-24 12:28:56 +02:00
def get_wcs_data(endpoint, params=None):
    """Fetch ``endpoint`` from the first configured w.c.s. site.

    The query string is made of ``params`` (unicode values are encoded as
    UTF-8) plus the site's ``orig``; the URL is signed with the site's
    ``secret`` before being requested.

    :param endpoint: path appended to the site URL.
    :param params: optional mapping of query parameters; it is copied, not
        modified.
    :returns: the decoded JSON as a dict; a non-dict payload is wrapped as
        ``{'data': payload}``.
    :raises requests.HTTPError: via ``raise_for_status`` on an error
        status.
    """
    services = get_wcs_services()
    wcs_site = services.values()[0]
    base_url = wcs_site['url']
    if not base_url.endswith('/'):
        base_url += '/'

    query = params.copy() if params else {}
    for name, value in query.items():
        if isinstance(value, unicode):
            query[name] = value.encode('utf-8')
    query['orig'] = wcs_site.get('orig')

    # The query always holds at least 'orig', so it is always appended.
    url = base_url + endpoint + '?' + urllib.urlencode(query.items())
    response = requests.get(sign_url(url, wcs_site.get('secret')))
    response.raise_for_status()
    payload = response.json()
    return payload if isinstance(payload, dict) else {'data': payload}
def response_for_json(request, data):
    """Return ``data`` serialized as a JSON, or JSONP, HTTP response.

    When the request has a ``jsonpCallback`` or ``callback`` GET parameter
    (checked in that order) the JSON is wrapped in a call to that function
    and served as ``application/javascript``; a callback name that is not
    a plain identifier yields a 400 response.  Otherwise the JSON is
    served as ``application/json``.
    """
    body = json.dumps(data)
    callback_param = next(
        (name for name in ('jsonpCallback', 'callback') if name in request.GET),
        None)
    if callback_param is None:
        response = HttpResponse(content_type='application/json')
    else:
        callback = request.GET[callback_param]
        if not re.match(r'^[$A-Za-z_][0-9A-Za-z_$]*$', callback):
            return HttpResponseBadRequest('invalid JSONP callback name')
        body = '%s(%s);' % (callback, body)
        response = HttpResponse(content_type='application/javascript')
    response.write(body)
    return response