agenda: add support for remote calendar file with exceptions (#19070)

Remote calendars can be specified by desk and updated hourly, or used once.
This commit is contained in:
Serghei Mihai 2017-10-03 23:15:00 +02:00
parent 20aa74b697
commit 71f5b91cbe
14 changed files with 475 additions and 27 deletions

View File

View File

@ -0,0 +1,32 @@
# chrono - agendas system
# Copyright (C) 2016-2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import six
import sys
from chrono.agendas.models import Desk, ICSError
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
help = 'Synchronize time period exceptions from desks remote ics'
def handle(self, **options):
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
try:
desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
except ICSError as e:
print >> sys.stderr, u'unable to create timeperiod exceptions for "%s": %s' % (desk, e)

View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import datetime
from django.utils.timezone import utc
class Migration(migrations.Migration):
dependencies = [
('agendas', '0019_timeperiodexception'),
]
operations = [
migrations.AddField(
model_name='desk',
name='timeperiod_exceptions_remote_url',
field=models.URLField(verbose_name='URL to fetch time period exceptions from', blank=True),
),
migrations.AddField(
model_name='timeperiodexception',
name='external_id',
field=models.CharField(max_length=256, verbose_name='External ID', blank=True),
),
migrations.AddField(
model_name='timeperiodexception',
name='update_datetime',
field=models.DateTimeField(default=datetime.datetime(2017, 11, 2, 10, 21, 1, 826837, tzinfo=utc), auto_now=True),
preserve_default=False,
),
]

View File

@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime import datetime
import requests
import vobject import vobject
from django.contrib.auth.models import Group from django.contrib.auth.models import Group
@ -28,7 +29,7 @@ from django.utils.dates import WEEKDAYS
from django.utils.encoding import force_text from django.utils.encoding import force_text
from django.utils.formats import date_format, get_format from django.utils.formats import date_format, get_format
from django.utils.text import slugify from django.utils.text import slugify
from django.utils.timezone import localtime, now, make_aware, make_naive from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from jsonfield import JSONField from jsonfield import JSONField
@ -358,6 +359,8 @@ class Desk(models.Model):
agenda = models.ForeignKey(Agenda) agenda = models.ForeignKey(Agenda)
label = models.CharField(_('Label'), max_length=150) label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=150) slug = models.SlugField(_('Identifier'), max_length=150)
timeperiod_exceptions_remote_url = models.URLField(_('URL to fetch time period exceptions from'),
blank=True)
def __unicode__(self): def __unicode__(self):
return self.label return self.label
@ -417,7 +420,21 @@ class Desk(models.Model):
in_two_weeks = self.get_exceptions_within_two_weeks() in_two_weeks = self.get_exceptions_within_two_weeks()
return self.timeperiodexception_set.count() == len(in_two_weeks) return self.timeperiodexception_set.count() == len(in_two_weeks)
def create_timeperiod_exceptions_from_ics(self, data): def create_timeperiod_exceptions_from_remote_ics(self, url):
try:
response = requests.get(url)
response.raise_for_status()
except requests.HTTPError as e:
raise ICSError(_('Failed to retrieve remote calendar (HTTP error %s).') % e.response.status_code)
except requests.RequestException as e:
raise ICSError(_('Failed to retrieve remote calendar (%s).') % e)
return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
def remove_timeperiod_exceptions_from_remote_ics(self):
TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()
def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False):
try: try:
parsed = vobject.readOne(data) parsed = vobject.readOne(data)
except vobject.base.ParseError: except vobject.base.ParseError:
@ -425,12 +442,14 @@ class Desk(models.Model):
total_created = 0 total_created = 0
if not parsed.contents.get('vevent'): if not parsed.contents.get('vevent') and not keep_synced_by_uid:
raise ICSError(_('The file doesn\'t contain any events.')) raise ICSError(_('The file doesn\'t contain any events.'))
with transaction.atomic(): with transaction.atomic():
for vevent in parsed.contents['vevent']: update_datetime = now()
for vevent in parsed.contents.get('vevent', []):
event = {} event = {}
summary = vevent.contents['summary'][0].value summary = vevent.contents['summary'][0].value
if not isinstance(summary, unicode): if not isinstance(summary, unicode):
summary = unicode(summary, 'utf-8') summary = unicode(summary, 'utf-8')
@ -441,7 +460,10 @@ class Desk(models.Model):
if not isinstance(start_dt, datetime.datetime): if not isinstance(start_dt, datetime.datetime):
start_dt = datetime.datetime.combine(start_dt, start_dt = datetime.datetime.combine(start_dt,
datetime.datetime.min.time()) datetime.datetime.min.time())
event['start_datetime'] = start_dt if not is_aware(start_dt):
event['start_datetime'] = make_aware(start_dt)
else:
event['start_datetime'] = start_dt
except AttributeError: except AttributeError:
raise ICSError(_('Event "%s" has no start date.') % summary) raise ICSError(_('Event "%s" has no start date.') % summary)
try: try:
@ -452,21 +474,36 @@ class Desk(models.Model):
except AttributeError: except AttributeError:
# events without end date are considered as ending the same day # events without end date are considered as ending the same day
end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time()) end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time())
event['end_datetime'] = end_dt if not is_aware(end_dt):
event['end_datetime'] = make_aware(end_dt)
obj, created = TimePeriodException.objects.get_or_create(desk=self, label=summary, else:
**event) event['end_datetime'] = end_dt
if keep_synced_by_uid:
external_id = vevent.contents['uid'][0].value
event['label'] = summary
obj, created = TimePeriodException.objects.update_or_create(desk=self, external_id=external_id,
defaults=event)
else:
obj, created = TimePeriodException.objects.update_or_create(desk=self, label=summary, defaults=event)
# return total_created
if created: if created:
total_created += 1 total_created += 1
if keep_synced_by_uid:
# delete all outdated exceptions from remote calendar
TimePeriodException.objects.filter(update_datetime__lt=update_datetime,
desk=self).exclude(external_id='').delete()
return total_created return total_created
class TimePeriodException(models.Model): class TimePeriodException(models.Model):
desk = models.ForeignKey(Desk) desk = models.ForeignKey(Desk)
external_id = models.CharField(_('External ID'), max_length=256, blank=True)
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True) label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
start_datetime = models.DateTimeField(_('Exception start time')) start_datetime = models.DateTimeField(_('Exception start time'))
end_datetime = models.DateTimeField(_('Exception end time')) end_datetime = models.DateTimeField(_('Exception end time'))
update_datetime = models.DateTimeField(auto_now=True)
class Meta: class Meta:
ordering = ['start_datetime'] ordering = ['start_datetime']

