agendas: move exception refresh logic (#50723)

This commit is contained in:
Lauréline Guérin 2021-02-09 11:35:38 +01:00
parent cd5d93ca52
commit 2f72bd8287
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
5 changed files with 256 additions and 304 deletions

View File

@ -28,21 +28,15 @@ class Command(BaseCommand):
help = 'Synchronize time period exceptions from desks remote ics'
def handle(self, **options):
for source in TimePeriodExceptionSource.objects.filter(
qs_url = TimePeriodExceptionSource.objects.filter(
Q(ics_file='') | Q(ics_file__isnull=True), ics_url__isnull=False
):
try:
source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
except ICSError as e:
print(
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
)
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=True).exclude(
)
qs_file = TimePeriodExceptionSource.objects.filter(ics_url__isnull=True).exclude(
Q(ics_file='') | Q(ics_file__isnull=True)
):
)
for source in qs_url.union(qs_file):
try:
source.desk.import_timeperiod_exceptions_from_ics_file(source.ics_file, source=source)
source.refresh_timeperiod_exceptions_from_ics()
except ICSError as e:
print(
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr

View File

@ -1415,119 +1415,6 @@ class Desk(models.Model):
in_two_weeks = self.get_exceptions_within_two_weeks()
return len(self.prefetched_exceptions) == len(in_two_weeks)
def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
try:
response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
response.raise_for_status()
except requests.HTTPError as e:
raise ICSError(
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
% {'url': ics_url, 'status_code': e.response.status_code}
)
except requests.RequestException as e:
raise ICSError(
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
% {'url': ics_url, 'exception': e}
)
if source is None:
source = TimePeriodExceptionSource(desk=self, ics_url=ics_url)
try:
# override response encoding received in HTTP headers as it may
# often be missing and defaults to iso-8859-15.
response.content.decode('utf-8')
response.encoding = 'utf-8'
except UnicodeDecodeError:
pass
return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)
def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
if source is None:
source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name, ics_file=ics_file)
return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))
def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
try:
parsed = vobject.readOne(data)
except vobject.base.ParseError:
raise ICSError(_('File format is invalid.'))
total_created = 0
if not parsed.contents.get('vevent'):
raise ICSError(_('The file doesn\'t contain any events.'))
with transaction.atomic():
if source.pk is None:
source.save()
# delete old exceptions related to this source
source.timeperiodexception_set.all().delete()
# create new exceptions
update_datetime = now()
for vevent in parsed.contents.get('vevent', []):
if 'summary' in vevent.contents:
summary = force_text(vevent.contents['summary'][0].value)
else:
summary = _('Exception')
try:
start_dt = vevent.dtstart.value
if not isinstance(start_dt, datetime.datetime):
start_dt = datetime.datetime.combine(start_dt, datetime.datetime.min.time())
if not is_aware(start_dt):
start_dt = make_aware(start_dt)
except AttributeError:
raise ICSError(_('Event "%s" has no start date.') % summary)
try:
end_dt = vevent.dtend.value
if not isinstance(end_dt, datetime.datetime):
end_dt = datetime.datetime.combine(end_dt, datetime.datetime.min.time())
if not is_aware(end_dt):
end_dt = make_aware(end_dt)
duration = end_dt - start_dt
except AttributeError:
try:
duration = vevent.duration.value
end_dt = start_dt + duration
except AttributeError:
# events without end date are considered as ending the same day
end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
duration = end_dt - start_dt
event = {
'start_datetime': start_dt,
'end_datetime': end_dt,
'label': summary,
'desk': self,
'source': source,
'recurrence_id': 0,
}
if not vevent.rruleset:
# classical event
TimePeriodException.objects.create(**event)
total_created += 1
elif vevent.rruleset.count():
# recurring event until recurring_days in the future
from_dt = start_dt
until_dt = update_datetime + datetime.timedelta(days=recurring_days)
if not is_aware(vevent.rruleset[0]):
from_dt = make_naive(from_dt)
until_dt = make_naive(until_dt)
i = -1
for i, start_dt in enumerate(vevent.rruleset.between(from_dt, until_dt, inc=True)):
# recompute start_dt and end_dt from occurrences and duration
if not is_aware(start_dt):
start_dt = make_aware(start_dt)
end_dt = start_dt + duration
event['recurrence_id'] = i
event['start_datetime'] = start_dt
event['end_datetime'] = end_dt
if end_dt >= update_datetime:
TimePeriodException.objects.create(**event)
total_created += 1
return total_created
def get_opening_hours(self, date):
openslots = IntervalSet()
for timeperiod in self.timeperiod_set.all():
@ -1680,6 +1567,125 @@ class TimePeriodExceptionSource(models.Model):
self.enabled = False
self.save()
def _check_ics_content(self):
if self.ics_url:
try:
response = requests.get(self.ics_url, proxies=settings.REQUESTS_PROXIES)
response.raise_for_status()
except requests.HTTPError as e:
raise ICSError(
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
% {'url': self.ics_url, 'status_code': e.response.status_code}
)
except requests.RequestException as e:
raise ICSError(
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
% {'url': self.ics_url, 'exception': e}
)
try:
# override response encoding received in HTTP headers as it may
# often be missing and defaults to iso-8859-15.
response.content.decode('utf-8')
response.encoding = 'utf-8'
except UnicodeDecodeError:
pass
data = response.text
else:
data = force_text(self.ics_file.read())
try:
parsed = vobject.readOne(data)
except vobject.base.ParseError:
raise ICSError(_('File format is invalid.'))
if not parsed.contents.get('vevent'):
raise ICSError(_('The file doesn\'t contain any events.'))
for vevent in parsed.contents.get('vevent', []):
summary = self._get_summary_from_vevent(vevent)
try:
vevent.dtstart.value
except AttributeError:
raise ICSError(_('Event "%s" has no start date.') % summary)
return parsed
def _get_summary_from_vevent(self, vevent):
if 'summary' in vevent.contents:
return force_text(vevent.contents['summary'][0].value)
return _('Exception')
def refresh_timeperiod_exceptions(self, data=None):
self.refresh_timeperiod_exceptions_from_ics(data=data)
def refresh_timeperiod_exceptions_from_ics(self, data=None, recurring_days=600):
if data is None:
parsed = self._check_ics_content()
else:
parsed = data
with transaction.atomic():
# delete old exceptions related to this source
self.timeperiodexception_set.all().delete()
# create new exceptions
update_datetime = now()
for vevent in parsed.contents.get('vevent', []):
summary = self._get_summary_from_vevent(vevent)
try:
start_dt = vevent.dtstart.value
if not isinstance(start_dt, datetime.datetime):
start_dt = datetime.datetime.combine(start_dt, datetime.datetime.min.time())
if not is_aware(start_dt):
start_dt = make_aware(start_dt)
except AttributeError:
raise ICSError(_('Event "%s" has no start date.') % summary)
try:
end_dt = vevent.dtend.value
if not isinstance(end_dt, datetime.datetime):
end_dt = datetime.datetime.combine(end_dt, datetime.datetime.min.time())
if not is_aware(end_dt):
end_dt = make_aware(end_dt)
duration = end_dt - start_dt
except AttributeError:
try:
duration = vevent.duration.value
end_dt = start_dt + duration
except AttributeError:
# events without end date are considered as ending the same day
end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
duration = end_dt - start_dt
event = {
'start_datetime': start_dt,
'end_datetime': end_dt,
'label': summary,
'desk_id': self.desk_id,
'source': self,
'recurrence_id': 0,
}
if not vevent.rruleset:
# classical event
TimePeriodException.objects.create(**event)
elif vevent.rruleset.count():
# recurring event until recurring_days in the future
from_dt = start_dt
until_dt = update_datetime + datetime.timedelta(days=recurring_days)
if not is_aware(vevent.rruleset[0]):
from_dt = make_naive(from_dt)
until_dt = make_naive(until_dt)
i = -1
for i, start_dt in enumerate(vevent.rruleset.between(from_dt, until_dt, inc=True)):
# recompute start_dt and end_dt from occurrences and duration
if not is_aware(start_dt):
start_dt = make_aware(start_dt)
end_dt = start_dt + duration
event['recurrence_id'] = i
event['start_datetime'] = start_dt
event['end_datetime'] = end_dt
if end_dt >= update_datetime:
TimePeriodException.objects.create(**event)
@classmethod
def import_json(cls, data):
data = clean_import_data(cls, data)

