import datetime
import json
import re
import uuid
from io import StringIO

import freezegun
import pytest
from django.contrib.contenttypes.models import ContentType
from django.core.files import File
from django.utils.encoding import force_bytes
from django.utils.timezone import now
from webtest import Upload

from passerelle.apps.csvdatasource.models import CsvDataSource, Query
from passerelle.apps.photon.models import Photon
from passerelle.base.models import AccessRight, ApiUser, Job, ResourceLog, ResourceStatus

# Apply the django_db mark to every test of this module.
pytestmark = pytest.mark.django_db


def login(app, username='admin', password='admin'):
    """Submit the first form of /login/ with the given credentials.

    Asserts that the submission answers with a 302 and returns the same
    ``app`` object so callers can write ``app = login(app)``.
    """
    login_page = app.get('/login/')
    login_form = login_page.forms[0]
    login_form['username'] = username
    login_form['password'] = password
    resp = login_form.submit()
    assert resp.status_int == 302
    return app


def test_homepage_redirect(app):
    """GET / answers 302 with a location ending in /manage/."""
    assert app.get('/', status=302).location.endswith('/manage/')


def test_unlogged_access(app):
    """/manage/ without a session redirects to the login page with next=/manage/."""
    # connect while not being logged in
    assert app.get('/manage/', status=302).location.endswith('/login/?next=/manage/')


def test_simple_user_access(app, simple_user):
    """A session opened as user/user gets 403 on the management URLs requested here."""
    # connect while being logged as a simple user
    # NOTE(review): presumably the simple_user fixture creates the user/user account — confirm.
    app = login(app, username='user', password='user')
    assert app.get('/manage/', status=403)
    assert app.get('/manage/add', status=403)
    assert app.get('/manage/ovh/add', status=403)
    assert app.get('/manage/access/', status=403)


def test_access(app, admin_user):
    """After the default login, /manage/ shows 'Add Connector' and /manage/access/ answers 200."""
    app = login(app)
    resp = app.get('/manage/', status=200)
    assert 'Add Connector' in resp.text
    assert app.get('/manage/access/', status=200)


def test_add_connector(app, admin_user):
    """Create a 'Base Adresse Web Service' connector through the UI and find it on /manage/."""
    app = login(app)
    resp = app.get('/manage/', status=200)
    resp = resp.click('Add Connector')
    assert 'Business Process Connectors' in resp.text
    assert 'Geographic information system' in resp.text
    resp = resp.click('Base Adresse Web Service')
    resp.forms[0]['title'] = 'Test Connector'
    resp.forms[0]['slug'] = 'test-connector'
    resp.forms[0]['description'] = 'Connector for a simple test'
    resp.forms[0]['service_url'] = 'https://api-adresse.data.gouv.fr/'
    resp = resp.forms[0].submit()
    assert resp.status_int == 302
    assert resp.location.endswith('/base-adresse/test-connector/')
    resp = resp.follow()
    assert 'Base Adresse Web Service - Test Connector' in resp.text

    resp = app.get('/manage/', status=200)
    assert 'Test Connector' in resp.text


def test_add_connector_unique_slug(app, admin_user):
    """Submitting the add form twice with the same slug fails; another slug is accepted."""
    app = login(app)
    resp = app.get('/manage/', status=200)
    resp = resp.click('Add Connector')
    resp = resp.click('Base Adresse Web Service')
    assert resp.html.find('input', {'name': 'title'}).attrs['data-slug-sync'] == 'slug'

    resp.forms[0]['title'] = 'Test Connector'
    resp.forms[0]['slug'] = 'test-connector'
    resp.forms[0]['description'] = 'Connector for a simple test'
    resp.forms[0]['service_url'] = 'https://api-adresse.data.gouv.fr/'
    # The same filled-in page (resp) is submitted again below, hence resp1/resp2.
    resp1 = resp.forms[0].submit()
    assert resp1.status_int == 302

    resp2 = resp.forms[0].submit()
    assert 'There were errors processing your form.' in resp2.text
    assert 'this Identifier already exists.' in resp2.text

    resp.forms[0]['slug'] = 'foo'
    resp2 = resp.forms[0].submit()
    assert resp2.status_int == 302


