agendas: add exception source model (#29209)

This commit is contained in:
Lauréline Guérin 2019-12-10 15:28:39 +01:00
parent 30bbc8c90f
commit a5a8a3fe3d
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
11 changed files with 288 additions and 258 deletions

View File

@ -18,16 +18,19 @@ from __future__ import print_function
import sys
from chrono.agendas.models import Desk, ICSError
from django.core.management.base import BaseCommand
from chrono.agendas.models import ICSError, TimePeriodExceptionSource
class Command(BaseCommand):
help = 'Synchronize time period exceptions from desks remote ics'
def handle(self, **options):
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
try:
desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
except ICSError as e:
print(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e), file=sys.stderr)
print(
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
)

View File

@ -54,10 +54,10 @@ class Migration(migrations.Migration):
model_name='agenda',
name='kind',
field=models.CharField(
default=b'events',
default='events',
max_length=20,
verbose_name='Kind',
choices=[(b'events', 'Events'), (b'meetings', 'Meetings')],
choices=[('events', 'Events'), ('meetings', 'Meetings')],
),
),
migrations.AddField(

View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('agendas', '0032_auto_20191127_0919'),
]
operations = [
migrations.CreateModel(
name='TimePeriodExceptionSource',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('ics_filename', models.CharField(max_length=256, null=True)),
('ics_url', models.URLField(null=True, max_length=500)),
('desk', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk')),
],
),
migrations.AddField(
model_name='timeperiodexception',
name='source',
field=models.ForeignKey(
null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.TimePeriodExceptionSource'
),
),
migrations.AlterField(
model_name='desk',
name='timeperiod_exceptions_remote_url',
field=models.URLField(
blank=True, max_length=500, null=True, verbose_name='URL to fetch time period exceptions from'
),
),
migrations.AlterField(
model_name='timeperiodexception',
name='external_id',
field=models.CharField(blank=True, max_length=256, null=True, verbose_name='External ID'),
),
]

View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations
def create_source(apps, schema_editor):
Desk = apps.get_model('agendas', 'Desk')
TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
# create a source for each remote url
source = TimePeriodExceptionSource.objects.create(
desk=desk, ics_url=desk.timeperiod_exceptions_remote_url
)
# clear timeperiod_exceptions_remote_url
desk.timeperiod_exceptions_remote_url = None
desk.save()
# attach exceptions to the created source
desk.timeperiodexception_set.filter(external_id__isnull=False).exclude(external_id='').update(
source=source
)
def init_remote_url(apps, schema_editor):
Desk = apps.get_model('agendas', 'Desk')
TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
# set timeperiod_exceptions_remote_url
source.desk.timeperiod_exceptions_remote_url = source.ics_url
source.desk.save()
# unlink exceptions
source.timeperiodexception_set.update(source=None)
# delete the source
source.delete()
# reset remote_url
for desk in Desk.objects.filter(timeperiod_exceptions_remote_url__isnull=True):
desk.timeperiod_exceptions_remote_url = ''
desk.save()
class Migration(migrations.Migration):
dependencies = [
('agendas', '0033_timeperiodexceptionsource'),
]
operations = [
migrations.RunPython(create_source, init_remote_url),
]

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2019-12-09 14:24
from __future__ import unicode_literals
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('agendas', '0034_initial_source'),
]
operations = [
migrations.RemoveField(model_name='desk', name='timeperiod_exceptions_remote_url',),
migrations.RemoveField(model_name='timeperiodexception', name='external_id',),
]

View File

