agendas: handle import/export of unavailability calendars (#47394)

This commit is contained in:
Emmanuel Cazenave 2020-10-26 17:15:47 +01:00
parent 3e5f02b5ca
commit 033c43f860
9 changed files with 276 additions and 30 deletions

View File

@ -1146,6 +1146,7 @@ class Desk(models.Model):
timeperiods = data.pop('timeperiods', [])
exceptions = data.pop('exceptions', [])
sources = data.pop('exception_sources', [])
unavailability_calendars = data.pop('unavailability_calendars', [])
data = clean_import_data(cls, data)
desk, created = cls.objects.update_or_create(slug=data['slug'], agenda=data['agenda'], defaults=data)
for timeperiod in timeperiods:
@ -1157,6 +1158,14 @@ class Desk(models.Model):
for source in sources:
source['desk'] = desk
TimePeriodExceptionSource.import_json(source)
for unavailability_calendar in unavailability_calendars:
slug = unavailability_calendar['slug']
try:
target_calendar = UnavailabilityCalendar.objects.get(slug=slug)
except UnavailabilityCalendar.DoesNotExist:
raise AgendaImportError(_('The unavailability calendar "%s" does not exist.') % slug)
desk.unavailability_calendars.add(target_calendar)
return desk
def export_json(self):
@ -1168,6 +1177,7 @@ class Desk(models.Model):
'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.filter()],
'exceptions': [exception.export_json() for exception in time_period_exceptions],
'exception_sources': [source.export_json() for source in time_period_exception_sources],
'unavailability_calendars': [{'slug': x.slug} for x in self.unavailability_calendars.all()],
}
def duplicate(self, label=None, agenda_target=None):
@ -1561,6 +1571,36 @@ class UnavailabilityCalendar(models.Model):
def get_absolute_url(self):
return reverse('chrono-manager-unavailability-calendar-view', kwargs={'pk': self.id})
def export_json(self):
unavailability_calendar = {
'label': self.label,
'slug': self.slug,
'permissions': {
'view': self.view_role.name if self.view_role else None,
'edit': self.edit_role.name if self.edit_role else None,
},
'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()],
}
return unavailability_calendar
@classmethod
def import_json(cls, data, overwrite=False):
data = data.copy()
permissions = data.pop('permissions', {})
exceptions = data.pop('exceptions', [])
for permission in ('view', 'edit'):
if permissions.get(permission):
data[permission + '_role'] = Group.objects.get(name=permissions[permission])
data = clean_import_data(cls, data)
unavailability_calendar, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
if overwrite:
TimePeriodException.objects.filter(unavailability_calendar=unavailability_calendar).delete()
for exception in exceptions:
exception['unavailability_calendar'] = unavailability_calendar
TimePeriodException.import_json(exception)
return created
class TimePeriodException(models.Model):
desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True)

View File

@ -525,7 +525,7 @@ class TimePeriodExceptionSourceReplaceForm(forms.ModelForm):
class AgendasImportForm(forms.Form):
agendas_json = forms.FileField(label=_('Agendas Export File'))
agendas_json = forms.FileField(label=_('Export File'))
class AgendaDuplicateForm(forms.Form):

View File

@ -2,7 +2,7 @@
{% load i18n %}
{% block appbar %}
<h2>{% trans "Agendas Import" %}</h2>
<h2>{% trans "Import" %}</h2>
{% endblock %}
{% block content %}

View File

@ -16,6 +16,7 @@
{% endblock %}
<ul class="extra-actions-menu">
<li><a rel="popup" href="{% url 'chrono-manager-unavailability-calendar-edit' pk=unavailability_calendar.id %}">{% trans 'Options' %}</a></li>
<li><a download href="{% url 'chrono-manager-unavailability-calendar-export' pk=unavailability_calendar.id %}">{% trans 'Export Configuration (JSON)' %}</a></li>
{% if user.is_staff %}
<li><a rel="popup" href="{% url 'chrono-manager-unavailability-calendar-delete' pk=unavailability_calendar.id %}">{% trans 'Delete' %}</a></li>
{% endif %}

View File