def test_edit_connector(app, admin_user):
    """Edit the fields of a Photon connector, then its slug, checking each redirect target."""
    Photon.objects.create(
        slug='test-connector',
        title='Test Connector',
        description='Connector for a simple test',
        service_url='https://example.org/',
    )

    app = login(app)
    resp = app.get('/manage/photon/test-connector/edit', status=200)
    assert 'Test Connector' in resp.text
    resp.forms[0]['title'] = 'Connector updated'
    resp.forms[0]['description'] = 'simple test'
    resp.forms[0]['service_url'] = 'https://example.net/'
    resp = resp.forms[0].submit()
    assert resp.status_int == 302
    assert resp.location.endswith('/photon/test-connector/')
    resp = resp.follow()
    assert 'Connector updated' in resp.text

    resp = app.get('/manage/photon/test-connector/edit', status=200)
    resp.forms[0]['slug'] = 'test'
    resp = resp.forms[0].submit()
    assert resp.status_int == 302
    assert resp.location.endswith('/photon/test/')


def test_edit_connector_unique_slug(app, admin_user):
    """Reusing another Photon's slug is refused; a diverging slug drops data-slug-sync."""
    Photon.objects.create(
        slug='test-connector',
        title='Test Connector',
        description='Connector for a simple test',
        service_url='https://example.org/',
    )
    Photon.objects.create(
        slug='other-connector',
        title='Other Connector',
        description='Connector for another test',
        service_url='https://example.org/',
    )

    app = login(app)
    resp = app.get('/manage/photon/test-connector/edit', status=200)
    assert 'Test Connector' in resp.text
    assert resp.html.find('input', {'name': 'title'}).attrs['data-slug-sync'] == 'slug'
    resp.forms[0]['slug'] = 'other-connector'
    resp = resp.forms[0].submit()
    # A 200 here means the form was re-displayed with the error instead of redirecting.
    assert resp.status_int == 200
    assert 'Photon with this Identifier already exists.' in resp.text

    # if title and slug name are not in sync, disable prepopulate slug by JS
    resp.forms[0]['slug'] = 'another-connector'
    resp = resp.forms[0].submit()
    resp = app.get('/manage/photon/another-connector/edit', status=200)
    assert 'data-slug-sync' not in resp.html.find('input', {'name': 'title'}).attrs


def test_visit_connectors(app, admin_user):
    """Every href matching /manage...add found on the 'Add Connector' page answers 200."""
    app = login(app)
    resp = app.get('/manage/', status=200)
    resp = resp.click('Add Connector')
    for link in re.findall('href="(/manage.*add)"', resp.text):
        resp = app.get(link, status=200)


def test_access_management(app, admin_user):
    """Create an API user through the UI and check two key validation messages."""
    assert ApiUser.objects.count() == 0

    app = login(app)
    resp = app.get('/manage/', status=200)
    resp = resp.click('Access Management')
    resp = resp.click('Add API User')
    resp.form['username'] = 'foo'
    resp.form['fullname'] = 'Foo'
    resp = resp.form.submit().follow()
    assert ApiUser.objects.count() == 1
    assert ApiUser.objects.get(username='foo').fullname == 'Foo'

    # keytype 'API' submitted without a key
    resp = resp.click('Add API User')
    resp.form['username'] = 'bar'
    resp.form['fullname'] = 'Bar'
    resp.form['keytype'] = 'API'
    resp = resp.form.submit()
    assert 'Key can not be empty' in resp.text

    # keytype 'SIGN' submitted with the three-character key '123'
    resp = resp.click('Add API User')
    resp.form['username'] = 'bar'
    resp.form['fullname'] = 'Bar'
    resp.form['keytype'] = 'SIGN'
    resp.form['key'] = '123'
    resp = resp.form.submit()
    assert 'Signature Key length must be at least 256 bits.' in resp.text


