# wcs/tests/test_sessions.py
#
# 227 lines
# 7.5 KiB
# Python

import datetime
import os
import shutil
import time
import pytest
from quixote import cleanup
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.qommon.http_request import HTTPRequest
from wcs.formdef import FormDef
from wcs import fields
from utilities import create_temporary_pub, clean_temporary_pub, get_app, login
def setup_module():
    """Pytest module-level setup hook: call ``clean_temporary_pub()`` once.

    NOTE(review): presumably this clears state left by a previous run; the
    helper lives in ``utilities`` and is not visible here.
    """
    clean_temporary_pub()
def teardown_module():
    """Pytest module-level teardown hook; intentionally does nothing."""
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture with ``'pickle'`` and ``'sql'``.

    Only tests that request ``pub`` are affected.  The values are passed
    indirectly, so they reach the fixture as ``request.param`` instead of
    being handed to the test function directly.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
@pytest.fixture(scope='function')
def pub(request):
    """Build a temporary publisher for a single test.

    ``request.param`` (``'pickle'`` or ``'sql'``, supplied through
    ``pytest_generate_tests``) decides whether the publisher is created in
    SQL mode.  The publisher is configured with password identification and
    the English language, and its ``APP_DIR`` is removed when the test ends.
    """
    publisher = create_temporary_pub(sql_mode=(request.param == 'sql'))
    # Register the cleanup before touching the configuration so it runs
    # even if one of the following steps raises.
    request.addfinalizer(lambda: shutil.rmtree(publisher.APP_DIR))
    publisher.cfg['identification'] = {'methods': ['password']}
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()
    return publisher
@pytest.fixture
def http_request(pub):
    """Attach a bare ``HTTPRequest`` with ``language = None`` to the publisher.

    The fixture is requested for this side effect only; it provides no value
    (``None``) to the test.
    """
    bare_request = HTTPRequest(None, {})
    bare_request.language = None
    pub._set_request(bare_request)
@pytest.fixture
def user(pub):
    """Store and return a user that can log in as ``foo`` / ``foo``.

    A user with email ``foo@localhost`` is created first, then a
    ``PasswordAccount`` with id ``foo`` is linked to it through ``user_id``.
    """
    new_user = pub.user_class()
    new_user.email = 'foo@localhost'
    new_user.store()

    credentials = PasswordAccount(id='foo')
    credentials.set_password('foo')
    # The user must be stored before this point so that its id is available.
    credentials.user_id = new_user.id
    credentials.store()
    return new_user
@pytest.fixture
def app(pub):
    """Return the test application that ``get_app`` builds for ``pub``."""
    test_app = get_app(pub)
    return test_app
def test_session_max_age(pub, user, app):
    """With ``session_max_age: 1`` the login is lost about a second later.

    NOTE(review): the option value is presumably expressed in seconds; the
    sleeps below (0.5 s, then 0.6 s) are chosen around that limit.
    """
    options_path = os.path.join(pub.app_dir, 'site-options.cfg')
    with open(options_path, 'w') as options_file:
        options_file.write('[options]\nsession_max_age: 1\n')
    pub.load_site_options()

    login(app, username='foo', password='foo')
    assert 'Logout' in app.get('/')

    # About 0.5 s after login: the user is still logged in.
    time.sleep(0.5)
    assert 'Logout' in app.get('/')

    # About 1.1 s after login: the "Logout" link is gone.
    time.sleep(0.6)
    assert 'Logout' not in app.get('/')
def test_session_expire(pub, user, app):
    """The login survives a future expiry time and is lost with a past one."""
    login(app, username='foo', password='foo')
    assert 'Logout' in app.get('/')

    stored_sessions = pub.session_manager.session_class.select()
    current = stored_sessions[0]

    # Expiry ten seconds ahead: the page still shows "Logout".
    current.set_expire(time.time() + 10)
    current.store()
    assert 'Logout' in app.get('/')

    # Expiry one second in the past: the page no longer shows "Logout".
    current.set_expire(time.time() - 1)
    current.store()
    assert 'Logout' not in app.get('/')
