wcs/tests/backoffice_pages/test_export.py

556 lines
19 KiB
Python

# -*- coding: utf-8 -*-
import datetime
import io
import os
import time
import urllib.parse
import xml.etree.ElementTree as ET
import zipfile
import pytest
from wcs import fields
from wcs.blocks import BlockDef
from wcs.formdef import FormDef
from wcs.qommon import ods
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.upload_storage import PicklableUpload
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from .test_all import create_superuser
def pytest_generate_tests(metafunc):
    """Parametrize every test that requests the ``pub`` fixture.

    The parameter values are passed indirectly, i.e. they reach the ``pub``
    fixture as ``request.param`` instead of being handed to the test itself.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    backends = ['pickle', 'sql', 'pickle-templates']
    metafunc.parametrize('pub', backends, indirect=True)
@pytest.fixture
def pub(request, emails):
    """Return a temporary publisher configured for the backoffice tests.

    ``request.param`` is the backend name supplied through indirect
    parametrization; SQL mode is enabled when it contains ``'sql'`` and
    templates mode when it contains ``'templates'``.

    The ``emails`` fixture is requested but not used directly here
    (presumably for its setup side effects -- defined outside this file).
    """
    publisher = create_temporary_pub(
        sql_mode='sql' in request.param, templates_mode='templates' in request.param
    )
    req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
    publisher.set_app_dir(req)
    publisher.cfg['identification'] = {'methods': ['password']}
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    # The context manager guarantees the file is flushed and closed even if
    # write() raises.
    with open(os.path.join(publisher.app_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('\n[api-secrets]\ncoucou = 1234\n')
    return publisher
def teardown_module(module):
    """Module-level teardown hook: clean up the temporary publisher.

    ``module`` is required by the hook signature and is not used.
    """
    clean_temporary_pub()
def test_backoffice_csv(pub):
    """Check the CSV export of a form listing.

    Covers the exported column set (including the identifier/text column pair
    of an item field backed by a data source), the status and criteria filters
    of the listing settings, and column selection.
    """
    listing_url = '/backoffice/management/form-title/'
    export_label = 'Export as CSV File'

    def export_lines(page):
        # follow the export link of a listing page and return the CSV lines
        return page.click(export_label).text.splitlines()

    create_superuser(pub)
    datasource = {'type': 'formula', 'value': repr([('A', 'aa'), ('B', 'bb'), ('C', 'cc')])}

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    string_field = fields.StringField(
        id='1', label='1st field', type='string', display_locations=['validation', 'summary', 'listings']
    )
    item_field = fields.ItemField(
        id='2',
        label='2nd field',
        type='item',
        items=['foo', 'bar', 'baz'],
        display_locations=['validation', 'summary', 'listings'],
    )
    datasource_field = fields.ItemField(
        id='3', label='3rd field', type='item', data_source=datasource, varname='foo'
    )
    formdef.fields = [string_field, item_field, datasource_field]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    for index in range(3):
        if index == 0:
            item, ds_id, ds_text = 'foo', 'A', 'aa'
        else:
            item, ds_id, ds_text = 'baz', 'C', 'cc'
        formdata = formdef.data_class()()
        formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
        formdata.data = {
            '1': 'FOO BAR %d' % index,
            '2': item,
            '2_display': item,
            '3': ds_id,
            '3_display': ds_text,
        }
        # the last formdata is stored without jumping to the "new" status
        if index < 2:
            formdata.jump_status('new')
        formdata.store()

    app = login(get_app(pub))
    export = app.get(listing_url).click(export_label)
    assert export.headers['content-type'].startswith('text/')
    lines = export.text.splitlines()
    assert len(lines) == 3  # header line + 2 rows
    assert len(lines[0].split(',')) == 7

    # also list the item field that is backed by a data source
    formdef = FormDef.get_by_urlname('form-title')
    formdef.fields[-1].display_locations = ['validation', 'summary', 'listings']
    formdef.store()
    lines = export_lines(app.get(listing_url))
    header = lines[0].split(',')
    assert len(header) == 9
    # check item fields with datasources get two columns (id & text)
    assert header[6] == '3rd field (identifier)'
    assert header[7] == '3rd field'
    first_row = lines[1].split(',')
    assert first_row[6] == 'C'
    assert first_row[7] == 'cc'

    # with the "all" filter the three formdata are exported
    page = app.get(listing_url)
    settings = page.forms['listing-settings']
    settings['filter'] = 'all'
    page = settings.submit()
    assert len(export_lines(page)) == 4

    # test status filter
    settings = page.forms['listing-settings']
    settings['filter'] = 'pending'
    settings['filter-2'].checked = True
    page = settings.submit()
    settings = page.forms['listing-settings']
    settings['filter-2-value'] = 'baz'
    page = settings.submit()
    assert len(export_lines(page)) == 2

    # test criteria filters
    settings = page.forms['listing-settings']
    settings['filter-start'].checked = True
    page = settings.submit()
    settings = page.forms['listing-settings']
    settings['filter-start-value'] = datetime.datetime(2015, 2, 1).strftime('%Y-%m-%d')
    page = settings.submit()
    assert len(export_lines(page)) == 1

    settings = page.forms['listing-settings']
    settings['filter-start-value'] = datetime.datetime(2014, 2, 1).strftime('%Y-%m-%d')
    page = settings.submit()
    settings = page.forms['listing-settings']
    settings['filter-2-value'] = 'baz'
    page = settings.submit()
    lines = export_lines(page)
    assert len(lines) == 2
    assert 'Created' in lines[0]

    # test column selection
    settings = page.forms['listing-settings']
    settings['time'].checked = False
    page = settings.submit()
    assert 'Created' not in export_lines(page)[0]
@pytest.fixture
def threshold():
    """Set ``FormPage.WCS_SYNC_EXPORT_LIMIT`` to 1 for the duration of a test.

    The value that was in place before the fixture ran is saved and put back
    on teardown, so the fixture does not depend on a hard-coded copy of the
    default.
    """
    from wcs.backoffice.management import FormPage

    previous_limit = FormPage.WCS_SYNC_EXPORT_LIMIT
    FormPage.WCS_SYNC_EXPORT_LIMIT = 1
    yield
    FormPage.WCS_SYNC_EXPORT_LIMIT = previous_limit
def test_backoffice_export_long_listings(pub, threshold):
    """Check exports that are handed over to a processing job.

    With the ``threshold`` fixture active and two formdata stored, both the
    CSV and the spreadsheet exports are expected to redirect to the
    processing page, from which the result is downloaded.
    """
    listing_url = '/backoffice/management/form-title/'
    job_url_prefix = 'http://example.net/backoffice/processing?job='

    create_superuser(pub)
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = [
        fields.StringField(
            id='1', label='1st field', type='string', display_locations=['validation', 'summary', 'listings']
        ),
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    for number in range(2):
        formdata = formdef.data_class()()
        formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
        formdata.data = {'1': 'BAZ BAZ %d' % number}
        formdata.jump_status('new')
        formdata.store()

    app = login(get_app(pub))

    # CSV export
    redirect = app.get(listing_url).click('Export as CSV File')
    assert redirect.location.startswith(job_url_prefix)
    job_page = redirect.follow()
    assert 'completed' in job_page.text
    resp_lines = job_page.click('Download Export').text.splitlines()
    assert resp_lines[0] == 'Number,Created,Last Modified,User Label,1st field,Status'
    assert len(resp_lines) == 3
    first_row = resp_lines[1].split(',')
    # dates are compared against the last formdata created in the loop above
    assert first_row[1].startswith(time.strftime('%Y-%m-%d', formdata.receipt_time))
    assert first_row[2].startswith(time.strftime('%Y-%m-%d', formdata.last_update_time))

    # spreadsheet export
    redirect = app.get(listing_url).click('Export a Spreadsheet')
    assert redirect.location.startswith(job_url_prefix)
    query = urllib.parse.urlparse(redirect.location).query
    job_id = urllib.parse.parse_qs(query)['job'][0]
    job_page = redirect.follow()
    assert 'completed' in job_page.text
    download = job_page.click('Download Export')
    assert download.content_type == 'application/vnd.oasis.opendocument.spreadsheet'

    # check afterjob ajax call
    status_resp = app.get('/afterjobs/' + job_id)
    assert status_resp.text == 'completed|completed'

    # check error handling
    app.get('/afterjobs/whatever', status=404)
def test_backoffice_csv_export_channel(pub):
    """Check the optional "Channel" column of the CSV export.

    The column is absent from the default export and, once
    ``submission_channel`` is ticked in the listing settings, appears last
    with the value ``Web`` for the stored formdata.
    """
    if not pub.site_options.has_section('variables'):
        pub.site_options.add_section('variables')
    # NOTE(review): presumably the welco_url variable is what makes the
    # submission channel column available -- confirm against wcs.
    pub.site_options.set('variables', 'welco_url', 'xxx')
    # The context manager guarantees the file is flushed and closed even if
    # write() raises.
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    create_superuser(pub)
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.jump_status('new')
    formdata.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')
    resp_csv = resp.click('Export as CSV File')
    assert 'Channel' not in resp_csv.text.splitlines()[0]

    # add submission channel column
    resp.forms['listing-settings']['submission_channel'].checked = True
    resp = resp.forms['listing-settings'].submit()
    resp_csv = resp.click('Export as CSV File')
    csv_lines = resp_csv.text.splitlines()
    assert csv_lines[0].split(',')[-1] == 'Channel'
    assert csv_lines[1].split(',')[-1] == 'Web'
def test_backoffice_csv_export_anonymised(pub):
    """Check the optional "Anonymised" column of the CSV export.

    The column is not the last one in the default export and, once
    ``anonymised`` is ticked in the listing settings, appears last with the
    value ``No`` for the stored formdata.
    """
    # Rewrite site-options.cfg from the publisher's in-memory options; the
    # context manager guarantees the file is closed even if write() raises.
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    create_superuser(pub)
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.jump_status('new')
    formdata.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')
    resp_csv = resp.click('Export as CSV File')
    assert resp_csv.text.splitlines()[0].split(',')[-1] != 'Anonymised'

    # add anonymised column
    resp.forms['listing-settings']['anonymised'].checked = True
    resp = resp.forms['listing-settings'].submit()
    resp_csv = resp.click('Export as CSV File')
    csv_lines = resp_csv.text.splitlines()
    assert csv_lines[0].split(',')[-1] == 'Anonymised'
    assert csv_lines[1].split(',')[-1] == 'No'
def test_backoffice_csv_export_block(pub):
    """Check how a block field is rendered in the CSV export.

    A block field with ``max_items=3`` is expected to produce three columns
    (``test - 1`` to ``test - 3``); each filled item is rendered through the
    block digest template and the unused slot is left empty.
    """
    create_superuser(pub)

    block = BlockDef()
    block.name = 'foobar'
    block.fields = [
        fields.StringField(id='123', required=True, label='Test', type='string', varname='foo'),
        fields.StringField(id='234', required=True, label='Test2', type='string', varname='bar'),
    ]
    block.digest_template = 'X{{foobar_var_foo}}Y'
    block.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = [
        fields.BlockField(id='1', label='test', type='block:foobar', max_items=3),
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    block_items = [
        {'123': 'foo', '234': 'bar'},
        {'123': 'foo2', '234': 'bar2'},
    ]
    formdata = formdef.data_class()()
    formdata.data = {
        '1': {
            'data': block_items,
            'schema': {'123': 'string', '234': 'string'},
        }
    }
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    app = login(get_app(pub))
    page = app.get('/backoffice/management/form-title/')
    # first export is requested before the block column is selected; its
    # content is not inspected
    page.click('Export as CSV File')

    settings = page.forms['listing-settings']
    settings['1'].checked = True
    page = settings.submit()
    csv_lines = page.click('Export as CSV File').text.splitlines()
    assert csv_lines[0].split(',')[-3:] == ['test - 1', 'test - 2', 'test - 3']
    assert csv_lines[1].split(',')[-3:] == ['XfooY', 'Xfoo2Y', '']
def test_backoffice_csv_export_ordering(pub):
    """Check the row order of the CSV export, with and without ``order_by``.

    Two formdata are stored ('foo' first, then 'bar'). The plain export is
    expected to list 'bar' before 'foo'; with ``order_by=id`` the order is
    'foo' then 'bar'. Skipped when the publisher does not use PostgreSQL.
    """
    if not pub.is_using_postgresql():
        # pytest.skip() raises, so no explicit return is needed after it
        pytest.skip('this requires SQL')

    create_superuser(pub)
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = [
        fields.ItemField(
            id='1',
            label='field 1',
            type='item',
            items=['foo', 'bar', 'baz'],
            display_locations=['validation', 'summary', 'listings'],
        ),
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    for value in ('foo', 'bar'):
        formdata = formdef.data_class()()
        formdata.data = {'1': value, '1_display': value}
        formdata.jump_status('new')
        formdata.store()

    app = login(get_app(pub))

    csv_lines = app.get('/backoffice/management/form-title/csv').text.splitlines()
    assert csv_lines[1].split(',')[-3:] == ['-', 'bar', 'New']
    assert csv_lines[2].split(',')[-3:] == ['-', 'foo', 'New']

    csv_lines = app.get('/backoffice/management/form-title/csv?order_by=id').text.splitlines()
    assert csv_lines[1].split(',')[-3:] == ['-', 'foo', 'New']
    assert csv_lines[2].split(',')[-3:] == ['-', 'bar', 'New']
def test_backoffice_ods(pub):
    """Check the spreadsheet (ODS) export of a form listing.

    The export is first requested with no formdata checked, then with one
    formdata whose values exercise cell typing: dates (including one from
    1871), a plain number, a phone number with a leading zero, a string with
    a control character, a number written with a comma, and a string that
    uses underscores as digit separators. The file link written in the
    sheet is also followed.
    """
    create_superuser(pub)
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = [
        fields.FileField(
            id='4', label='file field', type='file', display_locations=['validation', 'summary', 'listings']
        ),
        fields.DateField(
            id='5', label='date field', type='date', display_locations=['validation', 'summary', 'listings']
        ),
        fields.StringField(
            id='6',
            label='number field',
            type='string',
            display_locations=['validation', 'summary', 'listings'],
        ),
        fields.StringField(
            id='7',
            label='phone field',
            type='string',
            display_locations=['validation', 'summary', 'listings'],
        ),
        fields.DateField(
            id='8',
            label='very old field',
            type='date',
            display_locations=['validation', 'summary', 'listings'],
        ),
        fields.StringField(
            id='9',
            label='string field',
            type='string',
            display_locations=['validation', 'summary', 'listings'],
        ),
        fields.StringField(
            id='10',
            label='number with comma field',
            type='string',
            display_locations=['validation', 'summary', 'listings'],
        ),
        fields.StringField(
            id='11',
            label='not a number, with underscore',
            type='string',
            display_locations=['validation', 'summary', 'listings'],
        ),
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    # first export, requested before the test data below is created
    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')
    resp = resp.click('Export a Spreadsheet')
    assert resp.headers['content-type'] == 'application/vnd.oasis.opendocument.spreadsheet'
    assert 'filename=form-title.ods' in resp.headers['content-disposition']
    assert resp.body[:2] == b'PK'  # ods has a zip container

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.data = {
        '4': PicklableUpload('/foo/bar', content_type='text/plain'),
        '5': time.strptime('2015-05-12', '%Y-%m-%d'),
        '6': '12345',
        '7': '0102030405',
        '8': time.strptime('1871-03-18', '%Y-%m-%d'),
        '9': 'plop\npl\x1dop',  # with control characters
        '10': ' 123,45',
        '11': '1_000_000',
    }
    formdata.data['4'].receive([b'hello world'])
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    resp = app.get('/backoffice/management/form-title/')
    resp = resp.click('Export a Spreadsheet')
    assert resp.headers['content-type'] == 'application/vnd.oasis.opendocument.spreadsheet'
    assert 'filename=form-title.ods' in resp.headers['content-disposition']
    assert resp.body[:2] == b'PK'  # ods has a zip container

    # Parse content.xml inside context managers so both the archive and the
    # member stream are closed once the tree has been built.
    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf, zipf.open('content.xml') as content:
        ods_sheet = ET.parse(content)

    table_ns = ods.NS['table']
    text_ns = ods.NS['text']
    office_ns = ods.NS['office']
    xlink_ns = ods.NS['xlink']

    # check the ods contains a link to the document
    elem = ods_sheet.findall('.//{%s}a' % text_ns)[0]
    file_url = elem.attrib['{%s}href' % xlink_ns]
    assert file_url == 'http://example.net/backoffice/management/form-title/%s/files/4/bar' % formdata.id
    resp = app.get(file_url)
    assert resp.text == 'hello world'

    # Positions of the column titles among all paragraph texts; they are used
    # below as cell indexes within the data row.
    all_texts = [x.text for x in ods_sheet.findall('.//{%s}table-row//{%s}p' % (table_ns, text_ns))]
    created_column = all_texts.index('Created')
    date_column = all_texts.index('date field')
    number_column = all_texts.index('number field')
    phone_column = all_texts.index('phone field')
    old_column = all_texts.index('very old field')
    string_column = all_texts.index('string field')
    comma_number_column = all_texts.index('number with comma field')
    not_number_column = all_texts.index('not a number, with underscore')

    # locate the row whose first cell holds the formdata display id
    display_id = formdata.get_display_id()
    for row in ods_sheet.findall('.//{%s}table-row' % table_ns):
        first_paragraph = row.findall('.//{%s}table-cell/{%s}p' % (table_ns, text_ns))[0]
        if first_paragraph.text == display_id:
            break
    else:
        pytest.fail('failed to find data row')

    cells = row.findall('.//{%s}table-cell' % table_ns)
    value_type = '{%s}value-type' % office_ns
    value = '{%s}value' % office_ns
    date_value = '{%s}date-value' % office_ns
    style_name = '{%s}style-name' % table_ns

    assert cells[created_column].attrib[value_type] == 'date'
    assert cells[created_column].attrib[style_name] == 'DateTime'
    assert cells[date_column].attrib[value_type] == 'date'
    assert cells[date_column].attrib[style_name] == 'Date'
    assert cells[number_column].attrib[value_type] == 'float'
    assert cells[number_column].attrib[value] == '12345'
    # a leading zero keeps the phone number a string
    assert cells[phone_column].attrib[value_type] == 'string'
    assert cells[old_column].attrib[value_type] == 'date'
    assert cells[old_column].attrib[date_value] == '1871-03-18'
    # the \x1d control character is dropped from the exported text
    assert cells[string_column].find('{%s}p' % text_ns).text == 'plop\nplop'
    assert cells[comma_number_column].attrib[value] == '123.45'
    assert cells[not_number_column].attrib[value_type] == 'string'