chrono/tests/test_import_export.py

337 lines
13 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import datetime
from io import StringIO
import json
import os
import shutil
import sys
import tempfile
import pytest
from django.contrib.auth.models import Group
from django.core.management import call_command, CommandError
from django.utils.encoding import force_bytes
from django.utils.timezone import make_aware
from chrono.agendas.models import (
Agenda,
Event,
TimePeriod,
Desk,
TimePeriodException,
AgendaImportError,
MeetingType,
VirtualMember,
)
from chrono.manager.utils import import_site
from test_api import some_data, meetings_agenda, time_zone, mock_now
# Module-wide pytest marker: applies pytest-django's ``django_db`` mark to
# every test defined in this file.
pytestmark = pytest.mark.django_db
def get_output_of_command(command, *args, **kwargs):
    """Run a management command and return what it wrote to ``sys.stdout``.

    ``sys.stdout`` is replaced by an in-memory buffer while the command runs
    and is always put back afterwards, even when the command raises, so a
    failing command cannot leave later code writing into a discarded buffer.

    :param command: command name handed to ``call_command``.
    :param args: positional arguments forwarded to ``call_command``.
    :param kwargs: keyword arguments forwarded to ``call_command``.
    :return: everything written to the temporary buffer, as a string.
    """
    old_stdout = sys.stdout
    output = sys.stdout = StringIO()
    try:
        call_command(command, *args, **kwargs)
    finally:
        # restore unconditionally: an exception must not hijack stdout
        sys.stdout = old_stdout
    return output.getvalue()
def _create_unexported_objects(events_agenda, desks_agenda):
    """Create objects that are not part of a previously taken export.

    Adds an event to ``events_agenda`` and, on ``desks_agenda``, a "Desk A"
    desk (fetched if it already exists) holding one time period and one
    exception.

    :return: an ``(event, timeperiod, exception)`` tuple of the saved objects.
    """
    event = Event(agenda=events_agenda, start_datetime=make_aware(datetime.datetime.now()), places=10)
    event.save()
    desk, _ = Desk.objects.get_or_create(agenda=desks_agenda, label='Desk A', slug='desk-a')
    timeperiod = TimePeriod(
        desk=desk, weekday=2, start_time=datetime.time(10, 0), end_time=datetime.time(11, 0)
    )
    timeperiod.save()
    exception = TimePeriodException(
        desk=desk,
        start_datetime=make_aware(datetime.datetime(2017, 5, 22, 8, 0)),
        end_datetime=make_aware(datetime.datetime(2017, 5, 22, 12, 30)),
    )
    exception.save()
    return event, timeperiod, exception


