agendas: keep the file of an exception source (#39259)

This commit is contained in:
Lauréline Guérin 2020-01-28 15:08:24 +01:00
parent 38fe35abcd
commit af17dc072f
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
8 changed files with 107 additions and 19 deletions

View File

@ -27,10 +27,18 @@ class Command(BaseCommand):
help = 'Synchronize time period exceptions from desks remote ics'
def handle(self, **options):
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False, ics_file=''):
try:
source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
except ICSError as e:
print(
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
)
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=True).exclude(ics_file=''):
try:
source.desk.import_timeperiod_exceptions_from_ics_file(source.ics_file, source=source)
except ICSError as e:
print(
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
)

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import chrono.agendas.models
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('agendas', '0036_auto_20191223_1758'),
]
operations = [
migrations.AddField(
model_name='timeperiodexceptionsource',
name='ics_file',
field=models.FileField(blank=True, null=True, upload_to=chrono.agendas.models.ics_directory_path),
),
]

View File

@ -19,6 +19,7 @@ import datetime
import math
import requests
import vobject
import uuid
import django
from django.conf import settings
@ -583,13 +584,13 @@ class Desk(models.Model):
# often be missing and defaults to iso-8859-15.
response.content.decode('utf-8')
response.encoding = 'utf-8'
except UnicodeDecodeError as e:
except UnicodeDecodeError:
pass
return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)
def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
if source is None:
source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name)
source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name, ics_file=ics_file)
return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))
def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
@ -691,9 +692,14 @@ class Desk(models.Model):
return openslots.search(aware_date, aware_next_date)
def ics_directory_path(instance, filename):
return 'ics/{0}/{1}'.format(str(uuid.uuid4()), filename)
class TimePeriodExceptionSource(models.Model):
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
ics_filename = models.CharField(null=True, max_length=256)
ics_file = models.FileField(upload_to=ics_directory_path, blank=True, null=True)
ics_url = models.URLField(null=True, max_length=500)
def __str__(self):

View File

