workflow: make it possible to sign webservice calls (#6446)

The sign_* functions have been imported from cmsplugin_blurp.
This commit is contained in:
Frédéric Péters 2015-02-09 20:06:05 +01:00
parent ce8254372a
commit dc65fd5b7b
3 changed files with 60 additions and 2 deletions

View File

@ -11,6 +11,7 @@ from wcs.users import User
from wcs.formdef import FormDef
from wcs.categories import Category
from wcs import fields
from wcs.api import sign_url
from utilities import get_app, create_temporary_pub
@ -118,6 +119,22 @@ def test_get_user_from_api_query_string_error_success_sha256():
output = get_app(pub).get('/user?%s&signature=%s' % (query, signature))
assert output.json['user_display_name'] == u'Jean Darmette'
def test_sign_url():
signed_url = sign_url(
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(user.email),
'1234'
)
url = signed_url[len('http://example.net'):]
output = get_app(pub).get(url)
assert output.json['user_display_name'] == u'Jean Darmette'
signed_url = sign_url(
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(user.email),
'12345'
)
url = signed_url[len('http://example.net'):]
output = get_app(pub).get(url, status=403)
def test_formdef_list():
FormDef.wipe()
formdef = FormDef()

View File

@ -18,7 +18,10 @@ import base64
import hmac
import hashlib
import datetime
import urllib
import urllib2
import urlparse
import random
from quixote import get_request, get_publisher, get_response
from quixote.directory import Directory
@ -86,6 +89,32 @@ def get_user_from_api_query_string():
return user
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urlparse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:-1]
new_query = query
if new_query:
new_query += '&'
new_query += urllib.urlencode((
('algo', algo),
('timestamp', timestamp),
('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + urllib.quote(signature)
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(key, digestmod=digestmod, msg=s)
return hash.digest()
class ApiDirectory(Directory):
_q_exports = [('reverse-geocoding', 'reverse_geocoding')]

View File

@ -18,7 +18,8 @@ import json
from qommon.form import *
from qommon.misc import http_get_page, http_post_request, get_variadic_url
from wcs.workflows import WorkflowStatusItem, register_item_class
from wcs.workflows import WorkflowStatusItem, register_item_class, template_on_formdata
from wcs.api import sign_url
TIMEOUT = 15
@ -30,9 +31,10 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
url = None
varname = None
post = True
request_signature_key = None
def get_parameters(self):
return ('url', 'post', 'varname')
return ('url', 'post', 'varname', 'request_signature_key')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'url' in parameters:
@ -46,6 +48,10 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
if 'varname' in parameters:
form.add(VarnameWidget, '%svarname' % prefix,
title=_('Variable Name'), value=self.varname)
if 'request_signature_key':
form.add(StringWidget, '%srequest_signature_key' % prefix,
title=_('Request Signature Key'),
value=self.request_signature_key)
def perform(self, formdata):
if not self.url:
@ -55,6 +61,12 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
if '[' in url:
variables = get_publisher().substitutions.get_context_variables()
url = get_variadic_url(url, variables)
if self.request_signature_key:
signature_key = template_on_formdata(formdata, self.request_signature_key)
if signature_key:
url = sign_url(url, signature_key)
headers = {'Content-type': 'application/json',
'Accept': 'application/json'}
if self.post: