agendas: allow exception sources in unavailability calendars (#52370)

This commit is contained in:
Valentin Deniaud 2022-03-23 14:22:53 +01:00
parent 885ecf664c
commit 8487685feb
7 changed files with 290 additions and 23 deletions

View File

@ -0,0 +1,28 @@
# Generated by Django 2.2.19 on 2022-03-23 12:20
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('agendas', '0111_timeperiod_weekday_indexes'),
]
operations = [
migrations.AddField(
model_name='timeperiodexceptionsource',
name='unavailability_calendar',
field=models.ForeignKey(
null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.UnavailabilityCalendar'
),
),
migrations.AlterField(
model_name='timeperiodexceptionsource',
name='desk',
field=models.ForeignKey(
null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk'
),
),
]

View File

@ -2252,7 +2252,8 @@ def ics_directory_path(instance, filename):
class TimePeriodExceptionSource(models.Model):
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True)
unavailability_calendar = models.ForeignKey('UnavailabilityCalendar', on_delete=models.CASCADE, null=True)
ics_filename = models.CharField(null=True, max_length=256)
ics_file = models.FileField(upload_to=ics_directory_path, blank=True, null=True)
ics_url = models.URLField(null=True, max_length=500)
@ -2438,6 +2439,7 @@ class TimePeriodExceptionSource(models.Model):
'end_datetime': end_dt,
'label': summary,
'desk_id': self.desk_id,
'unavailability_calendar_id': self.unavailability_calendar_id,
'source': self,
'recurrence_id': 0,
}

View File

@ -1104,6 +1104,14 @@ class ExceptionsImportForm(forms.ModelForm):
required=False,
help_text=_('URL to remote calendar which will be synchronised hourly.'),
)
def clean(self, *args, **kwargs):
cleaned_data = super().clean(*args, **kwargs)
if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
class DeskExceptionsImportForm(ExceptionsImportForm):
all_desks = forms.BooleanField(label=_('Apply exceptions on all desks of the agenda'), required=False)
class Meta:
@ -1117,10 +1125,11 @@ class ExceptionsImportForm(forms.ModelForm):
elif self.instance.agenda.desk_simple_management:
del self.fields['all_desks']
def clean(self, *args, **kwargs):
cleaned_data = super().clean(*args, **kwargs)
if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
class UnavailabilityCalendarExceptionsImportForm(ExceptionsImportForm):
class Meta:
model = UnavailabilityCalendar
fields = []
class TimePeriodExceptionSourceReplaceForm(forms.ModelForm):
@ -1145,7 +1154,7 @@ class TimePeriodExceptionSourceReplaceForm(forms.ModelForm):
old_filename = self.instance.ics_filename
store_source(self.instance)
if self.instance.desk.agenda.desk_simple_management:
if self.instance.desk and self.instance.desk.agenda.desk_simple_management:
for desk in self.instance.desk.agenda.desk_set.exclude(pk=self.instance.desk_id):
source = desk.timeperiodexceptionsource_set.filter(ics_filename=old_filename).first()
if source is not None:

View File

@ -12,6 +12,7 @@
<span class="actions">
<a class="extra-actions-menu-opener"></a>
{% block agenda-extra-management-actions %}
<a rel="popup" href="{% url 'chrono-manager-unavailability-calendar-import-unavailabilities' pk=unavailability_calendar.id %}">{% trans 'Manage unavailabilities from ICS' %}</a>
<a rel="popup" href="{% url 'chrono-manager-unavailability-calendar-add-unavailability' pk=unavailability_calendar.id %}">{% trans 'Add Unavailability' %}</a>
{% endblock %}
<ul class="extra-actions-menu">

View File

@ -60,6 +60,11 @@ urlpatterns = [
views.unavailability_calendar_add_unavailability,
name='chrono-manager-unavailability-calendar-add-unavailability',
),
url(
r'^unavailability-calendar/(?P<pk>\d+)/import-unavailabilities/$',
views.unavailability_calendar_import_unavailabilities,
name='chrono-manager-unavailability-calendar-import-unavailabilities',
),
url(r'^resources/$', views.resource_list, name='chrono-manager-resource-list'),
url(r'^resource/add/$', views.resource_add, name='chrono-manager-resource-add'),
url(r'^resource/(?P<pk>\d+)/$', views.resource_view, name='chrono-manager-resource-view'),