@ -263,7 +263,9 @@ class ExceptionsImportForm(forms.ModelForm):
ics_file = forms.FileField(
label=_('ICS File'),
required=False,
help_text=_('ICS file containing events which will be considered as exceptions.'),
help_text=_(
'ICS file containing events which will be considered as exceptions. Will be synchronised hourly'
),
)
ics_url = forms.URLField(
label=_('URL'),
@ -282,7 +284,7 @@ class ExceptionsImportForm(forms.ModelForm):
class TimePeriodExceptionSourceReplaceForm(forms.ModelForm):
ics_file = forms.FileField(
ics_newfile = forms.FileField(
label=_('ICS File'),
required=False,
help_text=_('ICS file containing events which will be considered as exceptions.'),
@ -292,6 +294,12 @@ class TimePeriodExceptionSourceReplaceForm(forms.ModelForm):
model = TimePeriodExceptionSource
fields = []
def save(self, *args, **kwargs):
if bool(self.instance.ics_file):
self.instance.ics_file.delete()
self.instance.ics_file = self.cleaned_data['ics_newfile']
self.instance.save()
class AgendasImportForm(forms.Form):
agendas_json = forms.FileField(label=_('Agendas Export File'))

View File

@ -2,26 +2,18 @@
{% load i18n %}
{% block appbar %}
<h2>{% if form.instance.ics_filename %}{% trans "Replace exceptions" %}{% else %}{% trans "Refresh exceptions" %}{% endif %}</h2>
<h2>{% trans "Replace exceptions" %}</h2>
{% endblock %}
{% block content %}
<form method="post" enctype="multipart/form-data">
{% if form.instance.ics_filename %}
<p class="notice">{% trans "To replace existing exceptions, please upload a new file." %}</p>
{% else %}
<p class="notice">
{% trans 'Press the button "Refresh" to refresh existing exceptions from:' %}
<br />
<a href="{{ form.instance.ics_url }}">{{ form.instance.ics_url }}</a>
</p>
{% endif %}
<p class="notice">{% trans "To replace existing exceptions, please upload a new file." %}</p>
{% csrf_token %}
{{ form.as_p }}
<p>
</p>
<div class="buttons">
<button>{% if form.instance.ics_filename %}{% trans "Replace" %}{% else %}{% trans "Refresh" %}{% endif %}</button>
<button>{% trans "Replace" %}</button>
<a class="cancel" href="{% url 'chrono-manager-agenda-settings' pk=agenda.id %}">{% trans 'Cancel' %}</a>
</div>
</form>

View File

@ -988,11 +988,15 @@ class TimePeriodExceptionSourceReplaceView(ManagedDeskSubobjectMixin, UpdateView
form_class = TimePeriodExceptionSourceReplaceForm
template_name = 'chrono/manager_replace_exceptions.html'
def get_queryset(self):
queryset = super(TimePeriodExceptionSourceReplaceView, self).get_queryset()
return queryset.filter(ics_filename__isnull=False)
def form_valid(self, form):
exceptions = None
try:
exceptions = form.instance.desk.import_timeperiod_exceptions_from_ics_file(
form.cleaned_data['ics_file'], source=form.instance
form.cleaned_data['ics_newfile'], source=form.instance
)
except ICSError as e:
form.add_error(None, force_text(e))
@ -1013,6 +1017,10 @@ time_period_exception_source_replace = TimePeriodExceptionSourceReplaceView.as_v
class TimePeriodExceptionSourceRefreshView(ManagedDeskSubobjectMixin, DetailView):
model = TimePeriodExceptionSource
def get_queryset(self):
queryset = super(TimePeriodExceptionSourceRefreshView, self).get_queryset()
return queryset.filter(ics_url__isnull=False)
def get(self, request, *args, **kwargs):
try:
source = self.get_object()

View File

@ -370,7 +370,7 @@ def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
agenda.save()
desk = Desk(label='Test 11 desk', agenda=agenda)
desk.save()
TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
source = TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
mocked_response = mock.Mock()
mocked_response.status_code = 403
mocked_get.return_value = mocked_response
@ -386,6 +386,45 @@ def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
== 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403).\n'
)
assert source.ics_url is not None
assert source.ics_filename is None
assert source.ics_file.name is None
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
) as import_remote_ics:
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
) as import_file_ics:
call_command('sync_desks_timeperiod_exceptions')
assert import_remote_ics.call_args_list == [mock.call('http://example.com/sample.ics', source=source)]
assert import_file_ics.call_args_list == []
source.ics_url = None
source.ics_filename = 'sample.ics'
source.ics_file = ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
source.save()
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
) as import_remote_ics:
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
) as import_file_ics:
call_command('sync_desks_timeperiod_exceptions')
assert import_remote_ics.call_args_list == []
assert import_file_ics.call_args_list == [mock.call(mock.ANY, source=source)]
source.ics_file.delete()
source.save()
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
) as import_remote_ics:
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
) as import_file_ics:
call_command('sync_desks_timeperiod_exceptions')
assert import_remote_ics.call_args_list == []
assert import_file_ics.call_args_list == []
def test_base_meeting_duration():
agenda = Agenda(label='Meeting', kind='meetings')

View File

@ -3,6 +3,7 @@
from __future__ import unicode_literals
import copy
import json
import os
from django.contrib.auth.models import User, Group
from django.utils.encoding import force_text
@ -1266,6 +1267,7 @@ END:VCALENDAR"""
exception = TimePeriodException.objects.latest('pk')
assert exception.source == source
assert source.ics_filename == 'exceptions.ics'
assert 'exceptions.ics' in source.ics_file.name
assert source.ics_url is None
resp = resp.follow()
assert 'An exception has been imported.' in resp.text
@ -1338,6 +1340,7 @@ END:VCALENDAR"""
exception = TimePeriodException.objects.latest('pk')
assert exception.source == source
assert source.ics_filename is None
assert source.ics_file.name == ''
assert source.ics_url == 'http://example.com/foo.ics'
@ -1555,14 +1558,18 @@ END:VCALENDAR"""
source = TimePeriodExceptionSource.objects.latest('pk')
assert source.timeperiodexception_set.count() == 2
exceptions = list(source.timeperiodexception_set.order_by('pk'))
old_ics_file_path = source.ics_file.path
# replace the source
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
resp = resp.click('Settings')
resp = resp.click('upload')
resp = resp.click(href='/manage/time-period-exceptions-source/%d/replace' % source.pk)
resp.form['ics_file'] = Upload('exceptions.ics', ics_file_content, 'text/calendar')
resp.form['ics_newfile'] = Upload('exceptions.ics', ics_file_content, 'text/calendar')
resp = resp.form.submit().follow()
source.refresh_from_db()
assert source.ics_file.path != old_ics_file_path
assert os.path.exists(old_ics_file_path) is False
assert TimePeriodException.objects.count() == 2
assert source.timeperiodexception_set.count() == 2
new_exceptions = list(source.timeperiodexception_set.order_by('pk'))