passerelle/tests/test_manager.py

848 lines
30 KiB
Python

import datetime
import json
import re
import uuid
from io import StringIO
import freezegun
import pytest
from django.contrib.contenttypes.models import ContentType
from django.core.files import File
from django.utils.encoding import force_bytes
from django.utils.timezone import now
from webtest import Upload
from passerelle.apps.csvdatasource.models import CsvDataSource, Query
from passerelle.apps.photon.models import Photon
from passerelle.base.models import AccessRight, ApiUser, Job, ResourceLog, ResourceStatus
# Module-level ``pytestmark``: pytest applies this ``django_db`` marker to
# every test collected from this module.
pytestmark = pytest.mark.django_db
def login(app, username='admin', password='admin'):
    """Log ``app`` in through the first form of the ``/login/`` page.

    The credentials are typed into the form, the form is submitted and the
    submission is expected to answer with a 302 redirect.

    Returns the same ``app`` object so callers can write ``app = login(app)``.
    """
    form = app.get('/login/').forms[0]
    form['username'] = username
    form['password'] = password
    response = form.submit()
    assert response.status_int == 302
    return app
def test_homepage_redirect(app):
    """The site root answers with a redirect ending in ``/manage/``."""
    redirect = app.get('/', status=302)
    assert redirect.location.endswith('/manage/')
def test_unlogged_access(app):
    """Without a session, ``/manage/`` redirects to the login page."""
    # the redirect must carry the originally requested page in ``next``
    redirect = app.get('/manage/', status=302)
    assert redirect.location.endswith('/login/?next=/manage/')
def test_simple_user_access(app, simple_user):
    """A logged-in simple user gets a 403 on every management URL checked."""
    app = login(app, username='user', password='user')
    forbidden_urls = ('/manage/', '/manage/add', '/manage/ovh/add', '/manage/access/')
    for url in forbidden_urls:
        assert app.get(url, status=403)
def test_access(app, admin_user):
    """An admin reaches the manager homepage and the access management page."""
    app = login(app)
    homepage = app.get('/manage/', status=200)
    assert 'Add Connector' in homepage.text
    assert app.get('/manage/access/', status=200)
def test_add_connector(app, admin_user):
    """Create a "Base Adresse Web Service" connector through the manager UI."""
    app = login(app)
    catalog = app.get('/manage/', status=200).click('Add Connector')
    for category in ('Business Process Connectors', 'Geographic information system'):
        assert category in catalog.text

    # fill and submit the creation form
    form = catalog.click('Base Adresse Web Service').forms[0]
    form['title'] = 'Test Connector'
    form['slug'] = 'test-connector'
    form['description'] = 'Connector for a simple test'
    form['service_url'] = 'https://api-adresse.data.gouv.fr/'
    redirect = form.submit()
    assert redirect.status_int == 302
    assert redirect.location.endswith('/base-adresse/test-connector/')

    # the new connector has its own page and is listed on the homepage
    connector_page = redirect.follow()
    assert 'Base Adresse Web Service - Test Connector' in connector_page.text
    homepage = app.get('/manage/', status=200)
    assert 'Test Connector' in homepage.text
def test_add_connector_unique_slug(app, admin_user):
    """Submitting the same creation form twice is refused until the slug changes."""
    app = login(app)
    catalog = app.get('/manage/', status=200).click('Add Connector')
    creation_page = catalog.click('Base Adresse Web Service')
    title_input = creation_page.html.find('input', {'name': 'title'})
    assert title_input.attrs['data-slug-sync'] == 'slug'

    form = creation_page.forms[0]
    form['title'] = 'Test Connector'
    form['slug'] = 'test-connector'
    form['description'] = 'Connector for a simple test'
    form['service_url'] = 'https://api-adresse.data.gouv.fr/'

    # first submission creates the connector
    first = form.submit()
    assert first.status_int == 302

    # same form again: the slug is already taken
    second = form.submit()
    assert 'There were errors processing your form.' in second.text
    assert 'this Identifier already exists.' in second.text

    # another slug is accepted
    form['slug'] = 'foo'
    third = form.submit()
    assert third.status_int == 302