@ -469,9 +469,6 @@ class Desk(models.Model):
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160)
timeperiod_exceptions_remote_url = models.URLField(
_('URL to fetch time period exceptions from'), blank=True, max_length=500
)
def __str__(self):
return self.label
@ -528,27 +525,31 @@ class Desk(models.Model):
in_two_weeks = self.get_exceptions_within_two_weeks()
return self.timeperiodexception_set.count() == len(in_two_weeks)
def create_timeperiod_exceptions_from_remote_ics(self, url):
def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
try:
response = requests.get(url, proxies=settings.REQUESTS_PROXIES)
response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
response.raise_for_status()
except requests.HTTPError as e:
raise ICSError(
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
% {'url': url, 'status_code': e.response.status_code}
% {'url': ics_url, 'status_code': e.response.status_code}
)
except requests.RequestException as e:
raise ICSError(
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
% {'url': url, 'exception': e}
% {'url': ics_url, 'exception': e}
)
return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
if source is None:
source = TimePeriodExceptionSource(desk=self, ics_url=ics_url)
return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)
def remove_timeperiod_exceptions_from_remote_ics(self):
TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()
def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
if source is None:
source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name)
return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))
def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False, recurring_days=600):
def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
try:
parsed = vobject.readOne(data)
except vobject.base.ParseError:
@ -556,10 +557,15 @@ class Desk(models.Model):
total_created = 0
if not parsed.contents.get('vevent') and not keep_synced_by_uid:
if not parsed.contents.get('vevent'):
raise ICSError(_('The file doesn\'t contain any events.'))
with transaction.atomic():
if source.pk is None:
source.save()
# delete old exceptions related to this source
source.timeperiodexception_set.all().delete()
# create new exceptions
update_datetime = now()
for vevent in parsed.contents.get('vevent', []):
if 'summary' in vevent.contents:
@ -590,24 +596,19 @@ class Desk(models.Model):
end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
duration = end_dt - start_dt
event = {}
event['start_datetime'] = start_dt
event['end_datetime'] = end_dt
event['label'] = summary
kwargs = {}
kwargs['desk'] = self
kwargs['recurrence_id'] = 0
if keep_synced_by_uid:
kwargs['external_id'] = vevent.contents['uid'][0].value
else:
kwargs['label'] = summary
event = {
'start_datetime': start_dt,
'end_datetime': end_dt,
'label': summary,
'desk': self,
'source': source,
'recurrence_id': 0,
}
if not vevent.rruleset:
# classical event
obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs)
if created:
total_created += 1
TimePeriodException.objects.create(**event)
total_created += 1
elif vevent.rruleset.count():
# recurring event until recurring_days in the future
from_dt = start_dt
@ -621,26 +622,12 @@ class Desk(models.Model):
if not is_aware(start_dt):
start_dt = make_aware(start_dt)
end_dt = start_dt + duration
kwargs['recurrence_id'] = i
event['recurrence_id'] = i
event['start_datetime'] = start_dt
event['end_datetime'] = end_dt
if end_dt < update_datetime:
TimePeriodException.objects.filter(**kwargs).update(**event)
else:
obj, created = TimePeriodException.objects.update_or_create(
defaults=event, **kwargs
)
if created:
total_created += 1
# delete unseen occurrences
kwargs.pop('recurrence_id', None)
TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete()
if keep_synced_by_uid:
# delete all outdated exceptions from remote calendar
TimePeriodException.objects.filter(update_datetime__lt=update_datetime, desk=self).exclude(
external_id=''
).delete()
if end_dt >= update_datetime:
TimePeriodException.objects.create(**event)
total_created += 1
return total_created
@ -661,10 +648,22 @@ class Desk(models.Model):
return openslots.search(aware_date, aware_next_date)
@python_2_unicode_compatible
class TimePeriodExceptionSource(models.Model):
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
ics_filename = models.CharField(null=True, max_length=256)
ics_url = models.URLField(null=True, max_length=500)
def __str__(self):
if self.ics_filename is not None:
return self.ics_filename
return self.ics_url
@python_2_unicode_compatible
class TimePeriodException(models.Model):
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
external_id = models.CharField(_('External ID'), max_length=256, blank=True)
source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True)
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
start_datetime = models.DateTimeField(_('Exception start time'))
end_datetime = models.DateTimeField(_('Exception end time'))
@ -739,7 +738,6 @@ class TimePeriodException(models.Model):
'label': self.label,
'start_datetime': export_datetime(self.start_datetime),
'end_datetime': export_datetime(self.end_datetime),
'external_id': self.external_id,
'recurrence_id': self.recurrence_id,
'update_datetime': export_datetime(self.update_datetime),
}

