wcs/tests/test_sessions.py

293 lines
9.3 KiB
Python

import datetime
import os
import pickle
import shutil
import time
import pytest
from wcs import fields, sql
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
from .utilities import clean_temporary_pub, create_temporary_pub, get_app, login
def setup_module():
clean_temporary_pub()
def teardown_module():
pass
@pytest.fixture(scope='function')
def pub(request):
pub = create_temporary_pub()
def fin():
shutil.rmtree(pub.APP_DIR)
request.addfinalizer(fin)
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
return pub
@pytest.fixture
def http_request(pub):
req = HTTPRequest(None, {})
req.language = None
pub._set_request(req)
@pytest.fixture
def user(pub):
user = pub.user_class()
user.email = 'foo@localhost'
user.store()
account = PasswordAccount(id='foo')
account.set_password('foo')
account.user_id = user.id
account.store()
return user
@pytest.fixture
def app(pub):
return get_app(pub)
def test_session_max_age(pub, user, app):
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as cfg:
cfg.write(
'''[options]
session_max_age: 1
'''
)
pub.load_site_options()
login(app, username='foo', password='foo')
assert 'Logout' in app.get('/')
time.sleep(0.5)
assert 'Logout' in app.get('/')
time.sleep(0.6)
assert 'Logout' not in app.get('/')
def test_session_expire(pub, user, app):
login(app, username='foo', password='foo')
assert 'Logout' in app.get('/')
session = pub.session_manager.session_class.select()[0]
session.set_expire(time.time() + 10)
session.store()
assert 'Logout' in app.get('/')
session.set_expire(time.time() - 1)
session.store()
assert 'Logout' not in app.get('/')
def test_sessions_visiting_objects(pub, http_request):
# check it starts with nothing
assert len(pub.session_class.get_visited_objects()) == 0
class MockFormData:
def __init__(self, id):
self.id = id
def get_object_key(self):
return 'formdata-foobar-%s' % self.id
# mark two visits
session1 = pub.session_class(id='session1')
session1.user = 'FOO'
session1.mark_visited_object(MockFormData(1))
session1.mark_visited_object(MockFormData(2))
session1.store()
assert len(pub.session_class.get_visited_objects()) == 2
assert {x[0] for x in pub.session_class.get_object_visitors(MockFormData(2))} == {'FOO'}
# mark a visit as being in the past
session1.visiting_objects['formdata-foobar-1'] = time.time() - 35 * 60
session1.store()
assert len(pub.session_class.get_visited_objects()) == 1
# check older visits are automatically removed
session1 = pub.session_class.get('session1')
assert len(session1.visiting_objects.keys()) == 2
session1.mark_visited_object(MockFormData(2))
assert len(session1.visiting_objects.keys()) == 1
session1.store()
assert len(pub.session_class.get_visited_objects()) == 1
assert list(pub.session_class.get_visited_objects()) == ['formdata-foobar-2']
# check with a second session
session1.mark_visited_object(MockFormData(1))
session1.mark_visited_object(MockFormData(2))
session1.store()
assert len(pub.session_class.get_visited_objects()) == 2
# mark a visit as being in the past
session1.visiting_objects['formdata-foobar-1'] = time.time() - 35 * 60
session1.store()
assert len(pub.session_class.get_visited_objects()) == 1
# check older visits are automatically removed
session1 = pub.session_class.get('session1')
assert len(session1.visiting_objects.keys()) == 2
session1.mark_visited_object(MockFormData(2))
assert len(session1.visiting_objects.keys()) == 1
session1.store()
assert len(pub.session_class.get_visited_objects()) == 1
assert list(pub.session_class.get_visited_objects()) == ['formdata-foobar-2']
# check with a second session
session2 = pub.session_class(id='session2')
session2.user = 'BAR'
session2.store()
assert len(pub.session_class.get_visited_objects()) == 1
session2.mark_visited_object(MockFormData(2))
session2.store()
assert len(pub.session_class.get_visited_objects()) == 1
session2.mark_visited_object(MockFormData(3))
session2.store()
assert len(pub.session_class.get_visited_objects()) == 2
assert list(pub.session_class.get_visited_objects(exclude_user='BAR')) == ['formdata-foobar-2']
# check visitors
assert {x[0] for x in pub.session_class.get_object_visitors(MockFormData(2))} == {'FOO', 'BAR'}
assert {x[0] for x in pub.session_class.get_object_visitors(MockFormData(1))} == set()
def test_session_do_not_reuse_id(pub, user, app):
pub.session_manager.session_class.wipe()
login(app, username='foo', password='foo')
assert pub.session_manager.session_class.count() == 1
resp = app.get('/')
login_page = app.get('/login/')
login_form = login_page.forms['login-form']
login_form['username'] = 'foo'
login_form['password'] = 'foo'
resp = login_form.submit()
assert resp.status_int == 302
assert pub.session_manager.session_class.count() == 2
def test_session_substitution_variables_1st_page_condition(pub, user, app):
pub.session_manager.session_class.wipe()
resp = app.get('/')
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [
fields.PageField(
id='0',
label='1st PAGE',
type='page',
condition={'type': 'python', 'value': 'vars().get("session_hash_id") is not None'},
),
fields.CommentField(id='10', label='COMHASH1 [session_hash_id]', type='comment'),
fields.PageField(id='8', label='2nd PAGE', type='page'),
fields.CommentField(id='9', label='COM2 [session_hash_id]', type='comment'),
]
formdef.store()
resp = app.get('/foobar/')
assert pub.session_manager.session_class.count() == 1
session = pub.session_manager.session_class.select()[0]
assert 'COMHASH1 %s' % session.get_substitution_variables().get('session_hash_id') in resp.text
def test_session_clean_job(pub, user, app, freezer):
pub.session_manager.session_class.wipe()
login(app, username='foo', password='foo')
assert pub.session_manager.session_class.count() == 1
pub.clean_sessions()
assert pub.session_manager.session_class.count() == 1
freezer.move_to(datetime.datetime.now() + datetime.timedelta(2))
pub.clean_sessions()
assert pub.session_manager.session_class.count() == 1
freezer.move_to(datetime.datetime.now() + datetime.timedelta(5)) # last usage limit
pub.clean_sessions()
assert pub.session_manager.session_class.count() == 0
def test_inactive_user(pub, user, app):
login(app, username='foo', password='foo')
assert 'Logout' in app.get('/')
user.is_active = False
user.store()
assert 'Logout' not in app.get('/')
def test_transient_data_removal(pub, app):
pub.session_manager.session_class.wipe()
sql.TransientData.wipe()
resp = app.get('/')
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [fields.StringField(id='1', label='string', type='string')]
formdef.store()
formdef.data_class().wipe()
resp = app.get('/foobar/')
resp.form['f1'] = 'test'
resp = resp.form.submit('submit')
assert sql.Session.count() == 1
assert sql.TransientData.count() == 1
transient_data = sql.TransientData.select()[0]
app.get('/logout')
assert sql.Session.count() == 0
assert sql.TransientData.count() == 0
transient_data.store() # session_id not found, should not fail
def test_magictoken_migration(pub, app):
pub.session_manager.session_class.wipe()
sql.TransientData.wipe()
resp = app.get('/')
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [
fields.PageField(id='0', label='1st PAGE', type='page'),
fields.StringField(id='1', label='string', type='string'),
fields.PageField(id='2', label='2nd PAGE', type='page'),
fields.PageField(id='3', label='3rd PAGE', type='page'),
]
formdef.store()
formdef.data_class().wipe()
resp = app.get('/foobar/')
resp.form['f1'] = 'test'
resp = resp.form.submit('submit')
# migrate back session to look like before transient data table
assert pub.session_manager.session_class.count() == 1
session = pub.session_manager.session_class.select()[0]
session.magictokens = {}
for transient_data in sql.TransientData.select():
session.magictokens[transient_data.id] = transient_data.data
sql.TransientData.wipe()
conn, cur = sql.get_connection_and_cursor()
sql_statement = 'UPDATE sessions SET session_data = %s WHERE id = %s'
cur.execute(sql_statement, (bytearray(pickle.dumps(session.__dict__, protocol=2)), session.id))
conn.commit()
cur.close()
# and get back to submitting form, it should run migration
resp = resp.form.submit('submit') # -> 3rd page
resp = resp.form.submit('submit') # -> validation page
resp = resp.form.submit('submit') # -> submit
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data['1'] == 'test'