def test_edit_connector(app, admin_user):
    """Edit a Photon connector: first its fields, then its slug."""
    Photon.objects.create(
        slug='test-connector',
        title='Test Connector',
        description='Connector for a simple test',
        service_url='https://example.org/',
    )
    app = login(app)
    edit_url = '/manage/photon/test-connector/edit'

    # change title, description and service URL
    edit_page = app.get(edit_url, status=200)
    assert 'Test Connector' in edit_page.text
    form = edit_page.forms[0]
    form['title'] = 'Connector updated'
    form['description'] = 'simple test'
    form['service_url'] = 'https://example.net/'
    redirect = form.submit()
    assert redirect.status_int == 302
    assert redirect.location.endswith('/photon/test-connector/')
    assert 'Connector updated' in redirect.follow().text

    # change the slug: the redirect targets the new URL
    form = app.get(edit_url, status=200).forms[0]
    form['slug'] = 'test'
    redirect = form.submit()
    assert redirect.status_int == 302
    assert redirect.location.endswith('/photon/test/')
def test_edit_connector_unique_slug(app, admin_user):
    """Editing a connector refuses a slug already used by another Photon."""
    for slug, title, description in (
        ('test-connector', 'Test Connector', 'Connector for a simple test'),
        ('other-connector', 'Other Connector', 'Connector for another test'),
    ):
        Photon.objects.create(
            slug=slug,
            title=title,
            description=description,
            service_url='https://example.org/',
        )
    app = login(app)
    page = app.get('/manage/photon/test-connector/edit', status=200)
    assert 'Test Connector' in page.text
    title_input = page.html.find('input', {'name': 'title'})
    assert title_input.attrs['data-slug-sync'] == 'slug'

    # slug of the other connector: form is redisplayed with an error
    page.forms[0]['slug'] = 'other-connector'
    page = page.forms[0].submit()
    assert page.status_int == 200
    assert 'Photon with this Identifier already exists.' in page.text

    # once the slug no longer matches the title, the title input must not
    # carry the JS slug synchronisation attribute anymore
    page.forms[0]['slug'] = 'another-connector'
    page = page.forms[0].submit()
    page = app.get('/manage/photon/another-connector/edit', status=200)
    title_input = page.html.find('input', {'name': 'title'})
    assert 'data-slug-sync' not in title_input.attrs
def test_visit_connectors(app, admin_user):
    """Every "add" link found on the connector catalog page answers 200."""
    app = login(app)
    catalog = app.get('/manage/', status=200).click('Add Connector')
    add_links = re.findall('href="(/manage.*add)"', catalog.text)
    for add_link in add_links:
        resp = app.get(add_link, status=200)
def test_access_management(app, admin_user):
    """Create API users from the manager and check key validation messages."""
    assert ApiUser.objects.count() == 0
    app = login(app)
    page = app.get('/manage/', status=200).click('Access Management')

    # user without key type: created
    page = page.click('Add API User')
    page.form['username'] = 'foo'
    page.form['fullname'] = 'Foo'
    page = page.form.submit().follow()
    assert ApiUser.objects.count() == 1
    assert ApiUser.objects.get(username='foo').fullname == 'Foo'

    # API key type with no key: rejected
    page = page.click('Add API User')
    page.form['username'] = 'bar'
    page.form['fullname'] = 'Bar'
    page.form['keytype'] = 'API'
    page = page.form.submit()
    assert 'Key can not be empty' in page.text

    # signature key type with a short key: rejected
    page = page.click('Add API User')
    page.form['username'] = 'bar'
    page.form['fullname'] = 'Bar'
    page.form['keytype'] = 'SIGN'
    page.form['key'] = '123'
    page = page.form.submit()
    assert 'Signature Key length must be at least 256 bits.' in page.text
