Release 0.4

* Generalized the stats formatting and delivery into a GraphiteSink
   class.  Moved the stats tables (timers/counts) into a Stats class.
 * Updated README files, added script to derive README.txt from
   README.md, changed '-d / --dest' option to '-s / --sink'.
This commit is contained in:
Patrick Hensley 2011-06-30 10:22:50 -04:00
parent e83d53f948
commit 2b8948ae0a
8 changed files with 225 additions and 150 deletions

View File

@ -29,31 +29,31 @@ Options:
Options: Options:
--version show program's version number and exit --version show program's version number and exit
-h, --help show this help message and exit
-b BIND_ADDR, --bind=BIND_ADDR -b BIND_ADDR, --bind=BIND_ADDR
bind [host]:port (host defaults to '') bind [host]:port (host defaults to '')
-d DEST_ADDR, --dest=DEST_ADDR -s SINK, --sink=SINK a graphite service to which stats are sent
receiver [backend:]host:port (backend defaults to ([host]:port).
'graphite')
-v increase verbosity (currently used for debugging) -v increase verbosity (currently used for debugging)
-f INTERVAL, --flush=INTERVAL -f INTERVAL, --flush=INTERVAL
flush interval, in seconds (default 10) flush interval, in seconds (default 10)
-p PERCENT, --percent=PERCENT -p PERCENT, --percent=PERCENT
percent threshold (default 90) percent threshold (default 90)
-l, --list list supported backends
-D, --daemonize daemonize the service -D, --daemonize daemonize the service
-h, --help
Start gstatsd and send stats to port 9100 every 5 seconds: Start gstatsd listening on the default port 8125, and send stats to graphite
server on port 2003 every 5 seconds:
% gstatsd -d :9100 -f 5 % gstatsd -s 2003 -f 5
Bind listener to host 'hostname' port 8126: Bind listener to host 'foo' port 8126, and send stats to the Graphite server
on host 'bar' port 2003 every 20 seconds:
% gstatsd -b hostname:8126 -d :9100 -f 5 % gstatsd -b foo:8126 -s bar:2003 -f 20
To send the stats to multiple graphite servers, specify multiple destinations: To send the stats to multiple graphite servers, specify '-s' multiple times:
% gstatsd -b :8125 -d stats1:9100 stats2:9100 % gstatsd -b :8125 -s stats1:2003 -s stats2:2004
Using the client Using the client

View File

@ -37,38 +37,38 @@ Options:
Options: Options:
--version show program's version number and exit --version show program's version number and exit
-h, --help show this help message and exit
-b BIND_ADDR, --bind=BIND_ADDR -b BIND_ADDR, --bind=BIND_ADDR
bind [host]:port (host defaults to '') bind [host]:port (host defaults to '')
-d DEST_ADDR, --dest=DEST_ADDR -s SINK, --sink=SINK a graphite service to which stats are sent
receiver [backend:]host:port (backend defaults to ([host]:port).
'graphite')
-v increase verbosity (currently used for debugging) -v increase verbosity (currently used for debugging)
-f INTERVAL, --flush=INTERVAL -f INTERVAL, --flush=INTERVAL
flush interval, in seconds (default 10) flush interval, in seconds (default 10)
-p PERCENT, --percent=PERCENT -p PERCENT, --percent=PERCENT
percent threshold (default 90) percent threshold (default 90)
-l, --list list supported backends
-D, --daemonize daemonize the service -D, --daemonize daemonize the service
-h, --help
Start gstatsd and send stats to port 9100 every 5 seconds: Start gstatsd listening on the default port 8125, and send stats to
graphite server on port 2003 every 5 seconds:
:: ::
% gstatsd -d :9100 -f 5 % gstatsd -s 2003 -f 5
Bind listener to host 'hostname' port 8126: Bind listener to host 'foo' port 8126, and send stats to the
Graphite server on host 'bar' port 2003 every 20 seconds:
:: ::
% gstatsd -b hostname:8126 -d :9100 -f 5 % gstatsd -b foo:8126 -s bar:2003 -f 20
To send the stats to multiple graphite servers, specify multiple To send the stats to multiple graphite servers, specify '-s'
destinations: multiple times:
:: ::
% gstatsd -b :8125 -d stats1:9100 stats2:9100 % gstatsd -b :8125 -s stats1:2003 -s stats2:2004
Using the client Using the client
---------------- ----------------

2
debian/changelog vendored
View File

@ -1,4 +1,4 @@
gstatsd (0.3.1) lucid; urgency=low gstatsd (0.4) lucid; urgency=low
* Release. * Release.