View File

@ -83,7 +83,7 @@ class NewDeskForm(forms.ModelForm):
widgets = { widgets = {
'agenda': forms.HiddenInput(), 'agenda': forms.HiddenInput(),
} }
exclude = ['slug'] exclude = ['slug', 'timeperiod_exceptions_remote_url']
class DeskForm(forms.ModelForm): class DeskForm(forms.ModelForm):
@ -92,7 +92,7 @@ class DeskForm(forms.ModelForm):
widgets = { widgets = {
'agenda': forms.HiddenInput(), 'agenda': forms.HiddenInput(),
} }
exclude = [] exclude = ['timeperiod_exceptions_remote_url']
class TimePeriodExceptionForm(forms.ModelForm): class TimePeriodExceptionForm(forms.ModelForm):
@ -170,5 +170,7 @@ class ExceptionsImportForm(forms.ModelForm):
model = Desk model = Desk
fields = [] fields = []
ics_file = forms.FileField(label=_('ICS File'), ics_file = forms.FileField(label=_('ICS File'), required=False,
help_text=_('ICS file containing events which will be considered as exceptions')) help_text=_('ICS file containing events which will be considered as exceptions'))
ics_url = forms.URLField(label=_('URL'), required=False,
help_text=_('URL to remote calendar which will be synchronised hourly'))

View File