def test_import_export(app, some_data, meetings_agenda):
    """Round-trip the site through ``export_site`` / ``import_site``.

    Covers the ``clean``, ``overwrite`` and ``if_empty`` import options, the
    ``import_site`` command fed from stdin and from a file, and the
    ``export_site`` command writing to a file given through ``output``.
    """
    first_event = Agenda.objects.get(label='Foo bar').event_set.first()
    # add exception to meeting agenda
    desk = meetings_agenda.desk_set.first()
    tpx_start = make_aware(datetime.datetime(2017, 5, 22, 8, 0))
    tpx_end = make_aware(datetime.datetime(2017, 5, 22, 12, 30))
    TimePeriodException.objects.create(desk=desk, start_datetime=tpx_start, end_datetime=tpx_end)

    output = get_output_of_command('export_site')
    assert len(json.loads(output)['agendas']) == 3

    # an empty payload imported with clean=True leaves no agenda to export
    import_site(data={}, clean=True)
    empty_output = get_output_of_command('export_site')
    assert len(json.loads(empty_output)['agendas']) == 0

    # same cleaning through the management command, payload given on stdin
    Agenda(label=u'test').save()
    assert Agenda.objects.count() == 1
    # swap stdin only right before the guarded call so that a failing
    # assertion can never leave the replacement in place
    old_stdin = sys.stdin
    sys.stdin = StringIO(json.dumps({}))
    try:
        call_command('import_site', '-', clean=True)
    finally:
        sys.stdin = old_stdin
    assert Agenda.objects.count() == 0

    # import the initial export back from a file
    with tempfile.NamedTemporaryFile() as f:
        f.write(force_bytes(output))
        f.flush()
        call_command('import_site', f.name)

    assert Agenda.objects.count() == 3
    first_imported_event = Agenda.objects.get(label='Foo bar').event_set.first()
    assert first_imported_event.start_datetime == first_event.start_datetime
    # get() also fails if there is not exactly one exception
    imported_exception = TimePeriodException.objects.get()
    assert imported_exception.start_datetime == tpx_start
    assert imported_exception.end_datetime == tpx_end

    agenda1 = Agenda.objects.get(label=u'Foo bar')
    agenda2 = Agenda.objects.get(label=u'Foo bar Meeting')

    # overwrite=True: objects missing from the payload are gone afterwards
    event, timeperiod, exception = _create_unexported_objects(agenda1, agenda2)
    import_site(json.loads(output), overwrite=True)
    assert Event.objects.filter(id=event.id).count() == 0
    assert Desk.objects.filter(slug='desk-a').count() == 0
    assert TimePeriod.objects.filter(id=timeperiod.id).count() == 0
    assert TimePeriodException.objects.filter(id=exception.id).count() == 0

    # overwrite=False: the same kind of objects are still there afterwards
    event, timeperiod, exception = _create_unexported_objects(agenda1, agenda2)
    import_site(json.loads(output), overwrite=False)
    assert Event.objects.filter(id=event.id).count() == 1
    assert TimePeriod.objects.filter(id=timeperiod.id).count() == 1
    assert TimePeriodException.objects.filter(id=exception.id).count() == 1

    # if_empty=True on a populated site leaves the agendas alone
    import_site(data={}, if_empty=True)
    assert Agenda.objects.count() == 3

    # export to a file; the directory is removed even if the check fails
    import_site(data={}, clean=True)
    tempdir = tempfile.mkdtemp('chrono-test')
    try:
        export_path = os.path.join(tempdir, 't.json')
        get_output_of_command('export_site', output=export_path)
        assert os.path.exists(export_path)
    finally:
        shutil.rmtree(tempdir)
def test_import_export_event_details(app, some_data, meetings_agenda):
    """Check that an event's description, pricing and url survive export + import."""
    details = (
        ('description', 'description'),
        ('pricing', '100'),
        ('url', 'https://example.net/'),
    )

    source_event = Agenda.objects.get(label='Foo bar').event_set.first()
    for field, value in details:
        setattr(source_event, field, value)
    source_event.save()

    exported = get_output_of_command('export_site')
    exported_agendas = json.loads(exported)['agendas']
    assert len(exported_agendas) == 3

    # clean import of an empty payload, then reload the dump from a file
    import_site(data={}, clean=True)
    with tempfile.NamedTemporaryFile() as dump:
        dump.write(force_bytes(exported))
        dump.flush()
        call_command('import_site', dump.name)

    assert Agenda.objects.count() == 3
    imported_event = Agenda.objects.get(label='Foo bar').event_set.first()
    for field, value in details:
        assert getattr(imported_event, field) == value
