wcs/tests/api/test_access.py

366 lines
13 KiB
Python

import base64
import datetime
import hashlib
import hmac
import os
import shutil
import urllib.parse
import pytest
from django.utils.encoding import force_bytes
from quixote import get_publisher
from wcs.api_utils import get_secret_and_orig, is_url_signed, sign_url
from wcs.qommon.errors import AccessForbiddenError
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
@pytest.fixture
def pub(emails):
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
return pub
def teardown_module(module):
clean_temporary_pub()
@pytest.fixture
def local_user():
get_publisher().user_class.wipe()
user = get_publisher().user_class()
user.name = 'Jean Darmette'
user.email = 'jean.darmette@triffouilis.fr'
user.name_identifiers = ['0123456789']
user.store()
return user
@pytest.fixture
def admin_user():
get_publisher().user_class.wipe()
user = get_publisher().user_class()
user.name = 'John Doe Admin'
user.email = 'john.doe@example.com'
user.name_identifiers = ['0123456789']
user.is_admin = True
user.store()
account = PasswordAccount(id='admin')
account.set_password('admin')
account.user_id = user.id
account.store()
return user
@pytest.fixture
def no_request_pub():
pub = create_temporary_pub()
pub.app_dir = os.path.join(pub.APP_DIR, 'example.net')
pub.set_config()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[wscall-secrets]
api.example.com = 1234
'''
)
return pub
def test_get_secret_and_orig(no_request_pub):
secret, orig = get_secret_and_orig('https://api.example.com/endpoint/')
assert secret == '1234'
assert orig == 'example.net'
def test_user_page_redirect(pub):
output = get_app(pub).get('/user')
assert output.headers.get('location') == 'http://example.net/myspace/'
def test_user_page_error(pub):
# check we get json as output for errors
output = get_app(pub).get('/api/user/', status=403)
assert output.json['err_desc'] == 'no user specified'
def test_user_page_error_when_json_and_no_user(pub):
output = get_app(pub).get('/api/user/?format=json', status=403)
assert output.json['err_desc'] == 'no user specified'
def test_get_user_from_api_query_string_error_missing_orig(pub):
output = get_app(pub).get('/api/user/?format=json&signature=xxx', status=403)
assert output.json['err_desc'] == 'missing/multiple orig field'
def test_get_user_from_api_query_string_error_invalid_orig(pub):
output = get_app(pub).get('/api/user/?format=json&orig=coin&signature=xxx', status=403)
assert output.json['err_desc'] == 'invalid orig'
def test_get_user_from_api_query_string_error_missing_algo(pub):
output = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx', status=403)
assert output.json['err_desc'] == 'missing/multiple algo field'
def test_get_user_from_api_query_string_error_invalid_algo(pub):
output = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx&algo=coin', status=403)
assert output.json['err_desc'] == 'invalid algo'
output = get_app(pub).get(
'/api/user/?format=json&orig=coucou&signature=xxx&algo=__getattribute__', status=403
)
assert output.json['err_desc'] == 'invalid algo'
def test_get_user_from_api_query_string_error_invalid_signature(pub):
output = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx&algo=sha1', status=403)
assert output.json['err_desc'] == 'invalid signature'
def test_get_user_from_api_query_string_error_missing_timestamp(pub):
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', b'format=json&orig=coucou&algo=sha1', hashlib.sha1).digest())
)
output = get_app(pub).get(
'/api/user/?format=json&orig=coucou&algo=sha1&signature=%s' % signature, status=403
)
assert output.json['err_desc'] == 'missing/multiple timestamp field'
def test_get_user_from_api_query_string_error_missing_email(pub):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'no user specified'
def test_get_user_from_api_query_string_error_unknown_nameid(pub):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&NameID=xxx&timestamp=' + timestamp
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'unknown NameID'
def test_get_user_from_api_query_string_error_missing_email_valid_endpoint(pub):
# check it's ok to sign an URL without specifiying an user if the endpoint
# works fine without user.
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
assert output.json == {'data': []}
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
assert output.json == {'err': 0, 'data': []}
def test_get_user_from_api_query_string_error_unknown_nameid_valid_endpoint(pub):
# check the categories and forms endpoints accept an unknown NameID
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&NameID=xxx&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
assert output.json == {'data': []}
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
assert output.json == {'err': 0, 'data': []}
def test_get_user_from_api_query_string_error_success_sha1(pub, local_user):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = (
'format=json&orig=coucou&algo=sha1&email='
+ urllib.parse.quote(local_user.email)
+ '&timestamp='
+ timestamp
)
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
assert output.json['user_display_name'] == 'Jean Darmette'
def test_get_user_from_api_query_string_error_invalid_signature_algo_mismatch(pub, local_user):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = (
'format=json&orig=coucou&algo=sha256&email='
+ urllib.parse.quote(local_user.email)
+ '&timestamp='
+ timestamp
)
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'invalid signature'
def test_get_user_from_api_query_string_error_success_sha256(pub, local_user):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = (
'format=json&orig=coucou&algo=sha256&email='
+ urllib.parse.quote(local_user.email)
+ '&timestamp='
+ timestamp
)
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
assert output.json['user_display_name'] == 'Jean Darmette'
def test_sign_url(pub, local_user):
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
% urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
output = get_app(pub).get(url)
assert output.json['user_display_name'] == 'Jean Darmette'
# try to add something after signed url
get_app(pub).get('%s&foo=bar' % url, status=403)
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
% urllib.parse.quote(local_user.email),
'12345',
)
url = signed_url[len('http://example.net') :]
output = get_app(pub).get(url, status=403)
def test_get_user(pub, local_user):
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
% urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
output = get_app(pub).get(url)
assert output.json['user_display_name'] == 'Jean Darmette'
assert [x['name'] for x in output.json['user_roles']] == ['Foo bar']
assert [x['slug'] for x in output.json['user_roles']] == ['foo-bar']
def test_api_access_from_xml_storable_object(pub, local_user, admin_user):
app = login(get_app(pub))
resp = app.get('/backoffice/settings/api-access/new')
resp.form['name'] = 'Salut API access key'
resp.form['access_identifier'] = 'salut'
resp.form['access_key'] = '5678'
resp = resp.form.submit('submit')
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=UNKNOWN_ACCESS&email=%s'
% (urllib.parse.quote(local_user.email)),
'5678',
)
url = signed_url[len('http://example.net') :]
output = get_app(pub).get(url, status=403)
assert output.json['err_desc'] == 'invalid orig'
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=salut&email=%s'
% (urllib.parse.quote(local_user.email)),
'5678',
)
url = signed_url[len('http://example.net') :]
output = get_app(pub).get(url)
assert output.json['user_display_name'] == 'Jean Darmette'
def test_is_url_signed_check_nonce(pub, local_user, freezer):
ORIG = 'xxx'
KEY = 'xxx'
pub.site_options.add_section('api-secrets')
pub.site_options.set('api-secrets', ORIG, KEY)
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
# test clean_nonces do not bark when nonces directory is empty
if os.path.exists(os.path.join(pub.app_dir, 'nonces')):
shutil.rmtree(os.path.join(pub.app_dir, 'nonces'))
pub.clean_nonces(now=0)
nonce_dir = os.path.join(pub.app_dir, 'nonces')
assert not os.path.exists(nonce_dir) or not os.listdir(nonce_dir)
signed_url = sign_url('?format=json&orig=%s&email=%s' % (ORIG, urllib.parse.quote(local_user.email)), KEY)
req = HTTPRequest(
None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net', 'QUERY_STRING': signed_url[1:]}
)
req.process_inputs()
pub.set_app_dir(req)
pub._set_request(req)
assert is_url_signed()
with pytest.raises(AccessForbiddenError) as exc_info:
req.signed = False
is_url_signed()
assert exc_info.value.public_msg == 'nonce already used'
# test that clean nonces works
pub.clean_nonces()
assert os.listdir(nonce_dir)
# 80 seconds in the future, nothing should be cleaned
freezer.move_to(datetime.timedelta(seconds=80))
pub.clean_nonces()
assert os.listdir(nonce_dir)
# 90 seconds in the future, nonces should be removed
freezer.move_to(datetime.timedelta(seconds=10))
pub.clean_nonces()
assert not os.listdir(nonce_dir)
def test_get_user_compat_endpoint(pub, local_user):
signed_url = sign_url(
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
output = get_app(pub).get(url)
assert output.json['user_display_name'] == 'Jean Darmette'