849 lines
30 KiB
Python
849 lines
30 KiB
Python
import datetime
|
|
import json
|
|
import re
|
|
import uuid
|
|
from io import StringIO
|
|
|
|
import freezegun
|
|
import pytest
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.files import File
|
|
from django.utils.encoding import force_bytes
|
|
from django.utils.timezone import now
|
|
from webtest import Upload
|
|
|
|
from passerelle.apps.csvdatasource.models import CsvDataSource, Query
|
|
from passerelle.apps.photon.models import Photon
|
|
from passerelle.base.models import AccessRight, ApiUser, Job, ResourceLog, ResourceStatus
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
def login(app, username='admin', password='admin'):
|
|
login_page = app.get('/login/')
|
|
login_form = login_page.forms[0]
|
|
login_form['username'] = username
|
|
login_form['password'] = password
|
|
resp = login_form.submit()
|
|
assert resp.status_int == 302
|
|
return app
|
|
|
|
|
|
def test_homepage_redirect(app):
|
|
assert app.get('/', status=302).location.endswith('/manage/')
|
|
|
|
|
|
def test_unlogged_access(app):
|
|
# connect while not being logged in
|
|
assert app.get('/manage/', status=302).location.endswith('/login/?next=/manage/')
|
|
|
|
|
|
def test_simple_user_access(app, simple_user):
|
|
# connect while being logged as a simple user
|
|
app = login(app, username='user', password='user')
|
|
assert app.get('/manage/', status=403)
|
|
assert app.get('/manage/add', status=403)
|
|
assert app.get('/manage/ovh/add', status=403)
|
|
assert app.get('/manage/access/', status=403)
|
|
|
|
|
|
def test_access(app, admin_user):
|
|
app = login(app)
|
|
resp = app.get('/manage/', status=200)
|
|
assert 'Add Connector' in resp.text
|
|
assert app.get('/manage/access/', status=200)
|
|
|
|
|
|
def test_add_connector(app, admin_user):
|
|
app = login(app)
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Add Connector')
|
|
assert 'Business Process Connectors' in resp.text
|
|
assert 'Geographic information system' in resp.text
|
|
resp = resp.click('Base Adresse Web Service')
|
|
resp.forms[0]['title'] = 'Test Connector'
|
|
resp.forms[0]['slug'] = 'test-connector'
|
|
resp.forms[0]['description'] = 'Connector for a simple test'
|
|
resp.forms[0]['service_url'] = 'https://api-adresse.data.gouv.fr/'
|
|
resp = resp.forms[0].submit()
|
|
assert resp.status_int == 302
|
|
assert resp.location.endswith('/base-adresse/test-connector/')
|
|
resp = resp.follow()
|
|
assert 'Base Adresse Web Service - Test Connector' in resp.text
|
|
|
|
resp = app.get('/manage/', status=200)
|
|
assert 'Test Connector' in resp.text
|
|
|
|
|
|
def test_add_connector_unique_slug(app, admin_user):
|
|
app = login(app)
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Add Connector')
|
|
resp = resp.click('Base Adresse Web Service')
|
|
assert resp.html.find('input', {'name': 'title'}).attrs['data-slug-sync'] == 'slug'
|
|
|
|
resp.forms[0]['title'] = 'Test Connector'
|
|
resp.forms[0]['slug'] = 'test-connector'
|
|
resp.forms[0]['description'] = 'Connector for a simple test'
|
|
resp.forms[0]['service_url'] = 'https://api-adresse.data.gouv.fr/'
|
|
resp1 = resp.forms[0].submit()
|
|
assert resp1.status_int == 302
|
|
|
|
resp2 = resp.forms[0].submit()
|
|
assert 'There were errors processing your form.' in resp2.text
|
|
assert 'this Identifier already exists.' in resp2.text
|
|
resp.forms[0]['slug'] = 'foo'
|
|
resp2 = resp.forms[0].submit()
|
|
assert resp2.status_int == 302
|
|
|
|
|
|
def test_edit_connector(app, admin_user):
|
|
Photon.objects.create(
|
|
slug='test-connector',
|
|
title='Test Connector',
|
|
description='Connector for a simple test',
|
|
service_url='https://example.org/',
|
|
)
|
|
app = login(app)
|
|
resp = app.get('/manage/photon/test-connector/edit', status=200)
|
|
assert 'Test Connector' in resp.text
|
|
|
|
resp.forms[0]['title'] = 'Connector updated'
|
|
resp.forms[0]['description'] = 'simple test'
|
|
resp.forms[0]['service_url'] = 'https://example.net/'
|
|
resp = resp.forms[0].submit()
|
|
assert resp.status_int == 302
|
|
assert resp.location.endswith('/photon/test-connector/')
|
|
resp = resp.follow()
|
|
assert 'Connector updated' in resp.text
|
|
|
|
resp = app.get('/manage/photon/test-connector/edit', status=200)
|
|
resp.forms[0]['slug'] = 'test'
|
|
resp = resp.forms[0].submit()
|
|
assert resp.status_int == 302
|
|
assert resp.location.endswith('/photon/test/')
|
|
|
|
|
|
def test_edit_connector_unique_slug(app, admin_user):
|
|
Photon.objects.create(
|
|
slug='test-connector',
|
|
title='Test Connector',
|
|
description='Connector for a simple test',
|
|
service_url='https://example.org/',
|
|
)
|
|
Photon.objects.create(
|
|
slug='other-connector',
|
|
title='Other Connector',
|
|
description='Connector for another test',
|
|
service_url='https://example.org/',
|
|
)
|
|
app = login(app)
|
|
resp = app.get('/manage/photon/test-connector/edit', status=200)
|
|
assert 'Test Connector' in resp.text
|
|
assert resp.html.find('input', {'name': 'title'}).attrs['data-slug-sync'] == 'slug'
|
|
|
|
resp.forms[0]['slug'] = 'other-connector'
|
|
resp = resp.forms[0].submit()
|
|
assert resp.status_int == 200
|
|
assert 'Photon with this Identifier already exists.' in resp.text
|
|
|
|
# if title and slug name are not in sync, disable prepopulate slug by JS
|
|
resp.forms[0]['slug'] = 'another-connector'
|
|
resp = resp.forms[0].submit()
|
|
resp = app.get('/manage/photon/another-connector/edit', status=200)
|
|
assert 'data-slug-sync' not in resp.html.find('input', {'name': 'title'}).attrs
|
|
|
|
|
|
def test_visit_connectors(app, admin_user):
|
|
app = login(app)
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Add Connector')
|
|
for link in re.findall('href="(/manage.*add)"', resp.text):
|
|
resp = app.get(link, status=200)
|
|
|
|
|
|
def test_access_management(app, admin_user):
|
|
assert ApiUser.objects.count() == 0
|
|
app = login(app)
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Access Management')
|
|
resp = resp.click('Add API User')
|
|
resp.form['username'] = 'foo'
|
|
resp.form['fullname'] = 'Foo'
|
|
resp = resp.form.submit().follow()
|
|
assert ApiUser.objects.count() == 1
|
|
assert ApiUser.objects.get(username='foo').fullname == 'Foo'
|
|
|
|
resp = resp.click('Add API User')
|
|
resp.form['username'] = 'bar'
|
|
resp.form['fullname'] = 'Bar'
|
|
resp.form['keytype'] = 'API'
|
|
resp = resp.form.submit()
|
|
assert 'Key can not be empty' in resp.text
|
|
|
|
resp = resp.click('Add API User')
|
|
resp.form['username'] = 'bar'
|
|
resp.form['fullname'] = 'Bar'
|
|
resp.form['keytype'] = 'SIGN'
|
|
resp.form['key'] = '123'
|
|
resp = resp.form.submit()
|
|
assert 'Signature Key length must be at least 256 bits.' in resp.text
|
|
|
|
|
|
def test_menu_json(app, admin_user):
|
|
app.get('/manage/menu.json', status=302)
|
|
|
|
app = login(app)
|
|
resp = app.get('/manage/menu.json')
|
|
assert resp.headers['content-type'] == 'application/json'
|
|
assert resp.json[0]['label'] == 'Web Services'
|
|
|
|
resp = app.get('/manage/menu.json?callback=FooBar')
|
|
assert resp.headers['content-type'] == 'application/javascript'
|
|
assert resp.content.startswith(b'FooBar([{"')
|
|
|
|
|
|
def test_logs(app, admin_user):
|
|
data = StringIO('1;Foo\n2;Bar\n3;Baz')
|
|
csv = CsvDataSource.objects.create(
|
|
csv_file=File(data, 't.csv'),
|
|
columns_keynames='id, text',
|
|
slug='test',
|
|
title='a title',
|
|
description='a description',
|
|
)
|
|
|
|
query = Query(slug='fooba', resource=csv, structure='array')
|
|
query.projections = '\n'.join(['id:int(id)', 'text:text'])
|
|
query.save()
|
|
|
|
api = ApiUser.objects.create(
|
|
username='public', fullname='public', description='access for all', keytype='', key=''
|
|
)
|
|
obj_type = ContentType.objects.get_for_model(csv)
|
|
AccessRight.objects.create(
|
|
codename='can_access',
|
|
apiuser=api,
|
|
resource_type=obj_type,
|
|
resource_pk=csv.pk,
|
|
)
|
|
|
|
app = login(app)
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert '<p>No records found</p>' in resp.text
|
|
|
|
app.get('/csvdatasource/test/query/foobar/')
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert 'endpoint GET /csvdatasource/test/query/foobar/ ' in resp.text
|
|
|
|
app.get('/csvdatasource/test/query/foobar/?q=toto')
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert 'endpoint GET /csvdatasource/test/query/foobar/?q=toto' in resp.text
|
|
|
|
resp = resp.click('Full page')
|
|
assert resp.text.count('<td class="timestamp">') == 4
|
|
assert resp.text.count('Error occurred while processing request') == 2
|
|
|
|
resp.form['q'] = 'toto'
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<td class="timestamp">') == 1
|
|
|
|
resp.form['q'] = datetime.date.today().strftime('%d/%m/%Y')
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<td class="timestamp">') == 4
|
|
|
|
resp.form['q'] = datetime.date.today().strftime('%d/%m/2010')
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<td class="timestamp">') == 0
|
|
|
|
resp.form['q'] = 'Wed, 29 Sep 2021 00:40:48 +0200'
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<td class="timestamp">') == 0
|
|
|
|
resp.form['q'] = ''
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<td class="timestamp">') == 4
|
|
|
|
log_pk = re.findall(r'data-pk="(.*)"', resp.text)[0]
|
|
base_url = re.findall(r'data-log-base-url="(.*)"', resp.text)[0]
|
|
resp = app.get(base_url + log_pk + '/')
|
|
resp = resp.click('Search for logs from the same call')
|
|
assert resp.text.count('<td class="timestamp">') == 2
|
|
|
|
resp = app.get(base_url + '12345' + '/', status=404)
|
|
|
|
resp = app.get('/manage/csvdatasource/test/logs/?log_id=%s' % log_pk)
|
|
assert 'log_target' in resp.context
|
|
|
|
# unknown id
|
|
resp = app.get('/manage/csvdatasource/test/logs/?log_id=0')
|
|
assert 'log_target' not in resp.context
|
|
# bad id
|
|
resp = app.get('/manage/csvdatasource/test/logs/?log_id=foo')
|
|
assert 'log_target' not in resp.context
|
|
|
|
resp = app.get('/manage/csvdatasource/test/logs/?log_level=')
|
|
assert resp.text.count('<td class="timestamp">') == 4
|
|
assert resp.text.count('level-warning') == 2
|
|
assert resp.text.count('level-info') == 2
|
|
resp = app.get('/manage/csvdatasource/test/logs/?log_level=INFO')
|
|
assert resp.text.count('<td class="timestamp">') == 2
|
|
assert resp.text.count('level-warning') == 0
|
|
assert resp.text.count('level-info') == 2
|
|
resp = app.get('/manage/csvdatasource/test/logs/?log_level=ERROR')
|
|
assert resp.text.count('<td class="timestamp">') == 0
|
|
assert resp.text.count('level-warning') == 0
|
|
assert resp.text.count('level-info') == 0
|
|
resp = app.get('/manage/csvdatasource/test/logs/?log_level=foobar')
|
|
assert resp.text.count('<td class="timestamp">') == 4
|
|
assert resp.text.count('level-warning') == 2
|
|
assert resp.text.count('level-info') == 2
|
|
|
|
with freezegun.freeze_time('2020-10-06 14:08:12'):
|
|
log1 = ResourceLog.objects.create(
|
|
appname=csv.get_connector_slug(), slug=csv.slug, levelno=50, message='hop'
|
|
)
|
|
log2 = ResourceLog.objects.create(
|
|
appname=csv.get_connector_slug(), slug=csv.slug, levelno=42, message='hop'
|
|
)
|
|
resp = app.get('/manage/csvdatasource/test/logs/%s/' % log1.pk)
|
|
assert 'title="Critical - Oct. 6, 2020 14:08:12"' in resp.text
|
|
resp = app.get('/manage/csvdatasource/test/logs/%s/' % log2.pk)
|
|
assert 'title="Level 42 - Oct. 6, 2020 14:08:12"' in resp.text
|
|
|
|
|
|
def test_logs_tab_pagination(app, admin_user):
|
|
data = StringIO('1;Foo\n2;Bar\n3;Baz')
|
|
csv = CsvDataSource.objects.create(
|
|
csv_file=File(data, 't.csv'),
|
|
columns_keynames='id, text',
|
|
slug='test',
|
|
title='a title',
|
|
description='a description',
|
|
)
|
|
|
|
query = Query(slug='fooba', resource=csv, structure='array')
|
|
query.projections = '\n'.join(['id:int(id)', 'text:text'])
|
|
query.save()
|
|
|
|
api = ApiUser.objects.create(
|
|
username='public', fullname='public', description='access for all', keytype='', key=''
|
|
)
|
|
obj_type = ContentType.objects.get_for_model(csv)
|
|
AccessRight.objects.create(
|
|
codename='can_access',
|
|
apiuser=api,
|
|
resource_type=obj_type,
|
|
resource_pk=csv.pk,
|
|
)
|
|
|
|
app = login(app)
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert '<p>No records found</p>' in resp.text
|
|
|
|
for i in range(20):
|
|
app.get('/csvdatasource/test/query/foobar/?q=toto%s' % i)
|
|
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert resp.pyquery('#panel-logs[hidden]')
|
|
assert resp.pyquery('#panel-endpoints:not([hidden])')
|
|
assert resp.pyquery('#table-logs tr').length == 10
|
|
assert resp.pyquery('#panel-logs .paginator a:first').text() == '2'
|
|
resp = resp.click('2')
|
|
assert resp.pyquery('#panel-logs:not([hidden])')
|
|
assert resp.pyquery('#panel-endpoints[hidden]')
|
|
assert resp.pyquery('#table-logs tr').length == 10
|
|
resp.pyquery('#panel-logs .paginator a:first').text()
|
|
assert resp.pyquery('#panel-logs .paginator a:first').text() == '1'
|
|
|
|
|
|
def test_logs_search(app, admin_user):
|
|
csv = CsvDataSource.objects.create(csv_file=File(StringIO('1;t\n'), 't.csv'), slug='t', title='t')
|
|
app = login(app)
|
|
|
|
transaction_id = str(uuid.uuid4())
|
|
looks_like_transaction_id = str(uuid.uuid4())
|
|
log1 = ResourceLog.objects.create(
|
|
appname=csv.get_connector_slug(),
|
|
slug=csv.slug,
|
|
levelno=1,
|
|
message='hop',
|
|
transaction_id=transaction_id,
|
|
extra={'transaction_id': transaction_id, 'foo': 'plop'},
|
|
)
|
|
log2 = ResourceLog.objects.create(
|
|
appname=csv.get_connector_slug(),
|
|
slug=csv.slug,
|
|
levelno=1,
|
|
message='plop',
|
|
transaction_id=transaction_id,
|
|
extra={'bar': 'hop'},
|
|
)
|
|
log3 = ResourceLog.objects.create(
|
|
appname=csv.get_connector_slug(), slug=csv.slug, levelno=1, message='foo', extra={'bar': 'hop'}
|
|
)
|
|
log4 = ResourceLog.objects.create(
|
|
appname=csv.get_connector_slug(), slug=csv.slug, levelno=1, message=looks_like_transaction_id
|
|
)
|
|
|
|
resp = app.get('/manage/csvdatasource/t/logs/')
|
|
assert list(resp.context['page_obj'].object_list) == [log4, log3, log2, log1]
|
|
|
|
resp.form['q'] = transaction_id
|
|
resp = resp.form.submit()
|
|
assert list(resp.context['page_obj'].object_list) == [log2, log1]
|
|
|
|
resp.form['q'] = looks_like_transaction_id
|
|
resp = resp.form.submit()
|
|
assert list(resp.context['page_obj'].object_list) == [log4]
|
|
|
|
resp.form['q'] = str(uuid.uuid4())
|
|
resp = resp.form.submit()
|
|
assert list(resp.context['page_obj'].object_list) == []
|
|
|
|
resp.form['q'] = 'not there'
|
|
resp = resp.form.submit()
|
|
assert list(resp.context['page_obj'].object_list) == []
|
|
|
|
resp.form['q'] = 'hop'
|
|
resp = resp.form.submit()
|
|
assert list(resp.context['page_obj'].object_list) == [log3, log2, log1]
|
|
|
|
resp.form['q'] = 'plop'
|
|
resp = resp.form.submit()
|
|
assert list(resp.context['page_obj'].object_list) == [log2, log1]
|
|
|
|
resp.form['q'] = 'foo'
|
|
resp = resp.form.submit()
|
|
assert list(resp.context['page_obj'].object_list) == [log3, log1]
|
|
|
|
|
|
def test_logging_parameters(app, admin_user):
|
|
data = StringIO('1;Foo\n2;Bar\n3;Baz')
|
|
csv = CsvDataSource.objects.create(
|
|
csv_file=File(data, 't.csv'),
|
|
columns_keynames='id, text',
|
|
slug='test',
|
|
title='a title',
|
|
description='a description',
|
|
)
|
|
app = login(app)
|
|
resp = app.get(csv.get_absolute_url())
|
|
resp = resp.click('Logging parameters')
|
|
resp.form['log_level'] = 'ERROR'
|
|
resp = resp.form.submit()
|
|
assert CsvDataSource.objects.get(id=csv.id).log_level == 'ERROR'
|
|
|
|
resp = app.get(csv.get_absolute_url())
|
|
resp = resp.click('Logging parameters')
|
|
resp.form['log_level'] = 'DEBUG'
|
|
resp = resp.form.submit()
|
|
assert CsvDataSource.objects.get(id=csv.id).log_level == 'DEBUG'
|
|
|
|
resp = app.get(csv.get_absolute_url())
|
|
resp = resp.click('Logging parameters')
|
|
resp.form['trace_emails'] = 'fred@localhost'
|
|
resp = resp.form.submit()
|
|
assert CsvDataSource.objects.get(id=csv.id).logging_parameters.trace_emails == 'fred@localhost'
|
|
|
|
resp = app.get(csv.get_absolute_url())
|
|
resp = resp.click('Logging parameters')
|
|
assert resp.form['trace_emails'].value == 'fred@localhost'
|
|
|
|
|
|
def test_availability_parameters(app, admin_user, monkeypatch):
|
|
data = StringIO('1;Foo\n2;Bar\n3;Baz')
|
|
csv = CsvDataSource.objects.create(
|
|
csv_file=File(data, 't.csv'),
|
|
columns_keynames='id, text',
|
|
slug='test',
|
|
title='a title',
|
|
description='a description',
|
|
)
|
|
app = login(app)
|
|
resp = app.get(csv.get_absolute_url())
|
|
|
|
assert csv.availability_parameters.run_check
|
|
# csv connector has the default check_status which does nothing
|
|
# so availability check is hidden
|
|
assert 'Availability check' not in resp.text
|
|
|
|
def check_status(*args, **kwargs):
|
|
return True
|
|
|
|
monkeypatch.setattr(CsvDataSource, 'check_status', check_status)
|
|
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert 'Availability check' in resp.text
|
|
|
|
resp = resp.click('Availability check')
|
|
assert 'up' in resp.text
|
|
resp.form['run_check'] = False
|
|
resp = resp.form.submit()
|
|
# Connector status not changed, availability parameters changed
|
|
assert not csv.availability_parameters.run_check
|
|
|
|
resp = app.get(csv.get_absolute_url())
|
|
resp = resp.click('Availability check')
|
|
resp.form['run_check'] = True
|
|
resp = resp.form.submit()
|
|
|
|
# Connector down
|
|
resource_type = ContentType.objects.get_for_model(csv)
|
|
status = ResourceStatus(resource_type=resource_type, resource_pk=csv.pk, status='down', message='')
|
|
status.save()
|
|
assert csv.down()
|
|
resp = app.get(csv.get_absolute_url())
|
|
resp = resp.click('Availability check')
|
|
resp.form['run_check'] = False
|
|
resp = resp.form.submit()
|
|
# Connector is put back up
|
|
assert not csv.availability_parameters.run_check
|
|
assert not csv.down()
|
|
status = csv.get_availability_status()
|
|
assert status.status == 'up'
|
|
|
|
# Notification delays
|
|
resp = app.get(csv.get_absolute_url())
|
|
resp = resp.click('Availability check')
|
|
assert resp.form['notification_delays'].value == '0'
|
|
resp.form['notification_delays'] = '0,5,100'
|
|
resp = resp.form.submit().follow()
|
|
assert not resp.pyquery('.messages .warning')
|
|
assert csv.availability_parameters.notification_delays == '0,5,100'
|
|
|
|
resp = app.get(csv.get_absolute_url())
|
|
resp = resp.click('Availability check')
|
|
resp.form['notification_delays'] = 'x'
|
|
resp = resp.form.submit()
|
|
assert len(resp.pyquery('#id_notification_delays_p .error'))
|
|
|
|
|
|
def test_jobs(app, admin_user):
|
|
data = StringIO('1;Foo\n2;Bar\n3;Baz')
|
|
csv = CsvDataSource.objects.create(
|
|
csv_file=File(data, 't.csv'),
|
|
columns_keynames='id, text',
|
|
slug='test',
|
|
title='a title',
|
|
description='a description',
|
|
)
|
|
|
|
api = ApiUser.objects.create(
|
|
username='public', fullname='public', description='access for all', keytype='', key=''
|
|
)
|
|
obj_type = ContentType.objects.get_for_model(csv)
|
|
AccessRight.objects.create(
|
|
codename='can_access',
|
|
apiuser=api,
|
|
resource_type=obj_type,
|
|
resource_pk=csv.pk,
|
|
)
|
|
|
|
app = login(app)
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert 'jobs' not in resp.text
|
|
|
|
csv.add_job('sample_job')
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert 'jobs' in resp.text
|
|
assert 'sample_job' in resp.text
|
|
|
|
resp = resp.click('Full page', index=1)
|
|
assert resp.text.count('<tr data-pk') == 1
|
|
assert resp.text.count('sample_job') == 1
|
|
|
|
resp.form['q'] = 'sample'
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<tr data-pk') == 1
|
|
|
|
resp.form['q'] = 'blah'
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<tr data-pk') == 0
|
|
|
|
resp.form['q'] = datetime.date.today().strftime('%d/%m/%Y')
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<tr data-pk') == 1
|
|
|
|
resp.form['q'] = datetime.date.today().strftime('%d/%m/2010')
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<tr data-pk') == 0
|
|
|
|
resp.form['q'] = ''
|
|
resp = resp.form.submit()
|
|
assert resp.text.count('<tr data-pk') == 1
|
|
job_pk = re.findall(r'data-pk="(.*)"', resp.text)[0]
|
|
base_url = re.findall(r'data-job-base-url="(.*)"', resp.text)[0]
|
|
resp = app.get(base_url + job_pk + '/')
|
|
resp = app.get(base_url + '12345' + '/', status=404)
|
|
|
|
resp = app.get('/manage/csvdatasource/test/jobs/?job_id=%s' % job_pk)
|
|
assert 'job_target' in resp.context
|
|
|
|
# unknown id
|
|
resp = app.get('/manage/csvdatasource/test/jobs/?job_id=0')
|
|
assert 'job_target' not in resp.context
|
|
# bad id
|
|
resp = app.get('/manage/csvdatasource/test/jobs/?job_id=foo')
|
|
assert 'job_target' not in resp.context
|
|
|
|
|
|
def test_job_restart(app, admin_user):
|
|
data = StringIO('1;Foo\n2;Bar\n3;Baz')
|
|
csv = CsvDataSource.objects.create(
|
|
csv_file=File(data, 't.csv'),
|
|
columns_keynames='id, text',
|
|
slug='test',
|
|
title='a title',
|
|
description='a description',
|
|
)
|
|
app = login(app)
|
|
|
|
# unknown job
|
|
app.get('/manage/csvdatasource/test/jobs/0/restart/', status=404)
|
|
app.get('/manage/csvdatasource/test/jobs/foo/restart/', status=404)
|
|
|
|
# create a job, with wrong status
|
|
job = Job.objects.create(resource=csv, status='registered')
|
|
app.get('/manage/csvdatasource/test/jobs/%s/restart/' % job.pk, status=404)
|
|
|
|
job.status = 'failed'
|
|
job.done_timestamp = now()
|
|
job.status_details = {'error_summary': 'foo bar'}
|
|
job.save()
|
|
|
|
# unknown connector
|
|
app.get('/manage/csvdatasourc/test/jobs/%s/restart/' % job.pk, status=404)
|
|
app.get('/manage/csvdatasource/teste/jobs/%s/restart/' % job.pk, status=404)
|
|
|
|
# ok, restart job
|
|
resp = app.get('/manage/csvdatasource/test/jobs/%s/restart/' % job.pk, status=302)
|
|
new_job = Job.objects.latest('pk')
|
|
assert resp.location.endswith('/manage/csvdatasource/test/jobs/')
|
|
assert new_job.status == 'registered'
|
|
assert new_job.done_timestamp is None
|
|
assert new_job.status_details == {}
|
|
|
|
job.refresh_from_db()
|
|
assert job.status == 'restarted'
|
|
assert job.status_details == {'error_summary': 'foo bar', 'new_job_pk': new_job.pk}
|
|
|
|
resp = app.get('/manage/csvdatasource/test/jobs/%s/' % job.pk)
|
|
assert '/manage/csvdatasource/test/jobs/?job_id=%s' % new_job.pk in resp
|
|
|
|
|
|
def test_manager_import_export(app, admin_user):
|
|
data = StringIO('1;Foo\n2;Bar\n3;Baz')
|
|
csv = CsvDataSource.objects.create(
|
|
csv_file=File(data, 't.csv'),
|
|
columns_keynames='id, text',
|
|
slug='test',
|
|
title='a title',
|
|
description='a description',
|
|
)
|
|
CsvDataSource.objects.create(
|
|
csv_file=File(data, 't.csv'),
|
|
columns_keynames='id, text',
|
|
slug='test2',
|
|
title='a title',
|
|
description='a description',
|
|
)
|
|
api = ApiUser.objects.create(
|
|
username='public', fullname='public', description='access for all', keytype='', key=''
|
|
)
|
|
obj_type = ContentType.objects.get_for_model(csv)
|
|
AccessRight.objects.create(
|
|
codename='can_access',
|
|
apiuser=api,
|
|
resource_type=obj_type,
|
|
resource_pk=csv.pk,
|
|
)
|
|
|
|
# cannot export if not connected
|
|
resp = app.get('/%s/%s/' % (csv.get_connector_slug(), csv.slug), status=200)
|
|
assert 'Export' not in resp.html.find('ul', {'class': 'extra-actions-menu'}).text
|
|
|
|
# export site
|
|
app = login(app)
|
|
resp = app.get('/manage/')
|
|
with freezegun.freeze_time('2020-09-01'):
|
|
resp = resp.click('Export')
|
|
assert resp.headers['content-type'] == 'application/json'
|
|
assert resp.headers['content-disposition'] == 'attachment; filename="export_connectors_20200901.json"'
|
|
site_export = resp.text
|
|
|
|
# invalid json
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Import')
|
|
resp.form['site_json'] = Upload('export.json', b'garbage', 'application/json')
|
|
resp = resp.form.submit()
|
|
assert 'File is not in the expected JSON format.' in resp.text
|
|
|
|
# empty json
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Import')
|
|
resp.form['site_json'] = Upload('export.json', b'{}', 'application/json')
|
|
resp = resp.form.submit().follow()
|
|
assert CsvDataSource.objects.count() == 2
|
|
|
|
# unknown connectors
|
|
export = {
|
|
'resources': [
|
|
{'@type': 'passerelle-resource', 'resource_type': 'not_installed_app1.app_model'},
|
|
{'@type': 'passerelle-resource', 'resource_type': 'not_installed_app2.app_model'},
|
|
]
|
|
}
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Import')
|
|
resp.form['site_json'] = Upload('export.json', force_bytes(json.dumps(export)), 'application/json')
|
|
resp = resp.form.submit()
|
|
assert (
|
|
resp.html.find('ul', {'class': 'errorlist'}).li.text
|
|
== "Unknown connectors: not_installed_app1, not_installed_app2"
|
|
)
|
|
|
|
# import site
|
|
CsvDataSource.objects.all().delete()
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Import')
|
|
resp.form['site_json'] = Upload('export.json', site_export.encode('utf-8'), 'application/json')
|
|
resp = resp.form.submit().follow()
|
|
assert CsvDataSource.objects.count() == 2
|
|
|
|
# export connector
|
|
resp = app.get('/%s/%s/' % (csv.get_connector_slug(), csv.slug), status=200)
|
|
assert 'Export' in resp.html.find('ul', {'class': 'extra-actions-menu'}).text
|
|
with freezegun.freeze_time('2020-09-01'):
|
|
resp = resp.click('Export')
|
|
assert resp.headers['content-type'] == 'application/json'
|
|
assert resp.headers['content-disposition'] == 'attachment; filename="export_%s_%s_20200901.json"' % (
|
|
csv.get_connector_slug(),
|
|
csv.slug,
|
|
)
|
|
connector_export = resp.text
|
|
|
|
# import connector
|
|
csv.delete()
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Import')
|
|
resp.form['site_json'] = Upload('export.json', connector_export.encode('utf-8'), 'application/json')
|
|
resp = resp.form.submit().follow()
|
|
assert CsvDataSource.objects.count() == 2
|
|
assert CsvDataSource.objects.filter(slug='test').exists()
|
|
|
|
# import users
|
|
ApiUser.objects.all().delete()
|
|
AccessRight.objects.all().delete()
|
|
resp = app.get('/manage/', status=200)
|
|
resp = resp.click('Import')
|
|
resp.form['site_json'] = Upload('export.json', site_export.encode('utf-8'), 'application/json')
|
|
resp = resp.form.submit().follow()
|
|
assert not ApiUser.objects.exists()
|
|
assert not AccessRight.objects.exists()
|
|
resp = resp.click('Import')
|
|
resp.form['import_users'] = True
|
|
resp.form['site_json'] = Upload('export.json', site_export.encode('utf-8'), 'application/json')
|
|
resp = resp.form.submit().follow()
|
|
assert ApiUser.objects.filter(username='public').exists()
|
|
assert AccessRight.objects.filter(codename='can_access').exists()
|
|
|
|
|
|
def test_manager_open_access_information(app, admin_user):
|
|
photon = Photon.objects.create(slug='t')
|
|
|
|
app = login(app)
|
|
resp = app.get(photon.get_absolute_url())
|
|
assert resp.html.find('div', {'id': 'panel-security'}).div.text.strip() == 'Access is open.'
|
|
|
|
|
|
def test_manager_add_open_access_warning(app, admin_user):
|
|
csv = CsvDataSource.objects.create(csv_file=File(StringIO('1;t\n'), 't.csv'), slug='t', title='t')
|
|
private = ApiUser.objects.create(username='private', fullname='private', keytype='', key='xxx')
|
|
public = ApiUser.objects.create(username='public', fullname='private', keytype='', key='')
|
|
assert AccessRight.objects.count() == 0
|
|
|
|
# adding private api user works
|
|
app = login(app)
|
|
resp = app.get(csv.get_absolute_url())
|
|
assert 'Access is limited' in resp.html.find('div', {'id': 'panel-security'}).div.text.strip()
|
|
resp = resp.click('Add', href='/can_access/')
|
|
resp.form['apiuser'] = private.pk
|
|
resp = resp.form.submit().follow()
|
|
assert AccessRight.objects.count() == 1
|
|
|
|
# adding public user displays a warning
|
|
resp = resp.click('Add', href='/can_access/')
|
|
resp.form['apiuser'] = public.pk
|
|
resp = resp.form.submit()
|
|
assert AccessRight.objects.count() == 1
|
|
assert 'user has no security' in resp.text
|
|
|
|
resp = resp.form.submit()
|
|
assert AccessRight.objects.count() == 1
|
|
assert 'user has no security' in resp.text
|
|
|
|
# user has to check a box to procceed
|
|
resp.form['confirm_open_access'] = True
|
|
resp.form.submit().follow()
|
|
assert AccessRight.objects.count() == 2
|
|
|
|
|
|
def test_manager_open_access_flag(app, admin_user):
|
|
csv = CsvDataSource.objects.create(csv_file=File(StringIO('1;t\n'), 't.csv'), slug='t', title='t')
|
|
api = ApiUser.objects.create(username='test', fullname='test', keytype='', key='test')
|
|
|
|
app = login(app)
|
|
resp = app.get('/manage/', status=200)
|
|
assert len(resp.pyquery('li.connector')) == 1
|
|
assert 'open access' not in resp.text
|
|
|
|
obj_type = ContentType.objects.get_for_model(csv)
|
|
AccessRight.objects.create(codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=csv.pk)
|
|
|
|
resp = app.get('/manage/', status=200)
|
|
assert 'open access' not in resp.text
|
|
|
|
api.key = ''
|
|
api.save()
|
|
|
|
resp = app.get('/manage/', status=200)
|
|
assert 'open access' in resp.text
|
|
|
|
|
|
class TestRequestsSubstitutionsDisplay:
|
|
@pytest.fixture
|
|
def setup(self, settings):
|
|
settings.CONNECTORS_SETTINGS = {
|
|
"photon/t": {
|
|
"requests_substitutions": [
|
|
{
|
|
'search': 'abcd',
|
|
'replace': 'efgh',
|
|
}
|
|
]
|
|
}
|
|
}
|
|
photon = Photon.objects.create(slug='t')
|
|
return {'url': photon.get_absolute_url()}
|
|
|
|
def test_shown_when_logged(self, setup, app, admin_user):
|
|
app = login(app)
|
|
resp = app.get(setup['url'])
|
|
paragraph = str(resp.pyquery('#requests-substitutions'))
|
|
assert 'Requests substitutions' in paragraph
|
|
assert 'URL:' not in paragraph
|
|
assert 'Search: <code>abcd</code>' in paragraph
|
|
assert 'Replace: <code>efgh</code>' in paragraph
|
|
|
|
def test_absent_when_unlogged(self, setup, app):
|
|
resp = app.get(setup['url'])
|
|
paragraph = str(resp.pyquery('#requests-substitutions'))
|
|
assert 'Requests substitutions' not in paragraph
|
|
|
|
def test_url(self, setup, app, admin_user, settings):
|
|
settings.CONNECTORS_SETTINGS['photon/t']['requests_substitutions'][0]['url'] = 'https://example.com/'
|
|
app = login(app)
|
|
resp = app.get(setup['url'])
|
|
paragraph = str(resp.pyquery('#requests-substitutions'))
|
|
assert 'URL: https://example.com/' in paragraph
|