class StatsClient(object):

    """
    Minimal UDP client used to exercise the statsd server.

    Every public method formats one or more statsd lines and hands them to
    `_send`, which applies optional client-side sampling before writing the
    datagram to the configured (host, port).
    """

    # destination used when the constructor is given no address
    HOSTPORT = ('', 8125)

    def __init__(self, hostport=None):
        "Create the UDP socket; `hostport` defaults to StatsClient.HOSTPORT"
        self._hostport = StatsClient.HOSTPORT if hostport is None else hostport
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def timer(self, key, timestamp, sample_rate=1):
        "Send a timer ('ms') sample; the value is formatted as an integer"
        payload = '%s:%d|ms' % (key, timestamp)
        self._send(payload, sample_rate)

    def increment(self, key, sample_rate=1):
        "Add 1 to the counter(s) named by `key`"
        return self.counter(key, 1, sample_rate)

    def decrement(self, key, sample_rate=1):
        "Subtract 1 from the counter(s) named by `key`"
        return self.counter(key, -1, sample_rate)

    def counter(self, keys, magnitude=1, sample_rate=1):
        "Send a counter ('c') update for one key or a list/tuple of keys"
        names = keys if isinstance(keys, (list, tuple)) else [keys]
        for name in names:
            self._send('%s:%s|c' % (name, magnitude), sample_rate)

    def _send(self, data, sample_rate=1):
        "Transmit `data`, randomly dropping it when sample_rate is below 1"
        if sample_rate < 1.0:
            # sampled out: this call sends nothing
            if not random.random() < sample_rate:
                return
            # tell the server which rate was applied
            packet = data + '|@%s' % sample_rate
        else:
            packet = data
        if not packet:
            return
        self._sock.sendto(packet, self._hostport)
def daemonize(umask=0o027):
    """
    Detach the process and run it in the background.

    Forks twice through gevent.fork() (each parent leaves via os._exit(0)),
    starts a new session in between, applies `umask`, closes every file
    descriptor below the hard RLIMIT_NOFILE limit (1024 when the limit is
    unlimited), opens os.devnull, duplicates descriptor 0 onto 1 and 2, and
    finally calls gevent.reinit().
    """
    if gevent.fork():
        os._exit(0)
    os.setsid()
    if gevent.fork():
        os._exit(0)
    os.umask(umask)
    fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fd_limit == resource.RLIM_INFINITY:
        fd_limit = 1024
    for fd in xrange(0, fd_limit):
        try:
            os.close(fd)
        except OSError:
            # descriptor was not open; nothing to close
            pass
    # presumably lands on descriptor 0 because everything was just closed
    os.open(os.devnull, os.O_RDWR)
    os.dup2(0, 1)
    os.dup2(0, 2)
    gevent.reinit()


def parse_addr(text):
    """
    Parse a 1- to 3-part address spec.

    Accepted forms are 'port', 'host:port' and 'backend:host:port'.

    Returns a (backend, host, port) tuple. `backend` is None when it was not
    given, `host` is '' when only a port was given, and `port` is an int.
    (None, None, None) is returned when `text` is empty, has more than three
    parts, or the port is not an integer, so callers only need to test
    `port is None` to detect a bad spec.
    """
    if not text:
        return None, None, None
    parts = text.split(':')
    count = len(parts)
    if count > 3:
        return None, None, None
    try:
        port = int(parts[-1])
    except ValueError:
        # non-numeric port: report "invalid" instead of raising
        return None, None, None
    if count == 3:
        return parts[0], parts[1], port
    if count == 2:
        return None, parts[0], port
    return None, '', port