def test_menu_json(app, admin_user):
    """menu.json answers 302 before login, then JSON, or JavaScript when ?callback= is given."""
    app.get('/manage/menu.json', status=302)

    app = login(app)
    resp = app.get('/manage/menu.json')
    assert resp.headers['content-type'] == 'application/json'
    assert resp.json[0]['label'] == 'Web Services'

    resp = app.get('/manage/menu.json?callback=FooBar')
    assert resp.headers['content-type'] == 'application/javascript'
    assert resp.content.startswith(b'FooBar([{"')


def test_logs(app, admin_user):
    """Walk through the log views of a CSV connector.

    Covers the log list on the connector page, the 'Full page' view and its
    ``q`` search field, a single log entry page, and the ``log_id`` and
    ``log_level`` query parameters.
    """
    data = StringIO('1;Foo\n2;Bar\n3;Baz')
    csv = CsvDataSource.objects.create(
        csv_file=File(data, 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )

    # The query is saved with slug 'fooba' while the requests below target 'foobar'.
    query = Query(slug='fooba', resource=csv, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:text'])
    query.save()

    api = ApiUser.objects.create(
        username='public', fullname='public', description='access for all', keytype='', key=''
    )
    obj_type = ContentType.objects.get_for_model(csv)
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api,
        resource_type=obj_type,
        resource_pk=csv.pk,
    )

    app = login(app)
    resp = app.get(csv.get_absolute_url())
    # NOTE(review): this literal spans several physical lines in this copy of the file,
    # which looks like an extraction artifact (markup stripped?) — confirm against the
    # upstream file before relying on it.
    assert '

No records found

' in resp.text

    app.get('/csvdatasource/test/query/foobar/')
    resp = app.get(csv.get_absolute_url())
    assert 'endpoint GET /csvdatasource/test/query/foobar/ ' in resp.text

    app.get('/csvdatasource/test/query/foobar/?q=toto')
    resp = app.get(csv.get_absolute_url())
    assert 'endpoint GET /csvdatasource/test/query/foobar/?q=toto' in resp.text

    # NOTE(review): every resp.text.count('') below has an empty needle in this copy;
    # presumably the searched markup was lost in extraction — confirm against upstream.
    resp = resp.click('Full page')
    assert resp.text.count('') == 4
    assert resp.text.count('Error occurred while processing request') == 2

    resp.form['q'] = 'toto'
    resp = resp.form.submit()
    assert resp.text.count('') == 1

    # search on today's date
    resp.form['q'] = datetime.date.today().strftime('%d/%m/%Y')
    resp = resp.form.submit()
    assert resp.text.count('') == 4

    # same day and month, year forced to 2010
    resp.form['q'] = datetime.date.today().strftime('%d/%m/2010')
    resp = resp.form.submit()
    assert resp.text.count('') == 0

    resp.form['q'] = 'Wed, 29 Sep 2021 00:40:48 +0200'
    resp = resp.form.submit()
    assert resp.text.count('') == 0

    resp.form['q'] = ''
    resp = resp.form.submit()
    assert resp.text.count('') == 4

    # Open the first listed log entry using the data-* attributes of the page.
    log_pk = re.findall(r'data-pk="(.*)"', resp.text)[0]
    base_url = re.findall(r'data-log-base-url="(.*)"', resp.text)[0]
    resp = app.get(base_url + log_pk + '/')
    resp = resp.click('Search for logs from the same call')
    assert resp.text.count('') == 2

    resp = app.get(base_url + '12345' + '/', status=404)
    resp = app.get('/manage/csvdatasource/test/logs/?log_id=%s' % log_pk)
    assert 'log_target' in resp.context
    # unknown id
    resp = app.get('/manage/csvdatasource/test/logs/?log_id=0')
    assert 'log_target' not in resp.context
    # bad id
    resp = app.get('/manage/csvdatasource/test/logs/?log_id=foo')
    assert 'log_target' not in resp.context

    resp = app.get('/manage/csvdatasource/test/logs/?log_level=')
    assert resp.text.count('') == 4
    assert resp.text.count('level-warning') == 2
    assert resp.text.count('level-info') == 2
    resp = app.get('/manage/csvdatasource/test/logs/?log_level=INFO')
    assert resp.text.count('') == 2
    assert resp.text.count('level-warning') == 0
    assert resp.text.count('level-info') == 2
    resp = app.get('/manage/csvdatasource/test/logs/?log_level=ERROR')
    assert resp.text.count('') == 0
    assert resp.text.count('level-warning') == 0
    assert resp.text.count('level-info') == 0
    # an unknown level gives the same counts as the empty level above
    resp = app.get('/manage/csvdatasource/test/logs/?log_level=foobar')
    assert resp.text.count('') == 4
    assert resp.text.count('level-warning') == 2
    assert resp.text.count('level-info') == 2

    # NOTE(review): indentation was lost in this copy; both creations are assumed to be
    # inside the freeze_time block — confirm against upstream.
    with freezegun.freeze_time('2020-10-06 14:08:12'):
        log1 = ResourceLog.objects.create(
            appname=csv.get_connector_slug(), slug=csv.slug, levelno=50, message='hop'
        )
        log2 = ResourceLog.objects.create(
            appname=csv.get_connector_slug(), slug=csv.slug, levelno=42, message='hop'
        )
    # levelno 50 is titled "Critical", levelno 42 is titled "Level 42"
    resp = app.get('/manage/csvdatasource/test/logs/%s/' % log1.pk)
    assert 'title="Critical - Oct. 6, 2020 14:08:12"' in resp.text
    resp = app.get('/manage/csvdatasource/test/logs/%s/' % log2.pk)
    assert 'title="Level 42 - Oct. 6, 2020 14:08:12"' in resp.text


def test_logs_tab_pagination(app, admin_user):
    """After 20 requests the logs table shows 10 rows per page and the paginator switches panels."""
    data = StringIO('1;Foo\n2;Bar\n3;Baz')
    csv = CsvDataSource.objects.create(
        csv_file=File(data, 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )

    query = Query(slug='fooba', resource=csv, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:text'])
    query.save()

    api = ApiUser.objects.create(
        username='public', fullname='public', description='access for all', keytype='', key=''
    )
    obj_type = ContentType.objects.get_for_model(csv)
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api,
        resource_type=obj_type,
        resource_pk=csv.pk,
    )

    app = login(app)
    resp = app.get(csv.get_absolute_url())
    # NOTE(review): this literal spans several physical lines in this copy of the file,
    # which looks like an extraction artifact (markup stripped?) — confirm against the
    # upstream file before relying on it.
    assert '

No records found

' in resp.text

    for i in range(20):
        app.get('/csvdatasource/test/query/foobar/?q=toto%s' % i)

    # First page: the logs panel is hidden, the endpoints panel is shown.
    resp = app.get(csv.get_absolute_url())
    assert resp.pyquery('#panel-logs[hidden]')
    assert resp.pyquery('#panel-endpoints:not([hidden])')
    assert resp.pyquery('#table-logs tr').length == 10
    assert resp.pyquery('#panel-logs .paginator a:first').text() == '2'

    # Second page: the visibility of the two panels is swapped.
    resp = resp.click('2')
    assert resp.pyquery('#panel-logs:not([hidden])')
    assert resp.pyquery('#panel-endpoints[hidden]')
    assert resp.pyquery('#table-logs tr').length == 10
    # NOTE(review): the result of the next statement is discarded; it looks redundant
    # with the assert that follows.
    resp.pyquery('#panel-logs .paginator a:first').text()
    assert resp.pyquery('#panel-logs .paginator a:first').text() == '1'


def test_logs_search(app, admin_user):
    """The ``q`` field of the logs page matches transaction ids, messages and extra values."""
    csv = CsvDataSource.objects.create(csv_file=File(StringIO('1;t\n'), 't.csv'), slug='t', title='t')
    app = login(app)

    transaction_id = str(uuid.uuid4())
    # A UUID used only as a message, never as a transaction id.
    looks_like_transaction_id = str(uuid.uuid4())
    log1 = ResourceLog.objects.create(
        appname=csv.get_connector_slug(),
        slug=csv.slug,
        levelno=1,
        message='hop',
        transaction_id=transaction_id,
        extra={'transaction_id': transaction_id, 'foo': 'plop'},
    )
    log2 = ResourceLog.objects.create(
        appname=csv.get_connector_slug(),
        slug=csv.slug,
        levelno=1,
        message='plop',
        transaction_id=transaction_id,
        extra={'bar': 'hop'},
    )
    log3 = ResourceLog.objects.create(
        appname=csv.get_connector_slug(), slug=csv.slug, levelno=1, message='foo', extra={'bar': 'hop'}
    )
    log4 = ResourceLog.objects.create(
        appname=csv.get_connector_slug(), slug=csv.slug, levelno=1, message=looks_like_transaction_id
    )

    # Without a search the list is in reverse creation order.
    resp = app.get('/manage/csvdatasource/t/logs/')
    assert list(resp.context['page_obj'].object_list) == [log4, log3, log2, log1]

    resp.form['q'] = transaction_id
    resp = resp.form.submit()
    assert list(resp.context['page_obj'].object_list) == [log2, log1]

    resp.form['q'] = looks_like_transaction_id
    resp = resp.form.submit()
    assert list(resp.context['page_obj'].object_list) == [log4]

    # a fresh UUID matches nothing
    resp.form['q'] = str(uuid.uuid4())
    resp = resp.form.submit()
    assert list(resp.context['page_obj'].object_list) == []

    resp.form['q'] = 'not there'
    resp = resp.form.submit()
    assert list(resp.context['page_obj'].object_list) == []

    # 'hop' is log1's message and an extra value of log2 and log3
    resp.form['q'] = 'hop'
    resp = resp.form.submit()
    assert list(resp.context['page_obj'].object_list) == [log3, log2, log1]

    # 'plop' is log2's message and an extra value of log1
    resp.form['q'] = 'plop'
    resp = resp.form.submit()
    assert list(resp.context['page_obj'].object_list) == [log2, log1]

    # 'foo' is log3's message and an extra key of log1
    resp.form['q'] = 'foo'
    resp = resp.form.submit()
    assert list(resp.context['page_obj'].object_list) == [log3, log1]


def test_logging_parameters(app, admin_user):
    """The 'Logging parameters' form stores log_level and trace_emails and shows them again."""
    data = StringIO('1;Foo\n2;Bar\n3;Baz')
    csv = CsvDataSource.objects.create(
        csv_file=File(data, 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    app = login(app)
    resp = app.get(csv.get_absolute_url())
    resp = resp.click('Logging parameters')
    resp.form['log_level'] = 'ERROR'
    resp = resp.form.submit()
    # The connector is re-read from the database for each check.
    assert CsvDataSource.objects.get(id=csv.id).log_level == 'ERROR'

    resp = app.get(csv.get_absolute_url())
    resp = resp.click('Logging parameters')
    resp.form['log_level'] = 'DEBUG'
    resp = resp.form.submit()
    assert CsvDataSource.objects.get(id=csv.id).log_level == 'DEBUG'

    resp = app.get(csv.get_absolute_url())
    resp = resp.click('Logging parameters')
    resp.form['trace_emails'] = 'fred@localhost'
    resp = resp.form.submit()
    assert CsvDataSource.objects.get(id=csv.id).logging_parameters.trace_emails == 'fred@localhost'

    resp = app.get(csv.get_absolute_url())
    resp = resp.click('Logging parameters')
    assert resp.form['trace_emails'].value == 'fred@localhost'


def test_availability_parameters(app, admin_user, monkeypatch):
    """Exercise the 'Availability check' form: run_check toggling and notification delays."""
    data = StringIO('1;Foo\n2;Bar\n3;Baz')
    csv = CsvDataSource.objects.create(
        csv_file=File(data, 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )
    app = login(app)
    resp = app.get(csv.get_absolute_url())

    assert csv.availability_parameters.run_check
    # csv connector has the default check_status which does nothing
    # so availability check is hidden
    assert 'Availability check' not in resp.text

    def check_status(*args, **kwargs):
        return True

    # Replacing check_status on the class makes the 'Availability check' link appear.
    monkeypatch.setattr(CsvDataSource, 'check_status', check_status)

    resp = app.get(csv.get_absolute_url())
    assert 'Availability check' in resp.text

    resp = resp.click('Availability check')
    resp.form['run_check'] = False
    resp = resp.form.submit()
    # Connector status not changed, availability parameters changed
    assert not csv.availability_parameters.run_check

    resp = app.get(csv.get_absolute_url())
    resp = resp.click('Availability check')
    resp.form['run_check'] = True
    resp = resp.form.submit()

    # Connector down
    resource_type = ContentType.objects.get_for_model(csv)
    status = ResourceStatus(resource_type=resource_type, resource_pk=csv.pk, status='down', message='')
    status.save()
    assert csv.down()

    resp = app.get(csv.get_absolute_url())
    resp = resp.click('Availability check')
    resp.form['run_check'] = False
    resp = resp.form.submit()
    # Connector is put back up
    assert not csv.availability_parameters.run_check
    assert not csv.down()
    status = csv.get_availability_status()
    assert status.status == 'up'

    # Notification delays
    resp = app.get(csv.get_absolute_url())
    resp = resp.click('Availability check')
    assert resp.form['notification_delays'].value == '0'
    resp.form['notification_delays'] = '0,5,100'
    resp = resp.form.submit().follow()
    assert not resp.pyquery('.messages .warning')
    assert csv.availability_parameters.notification_delays == '0,5,100'

    # the value 'x' is rejected with an error on the field
    resp = app.get(csv.get_absolute_url())
    resp = resp.click('Availability check')
    resp.form['notification_delays'] = 'x'
    resp = resp.form.submit()
    assert len(resp.pyquery('#id_notification_delays_p .error'))


def test_jobs(app, admin_user):
    """The connector page mentions jobs only once a job has been added to the connector."""
    data = StringIO('1;Foo\n2;Bar\n3;Baz')
    csv = CsvDataSource.objects.create(
        csv_file=File(data, 't.csv'),
        columns_keynames='id, text',
        slug='test',
        title='a title',
        description='a description',
    )

    api = ApiUser.objects.create(
        username='public', fullname='public', description='access for all', keytype='', key=''
    )
    obj_type = ContentType.objects.get_for_model(csv)
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api,
        resource_type=obj_type,
        resource_pk=csv.pk,
    )

    app = login(app)
    resp = app.get(csv.get_absolute_url())
    assert 'jobs' not in resp.text

    csv.add_job('sample_job')
    resp = app.get(csv.get_absolute_url())
    assert 'jobs' in resp.text
    assert 'sample_job' in resp.text

    resp = resp.click('Full page', index=1)
    # NOTE(review): the next statement is truncated in this copy of the file — text
    # between "count('" and "abcd'" appears to be missing, presumably including the end
    # of this test and the header of a later test class. Kept verbatim; restore from the
    # upstream file.
    assert resp.text.count('abcd' in paragraph
    assert 'Replace: efgh' in paragraph


# NOTE(review): the two functions below take ``self`` and a ``setup`` fixture, so they are
# presumably methods of a test class whose header is not visible in this copy; their
# original indentation is unknown — confirm against the upstream file.
def test_absent_when_unlogged(self, setup, app):
    """Without login, the #requests-substitutions markup does not contain 'Requests substitutions'."""
    resp = app.get(setup['url'])
    paragraph = str(resp.pyquery('#requests-substitutions'))
    assert 'Requests substitutions' not in paragraph


def test_url(self, setup, app, admin_user, settings):
    """A 'url' set on the first 'photon/t' substitution is displayed as 'URL: ...' once logged in."""
    settings.CONNECTORS_SETTINGS['photon/t']['requests_substitutions'][0]['url'] = 'https://example.com/'
    app = login(app)
    resp = app.get(setup['url'])
    paragraph = str(resp.pyquery('#requests-substitutions'))
    assert 'URL: https://example.com/' in paragraph