View File

@ -24,6 +24,7 @@ import uuid
from django.contrib import messages
from django.core.exceptions import PermissionDenied
from django.db import transaction
from django.db.models import Q, Value, BooleanField
from django.db.models import Min, Max
from django.http import Http404, HttpResponse, HttpResponseRedirect
@ -2212,32 +2213,37 @@ class DeskImportTimePeriodExceptionsView(ManagedAgendaSubobjectMixin, UpdateView
def import_file(self, desk, form):
if form.cleaned_data['ics_file']:
exceptions = desk.import_timeperiod_exceptions_from_ics_file(form.cleaned_data['ics_file'])
form.cleaned_data['ics_file'].seek(0)
return exceptions
ics_file = form.cleaned_data['ics_file']
source = desk.timeperiodexceptionsource_set.create(ics_filename=ics_file.name, ics_file=ics_file)
ics_file.seek(0)
elif form.cleaned_data['ics_url']:
return desk.import_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
source = desk.timeperiodexceptionsource_set.create(ics_url=form.cleaned_data['ics_url'])
parsed = source._check_ics_content()
source._parsed = parsed
return source
def form_valid(self, form):
exceptions = None
desk = self.get_object()
sources = []
try:
if desk.agenda.desk_simple_management:
for _desk in desk.agenda.desk_set.all():
result = self.import_file(_desk, form)
exceptions = result if exceptions is None else exceptions
else:
exceptions = self.import_file(desk, form)
with transaction.atomic():
if desk.agenda.desk_simple_management:
for _desk in desk.agenda.desk_set.all():
sources.append(self.import_file(_desk, form))
else:
sources.append(self.import_file(desk, form))
except ICSError as e:
form.add_error(None, force_text(e))
return self.form_invalid(form)
if exceptions is not None:
message = ungettext(
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions
)
message = message % {'count': exceptions}
messages.info(self.request, message)
try:
for source in sources:
source.refresh_timeperiod_exceptions(data=source._parsed)
except ICSError as e:
form.add_error(None, force_text(e))
return self.form_invalid(form)
messages.info(self.request, _('Exceptions will be imported in a few minutes.'))
return super(DeskImportTimePeriodExceptionsView, self).form_valid(form)
@ -2283,37 +2289,25 @@ class TimePeriodExceptionSourceReplaceView(ManagedDeskSubobjectMixin, UpdateView
return queryset.filter(ics_filename__isnull=False)
def import_file(self, desk, form):
exceptions = None
source = desk.timeperiodexceptionsource_set.filter(
ics_filename=self.get_object().ics_filename
).first()
if source is not None:
exceptions = desk.import_timeperiod_exceptions_from_ics_file(
form.cleaned_data['ics_newfile'], source=source
)
form.cleaned_data['ics_newfile'].seek(0)
return exceptions
source.refresh_timeperiod_exceptions()
def form_valid(self, form):
exceptions = None
desk = self.get_object().desk
try:
if desk.agenda.desk_simple_management:
for _desk in desk.agenda.desk_set.all():
result = self.import_file(_desk, form)
exceptions = result if exceptions is None else exceptions
self.import_file(_desk, form)
else:
exceptions = self.import_file(desk, form)
self.import_file(desk, form)
except ICSError as e:
form.add_error(None, force_text(e))
return self.form_invalid(form)
if exceptions is not None:
message = ungettext(
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions
)
message = message % {'count': exceptions}
messages.info(self.request, message)
messages.info(self.request, _('Exceptions will be synchronized in a few minutes.'))
return super(TimePeriodExceptionSourceReplaceView, self).form_valid(form)
@ -2328,32 +2322,22 @@ class TimePeriodExceptionSourceRefreshView(ManagedDeskSubobjectMixin, DetailView
return queryset.filter(ics_url__isnull=False)
def import_file(self, desk):
exceptions = None
source = desk.timeperiodexceptionsource_set.filter(ics_url=self.get_object().ics_url).first()
if source is not None:
exceptions = desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
return exceptions
source.refresh_timeperiod_exceptions()
def get(self, request, *args, **kwargs):
exceptions = None
desk = self.get_object().desk
try:
if desk.agenda.desk_simple_management:
for _desk in desk.agenda.desk_set.all():
result = self.import_file(_desk)
exceptions = result if exceptions is None else exceptions
self.import_file(_desk)
else:
exceptions = self.import_file(desk)
self.import_file(desk)
except ICSError as e:
messages.error(self.request, force_text(e))
if exceptions is not None:
message = ungettext(
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions
)
message = message % {'count': exceptions}
messages.info(self.request, message)
messages.info(self.request, _('Exceptions will be synchronized in a few minutes.'))
# redirect to settings
return HttpResponseRedirect(reverse('chrono-manager-agenda-settings', kwargs={'pk': desk.agenda_id}))

View File

@ -451,22 +451,18 @@ def test_meeting_type_slugs():
def test_timeperiodexception_creation_from_ics():
agenda = Agenda(label=u'Test 1 agenda')
agenda.save()
desk = Desk(label='Test 1 desk', agenda=agenda)
desk.save()
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE, name='sample.ics')
agenda = Agenda.objects.create(label=u'Test 1 agenda')
desk = Desk.objects.create(label='Test 1 desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE, name='sample.ics')
)
assert exceptions_count == 2
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
def test_timeperiodexception_creation_from_ics_without_startdt():
agenda = Agenda(label=u'Test 2 agenda')
agenda.save()
desk = Desk(label='Test 2 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 2 agenda')
desk = Desk.objects.create(label='Test 2 desk', agenda=agenda)
lines = []
# remove start datetimes from ics
for line in ICS_SAMPLE.splitlines():
@ -474,16 +470,15 @@ def test_timeperiodexception_creation_from_ics_without_startdt():
continue
lines.append(line)
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
source = desk.timeperiodexceptionsource_set.create(ics_filename='sample.ics', ics_file=ics_sample)
with pytest.raises(ICSError) as e:
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
source._check_ics_content()
assert 'Event "Événement 1" has no start date.' == str(e.value)
def test_timeperiodexception_creation_from_ics_without_enddt():
agenda = Agenda(label=u'Test 3 agenda')
agenda.save()
desk = Desk(label='Test 3 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 3 agenda')
desk = Desk.objects.create(label='Test 3 desk', agenda=agenda)
lines = []
# remove end datetimes from ics
for line in ICS_SAMPLE.splitlines():
@ -491,7 +486,8 @@ def test_timeperiodexception_creation_from_ics_without_enddt():
continue
lines.append(line)
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
source = desk.timeperiodexceptionsource_set.create(ics_filename='sample.ics', ics_file=ics_sample)
source.refresh_timeperiod_exceptions_from_ics()
for exception in TimePeriodException.objects.filter(desk=desk):
end_time = localtime(exception.end_datetime).time()
assert end_time == datetime.time(23, 59, 59, 999999)
@ -499,24 +495,18 @@ def test_timeperiodexception_creation_from_ics_without_enddt():
@pytest.mark.freeze_time('2017-12-01')
def test_timeperiodexception_creation_from_ics_with_recurrences():
agenda = Agenda(label=u'Test 4 agenda')
agenda.save()
desk = Desk(label='Test 4 desk', agenda=agenda)
desk.save()
assert (
desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
)
== 3
agenda = Agenda.objects.create(label=u'Test 4 agenda')
desk = Desk.objects.create(label='Test 4 desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
)
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 3
def test_timeexception_creation_from_ics_with_dates():
agenda = Agenda(label=u'Test 5 agenda')
agenda.save()
desk = Desk(label='Test 5 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 5 agenda')
desk = Desk.objects.create(label='Test 5 desk', agenda=agenda)
lines = []
# remove end datetimes from ics
for line in ICS_SAMPLE_WITH_RECURRENT_EVENT.splitlines():
@ -524,76 +514,73 @@ def test_timeexception_creation_from_ics_with_dates():
continue
lines.append(line)
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
assert exceptions_count == 2
source = desk.timeperiodexceptionsource_set.create(ics_filename='sample.ics', ics_file=ics_sample)
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
for exception in TimePeriodException.objects.filter(desk=desk):
assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
assert localtime(exception.end_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
def test_timeexception_create_from_invalid_ics():
agenda = Agenda(label=u'Test 6 agenda')
agenda.save()
desk = Desk(label='Test 6 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 6 agenda')
desk = Desk.objects.create(label='Test 6 desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(INVALID_ICS_SAMPLE, name='sample.ics')
)
with pytest.raises(ICSError) as e:
desk.import_timeperiod_exceptions_from_ics_file(ContentFile(INVALID_ICS_SAMPLE, name='sample.ics'))
source._check_ics_content()
assert str(e.value) == 'File format is invalid.'
def test_timeexception_create_from_ics_with_no_events():
agenda = Agenda(label=u'Test 7 agenda')
agenda.save()
desk = Desk(label='Test 7 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 7 agenda')
desk = Desk.objects.create(label='Test 7 desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
)
with pytest.raises(ICSError) as e:
desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
)
source._check_ics_content()
assert str(e.value) == "The file doesn't contain any events."
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
agenda = Agenda(label=u'Test 8 agenda')
agenda.save()
desk = Desk(label='Test 8 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 8 agenda')
desk = Desk.objects.create(label='Test 8 desk', agenda=agenda)
mocked_response = mock.Mock()
mocked_response.text = ICS_SAMPLE
mocked_get.return_value = mocked_response
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert exceptions_count == 2
source = desk.timeperiodexceptionsource_set.create(ics_url='http://example.com/sample.ics')
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
assert 'Événement 1' in [x.label for x in desk.timeperiodexception_set.all()]
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
mocked_get.return_value = mocked_response
with pytest.raises(ICSError) as e:
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
source._check_ics_content()
assert str(e.value) == "The file doesn't contain any events."
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_remote_ics_encoding(mocked_get):
agenda = Agenda(label=u'Test 8 agenda')
agenda.save()
desk = Desk(label='Test 8 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 8 agenda')
desk = Desk.objects.create(label='Test 8 desk', agenda=agenda)
mocked_response = mock.Mock()
mocked_response.content = ICS_SAMPLE.encode('iso-8859-15')
mocked_response.text = ICS_SAMPLE
mocked_get.return_value = mocked_response
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert exceptions_count == 2
source = desk.timeperiodexceptionsource_set.create(ics_url='http://example.com/sample.ics')
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
assert 'Événement 1' in [x.label for x in desk.timeperiodexception_set.all()]
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
agenda = Agenda(label=u'Test 9 agenda')
agenda.save()
desk = Desk(label='Test 9 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 9 agenda')
desk = Desk.objects.create(label='Test 9 desk', agenda=agenda)
mocked_response = mock.Mock()
mocked_response.text = ICS_SAMPLE
mocked_get.return_value = mocked_response
@ -601,18 +588,17 @@ def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
def mocked_requests_connection_error(*args, **kwargs):
raise requests.ConnectionError('unreachable')
source = desk.timeperiodexceptionsource_set.create(ics_url='http://example.com/sample.ics')
mocked_get.side_effect = mocked_requests_connection_error
with pytest.raises(ICSError) as e:
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
source._check_ics_content()
assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
agenda = Agenda(label=u'Test 10 agenda')
agenda.save()
desk = Desk(label='Test 10 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 10 agenda')
desk = Desk.objects.create(label='Test 10 desk', agenda=agenda)
mocked_response = mock.Mock()
mocked_response.status_code = 403
mocked_get.return_value = mocked_response
@ -620,10 +606,10 @@ def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
def mocked_requests_http_forbidden_error(*args, **kwargs):
raise requests.HTTPError(response=mocked_response)
source = desk.timeperiodexceptionsource_set.create(ics_url='http://example.com/sample.ics')
mocked_get.side_effect = mocked_requests_http_forbidden_error
with pytest.raises(ICSError) as e:
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
source._check_ics_content()
assert (
str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
)
@ -631,10 +617,8 @@ def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
agenda = Agenda(label=u'Test 11 agenda')
agenda.save()
desk = Desk(label='Test 11 desk', agenda=agenda)
desk.save()
agenda = Agenda.objects.create(label=u'Test 11 agenda')
desk = Desk.objects.create(label='Test 11 desk', agenda=agenda)
source = TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
mocked_response = mock.Mock()
mocked_response.status_code = 403
@ -655,50 +639,34 @@ def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
assert source.ics_filename is None
assert source.ics_file.name is None
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
) as import_remote_ics:
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
) as import_file_ics:
call_command('sync_desks_timeperiod_exceptions')
assert import_remote_ics.call_args_list == [mock.call('http://example.com/sample.ics', source=source)]
assert import_file_ics.call_args_list == []
'chrono.agendas.models.TimePeriodExceptionSource.refresh_timeperiod_exceptions_from_ics'
) as refresh:
call_command('sync_desks_timeperiod_exceptions')
assert refresh.call_args_list == [mock.call()]
source.ics_url = None
source.ics_filename = 'sample.ics'
source.ics_file = ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
source.save()
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
) as import_remote_ics:
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
) as import_file_ics:
call_command('sync_desks_timeperiod_exceptions')
assert import_remote_ics.call_args_list == []
assert import_file_ics.call_args_list == [mock.call(mock.ANY, source=source)]
'chrono.agendas.models.TimePeriodExceptionSource.refresh_timeperiod_exceptions_from_ics'
) as refresh:
call_command('sync_desks_timeperiod_exceptions')
assert refresh.call_args_list == [mock.call()]
TimePeriodExceptionSource.objects.update(ics_file='')
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
) as import_remote_ics:
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
) as import_file_ics:
call_command('sync_desks_timeperiod_exceptions')
assert import_remote_ics.call_args_list == []
assert import_file_ics.call_args_list == []
'chrono.agendas.models.TimePeriodExceptionSource.refresh_timeperiod_exceptions_from_ics'
) as refresh:
call_command('sync_desks_timeperiod_exceptions')
assert refresh.call_args_list == []
TimePeriodExceptionSource.objects.update(ics_file=None)
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
) as import_remote_ics:
with mock.patch(
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
) as import_file_ics:
call_command('sync_desks_timeperiod_exceptions')
assert import_remote_ics.call_args_list == []
assert import_file_ics.call_args_list == []
'chrono.agendas.models.TimePeriodExceptionSource.refresh_timeperiod_exceptions_from_ics'
) as refresh:
call_command('sync_desks_timeperiod_exceptions')
assert refresh.call_args_list == []
@override_settings(
@ -811,14 +779,12 @@ def test_base_meeting_duration():
def test_timeperiodexception_creation_from_ics_with_duration():
# test that event defined using duration works and give the same start and
# end dates
agenda = Agenda(label=u'Test 1 agenda')
agenda.save()
desk = Desk(label='Test 1 desk', agenda=agenda)
desk.save()
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
agenda = Agenda.objects.create(label=u'Test 1 agenda')
desk = Desk.objects.create(label='Test 1 desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
)
assert exceptions_count == 2
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
[
@ -838,16 +804,13 @@ def test_timeperiodexception_creation_from_ics_with_duration():
def test_timeperiodexception_creation_from_ics_with_recurrences_in_the_past():
# test that recurrent events before today are not created
# also test that duration + recurrent events works
agenda = Agenda(label=u'Test 4 agenda')
agenda.save()
desk = Desk(label='Test 4 desk', agenda=agenda)
desk.save()
assert (
desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')
)
== 2
agenda = Agenda.objects.create(label=u'Test 4 agenda')
desk = Desk.objects.create(label='Test 4 desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics',
ics_file=ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics'),
)
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
@ -855,11 +818,13 @@ def test_timeperiodexception_creation_from_ics_with_recurrences_in_the_past():
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
agenda = Agenda(label=u'Test atreal agenda')
agenda.save()
desk = Desk(label='Test atreal desk', agenda=agenda)
desk.save()
assert desk.import_timeperiod_exceptions_from_ics_file(ContentFile(ICS_ATREAL, name='sample.ics'))
agenda = Agenda.objects.create(label=u'Test atreal agenda')
desk = Desk.objects.create(label='Test atreal desk', agenda=agenda)
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(ICS_ATREAL, name='sample.ics')
)
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).exists()
def test_management_role_deletion():
@ -1245,19 +1210,18 @@ def test_desk_duplicate():
def test_desk_duplicate_exception_sources():
agenda = Agenda.objects.create(label='Agenda')
desk = Desk.objects.create(label='Desk', agenda=agenda)
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE, name='sample.ics')
source = desk.timeperiodexceptionsource_set.create(
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE, name='sample.ics')
)
source = desk.timeperiodexceptionsource_set.get(ics_filename='sample.ics')
assert exceptions_count == 2
source.refresh_timeperiod_exceptions_from_ics()
assert TimePeriodException.objects.filter(desk=desk).count() == 2
new_desk = desk.duplicate(label="New Desk")
new_source = new_desk.timeperiodexceptionsource_set.get(ics_filename='sample.ics')
assert new_desk.timeperiodexception_set.count() == exceptions_count
assert new_desk.timeperiodexception_set.count() == 2
source.delete()
assert new_desk.timeperiodexception_set.count() == exceptions_count
assert new_desk.timeperiodexception_set.count() == 2
new_source.delete()
assert not new_desk.timeperiodexception_set.exists()