class StatsDaemon(object):

    """
    A statsd service implementation in Python + gevent.

    Receives statsd packets over UDP, accumulates timer samples and counter
    totals in memory, and every `interval` seconds passes them to each
    configured backend sender (a `_send_<backend>` method of this class).
    """

    def __init__(self, bindaddr, targets, interval, percent, debug=0):
        """
        Parse and validate the configuration; no sockets are opened here.

        bindaddr -- '[host]:port' spec for the UDP listener
        targets  -- list of '[backend:]host:port' destination specs
        interval -- seconds between flushes (converted with float())
        percent  -- percentile threshold for timers (converted with float())
        debug    -- when true, every received packet is echoed to stderr

        A bad bind address, target or backend name is reported on stderr
        and the process exits with status 1.
        """
        # parse and validate
        _, host, port = parse_addr(bindaddr)
        if port is None:
            self._exit(E_BADADDR % bindaddr)
        self._bindaddr = (host, port)

        # parse backend targets
        if not targets:
            self._exit(E_NOTARGETS)
        self._targets = []
        for target in targets:
            backend, host, port = parse_addr(target)
            if backend is None:
                backend = 'graphite'
            if port is None:
                self._exit(E_BADTARGET % target)
            # a backend is supported when a matching _send_<name> exists
            func = getattr(self, '_send_%s' % backend, None)
            if not func:
                self._exit(E_BADBACKEND % backend)
            self._targets.append((func, (host, port)))

        self._interval = float(interval)
        self._percent = float(percent)
        self._debug = debug
        self._timers = defaultdict(list)
        self._counts = defaultdict(float)
        self._sock = None
        self._flush_task = None

    def _exit(self, msg, code=1):
        "Write `msg` to stderr and terminate with exit status `code`"
        self._error(msg)
        sys.exit(code)

    def _error(self, msg):
        "Write the string `msg` plus a newline to stderr"
        sys.stderr.write(msg + '\n')

    def start(self):
        "Start the service: spawn the flush task, then receive forever"
        # register signals
        gevent.signal(signal.SIGINT, self._shutdown)

        # spawn the flush trigger
        def _flush_impl():
            while 1:
                gevent.sleep(self._interval)
                self._flush()

        self._flush_task = gevent.spawn(_flush_impl)

        # start accepting connections
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
            socket.IPPROTO_UDP)
        self._sock.bind(self._bindaddr)
        while 1:
            try:
                self._process(*self._sock.recvfrom(MAX_PACKET))
            except Exception as ex:
                self._error(str(ex))

    def _flush(self):
        "Invoke every backend sender once; one failure must not stop the rest"
        # NOTE(review): _send_graphite resets the stats after rendering them,
        # so with several targets only the first appears to receive data.
        for func, hostport in self._targets:
            try:
                func(hostport)
            except Exception:
                # format_exc() returns a str; the previous format_tb() list
                # made _error() raise TypeError and killed the flush task
                self._error(traceback.format_exc())

    def _shutdown(self):
        "Shutdown the server"
        self._exit("service exiting", code=0)

    def _process(self, data, _):
        """
        Process a single packet of the form 'key:value|type[|@rate][:...]'.

        Type 'ms' appends a timer sample and type 'c' adds to a counter,
        scaled by 1/rate when a '@rate' field is present. Parts with fewer
        than two fields or an unknown type are skipped. A part whose value
        or rate is not a number, or whose rate is not positive, is reported
        on stderr and skipped without discarding the rest of the packet.
        """
        if self._debug:
            self._error('packet: %r' % data)
        parts = data.split(':')
        key = parts[0].translate(KEY_SANITIZE, string.whitespace)
        for part in parts[1:]:
            fields = part.split('|')
            length = len(fields)
            if length < 2:
                continue
            value = fields[0]
            stype = fields[1].strip()
            try:
                # timer (milliseconds)
                if stype == 'ms':
                    self._timers[key].append(float(value if value else 0))

                # counter with optional sample rate
                elif stype == 'c':
                    srate = 1.0
                    if length == 3 and fields[2].startswith('@'):
                        srate = float(fields[2][1:])
                    if srate <= 0:
                        # would divide by zero or flip the counter's sign
                        raise ValueError('invalid sample rate %r' % fields[2])
                    value = float(value if value else 1) * (1 / srate)
                    self._counts[key] += value
            except ValueError as ex:
                self._error('ignoring bad metric %r: %s' % (part, ex))

    def _send_graphite(self, dest):
        """
        Send blob of stats data to graphite server.

        Renders all accumulated timers and counters as 'name value time'
        lines, resets the accumulators, then writes the text to the
        (host, port) pair `dest` over TCP. A delivery failure is reported on
        stderr; the stats of that interval are not retried.
        """
        buf = cStringIO.StringIO()
        now = int(time.time())
        num_stats = 0

        # timer stats
        pct = self._percent
        timers = self._timers
        for key, vals in timers.iteritems():
            if not vals:
                continue

            # compute statistics
            num = len(vals)
            vals = sorted(vals)
            vmin = vals[0]
            vmax = vals[-1]
            mean = vmin
            max_at_thresh = vmax
            if num > 1:
                # keep only the lowest `pct` percent of the samples
                idx = round((pct / 100.0) * num)
                tmp = vals[:int(idx)]
                if tmp:
                    max_at_thresh = tmp[-1]
                    mean = sum(tmp) / idx

            key = 'stats.timers.%s' % key
            buf.write('%s.mean %f %d\n' % (key, mean, now))
            buf.write('%s.upper %f %d\n' % (key, vmax, now))
            buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
            buf.write('%s.lower %f %d\n' % (key, vmin, now))
            buf.write('%s.count %d %d\n' % (key, num, now))
            num_stats += 1

        # counter stats
        counts = self._counts
        for key, val in counts.iteritems():
            buf.write('stats.%s %f %d\n' % (key, val / self._interval, now))
            buf.write('stats_counts.%s %f %d\n' % (key, val, now))
            num_stats += 1

        buf.write('statsd.numStats %d %d\n' % (num_stats, now))

        # reset
        self._timers = defaultdict(list)
        self._counts = defaultdict(float)
        del timers
        del counts

        # XXX: add support for N retries

        # flush stats to graphite
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(dest)
            sock.sendall(buf.getvalue())
        except Exception as ex:
            self._error(E_SENDFAIL % ('graphite', dest, ex))
        finally:
            # close on failure too, so the descriptor is not leaked
            if sock is not None:
                sock.close()
