agendas: add exception source model (#29209)
This commit is contained in:
parent
30bbc8c90f
commit
a5a8a3fe3d
|
@ -18,16 +18,19 @@ from __future__ import print_function
|
|||
|
||||
import sys
|
||||
|
||||
from chrono.agendas.models import Desk, ICSError
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from chrono.agendas.models import ICSError, TimePeriodExceptionSource
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Synchronize time period exceptions from desks remote ics'
|
||||
|
||||
def handle(self, **options):
|
||||
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
|
||||
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
|
||||
try:
|
||||
desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
|
||||
source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
|
||||
except ICSError as e:
|
||||
print(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e), file=sys.stderr)
|
||||
print(
|
||||
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
|
||||
)
|
||||
|
|
|
@ -54,10 +54,10 @@ class Migration(migrations.Migration):
|
|||
model_name='agenda',
|
||||
name='kind',
|
||||
field=models.CharField(
|
||||
default=b'events',
|
||||
default='events',
|
||||
max_length=20,
|
||||
verbose_name='Kind',
|
||||
choices=[(b'events', 'Events'), (b'meetings', 'Meetings')],
|
||||
choices=[('events', 'Events'), ('meetings', 'Meetings')],
|
||||
),
|
||||
),
|
||||
migrations.AddField(
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('agendas', '0032_auto_20191127_0919'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='TimePeriodExceptionSource',
|
||||
fields=[
|
||||
(
|
||||
'id',
|
||||
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
|
||||
),
|
||||
('ics_filename', models.CharField(max_length=256, null=True)),
|
||||
('ics_url', models.URLField(null=True, max_length=500)),
|
||||
('desk', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk')),
|
||||
],
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='timeperiodexception',
|
||||
name='source',
|
||||
field=models.ForeignKey(
|
||||
null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.TimePeriodExceptionSource'
|
||||
),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='desk',
|
||||
name='timeperiod_exceptions_remote_url',
|
||||
field=models.URLField(
|
||||
blank=True, max_length=500, null=True, verbose_name='URL to fetch time period exceptions from'
|
||||
),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='timeperiodexception',
|
||||
name='external_id',
|
||||
field=models.CharField(blank=True, max_length=256, null=True, verbose_name='External ID'),
|
||||
),
|
||||
]
|
|
@ -0,0 +1,49 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
def create_source(apps, schema_editor):
|
||||
Desk = apps.get_model('agendas', 'Desk')
|
||||
TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
|
||||
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
|
||||
# create a source for each remote url
|
||||
source = TimePeriodExceptionSource.objects.create(
|
||||
desk=desk, ics_url=desk.timeperiod_exceptions_remote_url
|
||||
)
|
||||
# clear timeperiod_exceptions_remote_url
|
||||
desk.timeperiod_exceptions_remote_url = None
|
||||
desk.save()
|
||||
# attach exceptions to the created source
|
||||
desk.timeperiodexception_set.filter(external_id__isnull=False).exclude(external_id='').update(
|
||||
source=source
|
||||
)
|
||||
|
||||
|
||||
def init_remote_url(apps, schema_editor):
|
||||
Desk = apps.get_model('agendas', 'Desk')
|
||||
TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
|
||||
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
|
||||
# set timeperiod_exceptions_remote_url
|
||||
source.desk.timeperiod_exceptions_remote_url = source.ics_url
|
||||
source.desk.save()
|
||||
# unlink exceptions
|
||||
source.timeperiodexception_set.update(source=None)
|
||||
# delete the source
|
||||
source.delete()
|
||||
# reset remote_url
|
||||
for desk in Desk.objects.filter(timeperiod_exceptions_remote_url__isnull=True):
|
||||
desk.timeperiod_exceptions_remote_url = ''
|
||||
desk.save()
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('agendas', '0033_timeperiodexceptionsource'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunPython(create_source, init_remote_url),
|
||||
]
|
|
@ -0,0 +1,17 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Generated by Django 1.11.18 on 2019-12-09 14:24
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('agendas', '0034_initial_source'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveField(model_name='desk', name='timeperiod_exceptions_remote_url',),
|
||||
migrations.RemoveField(model_name='timeperiodexception', name='external_id',),
|
||||
]
|
|
@ -469,9 +469,6 @@ class Desk(models.Model):
|
|||
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
|
||||
label = models.CharField(_('Label'), max_length=150)
|
||||
slug = models.SlugField(_('Identifier'), max_length=160)
|
||||
timeperiod_exceptions_remote_url = models.URLField(
|
||||
_('URL to fetch time period exceptions from'), blank=True, max_length=500
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return self.label
|
||||
|
@ -528,27 +525,31 @@ class Desk(models.Model):
|
|||
in_two_weeks = self.get_exceptions_within_two_weeks()
|
||||
return self.timeperiodexception_set.count() == len(in_two_weeks)
|
||||
|
||||
def create_timeperiod_exceptions_from_remote_ics(self, url):
|
||||
def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
|
||||
try:
|
||||
response = requests.get(url, proxies=settings.REQUESTS_PROXIES)
|
||||
response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
|
||||
response.raise_for_status()
|
||||
except requests.HTTPError as e:
|
||||
raise ICSError(
|
||||
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
|
||||
% {'url': url, 'status_code': e.response.status_code}
|
||||
% {'url': ics_url, 'status_code': e.response.status_code}
|
||||
)
|
||||
except requests.RequestException as e:
|
||||
raise ICSError(
|
||||
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
|
||||
% {'url': url, 'exception': e}
|
||||
% {'url': ics_url, 'exception': e}
|
||||
)
|
||||
|
||||
return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
|
||||
if source is None:
|
||||
source = TimePeriodExceptionSource(desk=self, ics_url=ics_url)
|
||||
return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)
|
||||
|
||||
def remove_timeperiod_exceptions_from_remote_ics(self):
|
||||
TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()
|
||||
def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
|
||||
if source is None:
|
||||
source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name)
|
||||
return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))
|
||||
|
||||
def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False, recurring_days=600):
|
||||
def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
|
||||
try:
|
||||
parsed = vobject.readOne(data)
|
||||
except vobject.base.ParseError:
|
||||
|
@ -556,10 +557,15 @@ class Desk(models.Model):
|
|||
|
||||
total_created = 0
|
||||
|
||||
if not parsed.contents.get('vevent') and not keep_synced_by_uid:
|
||||
if not parsed.contents.get('vevent'):
|
||||
raise ICSError(_('The file doesn\'t contain any events.'))
|
||||
|
||||
with transaction.atomic():
|
||||
if source.pk is None:
|
||||
source.save()
|
||||
# delete old exceptions related to this source
|
||||
source.timeperiodexception_set.all().delete()
|
||||
# create new exceptions
|
||||
update_datetime = now()
|
||||
for vevent in parsed.contents.get('vevent', []):
|
||||
if 'summary' in vevent.contents:
|
||||
|
@ -590,24 +596,19 @@ class Desk(models.Model):
|
|||
end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
|
||||
duration = end_dt - start_dt
|
||||
|
||||
event = {}
|
||||
event['start_datetime'] = start_dt
|
||||
event['end_datetime'] = end_dt
|
||||
event['label'] = summary
|
||||
|
||||
kwargs = {}
|
||||
kwargs['desk'] = self
|
||||
kwargs['recurrence_id'] = 0
|
||||
if keep_synced_by_uid:
|
||||
kwargs['external_id'] = vevent.contents['uid'][0].value
|
||||
else:
|
||||
kwargs['label'] = summary
|
||||
event = {
|
||||
'start_datetime': start_dt,
|
||||
'end_datetime': end_dt,
|
||||
'label': summary,
|
||||
'desk': self,
|
||||
'source': source,
|
||||
'recurrence_id': 0,
|
||||
}
|
||||
|
||||
if not vevent.rruleset:
|
||||
# classical event
|
||||
obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs)
|
||||
if created:
|
||||
total_created += 1
|
||||
TimePeriodException.objects.create(**event)
|
||||
total_created += 1
|
||||
elif vevent.rruleset.count():
|
||||
# recurring event until recurring_days in the future
|
||||
from_dt = start_dt
|
||||
|
@ -621,26 +622,12 @@ class Desk(models.Model):
|
|||
if not is_aware(start_dt):
|
||||
start_dt = make_aware(start_dt)
|
||||
end_dt = start_dt + duration
|
||||
kwargs['recurrence_id'] = i
|
||||
event['recurrence_id'] = i
|
||||
event['start_datetime'] = start_dt
|
||||
event['end_datetime'] = end_dt
|
||||
if end_dt < update_datetime:
|
||||
TimePeriodException.objects.filter(**kwargs).update(**event)
|
||||
else:
|
||||
obj, created = TimePeriodException.objects.update_or_create(
|
||||
defaults=event, **kwargs
|
||||
)
|
||||
if created:
|
||||
total_created += 1
|
||||
# delete unseen occurrences
|
||||
kwargs.pop('recurrence_id', None)
|
||||
TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete()
|
||||
|
||||
if keep_synced_by_uid:
|
||||
# delete all outdated exceptions from remote calendar
|
||||
TimePeriodException.objects.filter(update_datetime__lt=update_datetime, desk=self).exclude(
|
||||
external_id=''
|
||||
).delete()
|
||||
if end_dt >= update_datetime:
|
||||
TimePeriodException.objects.create(**event)
|
||||
total_created += 1
|
||||
|
||||
return total_created
|
||||
|
||||
|
@ -661,10 +648,22 @@ class Desk(models.Model):
|
|||
return openslots.search(aware_date, aware_next_date)
|
||||
|
||||
|
||||
@python_2_unicode_compatible
|
||||
class TimePeriodExceptionSource(models.Model):
|
||||
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
|
||||
ics_filename = models.CharField(null=True, max_length=256)
|
||||
ics_url = models.URLField(null=True, max_length=500)
|
||||
|
||||
def __str__(self):
|
||||
if self.ics_filename is not None:
|
||||
return self.ics_filename
|
||||
return self.ics_url
|
||||
|
||||
|
||||
@python_2_unicode_compatible
|
||||
class TimePeriodException(models.Model):
|
||||
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
|
||||
external_id = models.CharField(_('External ID'), max_length=256, blank=True)
|
||||
source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True)
|
||||
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
|
||||
start_datetime = models.DateTimeField(_('Exception start time'))
|
||||
end_datetime = models.DateTimeField(_('Exception end time'))
|
||||
|
@ -739,7 +738,6 @@ class TimePeriodException(models.Model):
|
|||
'label': self.label,
|
||||
'start_datetime': export_datetime(self.start_datetime),
|
||||
'end_datetime': export_datetime(self.end_datetime),
|
||||
'external_id': self.external_id,
|
||||
'recurrence_id': self.recurrence_id,
|
||||
'update_datetime': export_datetime(self.update_datetime),
|
||||
}
|
||||
|
|
|
@ -142,7 +142,7 @@ class NewDeskForm(forms.ModelForm):
|
|||
widgets = {
|
||||
'agenda': forms.HiddenInput(),
|
||||
}
|
||||
exclude = ['slug', 'timeperiod_exceptions_remote_url']
|
||||
exclude = ['slug']
|
||||
|
||||
|
||||
class DeskForm(forms.ModelForm):
|
||||
|
@ -151,7 +151,7 @@ class DeskForm(forms.ModelForm):
|
|||
widgets = {
|
||||
'agenda': forms.HiddenInput(),
|
||||
}
|
||||
exclude = ['timeperiod_exceptions_remote_url']
|
||||
exclude = []
|
||||
|
||||
|
||||
class TimePeriodExceptionForm(forms.ModelForm):
|
||||
|
@ -262,10 +262,6 @@ class ImportEventsForm(forms.Form):
|
|||
|
||||
|
||||
class ExceptionsImportForm(forms.ModelForm):
|
||||
class Meta:
|
||||
model = Desk
|
||||
fields = []
|
||||
|
||||
ics_file = forms.FileField(
|
||||
label=_('ICS File'),
|
||||
required=False,
|
||||
|
@ -277,6 +273,15 @@ class ExceptionsImportForm(forms.ModelForm):
|
|||
help_text=_('URL to remote calendar which will be synchronised hourly.'),
|
||||
)
|
||||
|
||||
class Meta:
|
||||
model = Desk
|
||||
fields = []
|
||||
|
||||
def clean(self, *args, **kwargs):
|
||||
cleaned_data = super().clean(*args, **kwargs)
|
||||
if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
|
||||
raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
|
||||
|
||||
|
||||
class AgendasImportForm(forms.Form):
|
||||
agendas_json = forms.FileField(label=_('Agendas Export File'))
|
||||
|
|
|
@ -13,7 +13,37 @@
|
|||
{% block content %}
|
||||
|
||||
<form method="post" enctype="multipart/form-data">
|
||||
<p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
|
||||
{% if exception_sources %}
|
||||
<table class="main">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>{% trans "Exceptions" %}</th>
|
||||
<th></th>
|
||||
<th></th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for object in exception_sources %}
|
||||
<tr>
|
||||
<td>
|
||||
<span title="{{ object }}">
|
||||
{% if object.ics_filename %}{{ object|truncatechars:50 }}{% else %}<a href="{{ object }}">{{ object|truncatechars:50 }}</a>{% endif %}
|
||||
</span>
|
||||
</td>
|
||||
<td>
|
||||
<a rel="popup" href="">
|
||||
{% if object.ics_filename %}{% trans "replace" %}{% else %}{% trans "refresh" %}{% endif %}
|
||||
</a>
|
||||
</td>
|
||||
<td><a rel="popup" href="">{% trans "remove" %}</a></td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
<br />
|
||||
{% endif %}
|
||||
|
||||
<p class="notice">{% trans "To add new exceptions, you can upload a file or specify an address to a remote calendar." %}</p>
|
||||
{% csrf_token %}
|
||||
{{ form.as_p }}
|
||||
<p>
|
||||
|
|
|
@ -20,7 +20,8 @@ import json
|
|||
from django.contrib import messages
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.db.models import Q
|
||||
from django.http import HttpResponse, Http404, HttpResponseRedirect
|
||||
from django.forms import ValidationError
|
||||
from django.http import Http404, HttpResponse, HttpResponseRedirect
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.urls import reverse, reverse_lazy
|
||||
from django.utils.dates import MONTHS
|
||||
|
@ -881,26 +882,26 @@ class DeskImportTimePeriodExceptionsView(ManagedAgendaSubobjectMixin, UpdateView
|
|||
form_class = ExceptionsImportForm
|
||||
template_name = 'chrono/manager_import_exceptions.html'
|
||||
|
||||
def get_initial(self):
|
||||
return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
|
||||
def get_context_data(self, **kwargs):
|
||||
context = super(DeskImportTimePeriodExceptionsView, self).get_context_data(**kwargs)
|
||||
context['exception_sources'] = self.get_object().timeperiodexceptionsource_set.all()
|
||||
return context
|
||||
|
||||
def form_valid(self, form):
|
||||
exceptions = None
|
||||
try:
|
||||
if form.cleaned_data['ics_file']:
|
||||
ics_file_content = force_text(form.cleaned_data['ics_file'].read())
|
||||
exceptions = form.instance.create_timeperiod_exceptions_from_ics(ics_file_content)
|
||||
exceptions = form.instance.import_timeperiod_exceptions_from_ics_file(
|
||||
form.cleaned_data['ics_file']
|
||||
)
|
||||
elif form.cleaned_data['ics_url']:
|
||||
exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(
|
||||
exceptions = form.instance.import_timeperiod_exceptions_from_remote_ics(
|
||||
form.cleaned_data['ics_url']
|
||||
)
|
||||
else:
|
||||
form.instance.remove_timeperiod_exceptions_from_remote_ics()
|
||||
except ICSError as e:
|
||||
form.add_error(None, force_text(e))
|
||||
return self.form_invalid(form)
|
||||
form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
|
||||
form.instance.save()
|
||||
|
||||
if exceptions is not None:
|
||||
message = ungettext(
|
||||
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions
|
||||
|
|
|
@ -1,24 +1,23 @@
|
|||
import pytest
|
||||
import datetime
|
||||
import mock
|
||||
import re
|
||||
import requests
|
||||
|
||||
|
||||
from django.utils.timezone import now, make_aware, localtime
|
||||
from django.contrib.auth.models import Group
|
||||
from django.core.files.base import ContentFile
|
||||
from django.core.management import call_command
|
||||
from django.core.management.base import CommandError
|
||||
from django.utils.timezone import localtime, make_aware, now
|
||||
|
||||
from chrono.agendas.models import (
|
||||
Agenda,
|
||||
Event,
|
||||
Booking,
|
||||
MeetingType,
|
||||
Desk,
|
||||
TimePeriod,
|
||||
TimePeriodException,
|
||||
Event,
|
||||
ICSError,
|
||||
MeetingType,
|
||||
TimePeriodException,
|
||||
TimePeriodExceptionSource,
|
||||
)
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
@ -28,7 +27,6 @@ VERSION:2.0
|
|||
PRODID:-//foo.bar//EN
|
||||
BEGIN:VEVENT
|
||||
DTSTAMP:20170824T082855Z
|
||||
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
|
||||
DTSTART:20170831T170800Z
|
||||
DTEND:20170831T203400Z
|
||||
SEQUENCE:1
|
||||
|
@ -36,7 +34,6 @@ SUMMARY:Event 1
|
|||
END:VEVENT
|
||||
BEGIN:VEVENT
|
||||
DTSTAMP:20170824T092855Z
|
||||
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
|
||||
DTSTART:20170830T180800Z
|
||||
DTEND:20170831T223400Z
|
||||
SEQUENCE:2
|
||||
|
@ -48,7 +45,6 @@ VERSION:2.0
|
|||
PRODID:-//foo.bar//EN
|
||||
BEGIN:VEVENT
|
||||
DTSTAMP:20170824T082855Z
|
||||
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
|
||||
DTSTART:20170831T170800Z
|
||||
DURATION:PT3H26M
|
||||
SEQUENCE:1
|
||||
|
@ -56,7 +52,6 @@ SUMMARY:Event 1
|
|||
END:VEVENT
|
||||
BEGIN:VEVENT
|
||||
DTSTAMP:20170824T092855Z
|
||||
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
|
||||
DTSTART:20170830T180800Z
|
||||
DURATION:P1D4H26M
|
||||
SEQUENCE:2
|
||||
|
@ -83,25 +78,6 @@ RRULE:FREQ=YEARLY
|
|||
END:VEVENT
|
||||
END:VCALENDAR"""
|
||||
|
||||
ICS_SAMPLE_WITH_RECURRENT_EVENT_2 = """BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
PRODID:-//foo.bar//EN
|
||||
BEGIN:VEVENT
|
||||
DTSTAMP:20170720T145803Z
|
||||
DESCRIPTION:Vacances d'ete
|
||||
DTSTART;VALUE=DATE:20180101
|
||||
DTEND;VALUE=DATE:20180101
|
||||
SUMMARY:reccurent event
|
||||
END:VEVENT
|
||||
BEGIN:VEVENT
|
||||
DTSTAMP:20170824T082855Z
|
||||
DTSTART:20180102
|
||||
DTEND:20180101
|
||||
SUMMARY:New Year's Eve
|
||||
RRULE:FREQ=YEARLY;COUNT=1
|
||||
END:VEVENT
|
||||
END:VCALENDAR"""
|
||||
|
||||
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
PRODID:-//foo.bar//EN
|
||||
|
@ -122,6 +98,7 @@ END:VCALENDAR"""
|
|||
INVALID_ICS_SAMPLE = """content
|
||||
"""
|
||||
|
||||
|
||||
with open('tests/data/atreal.ics') as f:
|
||||
ICS_ATREAL = f.read()
|
||||
|
||||
|
@ -213,7 +190,9 @@ def test_timeperiodexception_creation_from_ics():
|
|||
agenda.save()
|
||||
desk = Desk(label='Test 1 desk', agenda=agenda)
|
||||
desk.save()
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE)
|
||||
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
|
||||
ContentFile(ICS_SAMPLE, name='sample.ics')
|
||||
)
|
||||
assert exceptions_count == 2
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||
|
||||
|
@ -229,9 +208,9 @@ def test_timeperiodexception_creation_from_ics_without_startdt():
|
|||
if line.startswith('DTSTART:'):
|
||||
continue
|
||||
lines.append(line)
|
||||
ics_sample = "\n".join(lines)
|
||||
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
||||
with pytest.raises(ICSError) as e:
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
||||
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
||||
assert 'Event "Event 1" has no start date.' == str(e.value)
|
||||
|
||||
|
||||
|
@ -246,8 +225,8 @@ def test_timeperiodexception_creation_from_ics_without_enddt():
|
|||
if line.startswith('DTEND:'):
|
||||
continue
|
||||
lines.append(line)
|
||||
ics_sample = "\n".join(lines)
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
||||
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
||||
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
||||
for exception in TimePeriodException.objects.filter(desk=desk):
|
||||
end_time = localtime(exception.end_datetime).time()
|
||||
assert end_time == datetime.time(23, 59, 59, 999999)
|
||||
|
@ -259,18 +238,13 @@ def test_timeperiodexception_creation_from_ics_with_recurrences():
|
|||
agenda.save()
|
||||
desk = Desk(label='Test 4 desk', agenda=agenda)
|
||||
desk.save()
|
||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 3
|
||||
assert (
|
||||
desk.import_timeperiod_exceptions_from_ics_file(
|
||||
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
|
||||
)
|
||||
== 3
|
||||
)
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 3
|
||||
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
||||
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
|
||||
)
|
||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 0
|
||||
# verify occurences are cleaned when count changed
|
||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_2) == 0
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
||||
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2018, 1, 2))]
|
||||
)
|
||||
|
||||
|
||||
def test_timeexception_creation_from_ics_with_dates():
|
||||
|
@ -284,8 +258,8 @@ def test_timeexception_creation_from_ics_with_dates():
|
|||
if line.startswith('RRULE:'):
|
||||
continue
|
||||
lines.append(line)
|
||||
ics_sample = "\n".join(lines)
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
||||
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
||||
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
||||
assert exceptions_count == 2
|
||||
for exception in TimePeriodException.objects.filter(desk=desk):
|
||||
assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
|
||||
|
@ -298,7 +272,7 @@ def test_timeexception_create_from_invalid_ics():
|
|||
desk = Desk(label='Test 6 desk', agenda=agenda)
|
||||
desk.save()
|
||||
with pytest.raises(ICSError) as e:
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE)
|
||||
desk.import_timeperiod_exceptions_from_ics_file(ContentFile(INVALID_ICS_SAMPLE, name='sample.ics'))
|
||||
assert str(e.value) == 'File format is invalid.'
|
||||
|
||||
|
||||
|
@ -308,7 +282,9 @@ def test_timeexception_create_from_ics_with_no_events():
|
|||
desk = Desk(label='Test 7 desk', agenda=agenda)
|
||||
desk.save()
|
||||
with pytest.raises(ICSError) as e:
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
|
||||
desk.import_timeperiod_exceptions_from_ics_file(
|
||||
ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
|
||||
)
|
||||
assert str(e.value) == "The file doesn't contain any events."
|
||||
|
||||
|
||||
|
@ -321,19 +297,14 @@ def test_timeperiodexception_creation_from_remote_ics(mocked_get):
|
|||
mocked_response = mock.Mock()
|
||||
mocked_response.text = ICS_SAMPLE
|
||||
mocked_get.return_value = mocked_response
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
assert exceptions_count == 2
|
||||
mocked_response.text = re.sub('SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE)
|
||||
mocked_get.return_value = mocked_response
|
||||
desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
|
||||
assert 'New summary ' in timeperiod.label
|
||||
|
||||
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
|
||||
mocked_get.return_value = mocked_response
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
assert exceptions_count == 0
|
||||
TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
|
||||
with pytest.raises(ICSError) as e:
|
||||
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
assert str(e.value) == "The file doesn't contain any events."
|
||||
|
||||
|
||||
@mock.patch('chrono.agendas.models.requests.get')
|
||||
|
@ -351,7 +322,7 @@ def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
|
|||
|
||||
mocked_get.side_effect = mocked_requests_connection_error
|
||||
with pytest.raises(ICSError) as e:
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
|
||||
|
||||
|
||||
|
@ -371,7 +342,7 @@ def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
|
|||
mocked_get.side_effect = mocked_requests_http_forbidden_error
|
||||
|
||||
with pytest.raises(ICSError) as e:
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||
assert (
|
||||
str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
|
||||
)
|
||||
|
@ -381,10 +352,9 @@ def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
|
|||
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
|
||||
agenda = Agenda(label=u'Test 11 agenda')
|
||||
agenda.save()
|
||||
desk = Desk(
|
||||
label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http://example.com/sample.ics'
|
||||
)
|
||||
desk = Desk(label='Test 11 desk', agenda=agenda)
|
||||
desk.save()
|
||||
TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
|
||||
mocked_response = mock.Mock()
|
||||
mocked_response.status_code = 403
|
||||
mocked_get.return_value = mocked_response
|
||||
|
@ -401,41 +371,6 @@ def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
|
|||
)
|
||||
|
||||
|
||||
@mock.patch('chrono.agendas.models.requests.get')
|
||||
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
|
||||
agenda = Agenda(label=u'Test 11 agenda')
|
||||
agenda.save()
|
||||
desk = Desk(
|
||||
label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics'
|
||||
)
|
||||
desk.save()
|
||||
mocked_response = mock.Mock()
|
||||
mocked_response.text = ICS_SAMPLE
|
||||
mocked_get.return_value = mocked_response
|
||||
call_command('sync_desks_timeperiod_exceptions')
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||
mocked_response.text = """BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
PRODID:-//foo.bar//EN
|
||||
BEGIN:VEVENT
|
||||
DTSTAMP:20180824T082855Z
|
||||
UID:new-and-unique-uid
|
||||
DTSTART:20180831T170800Z
|
||||
DTEND:20180831T203400Z
|
||||
SUMMARY:Wonderfull event
|
||||
END:VEVENT
|
||||
END:VCALENDAR"""
|
||||
mocked_get.return_value = mocked_response
|
||||
call_command('sync_desks_timeperiod_exceptions')
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
||||
exception = TimePeriodException.objects.get(desk=desk)
|
||||
assert exception.external_id == 'new-and-unique-uid'
|
||||
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
|
||||
mocked_get.return_value = mocked_response
|
||||
call_command('sync_desks_timeperiod_exceptions')
|
||||
assert not TimePeriodException.objects.filter(desk=desk).exists()
|
||||
|
||||
|
||||
def test_base_meeting_duration():
|
||||
agenda = Agenda(label='Meeting', kind='meetings')
|
||||
agenda.save()
|
||||
|
@ -463,7 +398,9 @@ def test_timeperiodexception_creation_from_ics_with_duration():
|
|||
agenda.save()
|
||||
desk = Desk(label='Test 1 desk', agenda=agenda)
|
||||
desk.save()
|
||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_DURATION)
|
||||
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
|
||||
ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
|
||||
)
|
||||
assert exceptions_count == 2
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
||||
|
@ -488,12 +425,16 @@ def test_timeperiodexception_creation_from_ics_with_recurrences_in_the_past():
|
|||
agenda.save()
|
||||
desk = Desk(label='Test 4 desk', agenda=agenda)
|
||||
desk.save()
|
||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 2
|
||||
assert (
|
||||
desk.import_timeperiod_exceptions_from_ics_file(
|
||||
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')
|
||||
)
|
||||
== 2
|
||||
)
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
||||
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
|
||||
)
|
||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 0
|
||||
|
||||
|
||||
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
|
||||
|
@ -501,7 +442,7 @@ def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
|
|||
agenda.save()
|
||||
desk = Desk(label='Test atreal desk', agenda=agenda)
|
||||
desk.save()
|
||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_ATREAL)
|
||||
assert desk.import_timeperiod_exceptions_from_ics_file(ContentFile(ICS_ATREAL, name='sample.ics'))
|
||||
|
||||
|
||||
def test_management_role_deletion():
|
||||
|
|
|
@ -14,10 +14,16 @@ import pytest
|
|||
import requests
|
||||
from webtest import Upload
|
||||
|
||||
from chrono.wsgi import application
|
||||
|
||||
from chrono.agendas.models import Agenda, Event, Booking, MeetingType, TimePeriod, Desk, TimePeriodException
|
||||
from chrono.manager.utils import export_site
|
||||
from chrono.agendas.models import (
|
||||
Agenda,
|
||||
Booking,
|
||||
Desk,
|
||||
Event,
|
||||
MeetingType,
|
||||
TimePeriod,
|
||||
TimePeriodException,
|
||||
TimePeriodExceptionSource,
|
||||
)
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
@ -1158,8 +1164,9 @@ def test_agenda_import_time_period_exception_from_ics(app, admin_user):
|
|||
resp = resp.click('Settings')
|
||||
assert 'Import exceptions from .ics' in resp.text
|
||||
resp = resp.click('upload')
|
||||
assert "You can upload a file or specify an address to a remote calendar." in resp
|
||||
resp = resp.form.submit(status=302)
|
||||
assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
|
||||
resp = resp.form.submit(status=200)
|
||||
assert 'Please provide an ICS File or an URL.' in resp.text
|
||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
||||
resp = resp.click('Settings')
|
||||
resp = resp.click('upload')
|
||||
|
@ -1200,6 +1207,12 @@ END:VCALENDAR"""
|
|||
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
|
||||
resp = resp.form.submit(status=302)
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
||||
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
|
||||
source = TimePeriodExceptionSource.objects.latest('pk')
|
||||
exception = TimePeriodException.objects.latest('pk')
|
||||
assert exception.source == source
|
||||
assert source.ics_filename == 'exceptions.ics'
|
||||
assert source.ics_url is None
|
||||
resp = resp.follow()
|
||||
assert 'An exception has been imported.' in resp.text
|
||||
|
||||
|
@ -1257,7 +1270,6 @@ def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, ad
|
|||
VERSION:2.0
|
||||
PRODID:-//foo.bar//EN
|
||||
BEGIN:VEVENT
|
||||
UID:random-event-id
|
||||
DTSTART:20180101
|
||||
DTEND:20180101
|
||||
SUMMARY:New Year's Eve
|
||||
|
@ -1267,15 +1279,12 @@ END:VCALENDAR"""
|
|||
resp = resp.form.submit(status=302)
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
||||
exception = TimePeriodException.objects.get(desk=desk)
|
||||
assert exception.external_id == 'random-event-id'
|
||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
||||
resp = resp.click('Settings')
|
||||
resp = resp.click('upload')
|
||||
resp.form['ics_url'] = ''
|
||||
resp = resp.form.submit(status=302)
|
||||
assert not TimePeriodException.objects.filter(
|
||||
desk=desk, external_id='desk-%s:random-event-id' % desk.id
|
||||
).exists()
|
||||
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
|
||||
source = TimePeriodExceptionSource.objects.latest('pk')
|
||||
exception = TimePeriodException.objects.latest('pk')
|
||||
assert exception.source == source
|
||||
assert source.ics_filename is None
|
||||
assert source.ics_url == 'http://example.com/foo.ics'
|
||||
|
||||
|
||||
@mock.patch('chrono.agendas.models.requests.get')
|
||||
|
@ -1301,79 +1310,10 @@ def test_agenda_import_time_period_exception_with_remote_ics_no_events(mocked_ge
|
|||
VERSION:2.0
|
||||
PRODID:-//foo.bar//EN
|
||||
BEGIN:VEVENT
|
||||
UID:random-event-id
|
||||
DTSTART:20180101
|
||||
DTEND:20180101
|
||||
SUMMARY:New Year's Eve
|
||||
END:VEVENT
|
||||
END:VCALENDAR"""
|
||||
mocked_get.return_value = mocked_response
|
||||
resp = resp.form.submit(status=302)
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
||||
exception = TimePeriodException.objects.get(desk=desk)
|
||||
assert exception.external_id == 'random-event-id'
|
||||
mocked_response.text = """BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
PRODID:-//foo.bar//EN
|
||||
END:VCALENDAR"""
|
||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
||||
resp = resp.click('Settings')
|
||||
resp = resp.click('upload')
|
||||
resp = resp.form.submit(status=302)
|
||||
assert not TimePeriodException.objects.filter(desk=desk, external_id='random-event-id').exists()
|
||||
|
||||
|
||||
@mock.patch('chrono.agendas.models.requests.get')
|
||||
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
|
||||
agenda = Agenda.objects.create(label='New Example', kind='meetings')
|
||||
desk = Desk.objects.create(agenda=agenda, label='New Desk')
|
||||
MeetingType(agenda=agenda, label='Bar').save()
|
||||
login(app)
|
||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
||||
resp = resp.click('Settings')
|
||||
assert 'Import exceptions from .ics' not in resp.text
|
||||
|
||||
TimePeriod.objects.create(
|
||||
weekday=1, desk=desk, start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)
|
||||
)
|
||||
|
||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
||||
resp = resp.click('Settings')
|
||||
resp = resp.click('upload')
|
||||
resp.form['ics_url'] = 'http://example.com/foo.ics'
|
||||
mocked_response = mock.Mock()
|
||||
mocked_response.text = """BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
PRODID:-//foo.bar//EN
|
||||
BEGIN:VEVENT
|
||||
UID:first-eventrandom-event-id
|
||||
DTSTART:20180101
|
||||
DTEND:20180101
|
||||
SUMMARY:First test event
|
||||
END:VEVENT
|
||||
BEGIN:VEVENT
|
||||
UID:second-eventrandom-event-id
|
||||
DTSTART:20190101
|
||||
DTEND:20190101
|
||||
SUMMARY:Second test event
|
||||
END:VEVENT
|
||||
END:VCALENDAR"""
|
||||
mocked_get.return_value = mocked_response
|
||||
resp = resp.form.submit(status=302)
|
||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
||||
resp = resp.click('Settings')
|
||||
resp = resp.click('upload')
|
||||
resp.form['ics_url'] = 'http://example.com/foo.ics'
|
||||
mocked_response.text = """BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
PRODID:-//foo.bar//EN
|
||||
BEGIN:VEVENT
|
||||
UID:secord-eventrandom-event-id
|
||||
DTSTART:20190101
|
||||
DTEND:20190101
|
||||
SUMMARY:Second test event
|
||||
END:VEVENT
|
||||
END:VCALENDAR"""
|
||||
mocked_get.return_value = mocked_response
|
||||
resp = resp.form.submit(status=302)
|
||||
|
|
Loading…
Reference in New Issue