def test_import_export_permissions(app, some_data, meetings_agenda):
    """Check role handling on import.

    While a group referenced by the export does not exist the import fails
    with a "Missing ... role" error (``AgendaImportError`` from
    ``import_site``, ``CommandError`` from the management command); once
    both groups exist the agenda gets its view and edit roles back.
    """
    viewers = Group.objects.create(name=u'gé1')
    editors = Group.objects.create(name=u'gé2')
    meetings_agenda.view_role, meetings_agenda.edit_role = viewers, editors
    meetings_agenda.save()

    exported = get_output_of_command('export_site')
    assert len(json.loads(exported)['agendas']) == 3

    import_site(data={}, clean=True)
    assert Agenda.objects.count() == 0
    Group.objects.all().delete()

    # neither group exists
    with pytest.raises(AgendaImportError) as excinfo:
        import_site(json.loads(exported), overwrite=True)
    assert u'%s' % excinfo.value == u'Missing "gé1" role'

    # only the first group exists
    viewers = Group.objects.create(name=u'gé1')
    with pytest.raises(AgendaImportError) as excinfo:
        import_site(json.loads(exported), overwrite=True)
    assert u'%s' % excinfo.value == u'Missing "gé2" role'

    # same situation, going through the management command
    with tempfile.NamedTemporaryFile() as dump:
        dump.write(force_bytes(exported))
        dump.flush()
        with pytest.raises(CommandError) as excinfo:
            call_command('import_site', dump.name)
        assert u'%s' % excinfo.value == u'Missing "gé2" role'

    # both groups exist
    editors = Group.objects.create(name=u'gé2')
    import_site(json.loads(exported), overwrite=True)

    reloaded = Agenda.objects.get(slug=meetings_agenda.slug)
    assert reloaded.view_role == viewers
    assert reloaded.edit_role == editors
def test_import_export_virtual_agenda(app):
    """Round-trip a virtual agenda through export and import.

    Three cycles are run: a bare virtual agenda, the same agenda with booking
    delays set, and finally with two excluded time periods attached. Each
    export is decoded once and that payload is what gets imported.
    """
    virtual_agenda = Agenda.objects.create(label='Virtual Agenda', kind='virtual')
    payload = json.loads(get_output_of_command('export_site'))
    assert len(payload['agendas']) == 1

    virtual_agenda.delete()
    import_site(payload, overwrite=True)
    virtual_agenda = Agenda.objects.get(label='Virtual Agenda', slug='virtual-agenda', kind='virtual')
    assert virtual_agenda.minimal_booking_delay is None
    assert virtual_agenda.maximal_booking_delay is None
    assert virtual_agenda.real_agendas.count() == 0
    assert virtual_agenda.excluded_timeperiods.count() == 0

    # add booking delay
    virtual_agenda.minimal_booking_delay = 2
    virtual_agenda.maximal_booking_delay = 10
    virtual_agenda.save()
    payload = json.loads(get_output_of_command('export_site'))

    virtual_agenda.delete()
    import_site(payload, overwrite=True)
    virtual_agenda = Agenda.objects.get(label='Virtual Agenda', slug='virtual-agenda', kind='virtual')
    assert virtual_agenda.minimal_booking_delay == 2
    assert virtual_agenda.maximal_booking_delay == 10

    # add excluded timeperiods
    tp1 = TimePeriod.objects.create(
        agenda=virtual_agenda, weekday=1, start_time=datetime.time(10, 0), end_time=datetime.time(11, 0)
    )
    tp2 = TimePeriod.objects.create(
        agenda=virtual_agenda, weekday=2, start_time=datetime.time(12, 0), end_time=datetime.time(13, 0)
    )
    payload = json.loads(get_output_of_command('export_site'))

    virtual_agenda.delete()
    tp1.delete()
    tp2.delete()
    import_site(payload, overwrite=True)
    virtual_agenda = Agenda.objects.get(label='Virtual Agenda', slug='virtual-agenda', kind='virtual')
    assert virtual_agenda.excluded_timeperiods.count() == 2
    # the lookups below are the check: get() raises unless exactly one
    # matching time period exists
    TimePeriod.objects.get(
        agenda=virtual_agenda, weekday=1, start_time=datetime.time(10, 0), end_time=datetime.time(11, 0)
    )
    TimePeriod.objects.get(
        agenda=virtual_agenda, weekday=2, start_time=datetime.time(12, 0), end_time=datetime.time(13, 0)
    )
