wcs/tests/backoffice_pages/test_export.py

556 lines
19 KiB
Python

# -*- coding: utf-8 -*-
import datetime
import io
import os
import time
import urllib.parse
import xml.etree.ElementTree as ET
import zipfile
import pytest
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from wcs import fields
from wcs.blocks import BlockDef
from wcs.formdef import FormDef
from wcs.qommon import ods
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.upload_storage import PicklableUpload
from .test_all import create_superuser
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture for every test that requests it.

    Tests asking for ``pub`` are run once per parameter ('pickle', 'sql' and
    'pickle-templates'); the parameter is handed to the fixture itself
    (``indirect=True``). Tests not using ``pub`` are left untouched.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    pub_params = ['pickle', 'sql', 'pickle-templates']
    metafunc.parametrize('pub', pub_params, indirect=True)
@pytest.fixture
def pub(request, emails):
    """Create and configure the temporary publisher used by these tests.

    ``request.param`` is the parameter of the current run: SQL mode is
    enabled when it contains ``'sql'`` and templates mode when it contains
    ``'templates'``. The publisher is set up with password identification,
    English as language, and a ``site-options.cfg`` file holding an
    ``api-secrets`` section.

    Returns the configured publisher.
    """
    publisher = create_temporary_pub(
        sql_mode='sql' in request.param, templates_mode='templates' in request.param
    )

    req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
    publisher.set_app_dir(req)
    publisher.cfg['identification'] = {'methods': ['password']}
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    # the context manager closes the file even if writing fails
    with open(os.path.join(publisher.app_dir, 'site-options.cfg'), 'w') as fd:
        fd.write(
            '''
[api-secrets]
coucou = 1234
'''
        )

    return publisher
def teardown_module(module):
    """Module-level teardown: call ``clean_temporary_pub()``; ``module`` is unused."""
    clean_temporary_pub()
