This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
auquotidien/auquotidien/modules/events.py

235 lines
8.4 KiB
Python

import time
import datetime
import urllib2
import vobject
from quixote import get_request, get_publisher, get_response
from quixote.html import htmltext, TemplateIO, htmlescape
from wcs.qommon import _
from wcs.qommon.publisher import get_publisher_class
from wcs.qommon.storage import StorableObject
from wcs.qommon.cron import CronJob
from wcs.qommon import misc
class Event(StorableObject):
_names = 'events'
title = None
description = None
url = None
date_start = None
date_end = None
location = None
organizer = None
more_infos = None
keywords = None
def in_month(self, year, month):
if not self.date_end: # a single date
return tuple(self.date_start[:2]) == (year, month)
else:
# an interval
if tuple(self.date_start[:2]) > (year, month): # start later
return False
if tuple(self.date_end[:2]) < (year, month): # ended before
return False
return True
def after_today(self):
today = time.localtime()[:3]
if not self.date_end:
return tuple(self.date_start[:3]) > today
return tuple(self.date_end[:3]) > today
def format_date(self):
d = {
'year_start': self.date_start[0],
'month_start': misc.get_month_name(self.date_start[1]),
'day_start': self.date_start[2]
}
if self.date_end and self.date_start[:3] != self.date_end[:3]:
d.update({
'year_end': self.date_end[0],
'month_end': misc.get_month_name(self.date_end[1]),
'day_end': self.date_end[2]
})
d2 = datetime.date(*self.date_start[:3]) + datetime.timedelta(days=1)
if tuple(self.date_end[:3]) == (d2.year, d2.month, d2.day):
# two consecutive days
if self.date_start[1] == self.date_end[1]:
return _('On %(month_start)s %(day_start)s and %(day_end)s') % d
else:
return _('On %(month_start)s %(day_start)s and %(month_end)s %(day_end)s') % d
else:
if self.date_start[0] == self.date_end[0]: # same year
if self.date_start[1] == self.date_end[1]: # same month
return _('From %(month_start)s %(day_start)s to %(day_end)s') % d
else:
return _('From %(month_start)s %(day_start)s '
'to %(month_end)s %(day_end)s') % d
else:
return _('From %(month_start)s %(day_start)s %(year_start)s '
'to %(month_end)s %(day_end)s %(year_end)s') % d
else:
return _('On %(month_start)s %(day_start)s') % d
def as_vevent(self):
vevent = vobject.newFromBehavior('vevent')
site_charset = get_publisher().site_charset
vevent.add('uid').value = '%04d%02d%02d-%s@%s' % (self.date_start[:3] + (self.id,
get_request().get_server().lower().split(':')[0].rstrip('.')))
vevent.add('summary').value = unicode(self.title, site_charset)
vevent.add('dtstart').value = datetime.date(*self.date_start[:3])
vevent.dtstart.value_param = 'DATE'
if self.date_end:
vevent.add('dtend').value = datetime.date(*self.date_end[:3])
vevent.dtend.value_param = 'DATE'
if self.description:
vevent.add('description').value = unicode(self.description.strip(), site_charset)
if self.url:
vevent.add('url').value = unicode(self.url, site_charset)
if self.location:
vevent.add('location').value = unicode(self.location, site_charset)
if self.organizer:
vevent.add('organizer').value = unicode(self.organizer, site_charset)
if self.keywords:
vevent.add('categories').value = [unicode(x, site_charset) for x in self.keywords]
vevent.add('class').value = 'PUBLIC'
return vevent
def as_vcalendar(cls):
cal = vobject.iCalendar()
cal.add('prodid').value = '-//Entr\'ouvert//NON SGML Publik'
for x in cls.select():
cal.add(x.as_vevent())
return cal.serialize()
as_vcalendar = classmethod(as_vcalendar)
def as_html_dt_dd(self):
root_url = get_publisher().get_root_url()
r = TemplateIO(html=True)
r += htmltext('<dt>')
r += self.format_date()
r += htmltext('</dt>')
r += htmltext('<p><dd><strong>%s</strong>') % self.title
if self.description:
r += ' - ' + self.description
r += htmltext('</p>')
if (self.location or self.organizer or self.more_infos or self.keywords):
r += htmltext('<ul>')
if self.location:
r += htmltext('<li>%s: %s</li>') % (_('Location'), self.location)
if self.organizer:
r += htmltext('<li>%s: %s</li>') % (_('Organizer'), self.organizer)
if self.more_infos:
r += htmltext('<li>%s</li>') % self.more_infos
if self.keywords:
r += htmltext('<li>')
for k in self.keywords:
r += htmltext('<a class="tag" href="%sagenda/tag/%s">%s</a> ') % (root_url, k, k)
r += htmltext('</li>')
r += htmltext('</ul>')
if self.url:
r += htmltext('<a class="external" href="%s">%s</a>') % (
self.url, _('More information'))
r += htmltext('</dd>')
return r.getvalue()
def get_url(self):
return '%s/agenda/events/%s/' % (get_publisher().get_frontoffice_url(), self.id)
def get_atom_entry(self):
from pyatom import pyatom
entry = pyatom.Entry()
entry.id = self.get_url()
entry.title = self.title
entry.content.attrs['type'] = 'html'
entry.content.text = str('<p>' + htmlescape(
unicode(self.description, get_publisher().site_charset).encode('utf-8')) + '</p>')
return entry
class RemoteCalendar(StorableObject):
_names = 'remote_calendars'
label = None
url = None
content = None
events = None
error = None # (time, string, params)
def download_and_parse(self, job=None):
old_content = self.content
try:
self.content = urllib2.urlopen(self.url).read()
except urllib2.HTTPError, e:
self.error = (time.localtime(), N_('HTTP Error %s on download'), (e.code,))
self.store()
return
except urllib2.URLError, e:
self.error = (time.localtime(), N_('Error on download'), ())
self.store()
return
if self.error:
self.error = None
self.store()
if self.content == old_content:
return
self.events = []
try:
parsed_cal = vobject.readOne(self.content)
except vobject.base.ParseError:
self.error = (time.localtime(), N_('Failed to parse file'), ())
self.store()
return
site_charset = get_publisher().site_charset
for vevent in parsed_cal.vevent_list:
ev = Event()
ev.title = vevent.summary.value.encode(site_charset, 'replace')
try:
ev.url = vevent.url.value.encode(site_charset, 'replace')
except AttributeError:
pass
ev.date_start = vevent.dtstart.value.timetuple()
try:
ev.date_end = vevent.dtend.value.timetuple()
except AttributeError:
pass
try:
ev.description = vevent.description.value.encode(site_charset, 'replace')
except AttributeError:
pass
try:
ev.keywords = [x.encode(site_charset) for x in vevent.categories.value]
except AttributeError:
pass
self.events.append(ev)
self.store()
def get_error_message(self):
if not self.error:
return None
return '(%s) %s' % (misc.localstrftime(self.error[0]),
_(self.error[1]) % self.error[2])
def update_remote_calendars(publisher):
for source in RemoteCalendar.select():
source.download_and_parse()
def get_default_event_tags():
return [_('All Public'), _('Adults'), _('Children'), _('Free')]
get_publisher_class().register_cronjob(CronJob(update_remote_calendars, minutes = [0]))