manager: import/export events - update event if slug exists (#42343)

This commit is contained in:
Lauréline Guérin 2020-09-11 09:29:16 +02:00
parent 1cf8ab96d7
commit c9ed2fad11
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
5 changed files with 286 additions and 35 deletions

View File

@ -27,6 +27,7 @@ from django.forms import ValidationError
from django.utils.encoding import force_text
from django.utils.six import StringIO
from django.utils.timezone import make_aware
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from chrono.agendas.models import (
@ -45,6 +46,7 @@ from chrono.agendas.models import (
AgendaReminderSettings,
WEEKDAYS_LIST,
UnavailabilityCalendar,
generate_slug,
)
from . import widgets
@ -356,7 +358,14 @@ class ImportEventsForm(forms.Form):
dialect = None
events = []
slugs = set()
warnings = {}
events_by_slug = {e.slug: e for e in Event.objects.filter(agenda=self.agenda_pk)}
event_ids_with_bookings = set(
Booking.objects.filter(
event__agenda=self.agenda_pk, cancellation_datetime__isnull=True
).values_list('event_id', flat=True)
)
seen_slugs = set(events_by_slug.keys())
for i, csvline in enumerate(csv.reader(StringIO(content), dialect=dialect)):
if not csvline:
continue
@ -364,8 +373,31 @@ class ImportEventsForm(forms.Form):
raise ValidationError(_('Invalid file format. (line %d)') % (i + 1))
if i == 0 and csvline[0].strip('#') in ('date', 'Date', _('date'), _('Date')):
continue
event = Event()
event.agenda_id = self.agenda_pk
# label needed to generate a slug
label = None
if len(csvline) >= 5:
label = force_text(csvline[4])
# get or create event
event = None
slug = None
if len(csvline) >= 6:
slug = force_text(csvline[5]) if csvline[5] else None
# get existing event if relevant
if slug and slug in seen_slugs:
event = events_by_slug[slug]
# update label
event.label = label
if event is None:
# new event
event = Event(agenda_id=self.agenda_pk, label=label)
# generate a slug if not provided
event.slug = slug or generate_slug(event, seen_slugs=seen_slugs, agenda=self.agenda_pk)
# maintain caches
seen_slugs.add(event.slug)
events_by_slug[event.slug] = event
for datetime_fmt in (
'%Y-%m-%d %H:%M',
'%d/%m/%Y %H:%M',
@ -374,10 +406,22 @@ class ImportEventsForm(forms.Form):
'%d/%m/%Y %H:%M:%S',
):
try:
event_datetime = datetime.datetime.strptime('%s %s' % tuple(csvline[:2]), datetime_fmt)
event_datetime = make_aware(
datetime.datetime.strptime('%s %s' % tuple(csvline[:2]), datetime_fmt)
)
except ValueError:
continue
event.start_datetime = make_aware(event_datetime)
if (
event.pk is not None
and event.start_datetime != event_datetime
and event.start_datetime > now()
and event.pk in event_ids_with_bookings
and event.pk not in warnings
):
# event start datetime has changed, event is not past and has not cancelled bookings
# => warn the user
warnings[event.pk] = event
event.start_datetime = event_datetime
break
else:
raise ValidationError(_('Invalid file format. (date/time format, line %d)') % (i + 1))
@ -392,17 +436,7 @@ class ImportEventsForm(forms.Form):
raise ValidationError(
_('Invalid file format. (number of places in waiting list, line %d)') % (i + 1)
)
if len(csvline) >= 5:
event.label = force_text(csvline[4])
exclude = ['desk', 'meeting_type']
if len(csvline) >= 6:
event.slug = force_text(csvline[5]) if csvline[5] else None
if event.slug and event.slug in slugs:
raise ValidationError(_('File contains duplicated event identifiers: %s') % event.slug)
else:
slugs.add(event.slug)
else:
exclude += ['slug']
column_index = 7
for more_attr in ('description', 'pricing', 'url'):
if len(csvline) >= column_index:
@ -426,7 +460,7 @@ class ImportEventsForm(forms.Form):
raise ValidationError(_('Invalid file format. (duration, line %d)') % (i + 1))
try:
event.full_clean(exclude=exclude)
event.full_clean(exclude=['desk', 'meeting_type'])
except ValidationError as e:
errors = [_('Invalid file format:\n')]
for label, field_errors in e.message_dict.items():
@ -440,6 +474,7 @@ class ImportEventsForm(forms.Form):
raise ValidationError(errors)
events.append(event)
self.events = events
self.warnings = warnings
@staticmethod
def get_verbose_name(field_name):

View File

@ -21,7 +21,10 @@
<ul class="extra-actions-menu">
<li><a rel="popup" href="{% url 'chrono-manager-agenda-edit' pk=object.id %}">{% trans 'Options' %}</a></li>
<li><a rel="popup" class="action-duplicate" href="{% url 'chrono-manager-agenda-duplicate' pk=object.pk %}">{% trans 'Duplicate' %}</a></li>
<li><a download href="{% url 'chrono-manager-agenda-export' pk=object.id %}">{% trans 'Export' %}</a></li>
<li><a download href="{% url 'chrono-manager-agenda-export' pk=object.id %}">{% trans 'Export Configuration (JSON)' %}</a></li>
{% if object.kind == 'events' %}
<li><a download href="{% url 'chrono-manager-agenda-export-events' pk=object.pk %}">{% trans 'Export Events (CSV)' %}</a></li>
{% endif %}
{% if user.is_staff %}
<li><a rel="popup" href="{% url 'chrono-manager-agenda-delete' pk=object.id %}">{% trans 'Delete' %}</a></li>
{% endif %}

View File

@ -108,6 +108,11 @@ urlpatterns = [
views.agenda_import_events,
name='chrono-manager-agenda-import-events',
),
url(
r'^agendas/(?P<pk>\d+)/export-events$',
views.agenda_export_events,
name='chrono-manager-agenda-export-events',
),
url(
r'^agendas/(?P<pk>\d+)/notifications$',
views.agenda_notifications_settings,

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import csv
import datetime
import itertools
import json
@ -1372,6 +1373,9 @@ class AgendaImportEventsView(ManagedAgendaMixin, FormView):
template_name = 'chrono/manager_import_events.html'
agenda = None
def set_agenda(self, **kwargs):
self.agenda = get_object_or_404(Agenda, pk=kwargs.get('pk'), kind='events')
def get_form_kwargs(self):
kwargs = super(FormView, self).get_form_kwargs()
kwargs['agenda_pk'] = self.kwargs['pk']
@ -1380,17 +1384,75 @@ class AgendaImportEventsView(ManagedAgendaMixin, FormView):
def form_valid(self, form):
if form.events:
# existing event slugs for this agenda
seen_slugs = set(self.agenda.event_set.values_list('slug', flat=True))
for event in form.events:
event.agenda_id = self.kwargs['pk']
event.save(seen_slugs=seen_slugs) # optimization: seen_slugs
event.agenda = self.agenda
event.save()
messages.info(self.request, _('%d events have been imported.') % len(form.events))
for event in form.warnings.values():
messages.warning(
self.request,
_('Event "%s" start date has changed. Do not forget to notify the registrants.')
% (event.label or event.slug),
)
return super(AgendaImportEventsView, self).form_valid(form)
agenda_import_events = AgendaImportEventsView.as_view()
class AgendaExportEventsView(ManagedAgendaMixin, View):
template_name = 'chrono/manager_export_events.html'
agenda = None
def set_agenda(self, **kwargs):
self.agenda = get_object_or_404(Agenda, pk=kwargs.get('pk'), kind='events')
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type='text/csv')
today = datetime.date.today()
response['Content-Disposition'] = 'attachment; filename="export_agenda_events_{}_{}.csv"'.format(
self.agenda.slug, today.strftime('%Y%m%d')
)
writer = csv.writer(response)
# headers
writer.writerow(
[
_('date'),
_('time'),
_('number of places'),
_('number of places in waiting list'),
_('label'),
_('identifier'),
_('description'),
_('pricing'),
_('URL'),
_('publication date'),
_('duration'),
]
)
for event in self.agenda.event_set.all():
start_datetime = localtime(event.start_datetime)
writer.writerow(
[
start_datetime.strftime('%Y-%m-%d'),
start_datetime.strftime('%H:%M'),
event.places,
event.waiting_list_places,
event.label,
event.slug,
event.description,
event.pricing,
event.url,
event.publication_date.strftime('%Y-%m-%d') if event.publication_date else '',
event.duration,
]
)
return response
agenda_export_events = AgendaExportEventsView.as_view()
class AgendaNotificationsSettingsView(ManagedAgendaMixin, UpdateView):
template_name = 'chrono/manager_agenda_notifications_form.html'
model = AgendaNotificationsSettings

View File

@ -1405,6 +1405,56 @@ def test_delete_event_as_manager(app, manager_user):
assert Event.objects.count() == 0
def test_export_events(app, admin_user):
agenda = Agenda.objects.create(label=u'Foo bar')
app = login(app)
resp = app.get('/manage/agendas/%s/export-events' % agenda.id)
csv_export = resp.text
assert (
csv_export
== 'date,time,number of places,number of places in waiting list,label,identifier,description,pricing,URL,publication date,duration\r\n'
)
resp = app.get('/manage/agendas/%s/import-events' % agenda.id)
resp.form['events_csv_file'] = Upload('t.csv', b'2016-09-16,00:30,10', 'text/csv')
resp.form.submit(status=302)
resp = app.get('/manage/agendas/%s/export-events' % agenda.id)
csv_export = resp.text
assert (
csv_export
== 'date,time,number of places,number of places in waiting list,label,identifier,description,pricing,URL,publication date,duration\r\n'
'2016-09-16,00:30,10,0,,foo-bar-event,,,,,\r\n'
)
resp = app.get('/manage/agendas/%s/import-events' % agenda.id)
resp.form['events_csv_file'] = Upload(
't.csv',
b'2016-09-16,23:30,10,5,label,slug,"description\nfoobar",pricing,url,2016-10-16,90',
'text/csv',
)
resp.form.submit(status=302)
resp = app.get('/manage/agendas/%s/export-events' % agenda.id)
csv_export = resp.text
assert (
csv_export
== 'date,time,number of places,number of places in waiting list,label,identifier,description,pricing,URL,publication date,duration\r\n'
'2016-09-16,00:30,10,0,,foo-bar-event,,,,,\r\n'
'2016-09-16,23:30,10,5,label,slug,"description\nfoobar",pricing,url,2016-10-16,90\r\n'
)
def test_export_events_wrong_kind(app, admin_user):
agenda = Agenda.objects.create(label=u'Foo bar', kind='meetings')
app = login(app)
app.get('/manage/agendas/%s/export-events' % agenda.id, status=404)
agenda.kind = 'virtual'
agenda.save()
app.get('/manage/agendas/%s/export-events' % agenda.id, status=404)
def test_import_events(app, admin_user):
agenda = Agenda(label=u'Foo bar')
agenda.save()
@ -1550,9 +1600,10 @@ def test_import_events(app, admin_user):
assert event.slug == 'slug'
resp = app.get('/manage/agendas/%s/import-events' % agenda.id, status=200)
resp.form['events_csv_file'] = Upload('t.csv', b'2016-09-16,18:00,10,5,label,slug', 'text/csv')
resp = resp.form.submit(status=200)
assert 'Event with this Agenda and Identifier already exists.' in resp.text
assert '__all__' not in resp.text
resp = resp.form.submit(status=302)
assert Event.objects.count() == 1
event = Event.objects.latest('pk')
assert event.slug == 'slug'
# additional optional attributes
Event.objects.all().delete()
@ -1603,17 +1654,20 @@ def test_import_events(app, admin_user):
resp = app.get('/manage/agendas/%s/import-events' % agenda.id, status=200)
resp.form['events_csv_file'] = Upload(
't.csv',
b'2016-09-16,18:00,10,5,labela,,,pricing,\n'
b'2016-09-17,18:00,10,5,labela,,,pricing,\n'
b'2016-09-18,18:00,10,5,labela,,,pricing,\n'
b'2016-09-16,18:00,10,5,labela,labelb,,pricing,\n'
b'2016-09-17,18:00,10,5,labela,labelb-1,,pricing,\n'
b'2016-09-18,18:00,10,5,labela,labelb-2,,pricing,\n'
b'2016-09-18,18:00,10,5,labelb,,,pricing,\n'
b'2016-09-18,18:00,10,5,labelb,,,pricing,\n',
'text/csv',
)
with CaptureQueriesContext(connection) as ctx:
resp = resp.form.submit(status=302)
assert len(ctx.captured_queries) == 24
assert len(ctx.captured_queries) == 22
assert Event.objects.count() == 5
assert set(Event.objects.values_list('slug', flat=True)) == set(
['labelb', 'labelb-1', 'labelb-2', 'labelb-3', 'labelb-4']
)
# forbidden numerical slug
Event.objects.all().delete()
@ -1623,14 +1677,107 @@ def test_import_events(app, admin_user):
assert 'value cannot be a number' in resp.text
assert 'Identifier:' in resp.text # verbose_name is shown, not field name ('slug:')
# handle duplicated slug
Event.objects.all().delete()
def test_import_events_existing_event(app, admin_user, freezer):
agenda = Agenda.objects.create(label=u'Foo bar')
app = login(app)
resp = app.get('/manage/agendas/%s/import-events' % agenda.id, status=200)
resp.form['events_csv_file'] = Upload(
't.csv', b'2016-09-16,18:00,10,5,label,slug\n' b'2016-09-16,18:00,10,5,label,slug\n', 'text/csv',
't.csv', b'2016-09-16,18:00,10,5,label,slug\n2016-09-16,18:00,10,5,label,slug\n', 'text/csv',
)
resp = resp.form.submit(status=200)
assert 'duplicated event identifiers' in resp.text
resp.form.submit(status=302)
assert agenda.event_set.count() == 1
event = Event.objects.latest('pk')
def check_import(date, time, with_alert):
resp = app.get('/manage/agendas/%s/import-events' % agenda.id, status=200)
resp.form['events_csv_file'] = Upload(
't.csv', b'%s,%s,10,5,label,slug\n' % (date.encode(), time.encode()), 'text/csv',
)
resp = resp.form.submit(status=302).follow()
assert agenda.event_set.count() == 1
event.refresh_from_db()
if with_alert:
assert (
'<li class="warning">Event &quot;label&quot; start date has changed. Do not forget to notify the registrants.</li>'
in resp.text
)
else:
assert (
'<li class="warning">Event &quot;label&quot; start date has changed. Do not forget to notify the registrants.</li>'
not in resp.text
)
assert event.start_datetime == make_aware(
datetime.datetime(*[int(v) for v in date.split('-')], *[int(v) for v in time.split(':')])
)
# change date or time
# event in the past, no alert, with or without booking
Booking.objects.create(
event=event, cancellation_datetime=make_aware(datetime.datetime(2017, 5, 20, 10, 30))
)
check_import('2016-09-15', '18:00', False) # change date
check_import('2016-09-15', '17:00', False) # change time
# available booking
Booking.objects.create(event=event)
check_import('2016-09-14', '17:00', False) # change date
check_import('2016-09-14', '16:00', False) # change time
# date in the future
freezer.move_to('2016-09-01')
# warn if available booking only
check_import('2016-09-13', '16:00', True) # change date
check_import('2016-09-13', '15:00', True) # change time
# no available booking
Booking.objects.all().delete()
Booking.objects.create(
event=event, cancellation_datetime=make_aware(datetime.datetime(2017, 5, 20, 10, 30))
)
check_import('2016-09-12', '15:00', False) # change date
check_import('2016-09-12', '14:00', False) # change time
# check there is a message per changed event
resp = app.get('/manage/agendas/%s/import-events' % agenda.id, status=200)
resp.form['events_csv_file'] = Upload(
't.csv', b'2016-09-16,18:00,10,5,label,slug\n2016-09-16,19:00,10,5,label,other_slug\n', 'text/csv',
)
resp.form.submit(status=302)
assert agenda.event_set.count() == 2
event2 = Event.objects.latest('pk')
Booking.objects.create(event=event)
Booking.objects.create(event=event2)
resp = app.get('/manage/agendas/%s/import-events' % agenda.id, status=200)
resp.form['events_csv_file'] = Upload(
't.csv',
b'2016-09-17,18:00,10,5,label,slug\n2016-09-17,19:00,10,5,,other_slug\n2016-09-17,20:00,10,5,,other_slug\n',
'text/csv',
)
resp = resp.form.submit(status=302).follow()
assert agenda.event_set.count() == 2
assert (
resp.text.count(
'Event &quot;label&quot; start date has changed. Do not forget to notify the registrants.'
)
== 1
)
assert (
resp.text.count(
'Event &quot;other_slug&quot; start date has changed. Do not forget to notify the registrants.'
)
== 1
)
def test_import_events_wrong_kind(app, admin_user):
agenda = Agenda.objects.create(label=u'Foo bar', kind='meetings')
app = login(app)
app.get('/manage/agendas/%s/import-events' % agenda.id, status=404)
agenda.kind = 'virtual'
agenda.save()
app.get('/manage/agendas/%s/import-events' % agenda.id, status=404)
def test_add_meetings_agenda(app, admin_user):
@ -3322,9 +3469,8 @@ def test_import_agenda(app, admin_user):
agenda.save()
app = login(app)
resp = app.get('/manage/agendas/%s/settings' % agenda.id)
with freezegun.freeze_time('2020-06-15'):
resp = resp.click('Export')
resp = app.get('/manage/agendas/%s/export' % agenda.id)
assert resp.headers['content-type'] == 'application/json'
assert resp.headers['content-disposition'] == 'attachment; filename="export_agenda_foo-bar_20200615.json"'
agenda_export = resp.text