def main():
    """
    Command-line entry point for the gstatsd service.

    Parses the options, optionally lists the supported backends and exits,
    otherwise builds the StatsDaemon, daemonizes when -D was given, and
    starts the service.
    """
    opts = optparse.OptionParser(description=DESCRIPTION, version=__version__)
    opts.add_option('-b', '--bind', dest='bind_addr', default=':8125',
        help="bind [host]:port (host defaults to '')")
    opts.add_option('-d', '--dest', dest='dest_addr', action='append',
        default=[],
        help="receiver [backend:]host:port (backend defaults to 'graphite')")
    opts.add_option('-v', dest='verbose', action='count', default=0)
    # type='float' lets optparse reject non-numeric values with a usage error
    opts.add_option('-f', '--flush', dest='interval', type='float',
        default=INTERVAL,
        help="flush interval, in seconds")
    opts.add_option('-p', '--percent', dest='percent', type='float',
        default=PERCENT,
        help="percent threshold")
    opts.add_option('-l', '--list', dest='list_backends', action='store_true',
        help="list supported backends")
    opts.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
        help='daemonize the service')

    (options, args) = opts.parse_args()

    if options.list_backends:
        # backends are the StatsDaemon methods named _send_<backend>
        for key in [k for k in dir(StatsDaemon) if k.startswith('_send_')]:
            print(key[6:])
        sys.exit()

    if options.interval <= 0:
        # the interval is used as a sleep time and as a divisor
        opts.error('flush interval must be greater than zero')

    # Build (and thereby validate) the daemon before detaching: daemonize()
    # points stderr at os.devnull, which would hide configuration errors.
    sd = StatsDaemon(options.bind_addr, options.dest_addr, options.interval,
        options.percent, options.verbose)

    if options.daemonize:
        daemonize()

    sd.start()
def main():
    """
    Run setuptools.setup() for the gstatsd package.

    The long description is read from README.txt located next to this file.
    """
    cwd = os.path.dirname(os.path.abspath(__file__))
    path = os.path.join(cwd, 'README.txt')
    # close the file deterministically instead of leaking the handle
    with open(path, 'rb') as fh:
        readme = fh.read()

    setup(
        name = 'gstatsd',
        version = __version__,
        description = 'A statsd service and client in Python + gevent',
        license = 'Apache 2.0',
        author = 'Patrick Hensley',
        author_email = 'spaceboy@indirect.com',
        keywords = ['stats', 'graphite', 'statsd', 'gevent'],
        url = 'http://github.com/phensley/gstatsd',
        packages = ['gstatsd'],
        entry_points = {
            'console_scripts': ['gstatsd=gstatsd.service:main']
        },
        classifiers = [
            "Development Status :: 2 - Pre-Alpha",
            "Intended Audience :: Developers",
            # 'Apache 2.0' is not a registered trove classifier
            "License :: OSI Approved :: Apache Software License",
            "Operating System :: MacOS :: MacOS X",
            "Operating System :: POSIX :: Linux",
            "Operating System :: Unix",
            "Programming Language :: Python",
            "Programming Language :: Python :: 2.6",
            "Programming Language :: Python :: 2.7",
            "Topic :: Software Development :: Libraries :: Python Modules",
        ],
        long_description = readme
    )