View File

@ -1,3 +1,3 @@
__version__ = '0.3.1' __version__ = '0.4'

View File

@ -12,6 +12,7 @@ import traceback
from collections import defaultdict from collections import defaultdict
# local # local
import sink
from core import __version__ from core import __version__
# vendor # vendor
@ -26,6 +27,9 @@ MAX_PACKET = 2048
DESCRIPTION = ''' DESCRIPTION = '''
A statsd service in Python + gevent. A statsd service in Python + gevent.
'''
EPILOG = '''
''' '''
# table to remove invalid characters from keys # table to remove invalid characters from keys
@ -36,10 +40,16 @@ KEY_DELETIONS = ''.join(ALL_ASCII.difference(KEY_VALID + '/'))
# error messages # error messages
E_BADADDR = 'invalid bind address specified %r' E_BADADDR = 'invalid bind address specified %r'
E_BADTARGET = 'invalid target specified %r' E_NOSINKS = 'you must specify at least one stats sink'
E_BADBACKEND = 'invalid backend specified %r'
E_NOTARGETS = 'you must specify at least one stats destination'
E_SENDFAIL = 'failed to send stats to %s %s: %s' class Stats(object):
def __init__(self):
self.timers = defaultdict(list)
self.counts = defaultdict(float)
self.percent = PERCENT
self.interval = INTERVAL
def daemonize(umask=0027): def daemonize(umask=0027):
@ -83,41 +93,49 @@ class StatsDaemon(object):
A statsd service implementation in Python + gevent. A statsd service implementation in Python + gevent.
""" """
def __init__(self, bindaddr, targets, interval, percent, debug=0): def __init__(self, bindaddr, sinkspecs, interval, percent, debug=0):
# parse and validate
_, host, port = parse_addr(bindaddr) _, host, port = parse_addr(bindaddr)
if port is None: if port is None:
self._exit(E_BADADDR % bindaddr) self.exit(E_BADADDR % bindaddr)
self._bindaddr = (host, port) self._bindaddr = (host, port)
# parse backend targets # TODO: generalize to support more than one sink type. currently
if not targets: # only the graphite backend is present, but we may want to write
self._exit(E_NOTARGETS) # stats to hbase, redis, etc. - ph
self._targets = []
for target in targets: # construct the sink and add hosts to it
backend, host, port = parse_addr(target) if not sinkspecs:
if backend is None: self.exit(E_NOSINKS)
backend = 'graphite' self._sink = sink.GraphiteSink()
if port is None: errors = []
self._exit(E_BADTARGET % target) for spec in sinkspecs:
func = getattr(self, '_send_%s' % backend, None) try:
if not func: self._sink.add(spec)
self._exit(E_BADBACKEND % backend) except ValueError, ex:
self._targets.append((func, (host, port))) errors.append(ex)
if errors:
for err in errors:
self.error(str(err))
self.exit('exiting.')
self._interval = float(interval)
self._percent = float(percent) self._percent = float(percent)
self._interval = float(interval)
self._debug = debug self._debug = debug
self._timers = defaultdict(list)
self._counts = defaultdict(float)
self._sock = None self._sock = None
self._flush_task = None self._flush_task = None
def _exit(self, msg, code=1): self._reset_stats()
self._error(msg)
def _reset_stats(self):
self._stats = Stats()
self._stats.percent = self._percent
self._stats.interval = self._interval
def exit(self, msg, code=1):
self.error(msg)
sys.exit(code) sys.exit(code)
def _error(self, msg): def error(self, msg):
sys.stderr.write(msg + '\n') sys.stderr.write(msg + '\n')
def start(self): def start(self):
@ -128,12 +146,19 @@ class StatsDaemon(object):
# spawn the flush trigger # spawn the flush trigger
def _flush_impl(): def _flush_impl():
while 1: while 1:
gevent.sleep(self._interval) gevent.sleep(self._stats.interval)
for func, hostport in self._targets:
try: # rotate stats
func(hostport) stats = self._stats
except Exception, ex: self._reset_stats()
self._error(traceback.format_tb(sys.exc_info()[-1]))
# send the stats to the sink which in turn broadcasts
# the stats packet to one or more hosts.
try:
self._sink.send(stats)
except Exception, ex:
trace = traceback.format_tb(sys.exc_info()[-1])
self.error(''.join(trace))
self._flush_task = gevent.spawn(_flush_impl) self._flush_task = gevent.spawn(_flush_impl)
@ -145,19 +170,22 @@ class StatsDaemon(object):
try: try:
self._process(*self._sock.recvfrom(MAX_PACKET)) self._process(*self._sock.recvfrom(MAX_PACKET))
except Exception, ex: except Exception, ex:
self._error(str(ex)) self.error(str(ex))
def _shutdown(self): def _shutdown(self):
"Shutdown the server" "Shutdown the server"
self._exit("service exiting", code=0) self.exit("service exiting", code=0)
def _process(self, data, _): def _process(self, data, _):
"Process a single packet" "Process a single packet and update the internal tables."
parts = data.split(':') parts = data.split(':')
if self._debug: if self._debug:
self._error('packet: %r' % data) self.error('packet: %r' % data)
if not parts: if not parts:
return return
# interpret the packet and update stats
stats = self._stats
key = parts[0].translate(KEY_TABLE, KEY_DELETIONS) key = parts[0].translate(KEY_TABLE, KEY_DELETIONS)
for part in parts[1:]: for part in parts[1:]:
srate = 1.0 srate = 1.0
@ -170,106 +198,45 @@ class StatsDaemon(object):
# timer (milliseconds) # timer (milliseconds)
if stype == 'ms': if stype == 'ms':
self._timers[key].append(float(value if value else 0)) stats.timers[key].append(float(value if value else 0))
# counter with optional sample rate # counter with optional sample rate
elif stype == 'c': elif stype == 'c':
if length == 3 and fields[2].startswith('@'): if length == 3 and fields[2].startswith('@'):
srate = float(fields[2][1:]) srate = float(fields[2][1:])
value = float(value if value else 1) * (1 / srate) value = float(value if value else 1) * (1 / srate)
self._counts[key] += value stats.counts[key] += value
def _send_graphite(self, dest):
"Send blob of stats data to graphite server"
buf = cStringIO.StringIO()
now = int(time.time())
num_stats = 0
# timer stats
pct = self._percent
timers = self._timers
for key, vals in timers.iteritems():
if not vals:
continue
# compute statistics
num = len(vals)
vals = sorted(vals)
vmin = vals[0]
vmax = vals[-1]
mean = vmin
max_at_thresh = vmax
if num > 1:
idx = round((pct / 100.0) * num)
tmp = vals[:int(idx)]
if tmp:
max_at_thresh = tmp[-1]
mean = sum(tmp) / idx
key = 'stats.timers.%s' % key
buf.write('%s.mean %f %d\n' % (key, mean, now))
buf.write('%s.upper %f %d\n' % (key, vmax, now))
buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
buf.write('%s.lower %f %d\n' % (key, vmin, now))
buf.write('%s.count %d %d\n' % (key, num, now))
num_stats += 1
# counter stats
counts = self._counts
for key, val in counts.iteritems():
buf.write('stats.%s %f %d\n' % (key, val / self._interval, now))
buf.write('stats_counts.%s %f %d\n' % (key, val, now))
num_stats += 1
buf.write('statsd.numStats %d %d\n' % (num_stats, now))
# reset
self._timers = defaultdict(list)
self._counts = defaultdict(float)
del timers
del counts
# XXX: add support for N retries
# flush stats to graphite
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(dest)
sock.sendall(buf.getvalue())
sock.close()
except Exception, ex:
self._error(E_SENDFAIL % ('graphite', dest, ex))
def main(): def main():
opts = optparse.OptionParser(description=DESCRIPTION, version=__version__) opts = optparse.OptionParser(description=DESCRIPTION, version=__version__,
add_help_option=False)
opts.add_option('-b', '--bind', dest='bind_addr', default=':8125', opts.add_option('-b', '--bind', dest='bind_addr', default=':8125',
help="bind [host]:port (host defaults to '')") help="bind [host]:port (host defaults to '')")
opts.add_option('-d', '--dest', dest='dest_addr', action='append', opts.add_option('-s', '--sink', dest='sink', action='append', default=[],
default=[], help="a graphite service to which stats are sent ([host]:port).")
help="receiver [backend:]host:port (backend defaults to 'graphite')")
opts.add_option('-v', dest='verbose', action='count', default=0, opts.add_option('-v', dest='verbose', action='count', default=0,
help="increase verbosity (currently used for debugging)") help="increase verbosity (currently used for debugging)")
opts.add_option('-f', '--flush', dest='interval', default=INTERVAL, opts.add_option('-f', '--flush', dest='interval', default=INTERVAL,
help="flush interval, in seconds (default 10)") help="flush interval, in seconds (default 10)")
opts.add_option('-p', '--percent', dest='percent', default=PERCENT, opts.add_option('-p', '--percent', dest='percent', default=PERCENT,
help="percent threshold (default 90)") help="percent threshold (default 90)")
opts.add_option('-l', '--list', dest='list_backends', action='store_true',
help="list supported backends")
opts.add_option('-D', '--daemonize', dest='daemonize', action='store_true', opts.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
help='daemonize the service') help='daemonize the service')
opts.add_option('-h', '--help', dest='usage', action='store_true')
(options, args) = opts.parse_args() (options, args) = opts.parse_args()
if options.list_backends: if options.usage:
for key in [k for k in dir(StatsDaemon) if k.startswith('_send_')]: # TODO: write epilog. usage is manually output since optparse will
print(key[6:]) # wrap the epilog and we want pre-formatted output. - ph
print(opts.format_help())
sys.exit() sys.exit()
if options.daemonize: if options.daemonize:
daemonize() daemonize()
sd = StatsDaemon(options.bind_addr, options.dest_addr, options.interval, sd = StatsDaemon(options.bind_addr, options.sink, options.interval,
options.percent, options.verbose) options.percent, options.verbose)
sd.start() sd.start()