View File

@ -142,7 +142,7 @@ class NewDeskForm(forms.ModelForm):
widgets = {
'agenda': forms.HiddenInput(),
}
exclude = ['slug', 'timeperiod_exceptions_remote_url']
exclude = ['slug']
class DeskForm(forms.ModelForm):
@ -151,7 +151,7 @@ class DeskForm(forms.ModelForm):
widgets = {
'agenda': forms.HiddenInput(),
}
exclude = ['timeperiod_exceptions_remote_url']
exclude = []
class TimePeriodExceptionForm(forms.ModelForm):
@ -262,10 +262,6 @@ class ImportEventsForm(forms.Form):
class ExceptionsImportForm(forms.ModelForm):
class Meta:
model = Desk
fields = []
ics_file = forms.FileField(
label=_('ICS File'),
required=False,
@ -277,6 +273,15 @@ class ExceptionsImportForm(forms.ModelForm):
help_text=_('URL to remote calendar which will be synchronised hourly.'),
)
class Meta:
model = Desk
fields = []
def clean(self, *args, **kwargs):
cleaned_data = super().clean(*args, **kwargs)
if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
class AgendasImportForm(forms.Form):
agendas_json = forms.FileField(label=_('Agendas Export File'))

View File

@ -13,7 +13,37 @@
{% block content %}
<form method="post" enctype="multipart/form-data">
<p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
{% if exception_sources %}
<table class="main">
<thead>
<tr>
<th>{% trans "Exceptions" %}</th>
<th></th>
<th></th>
</tr>
</thead>
<tbody>
{% for object in exception_sources %}
<tr>
<td>
<span title="{{ object }}">
{% if object.ics_filename %}{{ object|truncatechars:50 }}{% else %}<a href="{{ object }}">{{ object|truncatechars:50 }}</a>{% endif %}
</span>
</td>
<td>
<a rel="popup" href="">
{% if object.ics_filename %}{% trans "replace" %}{% else %}{% trans "refresh" %}{% endif %}
</a>
</td>
<td><a rel="popup" href="">{% trans "remove" %}</a></td>
</tr>
{% endfor %}
</tbody>
</table>
<br />
{% endif %}
<p class="notice">{% trans "To add new exceptions, you can upload a file or specify an address to a remote calendar." %}</p>
{% csrf_token %}
{{ form.as_p }}
<p>

View File

