235 lines
8.4 KiB
Python
235 lines
8.4 KiB
Python
import time
|
|
import datetime
|
|
import urllib2
|
|
import vobject
|
|
|
|
from quixote import get_request, get_publisher, get_response
|
|
from quixote.html import htmltext, TemplateIO, htmlescape
|
|
|
|
from qommon import _
|
|
from qommon.publisher import get_publisher_class
|
|
from qommon.storage import StorableObject
|
|
from qommon.cron import CronJob
|
|
from qommon import misc
|
|
|
|
class Event(StorableObject):
    """An agenda event, stored under the ``events`` storage name.

    Text attributes are byte strings in the site charset (they are decoded
    with ``site_charset`` before being handed to vobject).
    """
    _names = 'events'

    title = None
    description = None
    url = None
    # date_start / date_end are indexable sequences whose first three items
    # are (year, month, day); RemoteCalendar fills them with time tuples.
    # date_end is optional: a falsy value means a single-day event.
    date_start = None
    date_end = None
    location = None
    organizer = None
    more_infos = None
    # keywords is an iterable of strings (exported as iCalendar categories).
    keywords = None

    def in_month(self, year, month):
        """Return True if the event happens, at least partly, in (year, month)."""
        target = (year, month)
        if not self.date_end:
            # a single date
            return tuple(self.date_start[:2]) == target
        # an interval: it must not start after, nor end before, the month
        return tuple(self.date_start[:2]) <= target <= tuple(self.date_end[:2])

    def after_today(self):
        """Return True if the event's last day is strictly later than today.

        The last day is ``date_end`` when set, ``date_start`` otherwise;
        "today" is taken from the local time.
        """
        today = time.localtime()[:3]
        last_day = self.date_end or self.date_start
        return tuple(last_day[:3]) > today

    def format_date(self):
        """Return a translated, human readable description of the event dates.

        The wording depends on whether the event is a single day, two
        consecutive days, or a longer interval (within a month, within a
        year, or across years).
        """
        # Normalise to tuples so that the comparisons below do not depend on
        # the concrete sequence type the dates were stored with.
        start = tuple(self.date_start[:3])
        end = tuple(self.date_end[:3]) if self.date_end else None

        d = {
            'year_start': start[0],
            'month_start': misc.get_month_name(start[1]),
            'day_start': start[2],
        }
        if end is None or end == start:
            return _('On %(month_start)s %(day_start)s') % d

        d.update({
            'year_end': end[0],
            'month_end': misc.get_month_name(end[1]),
            'day_end': end[2],
        })

        next_day = datetime.date(*start) + datetime.timedelta(days=1)
        if end == (next_day.year, next_day.month, next_day.day):
            # two consecutive days
            if start[1] == end[1]:
                return _('On %(month_start)s %(day_start)s and %(day_end)s') % d
            return _('On %(month_start)s %(day_start)s and %(month_end)s %(day_end)s') % d

        if start[0] != end[0]:
            # the interval spans several years
            return _('From %(month_start)s %(day_start)s %(year_start)s '
                     'to %(month_end)s %(day_end)s %(year_end)s') % d
        if start[1] == end[1]:
            # same month
            return _('From %(month_start)s %(day_start)s to %(day_end)s') % d
        return _('From %(month_start)s %(day_start)s '
                 'to %(month_end)s %(day_end)s') % d

    def as_vevent(self):
        """Return the event as a vobject ``vevent`` component.

        The UID is built from the start date, the event id and the host name
        of the current request (port and trailing dot removed).
        """
        site_charset = get_publisher().site_charset
        hostname = get_request().get_server().lower().split(':')[0].rstrip('.')

        vevent = vobject.newFromBehavior('vevent')
        # tuple() makes the concatenation work whatever sequence type
        # date_start was stored as.
        vevent.add('uid').value = '%04d%02d%02d-%s@%s' % (
            tuple(self.date_start[:3]) + (self.id, hostname))
        vevent.add('summary').value = unicode(self.title, site_charset)
        vevent.add('dtstart').value = datetime.date(*self.date_start[:3])
        vevent.dtstart.value_param = 'DATE'
        if self.date_end:
            vevent.add('dtend').value = datetime.date(*self.date_end[:3])
            vevent.dtend.value_param = 'DATE'
        if self.description:
            vevent.add('description').value = unicode(self.description.strip(), site_charset)
        if self.url:
            vevent.add('url').value = unicode(self.url, site_charset)
        if self.location:
            vevent.add('location').value = unicode(self.location, site_charset)
        if self.organizer:
            vevent.add('organizer').value = unicode(self.organizer, site_charset)
        if self.keywords:
            vevent.add('categories').value = [unicode(x, site_charset) for x in self.keywords]
        vevent.add('class').value = 'PUBLIC'
        return vevent

    @classmethod
    def as_vcalendar(cls):
        """Return all stored events serialized as a single iCalendar string."""
        cal = vobject.iCalendar()
        cal.add('prodid').value = '-//Entr\'ouvert//NON SGML Publik'
        for x in cls.select():
            cal.add(x.as_vevent())
        return cal.serialize()

    def as_html_dt_dd(self):
        """Return the event as a ``<dt>``/``<dd>`` pair for a definition list.

        The ``<dt>`` holds the formatted date; the ``<dd>`` holds the title,
        the optional description, a list of extra details and an optional
        "More information" link.
        """
        root_url = get_publisher().get_root_url()
        r = TemplateIO(html=True)
        r += htmltext('<dt>')
        r += self.format_date()
        r += htmltext('</dt>')
        # The paragraph is opened inside the <dd> so that the tags are
        # properly nested (<dd><p>...</p>...</dd>).
        r += htmltext('<dd><p><strong>%s</strong>') % self.title
        if self.description:
            # NOTE(review): plain strings added to an html TemplateIO are
            # presumably escaped by Quixote on output -- confirm.
            r += ' - ' + self.description
        r += htmltext('</p>')
        if self.location or self.organizer or self.more_infos or self.keywords:
            r += htmltext('<ul>')
            if self.location:
                r += htmltext('<li>%s: %s</li>') % (_('Location'), self.location)
            if self.organizer:
                r += htmltext('<li>%s: %s</li>') % (_('Organizer'), self.organizer)
            if self.more_infos:
                r += htmltext('<li>%s</li>') % self.more_infos
            if self.keywords:
                r += htmltext('<li>')
                for k in self.keywords:
                    r += htmltext('<a class="tag" href="%sagenda/tag/%s">%s</a> ') % (root_url, k, k)
                r += htmltext('</li>')
            r += htmltext('</ul>')

        if self.url:
            r += htmltext('<a class="external" href="%s">%s</a>') % (
                self.url, _('More information'))
        r += htmltext('</dd>')
        return r.getvalue()

    def get_url(self):
        """Return the front office URL of the event page."""
        return '%s/agenda/events/%s/' % (get_publisher().get_frontoffice_url(), self.id)

    def get_atom_entry(self):
        """Return a pyatom ``Entry`` describing the event.

        The entry content is the HTML-escaped description, re-encoded as
        UTF-8 and wrapped in a paragraph.
        """
        from pyatom import pyatom

        entry = pyatom.Entry()
        entry.id = self.get_url()
        entry.title = self.title

        entry.content.attrs['type'] = 'html'
        # description is optional; fall back to an empty string instead of
        # letting unicode(None, ...) raise a TypeError.
        description = unicode(self.description or '', get_publisher().site_charset)
        # Turn the escaped value into a plain string before concatenating, so
        # the literal <p> tags cannot be escaped by htmltext concatenation.
        escaped = str(htmlescape(description.encode('utf-8')))
        entry.content.text = '<p>' + escaped + '</p>'

        return entry
