wcs/tests/test_api.py

617 lines
24 KiB
Python

import pytest
import shutil
import os
import hmac
import base64
import hashlib
import urllib
import urlparse
import datetime
import time
from quixote import cleanup, get_publisher
from wcs.qommon.form import PicklableUpload
from wcs.users import User
from wcs.roles import Role
from wcs.formdef import FormDef
from wcs.categories import Category
from wcs import fields
from wcs.api import sign_url
from utilities import get_app, create_temporary_pub
# Shared module state: ``pub`` is assigned by setup_module(); ``req`` and
# ``app_dir`` are only initialised here in the visible code.
pub, req, app_dir = None, None, None
def setup_module(module):
    """Create the temporary publisher shared by the tests of this module.

    The site language is set to English and a ``site-options.cfg`` file is
    written declaring the ``coucou`` API origin with the secret ``1234``,
    which the signed requests of this module use.
    """
    cleanup()

    global pub, req, app_dir
    pub = create_temporary_pub()
    pub.cfg['language'] = {'language': 'en'}
    pub.write_cfg()

    # Use a context manager so the file is flushed and closed immediately
    # rather than whenever the file object happens to be collected.
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('''\
[api-secrets]
coucou = 1234
''')
def teardown_module(module):
    """Remove the directory tree pointed to by the shared publisher's APP_DIR."""
    # ``pub`` is only read here, so no ``global`` declaration is required.
    app_root = pub.APP_DIR
    shutil.rmtree(app_root)
@pytest.fixture
def local_user():
    """Wipe all users, then store and return a single named user."""
    User.wipe()
    jean = User()
    jean.name, jean.email = 'Jean Darmette', 'jean.darmette@triffouilis.fr'
    jean.store()
    return jean
def sign_uri(uri, user=None):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
if query:
query += '&'
query += 'format=json&orig=coucou&algo=sha256&timestamp=' + timestamp
if user:
query += '&email=' + urllib.quote(user.email)
query += '&signature=%s' % urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha256).digest()))
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
def test_user_page_redirect():
    """A plain GET on /user answers with a Location header to /myspace/."""
    response = get_app(pub).get('/user')
    location = response.headers.get('location')
    assert location == 'http://example.net/myspace/'
def test_user_page_error_when_json_and_no_user():
    """An unsigned JSON request to /api/user/ gets a 403 'no user specified'."""
    app = get_app(pub)
    response = app.get('/api/user/?format=json', status=403)
    assert response.json['err_desc'] == 'no user specified'
def test_get_user_from_api_query_string_error_missing_orig():
    """A signature without an ``orig`` parameter is rejected with 403."""
    app = get_app(pub)
    response = app.get('/api/user/?format=json&signature=xxx', status=403)
    assert response.json['err_desc'] == 'missing/multiple orig field'
def test_get_user_from_api_query_string_error_invalid_orig():
    """An ``orig`` value other than the configured one is rejected with 403."""
    app = get_app(pub)
    response = app.get('/api/user/?format=json&orig=coin&signature=xxx', status=403)
    assert response.json['err_desc'] == 'invalid orig'
def test_get_user_from_api_query_string_error_missing_algo():
    """A signed query without an ``algo`` parameter is rejected with 403."""
    app = get_app(pub)
    response = app.get('/api/user/?format=json&orig=coucou&signature=xxx', status=403)
    assert response.json['err_desc'] == 'missing/multiple algo field'
def test_get_user_from_api_query_string_error_invalid_algo():
    """The ``algo=coin`` value is rejected with 403 'invalid algo'."""
    app = get_app(pub)
    response = app.get('/api/user/?format=json&orig=coucou&signature=xxx&algo=coin', status=403)
    assert response.json['err_desc'] == 'invalid algo'
def test_get_user_from_api_query_string_error_invalid_signature():
    """A bogus signature value is rejected with 403 'invalid signature'."""
    app = get_app(pub)
    response = app.get('/api/user/?format=json&orig=coucou&signature=xxx&algo=sha1', status=403)
    assert response.json['err_desc'] == 'invalid signature'