def test_menu_json(app, admin_user):
    """``menu.json`` needs a login and answers JSON, or JSONP with a callback."""
    menu_url = '/manage/menu.json'
    # anonymous access is redirected
    app.get(menu_url, status=302)
    app = login(app)

    json_resp = app.get(menu_url)
    assert json_resp.headers['content-type'] == 'application/json'
    assert json_resp.json[0]['label'] == 'Web Services'

    jsonp_resp = app.get('/manage/menu.json?callback=FooBar')
    assert jsonp_resp.headers['content-type'] == 'application/javascript'
    assert jsonp_resp.content.startswith(b'FooBar([{"')
def test_logs(app, admin_user):
    """Walk through the connector log views: list, search, detail, level filter."""

    def timestamp_cells(page):
        # number of timestamp cells in the rendered page
        return page.text.count('<td class="timestamp">')

    def search(page, term):
        # submit the search form of ``page`` with ``term`` as query
        page.form['q'] = term
        return page.form.submit()

    csv = CsvDataSource.objects.create(
        csv_file=File(StringIO('1;Foo\n2;Bar\n3;Baz'), 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    query = Query(slug='fooba', resource=csv, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:text'])
    query.save()
    public_user = ApiUser.objects.create(
        username='public', fullname='public', description='access for all', keytype='', key=''
    )
    AccessRight.objects.create(
        codename='can_access',
        apiuser=public_user,
        resource_type=ContentType.objects.get_for_model(csv),
        resource_pk=csv.pk,
    )

    app = login(app)
    resp = app.get(csv.get_absolute_url())
    assert '<p>No records found</p>' in resp.text

    # calls to the connector show up in the logs panel
    app.get('/csvdatasource/test/query/foobar/')
    resp = app.get(csv.get_absolute_url())
    assert 'endpoint GET /csvdatasource/test/query/foobar/ ' in resp.text
    app.get('/csvdatasource/test/query/foobar/?q=toto')
    resp = app.get(csv.get_absolute_url())
    assert 'endpoint GET /csvdatasource/test/query/foobar/?q=toto' in resp.text

    # full page listing and its search form
    resp = resp.click('Full page')
    assert timestamp_cells(resp) == 4
    assert resp.text.count('Error occurred while processing request') == 2
    resp = search(resp, 'toto')
    assert timestamp_cells(resp) == 1
    resp = search(resp, datetime.date.today().strftime('%d/%m/%Y'))
    assert timestamp_cells(resp) == 4
    resp = search(resp, datetime.date.today().strftime('%d/%m/2010'))
    assert timestamp_cells(resp) == 0
    resp = search(resp, 'Wed, 29 Sep 2021 00:40:48 +0200')
    assert timestamp_cells(resp) == 0
    resp = search(resp, '')
    assert timestamp_cells(resp) == 4

    # detail view of a single log line
    log_pk = re.findall(r'data-pk="(.*)"', resp.text)[0]
    base_url = re.findall(r'data-log-base-url="(.*)"', resp.text)[0]
    resp = app.get(base_url + log_pk + '/')
    resp = resp.click('Search for logs from the same call')
    assert timestamp_cells(resp) == 2
    resp = app.get(base_url + '12345' + '/', status=404)

    # ``log_id`` query parameter: known, unknown and malformed ids
    resp = app.get('/manage/csvdatasource/test/logs/?log_id=%s' % log_pk)
    assert 'log_target' in resp.context
    resp = app.get('/manage/csvdatasource/test/logs/?log_id=0')
    assert 'log_target' not in resp.context
    resp = app.get('/manage/csvdatasource/test/logs/?log_id=foo')
    assert 'log_target' not in resp.context

    # ``log_level`` filter: (url, rows, warning lines, info lines)
    level_expectations = (
        ('/manage/csvdatasource/test/logs/?log_level=', 4, 2, 2),
        ('/manage/csvdatasource/test/logs/?log_level=INFO', 2, 0, 2),
        ('/manage/csvdatasource/test/logs/?log_level=ERROR', 0, 0, 0),
        ('/manage/csvdatasource/test/logs/?log_level=foobar', 4, 2, 2),
    )
    for url, rows, warnings, infos in level_expectations:
        resp = app.get(url)
        assert timestamp_cells(resp) == rows
        assert resp.text.count('level-warning') == warnings
        assert resp.text.count('level-info') == infos

    # title of the detail page for a standard and a non-standard level number
    with freezegun.freeze_time('2020-10-06 14:08:12'):
        critical_log = ResourceLog.objects.create(
            appname=csv.get_connector_slug(), slug=csv.slug, levelno=50, message='hop'
        )
        custom_level_log = ResourceLog.objects.create(
            appname=csv.get_connector_slug(), slug=csv.slug, levelno=42, message='hop'
        )
    resp = app.get('/manage/csvdatasource/test/logs/%s/' % critical_log.pk)
    assert 'title="Critical - Oct. 6, 2020 14:08:12"' in resp.text
    resp = app.get('/manage/csvdatasource/test/logs/%s/' % custom_level_log.pk)
    assert 'title="Level 42 - Oct. 6, 2020 14:08:12"' in resp.text
def test_logs_tab_pagination(app, admin_user):
    """Check pagination of the logs panel embedded in the connector page.

    Twenty calls are made to the connector, then the connector page is
    expected to show ten log rows with a paginator; following the paginator
    link shows the logs panel instead of the endpoints panel.
    """
    data = StringIO('1;Foo\n2;Bar\n3;Baz')
    csv = CsvDataSource.objects.create(
        csv_file=File(data, 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    query = Query(slug='fooba', resource=csv, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:text'])
    query.save()
    api = ApiUser.objects.create(
        username='public', fullname='public', description='access for all', keytype='', key=''
    )
    obj_type = ContentType.objects.get_for_model(csv)
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api,
        resource_type=obj_type,
        resource_pk=csv.pk,
    )
    app = login(app)
    resp = app.get(csv.get_absolute_url())
    assert '<p>No records found</p>' in resp.text

    # generate enough calls to need a second page of logs
    for i in range(20):
        app.get('/csvdatasource/test/query/foobar/?q=toto%s' % i)

    # first page: endpoints panel visible, logs panel hidden
    resp = app.get(csv.get_absolute_url())
    assert resp.pyquery('#panel-logs[hidden]')
    assert resp.pyquery('#panel-endpoints:not([hidden])')
    assert resp.pyquery('#table-logs tr').length == 10
    assert resp.pyquery('#panel-logs .paginator a:first').text() == '2'

    # second page: the logs panel is the visible one
    resp = resp.click('2')
    assert resp.pyquery('#panel-logs:not([hidden])')
    assert resp.pyquery('#panel-endpoints[hidden]')
    assert resp.pyquery('#table-logs tr').length == 10
    # (a leftover statement that evaluated this same query and discarded the
    # result was dropped; only the assertion is kept)
    assert resp.pyquery('#panel-logs .paginator a:first').text() == '1'
def test_logs_search(app, admin_user):
    """Search the log list by transaction id, message text and extra values."""

    def make_log(**fields):
        # every log of this test belongs to the same connector, at level 1
        return ResourceLog.objects.create(
            appname=csv.get_connector_slug(), slug=csv.slug, levelno=1, **fields
        )

    def listed_logs(page):
        return list(page.context['page_obj'].object_list)

    csv = CsvDataSource.objects.create(csv_file=File(StringIO('1;t\n'), 't.csv'), slug='t', title='t')
    app = login(app)
    transaction_id = str(uuid.uuid4())
    looks_like_transaction_id = str(uuid.uuid4())
    log1 = make_log(
        message='hop',
        transaction_id=transaction_id,
        extra={'transaction_id': transaction_id, 'foo': 'plop'},
    )
    log2 = make_log(message='plop', transaction_id=transaction_id, extra={'bar': 'hop'})
    log3 = make_log(message='foo', extra={'bar': 'hop'})
    log4 = make_log(message=looks_like_transaction_id)

    page = app.get('/manage/csvdatasource/t/logs/')
    assert listed_logs(page) == [log4, log3, log2, log1]

    # (search term, logs expected in the result list), run in this order,
    # each search being submitted from the previous result page
    searches = (
        (transaction_id, [log2, log1]),
        (looks_like_transaction_id, [log4]),
        (str(uuid.uuid4()), []),
        ('not there', []),
        ('hop', [log3, log2, log1]),
        ('plop', [log2, log1]),
        ('foo', [log3, log1]),
    )
    for term, expected in searches:
        page.form['q'] = term
        page = page.form.submit()
        assert listed_logs(page) == expected
def test_logging_parameters(app, admin_user):
    """Change log level and trace emails through the "Logging parameters" form."""

    def logging_form_page():
        # open the form from the connector page, as a user would
        return app.get(csv.get_absolute_url()).click('Logging parameters')

    csv = CsvDataSource.objects.create(
        csv_file=File(StringIO('1;Foo\n2;Bar\n3;Baz'), 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    app = login(app)

    # the chosen log level is stored on the connector
    for level in ('ERROR', 'DEBUG'):
        page = logging_form_page()
        page.form['log_level'] = level
        page = page.form.submit()
        assert CsvDataSource.objects.get(id=csv.id).log_level == level

    # trace emails are stored in the logging parameters...
    page = logging_form_page()
    page.form['trace_emails'] = 'fred@localhost'
    page = page.form.submit()
    stored = CsvDataSource.objects.get(id=csv.id).logging_parameters
    assert stored.trace_emails == 'fred@localhost'

    # ...and displayed again when the form is reopened
    page = logging_form_page()
    assert page.form['trace_emails'].value == 'fred@localhost'
def test_availability_parameters(app, admin_user, monkeypatch):
    """Exercise the "Availability check" form of a connector."""

    def availability_form_page():
        # open the form from the connector page
        return app.get(csv.get_absolute_url()).click('Availability check')

    csv = CsvDataSource.objects.create(
        csv_file=File(StringIO('1;Foo\n2;Bar\n3;Baz'), 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    app = login(app)
    page = app.get(csv.get_absolute_url())
    assert csv.availability_parameters.run_check
    # with the connector's own check_status, the link is not displayed
    assert 'Availability check' not in page.text

    def check_status(*args, **kwargs):
        return True

    # once check_status is replaced, the link shows up
    monkeypatch.setattr(CsvDataSource, 'check_status', check_status)
    page = app.get(csv.get_absolute_url())
    assert 'Availability check' in page.text

    # disable the check: only the availability parameters change
    page = page.click('Availability check')
    page.form['run_check'] = False
    page = page.form.submit()
    assert not csv.availability_parameters.run_check

    # enable it again
    page = availability_form_page()
    page.form['run_check'] = True
    page = page.form.submit()

    # mark the connector as down
    down_status = ResourceStatus(
        resource_type=ContentType.objects.get_for_model(csv),
        resource_pk=csv.pk,
        status='down',
        message='',
    )
    down_status.save()
    assert csv.down()

    # disabling the check puts the connector back up
    page = availability_form_page()
    page.form['run_check'] = False
    page = page.form.submit()
    assert not csv.availability_parameters.run_check
    assert not csv.down()
    assert csv.get_availability_status().status == 'up'

    # notification delays: a valid value is saved without warning
    page = availability_form_page()
    assert page.form['notification_delays'].value == '0'
    page.form['notification_delays'] = '0,5,100'
    page = page.form.submit().follow()
    assert not page.pyquery('.messages .warning')
    assert csv.availability_parameters.notification_delays == '0,5,100'

    # notification delays: an invalid value yields a field error
    page = availability_form_page()
    page.form['notification_delays'] = 'x'
    page = page.form.submit()
    assert len(page.pyquery('#id_notification_delays_p .error'))
def test_jobs(app, admin_user):
    """Walk through the connector job views: list, search and detail."""

    def job_rows(page):
        # number of job rows in the rendered page
        return page.text.count('<tr data-pk')

    def search(page, term):
        # submit the search form of ``page`` with ``term`` as query
        page.form['q'] = term
        return page.form.submit()

    csv = CsvDataSource.objects.create(
        csv_file=File(StringIO('1;Foo\n2;Bar\n3;Baz'), 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    public_user = ApiUser.objects.create(
        username='public', fullname='public', description='access for all', keytype='', key=''
    )
    AccessRight.objects.create(
        codename='can_access',
        apiuser=public_user,
        resource_type=ContentType.objects.get_for_model(csv),
        resource_pk=csv.pk,
    )

    app = login(app)
    resp = app.get(csv.get_absolute_url())
    assert 'jobs' not in resp.text

    # once a job exists, it is shown on the connector page
    csv.add_job('sample_job')
    resp = app.get(csv.get_absolute_url())
    assert 'jobs' in resp.text
    assert 'sample_job' in resp.text

    # full page listing and its search form
    resp = resp.click('Full page', index=1)
    assert job_rows(resp) == 1
    assert resp.text.count('sample_job') == 1
    resp = search(resp, 'sample')
    assert job_rows(resp) == 1
    resp = search(resp, 'blah')
    assert job_rows(resp) == 0
    resp = search(resp, datetime.date.today().strftime('%d/%m/%Y'))
    assert job_rows(resp) == 1
    resp = search(resp, datetime.date.today().strftime('%d/%m/2010'))
    assert job_rows(resp) == 0
    resp = search(resp, '')
    assert job_rows(resp) == 1

    # detail view of a single job
    job_pk = re.findall(r'data-pk="(.*)"', resp.text)[0]
    base_url = re.findall(r'data-job-base-url="(.*)"', resp.text)[0]
    resp = app.get(base_url + job_pk + '/')
    resp = app.get(base_url + '12345' + '/', status=404)

    # ``job_id`` query parameter: known, unknown and malformed ids
    resp = app.get('/manage/csvdatasource/test/jobs/?job_id=%s' % job_pk)
    assert 'job_target' in resp.context
    resp = app.get('/manage/csvdatasource/test/jobs/?job_id=0')
    assert 'job_target' not in resp.context
    resp = app.get('/manage/csvdatasource/test/jobs/?job_id=foo')
    assert 'job_target' not in resp.context
def test_job_restart(app, admin_user):
    """Restarting a job: 404 cases first, then a successful restart."""
    csv = CsvDataSource.objects.create(
        csv_file=File(StringIO('1;Foo\n2;Bar\n3;Baz'), 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    app = login(app)

    # job ids that match nothing
    for unknown_job_url in (
        '/manage/csvdatasource/test/jobs/0/restart/',
        '/manage/csvdatasource/test/jobs/foo/restart/',
    ):
        app.get(unknown_job_url, status=404)

    # a job in 'registered' status cannot be restarted
    job = Job.objects.create(resource=csv, status='registered')
    app.get('/manage/csvdatasource/test/jobs/%s/restart/' % job.pk, status=404)

    # switch the job to 'failed'
    job.status = 'failed'
    job.done_timestamp = now()
    job.status_details = {'error_summary': 'foo bar'}
    job.save()

    # wrong connector type or wrong connector slug in the URL
    app.get('/manage/csvdatasourc/test/jobs/%s/restart/' % job.pk, status=404)
    app.get('/manage/csvdatasource/teste/jobs/%s/restart/' % job.pk, status=404)

    # valid restart: redirect to the job list, a new job is registered
    redirect = app.get('/manage/csvdatasource/test/jobs/%s/restart/' % job.pk, status=302)
    new_job = Job.objects.latest('pk')
    assert redirect.location.endswith('/manage/csvdatasource/test/jobs/')
    assert new_job.status == 'registered'
    assert new_job.done_timestamp is None
    assert new_job.status_details == {}

    # the original job is flagged as restarted and points to the new one
    job.refresh_from_db()
    assert job.status == 'restarted'
    assert job.status_details == {'error_summary': 'foo bar', 'new_job_pk': new_job.pk}
    detail_page = app.get('/manage/csvdatasource/test/jobs/%s/' % job.pk)
    assert '/manage/csvdatasource/test/jobs/?job_id=%s' % new_job.pk in detail_page
def test_manager_import_export(app, admin_user):
    """Export the site and a single connector, then import them back."""

    def json_upload(content):
        # file field value for the import form
        return Upload('export.json', content, 'application/json')

    def import_form_page():
        # reach the import form from the manager homepage
        return app.get('/manage/', status=200).click('Import')

    data = StringIO('1;Foo\n2;Bar\n3;Baz')
    csv = CsvDataSource.objects.create(
        csv_file=File(data, 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    CsvDataSource.objects.create(
        csv_file=File(data, 't.csv'),
        columns_keynames='id, text',
        slug='test2',
        title='a title',
        description='a description',
    )
    public_user = ApiUser.objects.create(
        username='public', fullname='public', description='access for all', keytype='', key=''
    )
    AccessRight.objects.create(
        codename='can_access',
        apiuser=public_user,
        resource_type=ContentType.objects.get_for_model(csv),
        resource_pk=csv.pk,
    )

    # anonymous visitors get no export action on the connector page
    page = app.get('/%s/%s/' % (csv.get_connector_slug(), csv.slug), status=200)
    assert 'Export' not in page.html.find('ul', {'class': 'extra-actions-menu'}).text

    # site export, as admin
    app = login(app)
    page = app.get('/manage/')
    with freezegun.freeze_time('2020-09-01'):
        page = page.click('Export')
    assert page.headers['content-type'] == 'application/json'
    assert page.headers['content-disposition'] == 'attachment; filename="export_connectors_20200901.json"'
    site_export = page.text

    # import of a file that is not JSON
    page = import_form_page()
    page.form['site_json'] = json_upload(b'garbage')
    page = page.form.submit()
    assert 'File is not in the expected JSON format.' in page.text

    # import of an empty JSON object
    page = import_form_page()
    page.form['site_json'] = json_upload(b'{}')
    page = page.form.submit().follow()
    assert CsvDataSource.objects.count() == 2

    # import referencing connector types that are not installed
    export = {
        'resources': [
            {'@type': 'passerelle-resource', 'resource_type': 'not_installed_app1.app_model'},
            {'@type': 'passerelle-resource', 'resource_type': 'not_installed_app2.app_model'},
        ]
    }
    page = import_form_page()
    page.form['site_json'] = json_upload(force_bytes(json.dumps(export)))
    page = page.form.submit()
    first_error = page.html.find('ul', {'class': 'errorlist'}).li.text
    assert first_error == 'Unknown connectors: not_installed_app1, not_installed_app2'

    # import of the site export after removing all connectors
    CsvDataSource.objects.all().delete()
    page = import_form_page()
    page.form['site_json'] = json_upload(site_export.encode('utf-8'))
    page = page.form.submit().follow()
    assert CsvDataSource.objects.count() == 2

    # export of a single connector
    page = app.get('/%s/%s/' % (csv.get_connector_slug(), csv.slug), status=200)
    assert 'Export' in page.html.find('ul', {'class': 'extra-actions-menu'}).text
    with freezegun.freeze_time('2020-09-01'):
        page = page.click('Export')
    assert page.headers['content-type'] == 'application/json'
    assert page.headers['content-disposition'] == 'attachment; filename="export_%s_%s_20200901.json"' % (
        csv.get_connector_slug(),
        csv.slug,
    )
    connector_export = page.text

    # import of the single connector export
    csv.delete()
    page = import_form_page()
    page.form['site_json'] = json_upload(connector_export.encode('utf-8'))
    page = page.form.submit().follow()
    assert CsvDataSource.objects.count() == 2
    assert CsvDataSource.objects.filter(slug='test').exists()

    # users and access rights are not imported unless requested
    ApiUser.objects.all().delete()
    AccessRight.objects.all().delete()
    page = import_form_page()
    page.form['site_json'] = json_upload(site_export.encode('utf-8'))
    page = page.form.submit().follow()
    assert not ApiUser.objects.exists()
    assert not AccessRight.objects.exists()

    # with the ``import_users`` box checked they are imported
    page = page.click('Import')
    page.form['import_users'] = True
    page.form['site_json'] = json_upload(site_export.encode('utf-8'))
    page = page.form.submit().follow()
    assert ApiUser.objects.filter(username='public').exists()
    assert AccessRight.objects.filter(codename='can_access').exists()
def test_manager_open_access_information(app, admin_user):
    """The security panel of a Photon connector states that access is open."""
    connector = Photon.objects.create(slug='t')
    app = login(app)
    page = app.get(connector.get_absolute_url())
    security_panel = page.html.find('div', {'id': 'panel-security'})
    assert security_panel.div.text.strip() == 'Access is open.'
def test_manager_add_open_access_warning(app, admin_user):
    """Granting access to an API user with an empty key needs a confirmation."""
    csv = CsvDataSource.objects.create(csv_file=File(StringIO('1;t\n'), 't.csv'), slug='t', title='t')
    private = ApiUser.objects.create(username='private', fullname='private', keytype='', key='xxx')
    public = ApiUser.objects.create(username='public', fullname='private', keytype='', key='')
    assert AccessRight.objects.count() == 0

    app = login(app)
    page = app.get(csv.get_absolute_url())
    security_panel = page.html.find('div', {'id': 'panel-security'})
    assert 'Access is limited' in security_panel.div.text.strip()

    # the user with a key is added right away
    page = page.click('Add', href='/can_access/')
    page.form['apiuser'] = private.pk
    page = page.form.submit().follow()
    assert AccessRight.objects.count() == 1

    # the user with an empty key: a warning is shown, nothing is created,
    # and submitting again without confirmation changes nothing
    page = page.click('Add', href='/can_access/')
    page.form['apiuser'] = public.pk
    for _ in range(2):
        page = page.form.submit()
        assert AccessRight.objects.count() == 1
        assert 'user has no security' in page.text

    # ticking the confirmation box lets the access right be created
    page.form['confirm_open_access'] = True
    page.form.submit().follow()
    assert AccessRight.objects.count() == 2
def test_manager_open_access_flag(app, admin_user):
    """The homepage shows "open access" only once a key-less user has access."""

    def homepage_text():
        return app.get('/manage/', status=200).text

    csv = CsvDataSource.objects.create(csv_file=File(StringIO('1;t\n'), 't.csv'), slug='t', title='t')
    api = ApiUser.objects.create(username='test', fullname='test', keytype='', key='test')
    app = login(app)

    # no access right yet
    homepage = app.get('/manage/', status=200)
    assert len(homepage.pyquery('li.connector')) == 1
    assert 'open access' not in homepage.text

    # access right for a user that has a key
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api,
        resource_type=ContentType.objects.get_for_model(csv),
        resource_pk=csv.pk,
    )
    assert 'open access' not in homepage_text()

    # same user, key emptied
    api.key = ''
    api.save()
    assert 'open access' in homepage_text()
class TestRequestsSubstitutionsDisplay:
    """Display of configured requests substitutions on a connector page."""

    @pytest.fixture
    def setup(self, settings):
        """Configure one substitution for ``photon/t`` and create that connector.

        Returns a dict holding the connector page URL under ``'url'``.
        """
        substitution = {
            'search': 'abcd',
            'replace': 'efgh',
        }
        settings.CONNECTORS_SETTINGS = {'photon/t': {'requests_substitutions': [substitution]}}
        connector = Photon.objects.create(slug='t')
        return {'url': connector.get_absolute_url()}

    def test_shown_when_logged(self, setup, app, admin_user):
        """A logged-in admin sees search and replace values, but no URL line."""
        app = login(app)
        page = app.get(setup['url'])
        block = str(page.pyquery('#requests-substitutions'))
        assert 'Requests substitutions' in block
        assert 'URL:' not in block
        for expected in ('Search: <code>abcd</code>', 'Replace: <code>efgh</code>'):
            assert expected in block

    def test_absent_when_unlogged(self, setup, app):
        """An anonymous visitor does not see the substitutions."""
        page = app.get(setup['url'])
        block = str(page.pyquery('#requests-substitutions'))
        assert 'Requests substitutions' not in block

    def test_url(self, setup, app, admin_user, settings):
        """When the substitution has a ``url`` key, that URL is displayed."""
        substitution = settings.CONNECTORS_SETTINGS['photon/t']['requests_substitutions'][0]
        substitution['url'] = 'https://example.com/'
        app = login(app)
        page = app.get(setup['url'])
        block = str(page.pyquery('#requests-substitutions'))
        assert 'URL: https://example.com/' in block