welco/welco/utils.py

186 lines
6.8 KiB
Python
Raw Permalink Normal View History

# welco - multichannel request processing
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2015-09-21 13:57:33 +02:00
import base64
import datetime
import hmac
import hashlib
import json
import random
import re
import requests
from django.conf import settings
from django.core.cache import cache
from django.http import HttpResponse, HttpResponseBadRequest
2020-01-19 19:15:38 +01:00
from django.utils.encoding import smart_bytes
from django.utils.http import urlencode, quote
from django.utils.six.moves.urllib import parse as urlparse
2015-09-21 13:57:33 +02:00
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urlparse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:]
new_query = query
if new_query:
new_query += '&'
new_query += urlencode((
2015-09-21 13:57:33 +02:00
('algo', algo),
('timestamp', timestamp),
('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + quote(signature)
2015-09-21 13:57:33 +02:00
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
digestmod = getattr(hashlib, algo)
2020-01-19 19:15:38 +01:00
hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
2015-09-21 13:57:33 +02:00
return hash.digest()
def get_wcs_services():
return settings.KNOWN_SERVICES.get('wcs')
def get_wcs_json(wcs_url, path, wcs_site, params={}):
if not wcs_url.endswith('/'):
wcs_url += '/'
url = wcs_url + path + '?orig=%s' % wcs_site.get('orig')
if params:
url += '&' + urlencode(params)
response_json = cache.get(url)
if response_json is None:
signed_url = sign_url(url, wcs_site.get('secret'))
response_json = requests.get(signed_url, headers={'accept': 'application/json'},
timeout=10).json()
if not isinstance(response_json, dict):
response_json = {'data': response_json}
cache.set(url, response_json)
return response_json
def get_wcs_options(url, condition=None, params={}):
categories = {}
2020-01-19 19:10:24 +01:00
for wcs_key, wcs_site in get_wcs_services().items():
site_title = wcs_site.get('title')
response_json = get_wcs_json(wcs_site.get('url'), url, wcs_site, params)
if type(response_json) is dict:
response_json = response_json.get('data')
for element in response_json:
if condition and not condition(element):
continue
slug = element.get('slug')
category = element.get('category') or '-'
title = element.get('title')
if len(get_wcs_services()) == 1:
category_key = category
else:
category_key = '%s : %s' % (site_title, category)
if not category_key in categories:
categories[category_key] = []
reference = '%s:%s' % (wcs_key, slug)
categories[category_key].append((reference, title))
options = []
for category in sorted(categories.keys()):
options.append((category, sorted(categories[category], key=lambda x: x[1])))
return options
def get_wcs_formdef_details(formdef_reference):
wcs_key, form_slug = formdef_reference.split(':')
wcs_site = get_wcs_services()[wcs_key]
forms_response_json = get_wcs_json(wcs_site.get('url'), 'json', wcs_site)
for form in forms_response_json.get('data') or []:
slug = form.get('slug')
if slug == form_slug:
return form
return None
2015-09-21 13:57:33 +02:00
def push_wcs_formdata(request, formdef_reference, context=None):
2015-09-21 13:57:33 +02:00
wcs_key, form_slug = formdef_reference.split(':')
wcs_site = get_wcs_services()[wcs_key]
wcs_site_url = get_wcs_services()[wcs_key]['url']
if not wcs_site_url.endswith('/'):
wcs_site_url += '/'
url = wcs_site_url + 'api/formdefs/%s/schema' % form_slug
response = requests.get(url)
create_draft = not(bool('welco-direct' in (response.json().get('keywords') or '')))
2015-09-21 13:57:33 +02:00
url = wcs_site_url + 'api/formdefs/%s/submit?' % form_slug
data = {
'meta': {'draft': create_draft, 'backoffice-submission': True},
2015-09-21 13:57:33 +02:00
'data': {},
}
if context:
data['context'] = context
2015-09-21 13:57:33 +02:00
url += 'orig=%s' % wcs_site.get('orig')
if request.session.get('mellon_session'):
mellon = request.session['mellon_session']
nameid = mellon['name_id_content']
url += '&NameID=' + quote(nameid)
2015-09-21 13:57:33 +02:00
url = sign_url(url, wcs_site.get('secret'))
response = requests.post(url, data=json.dumps(data),
headers={'Content-type': 'application/json'})
if response.json().get('err') != 0:
raise Exception('error %r' % response.content)
data = response.json()['data']
return data['id'], data.get('backoffice_url')
2015-09-24 12:28:56 +02:00
def get_wcs_data(endpoint, params=None):
wcs_site = list(get_wcs_services().values())[0]
2015-09-24 12:28:56 +02:00
wcs_site_url = wcs_site['url']
if not wcs_site_url.endswith('/'):
wcs_site_url += '/'
url = wcs_site_url + endpoint
if not params:
2015-09-24 12:28:56 +02:00
params = {}
params['orig'] = wcs_site.get('orig')
if params:
url += '?' + urlencode(params.items())
2015-09-24 12:28:56 +02:00
url = sign_url(url, wcs_site.get('secret'))
response = requests.get(url)
response.raise_for_status()
json_response = response.json()
if not isinstance(json_response, dict):
json_response = {'data': json_response}
return json_response
def response_for_json(request, data):
json_str = json.dumps(data)
for variable in ('jsonpCallback', 'callback'):
if variable in request.GET:
identifier = request.GET[variable]
if not re.match(r'^[$A-Za-z_][0-9A-Za-z_$]*$', identifier):
return HttpResponseBadRequest('invalid JSONP callback name')
json_str = '%s(%s);' % (identifier, json_str)
response = HttpResponse(content_type='application/javascript')
break
else:
response = HttpResponse(content_type='application/json')
response.write(json_str)
return response