|
|
|
|
|
|
class RemoteCalendar(StorableObject):
    """A remote iCalendar feed, stored under ``remote_calendars``.

    ``download_and_parse`` fetches ``url``, keeps the raw data in
    ``content`` and the parsed entries, as Event objects, in ``events``.
    """
    _names = 'remote_calendars'

    label = None
    url = None
    content = None
    events = None
    # error is None or a (time, string, params) tuple: the local time of the
    # failure, the untranslated message and the values to interpolate in it.
    error = None

    def _record_error(self, message, params):
        """Store a failure as (local time, untranslated message, params)."""
        self.error = (time.localtime(), message, params)
        self.store()

    def download_and_parse(self, job=None):
        """Download the calendar and rebuild ``events`` from its content.

        On a download or parse failure the problem is recorded in ``error``
        and stored.  Parsing is skipped when the downloaded content did not
        change and the previous run left no error.

        ``job`` is accepted but not used.
        """
        # NOTE(review): N_ is not imported in the visible part of this file;
        # presumably it is provided as a builtin by the framework -- confirm.
        old_content = self.content
        previous_error = self.error

        try:
            response = urllib2.urlopen(self.url)
            try:
                self.content = response.read()
            finally:
                # do not leave the connection open
                response.close()
        except urllib2.HTTPError as e:
            self._record_error(N_('HTTP Error %s on download'), (e.code,))
            return
        except urllib2.URLError:
            self._record_error(N_('Error on download'), ())
            return

        if self.content == old_content and not previous_error:
            return

        # Either the content changed or the previous run failed.  Parsing
        # again in the latter case keeps a parse error visible for as long as
        # the remote file stays broken, instead of clearing it silently.
        self.error = None
        self.events = []
        try:
            parsed_cal = vobject.readOne(self.content)
        except vobject.base.ParseError:
            self._record_error(N_('Failed to parse file'), ())
            return

        site_charset = get_publisher().site_charset
        # Missing children are reported with AttributeError (as relied upon
        # below for optional properties); a calendar without any event is
        # therefore handled with a default empty list.
        for vevent in getattr(parsed_cal, 'vevent_list', []):
            ev = Event()
            ev.title = vevent.summary.value.encode(site_charset, 'replace')
            try:
                ev.url = vevent.url.value.encode(site_charset, 'replace')
            except AttributeError:
                pass
            ev.date_start = vevent.dtstart.value.timetuple()
            try:
                ev.date_end = vevent.dtend.value.timetuple()
            except AttributeError:
                pass
            try:
                ev.description = vevent.description.value.encode(site_charset, 'replace')
            except AttributeError:
                pass
            try:
                # 'replace' as for the other fields, so that a character
                # missing from the site charset cannot abort the import.
                ev.keywords = [x.encode(site_charset, 'replace')
                               for x in vevent.categories.value]
            except AttributeError:
                pass
            self.events.append(ev)
        self.store()

    def get_error_message(self):
        """Return the last error as '(date) translated message', or None."""
        if not self.error:
            return None
        return '(%s) %s' % (misc.localstrftime(self.error[0]),
                            _(self.error[1]) % self.error[2])
|
|
|
|
|
|
def update_remote_calendars(publisher):
    """Download and parse every stored remote calendar.

    ``publisher`` is part of the expected signature but is not used.
    """
    for calendar in RemoteCalendar.select():
        calendar.download_and_parse()
|
|
|
|
def get_default_event_tags():
    """Return the list of translated default tags for events."""
    default_tags = [
        _('All Public'),
        _('Adults'),
        _('Children'),
        _('Free'),
    ]
    return default_tags
|
|
|
|
# Register the periodic refresh of remote calendars with the publisher class.
# NOTE(review): minutes=[0] presumably means "at minute 0 of every hour" --
# confirm against CronJob.
get_publisher_class().register_cronjob(
    CronJob(update_remote_calendars, minutes=[0])
)
|
|
|