use celery to push notification changes to agents (#4577)

This commit is contained in:
Frédéric Péters 2014-07-23 11:08:14 +02:00
parent 5535fb4ae3
commit 8404eef58b
10 changed files with 208 additions and 20 deletions

1
hobo/agent/__init__.py Normal file
View File

@ -0,0 +1 @@
from .celery import app as celery_app

19
hobo/agent/celery.py Normal file
View File

@ -0,0 +1,19 @@
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from . import services
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hobo.settings')
app = Celery('hobo')
app.config_from_object('django.conf:settings')
@app.task(bind=True)
def deploy(self, environment):
services.deploy(environment)

View File

View File

@ -0,0 +1,24 @@
import json
from optparse import make_option
import urllib2
from django.core.management.base import BaseCommand, CommandError
from hobo.agent import services
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
make_option('--url', dest='url', metavar='URL', default=None),
make_option('--file', dest='filename', metavar='FILENAME', default=None),
)
def handle(self, url=None, filename=None, **kwargs):
if filename:
fd = file(filename)
elif url:
fd = urllib2.urlopen(url)
services.deploy(json.load(fd))
fd.close()

111
hobo/agent/services.py Normal file
View File

@ -0,0 +1,111 @@
import ConfigParser
import fnmatch
import json
import logging
import os
import string
import subprocess
import urllib2
from django.conf import settings
logger = logging.getLogger('hobo.agent')
class BaseService(object):
def __init__(self, base_url, title, secret_key, **kwargs):
self.base_url = base_url
self.title = title
self.secret_key = secret_key
def is_for_us(self):
# This function checks if the requested service is to be hosted
# on this server, and return True if appropriate.
#
# It matches against a set of patterns taken from
# settings.AGENT_HOST_PATTERNS; patterns can be full hostname or use
# globbing characters (ex: "*.au-quotidien.com"); it is also possible
# to # prefix the pattern by an exclamation mark to exclude those ones
# (ex: "! *.dev.au-quotidien.com").
if not settings.AGENT_HOST_PATTERNS:
return True
patterns = settings.AGENT_HOST_PATTERNS.get(self.service_id)
if not patterns:
return True
parsed_url = urllib2.urlparse.urlsplit(self.base_url)
netloc = parsed_url.netloc
match = False
for pattern in patterns:
if pattern.startswith('!'):
if fnmatch.fnmatch(netloc, pattern[1:].strip()):
match = False
break
else:
if fnmatch.fnmatch(netloc, pattern.strip()):
match = True
return match
def check_timestamp(self, timestamp):
'''Return True if site is uptodate'''
return False
def execute(self, environment):
pass
class Passerelle(BaseService):
service_id = 'passerelle'
class Wcs(BaseService):
service_id = 'wcs'
def __init__(self, **kwargs):
super(Wcs, self).__init__(**kwargs)
parsed_url = urllib2.urlparse.urlsplit(self.base_url)
instance_path = parsed_url.netloc
if parsed_url.path:
instance_path = '%s+' % parsed_url.path.replace('/', '+')
self.app_dir = os.path.join(settings.AGENT_WCS_APP_DIR, instance_path)
def check_timestamp(self, timestamp):
config = ConfigParser.RawConfigParser()
site_options_filepath = os.path.join(self.app_dir, 'site-options.cfg')
if os.path.exists(site_options_filepath):
config.read(site_options_filepath)
try:
if config.getint('hobo', 'timestamp') == timestamp:
return True
except ConfigParser.NoSectionError:
pass
return False
def execute(self, environment):
cmd = string.Template(settings.AGENT_WCS_COMMAND)
cmd = cmd.substitute(wcs_url=self.base_url)
cmd_process = subprocess.Popen(cmd.split(' '),
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
stdout = cmd_process.communicate(input=json.dumps(environment))
def deploy(environment):
hobo_timestamp = environment.get('timestamp')
service_classes = {}
for klassname, service in globals().items():
if not hasattr(service, 'service_id'):
continue
service_classes[service.service_id] = service
for service in environment.get('services', []):
service_id = service.get('service-id')
if not service_id in service_classes:
continue
service_obj = service_classes.get(service_id)(**service)
if not service_obj.is_for_us():
logger.debug('skipping as not for us:', service_obj)
continue
if service_obj.check_timestamp(hobo_timestamp):
logger.debug('skipping uptodate site:', service_obj)
continue
service_obj.execute(environment)

View File

@ -6,8 +6,10 @@ from django.db import models
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
from django.db.models.signals import post_save
from django.dispatch import receiver
from .utils import Zone
from .utils import Zone, get_installed_services_dict
class Variable(models.Model):
@ -116,3 +118,14 @@ class Passerelle(ServiceBase):
]
AVAILABLE_SERVICES = [Authentic, Wcs, Passerelle]
@receiver(post_save)
def notify_agents(sender, instance, **kwargs):
if not sender in [Variable]+AVAILABLE_SERVICES:
return
try:
from hobo.agent.celery import deploy
except ImportError:
return
deploy.apply_async((get_installed_services_dict(),) , queue='broadcast_tasks')