View File

@ -9,54 +9,56 @@ from gstatsd import service
class StatsServiceTest(unittest.TestCase): class StatsServiceTest(unittest.TestCase):
def setUp(self): def setUp(self):
args = (':8125', [':9100'], 5, 90, 0) args = (':8125', [':2003'], 5, 90, 0)
self.svc = service.StatsDaemon(*args) self.svc = service.StatsDaemon(*args)
self.stats = self.svc._stats
def test_construct(self): def test_construct(self):
svc = service.StatsDaemon('8125', ['9100'], 5, 90, 0) svc = service.StatsDaemon('8125', ['2003'], 5, 90, 0)
stats = svc._stats
self.assertEquals(svc._bindaddr, ('', 8125)) self.assertEquals(svc._bindaddr, ('', 8125))
self.assertEquals(svc._interval, 5.0) self.assertEquals(svc._interval, 5.0)
self.assertEquals(svc._percent, 90.0)
self.assertEquals(svc._debug, 0) self.assertEquals(svc._debug, 0)
self.assertEquals(svc._targets[0], (svc._send_graphite, ('', 9100))) self.assertEquals(stats.percent, 90.0)
self.assertEquals(svc._sink._hosts, [('', 2003)])
svc = service.StatsDaemon('bar:8125', ['foo:9100'], 5, 90, 1) svc = service.StatsDaemon('bar:8125', ['foo:2003'], 5, 90, 1)
self.assertEquals(svc._bindaddr, ('bar', 8125)) self.assertEquals(svc._bindaddr, ('bar', 8125))
self.assertEquals(svc._targets[0], (svc._send_graphite, ('foo', 9100))) self.assertEquals(svc._sink._hosts, [('foo', 2003)])
self.assertEquals(svc._debug, 1) self.assertEquals(svc._debug, 1)
def test_backend(self): def test_backend(self):
service.StatsDaemon._send_foo = lambda self, x, y: None service.StatsDaemon._send_foo = lambda self, x, y: None
svc = service.StatsDaemon('8125', ['foo:bar:9100'], 5, 90, 0) svc = service.StatsDaemon('8125', ['bar:2003'], 5, 90, 0)
self.assertEquals(svc._targets[0], (svc._send_foo, ('bar', 9100))) self.assertEquals(svc._sink._hosts, [('bar', 2003)])
def test_counters(self): def test_counters(self):
pkt = 'foo:1|c' pkt = 'foo:1|c'
self.svc._process(pkt, None) self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo': 1}) self.assertEquals(self.stats.counts, {'foo': 1})
self.svc._process(pkt, None) self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo': 2}) self.assertEquals(self.stats.counts, {'foo': 2})
pkt = 'foo:-1|c' pkt = 'foo:-1|c'
self.svc._process(pkt, None) self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo': 1}) self.assertEquals(self.stats.counts, {'foo': 1})
def test_counters_sampled(self): def test_counters_sampled(self):
pkt = 'foo:1|c|@.5' pkt = 'foo:1|c|@.5'
self.svc._process(pkt, None) self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo': 2}) self.assertEquals(self.stats.counts, {'foo': 2})
def test_timers(self): def test_timers(self):
pkt = 'foo:20|ms' pkt = 'foo:20|ms'
self.svc._process(pkt, None) self.svc._process(pkt, None)
self.assertEquals(self.svc._timers, {'foo': [20.0]}) self.assertEquals(self.stats.timers, {'foo': [20.0]})
pkt = 'foo:10|ms' pkt = 'foo:10|ms'
self.svc._process(pkt, None) self.svc._process(pkt, None)
self.assertEquals(self.svc._timers, {'foo': [20.0, 10.0]}) self.assertEquals(self.stats.timers, {'foo': [20.0, 10.0]})
def test_key_sanitize(self): def test_key_sanitize(self):
pkt = '\t\n#! foo . bar \0 ^:1|c' pkt = '\t\n#! foo . bar \0 ^:1|c'
self.svc._process(pkt, None) self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo.bar': 1}) self.assertEquals(self.stats.counts, {'foo.bar': 1})
def main(): def main():