def test_sessions_visiting_objects(pub, http_request):
    """Check visit tracking across sessions.

    Visits are recorded on a session with ``mark_visited_object`` and read
    back through ``pub.get_visited_objects`` and ``pub.get_object_visitors``.
    The test covers marking visits, visits whose timestamp is 35 minutes old,
    repeating the scenario on the same session, and a second session owned by
    another user.
    """
    # check it starts with nothing
    assert len(pub.get_visited_objects()) == 0

    # mark two visits
    session1 = pub.session_class(id='session1')
    session1.user = 'FOO'
    session1.mark_visited_object('formdata-foobar-1')
    session1.mark_visited_object('formdata-foobar-2')
    session1.store()
    assert len(pub.get_visited_objects()) == 2
    assert {x[0] for x in pub.get_object_visitors('formdata-foobar-2')} == {'FOO'}

    # mark a visit as being in the past (35 minutes ago)
    session1.visiting_objects['formdata-foobar-1'] = time.time() - 35 * 60
    session1.store()
    assert len(pub.get_visited_objects()) == 1

    # check older visits are automatically removed: the old entry is still
    # stored on the reloaded session and goes away on the next mark
    session1 = pub.session_class.get('session1')
    assert len(session1.visiting_objects) == 2
    session1.mark_visited_object('formdata-foobar-2')
    assert len(session1.visiting_objects) == 1
    session1.store()
    assert len(pub.get_visited_objects()) == 1
    assert list(pub.get_visited_objects()) == ['formdata-foobar-2']

    # repeat the same scenario on the same session: visit both objects again
    session1.mark_visited_object('formdata-foobar-1')
    session1.mark_visited_object('formdata-foobar-2')
    session1.store()
    assert len(pub.get_visited_objects()) == 2

    # mark a visit as being in the past (35 minutes ago)
    session1.visiting_objects['formdata-foobar-1'] = time.time() - 35 * 60
    session1.store()
    assert len(pub.get_visited_objects()) == 1

    # check older visits are automatically removed
    session1 = pub.session_class.get('session1')
    assert len(session1.visiting_objects) == 2
    session1.mark_visited_object('formdata-foobar-2')
    assert len(session1.visiting_objects) == 1
    session1.store()
    assert len(pub.get_visited_objects()) == 1
    assert list(pub.get_visited_objects()) == ['formdata-foobar-2']

    # check with a second session
    session2 = pub.session_class(id='session2')
    session2.user = 'BAR'
    session2.store()
    assert len(pub.get_visited_objects()) == 1
    # an object visited from both sessions is still counted once
    session2.mark_visited_object('formdata-foobar-2')
    session2.store()
    assert len(pub.get_visited_objects()) == 1
    session2.mark_visited_object('formdata-foobar-3')
    session2.store()
    assert len(pub.get_visited_objects()) == 2
    assert list(pub.get_visited_objects(exclude_user='BAR')) == ['formdata-foobar-2']

    # check visitors
    assert {x[0] for x in pub.get_object_visitors('formdata-foobar-2')} == {'FOO', 'BAR'}
    assert {x[0] for x in pub.get_object_visitors('formdata-foobar-1')} == set()
def test_session_do_not_reuse_id(pub, user, app):
    """Logging in again from a logged-in client adds a second stored session."""
    session_class = pub.session_manager.session_class
    session_class.wipe()

    login(app, username='foo', password='foo')
    assert session_class.count() == 1

    # The home page is requested as before; its response is not needed.
    app.get('/')
    login_page = app.get('/login/')
    login_form = login_page.forms['login-form']
    login_form['username'] = 'foo'
    login_form['password'] = 'foo'
    resp = login_form.submit()
    assert resp.status_int == 302
    # A second session now exists alongside the first one.
    assert session_class.count() == 2
def test_session_substitution_variables(pub, user, app):
    """``[session_id]`` in a comment label is rendered as the session id.

    This is checked first for the session that exists before login, then for
    the additional session that exists after login.
    """
    session_class = pub.session_manager.session_class
    session_class.wipe()
    # The home page is requested as before; its response is not needed.
    app.get('/')

    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [fields.CommentField(id='7', label='Hello [session_id]', type='comment')]
    formdef.store()

    resp = app.get('/foobar/')
    assert session_class.count() == 1
    anonymous_session_id = session_class.select()[0].id
    assert 'Hello %s' % anonymous_session_id in resp.text

    login(app, username='foo', password='foo')
    assert session_class.count() == 2
    # The session to look for is the stored one that is not the earlier one.
    other_session_ids = [
        x.id for x in session_class.select() if x.id != anonymous_session_id
    ]
    logged_in_session_id = other_session_ids[0]
    resp = app.get('/foobar/')
    assert 'Hello %s' % logged_in_session_id in resp.text
def test_session_substitution_variables_1st_page_condition(pub, user, app):
    """Session variables are usable on a first page that has a condition.

    The first page carries a Python condition that requires ``session_id`` to
    be set.  The page must be displayed with ``[session_id]`` and
    ``[session_hash_id]`` substituted in its comment fields.
    """
    session_class = pub.session_manager.session_class
    session_class.wipe()
    # The home page is requested as before; its response is not needed.
    app.get('/')

    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [
        fields.PageField(
            id='0', label='1st PAGE', type='page',
            condition={'type': 'python', 'value': 'vars().get("session_id") is not None'}),
        fields.CommentField(id='7', label='COM1 [session_id]', type='comment'),
        fields.CommentField(id='10', label='COMHASH1 [session_hash_id]', type='comment'),
        fields.PageField(id='8', label='2nd PAGE', type='page'),
        fields.CommentField(id='9', label='COM2 [session_id]', type='comment'),
    ]
    formdef.store()

    resp = app.get('/foobar/')
    assert session_class.count() == 1
    session = session_class.select()[0]
    assert 'COM1 %s' % session.id in resp.text
    session_hash_id = session.get_substitution_variables().get('session_hash_id')
    assert 'COMHASH1 %s' % session_hash_id in resp.text
def test_session_clean_job(pub, user, app, freezer):
    """``pub.clean_sessions()`` removes the session only after the last jump.

    The clock is driven through the ``freezer`` fixture: the session survives
    a cleanup right away and after a 2-day jump, and is gone after a further
    5-day jump.
    """
    session_class = pub.session_manager.session_class

    def advance_clock(days):
        # Move the clock forward relative to the current "now".
        freezer.move_to(datetime.datetime.now() + datetime.timedelta(days=days))

    session_class.wipe()
    login(app, username='foo', password='foo')
    assert session_class.count() == 1

    pub.clean_sessions()
    assert session_class.count() == 1

    advance_clock(2)
    pub.clean_sessions()
    assert session_class.count() == 1

    advance_clock(5)  # last usage limit
    pub.clean_sessions()
    assert session_class.count() == 0