View File

@ -1,3 +1,8 @@
import calendar
import datetime
from django.db.models import Max
def get_installed_services():
from .models import AVAILABLE_SERVICES
installed_services = []
@ -9,6 +14,26 @@ def get_operational_services():
return [x for x in get_installed_services() if x.is_operational()]
def get_installed_services_dict():
from .models import Variable, AVAILABLE_SERVICES
timestamp = None
for klass in [Variable] + AVAILABLE_SERVICES:
ts = klass.objects.all().aggregate(Max('last_update_timestamp')
).get('last_update_timestamp__max')
if timestamp is None or (ts and ts > timestamp):
timestamp = ts
if timestamp is None:
timestamp = datetime.datetime.now()
return {
'timestamp': calendar.timegm(timestamp.timetuple()),
'services': [x.as_dict() for x in get_installed_services()],
'variables': dict(((v.name, v.json)
for v in Variable.objects.all())),
}
class Zone:
title = None
zone_icon_id = None

View File

@ -1,5 +1,3 @@
import calendar
import datetime
import json
import string
@ -8,7 +6,6 @@ from django.core.urlresolvers import reverse_lazy
from django.http import HttpResponse
from django.views.generic.base import TemplateView
from django.views.generic.edit import CreateView, UpdateView, DeleteView
from django.db.models import Max
from .models import Variable, Authentic, Wcs, Passerelle, AVAILABLE_SERVICES
from . import forms, utils
@ -140,22 +137,7 @@ def operational_check_view(request, service, slug, **kwargs):
def installed_services_json_view(request, **kwargs):
response = HttpResponse(content_type='application/json')
timestamp = None
for klass in [Variable] + AVAILABLE_SERVICES:
ts = klass.objects.all().aggregate(Max('last_update_timestamp')
).get('last_update_timestamp__max')
if timestamp is None or (ts and ts > timestamp):
timestamp = ts
if timestamp is None:
timestamp = datetime.datetime.now()
json.dump({
'timestamp': calendar.timegm(timestamp.timetuple()),
'services': [x.as_dict() for x in utils.get_installed_services()],
'variables': dict(((v.name, v.json)
for v in Variable.objects.all())),
}, response)
json.dump(utils.get_installed_services_dict(), response)
return response

View File

@ -40,6 +40,7 @@ INSTALLED_APPS = (
'django.contrib.staticfiles',
'gadjo',
'hobo.environment',
'hobo.agent',
'allauth_a2',
)
@ -104,6 +105,18 @@ LOCALE_PATHS = (
SERVICE_URL_TEMPLATE = 'https://${app}.example.net'
AGENT_HOST_PATTERNS = None
AGENT_WCS_APP_DIR = '/var/lib/wcs'
AGENT_WCS_COMMAND = '/usr/sbin/wcsctl check-hobos --site-url ${wcs_url}'
try:
from kombu.common import Broadcast
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_QUEUES = (Broadcast('broadcast_tasks'), )
except ImportError:
pass
if os.environ.get('MULTITENANT_MODE'):
try:
from tenant_settings import *