general: add export/import support (#15527)
This commit is contained in:
parent
fe18e6a360
commit
52e21e98c6
|
@ -23,7 +23,7 @@ from django.db import transaction
|
|||
from django.utils.dates import WEEKDAYS
|
||||
from django.utils.formats import date_format
|
||||
from django.utils.text import slugify
|
||||
from django.utils.timezone import localtime, now
|
||||
from django.utils.timezone import localtime, now, make_aware
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from jsonfield import JSONField
|
||||
|
@ -79,6 +79,51 @@ class Agenda(models.Model):
|
|||
group_ids = [x.id for x in user.groups.all()]
|
||||
return bool(self.view_role_id in group_ids)
|
||||
|
||||
def export_json(self):
|
||||
agenda = {
|
||||
'label': self.label,
|
||||
'slug': self.slug,
|
||||
'kind': self.kind,
|
||||
'minimal_booking_delay': self.minimal_booking_delay,
|
||||
'maximal_booking_delay': self.maximal_booking_delay,
|
||||
'permissions': {
|
||||
'view': self.view_role.name if self.view_role else None,
|
||||
'edit': self.edit_role.name if self.edit_role else None,
|
||||
}
|
||||
}
|
||||
if self.kind == 'events':
|
||||
agenda['events'] = [x.export_json() for x in self.event_set.all()]
|
||||
elif self.kind == 'meetings':
|
||||
agenda['meetingtypes'] = [x.export_json() for x in self.meetingtype_set.all()]
|
||||
agenda['timeperiods'] = [x.export_json() for x in self.timeperiod_set.all()]
|
||||
return agenda
|
||||
|
||||
@classmethod
|
||||
def import_json(self, data, overwrite=False):
|
||||
data = data.copy()
|
||||
permissions = data.pop('permissions')
|
||||
if data['kind'] == 'events':
|
||||
events = data.pop('events')
|
||||
elif data['kind'] == 'meetings':
|
||||
meetingtypes = data.pop('meetingtypes')
|
||||
timeperiods = data.pop('timeperiods')
|
||||
agenda, created = self.objects.get_or_create(slug=data['slug'], defaults=data)
|
||||
if data['kind'] == 'events':
|
||||
if overwrite:
|
||||
Event.objects.filter(agenda=agenda).delete()
|
||||
for event_data in events:
|
||||
event_data['agenda'] = agenda
|
||||
Event.import_json(event_data).save()
|
||||
elif data['kind'] == 'meetings':
|
||||
if overwrite:
|
||||
MeetingType.objects.filter(agenda=agenda).delete()
|
||||
TimePeriod.objects.filter(agenda=agenda).delete()
|
||||
for type_data in meetingtypes:
|
||||
type_data['agenda'] = agenda
|
||||
MeetingType.import_json(type_data).save()
|
||||
for period_data in timeperiods:
|
||||
period_data['agenda'] = agenda
|
||||
TimePeriod.import_json(period_data).save()
|
||||
|
||||
WEEKDAYS_LIST = sorted(WEEKDAYS.items(), key=lambda x: x[0])
|
||||
|
||||
|
@ -114,6 +159,17 @@ class TimePeriod(models.Model):
|
|||
def weekday_str(self):
|
||||
return WEEKDAYS[self.weekday]
|
||||
|
||||
@classmethod
|
||||
def import_json(cls, data):
|
||||
return cls(**data)
|
||||
|
||||
def export_json(self):
|
||||
return {
|
||||
'weekday': self.weekday,
|
||||
'start_time': self.start_time.strftime('%H:%M'),
|
||||
'end_time': self.end_time.strftime('%H:%M'),
|
||||
}
|
||||
|
||||
def get_time_slots(self, min_datetime, max_datetime, meeting_type):
|
||||
duration = datetime.timedelta(minutes=meeting_type.duration)
|
||||
|
||||
|
@ -164,6 +220,17 @@ class MeetingType(models.Model):
|
|||
self.slug = slug
|
||||
super(MeetingType, self).save(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def import_json(cls, data):
|
||||
return cls(**data)
|
||||
|
||||
def export_json(self):
|
||||
return {
|
||||
'label': self.label,
|
||||
'slug': self.slug,
|
||||
'duration': self.duration,
|
||||
}
|
||||
|
||||
|
||||
class Event(models.Model):
|
||||
agenda = models.ForeignKey(Agenda)
|
||||
|
@ -222,6 +289,20 @@ class Event(models.Model):
|
|||
def get_absolute_url(self):
|
||||
return reverse('chrono-manager-event-edit', kwargs={'pk': self.id})
|
||||
|
||||
@classmethod
|
||||
def import_json(cls, data):
|
||||
data['start_datetime'] = make_aware(datetime.datetime.strptime(
|
||||
data['start_datetime'], '%Y-%m-%d %H:%M:%S'))
|
||||
return cls(**data)
|
||||
|
||||
def export_json(self):
|
||||
return {
|
||||
'start_datetime': self.start_datetime.strftime('%Y-%m-%d %H:%M:%S'),
|
||||
'places': self.places,
|
||||
'waiting_list_places': self.waiting_list_places,
|
||||
'label': self.label
|
||||
}
|
||||
|
||||
|
||||
class Booking(models.Model):
|
||||
event = models.ForeignKey(Event)
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
# chrono - agendas system
|
||||
# Copyright (C) 2016-2017 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
import sys
|
||||
from optparse import make_option
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from chrono.manager.utils import export_site
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
args = ''
|
||||
help = 'Export the site'
|
||||
option_list = BaseCommand.option_list + (
|
||||
make_option('--output', metavar='FILE', default=None,
|
||||
help='name of a file to write output to'),
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
if options['output']:
|
||||
output = open(options['output'], 'w')
|
||||
else:
|
||||
output = sys.stdout
|
||||
json.dump(export_site(), output, indent=4)
|
|
@ -0,0 +1,46 @@
|
|||
# chrono - agendas system
|
||||
# Copyright (C) 2016-2017 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
from optparse import make_option
|
||||
import sys
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from chrono.manager.utils import import_site
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
args = '<filename>'
|
||||
help = 'Import an exported site'
|
||||
option_list = BaseCommand.option_list + (
|
||||
make_option('--clean', action='store_true', default=False,
|
||||
help='Clean site before importing'),
|
||||
make_option('--if-empty', action='store_true', default=False,
|
||||
help='Import only if site is empty'),
|
||||
make_option('--overwrite', action='store_true', default=False,
|
||||
help='Overwrite existing data'),
|
||||
)
|
||||
|
||||
def handle(self, filename, **options):
|
||||
if filename == '-':
|
||||
fd = sys.stdin
|
||||
else:
|
||||
fd = open(filename)
|
||||
import_site(json.load(fd),
|
||||
if_empty=options['if_empty'],
|
||||
clean=options['clean'],
|
||||
overwrite=options['overwrite'])
|
|
@ -0,0 +1,38 @@
|
|||
# chrono - agendas system
|
||||
# Copyright (C) 2016-2017 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.db import transaction
|
||||
|
||||
from chrono.agendas.models import Agenda
|
||||
|
||||
|
||||
def export_site():
|
||||
'''Dump site objects to JSON-dumpable dictionnary'''
|
||||
d = {}
|
||||
d['agendas'] = [x.export_json() for x in Agenda.objects.all()]
|
||||
return d
|
||||
|
||||
|
||||
def import_site(data, if_empty=False, clean=False, overwrite=False):
|
||||
if if_empty and Agenda.objects.count():
|
||||
return
|
||||
|
||||
if clean:
|
||||
Agenda.objects.all().delete()
|
||||
|
||||
with transaction.atomic():
|
||||
for data in data.get('agendas', []):
|
||||
Agenda.import_json(data, overwrite=overwrite)
|
|
@ -47,7 +47,7 @@ def some_data():
|
|||
|
||||
@pytest.fixture
|
||||
def meetings_agenda():
|
||||
agenda = Agenda(label=u'Foo bar', kind='meetings',
|
||||
agenda = Agenda(label=u'Foo bar Meeting', kind='meetings',
|
||||
minimal_booking_delay=1, maximal_booking_delay=56)
|
||||
agenda.save()
|
||||
meeting_type = MeetingType(agenda=agenda, label='Blah', duration=30)
|
||||
|
@ -318,7 +318,7 @@ def test_booking_cancellation_post_api(app, some_data, user):
|
|||
assert resp.json['err'] == 1
|
||||
|
||||
def test_booking_cancellation_post_meeting_api(app, meetings_agenda, user):
|
||||
agenda_id = Agenda.objects.filter(label=u'Foo bar')[0].id
|
||||
agenda_id = Agenda.objects.filter(label=u'Foo bar Meeting')[0].id
|
||||
meeting_type = MeetingType.objects.get(agenda=meetings_agenda)
|
||||
resp = app.get('/api/agenda/meetings/%s/datetimes/' % meeting_type.id)
|
||||
nb_events = len(resp.json['data'])
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
from cStringIO import StringIO
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
from django.core.management import call_command
|
||||
|
||||
from chrono.agendas.models import Agenda, Event, MeetingType, TimePeriod
|
||||
from chrono.manager.utils import export_site, import_site
|
||||
|
||||
from test_api import some_data, meetings_agenda
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
def get_output_of_command(command, *args, **kwargs):
|
||||
old_stdout = sys.stdout
|
||||
output = sys.stdout = StringIO()
|
||||
call_command(command, *args, **kwargs)
|
||||
sys.stdout = old_stdout
|
||||
return output.getvalue()
|
||||
|
||||
def test_import_export(app, some_data, meetings_agenda):
|
||||
output = get_output_of_command('export_site')
|
||||
assert len(json.loads(output)['agendas']) == 3
|
||||
import_site(data={}, clean=True)
|
||||
empty_output = get_output_of_command('export_site')
|
||||
assert len(json.loads(empty_output)['agendas']) == 0
|
||||
|
||||
Agenda(label=u'test').save()
|
||||
old_stdin = sys.stdin
|
||||
sys.stdin = StringIO(json.dumps({}))
|
||||
assert Agenda.objects.count() == 1
|
||||
try:
|
||||
call_command('import_site', '-', clean=True)
|
||||
finally:
|
||||
sys.stdin = old_stdin
|
||||
assert Agenda.objects.count() == 0
|
||||
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(output)
|
||||
f.flush()
|
||||
call_command('import_site', f.name)
|
||||
|
||||
assert Agenda.objects.count() == 3
|
||||
|
||||
agenda1 = Agenda.objects.get(label=u'Foo bar')
|
||||
agenda2 = Agenda.objects.get(label=u'Foo bar Meeting')
|
||||
event = Event(agenda=agenda1, start_datetime= datetime.datetime.now(), places=10)
|
||||
event.save()
|
||||
timeperiod = TimePeriod(agenda=agenda2, weekday=2,
|
||||
start_time=datetime.time(10, 0), end_time=datetime.time(11, 0))
|
||||
timeperiod.save()
|
||||
|
||||
import_site(json.loads(output), overwrite=True)
|
||||
assert Event.objects.filter(id=event.id).count() == 0
|
||||
assert TimePeriod.objects.filter(id=timeperiod.id).count() == 0
|
||||
|
||||
event = Event(agenda=agenda1, start_datetime= datetime.datetime.now(), places=10)
|
||||
event.save()
|
||||
timeperiod = TimePeriod(agenda=agenda2, weekday=2,
|
||||
start_time=datetime.time(10, 0), end_time=datetime.time(11, 0))
|
||||
timeperiod.save()
|
||||
import_site(json.loads(output), overwrite=False)
|
||||
assert Event.objects.filter(id=event.id).count() == 1
|
||||
assert TimePeriod.objects.filter(id=timeperiod.id).count() == 1
|
||||
|
||||
import_site(data={}, if_empty=True)
|
||||
assert Agenda.objects.count() == 3
|
||||
|
||||
import_site(data={}, clean=True)
|
||||
tempdir = tempfile.mkdtemp('chrono-test')
|
||||
empty_output = get_output_of_command('export_site', output=os.path.join(tempdir, 't.json'))
|
||||
assert os.path.exists(os.path.join(tempdir, 't.json'))
|
||||
shutil.rmtree(tempdir)
|
Loading…
Reference in New Issue