This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
misc-thomas/roboteo.py

280 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
import caldav
from datetime import datetime, date, timedelta
from dateutil import rrule
import icalendar
import logging
from pytz import timezone
import sleekxmpp
import sys
LOGIN = sys.argv[1]
PASSWORD = sys.argv[2]
JID = '%s@im.libre-entreprise.com' % LOGIN
ROOM = 'entrouvert@conference.im.libre-entreprise.com'
NICK = 'roboteo'
TZ = timezone('Europe/Paris')
USERS = ('roboteo', 'bdauvergne', 'tnoel', 'pcros', 'mates', 'vclaudet', 'bmallet', 'eshowk', 'smihai',
'jkouka', 'ecazenave', 'fpeters', 'csiraut', 'pmarillonnet', 'slaget', 'eshowk', 'lseguin')
CALENDARS_URL = ['https://calendar.entrouvert.org/groupdav.php/%s/calendar/' %
user for user in USERS]
DAV_URL = 'https://%s:%s@calendar.entrouvert.org/groupdav.php/' % (LOGIN, PASSWORD)
# egroupware
def get_attendees(ical_component):
ret = []
for attendee in ical_component.get('attendee') or []:
cn = attendee.params.get('cn')
if '@' in attendee:
ret.append('%s <%s>' % (cn, attendee))
else:
ret.append('%s' % cn)
return sorted(ret, key=lambda s: '@' in s)
def get_all_start_end(ical_component):
dtstart = ical_component.get('dtstart')
dtend = ical_component.get('dtend')
if not dtstart or not dtend:
return []
dtstart = dtstart.dt
dtend = dtend.dt
if not isinstance(dtstart, datetime): # date -> datetime
dtstart = datetime(dtstart.year, dtstart.month, dtstart.day, tzinfo=TZ)
if not isinstance(dtend, datetime): # date -> datetime (end of day)
dtend = datetime(dtend.year, dtend.month, dtend.day, tzinfo=TZ) - timedelta(seconds=1)
ical_rrule = ical_component.get('rrule')
if not ical_rrule:
return [(dtstart, dtend)]
# fasten your seat belt...
rrule_string = ical_rrule.to_ical().decode('utf-8')
rules = rrule.rrulestr(rrule_string, dtstart=dtstart, forceset=True)
excludes = ical_component.get('exdate')
if excludes:
if not isinstance(excludes, list):
excludes = [excludes]
for exclude in excludes:
for dts in exclude.dts:
rules.exdate(dts.dt)
delta = dtend - dtstart
start = datetime.now(tz=TZ) - timedelta(days=1)
end = start + timedelta(days=31)
return [(dtstart, dtstart + delta) for dtstart in rules.between(start, end)]
def get_events(ical_component, owners):
if ical_component.name != 'VEVENT':
return {}
uid = '%s' % ical_component.get('uid')
attendees = get_attendees(ical_component) or owners
events = {}
for dtstart, dtend in get_all_start_end(ical_component):
events['%s#%s-%s' % (uid, dtstart.isoformat(), dtend.isoformat())] = {
'summary': '%s' % ical_component.get('summary'),
'attendees': attendees,
'dtstart': dtstart,
'dtend': dtend,
}
return events
_future_events_cache = None
def get_future_events(force_update=False):
global _future_events_cache
if not force_update and _future_events_cache:
return _future_events_cache
client = caldav.DAVClient(DAV_URL)
start_date = datetime.today() - timedelta(1)
events = {}
for url in CALENDARS_URL:
calendar = caldav.objects.Calendar(client, url)
owners = calendar.get_properties([caldav.elements.cdav.CalendarDescription()]).values()
for event in calendar.date_search(start=start_date):
ical = icalendar.Calendar.from_ical(event.data)
for component in ical.walk():
events.update(get_events(component, owners))
_future_events_cache = sorted(events.values(), key=lambda v: v['dtstart'])
return _future_events_cache
def get_today_events(force_update=False):
today = date.today()
events = get_future_events(force_update=force_update)
return [event for event in events
if event['dtstart'].date() <= today <= event['dtend'].date()]
def display_events(events):
ret = []
for event in events:
dtstart, dtend = event['dtstart'], event['dtend']
if dtstart.date() == dtend.date():
s = '%2d/%.2d' % (dtstart.day, dtstart.month)
if (dtstart.hour, dtstart.minute) != (0, 0):
s += ' de %2dh%.2d à %2dh%.2d' % (dtstart.hour, dtstart.minute,
dtend.hour, dtend.minute)
else:
s = 'du %d/%d' % (dtstart.day, dtstart.month)
if (dtstart.hour, dtstart.minute, dtend.hour, dtend.minute) != (0, 0, 23, 59):
s += ' %2dh%.2d' % (dtstart.hour, dtstart.minute)
s += ' au %d/%d' % (dtend.day, dtend.month)
if (dtstart.hour, dtstart.minute, dtend.hour, dtend.minute) != (0, 0, 23, 59):
s += ' %2dh%.2d' % (dtend.hour, dtend.minute)
s += ' : %s%s' % (event['summary'], ', '.join(event['attendees']))
ret.append(s)
return '\n'.join(ret)
# xmpp robot
class Robeo(sleekxmpp.ClientXMPP):
"""
A simple SleekXMPP bot that will greets those
who enter the room, and acknowledge any messages
that mentions the bot's nickname.
"""
def __init__(self, jid, password, room, nick):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.room = room
self.nick = nick
# The session_start event will be triggered when
# the bot establishes its connection with the server
# and the XML streams are ready for use. We want to
# listen for this event so that we we can initialize
# our roster.
self.add_event_handler("session_start", self.start)
# The groupchat_message event is triggered whenever a message
# stanza is received from any chat room. If you also also
# register a handler for the 'message' event, MUC messages
# will be processed by both handlers.
self.add_event_handler("groupchat_message", self.muc_message)
# The groupchat_presence event is triggered whenever a
# presence stanza is received from any chat room, including
# any presences you send yourself. To limit event handling
# to a single room, use the events muc::room@server::presence,
# muc::room@server::got_online, or muc::room@server::got_offline.
self.add_event_handler("muc::%s::got_online" % self.room,
self.muc_online)
def start(self, event):
"""
Process the session_start event.
Typical actions for the session_start event are
requesting the roster and broadcasting an initial
presence stanza.
Arguments:
event -- An empty dictionary. The session_start
event does not provide any additional
data.
"""
self.get_roster()
self.send_presence()
self.plugin['xep_0045'].joinMUC(self.room, self.nick, wait=True)
self.show_events(force_update=True)
def muc_message(self, msg):
"""
Process incoming message stanzas from any chat room. Be aware
that if you also have any handlers for the 'message' event,
message stanzas may be processed by both handlers, so check
the 'type' attribute when using a 'message' event handler.
Whenever the bot's nickname is mentioned, respond to
the message.
IMPORTANT: Always check that a message is not from yourself,
otherwise you will create an infinite loop responding
to your own messages.
This handler will reply to messages that mention
the bot's nickname.
Arguments:
msg -- The received message stanza. See the documentation
for stanza objects and the Message stanza to see
how it may be used.
"""
if msg['mucnick'] == self.nick:
return
if not msg['body'].startswith(('%s:' % self.nick, '@%s' % self.nick,
'%s,' % self.nick, '%s ' % self.nick)):
return
msg = msg['body'][len(self.nick)+1:].strip().lower()
words = msg.split()
if 'cal' in words or 'calendrier' in words or 'calendar' in words:
self.show_events(force_update=True)
elif 'rappel' in words:
self.show_events(force_update=False)
else:
self.help()
def muc_online(self, presence):
"""
Process a presence stanza from a chat room. In this case,
presences from users that have just come online are
handled by sending a welcome message that includes
the user's nickname and role in the room.
Arguments:
presence -- The received presence stanza. See the
documentation for the Presence stanza
to see how else it may be used.
"""
if presence['muc']['nick'] != self.nick:
pass
# self.send_message(mto=presence['from'].bare,
# mbody="%s est dans la place" % presence['muc']['nick'],
# mtype='groupchat')
def help(self):
message = '''
calendrier : recharge les rendez-vous du jour
rappel : affiche les rendez-vous en cache
n'importe quoi d'autre : cette aide
'''
self.send_message(mto=self.room,
mbody=message,
mtype='groupchat')
def show_events(self, force_update):
if force_update:
self.send_message(mto=self.room,
mbody='/me interroge calendar, un peu de patience ...',
mtype='groupchat')
events = display_events(get_today_events(force_update=force_update))
self.send_message(mto=self.room,
mbody=events,
mtype='groupchat')
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(message)s')
xmpp = Robeo(JID, PASSWORD, ROOM, NICK)
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0045') # Multi-User Chat
xmpp.register_plugin('xep_0199') # XMPP Ping
# Connect to the XMPP server and start processing XMPP stanzas.
if xmpp.connect(('labs.libre-entreprise.org', 5222)):
xmpp.process(block=True)
print("Done")
else:
print("Unable to connect.")