agendas: add exception source model (#29209)
This commit is contained in:
parent
30bbc8c90f
commit
a5a8a3fe3d
|
@ -18,16 +18,19 @@ from __future__ import print_function
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from chrono.agendas.models import Desk, ICSError
|
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
|
|
||||||
|
from chrono.agendas.models import ICSError, TimePeriodExceptionSource
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
help = 'Synchronize time period exceptions from desks remote ics'
|
help = 'Synchronize time period exceptions from desks remote ics'
|
||||||
|
|
||||||
def handle(self, **options):
|
def handle(self, **options):
|
||||||
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
|
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
|
||||||
try:
|
try:
|
||||||
desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
|
source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
|
||||||
except ICSError as e:
|
except ICSError as e:
|
||||||
print(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e), file=sys.stderr)
|
print(
|
||||||
|
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
|
||||||
|
)
|
||||||
|
|
|
@ -54,10 +54,10 @@ class Migration(migrations.Migration):
|
||||||
model_name='agenda',
|
model_name='agenda',
|
||||||
name='kind',
|
name='kind',
|
||||||
field=models.CharField(
|
field=models.CharField(
|
||||||
default=b'events',
|
default='events',
|
||||||
max_length=20,
|
max_length=20,
|
||||||
verbose_name='Kind',
|
verbose_name='Kind',
|
||||||
choices=[(b'events', 'Events'), (b'meetings', 'Meetings')],
|
choices=[('events', 'Events'), ('meetings', 'Meetings')],
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
migrations.AddField(
|
migrations.AddField(
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import django.db.models.deletion
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('agendas', '0032_auto_20191127_0919'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.CreateModel(
|
||||||
|
name='TimePeriodExceptionSource',
|
||||||
|
fields=[
|
||||||
|
(
|
||||||
|
'id',
|
||||||
|
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
|
||||||
|
),
|
||||||
|
('ics_filename', models.CharField(max_length=256, null=True)),
|
||||||
|
('ics_url', models.URLField(null=True, max_length=500)),
|
||||||
|
('desk', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk')),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='timeperiodexception',
|
||||||
|
name='source',
|
||||||
|
field=models.ForeignKey(
|
||||||
|
null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.TimePeriodExceptionSource'
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='desk',
|
||||||
|
name='timeperiod_exceptions_remote_url',
|
||||||
|
field=models.URLField(
|
||||||
|
blank=True, max_length=500, null=True, verbose_name='URL to fetch time period exceptions from'
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='timeperiodexception',
|
||||||
|
name='external_id',
|
||||||
|
field=models.CharField(blank=True, max_length=256, null=True, verbose_name='External ID'),
|
||||||
|
),
|
||||||
|
]
|
|
@ -0,0 +1,49 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
from django.db import migrations
|
||||||
|
|
||||||
|
|
||||||
|
def create_source(apps, schema_editor):
|
||||||
|
Desk = apps.get_model('agendas', 'Desk')
|
||||||
|
TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
|
||||||
|
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
|
||||||
|
# create a source for each remote url
|
||||||
|
source = TimePeriodExceptionSource.objects.create(
|
||||||
|
desk=desk, ics_url=desk.timeperiod_exceptions_remote_url
|
||||||
|
)
|
||||||
|
# clear timeperiod_exceptions_remote_url
|
||||||
|
desk.timeperiod_exceptions_remote_url = None
|
||||||
|
desk.save()
|
||||||
|
# attach exceptions to the created source
|
||||||
|
desk.timeperiodexception_set.filter(external_id__isnull=False).exclude(external_id='').update(
|
||||||
|
source=source
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def init_remote_url(apps, schema_editor):
|
||||||
|
Desk = apps.get_model('agendas', 'Desk')
|
||||||
|
TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
|
||||||
|
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
|
||||||
|
# set timeperiod_exceptions_remote_url
|
||||||
|
source.desk.timeperiod_exceptions_remote_url = source.ics_url
|
||||||
|
source.desk.save()
|
||||||
|
# unlink exceptions
|
||||||
|
source.timeperiodexception_set.update(source=None)
|
||||||
|
# delete the source
|
||||||
|
source.delete()
|
||||||
|
# reset remote_url
|
||||||
|
for desk in Desk.objects.filter(timeperiod_exceptions_remote_url__isnull=True):
|
||||||
|
desk.timeperiod_exceptions_remote_url = ''
|
||||||
|
desk.save()
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('agendas', '0033_timeperiodexceptionsource'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.RunPython(create_source, init_remote_url),
|
||||||
|
]
|
|
@ -0,0 +1,17 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Generated by Django 1.11.18 on 2019-12-09 14:24
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
from django.db import migrations
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('agendas', '0034_initial_source'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.RemoveField(model_name='desk', name='timeperiod_exceptions_remote_url',),
|
||||||
|
migrations.RemoveField(model_name='timeperiodexception', name='external_id',),
|
||||||
|
]
|
|
@ -469,9 +469,6 @@ class Desk(models.Model):
|
||||||
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
|
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
|
||||||
label = models.CharField(_('Label'), max_length=150)
|
label = models.CharField(_('Label'), max_length=150)
|
||||||
slug = models.SlugField(_('Identifier'), max_length=160)
|
slug = models.SlugField(_('Identifier'), max_length=160)
|
||||||
timeperiod_exceptions_remote_url = models.URLField(
|
|
||||||
_('URL to fetch time period exceptions from'), blank=True, max_length=500
|
|
||||||
)
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.label
|
return self.label
|
||||||
|
@ -528,27 +525,31 @@ class Desk(models.Model):
|
||||||
in_two_weeks = self.get_exceptions_within_two_weeks()
|
in_two_weeks = self.get_exceptions_within_two_weeks()
|
||||||
return self.timeperiodexception_set.count() == len(in_two_weeks)
|
return self.timeperiodexception_set.count() == len(in_two_weeks)
|
||||||
|
|
||||||
def create_timeperiod_exceptions_from_remote_ics(self, url):
|
def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
|
||||||
try:
|
try:
|
||||||
response = requests.get(url, proxies=settings.REQUESTS_PROXIES)
|
response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.HTTPError as e:
|
except requests.HTTPError as e:
|
||||||
raise ICSError(
|
raise ICSError(
|
||||||
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
|
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
|
||||||
% {'url': url, 'status_code': e.response.status_code}
|
% {'url': ics_url, 'status_code': e.response.status_code}
|
||||||
)
|
)
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
raise ICSError(
|
raise ICSError(
|
||||||
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
|
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
|
||||||
% {'url': url, 'exception': e}
|
% {'url': ics_url, 'exception': e}
|
||||||
)
|
)
|
||||||
|
|
||||||
return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
|
if source is None:
|
||||||
|
source = TimePeriodExceptionSource(desk=self, ics_url=ics_url)
|
||||||
|
return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)
|
||||||
|
|
||||||
def remove_timeperiod_exceptions_from_remote_ics(self):
|
def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
|
||||||
TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()
|
if source is None:
|
||||||
|
source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name)
|
||||||
|
return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))
|
||||||
|
|
||||||
def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False, recurring_days=600):
|
def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
|
||||||
try:
|
try:
|
||||||
parsed = vobject.readOne(data)
|
parsed = vobject.readOne(data)
|
||||||
except vobject.base.ParseError:
|
except vobject.base.ParseError:
|
||||||
|
@ -556,10 +557,15 @@ class Desk(models.Model):
|
||||||
|
|
||||||
total_created = 0
|
total_created = 0
|
||||||
|
|
||||||
if not parsed.contents.get('vevent') and not keep_synced_by_uid:
|
if not parsed.contents.get('vevent'):
|
||||||
raise ICSError(_('The file doesn\'t contain any events.'))
|
raise ICSError(_('The file doesn\'t contain any events.'))
|
||||||
|
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
|
if source.pk is None:
|
||||||
|
source.save()
|
||||||
|
# delete old exceptions related to this source
|
||||||
|
source.timeperiodexception_set.all().delete()
|
||||||
|
# create new exceptions
|
||||||
update_datetime = now()
|
update_datetime = now()
|
||||||
for vevent in parsed.contents.get('vevent', []):
|
for vevent in parsed.contents.get('vevent', []):
|
||||||
if 'summary' in vevent.contents:
|
if 'summary' in vevent.contents:
|
||||||
|
@ -590,24 +596,19 @@ class Desk(models.Model):
|
||||||
end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
|
end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
|
||||||
duration = end_dt - start_dt
|
duration = end_dt - start_dt
|
||||||
|
|
||||||
event = {}
|
event = {
|
||||||
event['start_datetime'] = start_dt
|
'start_datetime': start_dt,
|
||||||
event['end_datetime'] = end_dt
|
'end_datetime': end_dt,
|
||||||
event['label'] = summary
|
'label': summary,
|
||||||
|
'desk': self,
|
||||||
kwargs = {}
|
'source': source,
|
||||||
kwargs['desk'] = self
|
'recurrence_id': 0,
|
||||||
kwargs['recurrence_id'] = 0
|
}
|
||||||
if keep_synced_by_uid:
|
|
||||||
kwargs['external_id'] = vevent.contents['uid'][0].value
|
|
||||||
else:
|
|
||||||
kwargs['label'] = summary
|
|
||||||
|
|
||||||
if not vevent.rruleset:
|
if not vevent.rruleset:
|
||||||
# classical event
|
# classical event
|
||||||
obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs)
|
TimePeriodException.objects.create(**event)
|
||||||
if created:
|
total_created += 1
|
||||||
total_created += 1
|
|
||||||
elif vevent.rruleset.count():
|
elif vevent.rruleset.count():
|
||||||
# recurring event until recurring_days in the future
|
# recurring event until recurring_days in the future
|
||||||
from_dt = start_dt
|
from_dt = start_dt
|
||||||
|
@ -621,26 +622,12 @@ class Desk(models.Model):
|
||||||
if not is_aware(start_dt):
|
if not is_aware(start_dt):
|
||||||
start_dt = make_aware(start_dt)
|
start_dt = make_aware(start_dt)
|
||||||
end_dt = start_dt + duration
|
end_dt = start_dt + duration
|
||||||
kwargs['recurrence_id'] = i
|
event['recurrence_id'] = i
|
||||||
event['start_datetime'] = start_dt
|
event['start_datetime'] = start_dt
|
||||||
event['end_datetime'] = end_dt
|
event['end_datetime'] = end_dt
|
||||||
if end_dt < update_datetime:
|
if end_dt >= update_datetime:
|
||||||
TimePeriodException.objects.filter(**kwargs).update(**event)
|
TimePeriodException.objects.create(**event)
|
||||||
else:
|
total_created += 1
|
||||||
obj, created = TimePeriodException.objects.update_or_create(
|
|
||||||
defaults=event, **kwargs
|
|
||||||
)
|
|
||||||
if created:
|
|
||||||
total_created += 1
|
|
||||||
# delete unseen occurrences
|
|
||||||
kwargs.pop('recurrence_id', None)
|
|
||||||
TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete()
|
|
||||||
|
|
||||||
if keep_synced_by_uid:
|
|
||||||
# delete all outdated exceptions from remote calendar
|
|
||||||
TimePeriodException.objects.filter(update_datetime__lt=update_datetime, desk=self).exclude(
|
|
||||||
external_id=''
|
|
||||||
).delete()
|
|
||||||
|
|
||||||
return total_created
|
return total_created
|
||||||
|
|
||||||
|
@ -661,10 +648,22 @@ class Desk(models.Model):
|
||||||
return openslots.search(aware_date, aware_next_date)
|
return openslots.search(aware_date, aware_next_date)
|
||||||
|
|
||||||
|
|
||||||
|
@python_2_unicode_compatible
|
||||||
|
class TimePeriodExceptionSource(models.Model):
|
||||||
|
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
|
||||||
|
ics_filename = models.CharField(null=True, max_length=256)
|
||||||
|
ics_url = models.URLField(null=True, max_length=500)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
if self.ics_filename is not None:
|
||||||
|
return self.ics_filename
|
||||||
|
return self.ics_url
|
||||||
|
|
||||||
|
|
||||||
@python_2_unicode_compatible
|
@python_2_unicode_compatible
|
||||||
class TimePeriodException(models.Model):
|
class TimePeriodException(models.Model):
|
||||||
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
|
desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
|
||||||
external_id = models.CharField(_('External ID'), max_length=256, blank=True)
|
source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True)
|
||||||
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
|
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
|
||||||
start_datetime = models.DateTimeField(_('Exception start time'))
|
start_datetime = models.DateTimeField(_('Exception start time'))
|
||||||
end_datetime = models.DateTimeField(_('Exception end time'))
|
end_datetime = models.DateTimeField(_('Exception end time'))
|
||||||
|
@ -739,7 +738,6 @@ class TimePeriodException(models.Model):
|
||||||
'label': self.label,
|
'label': self.label,
|
||||||
'start_datetime': export_datetime(self.start_datetime),
|
'start_datetime': export_datetime(self.start_datetime),
|
||||||
'end_datetime': export_datetime(self.end_datetime),
|
'end_datetime': export_datetime(self.end_datetime),
|
||||||
'external_id': self.external_id,
|
|
||||||
'recurrence_id': self.recurrence_id,
|
'recurrence_id': self.recurrence_id,
|
||||||
'update_datetime': export_datetime(self.update_datetime),
|
'update_datetime': export_datetime(self.update_datetime),
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ class NewDeskForm(forms.ModelForm):
|
||||||
widgets = {
|
widgets = {
|
||||||
'agenda': forms.HiddenInput(),
|
'agenda': forms.HiddenInput(),
|
||||||
}
|
}
|
||||||
exclude = ['slug', 'timeperiod_exceptions_remote_url']
|
exclude = ['slug']
|
||||||
|
|
||||||
|
|
||||||
class DeskForm(forms.ModelForm):
|
class DeskForm(forms.ModelForm):
|
||||||
|
@ -151,7 +151,7 @@ class DeskForm(forms.ModelForm):
|
||||||
widgets = {
|
widgets = {
|
||||||
'agenda': forms.HiddenInput(),
|
'agenda': forms.HiddenInput(),
|
||||||
}
|
}
|
||||||
exclude = ['timeperiod_exceptions_remote_url']
|
exclude = []
|
||||||
|
|
||||||
|
|
||||||
class TimePeriodExceptionForm(forms.ModelForm):
|
class TimePeriodExceptionForm(forms.ModelForm):
|
||||||
|
@ -262,10 +262,6 @@ class ImportEventsForm(forms.Form):
|
||||||
|
|
||||||
|
|
||||||
class ExceptionsImportForm(forms.ModelForm):
|
class ExceptionsImportForm(forms.ModelForm):
|
||||||
class Meta:
|
|
||||||
model = Desk
|
|
||||||
fields = []
|
|
||||||
|
|
||||||
ics_file = forms.FileField(
|
ics_file = forms.FileField(
|
||||||
label=_('ICS File'),
|
label=_('ICS File'),
|
||||||
required=False,
|
required=False,
|
||||||
|
@ -277,6 +273,15 @@ class ExceptionsImportForm(forms.ModelForm):
|
||||||
help_text=_('URL to remote calendar which will be synchronised hourly.'),
|
help_text=_('URL to remote calendar which will be synchronised hourly.'),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
model = Desk
|
||||||
|
fields = []
|
||||||
|
|
||||||
|
def clean(self, *args, **kwargs):
|
||||||
|
cleaned_data = super().clean(*args, **kwargs)
|
||||||
|
if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
|
||||||
|
raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
|
||||||
|
|
||||||
|
|
||||||
class AgendasImportForm(forms.Form):
|
class AgendasImportForm(forms.Form):
|
||||||
agendas_json = forms.FileField(label=_('Agendas Export File'))
|
agendas_json = forms.FileField(label=_('Agendas Export File'))
|
||||||
|
|
|
@ -13,7 +13,37 @@
|
||||||
{% block content %}
|
{% block content %}
|
||||||
|
|
||||||
<form method="post" enctype="multipart/form-data">
|
<form method="post" enctype="multipart/form-data">
|
||||||
<p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
|
{% if exception_sources %}
|
||||||
|
<table class="main">
|
||||||
|
<thead>
|
||||||
|
<tr>
|
||||||
|
<th>{% trans "Exceptions" %}</th>
|
||||||
|
<th></th>
|
||||||
|
<th></th>
|
||||||
|
</tr>
|
||||||
|
</thead>
|
||||||
|
<tbody>
|
||||||
|
{% for object in exception_sources %}
|
||||||
|
<tr>
|
||||||
|
<td>
|
||||||
|
<span title="{{ object }}">
|
||||||
|
{% if object.ics_filename %}{{ object|truncatechars:50 }}{% else %}<a href="{{ object }}">{{ object|truncatechars:50 }}</a>{% endif %}
|
||||||
|
</span>
|
||||||
|
</td>
|
||||||
|
<td>
|
||||||
|
<a rel="popup" href="">
|
||||||
|
{% if object.ics_filename %}{% trans "replace" %}{% else %}{% trans "refresh" %}{% endif %}
|
||||||
|
</a>
|
||||||
|
</td>
|
||||||
|
<td><a rel="popup" href="">{% trans "remove" %}</a></td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
<br />
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<p class="notice">{% trans "To add new exceptions, you can upload a file or specify an address to a remote calendar." %}</p>
|
||||||
{% csrf_token %}
|
{% csrf_token %}
|
||||||
{{ form.as_p }}
|
{{ form.as_p }}
|
||||||
<p>
|
<p>
|
||||||
|
|
|
@ -20,7 +20,8 @@ import json
|
||||||
from django.contrib import messages
|
from django.contrib import messages
|
||||||
from django.core.exceptions import PermissionDenied
|
from django.core.exceptions import PermissionDenied
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from django.http import HttpResponse, Http404, HttpResponseRedirect
|
from django.forms import ValidationError
|
||||||
|
from django.http import Http404, HttpResponse, HttpResponseRedirect
|
||||||
from django.shortcuts import get_object_or_404
|
from django.shortcuts import get_object_or_404
|
||||||
from django.urls import reverse, reverse_lazy
|
from django.urls import reverse, reverse_lazy
|
||||||
from django.utils.dates import MONTHS
|
from django.utils.dates import MONTHS
|
||||||
|
@ -881,26 +882,26 @@ class DeskImportTimePeriodExceptionsView(ManagedAgendaSubobjectMixin, UpdateView
|
||||||
form_class = ExceptionsImportForm
|
form_class = ExceptionsImportForm
|
||||||
template_name = 'chrono/manager_import_exceptions.html'
|
template_name = 'chrono/manager_import_exceptions.html'
|
||||||
|
|
||||||
def get_initial(self):
|
def get_context_data(self, **kwargs):
|
||||||
return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
|
context = super(DeskImportTimePeriodExceptionsView, self).get_context_data(**kwargs)
|
||||||
|
context['exception_sources'] = self.get_object().timeperiodexceptionsource_set.all()
|
||||||
|
return context
|
||||||
|
|
||||||
def form_valid(self, form):
|
def form_valid(self, form):
|
||||||
exceptions = None
|
exceptions = None
|
||||||
try:
|
try:
|
||||||
if form.cleaned_data['ics_file']:
|
if form.cleaned_data['ics_file']:
|
||||||
ics_file_content = force_text(form.cleaned_data['ics_file'].read())
|
exceptions = form.instance.import_timeperiod_exceptions_from_ics_file(
|
||||||
exceptions = form.instance.create_timeperiod_exceptions_from_ics(ics_file_content)
|
form.cleaned_data['ics_file']
|
||||||
|
)
|
||||||
elif form.cleaned_data['ics_url']:
|
elif form.cleaned_data['ics_url']:
|
||||||
exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(
|
exceptions = form.instance.import_timeperiod_exceptions_from_remote_ics(
|
||||||
form.cleaned_data['ics_url']
|
form.cleaned_data['ics_url']
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
form.instance.remove_timeperiod_exceptions_from_remote_ics()
|
|
||||||
except ICSError as e:
|
except ICSError as e:
|
||||||
form.add_error(None, force_text(e))
|
form.add_error(None, force_text(e))
|
||||||
return self.form_invalid(form)
|
return self.form_invalid(form)
|
||||||
form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
|
|
||||||
form.instance.save()
|
|
||||||
if exceptions is not None:
|
if exceptions is not None:
|
||||||
message = ungettext(
|
message = ungettext(
|
||||||
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions
|
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions
|
||||||
|
|
|
@ -1,24 +1,23 @@
|
||||||
import pytest
|
import pytest
|
||||||
import datetime
|
import datetime
|
||||||
import mock
|
import mock
|
||||||
import re
|
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
|
|
||||||
from django.utils.timezone import now, make_aware, localtime
|
|
||||||
from django.contrib.auth.models import Group
|
from django.contrib.auth.models import Group
|
||||||
|
from django.core.files.base import ContentFile
|
||||||
from django.core.management import call_command
|
from django.core.management import call_command
|
||||||
from django.core.management.base import CommandError
|
from django.utils.timezone import localtime, make_aware, now
|
||||||
|
|
||||||
from chrono.agendas.models import (
|
from chrono.agendas.models import (
|
||||||
Agenda,
|
Agenda,
|
||||||
Event,
|
|
||||||
Booking,
|
Booking,
|
||||||
MeetingType,
|
|
||||||
Desk,
|
Desk,
|
||||||
TimePeriod,
|
Event,
|
||||||
TimePeriodException,
|
|
||||||
ICSError,
|
ICSError,
|
||||||
|
MeetingType,
|
||||||
|
TimePeriodException,
|
||||||
|
TimePeriodExceptionSource,
|
||||||
)
|
)
|
||||||
|
|
||||||
pytestmark = pytest.mark.django_db
|
pytestmark = pytest.mark.django_db
|
||||||
|
@ -28,7 +27,6 @@ VERSION:2.0
|
||||||
PRODID:-//foo.bar//EN
|
PRODID:-//foo.bar//EN
|
||||||
BEGIN:VEVENT
|
BEGIN:VEVENT
|
||||||
DTSTAMP:20170824T082855Z
|
DTSTAMP:20170824T082855Z
|
||||||
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
|
|
||||||
DTSTART:20170831T170800Z
|
DTSTART:20170831T170800Z
|
||||||
DTEND:20170831T203400Z
|
DTEND:20170831T203400Z
|
||||||
SEQUENCE:1
|
SEQUENCE:1
|
||||||
|
@ -36,7 +34,6 @@ SUMMARY:Event 1
|
||||||
END:VEVENT
|
END:VEVENT
|
||||||
BEGIN:VEVENT
|
BEGIN:VEVENT
|
||||||
DTSTAMP:20170824T092855Z
|
DTSTAMP:20170824T092855Z
|
||||||
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
|
|
||||||
DTSTART:20170830T180800Z
|
DTSTART:20170830T180800Z
|
||||||
DTEND:20170831T223400Z
|
DTEND:20170831T223400Z
|
||||||
SEQUENCE:2
|
SEQUENCE:2
|
||||||
|
@ -48,7 +45,6 @@ VERSION:2.0
|
||||||
PRODID:-//foo.bar//EN
|
PRODID:-//foo.bar//EN
|
||||||
BEGIN:VEVENT
|
BEGIN:VEVENT
|
||||||
DTSTAMP:20170824T082855Z
|
DTSTAMP:20170824T082855Z
|
||||||
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
|
|
||||||
DTSTART:20170831T170800Z
|
DTSTART:20170831T170800Z
|
||||||
DURATION:PT3H26M
|
DURATION:PT3H26M
|
||||||
SEQUENCE:1
|
SEQUENCE:1
|
||||||
|
@ -56,7 +52,6 @@ SUMMARY:Event 1
|
||||||
END:VEVENT
|
END:VEVENT
|
||||||
BEGIN:VEVENT
|
BEGIN:VEVENT
|
||||||
DTSTAMP:20170824T092855Z
|
DTSTAMP:20170824T092855Z
|
||||||
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
|
|
||||||
DTSTART:20170830T180800Z
|
DTSTART:20170830T180800Z
|
||||||
DURATION:P1D4H26M
|
DURATION:P1D4H26M
|
||||||
SEQUENCE:2
|
SEQUENCE:2
|
||||||
|
@ -83,25 +78,6 @@ RRULE:FREQ=YEARLY
|
||||||
END:VEVENT
|
END:VEVENT
|
||||||
END:VCALENDAR"""
|
END:VCALENDAR"""
|
||||||
|
|
||||||
ICS_SAMPLE_WITH_RECURRENT_EVENT_2 = """BEGIN:VCALENDAR
|
|
||||||
VERSION:2.0
|
|
||||||
PRODID:-//foo.bar//EN
|
|
||||||
BEGIN:VEVENT
|
|
||||||
DTSTAMP:20170720T145803Z
|
|
||||||
DESCRIPTION:Vacances d'ete
|
|
||||||
DTSTART;VALUE=DATE:20180101
|
|
||||||
DTEND;VALUE=DATE:20180101
|
|
||||||
SUMMARY:reccurent event
|
|
||||||
END:VEVENT
|
|
||||||
BEGIN:VEVENT
|
|
||||||
DTSTAMP:20170824T082855Z
|
|
||||||
DTSTART:20180102
|
|
||||||
DTEND:20180101
|
|
||||||
SUMMARY:New Year's Eve
|
|
||||||
RRULE:FREQ=YEARLY;COUNT=1
|
|
||||||
END:VEVENT
|
|
||||||
END:VCALENDAR"""
|
|
||||||
|
|
||||||
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
|
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
|
||||||
VERSION:2.0
|
VERSION:2.0
|
||||||
PRODID:-//foo.bar//EN
|
PRODID:-//foo.bar//EN
|
||||||
|
@ -122,6 +98,7 @@ END:VCALENDAR"""
|
||||||
INVALID_ICS_SAMPLE = """content
|
INVALID_ICS_SAMPLE = """content
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
with open('tests/data/atreal.ics') as f:
|
with open('tests/data/atreal.ics') as f:
|
||||||
ICS_ATREAL = f.read()
|
ICS_ATREAL = f.read()
|
||||||
|
|
||||||
|
@ -213,7 +190,9 @@ def test_timeperiodexception_creation_from_ics():
|
||||||
agenda.save()
|
agenda.save()
|
||||||
desk = Desk(label='Test 1 desk', agenda=agenda)
|
desk = Desk(label='Test 1 desk', agenda=agenda)
|
||||||
desk.save()
|
desk.save()
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE)
|
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
|
||||||
|
ContentFile(ICS_SAMPLE, name='sample.ics')
|
||||||
|
)
|
||||||
assert exceptions_count == 2
|
assert exceptions_count == 2
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||||
|
|
||||||
|
@ -229,9 +208,9 @@ def test_timeperiodexception_creation_from_ics_without_startdt():
|
||||||
if line.startswith('DTSTART:'):
|
if line.startswith('DTSTART:'):
|
||||||
continue
|
continue
|
||||||
lines.append(line)
|
lines.append(line)
|
||||||
ics_sample = "\n".join(lines)
|
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
||||||
with pytest.raises(ICSError) as e:
|
with pytest.raises(ICSError) as e:
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
||||||
assert 'Event "Event 1" has no start date.' == str(e.value)
|
assert 'Event "Event 1" has no start date.' == str(e.value)
|
||||||
|
|
||||||
|
|
||||||
|
@ -246,8 +225,8 @@ def test_timeperiodexception_creation_from_ics_without_enddt():
|
||||||
if line.startswith('DTEND:'):
|
if line.startswith('DTEND:'):
|
||||||
continue
|
continue
|
||||||
lines.append(line)
|
lines.append(line)
|
||||||
ics_sample = "\n".join(lines)
|
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
||||||
for exception in TimePeriodException.objects.filter(desk=desk):
|
for exception in TimePeriodException.objects.filter(desk=desk):
|
||||||
end_time = localtime(exception.end_datetime).time()
|
end_time = localtime(exception.end_datetime).time()
|
||||||
assert end_time == datetime.time(23, 59, 59, 999999)
|
assert end_time == datetime.time(23, 59, 59, 999999)
|
||||||
|
@ -259,18 +238,13 @@ def test_timeperiodexception_creation_from_ics_with_recurrences():
|
||||||
agenda.save()
|
agenda.save()
|
||||||
desk = Desk(label='Test 4 desk', agenda=agenda)
|
desk = Desk(label='Test 4 desk', agenda=agenda)
|
||||||
desk.save()
|
desk.save()
|
||||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 3
|
assert (
|
||||||
|
desk.import_timeperiod_exceptions_from_ics_file(
|
||||||
|
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
|
||||||
|
)
|
||||||
|
== 3
|
||||||
|
)
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 3
|
assert TimePeriodException.objects.filter(desk=desk).count() == 3
|
||||||
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
|
||||||
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
|
|
||||||
)
|
|
||||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 0
|
|
||||||
# verify occurences are cleaned when count changed
|
|
||||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_2) == 0
|
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
|
||||||
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
|
||||||
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2018, 1, 2))]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_timeexception_creation_from_ics_with_dates():
|
def test_timeexception_creation_from_ics_with_dates():
|
||||||
|
@ -284,8 +258,8 @@ def test_timeexception_creation_from_ics_with_dates():
|
||||||
if line.startswith('RRULE:'):
|
if line.startswith('RRULE:'):
|
||||||
continue
|
continue
|
||||||
lines.append(line)
|
lines.append(line)
|
||||||
ics_sample = "\n".join(lines)
|
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
||||||
assert exceptions_count == 2
|
assert exceptions_count == 2
|
||||||
for exception in TimePeriodException.objects.filter(desk=desk):
|
for exception in TimePeriodException.objects.filter(desk=desk):
|
||||||
assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
|
assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
|
||||||
|
@ -298,7 +272,7 @@ def test_timeexception_create_from_invalid_ics():
|
||||||
desk = Desk(label='Test 6 desk', agenda=agenda)
|
desk = Desk(label='Test 6 desk', agenda=agenda)
|
||||||
desk.save()
|
desk.save()
|
||||||
with pytest.raises(ICSError) as e:
|
with pytest.raises(ICSError) as e:
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE)
|
desk.import_timeperiod_exceptions_from_ics_file(ContentFile(INVALID_ICS_SAMPLE, name='sample.ics'))
|
||||||
assert str(e.value) == 'File format is invalid.'
|
assert str(e.value) == 'File format is invalid.'
|
||||||
|
|
||||||
|
|
||||||
|
@ -308,7 +282,9 @@ def test_timeexception_create_from_ics_with_no_events():
|
||||||
desk = Desk(label='Test 7 desk', agenda=agenda)
|
desk = Desk(label='Test 7 desk', agenda=agenda)
|
||||||
desk.save()
|
desk.save()
|
||||||
with pytest.raises(ICSError) as e:
|
with pytest.raises(ICSError) as e:
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
|
desk.import_timeperiod_exceptions_from_ics_file(
|
||||||
|
ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
|
||||||
|
)
|
||||||
assert str(e.value) == "The file doesn't contain any events."
|
assert str(e.value) == "The file doesn't contain any events."
|
||||||
|
|
||||||
|
|
||||||
|
@ -321,19 +297,14 @@ def test_timeperiodexception_creation_from_remote_ics(mocked_get):
|
||||||
mocked_response = mock.Mock()
|
mocked_response = mock.Mock()
|
||||||
mocked_response.text = ICS_SAMPLE
|
mocked_response.text = ICS_SAMPLE
|
||||||
mocked_get.return_value = mocked_response
|
mocked_get.return_value = mocked_response
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||||
assert exceptions_count == 2
|
assert exceptions_count == 2
|
||||||
mocked_response.text = re.sub('SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE)
|
|
||||||
mocked_get.return_value = mocked_response
|
|
||||||
desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
||||||
for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
|
|
||||||
assert 'New summary ' in timeperiod.label
|
|
||||||
|
|
||||||
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
|
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
|
||||||
mocked_get.return_value = mocked_response
|
mocked_get.return_value = mocked_response
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
with pytest.raises(ICSError) as e:
|
||||||
assert exceptions_count == 0
|
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||||
TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
|
assert str(e.value) == "The file doesn't contain any events."
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('chrono.agendas.models.requests.get')
|
@mock.patch('chrono.agendas.models.requests.get')
|
||||||
|
@ -351,7 +322,7 @@ def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
|
||||||
|
|
||||||
mocked_get.side_effect = mocked_requests_connection_error
|
mocked_get.side_effect = mocked_requests_connection_error
|
||||||
with pytest.raises(ICSError) as e:
|
with pytest.raises(ICSError) as e:
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||||
assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
|
assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
|
||||||
|
|
||||||
|
|
||||||
|
@ -371,7 +342,7 @@ def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
|
||||||
mocked_get.side_effect = mocked_requests_http_forbidden_error
|
mocked_get.side_effect = mocked_requests_http_forbidden_error
|
||||||
|
|
||||||
with pytest.raises(ICSError) as e:
|
with pytest.raises(ICSError) as e:
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
||||||
assert (
|
assert (
|
||||||
str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
|
str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
|
||||||
)
|
)
|
||||||
|
@ -381,10 +352,9 @@ def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
|
||||||
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
|
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
|
||||||
agenda = Agenda(label=u'Test 11 agenda')
|
agenda = Agenda(label=u'Test 11 agenda')
|
||||||
agenda.save()
|
agenda.save()
|
||||||
desk = Desk(
|
desk = Desk(label='Test 11 desk', agenda=agenda)
|
||||||
label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http://example.com/sample.ics'
|
|
||||||
)
|
|
||||||
desk.save()
|
desk.save()
|
||||||
|
TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
|
||||||
mocked_response = mock.Mock()
|
mocked_response = mock.Mock()
|
||||||
mocked_response.status_code = 403
|
mocked_response.status_code = 403
|
||||||
mocked_get.return_value = mocked_response
|
mocked_get.return_value = mocked_response
|
||||||
|
@ -401,41 +371,6 @@ def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('chrono.agendas.models.requests.get')
|
|
||||||
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
|
|
||||||
agenda = Agenda(label=u'Test 11 agenda')
|
|
||||||
agenda.save()
|
|
||||||
desk = Desk(
|
|
||||||
label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics'
|
|
||||||
)
|
|
||||||
desk.save()
|
|
||||||
mocked_response = mock.Mock()
|
|
||||||
mocked_response.text = ICS_SAMPLE
|
|
||||||
mocked_get.return_value = mocked_response
|
|
||||||
call_command('sync_desks_timeperiod_exceptions')
|
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
|
||||||
mocked_response.text = """BEGIN:VCALENDAR
|
|
||||||
VERSION:2.0
|
|
||||||
PRODID:-//foo.bar//EN
|
|
||||||
BEGIN:VEVENT
|
|
||||||
DTSTAMP:20180824T082855Z
|
|
||||||
UID:new-and-unique-uid
|
|
||||||
DTSTART:20180831T170800Z
|
|
||||||
DTEND:20180831T203400Z
|
|
||||||
SUMMARY:Wonderfull event
|
|
||||||
END:VEVENT
|
|
||||||
END:VCALENDAR"""
|
|
||||||
mocked_get.return_value = mocked_response
|
|
||||||
call_command('sync_desks_timeperiod_exceptions')
|
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
|
||||||
exception = TimePeriodException.objects.get(desk=desk)
|
|
||||||
assert exception.external_id == 'new-and-unique-uid'
|
|
||||||
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
|
|
||||||
mocked_get.return_value = mocked_response
|
|
||||||
call_command('sync_desks_timeperiod_exceptions')
|
|
||||||
assert not TimePeriodException.objects.filter(desk=desk).exists()
|
|
||||||
|
|
||||||
|
|
||||||
def test_base_meeting_duration():
|
def test_base_meeting_duration():
|
||||||
agenda = Agenda(label='Meeting', kind='meetings')
|
agenda = Agenda(label='Meeting', kind='meetings')
|
||||||
agenda.save()
|
agenda.save()
|
||||||
|
@ -463,7 +398,9 @@ def test_timeperiodexception_creation_from_ics_with_duration():
|
||||||
agenda.save()
|
agenda.save()
|
||||||
desk = Desk(label='Test 1 desk', agenda=agenda)
|
desk = Desk(label='Test 1 desk', agenda=agenda)
|
||||||
desk.save()
|
desk.save()
|
||||||
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_DURATION)
|
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
|
||||||
|
ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
|
||||||
|
)
|
||||||
assert exceptions_count == 2
|
assert exceptions_count == 2
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||||
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
||||||
|
@ -488,12 +425,16 @@ def test_timeperiodexception_creation_from_ics_with_recurrences_in_the_past():
|
||||||
agenda.save()
|
agenda.save()
|
||||||
desk = Desk(label='Test 4 desk', agenda=agenda)
|
desk = Desk(label='Test 4 desk', agenda=agenda)
|
||||||
desk.save()
|
desk.save()
|
||||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 2
|
assert (
|
||||||
|
desk.import_timeperiod_exceptions_from_ics_file(
|
||||||
|
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')
|
||||||
|
)
|
||||||
|
== 2
|
||||||
|
)
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
||||||
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
|
||||||
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
|
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
|
||||||
)
|
)
|
||||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
|
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
|
||||||
|
@ -501,7 +442,7 @@ def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
|
||||||
agenda.save()
|
agenda.save()
|
||||||
desk = Desk(label='Test atreal desk', agenda=agenda)
|
desk = Desk(label='Test atreal desk', agenda=agenda)
|
||||||
desk.save()
|
desk.save()
|
||||||
assert desk.create_timeperiod_exceptions_from_ics(ICS_ATREAL)
|
assert desk.import_timeperiod_exceptions_from_ics_file(ContentFile(ICS_ATREAL, name='sample.ics'))
|
||||||
|
|
||||||
|
|
||||||
def test_management_role_deletion():
|
def test_management_role_deletion():
|
||||||
|
|
|
@ -14,10 +14,16 @@ import pytest
|
||||||
import requests
|
import requests
|
||||||
from webtest import Upload
|
from webtest import Upload
|
||||||
|
|
||||||
from chrono.wsgi import application
|
from chrono.agendas.models import (
|
||||||
|
Agenda,
|
||||||
from chrono.agendas.models import Agenda, Event, Booking, MeetingType, TimePeriod, Desk, TimePeriodException
|
Booking,
|
||||||
from chrono.manager.utils import export_site
|
Desk,
|
||||||
|
Event,
|
||||||
|
MeetingType,
|
||||||
|
TimePeriod,
|
||||||
|
TimePeriodException,
|
||||||
|
TimePeriodExceptionSource,
|
||||||
|
)
|
||||||
|
|
||||||
pytestmark = pytest.mark.django_db
|
pytestmark = pytest.mark.django_db
|
||||||
|
|
||||||
|
@ -1158,8 +1164,9 @@ def test_agenda_import_time_period_exception_from_ics(app, admin_user):
|
||||||
resp = resp.click('Settings')
|
resp = resp.click('Settings')
|
||||||
assert 'Import exceptions from .ics' in resp.text
|
assert 'Import exceptions from .ics' in resp.text
|
||||||
resp = resp.click('upload')
|
resp = resp.click('upload')
|
||||||
assert "You can upload a file or specify an address to a remote calendar." in resp
|
assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
|
||||||
resp = resp.form.submit(status=302)
|
resp = resp.form.submit(status=200)
|
||||||
|
assert 'Please provide an ICS File or an URL.' in resp.text
|
||||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
||||||
resp = resp.click('Settings')
|
resp = resp.click('Settings')
|
||||||
resp = resp.click('upload')
|
resp = resp.click('upload')
|
||||||
|
@ -1200,6 +1207,12 @@ END:VCALENDAR"""
|
||||||
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
|
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
|
||||||
resp = resp.form.submit(status=302)
|
resp = resp.form.submit(status=302)
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
||||||
|
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
|
||||||
|
source = TimePeriodExceptionSource.objects.latest('pk')
|
||||||
|
exception = TimePeriodException.objects.latest('pk')
|
||||||
|
assert exception.source == source
|
||||||
|
assert source.ics_filename == 'exceptions.ics'
|
||||||
|
assert source.ics_url is None
|
||||||
resp = resp.follow()
|
resp = resp.follow()
|
||||||
assert 'An exception has been imported.' in resp.text
|
assert 'An exception has been imported.' in resp.text
|
||||||
|
|
||||||
|
@ -1257,7 +1270,6 @@ def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, ad
|
||||||
VERSION:2.0
|
VERSION:2.0
|
||||||
PRODID:-//foo.bar//EN
|
PRODID:-//foo.bar//EN
|
||||||
BEGIN:VEVENT
|
BEGIN:VEVENT
|
||||||
UID:random-event-id
|
|
||||||
DTSTART:20180101
|
DTSTART:20180101
|
||||||
DTEND:20180101
|
DTEND:20180101
|
||||||
SUMMARY:New Year's Eve
|
SUMMARY:New Year's Eve
|
||||||
|
@ -1267,15 +1279,12 @@ END:VCALENDAR"""
|
||||||
resp = resp.form.submit(status=302)
|
resp = resp.form.submit(status=302)
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
||||||
exception = TimePeriodException.objects.get(desk=desk)
|
exception = TimePeriodException.objects.get(desk=desk)
|
||||||
assert exception.external_id == 'random-event-id'
|
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
|
||||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
source = TimePeriodExceptionSource.objects.latest('pk')
|
||||||
resp = resp.click('Settings')
|
exception = TimePeriodException.objects.latest('pk')
|
||||||
resp = resp.click('upload')
|
assert exception.source == source
|
||||||
resp.form['ics_url'] = ''
|
assert source.ics_filename is None
|
||||||
resp = resp.form.submit(status=302)
|
assert source.ics_url == 'http://example.com/foo.ics'
|
||||||
assert not TimePeriodException.objects.filter(
|
|
||||||
desk=desk, external_id='desk-%s:random-event-id' % desk.id
|
|
||||||
).exists()
|
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('chrono.agendas.models.requests.get')
|
@mock.patch('chrono.agendas.models.requests.get')
|
||||||
|
@ -1301,79 +1310,10 @@ def test_agenda_import_time_period_exception_with_remote_ics_no_events(mocked_ge
|
||||||
VERSION:2.0
|
VERSION:2.0
|
||||||
PRODID:-//foo.bar//EN
|
PRODID:-//foo.bar//EN
|
||||||
BEGIN:VEVENT
|
BEGIN:VEVENT
|
||||||
UID:random-event-id
|
|
||||||
DTSTART:20180101
|
DTSTART:20180101
|
||||||
DTEND:20180101
|
DTEND:20180101
|
||||||
SUMMARY:New Year's Eve
|
SUMMARY:New Year's Eve
|
||||||
END:VEVENT
|
END:VEVENT
|
||||||
END:VCALENDAR"""
|
|
||||||
mocked_get.return_value = mocked_response
|
|
||||||
resp = resp.form.submit(status=302)
|
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
|
||||||
exception = TimePeriodException.objects.get(desk=desk)
|
|
||||||
assert exception.external_id == 'random-event-id'
|
|
||||||
mocked_response.text = """BEGIN:VCALENDAR
|
|
||||||
VERSION:2.0
|
|
||||||
PRODID:-//foo.bar//EN
|
|
||||||
END:VCALENDAR"""
|
|
||||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
|
||||||
resp = resp.click('Settings')
|
|
||||||
resp = resp.click('upload')
|
|
||||||
resp = resp.form.submit(status=302)
|
|
||||||
assert not TimePeriodException.objects.filter(desk=desk, external_id='random-event-id').exists()
|
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('chrono.agendas.models.requests.get')
|
|
||||||
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
|
|
||||||
agenda = Agenda.objects.create(label='New Example', kind='meetings')
|
|
||||||
desk = Desk.objects.create(agenda=agenda, label='New Desk')
|
|
||||||
MeetingType(agenda=agenda, label='Bar').save()
|
|
||||||
login(app)
|
|
||||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
|
||||||
resp = resp.click('Settings')
|
|
||||||
assert 'Import exceptions from .ics' not in resp.text
|
|
||||||
|
|
||||||
TimePeriod.objects.create(
|
|
||||||
weekday=1, desk=desk, start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)
|
|
||||||
)
|
|
||||||
|
|
||||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
|
||||||
resp = resp.click('Settings')
|
|
||||||
resp = resp.click('upload')
|
|
||||||
resp.form['ics_url'] = 'http://example.com/foo.ics'
|
|
||||||
mocked_response = mock.Mock()
|
|
||||||
mocked_response.text = """BEGIN:VCALENDAR
|
|
||||||
VERSION:2.0
|
|
||||||
PRODID:-//foo.bar//EN
|
|
||||||
BEGIN:VEVENT
|
|
||||||
UID:first-eventrandom-event-id
|
|
||||||
DTSTART:20180101
|
|
||||||
DTEND:20180101
|
|
||||||
SUMMARY:First test event
|
|
||||||
END:VEVENT
|
|
||||||
BEGIN:VEVENT
|
|
||||||
UID:second-eventrandom-event-id
|
|
||||||
DTSTART:20190101
|
|
||||||
DTEND:20190101
|
|
||||||
SUMMARY:Second test event
|
|
||||||
END:VEVENT
|
|
||||||
END:VCALENDAR"""
|
|
||||||
mocked_get.return_value = mocked_response
|
|
||||||
resp = resp.form.submit(status=302)
|
|
||||||
assert TimePeriodException.objects.filter(desk=desk).count() == 2
|
|
||||||
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
|
|
||||||
resp = resp.click('Settings')
|
|
||||||
resp = resp.click('upload')
|
|
||||||
resp.form['ics_url'] = 'http://example.com/foo.ics'
|
|
||||||
mocked_response.text = """BEGIN:VCALENDAR
|
|
||||||
VERSION:2.0
|
|
||||||
PRODID:-//foo.bar//EN
|
|
||||||
BEGIN:VEVENT
|
|
||||||
UID:secord-eventrandom-event-id
|
|
||||||
DTSTART:20190101
|
|
||||||
DTEND:20190101
|
|
||||||
SUMMARY:Second test event
|
|
||||||
END:VEVENT
|
|
||||||
END:VCALENDAR"""
|
END:VCALENDAR"""
|
||||||
mocked_get.return_value = mocked_response
|
mocked_get.return_value = mocked_response
|
||||||
resp = resp.form.submit(status=302)
|
resp = resp.form.submit(status=302)
|
||||||
|
|
Loading…
Reference in New Issue