combo/tests/test_wcs.py

435 lines
14 KiB
Python

import pytest
import json
import re
import requests
import subprocess
import shutil
import tempfile
import time
import urlparse
import os
from django.conf import settings
from django.core.cache import cache
from django.test.client import RequestFactory
from django.template import Context
from combo.data.models import Page
from combo.apps.wcs.models import (WcsFormCell, WcsCurrentFormsCell,
WcsFormsOfCategoryCell, WcsCurrentDraftsCell, WcsCategoryCell)
from combo.utils import NothingInCacheException
pytestmark = pytest.mark.django_db
wcsctl_present = pytest.mark.skipif('WCSCTL' not in os.environ,
reason='WCSCTL not defined in environment')
WCSCTL = os.environ.get('WCSCTL')
WCS_SCRIPTS = {
'setup-auth': """
from quixote import get_publisher
get_publisher().cfg['identification'] = {'methods': ['password']}
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
get_publisher().write_cfg()
""",
'create-user': """
from quixote import get_publisher
from qommon.ident.password_accounts import PasswordAccount
user = get_publisher().user_class()
user.name = 'foo bar'
user.email = 'foo@example.net'
user.store()
account = PasswordAccount(id='user')
account.set_password('user')
account.user_id = user.id
account.store()
""",
'create-data': """
import datetime
from quixote import get_publisher
from wcs.categories import Category
from wcs.formdef import FormDef
from wcs.roles import Role
from wcs import fields
cats = []
for i in range(1, 10):
cat = Category()
cat.name = 'Test %d' % i
cat.description = 'Hello world'
cat.store()
cats.append(cat)
formdef = FormDef()
formdef.name = 'form title'
formdef.category_id = cat.id
formdef.fields = [
fields.StringField(id='1', label='1st field', type='string'),
fields.ItemField(id='2', label='2nd field', type='item',
items=['foo', 'bar', 'baz']),
]
formdef.store()
user = get_publisher().user_class.select()[0]
for i in range(50):
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
formdata.data = {'1': 'FOO BAR %d' % i}
if i%4 == 0:
formdata.data['2'] = 'foo'
formdata.data['2_display'] = 'foo'
elif i%4 == 1:
formdata.data['2'] = 'bar'
formdata.data['2_display'] = 'bar'
else:
formdata.data['2'] = 'baz'
formdata.data['2_display'] = 'baz'
if i%3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
if i%7 == 0:
formdata.user_id = user.id
formdata.store()
# another formdef in same category
formdef = FormDef()
formdef.name = 'a second form title'
formdef.category_id = cat.id
formdef.fields = []
formdef.store()
# a formdef in another category
formdef = FormDef()
formdef.name = 'third form title'
formdef.category_id = cats[2].id
formdef.fields = []
formdef.enable_tracking_codes = True
formdef.store()
# a draft of that formdef
formdata = formdef.data_class()()
formdata.data = {}
formdata.page_no = 1
formdata.status = 'draft'
formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
formdata.user_id = user.id
formdata.store()
# a private formdef
role = Role(name='Blah')
role.store()
formdef = FormDef()
formdef.name = 'a private form'
formdef.category_id = cats[2].id
formdef.roles = [role.id]
formdef.fields = []
formdef.store()
""",
}
WCS_DIR = tempfile.mkdtemp()
def run_wcs_script(script, hostname):
script_path = os.path.join(WCS_DIR, script + '.py')
fd = open(script_path, 'w')
fd.write(WCS_SCRIPTS[script])
fd.close()
subprocess.check_call([WCSCTL, 'runscript', '--app-dir', WCS_DIR,
'--vhost', hostname, script_path])
def setup_module(module):
if not WCSCTL:
return
for hostname in ['127.0.0.1', '127.0.0.2']:
os.mkdir(os.path.join(WCS_DIR, hostname))
run_wcs_script('setup-auth', hostname)
run_wcs_script('create-user', hostname)
run_wcs_script('create-data', hostname)
fd = file(os.path.join(WCS_DIR, hostname, 'site-options.cfg'), 'w')
fd.write('''[api-secrets]
combo = combo
''')
fd.close()
pidfile = os.path.join(WCS_DIR, 'wcs.pid')
subprocess.check_call([WCSCTL, 'start', '--daemonize', '--http',
'--port', '8999',
'--pidfile', pidfile, '--app-dir', WCS_DIR])
t0 = time.time()
while not os.path.exists(pidfile):
time.sleep(0.2)
if time.time() - t0 > 30:
raise AssertionError('failed to start wcs')
def teardown_module(module):
pidfile = os.path.join(WCS_DIR, 'wcs.pid')
subprocess.check_call([WCSCTL, 'stop',
'--pidfile', pidfile, '--app-dir', WCS_DIR])
shutil.rmtree(WCS_DIR)
def check_wcs_open(url):
session = requests.Session()
scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
path = '/login/'
login_url = urlparse.urlunsplit((scheme, netloc, path, query, fragment))
resp = session.post(login_url, data={'username': 'user', 'password': 'user'})
resp = session.get(url)
assert resp.status_code == 200
@pytest.fixture
def context():
ctx = Context({'request': RequestFactory().get('/')})
ctx['request'].user = None
ctx['request'].session = {}
return ctx
@wcsctl_present
def test_form_cell_setup():
cell = WcsFormCell()
form_class = cell.get_default_form_class()
form = form_class()
assert form.fields['formdef_reference'].widget.choices == [
(u'default:a-private-form', u'test : a private form'),
(u'default:a-second-form-title', u'test : a second form title'),
(u'default:form-title', u'test : form title'),
(u'default:third-form-title', u'test : third form title'),
(u'other:a-private-form', u'test2 : a private form'),
(u'other:a-second-form-title', u'test2 : a second form title'),
(u'other:form-title', u'test2 : form title'),
(u'other:third-form-title', u'test2 : third form title')]
@wcsctl_present
def test_form_cell_save_cache():
page = Page(title='xxx', slug='test_form_cell_save_cache', template_name='standard')
page.save()
cell = WcsFormCell(page=page, placeholder='content', order=0)
assert cell.get_additional_label() is None
cell.formdef_reference = u'default:form-title'
cell.save()
assert cell.cached_title == 'form title'
assert cell.get_additional_label() == 'form title'
# make sure cached attributes are removed from serialized pages
assert not 'cached_' in json.dumps(page.get_serialized_page())
@wcsctl_present
def test_category_cell_save_cache():
page = Page(title='xxx', slug='test_category_cell_save_cache', template_name='standard')
page.save()
cell = WcsCategoryCell(page=page, placeholder='content', order=0)
assert cell.get_additional_label() is None
cell.category_reference = u'default:test-3'
cell.save()
assert cell.cached_title == 'Test 3'
assert cell.get_additional_label() == 'Test 3'
@wcsctl_present
def test_form_cell_render():
page = Page(title='xxx', slug='test_form_cell_render', template_name='standard')
page.save()
cell = WcsFormCell(page=page, placeholder='content', order=0)
cell.formdef_reference = u'default:form-title'
cell.save()
result = cell.render(Context())
assert 'http://127.0.0.1:8999/form-title/tryauth' in result
assert 'form title' in result
@wcsctl_present
def test_current_forms_cell_setup():
cell = WcsCurrentFormsCell()
form_class = cell.get_default_form_class()
form = form_class()
assert form.fields['wcs_site'].widget.choices == [
('', 'All'), (u'default', u'test'), (u'other', u'test2')]
assert 'current_forms' in form.fields
assert cell.get_additional_label() == 'All Sites - Current Forms'
cell.wcs_site = 'default'
assert cell.get_additional_label() == 'test - Current Forms'
cell.wcs_site = None
cell.current_forms = True
cell.done_forms = True
assert cell.get_additional_label() == 'All Sites - All Forms'
cell.current_forms = False
cell.done_forms = True
assert cell.get_additional_label() == 'All Sites - Done Forms'
try:
# check there is not wcs_site field if there's a single one defined in
# the configuration
temp_settings = settings.KNOWN_SERVICES.copy()
default = settings.KNOWN_SERVICES['wcs']['default']
settings.KNOWN_SERVICES = {'wcs': {'default': default}}
form_class = cell.get_default_form_class()
form = form_class()
assert not 'wcs_site' in form.fields.keys()
assert cell.get_additional_label() == 'Done Forms'
finally:
# restore original settings
settings.KNOWN_SERVICES = temp_settings
@wcsctl_present
def test_current_forms_cell_render(context):
page = Page(title='xxx', slug='test_current_forms_cell_render', template_name='standard')
page.save()
cell = WcsCurrentFormsCell(page=page, placeholder='content', order=0)
cell.save()
class MockUser(object):
email = 'foo@example.net'
def is_authenticated(self):
return True
context['request'].user = MockUser()
# query should fail as nothing is cached
cache.clear()
with pytest.raises(NothingInCacheException):
result = cell.render(context)
context['synchronous'] = True # to get fresh content
# default is to get current forms from all wcs sites
result = cell.render(context)
assert 'http://127.0.0.1:8999/form-title/1/' in result
assert 'http://127.0.0.1:8999/form-title/22/' in result
assert not 'http://127.0.0.1:8999/form-title/8/' in result
assert 'http://127.0.0.2:8999/form-title/1/' in result
assert 'http://127.0.0.2:8999/form-title/22/' in result
# done forms
cell.current_forms = False
cell.done_forms = True
cell.save()
result = cell.render(context)
assert not 'http://127.0.0.1:8999/form-title/1/' in result
assert 'http://127.0.0.1:8999/form-title/8/' in result
@wcsctl_present
def test_current_forms_cell_render_single_site(context):
page = Page(title='xxx', slug='test_current_forms_cell_render', template_name='standard')
page.save()
cell = WcsCurrentFormsCell(page=page, placeholder='content', order=0)
cell.wcs_site = 'default'
cell.save()
class MockUser(object):
email = 'foo@example.net'
def is_authenticated(self):
return True
context['request'].user = MockUser()
# query should fail as nothing is cached
cache.clear()
with pytest.raises(NothingInCacheException):
result = cell.render(context)
context['synchronous'] = True # to get fresh content
result = cell.render(context)
assert 'http://127.0.0.1:8999/form-title/1/' in result
assert 'http://127.0.0.1:8999/form-title/22/' in result
assert 'http://127.0.0.2:8999/form-title/1/' not in result
assert 'http://127.0.0.2:8999/form-title/22/' not in result
@wcsctl_present
def test_forms_of_category_cell_setup():
cell = WcsFormsOfCategoryCell()
form_class = cell.get_default_form_class()
form = form_class()
assert form.fields['category_reference'].widget.choices == [
(u'default:test-3', u'test : Test 3'),
(u'default:test-9', u'test : Test 9'),
(u'other:test-3', u'test2 : Test 3'),
(u'other:test-9', u'test2 : Test 9')]
@wcsctl_present
def test_forms_of_category_cell_render(context):
page = Page(title='xxx', slug='test_forms_of_category_cell_render', template_name='standard')
page.save()
cell = WcsFormsOfCategoryCell(page=page, placeholder='content', order=0)
cell.category_reference = 'default:test-9'
cell.ordering = 'alpha'
cell.save()
context['synchronous'] = True # to get fresh content
result = cell.render(context)
assert 'form title' in result and 'a second form title' in result
assert result.index('form title') > result.index('a second form title')
assert 'http://127.0.0.1:8999/form-title/tryauth' in result
assert 'http://127.0.0.1:8999/a-second-form-title/tryauth' in result
cell.ordering = 'popularity'
cell.save()
result = cell.render(context)
assert 'form title' in result and 'a second form title' in result
assert result.index('form title') < result.index('a second form title')
cell.ordering = 'manual'
cell.save()
result = cell.render(context)
# by default all forms should be present, in alphabetical order
assert result.index('form title') > result.index('a second form title')
# set a manual order
cell.manual_order = {'data': ['default:test-9:a-second-form-title',
'default:test-9:form-title']}
cell.save()
result = cell.render(context)
assert result.index('form title') > result.index('a second form title')
# make sure all forms are displayed even if the manual order only specify
# some.
cell.manual_order = {'data': ['default:test-9:a-second-form-title']}
cell.save()
result = cell.render(context)
assert result.index('form title') > result.index('a second form title')
assert 'form title' in result and 'a second form title' in result
@wcsctl_present
def test_current_drafts_cell_render_unlogged(context):
page = Page(title='xxx', slug='test_current_drafts_cell_render', template_name='standard')
page.save()
cell = WcsCurrentDraftsCell(page=page, placeholder='content', order=0)
cell.save()
context['synchronous'] = True # to get fresh content
result = cell.render(context)
assert not 'http://127.0.0.1:8999/third-form-title' in result # no form
@wcsctl_present
def test_current_drafts_cell_render_logged_in(context):
page = Page(title='xxx', slug='test_current_drafts_cell_render', template_name='standard')
page.save()
cell = WcsCurrentDraftsCell(page=page, placeholder='content', order=0)
cell.save()
context['synchronous'] = True # to get fresh content
class MockUser(object):
email = 'foo@example.net'
def is_authenticated(self):
return True
context['request'].user = MockUser()
# default is to get current forms from all wcs sites
result = cell.render(context)
assert 'http://127.0.0.1:8999/third-form-title/1' in result
for url in re.findall('href="(.*)"', result):
check_wcs_open(url)