def test_backoffice_csv(pub):
    """Check the CSV export of a form listing in the backoffice.

    Covers the number of exported columns, the identifier/label column pair
    produced for an item field backed by a data source, the effect of the
    listing filters on the exported rows, and column selection.
    """
    create_superuser(pub)

    datasource = {'type': 'formula', 'value': repr([('A', 'aa'), ('B', 'bb'), ('C', 'cc')])}

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = [
        fields.StringField(
            id='1', label='1st field', type='string', display_locations=['validation', 'summary', 'listings']
        ),
        fields.ItemField(
            id='2',
            label='2nd field',
            type='item',
            items=['foo', 'bar', 'baz'],
            display_locations=['validation', 'summary', 'listings'],
        ),
        fields.ItemField(id='3', label='3rd field', type='item', data_source=datasource, varname='foo'),
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    for i in range(3):
        # the first formdata gets foo/A, the other two get baz/C
        item_value, source_id, source_label = ('foo', 'A', 'aa') if i == 0 else ('baz', 'C', 'cc')
        formdata = formdef.data_class()()
        formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
        formdata.data = {
            '1': 'FOO BAR %d' % i,
            '2': item_value,
            '2_display': item_value,
            '3': source_id,
            '3_display': source_label,
        }
        # only the first two formdata are moved to the 'new' status
        if i < 2:
            formdata.jump_status('new')
        formdata.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')
    resp = resp.click('Export as CSV File')
    assert resp.headers['content-type'].startswith('text/')
    csv_lines = resp.text.splitlines()
    # header line + 2 rows (presumably only the formdata moved to 'new' are listed by default)
    assert len(csv_lines) == 3
    assert len(csv_lines[0].split(',')) == 7

    # add the third field to the listing columns
    formdef = FormDef.get_by_urlname('form-title')
    formdef.fields[-1].display_locations = ['validation', 'summary', 'listings']
    formdef.store()
    resp = app.get('/backoffice/management/form-title/')
    resp = resp.click('Export as CSV File')
    csv_lines = resp.text.splitlines()
    header = csv_lines[0].split(',')
    assert len(header) == 9

    # check item fields with datasources get two columns (id & text)
    assert header[6] == '3rd field (identifier)'
    assert header[7] == '3rd field'
    first_row = csv_lines[1].split(',')
    assert first_row[6] == 'C'
    assert first_row[7] == 'cc'

    # with the 'all' filter every formdata is exported
    resp = app.get('/backoffice/management/form-title/')
    settings = resp.forms['listing-settings']
    settings['filter'] = 'all'
    resp = settings.submit()
    resp_csv = resp.click('Export as CSV File')
    assert len(resp_csv.text.splitlines()) == 4

    # test status filter
    settings = resp.forms['listing-settings']
    settings['filter'] = 'pending'
    settings['filter-2'].checked = True
    resp = settings.submit()
    settings = resp.forms['listing-settings']
    settings['filter-2-value'] = 'baz'
    resp = settings.submit()
    resp_csv = resp.click('Export as CSV File')
    assert len(resp_csv.text.splitlines()) == 2

    # test criteria filters
    settings = resp.forms['listing-settings']
    settings['filter-start'].checked = True
    resp = settings.submit()
    settings = resp.forms['listing-settings']
    settings['filter-start-value'] = datetime.datetime(2015, 2, 1).strftime('%Y-%m-%d')
    resp = settings.submit()
    resp_csv = resp.click('Export as CSV File')
    assert len(resp_csv.text.splitlines()) == 1

    settings = resp.forms['listing-settings']
    settings['filter-start-value'] = datetime.datetime(2014, 2, 1).strftime('%Y-%m-%d')
    resp = settings.submit()
    settings = resp.forms['listing-settings']
    settings['filter-2-value'] = 'baz'
    resp = settings.submit()
    resp_csv = resp.click('Export as CSV File')
    csv_lines = resp_csv.text.splitlines()
    assert len(csv_lines) == 2
    assert 'Created' in csv_lines[0]

    # test column selection
    settings = resp.forms['listing-settings']
    settings['time'].checked = False
    resp = settings.submit()
    resp_csv = resp.click('Export as CSV File')
    assert 'Created' not in resp_csv.text.splitlines()[0]
@pytest.fixture
def threshold():
    """Lower ``FormPage.WCS_SYNC_EXPORT_LIMIT`` to 1 while a test runs.

    The value in place before the fixture ran is saved and put back on
    teardown, so the fixture does not depend on knowing the default limit.
    """
    from wcs.backoffice.management import FormPage

    previous_limit = FormPage.WCS_SYNC_EXPORT_LIMIT
    FormPage.WCS_SYNC_EXPORT_LIMIT = 1
    yield
    FormPage.WCS_SYNC_EXPORT_LIMIT = previous_limit
def test_backoffice_export_long_listings(pub, threshold):
    """Check exports that go through a processing job.

    With the ``threshold`` fixture active and two formdata stored, both the
    CSV and the spreadsheet export are expected to redirect to a processing
    page from which the result can be downloaded; the ``/afterjobs/``
    endpoint is checked as well.
    """
    create_superuser(pub)

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = [
        fields.StringField(
            id='1', label='1st field', type='string', display_locations=['validation', 'summary', 'listings']
        ),
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    for i in range(2):
        formdata = formdef.data_class()()
        formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
        formdata.data = {'1': 'BAZ BAZ %d' % i}
        formdata.jump_status('new')
        formdata.store()

    job_url_prefix = 'http://example.net/backoffice/processing?job='

    app = login(get_app(pub))

    # CSV export
    resp = app.get('/backoffice/management/form-title/')
    resp = resp.click('Export as CSV File')
    assert resp.location.startswith(job_url_prefix)
    resp = resp.follow()
    assert 'completed' in resp.text
    resp = resp.click('Download Export')
    resp_lines = resp.text.splitlines()
    assert resp_lines[0] == 'Number,Created,Last Modified,User Label,1st field,Status'
    assert len(resp_lines) == 3
    first_row = resp_lines[1].split(',')
    # dates are compared against the formdata created last in the loop above
    assert first_row[1].startswith(time.strftime('%Y-%m-%d', formdata.receipt_time))
    assert first_row[2].startswith(time.strftime('%Y-%m-%d', formdata.last_update_time))

    # spreadsheet export
    resp = app.get('/backoffice/management/form-title/')
    resp = resp.click('Export a Spreadsheet')
    assert resp.location.startswith(job_url_prefix)
    query = urllib.parse.urlparse(resp.location).query
    job_id = urllib.parse.parse_qs(query)['job'][0]
    resp = resp.follow()
    assert 'completed' in resp.text
    resp = resp.click('Download Export')
    assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'

    # check afterjob ajax call
    status_resp = app.get('/afterjobs/' + job_id)
    assert status_resp.text == 'completed|completed'

    # check error handling
    app.get('/afterjobs/whatever', status=404)
def test_backoffice_csv_export_channel(pub):
    """Check that the submission channel can be added as a CSV export column.

    The ``welco_url`` variable is set in the site options first (presumably
    what makes the channel column available; to be confirmed against the
    listing code). The column must be absent from the default export and,
    once selected, come last with the value ``Web``.
    """
    if not pub.site_options.has_section('variables'):
        pub.site_options.add_section('variables')
    pub.site_options.set('variables', 'welco_url', 'xxx')
    # the context manager closes the file even if writing fails
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    create_superuser(pub)

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.jump_status('new')
    formdata.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')
    resp_csv = resp.click('Export as CSV File')
    assert 'Channel' not in resp_csv.text.splitlines()[0]

    # add submission channel column
    resp.forms['listing-settings']['submission_channel'].checked = True
    resp = resp.forms['listing-settings'].submit()
    resp_csv = resp.click('Export as CSV File')
    csv_lines = resp_csv.text.splitlines()
    assert csv_lines[0].split(',')[-1] == 'Channel'
    assert csv_lines[1].split(',')[-1] == 'Web'
def test_backoffice_csv_export_anonymised(pub):
    """Check that the anonymised flag can be added as a CSV export column.

    The column must not be the last one of the default export and, once
    selected, come last with the value ``No`` for a formdata that was not
    anonymised.
    """
    # write the current site options back to disk; the context manager
    # closes the file even if writing fails
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    create_superuser(pub)

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.jump_status('new')
    formdata.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')
    resp_csv = resp.click('Export as CSV File')
    assert resp_csv.text.splitlines()[0].split(',')[-1] != 'Anonymised'

    # add anonymised column
    resp.forms['listing-settings']['anonymised'].checked = True
    resp = resp.forms['listing-settings'].submit()
    resp_csv = resp.click('Export as CSV File')
    csv_lines = resp_csv.text.splitlines()
    assert csv_lines[0].split(',')[-1] == 'Anonymised'
    assert csv_lines[1].split(',')[-1] == 'No'
def test_backoffice_csv_export_block(pub):
    """Check the CSV export of a block field.

    The block field accepts up to three items and two are filled in; the
    export is expected to end with three ``test - N`` columns holding the
    digest of each item, the third one being empty.
    """
    create_superuser(pub)

    block = BlockDef()
    block.name = 'foobar'
    block.fields = [
        fields.StringField(id=field_id, required=True, label=label, type='string', varname=varname)
        for field_id, label, varname in (('123', 'Test', 'foo'), ('234', 'Test2', 'bar'))
    ]
    block.digest_template = 'X{{foobar_var_foo}}Y'
    block.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = [
        fields.BlockField(id='1', label='test', type='block:foobar', max_items=3),
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    block_items = [
        {'123': 'foo', '234': 'bar'},
        {'123': 'foo2', '234': 'bar2'},
    ]
    formdata = formdef.data_class()()
    formdata.data = {'1': {'data': block_items, 'schema': {'123': 'string', '234': 'string'}}}
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')
    # a first export is requested before the block column is selected;
    # its content is not inspected
    resp.click('Export as CSV File')

    settings = resp.forms['listing-settings']
    settings['1'].checked = True
    resp = settings.submit()
    csv_lines = resp.click('Export as CSV File').text.splitlines()
    assert csv_lines[0].split(',')[-3:] == ['test - 1', 'test - 2', 'test - 3']
    assert csv_lines[1].split(',')[-3:] == ['XfooY', 'Xfoo2Y', '']
def test_backoffice_csv_export_ordering(pub):
    """Check the row order of the CSV export, with and without ``order_by``.

    Skipped unless the publisher uses PostgreSQL. Two formdata are stored
    ('foo' then 'bar'): the plain export is expected to list 'bar' first,
    while ``order_by=id`` is expected to list 'foo' first.
    """
    if not pub.is_using_postgresql():
        # pytest.skip() raises, nothing after it would run
        pytest.skip('this requires SQL')

    create_superuser(pub)

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    formdef.fields = [
        fields.ItemField(
            id='1',
            label='field 1',
            type='item',
            items=['foo', 'bar', 'baz'],
            display_locations=['validation', 'summary', 'listings'],
        ),
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    formdef.data_class().wipe()
    for value in ('foo', 'bar'):
        formdata = formdef.data_class()()
        formdata.data = {'1': value, '1_display': value}
        formdata.jump_status('new')
        formdata.store()

    app = login(get_app(pub))

    # default ordering
    csv_lines = app.get('/backoffice/management/form-title/csv').text.splitlines()
    assert csv_lines[1].split(',')[-3:] == ['-', 'bar', 'New']
    assert csv_lines[2].split(',')[-3:] == ['-', 'foo', 'New']

    # explicit ordering on id
    csv_lines = app.get('/backoffice/management/form-title/csv?order_by=id').text.splitlines()
    assert csv_lines[1].split(',')[-3:] == ['-', 'foo', 'New']
    assert csv_lines[2].split(',')[-3:] == ['-', 'bar', 'New']
def test_backoffice_ods(pub):
    """Check the spreadsheet (ODS) export of a form listing.

    The export is requested a first time before any formdata is created and
    again with a single formdata; the second document is then opened to
    verify the link to the uploaded file and the value type, style or value
    stored in the cells of the data row.
    """
    create_superuser(pub)

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form title'
    field_specs = [
        (fields.FileField, '4', 'file field', 'file'),
        (fields.DateField, '5', 'date field', 'date'),
        (fields.StringField, '6', 'number field', 'string'),
        (fields.StringField, '7', 'phone field', 'string'),
        (fields.DateField, '8', 'very old field', 'date'),
        (fields.StringField, '9', 'string field', 'string'),
        (fields.StringField, '10', 'number with comma field', 'string'),
        (fields.StringField, '11', 'not a number, with underscore', 'string'),
    ]
    formdef.fields = [
        field_class(
            id=field_id,
            label=label,
            type=field_type,
            # a fresh list per field, nothing is shared between fields
            display_locations=['validation', 'summary', 'listings'],
        )
        for field_class, field_id, label, field_type in field_specs
    ]
    formdef.workflow_roles = {'_receiver': 1}
    formdef.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')
    resp = resp.click('Export a Spreadsheet')
    assert resp.headers['content-type'] == 'application/vnd.oasis.opendocument.spreadsheet'
    assert 'filename=form-title.ods' in resp.headers['content-disposition']
    assert resp.body[:2] == b'PK'  # ods has a zip container

    formdef.data_class().wipe()

    formdata = formdef.data_class()()
    formdata.data = {
        '4': PicklableUpload('/foo/bar', content_type='text/plain'),
        '5': time.strptime('2015-05-12', '%Y-%m-%d'),
        '6': '12345',
        '7': '0102030405',
        '8': time.strptime('1871-03-18', '%Y-%m-%d'),
        '9': 'plop\npl\x1dop',  # with control characters
        '10': ' 123,45',
        '11': '1_000_000',
    }
    formdata.data['4'].receive([b'hello world'])
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    resp = app.get('/backoffice/management/form-title/')
    resp = resp.click('Export a Spreadsheet')
    assert resp.headers['content-type'] == 'application/vnd.oasis.opendocument.spreadsheet'
    assert 'filename=form-title.ods' in resp.headers['content-disposition']
    assert resp.body[:2] == b'PK'  # ods has a zip container

    # the document is fully parsed inside the with block, so both the
    # archive and the member stream can be closed right away
    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
        with zipf.open('content.xml') as content:
            ods_sheet = ET.parse(content)

    # check the ods contains a link to the document
    elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0]
    file_url = elem.attrib['{%s}href' % ods.NS['xlink']]
    assert file_url == 'http://example.net/backoffice/management/form-title/%s/files/4/bar' % formdata.id
    resp = app.get(file_url)
    assert resp.text == 'hello world'

    # position of each column title among the paragraphs of the table rows
    all_texts = [
        x.text for x in ods_sheet.findall('.//{%s}table-row//{%s}p' % (ods.NS['table'], ods.NS['text']))
    ]
    created_column = all_texts.index('Created')
    date_column = all_texts.index('date field')
    number_column = all_texts.index('number field')
    phone_column = all_texts.index('phone field')
    old_column = all_texts.index('very old field')
    string_column = all_texts.index('string field')
    comma_number_column = all_texts.index('number with comma field')
    not_number_column = all_texts.index('not a number, with underscore')

    # look for the row whose first cell holds the formdata display id
    for row in ods_sheet.findall('.//{%s}table-row' % ods.NS['table']):
        first_text = row.findall('.//{%s}table-cell/{%s}p' % (ods.NS['table'], ods.NS['text']))[0]
        if first_text.text == formdata.get_display_id():
            break
    else:
        pytest.fail('failed to find data row')

    cells = row.findall('.//{%s}table-cell' % ods.NS['table'])

    def office_attr(column, name):
        """Return the ``office:`` namespaced attribute *name* of the cell at *column*."""
        return cells[column].attrib['{%s}%s' % (ods.NS['office'], name)]

    assert office_attr(created_column, 'value-type') == 'date'
    assert cells[created_column].attrib['{%s}style-name' % ods.NS['table']] == 'DateTime'
    assert office_attr(date_column, 'value-type') == 'date'
    assert cells[date_column].attrib['{%s}style-name' % ods.NS['table']] == 'Date'
    assert office_attr(number_column, 'value-type') == 'float'
    assert office_attr(number_column, 'value') == '12345'
    assert office_attr(phone_column, 'value-type') == 'string'
    assert office_attr(old_column, 'value-type') == 'date'
    assert office_attr(old_column, 'date-value') == '1871-03-18'
    # the \x1d control character of the stored value is expected to be gone
    assert cells[string_column].find('{%s}p' % ods.NS['text']).text == 'plop\nplop'
    assert office_attr(comma_number_column, 'value') == '123.45'
    assert office_attr(not_number_column, 'value-type') == 'string'