@ -50,6 +50,11 @@ urlpatterns = [
views.unavailability_calendar_settings,
name='chrono-manager-unavailability-calendar-settings',
),
url(
r'^unavailability-calendar/(?P<pk>\d+)/export$',
views.unavailability_calendar_export,
name='chrono-manager-unavailability-calendar-export',
),
url(
r'^unavailability-calendar/(?P<pk>\d+)/add-unavailability$',
views.unavailability_calendar_add_unavailability,

View File

@ -14,18 +14,20 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import itertools
from django.contrib.auth.models import Group
from django.db import transaction
from django.db.models import Q
from chrono.agendas.models import Agenda, AgendaImportError
from chrono.agendas.models import Agenda, AgendaImportError, UnavailabilityCalendar
def export_site():
'''Dump site objects to JSON-dumpable dictionnary'''
data = {}
data = collections.OrderedDict()
data['unavailability_calendars'] = [x.export_json() for x in UnavailabilityCalendar.objects.all()]
qs1 = Agenda.objects.filter(~Q(kind='virtual'))
qs2 = Agenda.objects.filter(kind='virtual')
data['agendas'] = [x.export_json() for x in itertools.chain(qs1, qs2)]
@ -33,16 +35,25 @@ def export_site():
def import_site(data, if_empty=False, clean=False, overwrite=False):
if if_empty and Agenda.objects.count():
if if_empty and (Agenda.objects.count() or UnavailabilityCalendar.objects.count()):
return
if clean:
Agenda.objects.all().delete()
UnavailabilityCalendar.objects.all().count()
results = {'created': 0, 'updated': 0}
results = {
'agendas': {'created': 0, 'updated': 0},
'unavailability_calendars': {'created': 0, 'updated': 0},
}
agendas = data.get('agendas', [])
unavailability_calendars = data.get('unavailability_calendars', [])
role_names = {name for data in agendas for _, name in data.get('permissions', {}).items() if name}
role_names = set()
for objs in (agendas, unavailability_calendars):
role_names = role_names.union(
{name for data in objs for _, name in data.get('permissions', {}).items() if name}
)
existing_roles = Group.objects.filter(name__in=role_names)
if existing_roles.count() != len(role_names):
@ -50,10 +61,14 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
raise AgendaImportError('Missing roles: "%s"' % ', '.join(role_names - existing_roles_names))
with transaction.atomic():
for data in agendas:
created = Agenda.import_json(data, overwrite=overwrite)
if created:
results['created'] += 1
else:
results['updated'] += 1
for objs, cls, label in (
(unavailability_calendars, UnavailabilityCalendar, 'unavailability_calendars'),
(agendas, Agenda, 'agendas'),
):
for data in objs:
created = cls.import_json(data, overwrite=overwrite)
if created:
results[label]['created'] += 1
else:
results[label]['updated'] += 1
return results

View File

@ -618,23 +618,44 @@ class AgendasImportView(FormView):
form.add_error('agendas_json', _('Key "%s" is missing.') % exc.args[0])
return self.form_invalid(form)
if results.get('created') == 0 and results.get('updated') == 0:
messages.info(self.request, _('No agendas were found.'))
else:
if results.get('created') == 0:
message1 = _('No agenda created.')
else:
message1 = ungettext(
'An agenda has been created.', '%(count)d agendas have been created.', results['created']
) % {'count': results['created']}
global_noop = True
for obj_name, obj_results in results.items():
if obj_name == 'agendas':
message_create_noop = _('No agenda created.')
message_create = ('An agenda has been created.', '%(count)d agendas have been created.')
message_update_noop = _('No agenda updated.')
message_update = ('An agenda has been updated.', '%(count)d agendas have been updated.')
elif obj_name == 'unavailability_calendars':
message_create_noop = _('No unavailability calendar created.')
message_create = (
'An unavailability calendar has been created.',
'%(count)d unavailability calendars have been created.',
)
message_update_noop = _('No unavailability calendar updated.')
message_update = (
'An unavailability calendar has been updated.',
'%(count)d unavailability calendars have been updated.',
)
if results.get('updated') == 0:
message2 = _('No agenda updated.')
else:
message2 = ungettext(
'An agenda has been updated.', '%(count)d agendas have been updated.', results['updated']
) % {'count': results['updated']}
messages.info(self.request, u'%s %s' % (message1, message2))
if obj_results.get('created') != 0 or obj_results.get('updated') != 0:
global_noop = False
if obj_results.get('created') == 0:
message1 = message_create_noop
else:
message1 = ungettext(message_create[0], message_create[1], obj_results['created']) % {
'count': obj_results['created']
}
if obj_results.get('updated') == 0:
message2 = message_update_noop
else:
message2 = ungettext(message_update[0], message_update[1], obj_results['updated']) % {
'count': obj_results['updated']
}
messages.info(self.request, u'%s %s' % (message1, message2))
if global_noop:
messages.info(self.request, _('No data found.'))
return super(AgendasImportView, self).form_valid(form)
@ -2364,6 +2385,24 @@ class UnavailabilityCalendarSettings(ManagedUnavailabilityCalendarMixin, DetailV
unavailability_calendar_settings = UnavailabilityCalendarSettings.as_view()
class UnavailabilityCalendarExport(ManagedUnavailabilityCalendarMixin, DetailView):
model = UnavailabilityCalendar
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type='application/json')
today = datetime.date.today()
response[
'Content-Disposition'
] = 'attachment; filename="export_unavailability-calendar_{}_{}.json"'.format(
self.get_object().slug, today.strftime('%Y%m%d')
)
json.dump({'unavailability_calendars': [self.get_object().export_json()]}, response, indent=2)
return response
unavailability_calendar_export = UnavailabilityCalendarExport.as_view()
class UnavailabilityCalendarAddUnavailabilityView(ManagedUnavailabilityCalendarMixin, CreateView):
template_name = 'chrono/manager_time_period_exception_form.html'
form_class = TimePeriodExceptionForm

View File

@ -32,6 +32,7 @@ from chrono.agendas.models import (
VirtualMember,
AgendaNotificationsSettings,
AgendaReminderSettings,
UnavailabilityCalendar,
)
from chrono.manager.utils import import_site
@ -594,3 +595,67 @@ def test_import_export_do_not_duplicate_timeperiod_and_exceptions():
assert TimePeriod.objects.count() == 2
assert TimePeriodException.objects.count() == 2
def test_import_export_unavailability_calendar(app):
output = get_output_of_command('export_site')
payload = json.loads(output)
assert len(payload['unavailability_calendars']) == 0
group1 = Group.objects.create(name=u'gé1')
group2 = Group.objects.create(name=u'gé2')
calendar = UnavailabilityCalendar.objects.create(label='Calendar', view_role=group1, edit_role=group2)
tp1_start = make_aware(datetime.datetime(2017, 5, 22, 8, 0))
tp1_end = make_aware(datetime.datetime(2017, 5, 22, 12, 30))
tp1 = TimePeriodException.objects.create(
unavailability_calendar=calendar, start_datetime=tp1_start, end_datetime=tp1_end
)
tp2_start = make_aware(datetime.datetime(2018, 5, 22, 8, 0))
tp2_end = make_aware(datetime.datetime(2018, 5, 22, 12, 30))
tp2 = TimePeriodException.objects.create(
unavailability_calendar=calendar, start_datetime=tp2_start, end_datetime=tp2_end
)
meetings_agenda = Agenda.objects.create(label='Foo Bar', kind='meetings')
MeetingType.objects.create(agenda=meetings_agenda, label='Meeting Type', duration=30)
desk = Desk.objects.create(agenda=meetings_agenda, label='Desk')
desk.unavailability_calendars.add(calendar)
output = get_output_of_command('export_site')
payload = json.loads(output)
assert len(payload['unavailability_calendars']) == 1
assert len(payload['agendas']) == 1
calendar.delete()
tp1.delete()
tp2.delete()
meetings_agenda.delete()
assert not UnavailabilityCalendar.objects.exists()
assert not TimePeriodException.objects.exists()
assert not Agenda.objects.exists()
assert not Desk.objects.exists()
import_site(copy.deepcopy(payload))
assert UnavailabilityCalendar.objects.count() == 1
calendar = UnavailabilityCalendar.objects.first()
assert calendar.label == 'Calendar'
assert calendar.view_role == group1
assert calendar.edit_role == group2
assert calendar.timeperiodexception_set.count() == 2
assert TimePeriodException.objects.get(
unavailability_calendar=calendar, start_datetime=tp1_start, end_datetime=tp1_end
)
assert TimePeriodException.objects.get(
unavailability_calendar=calendar, start_datetime=tp2_start, end_datetime=tp2_end
)
agenda = Agenda.objects.get(label='Foo Bar')
desk = agenda.desk_set.first()
assert desk.unavailability_calendars.count() == 1
assert desk.unavailability_calendars.first() == calendar
# update
update_payload = copy.deepcopy(payload)
update_payload['unavailability_calendars'][0]['label'] = 'Calendar Updated'
import_site(update_payload)
calendar.refresh_from_db()
assert calendar.label == 'Calendar Updated'

View File

@ -3534,7 +3534,7 @@ def test_import_agenda(app, admin_user):
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', b'{}', 'application/json')
resp = resp.form.submit().follow()
assert 'No agendas were found.' in resp.text
assert 'No data found.' in resp.text
# existing agenda
resp = app.get('/manage/', status=200)
@ -3599,6 +3599,87 @@ def test_import_agenda(app, admin_user):
assert resp.context['form'].errors['agendas_json'] == ['Key "kind" is missing.']
def test_import_unavailability_calendar(app, admin_user):
calendar = UnavailabilityCalendar.objects.create(label=u'Foo bar')
app = login(app)
with freezegun.freeze_time('2020-06-15'):
resp = app.get('/manage/unavailability-calendar/%s/export' % calendar.id)
assert resp.headers['content-type'] == 'application/json'
assert (
resp.headers['content-disposition']
== 'attachment; filename="export_unavailability-calendar_foo-bar_20200615.json"'
)
calendar_export = resp.text
# empty json
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', b'{}', 'application/json')
resp = resp.form.submit().follow()
assert 'No data found.' in resp.text
# existing unavailability calendar
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', calendar_export.encode('utf-8'), 'application/json')
resp = resp.form.submit().follow()
assert 'No unavailability calendar created. An unavailability calendar has been updated.' in resp.text
assert UnavailabilityCalendar.objects.count() == 1
# new unavailability calendar
UnavailabilityCalendar.objects.all().delete()
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', calendar_export.encode('utf-8'), 'application/json')
resp = resp.form.submit().follow()
assert 'An unavailability calendar has been created. No unavailability calendar updated.' in resp.text
assert UnavailabilityCalendar.objects.count() == 1
# multiple unavailability calendars
calendars = json.loads(calendar_export)
calendars['unavailability_calendars'].append(copy.copy(calendars['unavailability_calendars'][0]))
calendars['unavailability_calendars'].append(copy.copy(calendars['unavailability_calendars'][0]))
calendars['unavailability_calendars'][1]['label'] = 'Foo bar 2'
calendars['unavailability_calendars'][1]['slug'] = 'foo-bar-2'
calendars['unavailability_calendars'][2]['label'] = 'Foo bar 3'
calendars['unavailability_calendars'][2]['slug'] = 'foo-bar-3'
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload(
'export.json', json.dumps(calendars).encode('utf-8'), 'application/json'
)
resp = resp.form.submit().follow()
assert (
'2 unavailability calendars have been created. An unavailability calendar has been updated.'
in resp.text
)
assert UnavailabilityCalendar.objects.count() == 3
UnavailabilityCalendar.objects.all().delete()
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload(
'export.json', json.dumps(calendars).encode('utf-8'), 'application/json'
)
resp = resp.form.submit().follow()
assert '3 unavailability calendars have been created. No unavailability calendar updated.' in resp.text
assert UnavailabilityCalendar.objects.count() == 3
# reference to unknown group
calendar_export_dict = json.loads(force_text(calendar_export))
calendar_export_dict['unavailability_calendars'][0]['permissions']['view'] = u'gé1'
calendar_export = json.dumps(calendar_export_dict).encode('utf-8')
UnavailabilityCalendar.objects.all().delete()
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', calendar_export, 'application/json')
resp = resp.form.submit()
assert u'Missing roles: &quot;gé1&quot;' in resp.text
del calendar_export_dict['unavailability_calendars'][0]['permissions']['view']
def test_import_does_not_delete_bookings(app, admin_user):
agenda = Agenda.objects.create(label='Foo', kind='meetings')
meeting_type = MeetingType.objects.create(agenda=agenda, label='Meeting Type', duration=30)