@ -20,7 +20,8 @@ import json
from django.contrib import messages
from django.core.exceptions import PermissionDenied
from django.db.models import Q
from django.http import HttpResponse, Http404, HttpResponseRedirect
from django.forms import ValidationError
from django.http import Http404, HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.urls import reverse, reverse_lazy
from django.utils.dates import MONTHS
@ -881,26 +882,26 @@ class DeskImportTimePeriodExceptionsView(ManagedAgendaSubobjectMixin, UpdateView
form_class = ExceptionsImportForm
template_name = 'chrono/manager_import_exceptions.html'
def get_initial(self):
return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
def get_context_data(self, **kwargs):
context = super(DeskImportTimePeriodExceptionsView, self).get_context_data(**kwargs)
context['exception_sources'] = self.get_object().timeperiodexceptionsource_set.all()
return context
def form_valid(self, form):
exceptions = None
try:
if form.cleaned_data['ics_file']:
ics_file_content = force_text(form.cleaned_data['ics_file'].read())
exceptions = form.instance.create_timeperiod_exceptions_from_ics(ics_file_content)
exceptions = form.instance.import_timeperiod_exceptions_from_ics_file(
form.cleaned_data['ics_file']
)
elif form.cleaned_data['ics_url']:
exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(
exceptions = form.instance.import_timeperiod_exceptions_from_remote_ics(
form.cleaned_data['ics_url']
)
else:
form.instance.remove_timeperiod_exceptions_from_remote_ics()
except ICSError as e:
form.add_error(None, force_text(e))
return self.form_invalid(form)
form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
form.instance.save()
if exceptions is not None:
message = ungettext(
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions

View File

@ -1,24 +1,23 @@
import pytest
import datetime
import mock
import re
import requests
from django.utils.timezone import now, make_aware, localtime
from django.contrib.auth.models import Group
from django.core.files.base import ContentFile
from django.core.management import call_command
from django.core.management.base import CommandError
from django.utils.timezone import localtime, make_aware, now
from chrono.agendas.models import (
Agenda,
Event,
Booking,
MeetingType,
Desk,
TimePeriod,
TimePeriodException,
Event,
ICSError,
MeetingType,
TimePeriodException,
TimePeriodExceptionSource,
)
pytestmark = pytest.mark.django_db
@ -28,7 +27,6 @@ VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
DTSTART:20170831T170800Z
DTEND:20170831T203400Z
SEQUENCE:1
@ -36,7 +34,6 @@ SUMMARY:Event 1
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T092855Z
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
DTSTART:20170830T180800Z
DTEND:20170831T223400Z
SEQUENCE:2
@ -48,7 +45,6 @@ VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
DTSTART:20170831T170800Z
DURATION:PT3H26M
SEQUENCE:1
@ -56,7 +52,6 @@ SUMMARY:Event 1
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T092855Z
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
DTSTART:20170830T180800Z
DURATION:P1D4H26M
SEQUENCE:2
@ -83,25 +78,6 @@ RRULE:FREQ=YEARLY
END:VEVENT
END:VCALENDAR"""
ICS_SAMPLE_WITH_RECURRENT_EVENT_2 = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20170720T145803Z
DESCRIPTION:Vacances d'ete
DTSTART;VALUE=DATE:20180101
DTEND;VALUE=DATE:20180101
SUMMARY:reccurent event
END:VEVENT
BEGIN:VEVENT
DTSTAMP:20170824T082855Z
DTSTART:20180102
DTEND:20180101
SUMMARY:New Year's Eve
RRULE:FREQ=YEARLY;COUNT=1
END:VEVENT
END:VCALENDAR"""
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
@ -122,6 +98,7 @@ END:VCALENDAR"""
INVALID_ICS_SAMPLE = """content
"""
with open('tests/data/atreal.ics') as f:
ICS_ATREAL = f.read()
@ -213,7 +190,9 @@ def test_timeperiodexception_creation_from_ics():
agenda.save()
desk = Desk(label='Test 1 desk', agenda=agenda)
desk.save()
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE)
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE, name='sample.ics')
)
assert exceptions_count == 2
assert TimePeriodException.objects.filter(desk=desk).count() == 2
@ -229,9 +208,9 @@ def test_timeperiodexception_creation_from_ics_without_startdt():
if line.startswith('DTSTART:'):
continue
lines.append(line)
ics_sample = "\n".join(lines)
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
with pytest.raises(ICSError) as e:
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
assert 'Event "Event 1" has no start date.' == str(e.value)
@ -246,8 +225,8 @@ def test_timeperiodexception_creation_from_ics_without_enddt():
if line.startswith('DTEND:'):
continue
lines.append(line)
ics_sample = "\n".join(lines)
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
for exception in TimePeriodException.objects.filter(desk=desk):
end_time = localtime(exception.end_datetime).time()
assert end_time == datetime.time(23, 59, 59, 999999)
@ -259,18 +238,13 @@ def test_timeperiodexception_creation_from_ics_with_recurrences():
agenda.save()
desk = Desk(label='Test 4 desk', agenda=agenda)
desk.save()
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 3
assert (
desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
)
== 3
)
assert TimePeriodException.objects.filter(desk=desk).count() == 3
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
)
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 0
# verify occurences are cleaned when count changed
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_2) == 0
assert TimePeriodException.objects.filter(desk=desk).count() == 2
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2018, 1, 2))]
)
def test_timeexception_creation_from_ics_with_dates():
@ -284,8 +258,8 @@ def test_timeexception_creation_from_ics_with_dates():
if line.startswith('RRULE:'):
continue
lines.append(line)
ics_sample = "\n".join(lines)
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
assert exceptions_count == 2
for exception in TimePeriodException.objects.filter(desk=desk):
assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
@ -298,7 +272,7 @@ def test_timeexception_create_from_invalid_ics():
desk = Desk(label='Test 6 desk', agenda=agenda)
desk.save()
with pytest.raises(ICSError) as e:
exceptions_count = desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE)
desk.import_timeperiod_exceptions_from_ics_file(ContentFile(INVALID_ICS_SAMPLE, name='sample.ics'))
assert str(e.value) == 'File format is invalid.'
@ -308,7 +282,9 @@ def test_timeexception_create_from_ics_with_no_events():
desk = Desk(label='Test 7 desk', agenda=agenda)
desk.save()
with pytest.raises(ICSError) as e:
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
)
assert str(e.value) == "The file doesn't contain any events."
@ -321,19 +297,14 @@ def test_timeperiodexception_creation_from_remote_ics(mocked_get):
mocked_response = mock.Mock()
mocked_response.text = ICS_SAMPLE
mocked_get.return_value = mocked_response
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert exceptions_count == 2
mocked_response.text = re.sub('SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE)
mocked_get.return_value = mocked_response
desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
assert 'New summary ' in timeperiod.label
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
mocked_get.return_value = mocked_response
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert exceptions_count == 0
TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
with pytest.raises(ICSError) as e:
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert str(e.value) == "The file doesn't contain any events."
@mock.patch('chrono.agendas.models.requests.get')
@ -351,7 +322,7 @@ def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
mocked_get.side_effect = mocked_requests_connection_error
with pytest.raises(ICSError) as e:
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
@ -371,7 +342,7 @@ def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
mocked_get.side_effect = mocked_requests_http_forbidden_error
with pytest.raises(ICSError) as e:
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
assert (
str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
)
@ -381,10 +352,9 @@ def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
agenda = Agenda(label=u'Test 11 agenda')
agenda.save()
desk = Desk(
label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http://example.com/sample.ics'
)
desk = Desk(label='Test 11 desk', agenda=agenda)
desk.save()
TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
mocked_response = mock.Mock()
mocked_response.status_code = 403
mocked_get.return_value = mocked_response
@ -401,41 +371,6 @@ def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
)
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
agenda = Agenda(label=u'Test 11 agenda')
agenda.save()
desk = Desk(
label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics'
)
desk.save()
mocked_response = mock.Mock()
mocked_response.text = ICS_SAMPLE
mocked_get.return_value = mocked_response
call_command('sync_desks_timeperiod_exceptions')
assert TimePeriodException.objects.filter(desk=desk).count() == 2
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20180824T082855Z
UID:new-and-unique-uid
DTSTART:20180831T170800Z
DTEND:20180831T203400Z
SUMMARY:Wonderfull event
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
call_command('sync_desks_timeperiod_exceptions')
assert TimePeriodException.objects.filter(desk=desk).count() == 1
exception = TimePeriodException.objects.get(desk=desk)
assert exception.external_id == 'new-and-unique-uid'
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
mocked_get.return_value = mocked_response
call_command('sync_desks_timeperiod_exceptions')
assert not TimePeriodException.objects.filter(desk=desk).exists()
def test_base_meeting_duration():
agenda = Agenda(label='Meeting', kind='meetings')
agenda.save()
@ -463,7 +398,9 @@ def test_timeperiodexception_creation_from_ics_with_duration():
agenda.save()
desk = Desk(label='Test 1 desk', agenda=agenda)
desk.save()
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_DURATION)
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
)
assert exceptions_count == 2
assert TimePeriodException.objects.filter(desk=desk).count() == 2
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
@ -488,12 +425,16 @@ def test_timeperiodexception_creation_from_ics_with_recurrences_in_the_past():
agenda.save()
desk = Desk(label='Test 4 desk', agenda=agenda)
desk.save()
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 2
assert (
desk.import_timeperiod_exceptions_from_ics_file(
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')
)
== 2
)
assert TimePeriodException.objects.filter(desk=desk).count() == 2
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
)
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 0
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
@ -501,7 +442,7 @@ def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
agenda.save()
desk = Desk(label='Test atreal desk', agenda=agenda)
desk.save()
assert desk.create_timeperiod_exceptions_from_ics(ICS_ATREAL)
assert desk.import_timeperiod_exceptions_from_ics_file(ContentFile(ICS_ATREAL, name='sample.ics'))
def test_management_role_deletion():

View File

@ -14,10 +14,16 @@ import pytest
import requests
from webtest import Upload
from chrono.wsgi import application
from chrono.agendas.models import Agenda, Event, Booking, MeetingType, TimePeriod, Desk, TimePeriodException
from chrono.manager.utils import export_site
from chrono.agendas.models import (
Agenda,
Booking,
Desk,
Event,
MeetingType,
TimePeriod,
TimePeriodException,
TimePeriodExceptionSource,
)
pytestmark = pytest.mark.django_db
@ -1158,8 +1164,9 @@ def test_agenda_import_time_period_exception_from_ics(app, admin_user):
resp = resp.click('Settings')
assert 'Import exceptions from .ics' in resp.text
resp = resp.click('upload')
assert "You can upload a file or specify an address to a remote calendar." in resp
resp = resp.form.submit(status=302)
assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
resp = resp.form.submit(status=200)
assert 'Please provide an ICS File or an URL.' in resp.text
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
resp = resp.click('Settings')
resp = resp.click('upload')
@ -1200,6 +1207,12 @@ END:VCALENDAR"""
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(desk=desk).count() == 1
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
source = TimePeriodExceptionSource.objects.latest('pk')
exception = TimePeriodException.objects.latest('pk')
assert exception.source == source
assert source.ics_filename == 'exceptions.ics'
assert source.ics_url is None
resp = resp.follow()
assert 'An exception has been imported.' in resp.text
@ -1257,7 +1270,6 @@ def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, ad
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
@ -1267,15 +1279,12 @@ END:VCALENDAR"""
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(desk=desk).count() == 1
exception = TimePeriodException.objects.get(desk=desk)
assert exception.external_id == 'random-event-id'
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
resp = resp.click('Settings')
resp = resp.click('upload')
resp.form['ics_url'] = ''
resp = resp.form.submit(status=302)
assert not TimePeriodException.objects.filter(
desk=desk, external_id='desk-%s:random-event-id' % desk.id
).exists()
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
source = TimePeriodExceptionSource.objects.latest('pk')
exception = TimePeriodException.objects.latest('pk')
assert exception.source == source
assert source.ics_filename is None
assert source.ics_url == 'http://example.com/foo.ics'
@mock.patch('chrono.agendas.models.requests.get')
@ -1301,79 +1310,10 @@ def test_agenda_import_time_period_exception_with_remote_ics_no_events(mocked_ge
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(desk=desk).count() == 1
exception = TimePeriodException.objects.get(desk=desk)
assert exception.external_id == 'random-event-id'
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
END:VCALENDAR"""
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
resp = resp.click('Settings')
resp = resp.click('upload')
resp = resp.form.submit(status=302)
assert not TimePeriodException.objects.filter(desk=desk, external_id='random-event-id').exists()
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
agenda = Agenda.objects.create(label='New Example', kind='meetings')
desk = Desk.objects.create(agenda=agenda, label='New Desk')
MeetingType(agenda=agenda, label='Bar').save()
login(app)
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
resp = resp.click('Settings')
assert 'Import exceptions from .ics' not in resp.text
TimePeriod.objects.create(
weekday=1, desk=desk, start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)
)
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
resp = resp.click('Settings')
resp = resp.click('upload')
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response = mock.Mock()
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:first-eventrandom-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:First test event
END:VEVENT
BEGIN:VEVENT
UID:second-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
resp = resp.form.submit(status=302)
assert TimePeriodException.objects.filter(desk=desk).count() == 2
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
resp = resp.click('Settings')
resp = resp.click('upload')
resp.form['ics_url'] = 'http://example.com/foo.ics'
mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:secord-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
mocked_get.return_value = mocked_response
resp = resp.form.submit(status=302)