@ -13,6 +13,7 @@
{% block content %} {% block content %}
<form method="post" enctype="multipart/form-data"> <form method="post" enctype="multipart/form-data">
<p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
{% csrf_token %} {% csrf_token %}
{{ form.as_p }} {{ form.as_p }}
<p> <p>

View File

@ -394,16 +394,28 @@ class DeskImportTimePeriodExceptionsView(ManagedAgendaSubobjectMixin, UpdateView
form_class = ExceptionsImportForm form_class = ExceptionsImportForm
template_name = 'chrono/manager_import_exceptions.html' template_name = 'chrono/manager_import_exceptions.html'
def get_initial(self):
return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
def form_valid(self, form): def form_valid(self, form):
exceptions = None
try: try:
exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file']) if form.cleaned_data['ics_file']:
exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file'])
elif form.cleaned_data['ics_url']:
exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
else:
form.instance.remove_timeperiod_exceptions_from_remote_ics()
except ICSError as e: except ICSError as e:
form.add_error(None, unicode(e)) form.add_error(None, unicode(e))
return self.form_invalid(form) return self.form_invalid(form)
message = ungettext('An exception has been imported.', form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
'%(count)d exceptions have been imported.', exceptions) form.instance.save()
message = message % {'count': exceptions} if exceptions is not None:
messages.info(self.request, message) message = ungettext('An exception has been imported.',
'%(count)d exceptions have been imported.', exceptions)
message = message % {'count': exceptions}
messages.info(self.request, message)
return super(DeskImportTimePeriodExceptionsView, self).form_valid(form) return super(DeskImportTimePeriodExceptionsView, self).form_valid(form)
desk_import_time_period_exceptions = DeskImportTimePeriodExceptionsView.as_view() desk_import_time_period_exceptions = DeskImportTimePeriodExceptionsView.as_view()

3
debian/chrono.cron.d vendored Normal file
View File

@ -0,0 +1,3 @@
MAILTO=root
0 * * * * /sbin/runuser -u chrono /usr/bin/chrono-manage -- tenant_command sync_desks_timeperiod_exceptions --all-tenants

3
debian/control vendored
View File

@ -11,7 +11,8 @@ Architecture: all
Depends: ${misc:Depends}, ${python:Depends}, Depends: ${misc:Depends}, ${python:Depends},
python-django (>= 1.8), python-django (>= 1.8),
python-gadjo, python-gadjo,
python-intervaltree python-intervaltree,
python-requests
Recommends: python-django-mellon Recommends: python-django-mellon
Description: Agendas System (Python module) Description: Agendas System (Python module)

View File

@ -3,3 +3,5 @@ gadjo
djangorestframework>=3.1, <3.7 djangorestframework>=3.1, <3.7
django-jsonfield >= 0.9.3 django-jsonfield >= 0.9.3
intervaltree intervaltree
requests
vobject

View File

@ -107,7 +107,8 @@ setup(
'djangorestframework>=3.1, <3.7', 'djangorestframework>=3.1, <3.7',
'django-jsonfield >= 0.9.3', 'django-jsonfield >= 0.9.3',
'intervaltree', 'intervaltree',
'vobject' 'vobject',
'requests'
], ],
zip_safe=False, zip_safe=False,
cmdclass={ cmdclass={

View File

@ -1,7 +1,13 @@
import pytest import pytest
import datetime import datetime
import mock
import re
import requests
from django.utils.timezone import now, make_aware, localtime from django.utils.timezone import now, make_aware, localtime
from django.core.management import call_command
from django.core.management.base import CommandError
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType, from chrono.agendas.models import (Agenda, Event, Booking, MeetingType,
Desk, TimePeriodException, ICSError) Desk, TimePeriodException, ICSError)
@ -22,8 +28,8 @@ END:VEVENT
BEGIN:VEVENT BEGIN:VEVENT
DTSTAMP:20170824T092855Z DTSTAMP:20170824T092855Z
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4 UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
DTSTART:20170831T180800Z DTSTART:20170830T180800Z
DTEND:20170831T213400Z DTEND:20170831T223400Z
SEQUENCE:2 SEQUENCE:2
SUMMARY:Event 2 SUMMARY:Event 2
END:VEVENT END:VEVENT
@ -43,7 +49,7 @@ BEGIN:VEVENT
DTSTAMP:20170824T082855Z DTSTAMP:20170824T082855Z
DTSTART:20180101 DTSTART:20180101
DTEND:20180101 DTEND:20180101
SUMMARY:New eve SUMMARY:New Year's Eve
RRULE:FREQ=YEARLY RRULE:FREQ=YEARLY
END:VEVENT END:VEVENT
END:VCALENDAR""" END:VCALENDAR"""
@ -220,3 +226,107 @@ def test_timeexception_create_from_ics_with_no_events():
with pytest.raises(ICSError) as e: with pytest.raises(ICSError) as e:
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS) exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
assert str(e.value) == "The file doesn't contain any events." assert str(e.value) == "The file doesn't contain any events."
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
agenda = Agenda(label=u'Test 8 agenda')
agenda.save()
desk = Desk(label='Test 8 desk', agenda=agenda)
desk.save()
mocked_response = mock.Mock()
mocked_response.text = ICS_SAMPLE
mocked_get.return_value = mocked_response
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert exceptions_count == 2
mocked_response.text = re.sub('SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE)
mocked_get.return_value = mocked_response
desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
assert 'New summary ' in timeperiod.label
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
mocked_get.return_value = mocked_response
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert exceptions_count == 0
TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
agenda = Agenda(label=u'Test 9 agenda')
agenda.save()
desk = Desk(label='Test 9 desk', agenda=agenda)
desk.save()
mocked_response = mock.Mock()
mocked_response.text = ICS_SAMPLE
mocked_get.return_value = mocked_response
def mocked_requests_connection_error(*args, **kwargs):
raise requests.ConnectionError('unreachable')
mocked_get.side_effect = mocked_requests_connection_error
with pytest.raises(ICSError) as e:
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert str(e.value) == "Failed to retrieve remote calendar (unreachable)."
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
agenda = Agenda(label=u'Test 10 agenda')
agenda.save()
desk = Desk(label='Test 10 desk', agenda=agenda)
desk.save()
mocked_response = mock.Mock()
mocked_response.status_code = 403
mocked_get.return_value = mocked_response
def mocked_requests_http_forbidden_error(*args, **kwargs):
raise requests.HTTPError(response=mocked_response)
mocked_get.side_effect = mocked_requests_http_forbidden_error
with pytest.raises(ICSError) as e:
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert str(e.value) == "Failed to retrieve remote calendar (HTTP error 403)."
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
agenda = Agenda(label=u'Test 11 agenda')
agenda.save()
desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics')
desk.save()
mocked_response = mock.Mock()
mocked_response.status_code = 403
mocked_get.return_value = mocked_response
def mocked_requests_http_forbidden_error(*args, **kwargs):
raise requests.HTTPError(response=mocked_response)
mocked_get.side_effect = mocked_requests_http_forbidden_error
call_command('sync_desks_timeperiod_exceptions')
out, err = capsys.readouterr()
assert err == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (HTTP error 403).\n'
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
agenda = Agenda(label=u'Test 11 agenda')
agenda.save()
desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics')
desk.save()
mocked_response = mock.Mock()
mocked_response.text = ICS_SAMPLE
mocked_get.return_value = mocked_response
call_command('sync_desks_timeperiod_exceptions')
assert TimePeriodException.objects.filter(desk=desk).count() == 2
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20180824T082855Z
UID:new-and-unique-uid
DTSTART:20180831T170800Z
DTEND:20180831T203400Z
SUMMARY:Wonderfull event
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
call_command('sync_desks_timeperiod_exceptions')
assert TimePeriodException.objects.filter(desk=desk).count() == 1
exception = TimePeriodException.objects.get(desk=desk)
assert exception.external_id == 'new-and-unique-uid'
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
mocked_get.return_value = mocked_response
call_command('sync_desks_timeperiod_exceptions')
assert not TimePeriodException.objects.filter(desk=desk).exists()

View File

@ -3,7 +3,9 @@
from django.contrib.auth.models import User, Group from django.contrib.auth.models import User, Group
from django.utils.timezone import make_aware, now, localtime from django.utils.timezone import make_aware, now, localtime
import datetime import datetime
import mock
import pytest import pytest
import requests
from webtest import TestApp, Upload from webtest import TestApp, Upload
from chrono.wsgi import application from chrono.wsgi import application
@ -840,6 +842,10 @@ def test_agenda_import_time_period_exception_from_ics(app, admin_user):
resp = app.get('/manage/agendas/%d/' % agenda.pk) resp = app.get('/manage/agendas/%d/' % agenda.pk)
assert 'Import exceptions from .ics' in resp.content assert 'Import exceptions from .ics' in resp.content
resp = resp.click('upload') resp = resp.click('upload')
assert "You can upload a file or specify an address to a remote calendar." in resp
resp = resp.form.submit(status=302)
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp.form['ics_file'] = Upload('exceptions.ics', 'invalid content', 'text/calendar') resp.form['ics_file'] = Upload('exceptions.ics', 'invalid content', 'text/calendar')
resp = resp.form.submit(status=200) resp = resp.form.submit(status=200)
assert 'File format is invalid' in resp.content assert 'File format is invalid' in resp.content
@ -849,7 +855,7 @@ PRODID:-//foo.bar//EN
BEGIN:VEVENT BEGIN:VEVENT
DTSTART:20180101 DTSTART:20180101
DTEND:20180101 DTEND:20180101
SUMMARY:New eve SUMMARY:New Year's Eve
RRULE:FREQ=YEARLY RRULE:FREQ=YEARLY
END:VEVENT END:VEVENT
END:VCALENDAR""" END:VCALENDAR"""
@ -861,12 +867,12 @@ VERSION:2.0
PRODID:-//foo.bar//EN PRODID:-//foo.bar//EN
BEGIN:VEVENT BEGIN:VEVENT
DTEND:20180101 DTEND:20180101
SUMMARY:New eve SUMMARY:New Year's Eve
END:VEVENT END:VEVENT
END:VCALENDAR""" END:VCALENDAR"""
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar') resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar')
resp = resp.form.submit(status=200) resp = resp.form.submit(status=200)
assert 'Event &quot;New eve&quot; has no start date.' in resp.content assert 'Event &quot;New Year&#39;s Eve&quot; has no start date.' in resp.content
ics_with_no_events = """BEGIN:VCALENDAR ics_with_no_events = """BEGIN:VCALENDAR
VERSION:2.0 VERSION:2.0
PRODID:-//foo.bar//EN PRODID:-//foo.bar//EN
@ -881,11 +887,220 @@ PRODID:-//foo.bar//EN
BEGIN:VEVENT BEGIN:VEVENT
DTSTART:20180101 DTSTART:20180101
DTEND:20180101 DTEND:20180101
SUMMARY:New eve SUMMARY:New Year's Eve
END:VEVENT END:VEVENT
END:VCALENDAR""" END:VCALENDAR"""
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar') resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
resp = resp.form.submit(status=302) resp = resp.form.submit(status=302)
assert TimePeriodException.objects.count() == 1 assert TimePeriodException.objects.filter(desk=desk).count() == 1
resp = resp.follow() resp = resp.follow()
assert 'An exception has been imported.' in resp.content assert 'An exception has been imported.' in resp.content
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user):
agenda = Agenda.objects.create(label='New Example', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='New Desk')
MeetingType(agenda=agenda, label='Bar').save()
login(app)
resp = app.get('/manage/agendas/%d/' % agenda.pk)
assert 'Import exceptions from .ics' not in resp.content
TimePeriod.objects.create(weekday=1, desk=desk,
start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
assert 'ics_file' in resp.form.fields
assert 'ics_url' in resp.form.fields
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response = mock.Mock()
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(desk=desk).count() == 1
exception = TimePeriodException.objects.get(desk=desk)
assert exception.external_id == 'random-event-id'
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp.form['ics_url'] = ''
resp = resp.form.submit(status=302)
assert not TimePeriodException.objects.filter(desk=desk,
external_id='desk-%s:random-event-id' % desk.id).exists()
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_with_remote_ics_no_events(mocked_get, app, admin_user):
agenda = Agenda.objects.create(label='New Example', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='New Desk')
MeetingType(agenda=agenda, label='Bar').save()
login(app)
resp = app.get('/manage/agendas/%d/' % agenda.pk)
assert 'Import exceptions from .ics' not in resp.content
TimePeriod.objects.create(weekday=1, desk=desk,
start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response = mock.Mock()
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(desk=desk).count() == 1
exception = TimePeriodException.objects.get(desk=desk)
assert exception.external_id == 'random-event-id'
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
END:VCALENDAR"""
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp = resp.form.submit(status=302)
assert not TimePeriodException.objects.filter(desk=desk,
external_id='random-event-id').exists()
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
agenda = Agenda.objects.create(label='New Example', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='New Desk')
MeetingType(agenda=agenda, label='Bar').save()
login(app)
resp = app.get('/manage/agendas/%d/' % agenda.pk)
assert 'Import exceptions from .ics' not in resp.content
TimePeriod.objects.create(weekday=1, desk=desk,
start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response = mock.Mock()
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:first-eventrandom-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:First test event
END:VEVENT
BEGIN:VEVENT
UID:second-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(desk=desk).count() == 2
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:secord-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(desk=desk).count() == 1
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_remote_ics_with_connection_error(mocked_get, app, admin_user):
agenda = Agenda.objects.create(label='New Example', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='New Desk')
MeetingType(agenda=agenda, label='Bar').save()
login(app)
resp = app.get('/manage/agendas/%d/' % agenda.pk)
assert 'Import exceptions from .ics' not in resp.content
TimePeriod.objects.create(weekday=1, desk=desk,
start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
assert 'ics_file' in resp.form.fields
assert 'ics_url' in resp.form.fields
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response = mock.Mock()
mocked_get.return_value = mocked_response
def mocked_requests_connection_error(*args, **kwargs):
raise requests.exceptions.ConnectionError('unreachable')
mocked_get.side_effect = mocked_requests_connection_error
resp = resp.form.submit(status=200)
assert 'Failed to retrieve remote calendar (unreachable).' in resp.content
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_forbidden_remote_ics(mocked_get, app, admin_user):
agenda = Agenda.objects.create(label='New Example', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='New Desk')
MeetingType(agenda=agenda, label='Bar').save()
login(app)
resp = app.get('/manage/agendas/%d/' % agenda.pk)
assert 'Import exceptions from .ics' not in resp.content
TimePeriod.objects.create(weekday=1, desk=desk,
start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response = mock.Mock()
mocked_response.status_code = 403
mocked_get.return_value = mocked_response
def mocked_requests_http_forbidden_error(*args, **kwargs):
raise requests.exceptions.HTTPError(response=mocked_response)
mocked_get.side_effect = mocked_requests_http_forbidden_error
resp = resp.form.submit(status=200)
assert 'Failed to retrieve remote calendar (HTTP error 403).' in resp.content
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_remote_ics_with_ssl_error(mocked_get, app, admin_user):
agenda = Agenda.objects.create(label='New Example', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='New Desk')
MeetingType(agenda=agenda, label='Bar').save()
login(app)
resp = app.get('/manage/agendas/%d/' % agenda.pk)
assert 'Import exceptions from .ics' not in resp.content
TimePeriod.objects.create(weekday=1, desk=desk,
start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
resp = app.get('/manage/agendas/%d/' % agenda.pk)
resp = resp.click('upload')
resp.form['ics_url'] = 'https://example.com/foo.ics'
mocked_response = mock.Mock()
mocked_get.return_value = mocked_response
def mocked_requests_http_ssl_error(*args, **kwargs):
raise requests.exceptions.SSLError('SSL error')
mocked_get.side_effect = mocked_requests_http_ssl_error
resp = resp.form.submit(status=200)
assert 'Failed to retrieve remote calendar (SSL error).' in resp.content