413 lines
14 KiB
Python
413 lines
14 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2022 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
|
|
from passerelle.apps.qrcode.models import Certificate, Event, QRCodeConnector, Reader
|
|
from tests.utils import generic_endpoint_url, setup_access_rights
|
|
|
|
|
|
@pytest.fixture()
def connector(db):
    """Create the ``test`` QR code connector and return it via ``setup_access_rights``."""
    qrcode_connector = QRCodeConnector.objects.create(
        slug='test',
        key='5e8176e50d45b67e9db875d6006edf3ba805ff4ef4d945327012db4c797be1be',
    )
    return setup_access_rights(qrcode_connector)
|
|
|
|
|
|
def test_save_certificate(app, connector):
    """Create a certificate through ``save-certificate``, then update it.

    The first POST goes to the bare endpoint and creates the certificate; the
    second POST goes to the same endpoint suffixed with the certificate uuid
    and changes its data and validity window.
    """
    endpoint = generic_endpoint_url('qrcode', 'save-certificate', slug=connector.slug)
    utc = timezone.utc

    # --- creation ---
    creation_payload = {
        'data': {
            'first_name': 'Georges',
            'last_name': 'Abitbol',
        },
        'metadata': {'puissance_intellectuelle': 'BAC +2'},
        'validity_start': '2022-01-01 10:00:00+00:00',
        'validity_end': '2023-01-01 10:00:00+00:00',
    }
    response = app.post_json(endpoint, params=creation_payload)
    body = response.json
    assert body['err'] == 0

    certificate_uuid = body['data']['uuid']
    assert body['data']['qrcode_url'] == f'http://testserver/qrcode/test/get-qrcode/{certificate_uuid}'

    certificate = connector.certificates.get(uuid=certificate_uuid)
    assert certificate.data['first_name'] == 'Georges'
    assert certificate.data['last_name'] == 'Abitbol'
    assert certificate.metadata['puissance_intellectuelle'] == 'BAC +2'

    initial_start = datetime(2022, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    initial_end = datetime(2023, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    assert certificate.validity_start == initial_start
    assert certificate.validity_end == initial_end

    # --- update of the same certificate ---
    update_payload = {
        'data': {
            'first_name': 'Robert',
            'last_name': 'Redford',
        },
        'validity_start': '2024-01-01T10:00:00+00:00',
        'validity_end': '2025-01-01T10:00:00+00:00',
    }
    app.post_json(f'{endpoint}/{certificate_uuid}', params=update_payload)

    # Reload the row so the assertions see what the endpoint stored.
    certificate.refresh_from_db()
    assert certificate.data['first_name'] == 'Robert'
    assert certificate.data['last_name'] == 'Redford'

    updated_start = datetime(2024, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    updated_end = datetime(2025, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    assert certificate.validity_start == updated_start
    assert certificate.validity_end == updated_end
|
|
|
|
|
|
def test_get_certificate(app, connector):
    """Fetch a stored certificate through ``get-certificate`` and check the JSON payload."""
    utc = timezone.utc
    certificate = connector.certificates.create(
        data={
            'first_name': 'Georges',
            'last_name': 'Abitbol',
        },
        validity_start=datetime(2022, 1, 1, 10, 0, 0, 0, tzinfo=utc),
        validity_end=datetime(2023, 1, 1, 10, 0, 0, 0, tzinfo=utc),
    )

    endpoint = generic_endpoint_url('qrcode', 'get-certificate', slug=connector.slug)
    response = app.get(f'{endpoint}/{certificate.uuid}')

    expected_data = {
        'uuid': str(certificate.uuid),
        'data': {'first_name': 'Georges', 'last_name': 'Abitbol'},
        'validity_start': '2022-01-01T10:00:00+00:00',
        'validity_end': '2023-01-01T10:00:00+00:00',
        'qrcode_url': f'http://testserver/qrcode/test/get-qrcode/{certificate.uuid}',
    }
    assert response.json == {'err': 0, 'data': expected_data}
|
|
|
|
|
|
def test_get_qrcode(app, connector):
    """Check that ``get-qrcode`` serves a PNG identical to the stored reference image."""
    utc = timezone.utc
    # Fixed uuid: the response is compared byte-for-byte with a reference file.
    certificate = connector.certificates.create(
        uuid=uuid.UUID('12345678-1234-5678-1234-567812345678'),
        data={
            'first_name': 'Georges',
            'last_name': 'Abitbol',
        },
        validity_start=datetime(2022, 1, 1, 10, 0, 0, 0, tzinfo=utc),
        validity_end=datetime(2023, 1, 1, 10, 0, 0, 0, tzinfo=utc),
    )
    endpoint = generic_endpoint_url('qrcode', 'get-qrcode', slug=connector.slug)

    response = app.get(f'{endpoint}/{certificate.uuid}')
    assert response.headers['Content-Type'] == 'image/png'

    with open('tests/data/qrcode/test-qrcode.png', 'rb') as reference_file:
        reference_png = reference_file.read()

    # Only image equality is checked here; the decoded content is tested
    # javascript-side.
    assert response.body == reference_png
|
|
|
|
|
|
def test_save_reader(app, connector):
    """Create a reader through ``save-reader``, then update it.

    The creation request enables tallying; the update request omits
    ``enable_tallying`` and the reader is then expected to have ``tally`` off.
    """
    endpoint = generic_endpoint_url('qrcode', 'save-reader', slug=connector.slug)
    utc = timezone.utc

    # --- creation ---
    creation_payload = {
        'validity_start': '2022-01-01 10:00:00+00:00',
        'validity_end': '2023-01-01 10:00:00+00:00',
        'readable_metadatas': 'name,last_name',
        'enable_tallying': True,
    }
    response = app.post_json(endpoint, params=creation_payload)
    body = response.json
    assert body['err'] == 0

    reader_uuid = body['data']['uuid']
    assert body['data']['url'] == f'http://testserver/qrcode/test/open-reader/{reader_uuid}'

    reader = connector.readers.get(uuid=reader_uuid)
    assert reader.readable_metadatas == 'name,last_name'
    assert reader.validity_start == datetime(2022, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    assert reader.validity_end == datetime(2023, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    assert reader.tally

    # --- update of the same reader ---
    update_payload = {
        'validity_start': '2024-01-01T10:00:00+00:00',
        'validity_end': '2025-01-01T10:00:00+00:00',
    }
    app.post_json(f'{endpoint}/{reader_uuid}', params=update_payload)

    # Reload the row so the assertions see what the endpoint stored.
    reader.refresh_from_db()
    assert reader.validity_start == datetime(2024, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    assert reader.validity_end == datetime(2025, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    assert not reader.tally
|
|
|
|
|
|
def test_get_reader(app, connector):
    """Fetch a stored reader through ``get-reader`` and check the JSON payload."""
    utc = timezone.utc
    reader = connector.readers.create(
        validity_start=datetime(2022, 1, 1, 10, 0, 0, 0, tzinfo=utc),
        validity_end=datetime(2023, 1, 1, 10, 0, 0, 0, tzinfo=utc),
    )

    endpoint = generic_endpoint_url('qrcode', 'get-reader', slug=connector.slug)
    response = app.get(f'{endpoint}/{reader.uuid}')

    expected_data = {
        'uuid': str(reader.uuid),
        'validity_start': '2022-01-01T10:00:00+00:00',
        'validity_end': '2023-01-01T10:00:00+00:00',
        'url': f'http://testserver/qrcode/test/open-reader/{reader.uuid}',
    }
    assert response.json == {'err': 0, 'data': expected_data}
|
|
|
|
|
|
def test_open_reader(app, connector, freezer):
    """Request the ``open-reader`` page before, during and after the reader's validity window."""
    utc = timezone.utc
    reader = connector.readers.create(
        validity_start=datetime(2022, 1, 1, 10, 0, 0, 0, tzinfo=utc),
        validity_end=datetime(2023, 1, 1, 10, 0, 0, 0, tzinfo=utc),
    )

    endpoint = generic_endpoint_url('qrcode', 'open-reader', slug=connector.slug)
    reader_url = f'{endpoint}/{reader.uuid}'

    # One second before validity_start.
    freezer.move_to('2022-01-01T09:59:59')
    page = app.get(reader_url)
    assert 'Reader isn\'t usable yet' in page.body.decode('utf-8')

    # At validity_start; the reader was created without ``tally``.
    freezer.move_to('2022-01-01T10:00:00')
    page = app.get(reader_url)
    assert page.pyquery(f'qrcode-reader[verify-key="{connector.hex_verify_key}"]')
    assert not page.pyquery('qrcode-reader[tally-url]')

    # Same instant, with tallying switched on.
    reader.tally = True
    reader.save()
    page = app.get(reader_url)
    assert page.pyquery(f'qrcode-reader[verify-key="{connector.hex_verify_key}"][tally-url]')

    # One second after validity_end.
    freezer.move_to('2023-01-01T10:00:01')
    page = app.get(reader_url)
    assert 'Reader has expired.' in page.body.decode('utf-8')
|
|
|
|
|
|
def test_read_metadata(app, connector, freezer):
    """Call ``read-metadata`` before, during and after the reader's validity window.

    While the reader is valid, only the metadata keys listed in its
    ``readable_metadatas`` are expected in the response.
    """
    utc = timezone.utc
    window_start = datetime(2022, 1, 1, 10, 0, 0, 0, tzinfo=utc)
    window_end = datetime(2023, 1, 1, 10, 0, 0, 0, tzinfo=utc)

    reader = connector.readers.create(
        readable_metadatas='first_name,puissance_intellectuelle',
        validity_start=window_start,
        validity_end=window_end,
    )
    certificate = connector.certificates.create(
        uuid=uuid.UUID('12345678-1234-5678-1234-567812345678'),
        metadata={'first_name': 'Georges', 'last_name': 'Abitbol', 'puissance_intellectuelle': 'BAC +2'},
        validity_start=window_start,
        validity_end=window_end,
    )

    endpoint = generic_endpoint_url('qrcode', 'read-metadata', slug=connector.slug)

    def read_metadata():
        """Issue the same read-metadata request and return its decoded JSON body."""
        response = app.get(f'{endpoint}/{reader.uuid}', params={'certificate': certificate.uuid})
        return response.json

    # One second before validity_start.
    freezer.move_to('2022-01-01T09:59:59')
    body = read_metadata()
    assert body['err'] == 1
    assert body['err_desc'] == 'Reader isn\'t usable yet.'

    # At validity_start.
    freezer.move_to('2022-01-01T10:00:00')
    body = read_metadata()
    assert body['err'] == 0
    assert body['data'] == {'first_name': 'Georges', 'puissance_intellectuelle': 'BAC +2'}

    # One second after validity_end.
    freezer.move_to('2023-01-01T10:00:01')
    body = read_metadata()
    assert body['err'] == 1
    assert body['err_desc'] == 'Reader has expired.'
|
|
|
|
|
|
def test_tally_query_events(app, connector, freezer):
    """Query the ``tally`` endpoint with two ``since`` values and check the returned stamps.

    Two events are recorded at two different frozen times, one per reader;
    the endpoint is then queried twice on behalf of the first reader.
    """
    first_reader = connector.readers.create(tally=True)
    second_reader = connector.readers.create(tally=True)
    first_certificate = connector.certificates.create()
    second_certificate = connector.certificates.create()

    # Event recorded by the first reader, the day before the queries.
    freezer.move_to('2000-01-01T11:59:59')
    yesterday = datetime.now(timezone.utc)
    first_happened = yesterday - timedelta(minutes=1)
    Event.objects.create(
        reader=first_reader,
        certificate=first_certificate,
        happened=first_happened,
    )

    # Event recorded by the second reader, one hour before the queries.
    freezer.move_to('2000-01-02T11:00:00')
    an_hour_ago = datetime.now(timezone.utc)
    # ``happened`` is deliberately far in the past: the date taken into
    # account must be the received date, not the happened one.
    second_happened = an_hour_ago - timedelta(days=1)
    Event.objects.create(
        reader=second_reader,
        certificate=second_certificate,
        happened=second_happened,
    )

    freezer.move_to('2000-01-02T12:00:00')
    now = datetime.now(timezone.utc)
    expected_timestamp = int(now.timestamp())

    endpoint = generic_endpoint_url('qrcode', 'tally', slug=connector.slug)
    tally_url = f'{endpoint}/{first_reader.uuid}'

    # First query: since=0 with an explicit empty event list.
    body = app.post_json(tally_url, params={'since': 0, 'events': []}).json
    assert body['err'] == 0
    assert body['data'] == {
        'timestamp': expected_timestamp,
        'stamps': {str(second_certificate.uuid): 'ok'},
    }

    # Second query: since set to the first frozen time, no ``events`` key.
    since_yesterday = {'since': int(yesterday.timestamp())}
    body = app.post_json(tally_url, params=since_yesterday).json
    assert body['err'] == 0
    assert body['data'] == {
        'timestamp': expected_timestamp,
        'stamps': {
            str(first_certificate.uuid): 'ok',
            str(second_certificate.uuid): 'ok',
        },
    }
|
|
|
|
|
|
def test_tally_save_events(app, connector, freezer):
    """Post events to the ``tally`` endpoint and check the stamps and stored events.

    Three events are sent: one for a fresh certificate, one for a uuid that
    matches no certificate created here, and one for a certificate that
    already has a stored event.
    """
    freezer.move_to('2023-01-01T10:00:01')

    reader = connector.readers.create(tally=True)

    now = datetime.now(timezone.utc)
    a_minute_ago = now - timedelta(minutes=1)

    certificate = connector.certificates.create()
    stamped_certificate = connector.certificates.create()

    # Pre-existing event for ``stamped_certificate``.
    Event.objects.create(
        reader=reader,
        certificate=stamped_certificate,
        happened=a_minute_ago,
    )

    fresh_event = {
        'certificate': str(certificate.uuid),
        'timestamp': a_minute_ago.timestamp(),
    }
    unknown_event = {
        'certificate': 'deadbeef-0000-0000-0000-000000000000',
        'timestamp': a_minute_ago.timestamp(),
    }
    repeated_event = {
        'certificate': str(stamped_certificate.uuid),
        'timestamp': now.timestamp(),
    }

    endpoint = generic_endpoint_url('qrcode', 'tally', slug=connector.slug)
    response = app.post_json(
        f'{endpoint}/{reader.uuid}',
        params={
            'since': 0,
            'events': [fresh_event, unknown_event, repeated_event],
        },
    )

    # The unknown uuid does not appear in the expected stamps.
    assert response.json['data'] == {
        'timestamp': int(now.timestamp()),
        'stamps': {
            str(certificate.uuid): 'ok',
            str(stamped_certificate.uuid): 'duplicate',
        },
    }

    stored = list(certificate.events.all())
    assert len(stored) == 1
    event = stored[0]
    assert event.reader == reader
    assert event.certificate == certificate
    assert event.happened == a_minute_ago
    assert event.received == now

    # The certificate that already had an event still has exactly one.
    stored = list(stamped_certificate.events.all())
    assert len(stored) == 1
|
|
|
|
|
|
# Sentinel parametrize value meaning "leave the key out of the request payload",
# as opposed to sending an explicit None or ''.
MISSING = object()
|
|
|
|
|
|
@pytest.mark.parametrize('value', [MISSING, None, ''], ids=['missing', 'null', 'empty string'])
class TestOptional:
    """Check that validity bounds are stored as ``None`` when not provided.

    Each test runs three times: with the key absent from the payload
    (``MISSING``), set to ``None``, and set to the empty string.
    """

    def test_certificate_validity_start(self, value, app, connector):
        """A certificate saved without a usable ``validity_start`` stores ``None``."""
        optional = {} if value is MISSING else {'validity_start': value}
        params = {
            'data': {
                'first_name': 'Georges',
                'last_name': 'Abitbol',
            },
            'validity_end': '2023-01-01 10:00:00+00:00',
            **optional,
        }

        app.post_json('/qrcode/test/save-certificate/', params=params)
        saved = Certificate.objects.get()
        assert saved.validity_start is None

    def test_certificate_validity_end(self, value, app, connector):
        """A certificate saved without a usable ``validity_end`` stores ``None``."""
        optional = {} if value is MISSING else {'validity_end': value}
        params = {
            'data': {
                'first_name': 'Georges',
                'last_name': 'Abitbol',
            },
            'validity_start': '2023-01-01 10:00:00+00:00',
            **optional,
        }

        app.post_json('/qrcode/test/save-certificate/', params=params)
        saved = Certificate.objects.get()
        assert saved.validity_end is None

    def test_reader_validity_start(self, value, app, connector):
        """A reader saved without a usable ``validity_start`` stores ``None``."""
        optional = {} if value is MISSING else {'validity_start': value}
        params = {
            'validity_end': '2023-01-01 10:00:00+00:00',
            **optional,
        }

        app.post_json('/qrcode/test/save-reader/', params=params)
        saved = Reader.objects.get()
        assert saved.validity_start is None

    def test_reader_validity_end(self, value, app, connector):
        """A reader saved without a usable ``validity_end`` stores ``None``."""
        optional = {} if value is MISSING else {'validity_end': value}
        params = {
            'validity_start': '2023-01-01 10:00:00+00:00',
            **optional,
        }

        app.post_json('/qrcode/test/save-reader/', params=params)
        saved = Reader.objects.get()
        assert saved.validity_end is None
|