def test_import_export_virtual_agenda_with_included_agenda(app):
    """Round-trip a virtual agenda that includes two meetings agendas.

    The second part changes one meeting type's duration and expects the
    re-import of the virtual agenda to be rejected with an
    ``AgendaImportError`` about mismatching meeting types.
    """
    virtual = Agenda.objects.create(label='Virtual Agenda', kind='virtual')
    foo = Agenda.objects.create(label='Foo', kind='meetings')
    bar = Agenda.objects.create(label='Bar', kind='meetings')
    foo_type = MeetingType.objects.create(agenda=foo, label='Meeting Type', duration=30)
    bar_type = MeetingType.objects.create(agenda=bar, label='Meeting Type', duration=30)
    for member in (foo, bar):
        VirtualMember.objects.create(virtual_agenda=virtual, real_agenda=member)

    exported = json.loads(get_output_of_command('export_site'))
    assert len(exported['agendas']) == 3

    # delete agendas first, then the meeting types
    for stale in (virtual, foo, bar, foo_type, bar_type):
        stale.delete()
    import_site(exported, overwrite=True)

    virtual = Agenda.objects.get(label='Virtual Agenda', slug='virtual-agenda', kind='virtual')
    assert virtual.real_agendas.count() == 2
    for label in ('Foo', 'Bar'):
        assert virtual.real_agendas.filter(label=label).count() == 1

    # add incompatible meetingtype
    bar = Agenda.objects.get(label='Bar', kind='meetings')
    bar_type = MeetingType.objects.get(agenda=bar, label='Meeting Type')
    bar_type.duration = 10
    bar_type.save()

    exported = json.loads(get_output_of_command('export_site'))
    virtual.delete()
    with pytest.raises(AgendaImportError) as excinfo:
        import_site(exported, overwrite=False)
    expected = 'This agenda does not have the same meeting types provided by the virtual agenda.'
    assert expected in '%s' % excinfo.value
def test_import_export_desk_missing_fields(app, meetings_agenda):
    """Import an export whose first desk lacks ``timeperiods`` and ``exceptions``.

    The import is expected to go through and to create neither time periods
    nor exceptions.
    """
    payload = json.loads(get_output_of_command('export_site'))
    first_desk = payload['agendas'][0]['desks'][0]
    for key in ('timeperiods', 'exceptions'):
        del first_desk[key]

    Agenda.objects.all().delete()
    import_site(payload)

    for model in (TimePeriod, TimePeriodException):
        assert model.objects.exists() is False
def test_import_export_desk_unknown_fields(app, some_data, meetings_agenda):
    """Import an export sprinkled with unknown keys and check objects are created.

    An ``unknown_field`` entry is added to every agenda and to the first
    meeting type, desk, time period, exception and event found in it.
    """
    checked_models = (Agenda, MeetingType, Desk, TimePeriod, TimePeriodException)

    def tag(entry):
        # plant the unexpected key on one exported dict
        entry['unknown_field'] = 'foobar'

    # add exception to meeting agenda
    TimePeriodException.objects.create(
        desk=meetings_agenda.desk_set.first(),
        start_datetime=make_aware(datetime.datetime(2017, 5, 22, 8, 0)),
        end_datetime=make_aware(datetime.datetime(2017, 5, 22, 12, 30)),
    )

    # add permissions
    meetings_agenda.view_role = Group.objects.create(name=u'group1')
    meetings_agenda.edit_role = Group.objects.create(name=u'group2')
    meetings_agenda.save()

    # add event
    Event.objects.create(
        agenda=meetings_agenda, start_datetime=make_aware(datetime.datetime.now()), places=10
    )

    payload = json.loads(get_output_of_command('export_site'))

    Agenda.objects.all().delete()
    for model in checked_models:
        assert model.objects.exists() is False

    # add unknown fields everywhere
    for agenda in payload['agendas']:
        tag(agenda)
        if 'meetingtypes' in agenda:
            tag(agenda['meetingtypes'][0])
        if 'desks' in agenda:
            first_desk = agenda['desks'][0]
            tag(first_desk)
            tag(first_desk['timeperiods'][0])
            tag(first_desk['exceptions'][0])
        if 'events' in agenda:
            tag(agenda['events'][0])

    import_site(payload)
    for model in checked_models:
        assert model.objects.exists() is True