def test_get_user_from_api_query_string_error_missing_timestamp():
    """A query signed with the '1234' key but lacking a timestamp gets a 403."""
    digest = hmac.new('1234', 'format=json&orig=coucou&algo=sha1', hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    url = '/api/user/?format=json&orig=coucou&algo=sha1&signature=%s' % signature
    response = get_app(pub).get(url, status=403)
    assert response.json['err_desc'] == 'missing/multiple timestamp field'
def test_get_user_from_api_query_string_error_missing_email():
    """A signed request to /api/user/ that names no user gets a 403."""
    now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    query = 'format=json&orig=coucou&algo=sha1&timestamp=' + now
    digest = hmac.new('1234', query, hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    url = '/api/user/?%s&signature=%s' % (query, signature)
    response = get_app(pub).get(url, status=403)
    assert response.json['err_desc'] == 'no user specified'
def test_get_user_from_api_query_string_error_unknown_nameid():
    """A signed request to /api/user/ with NameID=xxx gets 403 'unknown NameID'."""
    now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    query = 'format=json&orig=coucou&algo=sha1&NameID=xxx&timestamp=' + now
    digest = hmac.new('1234', query, hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    url = '/api/user/?%s&signature=%s' % (query, signature)
    response = get_app(pub).get(url, status=403)
    assert response.json['err_desc'] == 'unknown NameID'
def test_get_user_from_api_query_string_error_missing_email_valid_endpoint():
    """Signed requests naming no user are accepted by /categories and /json."""
    # It is fine to sign a URL without specifying a user when the endpoint
    # works without one.
    now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    query = 'format=json&orig=coucou&algo=sha1&timestamp=' + now
    digest = hmac.new('1234', query, hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    categories = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
    assert categories.json == {'data': []}

    forms = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
    assert forms.json == []
def test_get_user_from_api_query_string_error_unknown_nameid_valid_endpoint():
    """The /categories and /json endpoints accept a signed, unknown NameID."""
    now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    query = 'format=json&NameID=xxx&orig=coucou&algo=sha1&timestamp=' + now
    digest = hmac.new('1234', query, hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    categories = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
    assert categories.json == {'data': []}

    forms = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
    assert forms.json == []
def test_get_user_from_api_query_string_error_success_sha1(local_user):
    """A SHA1-signed request naming the user by email returns that user."""
    now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    quoted_email = urllib.quote(local_user.email)
    query = 'format=json&orig=coucou&algo=sha1&email=' + quoted_email + '&timestamp=' + now
    digest = hmac.new('1234', query, hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    response = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
    assert response.json['user_display_name'] == u'Jean Darmette'
def test_get_user_from_api_query_string_error_invalid_signature_algo_mismatch(local_user):
    """Announcing sha256 while signing with SHA1 yields 403 'invalid signature'."""
    now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    quoted_email = urllib.quote(local_user.email)
    query = 'format=json&orig=coucou&algo=sha256&email=' + quoted_email + '&timestamp=' + now
    # deliberately computed with SHA1 although the query announces sha256
    digest = hmac.new('1234', query, hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    response = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
    assert response.json['err_desc'] == 'invalid signature'
def test_get_user_from_api_query_string_error_success_sha256(local_user):
    """A SHA256-signed request naming the user by email returns that user."""
    now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    quoted_email = urllib.quote(local_user.email)
    query = 'format=json&orig=coucou&algo=sha256&email=' + quoted_email + '&timestamp=' + now
    digest = hmac.new('1234', query, hashlib.sha256).digest()
    signature = urllib.quote(base64.b64encode(digest))
    response = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
    assert response.json['user_display_name'] == u'Jean Darmette'
def test_sign_url(local_user):
    """URLs signed by sign_url() pass with key '1234' and fail with '12345'."""
    site = 'http://example.net'
    unsigned = 'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)

    accepted = sign_url(unsigned, '1234')
    response = get_app(pub).get(accepted[len(site):])
    assert response.json['user_display_name'] == u'Jean Darmette'

    # same URL signed with another key: the request must be refused
    refused = sign_url(unsigned, '12345')
    get_app(pub).get(refused[len(site):], status=403)
def test_get_user(local_user):
    """/api/user/ reports the user's display name and the name/slug of its roles."""
    Role.wipe()
    role = Role(name='Foo bar')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    unsigned = 'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)
    signed = sign_url(unsigned, '1234')
    response = get_app(pub).get(signed[len('http://example.net'):])

    assert response.json['user_display_name'] == u'Jean Darmette'
    user_roles = response.json['user_roles']
    assert [x['name'] for x in user_roles] == ['Foo bar']
    assert [x['slug'] for x in user_roles] == ['foo-bar']
def test_get_user_compat_endpoint(local_user):
    """The /user endpoint also returns the user for a signed JSON request."""
    unsigned = 'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)
    signed = sign_url(unsigned, '1234')
    response = get_app(pub).get(signed[len('http://example.net'):])
    assert response.json['user_display_name'] == u'Jean Darmette'
def test_formdef_list():
    """The three form listing endpoints return the same JSON description.

    A single form with a receiver role is stored, then ``/json``, ``/`` (with
    a JSON Accept header) and ``/api/formdefs/`` are compared and the first
    entry is checked field by field.
    """
    Role.wipe()
    role = Role(name='Foo bar')
    role.id = '14'
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.description = 'plop'
    formdef.keywords = 'mobile, test'
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.fields = []
    formdef.store()

    resp1 = get_app(pub).get('/json')
    resp2 = get_app(pub).get('/', headers={'Accept': 'application/json'})
    resp3 = get_app(pub).get('/api/formdefs/')
    assert resp1.json == resp2.json == resp3.json

    entry = resp1.json[0]
    assert entry['title'] == 'test'
    assert entry['url'] == 'http://example.net/test/'
    assert entry['count'] == 0
    assert entry['redirection'] == False
    assert entry['description'] == 'plop'
    assert entry['keywords'] == ['mobile', 'test']

    functions = entry['functions']
    # list() keeps the comparison valid even where keys() is not a list
    assert list(functions.keys()) == ['_receiver']
    receiver = functions['_receiver']
    assert receiver['label'] == 'Recipient'
    assert receiver['role']['slug'] == role.slug
    assert receiver['role']['name'] == role.name
def test_formdef_list_redirection():
    """A disabled form with a redirection URL is listed with redirection set."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.disabled = True
    formdef.disabled_redirection = 'http://example.net'
    formdef.fields = []
    formdef.store()

    entry = get_app(pub).get('/json').json[0]
    assert entry['title'] == 'test'
    assert entry['url'] == 'http://example.net/test/'
    assert entry['count'] == 0
    assert entry['redirection'] == True
def test_formdef_schema():
    """Both schema URLs return the same JSON, exposing name and fields."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [fields.StringField(id='0', label='foobar')]
    formdef.store()

    api_resp = get_app(pub).get('/api/formdefs/test/schema')
    compat_resp = get_app(pub).get('/test/schema')
    assert api_resp.json == compat_resp.json

    schema = api_resp.json
    assert schema['name'] == 'test'
    first_field = schema['fields'][0]
    assert first_field['label'] == 'foobar'
    assert first_field['type'] == 'string'
def test_formdef_submit(local_user):
    """Exercise the submit endpoint: unsigned, signed, disabled, backoffice, draft."""
    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [fields.StringField(id='0', label='foobar')]
    formdef.store()
    data_class = formdef.data_class()

    def created(response):
        # load the formdata whose id is returned by a submission
        return data_class.get(response.json['data']['id'])

    # unsigned submission is refused
    resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'no user'

    signed_url = sign_url(
        'http://example.net/api/formdefs/test/submit'
        + '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
        '1234')
    url = signed_url[len('http://example.net'):]

    # signed submission is stored for the user, without tracking code
    resp = get_app(pub).post_json(url, {'data': {}})
    assert resp.json['err'] == 0
    formdata = created(resp)
    assert formdata.status == 'wf-new'
    assert formdata.user_id == local_user.id
    assert formdata.tracking_code is None

    # a disabled form refuses submissions
    formdef.disabled = True
    formdef.store()
    resp = get_app(pub).post_json(url, {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'disabled form'

    formdef.disabled = False
    formdef.store()

    # backoffice submission is refused until the user holds a listed role
    get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
    formdef.backoffice_submission_roles = ['xx']
    formdef.store()
    get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
    formdef.backoffice_submission_roles = [role.id]
    formdef.store()
    resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}})
    formdata = created(resp)
    assert formdata.status == 'wf-new'
    assert formdata.backoffice_submission is True
    assert formdata.user_id is None

    # with tracking codes enabled, a submission gets one
    formdef.enable_tracking_codes = True
    formdef.store()
    resp = get_app(pub).post_json(url, {'data': {}})
    assert created(resp).tracking_code

    # draft submission
    resp = get_app(pub).post_json(url, {'meta': {'draft': True}, 'data': {}})
    assert created(resp).status == 'draft'

    # backoffice submission carrying a submission context
    resp = get_app(pub).post_json(
        url,
        {'meta': {'backoffice-submission': True}, 'data': {}, 'context': {'channel': 'mail'}})
    formdata = created(resp)
    assert formdata.status == 'wf-new'
    assert formdata.backoffice_submission is True
    assert formdata.user_id is None
    assert formdata.submission_context == {'channel': 'mail'}

    data_class.wipe()
def test_categories():
    """A category is listed only once a form is attached to it."""
    FormDef.wipe()
    Category.wipe()
    category = Category()
    category.name = 'category'
    category.description = 'hello world'
    category.store()

    # no form in the category yet: nothing is advertised
    empty = get_app(pub).get('/api/categories/', headers={'Accept': 'application/json'})
    assert empty.json['data'] == []

    formdef = FormDef()
    formdef.name = 'test'
    formdef.category_id = category.id
    formdef.store()

    api_resp = get_app(pub).get('/api/categories/')
    compat_resp = get_app(pub).get('/categories', headers={'Accept': 'application/json'})
    assert api_resp.json == compat_resp.json

    entry = api_resp.json['data'][0]
    assert entry['title'] == 'category'
    assert entry['url'] == 'http://example.net/category/'
    assert entry['description'] == 'hello world'
def test_categories_formdefs():
    """Listing the forms of a category leaves out a form that has no category."""
    # reuse test_categories() to get a category holding the 'test' form
    test_categories()

    uncategorised = FormDef()
    uncategorised.name = 'other test'
    uncategorised.category_id = None
    uncategorised.store()

    api_resp = get_app(pub).get('/api/categories/category/formdefs/')
    compat_resp = get_app(pub).get('/category/json')
    assert api_resp.json == compat_resp.json
    assert len(api_resp.json) == 1

    entry = api_resp.json[0]
    assert entry['title'] == 'test'
    assert entry['url'] == 'http://example.net/test/'
    assert entry['count'] == 0
    assert entry['redirection'] == False
def test_formdata(local_user):
    """A formdata is served as JSON once the user holds the receiver role."""
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.DateField(id='2', label='foobar3', varname='date'),
        fields.FileField(id='3', label='foobar4', varname='file'),
    ]
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.store()

    formdata = formdef.data_class()()
    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive(['base64me'])
    formdata.data = {
        '0': 'foo@localhost',
        '1': 'xxx',
        '2': time.strptime('2014-01-20', '%Y-%m-%d'),
        '3': upload,
    }
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.store()

    api_path = '/api/forms/test/%s/' % formdata.id

    # the user has no role yet: access is forbidden
    get_app(pub).get(sign_uri(api_path, user=local_user), status=403)

    local_user.roles = [role.id]
    local_user.store()
    resp = get_app(pub).get(sign_uri(api_path, user=local_user), status=200)
    resp2 = get_app(pub).get(sign_uri('/test/%s/' % formdata.id, user=local_user))
    assert resp.json == resp2.json

    payload = resp.json
    assert 'last_update_time' in payload
    assert len(payload['fields']) == 3  # foobar2 has no varname, not in json
    assert payload['user']['name'] == local_user.name
    assert payload['fields']['foobar'] == 'foo@localhost'
    assert payload['fields']['date'] == '2014-01-20'

    exported_file = payload['fields']['file']
    assert exported_file['content'] == 'YmFzZTY0bWU='  # base64('base64me')
    assert exported_file['filename'] == 'test.txt'
    assert exported_file['content_type'] == 'text/plain'
def test_user_forms(local_user):
    """The user forms endpoints list the user's formdata, with fields on demand."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
    ]
    formdef.store()

    # nothing submitted yet
    resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
    assert len(resp.json) == 0

    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo@localhost', '1': 'xxx'}
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
    resp2 = get_app(pub).get(sign_uri('/myspace/forms', user=local_user))
    resp3 = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id))
    listing = resp.json
    assert len(listing) == 1
    assert listing[0]['form_status'] == 'New'
    assert listing == resp2.json == resp3.json

    full = get_app(pub).get(sign_uri('/api/user/forms?full=on', user=local_user))
    assert full.json[0]['fields']['foobar'] == 'foo@localhost'
def test_user_drafts(local_user):
    """The drafts endpoints list the user's drafts; full mode omits files."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.FileField(id='2', label='foobar3', varname='file'),
    ]
    formdef.store()

    # no draft stored yet
    resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
    assert len(resp.json) == 0

    draft = formdef.data_class()()
    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive(['base64me'])
    draft.data = {'0': 'foo@localhost', '1': 'xxx', '2': upload}
    draft.user_id = local_user.id
    draft.page_no = 1
    draft.status = 'draft'
    draft.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
    draft.store()

    resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
    resp2 = get_app(pub).get(sign_uri('/myspace/drafts', user=local_user))
    assert len(resp.json) == 1
    assert resp.json == resp2.json
    assert 'fields' not in resp.json[0]

    full = get_app(pub).get(sign_uri('/api/user/drafts?full=on', user=local_user))
    first = full.json[0]
    assert 'fields' in first
    assert first['fields']['foobar'] == 'foo@localhost'
    assert 'file' not in first['fields']  # no file export in full lists
def test_api_list_formdata(local_user):
    """The formdata listing endpoint enforces roles and supports filters.

    Thirty formdata are stored.  The item field cycles over foo / bar / baz /
    baz, giving 8 'foo', 8 'bar' and 14 'baz'.  Every third formdata is
    jumped to 'new' and the others to 'finished', giving 10 and 20.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.ItemField(id='1', label='foobar3', varname='foobar3', type='item',
                         items=['foo', 'bar', 'baz']),
        fields.FileField(id='2', label='foobar4', varname='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    item_cycle = ('foo', 'bar', 'baz', 'baz')
    for i in range(30):
        formdata = data_class()
        upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
        upload.receive(['base64me'])
        item = item_cycle[i % 4]
        formdata.data = {
            '0': 'FOO BAR %d' % i,
            '1': item,
            '1_display': item,
            '2': upload,
        }
        formdata.just_created()
        formdata.jump_status('new' if i % 3 == 0 else 'finished')
        formdata.store()

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user), status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it now gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user))
    assert len(resp.json) == 30
    assert 'receipt_time' in resp.json[0]
    assert 'fields' not in resp.json[0]

    # check getting full formdata
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user))
    assert len(resp.json) == 30
    assert 'receipt_time' in resp.json[0]
    assert 'fields' in resp.json[0]
    assert 'file' not in resp.json[0]['fields']  # no file export in full lists

    # check filtered results
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=foo', user=local_user))
    assert len(resp.json) == 8
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=bar', user=local_user))
    assert len(resp.json) == 8
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=baz', user=local_user))
    assert len(resp.json) == 14

    # check filter on status
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=pending', user=local_user))
    assert len(resp.json) == 10
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=done', user=local_user))
    assert len(resp.json) == 20
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=all', user=local_user))
    assert len(resp.json) == 30
def test_roles(local_user):
    """Roles are listed on signed requests only, on both the new and old URLs."""
    Role.wipe()
    role = Role(name='Hello World')
    role.emails = ['toto@example.com', 'zozo@example.com']
    role.details = 'kouign amann'
    role.store()

    def check_listing(response):
        # the single stored role must be described by the first entry
        entry = response.json['data'][0]
        assert entry['text'] == 'Hello World'
        assert entry['slug'] == 'hello-world'
        assert entry['emails'] == ['toto@example.com', 'zozo@example.com']
        assert entry['emails_to_members'] == False
        assert entry['details'] == 'kouign amann'

    # unsigned access is refused
    get_app(pub).get('/api/roles', status=403)

    check_listing(get_app(pub).get(sign_uri('/api/roles')))

    # also check old endpoint, for compatibility
    check_listing(get_app(pub).get(sign_uri('/roles'), headers={'Accept': 'application/json'}))
def test_users(local_user):
    """/api/users/ requires a signature and supports the ``q`` search parameter."""
    # unsigned access is refused
    get_app(pub).get('/api/users/', status=403)

    listing = get_app(pub).get(sign_uri('/api/users/')).json['data']
    first = listing[0]
    assert first['user_display_name'] == local_user.name
    assert first['user_email'] == local_user.email
    assert first['user_id'] == local_user.id

    matching = get_app(pub).get(sign_uri('/api/users/?q=jean'))
    assert matching.json['data'][0]['user_email'] == local_user.email

    unmatched = get_app(pub).get(sign_uri('/api/users/?q=foobar'))
    assert len(unmatched.json['data']) == 0