View File

@ -103,11 +103,11 @@ from .forms import (
BookingAbsenceReasonForm,
BookingCancelForm,
BookingCheckFilterSet,
DeskExceptionsImportForm,
DeskForm,
EventCancelForm,
EventForm,
EventsTimesheetForm,
ExceptionsImportForm,
ImportEventsForm,
MeetingTypeForm,
NewDeskForm,
@ -123,6 +123,7 @@ from .forms import (
TimePeriodForm,
UnavailabilityCalendarAddForm,
UnavailabilityCalendarEditForm,
UnavailabilityCalendarExceptionsImportForm,
VirtualMemberForm,
)
from .utils import export_site, import_site
@ -2767,7 +2768,7 @@ time_period_exception_delete = TimePeriodExceptionDeleteView.as_view()
class DeskImportTimePeriodExceptionsView(ManagedAgendaSubobjectMixin, UpdateView):
model = Desk
form_class = ExceptionsImportForm
form_class = DeskExceptionsImportForm
template_name = 'chrono/manager_import_exceptions.html'
def get_context_data(self, **kwargs):
@ -2835,7 +2836,7 @@ class TimePeriodExceptionSourceDeleteView(ManagedTimePeriodExceptionMixin, Delet
source = self.get_object()
response = super().delete(request, *args, **kwargs)
if not source.desk.agenda.desk_simple_management:
if not source.desk or not source.desk.agenda.desk_simple_management:
return response
for desk in source.desk.agenda.desk_set.exclude(pk=source.desk_id):
@ -2862,21 +2863,18 @@ class TimePeriodExceptionSourceReplaceView(ManagedTimePeriodExceptionMixin, Upda
queryset = super().get_queryset()
return queryset.filter(ics_filename__isnull=False)
def import_file(self, desk, form):
source = desk.timeperiodexceptionsource_set.filter(
ics_filename=self.get_object().ics_filename
).first()
def import_file(self, obj, form):
source = obj.timeperiodexceptionsource_set.filter(ics_filename=self.get_object().ics_filename).first()
if source is not None:
source.refresh_timeperiod_exceptions()
def form_valid(self, form):
desk = self.get_object().desk
try:
if desk.agenda.desk_simple_management:
for _desk in desk.agenda.desk_set.all():
if self.desk and self.desk.agenda.desk_simple_management:
for _desk in self.desk.agenda.desk_set.all():
self.import_file(_desk, form)
else:
self.import_file(desk, form)
self.import_file(self.desk or self.unavailability_calendar, form)
except ICSError as e:
form.add_error(None, force_text(e))
return self.form_invalid(form)
@ -2895,19 +2893,18 @@ class TimePeriodExceptionSourceRefreshView(ManagedTimePeriodExceptionMixin, Deta
queryset = super().get_queryset()
return queryset.filter(ics_url__isnull=False)
def import_file(self, desk):
source = desk.timeperiodexceptionsource_set.filter(ics_url=self.get_object().ics_url).first()
def import_file(self, obj):
source = obj.timeperiodexceptionsource_set.filter(ics_url=self.get_object().ics_url).first()
if source is not None:
source.refresh_timeperiod_exceptions()
def get(self, request, *args, **kwargs):
desk = self.get_object().desk
try:
if desk.agenda.desk_simple_management:
for _desk in desk.agenda.desk_set.all():
if self.desk and self.desk.agenda.desk_simple_management:
for _desk in self.desk.agenda.desk_set.all():
self.import_file(_desk)
else:
self.import_file(desk)
self.import_file(self.desk or self.unavailability_calendar)
except ICSError as e:
messages.error(self.request, force_text(e))
@ -3321,6 +3318,55 @@ class UnavailabilityCalendarAddUnavailabilityView(ManagedUnavailabilityCalendarM
unavailability_calendar_add_unavailability = UnavailabilityCalendarAddUnavailabilityView.as_view()
class UnavailabilityCalendarImportUnavailabilitiesView(ManagedUnavailabilityCalendarMixin, UpdateView):
model = UnavailabilityCalendar
form_class = UnavailabilityCalendarExceptionsImportForm
template_name = 'chrono/manager_import_exceptions.html'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
unavailabilty_calendar = self.get_object()
context['exception_sources'] = unavailabilty_calendar.timeperiodexceptionsource_set.all()
context['base_template'] = 'chrono/manager_unavailability_calendar_settings.html'
return context
def import_file(self, form):
unavailabilty_calendar = self.get_object()
if form.cleaned_data['ics_file']:
ics_file = form.cleaned_data['ics_file']
source = unavailabilty_calendar.timeperiodexceptionsource_set.create(
ics_filename=ics_file.name, ics_file=ics_file
)
ics_file.seek(0)
elif form.cleaned_data['ics_url']:
source = unavailabilty_calendar.timeperiodexceptionsource_set.create(
ics_url=form.cleaned_data['ics_url']
)
parsed = source._check_ics_content()
source._parsed = parsed
return source
def form_valid(self, form):
try:
with transaction.atomic():
source = self.import_file(form)
except ICSError as e:
form.add_error(None, force_text(e))
return self.form_invalid(form)
try:
source.refresh_timeperiod_exceptions(data=source._parsed)
except ICSError as e:
form.add_error(None, force_text(e))
return self.form_invalid(form)
messages.info(self.request, _('Exceptions will be imported in a few minutes.'))
return super().form_valid(form)
unavailability_calendar_import_unavailabilities = UnavailabilityCalendarImportUnavailabilitiesView.as_view()
class SharedCustodyAgendaMixin:
agenda = None

View File

@ -1376,3 +1376,179 @@ def test_recurring_events_exceptions_report(settings, app, admin_user, freezer):
resp = app.get('/manage/agendas/%s/settings' % agenda.id)
assert 'warningnotice' not in resp.text
def test_unavailability_calendar_import_time_period_exception_from_ics(app, admin_user):
calendar = UnavailabilityCalendar.objects.create(label='Example')
login(app)
resp = app.get('/manage/unavailability-calendar/%d/settings' % calendar.pk)
assert 'Manage unavailabilities from ICS' in resp.text
resp = resp.click('Manage unavailabilities')
assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
resp = resp.form.submit(status=200)
assert 'Please provide an ICS File or an URL.' in resp.text
assert TimePeriodExceptionSource.objects.filter(unavailability_calendar=calendar).count() == 0
resp.form['ics_file'] = Upload('exceptions.ics', b'invalid content', 'text/calendar')
resp = resp.form.submit(status=200)
assert 'File format is invalid' in resp.text
assert TimePeriodExceptionSource.objects.filter(unavailability_calendar=calendar).count() == 0
ics_with_exceptions = b"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
resp = app.get('/manage/unavailability-calendar/%s/import-unavailabilities/' % calendar.pk)
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(unavailability_calendar=calendar).count() == 1
assert TimePeriodExceptionSource.objects.filter(unavailability_calendar=calendar).count() == 1
source = calendar.timeperiodexceptionsource_set.get()
exception = calendar.timeperiodexception_set.get()
assert exception.source == source
assert source.ics_filename == 'exceptions.ics'
assert 'exceptions.ics' in source.ics_file.name
assert source.ics_url is None
resp = resp.follow()
assert 'Exceptions will be imported in a few minutes.' in resp.text
@mock.patch('chrono.agendas.models.requests.get')
def test_unavailability_calendar_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user):
calendar = UnavailabilityCalendar.objects.create(label='Example')
login(app)
resp = app.get('/manage/unavailability-calendar/%s/import-unavailabilities/' % calendar.pk)
assert 'ics_file' in resp.form.fields
assert 'ics_url' in resp.form.fields
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response = mock.Mock()
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(unavailability_calendar=calendar).count() == 1
assert TimePeriodExceptionSource.objects.filter(unavailability_calendar=calendar).count() == 1
source = calendar.timeperiodexceptionsource_set.get()
exception = calendar.timeperiodexception_set.get()
assert exception.source == source
assert source.ics_filename is None
assert source.ics_file.name == ''
assert source.ics_url == 'http://example.com/foo.ics'
def test_unavailability_calendar_delete_time_period_exception_source(app, admin_user):
calendar = UnavailabilityCalendar.objects.create(label='Example')
source1 = TimePeriodExceptionSource.objects.create(
unavailability_calendar=calendar, ics_url='https://example.com/test.ics'
)
TimePeriodException.objects.create(
unavailability_calendar=calendar,
source=source1,
start_datetime=now() - datetime.timedelta(days=1),
end_datetime=now() + datetime.timedelta(days=1),
)
source2 = TimePeriodExceptionSource.objects.create(
unavailability_calendar=calendar, ics_url='https://example.com/test.ics'
)
TimePeriodException.objects.create(
unavailability_calendar=calendar,
source=source2,
start_datetime=now() - datetime.timedelta(days=1),
end_datetime=now() + datetime.timedelta(days=1),
)
login(app)
resp = app.get('/manage/time-period-exceptions-source/%d/delete' % source2.pk)
resp = resp.form.submit()
assert TimePeriodException.objects.count() == 1
assert TimePeriodExceptionSource.objects.count() == 1
assert source1.timeperiodexception_set.count() == 1
assert TimePeriodExceptionSource.objects.filter(pk=source2.pk).exists() is False
def test_unavailability_calendar_replace_time_period_exception_source(app, admin_user, freezer):
freezer.move_to('2019-12-01')
calendar = UnavailabilityCalendar.objects.create(label='Example')
ics_file_content = b"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
RRULE:FREQ=YEARLY
END:VEVENT
END:VCALENDAR"""
login(app)
# import a source from a file
resp = app.get('/manage/unavailability-calendar/%s/import-unavailabilities/' % calendar.pk)
resp.form['ics_file'] = Upload('exceptions.ics', ics_file_content, 'text/calendar')
resp = resp.form.submit(status=302).follow()
assert TimePeriodException.objects.filter(unavailability_calendar=calendar).count() == 2
source = TimePeriodExceptionSource.objects.latest('pk')
assert source.timeperiodexception_set.count() == 2
exceptions = list(source.timeperiodexception_set.order_by('pk'))
old_ics_file_path = source.ics_file.path
# replace the source
resp = app.get('/manage/time-period-exceptions-source/%d/replace' % source.pk)
resp.form['ics_newfile'] = Upload('exceptions-bis.ics', ics_file_content, 'text/calendar')
resp = resp.form.submit().follow()
source.refresh_from_db()
assert source.ics_file.path != old_ics_file_path
assert source.ics_filename == 'exceptions-bis.ics'
assert os.path.exists(old_ics_file_path) is False
assert TimePeriodException.objects.count() == 2
assert source.timeperiodexception_set.count() == 2
new_exceptions = list(source.timeperiodexception_set.order_by('pk'))
assert exceptions[0].pk != new_exceptions[0].pk
assert exceptions[1].pk != new_exceptions[1].pk
@mock.patch('chrono.agendas.models.requests.get')
def test_unavailability_calendar_refresh_time_period_exception_source(mocked_get, app, admin_user):
mocked_response = mock.Mock()
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
calendar = UnavailabilityCalendar.objects.create(label='Example')
login(app)
# import a source from an url
resp = app.get('/manage/unavailability-calendar/%d/settings' % calendar.pk)
resp = resp.click('Manage unavailabilities')
resp.form['ics_url'] = 'http://example.com/foo.ics'
resp = resp.form.submit(status=302).follow()
assert TimePeriodException.objects.filter(unavailability_calendar=calendar).count() == 1
source = TimePeriodExceptionSource.objects.latest('pk')
assert source.timeperiodexception_set.count() == 1
exceptions = list(source.timeperiodexception_set.order_by('pk'))
# refresh the source
resp = app.get('/manage/unavailability-calendar/%d/settings' % calendar.pk)
resp = resp.click('Manage unavailabilities')
resp = resp.click(href='/manage/time-period-exceptions-source/%d/refresh' % source.pk)
assert TimePeriodException.objects.count() == 1
new_exceptions = list(source.timeperiodexception_set.order_by('pk'))
assert exceptions[0].pk != new_exceptions[0].pk