102
gstatsd/sink.py Normal file
View File

@ -0,0 +1,102 @@
# standard
import cStringIO
import sys
import time
# vendor
from gevent import socket
# error messages
E_BADSPEC = "bad sink spec %r: %s"
E_SENDFAIL = 'failed to send stats to %s %s: %s'


class Sink(object):
    """
    A resource to which stats will be sent.
    """

    def error(self, msg):
        "Write an error message to stderr (one line per message)."
        sys.stderr.write(msg + '\n')

    def _parse_hostport(self, spec):
        """
        Parse a '[host]:port' spec into a (host, port) tuple.

        An omitted host ('2003' or ':2003') yields ''.  Raises ValueError
        for a non-numeric port or a spec with more than one colon.
        """
        try:
            parts = spec.split(':')
            if len(parts) == 2:
                return (parts[0], int(parts[1]))
            if len(parts) == 1:
                return ('', int(parts[0]))
        # NOTE: 'except X as ex' (not 'except X, ex') is valid on
        # Python 2.6+ and required on Python 3.
        except ValueError as ex:
            raise ValueError(E_BADSPEC % (spec, ex))
        raise ValueError("expected '[host]:port' but got %r" % spec)


class GraphiteSink(Sink):
    """
    Sends stats to one or more Graphite servers.
    """

    def __init__(self):
        # list of (host, port) tuples, in the order they were added
        self._hosts = []

    def add(self, spec):
        "Register a Graphite host given a '[host]:port' spec string."
        self._hosts.append(self._parse_hostport(spec))

    def send(self, stats):
        "Format stats and send to one or more Graphite hosts"
        # Build the plaintext Graphite payload in memory, then broadcast
        # the same payload to every registered host.  A plain list +
        # join replaces the py2-only cStringIO buffer.
        lines = []
        now = int(time.time())
        num_stats = 0

        # timer stats
        pct = stats.percent
        timers = stats.timers
        for key, vals in timers.items():
            if not vals:
                continue

            # compute statistics over the sorted samples
            num = len(vals)
            vals = sorted(vals)
            vmin = vals[0]
            vmax = vals[-1]
            mean = vmin
            max_at_thresh = vmax
            if num > 1:
                # keep only the lower `pct` percent of samples for the
                # thresholded mean/upper values
                idx = round((pct / 100.0) * num)
                tmp = vals[:int(idx)]
                if tmp:
                    max_at_thresh = tmp[-1]
                    mean = sum(tmp) / idx

            key = 'stats.timers.%s' % key
            lines.append('%s.mean %f %d\n' % (key, mean, now))
            lines.append('%s.upper %f %d\n' % (key, vmax, now))
            lines.append('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
            lines.append('%s.lower %f %d\n' % (key, vmin, now))
            lines.append('%s.count %d %d\n' % (key, num, now))
            num_stats += 1

        # counter stats: per-second rate plus the raw count
        counts = stats.counts
        for key, val in counts.items():
            lines.append('stats.%s %f %d\n' % (key, val / stats.interval, now))
            lines.append('stats_counts.%s %f %d\n' % (key, val, now))
            num_stats += 1

        lines.append('statsd.numStats %d %d\n' % (num_stats, now))
        payload = ''.join(lines)

        # TODO: add support for N retries
        for host in self._hosts:
            # flush stats to graphite; a failure on one host must not
            # prevent delivery to the remaining hosts
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect(host)
                sock.sendall(payload)
                sock.close()
            except Exception as ex:
                self.error(E_SENDFAIL % ('graphite', host, ex))

4
makedoc.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
# Derive README.txt (reStructuredText) from README.md using pandoc.
pandoc -f markdown -t rst -o README.txt README.md