diff --git a/README.md b/README.md index 5e258fb..cd0d08c 100644 --- a/README.md +++ b/README.md @@ -29,31 +29,31 @@ Options: Options: --version show program's version number and exit - -h, --help show this help message and exit -b BIND_ADDR, --bind=BIND_ADDR bind [host]:port (host defaults to '') - -d DEST_ADDR, --dest=DEST_ADDR - receiver [backend:]host:port (backend defaults to - 'graphite') + -s SINK, --sink=SINK a graphite service to which stats are sent + ([host]:port). -v increase verbosity (currently used for debugging) -f INTERVAL, --flush=INTERVAL flush interval, in seconds (default 10) -p PERCENT, --percent=PERCENT percent threshold (default 90) - -l, --list list supported backends -D, --daemonize daemonize the service + -h, --help -Start gstatsd and send stats to port 9100 every 5 seconds: +Start gstatsd listening on the default port 8125, and send stats to graphite +server on port 2003 every 5 seconds: - % gstatsd -d :9100 -f 5 + % gstatsd -s 2003 -f 5 -Bind listener to host 'hostname' port 8126: +Bind listener to host 'foo' port 8126, and send stats to the Graphite server +on host 'bar' port 2003 every 20 seconds: - % gstatsd -b hostname:8126 -d :9100 -f 5 + % gstatsd -b foo:8126 -s bar:2003 -f 20 -To send the stats to multiple graphite servers, specify multiple destinations: +To send the stats to multiple graphite servers, specify '-s' multiple times: - % gstatsd -b :8125 -d stats1:9100 stats2:9100 + % gstatsd -b :8125 -s stats1:2003 -s stats2:2004 Using the client diff --git a/README.txt b/README.txt index 64cde23..c901cb1 100644 --- a/README.txt +++ b/README.txt @@ -37,38 +37,38 @@ Options: Options: --version show program's version number and exit - -h, --help show this help message and exit -b BIND_ADDR, --bind=BIND_ADDR bind [host]:port (host defaults to '') - -d DEST_ADDR, --dest=DEST_ADDR - receiver [backend:]host:port (backend defaults to - 'graphite') + -s SINK, --sink=SINK a graphite service to which stats are sent + ([host]:port). -v increase verbosity (currently used for debugging) -f INTERVAL, --flush=INTERVAL flush interval, in seconds (default 10) -p PERCENT, --percent=PERCENT percent threshold (default 90) - -l, --list list supported backends -D, --daemonize daemonize the service + -h, --help -Start gstatsd and send stats to port 9100 every 5 seconds: +Start gstatsd listening on the default port 8125, and send stats to +graphite server on port 2003 every 5 seconds: :: - % gstatsd -d :9100 -f 5 + % gstatsd -s 2003 -f 5 -Bind listener to host 'hostname' port 8126: +Bind listener to host 'foo' port 8126, and send stats to the +Graphite server on host 'bar' port 2003 every 20 seconds: :: - % gstatsd -b hostname:8126 -d :9100 -f 5 + % gstatsd -b foo:8126 -s bar:2003 -f 20 -To send the stats to multiple graphite servers, specify multiple -destinations: +To send the stats to multiple graphite servers, specify '-s' +multiple times: :: - % gstatsd -b :8125 -d stats1:9100 stats2:9100 + % gstatsd -b :8125 -s stats1:2003 -s stats2:2004 Using the client ---------------- diff --git a/debian/changelog b/debian/changelog index 89b969b..69bac0a 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,4 +1,4 @@ -gstatsd (0.3.1) lucid; urgency=low +gstatsd (0.4) lucid; urgency=low * Release. diff --git a/gstatsd/core.py b/gstatsd/core.py index c78aaf0..2370051 100644 --- a/gstatsd/core.py +++ b/gstatsd/core.py @@ -1,3 +1,3 @@ -__version__ = '0.3.1' +__version__ = '0.4' diff --git a/gstatsd/service.py b/gstatsd/service.py index ee9ef99..ae009ea 100644 --- a/gstatsd/service.py +++ b/gstatsd/service.py @@ -12,6 +12,7 @@ import traceback from collections import defaultdict # local +import sink from core import __version__ # vendor @@ -26,6 +27,9 @@ MAX_PACKET = 2048 DESCRIPTION = ''' A statsd service in Python + gevent. +''' +EPILOG = ''' + ''' # table to remove invalid characters from keys @@ -36,10 +40,16 @@ KEY_DELETIONS = ''.join(ALL_ASCII.difference(KEY_VALID + '/')) # error messages E_BADADDR = 'invalid bind address specified %r' -E_BADTARGET = 'invalid target specified %r' -E_BADBACKEND = 'invalid backend specified %r' -E_NOTARGETS = 'you must specify at least one stats destination' -E_SENDFAIL = 'failed to send stats to %s %s: %s' +E_NOSINKS = 'you must specify at least one stats sink' + + +class Stats(object): + + def __init__(self): + self.timers = defaultdict(list) + self.counts = defaultdict(float) + self.percent = PERCENT + self.interval = INTERVAL def daemonize(umask=0027): @@ -83,41 +93,49 @@ class StatsDaemon(object): A statsd service implementation in Python + gevent. """ - def __init__(self, bindaddr, targets, interval, percent, debug=0): - # parse and validate + def __init__(self, bindaddr, sinkspecs, interval, percent, debug=0): _, host, port = parse_addr(bindaddr) if port is None: - self._exit(E_BADADDR % bindaddr) + self.exit(E_BADADDR % bindaddr) self._bindaddr = (host, port) - # parse backend targets - if not targets: - self._exit(E_NOTARGETS) - self._targets = [] - for target in targets: - backend, host, port = parse_addr(target) - if backend is None: - backend = 'graphite' - if port is None: - self._exit(E_BADTARGET % target) - func = getattr(self, '_send_%s' % backend, None) - if not func: - self._exit(E_BADBACKEND % backend) - self._targets.append((func, (host, port))) + # TODO: generalize to support more than one sink type. currently + # only the graphite backend is present, but we may want to write + # stats to hbase, redis, etc. - ph + + # construct the sink and add hosts to it + if not sinkspecs: + self.exit(E_NOSINKS) + self._sink = sink.GraphiteSink() + errors = [] + for spec in sinkspecs: + try: + self._sink.add(spec) + except ValueError, ex: + errors.append(ex) + if errors: + for err in errors: + self.error(str(err)) + self.exit('exiting.') - self._interval = float(interval) self._percent = float(percent) + self._interval = float(interval) self._debug = debug - self._timers = defaultdict(list) - self._counts = defaultdict(float) self._sock = None self._flush_task = None - def _exit(self, msg, code=1): - self._error(msg) + self._reset_stats() + + def _reset_stats(self): + self._stats = Stats() + self._stats.percent = self._percent + self._stats.interval = self._interval + + def exit(self, msg, code=1): + self.error(msg) sys.exit(code) - def _error(self, msg): + def error(self, msg): sys.stderr.write(msg + '\n') def start(self): @@ -128,12 +146,19 @@ class StatsDaemon(object): # spawn the flush trigger def _flush_impl(): while 1: - gevent.sleep(self._interval) - for func, hostport in self._targets: - try: - func(hostport) - except Exception, ex: - self._error(traceback.format_tb(sys.exc_info()[-1])) + gevent.sleep(self._stats.interval) + + # rotate stats + stats = self._stats + self._reset_stats() + + # send the stats to the sink which in turn broadcasts + # the stats packet to one or more hosts. + try: + self._sink.send(stats) + except Exception, ex: + trace = traceback.format_tb(sys.exc_info()[-1]) + self.error(''.join(trace)) self._flush_task = gevent.spawn(_flush_impl) @@ -145,19 +170,22 @@ class StatsDaemon(object): try: self._process(*self._sock.recvfrom(MAX_PACKET)) except Exception, ex: - self._error(str(ex)) + self.error(str(ex)) def _shutdown(self): "Shutdown the server" - self._exit("service exiting", code=0) + self.exit("service exiting", code=0) def _process(self, data, _): - "Process a single packet" + "Process a single packet and update the internal tables." parts = data.split(':') if self._debug: - self._error('packet: %r' % data) + self.error('packet: %r' % data) if not parts: return + + # interpret the packet and update stats + stats = self._stats key = parts[0].translate(KEY_TABLE, KEY_DELETIONS) for part in parts[1:]: srate = 1.0 @@ -170,106 +198,45 @@ class StatsDaemon(object): # timer (milliseconds) if stype == 'ms': - self._timers[key].append(float(value if value else 0)) + stats.timers[key].append(float(value if value else 0)) # counter with optional sample rate elif stype == 'c': if length == 3 and fields[2].startswith('@'): srate = float(fields[2][1:]) value = float(value if value else 1) * (1 / srate) - self._counts[key] += value - - def _send_graphite(self, dest): - "Send blob of stats data to graphite server" - buf = cStringIO.StringIO() - now = int(time.time()) - num_stats = 0 - - # timer stats - pct = self._percent - timers = self._timers - for key, vals in timers.iteritems(): - if not vals: - continue - - # compute statistics - num = len(vals) - vals = sorted(vals) - vmin = vals[0] - vmax = vals[-1] - mean = vmin - max_at_thresh = vmax - if num > 1: - idx = round((pct / 100.0) * num) - tmp = vals[:int(idx)] - if tmp: - max_at_thresh = tmp[-1] - mean = sum(tmp) / idx - - key = 'stats.timers.%s' % key - buf.write('%s.mean %f %d\n' % (key, mean, now)) - buf.write('%s.upper %f %d\n' % (key, vmax, now)) - buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now)) - buf.write('%s.lower %f %d\n' % (key, vmin, now)) - buf.write('%s.count %d %d\n' % (key, num, now)) - num_stats += 1 - - # counter stats - counts = self._counts - for key, val in counts.iteritems(): - buf.write('stats.%s %f %d\n' % (key, val / self._interval, now)) - buf.write('stats_counts.%s %f %d\n' % (key, val, now)) - num_stats += 1 - - buf.write('statsd.numStats %d %d\n' % (num_stats, now)) - - # reset - self._timers = defaultdict(list) - self._counts = defaultdict(float) - del timers - del counts - - # XXX: add support for N retries - - # flush stats to graphite - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(dest) - sock.sendall(buf.getvalue()) - sock.close() - except Exception, ex: - self._error(E_SENDFAIL % ('graphite', dest, ex)) + stats.counts[key] += value def main(): - opts = optparse.OptionParser(description=DESCRIPTION, version=__version__) + opts = optparse.OptionParser(description=DESCRIPTION, version=__version__, + add_help_option=False) opts.add_option('-b', '--bind', dest='bind_addr', default=':8125', help="bind [host]:port (host defaults to '')") - opts.add_option('-d', '--dest', dest='dest_addr', action='append', - default=[], - help="receiver [backend:]host:port (backend defaults to 'graphite')") + opts.add_option('-s', '--sink', dest='sink', action='append', default=[], + help="a graphite service to which stats are sent ([host]:port).") opts.add_option('-v', dest='verbose', action='count', default=0, help="increase verbosity (currently used for debugging)") opts.add_option('-f', '--flush', dest='interval', default=INTERVAL, help="flush interval, in seconds (default 10)") opts.add_option('-p', '--percent', dest='percent', default=PERCENT, help="percent threshold (default 90)") - opts.add_option('-l', '--list', dest='list_backends', action='store_true', - help="list supported backends") opts.add_option('-D', '--daemonize', dest='daemonize', action='store_true', help='daemonize the service') + opts.add_option('-h', '--help', dest='usage', action='store_true') (options, args) = opts.parse_args() - if options.list_backends: - for key in [k for k in dir(StatsDaemon) if k.startswith('_send_')]: - print(key[6:]) + if options.usage: + # TODO: write epilog. usage is manually output since optparse will + # wrap the epilog and we want pre-formatted output. - ph + print(opts.format_help()) sys.exit() if options.daemonize: daemonize() - sd = StatsDaemon(options.bind_addr, options.dest_addr, options.interval, + sd = StatsDaemon(options.bind_addr, options.sink, options.interval, options.percent, options.verbose) sd.start() diff --git a/gstatsd/service_test.py b/gstatsd/service_test.py index 1b6bc2d..6f46ff8 100644 --- a/gstatsd/service_test.py +++ b/gstatsd/service_test.py @@ -9,54 +9,56 @@ from gstatsd import service class StatsServiceTest(unittest.TestCase): def setUp(self): - args = (':8125', [':9100'], 5, 90, 0) + args = (':8125', [':2003'], 5, 90, 0) self.svc = service.StatsDaemon(*args) + self.stats = self.svc._stats def test_construct(self): - svc = service.StatsDaemon('8125', ['9100'], 5, 90, 0) + svc = service.StatsDaemon('8125', ['2003'], 5, 90, 0) + stats = svc._stats self.assertEquals(svc._bindaddr, ('', 8125)) self.assertEquals(svc._interval, 5.0) - self.assertEquals(svc._percent, 90.0) self.assertEquals(svc._debug, 0) - self.assertEquals(svc._targets[0], (svc._send_graphite, ('', 9100))) + self.assertEquals(stats.percent, 90.0) + self.assertEquals(svc._sink._hosts, [('', 2003)]) - svc = service.StatsDaemon('bar:8125', ['foo:9100'], 5, 90, 1) + svc = service.StatsDaemon('bar:8125', ['foo:2003'], 5, 90, 1) self.assertEquals(svc._bindaddr, ('bar', 8125)) - self.assertEquals(svc._targets[0], (svc._send_graphite, ('foo', 9100))) + self.assertEquals(svc._sink._hosts, [('foo', 2003)]) self.assertEquals(svc._debug, 1) def test_backend(self): service.StatsDaemon._send_foo = lambda self, x, y: None - svc = service.StatsDaemon('8125', ['foo:bar:9100'], 5, 90, 0) - self.assertEquals(svc._targets[0], (svc._send_foo, ('bar', 9100))) + svc = service.StatsDaemon('8125', ['bar:2003'], 5, 90, 0) + self.assertEquals(svc._sink._hosts, [('bar', 2003)]) def test_counters(self): pkt = 'foo:1|c' self.svc._process(pkt, None) - self.assertEquals(self.svc._counts, {'foo': 1}) + self.assertEquals(self.stats.counts, {'foo': 1}) self.svc._process(pkt, None) - self.assertEquals(self.svc._counts, {'foo': 2}) + self.assertEquals(self.stats.counts, {'foo': 2}) pkt = 'foo:-1|c' self.svc._process(pkt, None) - self.assertEquals(self.svc._counts, {'foo': 1}) + self.assertEquals(self.stats.counts, {'foo': 1}) def test_counters_sampled(self): pkt = 'foo:1|c|@.5' self.svc._process(pkt, None) - self.assertEquals(self.svc._counts, {'foo': 2}) + self.assertEquals(self.stats.counts, {'foo': 2}) def test_timers(self): pkt = 'foo:20|ms' self.svc._process(pkt, None) - self.assertEquals(self.svc._timers, {'foo': [20.0]}) + self.assertEquals(self.stats.timers, {'foo': [20.0]}) pkt = 'foo:10|ms' self.svc._process(pkt, None) - self.assertEquals(self.svc._timers, {'foo': [20.0, 10.0]}) + self.assertEquals(self.stats.timers, {'foo': [20.0, 10.0]}) def test_key_sanitize(self): pkt = '\t\n#! foo . bar \0 ^:1|c' self.svc._process(pkt, None) - self.assertEquals(self.svc._counts, {'foo.bar': 1}) + self.assertEquals(self.stats.counts, {'foo.bar': 1}) def main(): diff --git a/gstatsd/sink.py b/gstatsd/sink.py new file mode 100644 index 0000000..588c9d3 --- /dev/null +++ b/gstatsd/sink.py @@ -0,0 +1,102 @@ + +# standard +import cStringIO +import sys +import time + +# vendor +from gevent import socket + +E_BADSPEC = "bad sink spec %r: %s" +E_SENDFAIL = 'failed to send stats to %s %s: %s' + + +class Sink(object): + + """ + A resource to which stats will be sent. + """ + + def error(self, msg): + sys.stderr.write(msg + '\n') + + def _parse_hostport(self, spec): + try: + parts = spec.split(':') + if len(parts) == 2: + return (parts[0], int(parts[1])) + if len(parts) == 1: + return ('', int(parts[0])) + except ValueError, ex: + raise ValueError(E_BADSPEC % (spec, ex)) + raise ValueError("expected '[host]:port' but got %r" % spec) + + +class GraphiteSink(Sink): + + """ + Sends stats to one or more Graphite servers. + """ + + def __init__(self): + self._hosts = [] + + def add(self, spec): + self._hosts.append(self._parse_hostport(spec)) + + def send(self, stats): + "Format stats and send to one or more Graphite hosts" + buf = cStringIO.StringIO() + now = int(time.time()) + num_stats = 0 + + # timer stats + pct = stats.percent + timers = stats.timers + for key, vals in timers.iteritems(): + if not vals: + continue + + # compute statistics + num = len(vals) + vals = sorted(vals) + vmin = vals[0] + vmax = vals[-1] + mean = vmin + max_at_thresh = vmax + if num > 1: + idx = round((pct / 100.0) * num) + tmp = vals[:int(idx)] + if tmp: + max_at_thresh = tmp[-1] + mean = sum(tmp) / idx + + key = 'stats.timers.%s' % key + buf.write('%s.mean %f %d\n' % (key, mean, now)) + buf.write('%s.upper %f %d\n' % (key, vmax, now)) + buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now)) + buf.write('%s.lower %f %d\n' % (key, vmin, now)) + buf.write('%s.count %d %d\n' % (key, num, now)) + num_stats += 1 + + # counter stats + counts = stats.counts + for key, val in counts.iteritems(): + buf.write('stats.%s %f %d\n' % (key, val / stats.interval, now)) + buf.write('stats_counts.%s %f %d\n' % (key, val, now)) + num_stats += 1 + + buf.write('statsd.numStats %d %d\n' % (num_stats, now)) + + # TODO: add support for N retries + + for host in self._hosts: + # flush stats to graphite + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(host) + sock.sendall(buf.getvalue()) + sock.close() + except Exception, ex: + self.error(E_SENDFAIL % ('graphite', host, ex)) + diff --git a/makedoc.sh b/makedoc.sh new file mode 100755 index 0000000..118b271 --- /dev/null +++ b/makedoc.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +pandoc -f markdown -t rst -o README.txt README.md +