View File

@ -2937,9 +2937,11 @@ def test_agenda_import_time_period_exception_from_ics(app, admin_user):
assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
resp = resp.form.submit(status=200)
assert 'Please provide an ICS File or an URL.' in resp.text
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 0
resp.form['ics_file'] = Upload('exceptions.ics', b'invalid content', 'text/calendar')
resp = resp.form.submit(status=200)
assert 'File format is invalid' in resp.text
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 0
ics_with_no_start_date = b"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
@ -2951,6 +2953,7 @@ END:VCALENDAR"""
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar')
resp = resp.form.submit(status=200)
assert 'Event "New Year's Eve" has no start date.' in resp.text
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 0
ics_with_no_events = b"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
@ -2958,6 +2961,7 @@ END:VCALENDAR"""
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_events, 'text/calendar')
resp = resp.form.submit(status=200)
assert "The file doesn't contain any events." in resp.text
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 0
ics_with_exceptions = b"""BEGIN:VCALENDAR
VERSION:2.0
@ -2980,7 +2984,7 @@ END:VCALENDAR"""
assert 'exceptions.ics' in source.ics_file.name
assert source.ics_url is None
resp = resp.follow()
assert 'An exception has been imported.' in resp.text
assert 'Exceptions will be imported in a few minutes.' in resp.text
@pytest.mark.freeze_time('2017-12-01')