Release 0.4

* Generalized the stats formatting and delivery into a GraphiteSink
   class.  Moved the stats tables (timers/counts) into a Stats class.
 * Updated README files, added script to derive README.txt from
   README.md, changed '-d / --dest' option to '-s / --sink'.
This commit is contained in:
Patrick Hensley 2011-06-30 10:22:50 -04:00
parent e83d53f948
commit 2b8948ae0a
8 changed files with 225 additions and 150 deletions

View File

@ -29,31 +29,31 @@ Options:
Options:
--version show program's version number and exit
-h, --help show this help message and exit
-b BIND_ADDR, --bind=BIND_ADDR
bind [host]:port (host defaults to '')
-d DEST_ADDR, --dest=DEST_ADDR
receiver [backend:]host:port (backend defaults to
'graphite')
-s SINK, --sink=SINK a graphite service to which stats are sent
([host]:port).
-v increase verbosity (currently used for debugging)
-f INTERVAL, --flush=INTERVAL
flush interval, in seconds (default 10)
-p PERCENT, --percent=PERCENT
percent threshold (default 90)
-l, --list list supported backends
-D, --daemonize daemonize the service
-h, --help
Start gstatsd and send stats to port 9100 every 5 seconds:
Start gstatsd listening on the default port 8125, and send stats to graphite
server on port 2003 every 5 seconds:
% gstatsd -d :9100 -f 5
% gstatsd -s 2003 -f 5
Bind listener to host 'hostname' port 8126:
Bind listener to host 'foo' port 8126, and send stats to the Graphite server
on host 'bar' port 2003 every 20 seconds:
% gstatsd -b hostname:8126 -d :9100 -f 5
% gstatsd -b foo:8126 -s bar:2003 -f 20
To send the stats to multiple graphite servers, specify multiple destinations:
To send the stats to multiple graphite servers, specify '-s' multiple times:
% gstatsd -b :8125 -d stats1:9100 stats2:9100
% gstatsd -b :8125 -s stats1:2003 -s stats2:2004
Using the client

View File

@ -37,38 +37,38 @@ Options:
Options:
--version show program's version number and exit
-h, --help show this help message and exit
-b BIND_ADDR, --bind=BIND_ADDR
bind [host]:port (host defaults to '')
-d DEST_ADDR, --dest=DEST_ADDR
receiver [backend:]host:port (backend defaults to
'graphite')
-s SINK, --sink=SINK a graphite service to which stats are sent
([host]:port).
-v increase verbosity (currently used for debugging)
-f INTERVAL, --flush=INTERVAL
flush interval, in seconds (default 10)
-p PERCENT, --percent=PERCENT
percent threshold (default 90)
-l, --list list supported backends
-D, --daemonize daemonize the service
-h, --help
Start gstatsd and send stats to port 9100 every 5 seconds:
Start gstatsd listening on the default port 8125, and send stats to
graphite server on port 2003 every 5 seconds:
::
% gstatsd -d :9100 -f 5
% gstatsd -s 2003 -f 5
Bind listener to host 'hostname' port 8126:
Bind listener to host 'foo' port 8126, and send stats to the
Graphite server on host 'bar' port 2003 every 20 seconds:
::
% gstatsd -b hostname:8126 -d :9100 -f 5
% gstatsd -b foo:8126 -s bar:2003 -f 20
To send the stats to multiple graphite servers, specify multiple
destinations:
To send the stats to multiple graphite servers, specify '-s'
multiple times:
::
% gstatsd -b :8125 -d stats1:9100 stats2:9100
% gstatsd -b :8125 -s stats1:2003 -s stats2:2004
Using the client
----------------

2
debian/changelog vendored
View File

@ -1,4 +1,4 @@
gstatsd (0.3.1) lucid; urgency=low
gstatsd (0.4) lucid; urgency=low
* Release.

View File

@ -1,3 +1,3 @@
__version__ = '0.3.1'
__version__ = '0.4'

View File

@ -12,6 +12,7 @@ import traceback
from collections import defaultdict
# local
import sink
from core import __version__
# vendor
@ -26,6 +27,9 @@ MAX_PACKET = 2048
DESCRIPTION = '''
A statsd service in Python + gevent.
'''
EPILOG = '''
'''
# table to remove invalid characters from keys
@ -36,10 +40,16 @@ KEY_DELETIONS = ''.join(ALL_ASCII.difference(KEY_VALID + '/'))
# error messages
E_BADADDR = 'invalid bind address specified %r'
E_BADTARGET = 'invalid target specified %r'
E_BADBACKEND = 'invalid backend specified %r'
E_NOTARGETS = 'you must specify at least one stats destination'
E_SENDFAIL = 'failed to send stats to %s %s: %s'
E_NOSINKS = 'you must specify at least one stats sink'
class Stats(object):
def __init__(self):
self.timers = defaultdict(list)
self.counts = defaultdict(float)
self.percent = PERCENT
self.interval = INTERVAL
def daemonize(umask=0027):
@ -83,41 +93,49 @@ class StatsDaemon(object):
A statsd service implementation in Python + gevent.
"""
def __init__(self, bindaddr, targets, interval, percent, debug=0):
# parse and validate
def __init__(self, bindaddr, sinkspecs, interval, percent, debug=0):
_, host, port = parse_addr(bindaddr)
if port is None:
self._exit(E_BADADDR % bindaddr)
self.exit(E_BADADDR % bindaddr)
self._bindaddr = (host, port)
# parse backend targets
if not targets:
self._exit(E_NOTARGETS)
self._targets = []
for target in targets:
backend, host, port = parse_addr(target)
if backend is None:
backend = 'graphite'
if port is None:
self._exit(E_BADTARGET % target)
func = getattr(self, '_send_%s' % backend, None)
if not func:
self._exit(E_BADBACKEND % backend)
self._targets.append((func, (host, port)))
# TODO: generalize to support more than one sink type. currently
# only the graphite backend is present, but we may want to write
# stats to hbase, redis, etc. - ph
# construct the sink and add hosts to it
if not sinkspecs:
self.exit(E_NOSINKS)
self._sink = sink.GraphiteSink()
errors = []
for spec in sinkspecs:
try:
self._sink.add(spec)
except ValueError, ex:
errors.append(ex)
if errors:
for err in errors:
self.error(str(err))
self.exit('exiting.')
self._interval = float(interval)
self._percent = float(percent)
self._interval = float(interval)
self._debug = debug
self._timers = defaultdict(list)
self._counts = defaultdict(float)
self._sock = None
self._flush_task = None
def _exit(self, msg, code=1):
self._error(msg)
self._reset_stats()
def _reset_stats(self):
self._stats = Stats()
self._stats.percent = self._percent
self._stats.interval = self._interval
def exit(self, msg, code=1):
self.error(msg)
sys.exit(code)
def _error(self, msg):
def error(self, msg):
sys.stderr.write(msg + '\n')
def start(self):
@ -128,12 +146,19 @@ class StatsDaemon(object):
# spawn the flush trigger
def _flush_impl():
while 1:
gevent.sleep(self._interval)
for func, hostport in self._targets:
try:
func(hostport)
except Exception, ex:
self._error(traceback.format_tb(sys.exc_info()[-1]))
gevent.sleep(self._stats.interval)
# rotate stats
stats = self._stats
self._reset_stats()
# send the stats to the sink which in turn broadcasts
# the stats packet to one or more hosts.
try:
self._sink.send(stats)
except Exception, ex:
trace = traceback.format_tb(sys.exc_info()[-1])
self.error(''.join(trace))
self._flush_task = gevent.spawn(_flush_impl)
@ -145,19 +170,22 @@ class StatsDaemon(object):
try:
self._process(*self._sock.recvfrom(MAX_PACKET))
except Exception, ex:
self._error(str(ex))
self.error(str(ex))
def _shutdown(self):
"Shutdown the server"
self._exit("service exiting", code=0)
self.exit("service exiting", code=0)
def _process(self, data, _):
"Process a single packet"
"Process a single packet and update the internal tables."
parts = data.split(':')
if self._debug:
self._error('packet: %r' % data)
self.error('packet: %r' % data)
if not parts:
return
# interpret the packet and update stats
stats = self._stats
key = parts[0].translate(KEY_TABLE, KEY_DELETIONS)
for part in parts[1:]:
srate = 1.0
@ -170,106 +198,45 @@ class StatsDaemon(object):
# timer (milliseconds)
if stype == 'ms':
self._timers[key].append(float(value if value else 0))
stats.timers[key].append(float(value if value else 0))
# counter with optional sample rate
elif stype == 'c':
if length == 3 and fields[2].startswith('@'):
srate = float(fields[2][1:])
value = float(value if value else 1) * (1 / srate)
self._counts[key] += value
def _send_graphite(self, dest):
"Send blob of stats data to graphite server"
buf = cStringIO.StringIO()
now = int(time.time())
num_stats = 0
# timer stats
pct = self._percent
timers = self._timers
for key, vals in timers.iteritems():
if not vals:
continue
# compute statistics
num = len(vals)
vals = sorted(vals)
vmin = vals[0]
vmax = vals[-1]
mean = vmin
max_at_thresh = vmax
if num > 1:
idx = round((pct / 100.0) * num)
tmp = vals[:int(idx)]
if tmp:
max_at_thresh = tmp[-1]
mean = sum(tmp) / idx
key = 'stats.timers.%s' % key
buf.write('%s.mean %f %d\n' % (key, mean, now))
buf.write('%s.upper %f %d\n' % (key, vmax, now))
buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
buf.write('%s.lower %f %d\n' % (key, vmin, now))
buf.write('%s.count %d %d\n' % (key, num, now))
num_stats += 1
# counter stats
counts = self._counts
for key, val in counts.iteritems():
buf.write('stats.%s %f %d\n' % (key, val / self._interval, now))
buf.write('stats_counts.%s %f %d\n' % (key, val, now))
num_stats += 1
buf.write('statsd.numStats %d %d\n' % (num_stats, now))
# reset
self._timers = defaultdict(list)
self._counts = defaultdict(float)
del timers
del counts
# XXX: add support for N retries
# flush stats to graphite
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(dest)
sock.sendall(buf.getvalue())
sock.close()
except Exception, ex:
self._error(E_SENDFAIL % ('graphite', dest, ex))
stats.counts[key] += value
def main():
opts = optparse.OptionParser(description=DESCRIPTION, version=__version__)
opts = optparse.OptionParser(description=DESCRIPTION, version=__version__,
add_help_option=False)
opts.add_option('-b', '--bind', dest='bind_addr', default=':8125',
help="bind [host]:port (host defaults to '')")
opts.add_option('-d', '--dest', dest='dest_addr', action='append',
default=[],
help="receiver [backend:]host:port (backend defaults to 'graphite')")
opts.add_option('-s', '--sink', dest='sink', action='append', default=[],
help="a graphite service to which stats are sent ([host]:port).")
opts.add_option('-v', dest='verbose', action='count', default=0,
help="increase verbosity (currently used for debugging)")
opts.add_option('-f', '--flush', dest='interval', default=INTERVAL,
help="flush interval, in seconds (default 10)")
opts.add_option('-p', '--percent', dest='percent', default=PERCENT,
help="percent threshold (default 90)")
opts.add_option('-l', '--list', dest='list_backends', action='store_true',
help="list supported backends")
opts.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
help='daemonize the service')
opts.add_option('-h', '--help', dest='usage', action='store_true')
(options, args) = opts.parse_args()
if options.list_backends:
for key in [k for k in dir(StatsDaemon) if k.startswith('_send_')]:
print(key[6:])
if options.usage:
# TODO: write epilog. usage is manually output since optparse will
# wrap the epilog and we want pre-formatted output. - ph
print(opts.format_help())
sys.exit()
if options.daemonize:
daemonize()
sd = StatsDaemon(options.bind_addr, options.dest_addr, options.interval,
sd = StatsDaemon(options.bind_addr, options.sink, options.interval,
options.percent, options.verbose)
sd.start()

View File

@ -9,54 +9,56 @@ from gstatsd import service
class StatsServiceTest(unittest.TestCase):
def setUp(self):
args = (':8125', [':9100'], 5, 90, 0)
args = (':8125', [':2003'], 5, 90, 0)
self.svc = service.StatsDaemon(*args)
self.stats = self.svc._stats
def test_construct(self):
svc = service.StatsDaemon('8125', ['9100'], 5, 90, 0)
svc = service.StatsDaemon('8125', ['2003'], 5, 90, 0)
stats = svc._stats
self.assertEquals(svc._bindaddr, ('', 8125))
self.assertEquals(svc._interval, 5.0)
self.assertEquals(svc._percent, 90.0)
self.assertEquals(svc._debug, 0)
self.assertEquals(svc._targets[0], (svc._send_graphite, ('', 9100)))
self.assertEquals(stats.percent, 90.0)
self.assertEquals(svc._sink._hosts, [('', 2003)])
svc = service.StatsDaemon('bar:8125', ['foo:9100'], 5, 90, 1)
svc = service.StatsDaemon('bar:8125', ['foo:2003'], 5, 90, 1)
self.assertEquals(svc._bindaddr, ('bar', 8125))
self.assertEquals(svc._targets[0], (svc._send_graphite, ('foo', 9100)))
self.assertEquals(svc._sink._hosts, [('foo', 2003)])
self.assertEquals(svc._debug, 1)
def test_backend(self):
service.StatsDaemon._send_foo = lambda self, x, y: None
svc = service.StatsDaemon('8125', ['foo:bar:9100'], 5, 90, 0)
self.assertEquals(svc._targets[0], (svc._send_foo, ('bar', 9100)))
svc = service.StatsDaemon('8125', ['bar:2003'], 5, 90, 0)
self.assertEquals(svc._sink._hosts, [('bar', 2003)])
def test_counters(self):
pkt = 'foo:1|c'
self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo': 1})
self.assertEquals(self.stats.counts, {'foo': 1})
self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo': 2})
self.assertEquals(self.stats.counts, {'foo': 2})
pkt = 'foo:-1|c'
self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo': 1})
self.assertEquals(self.stats.counts, {'foo': 1})
def test_counters_sampled(self):
pkt = 'foo:1|c|@.5'
self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo': 2})
self.assertEquals(self.stats.counts, {'foo': 2})
def test_timers(self):
pkt = 'foo:20|ms'
self.svc._process(pkt, None)
self.assertEquals(self.svc._timers, {'foo': [20.0]})
self.assertEquals(self.stats.timers, {'foo': [20.0]})
pkt = 'foo:10|ms'
self.svc._process(pkt, None)
self.assertEquals(self.svc._timers, {'foo': [20.0, 10.0]})
self.assertEquals(self.stats.timers, {'foo': [20.0, 10.0]})
def test_key_sanitize(self):
pkt = '\t\n#! foo . bar \0 ^:1|c'
self.svc._process(pkt, None)
self.assertEquals(self.svc._counts, {'foo.bar': 1})
self.assertEquals(self.stats.counts, {'foo.bar': 1})
def main():

102
gstatsd/sink.py Normal file
View File

@ -0,0 +1,102 @@
# standard
import cStringIO
import sys
import time
# vendor
from gevent import socket
# error message templates
E_BADSPEC = "bad sink spec %r: %s"  # args: (spec, underlying error)
E_SENDFAIL = 'failed to send stats to %s %s: %s'  # args: (sink type, host, error)
class Sink(object):
    """Base class for a resource to which stats will be sent."""

    def error(self, msg):
        # Report a problem on stderr, one message per line.
        sys.stderr.write('%s\n' % msg)

    def _parse_hostport(self, spec):
        # Parse 'host:port' or a bare 'port' into a (host, port) tuple.
        # A missing host defaults to '' (bind-all / local convention).
        pieces = spec.split(':')
        try:
            if len(pieces) == 1:
                return ('', int(pieces[0]))
            if len(pieces) == 2:
                return (pieces[0], int(pieces[1]))
        except ValueError as ex:
            # the port field was not an integer
            raise ValueError(E_BADSPEC % (spec, ex))
        # wrong number of ':'-separated fields
        raise ValueError("expected '[host]:port' but got %r" % spec)
class GraphiteSink(Sink):
    """
    Sends stats to one or more Graphite servers.
    """
    def __init__(self):
        # list of (host, port) tuples registered via add()
        self._hosts = []

    def add(self, spec):
        # spec is '[host]:port'; _parse_hostport raises ValueError on a
        # malformed spec, which the caller is expected to handle.
        self._hosts.append(self._parse_hostport(spec))

    def send(self, stats):
        "Format stats and send to one or more Graphite hosts"
        # Build the whole plaintext payload once, then broadcast the same
        # buffer to every registered host.
        buf = cStringIO.StringIO()
        now = int(time.time())
        num_stats = 0

        # timer stats
        pct = stats.percent
        timers = stats.timers
        for key, vals in timers.iteritems():
            if not vals:
                continue

            # compute statistics
            num = len(vals)
            vals = sorted(vals)
            vmin = vals[0]
            vmax = vals[-1]
            # defaults for the single-sample case
            mean = vmin
            max_at_thresh = vmax
            if num > 1:
                # keep only the lowest pct% of samples; mean and
                # upper_<pct> are computed over that truncated slice
                idx = round((pct / 100.0) * num)
                tmp = vals[:int(idx)]
                if tmp:
                    max_at_thresh = tmp[-1]
                    mean = sum(tmp) / idx

            key = 'stats.timers.%s' % key
            buf.write('%s.mean %f %d\n' % (key, mean, now))
            buf.write('%s.upper %f %d\n' % (key, vmax, now))
            buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
            buf.write('%s.lower %f %d\n' % (key, vmin, now))
            buf.write('%s.count %d %d\n' % (key, num, now))
            num_stats += 1

        # counter stats: per-second-style rate (value / flush interval)
        # under stats.*, raw count under stats_counts.*
        counts = stats.counts
        for key, val in counts.iteritems():
            buf.write('stats.%s %f %d\n' % (key, val / stats.interval, now))
            buf.write('stats_counts.%s %f %d\n' % (key, val, now))
            num_stats += 1

        buf.write('statsd.numStats %d %d\n' % (num_stats, now))

        # TODO: add support for N retries
        for host in self._hosts:
            # flush stats to graphite; a failure for one host is reported
            # but does not prevent delivery to the remaining hosts
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect(host)
                sock.sendall(buf.getvalue())
                sock.close()
            except Exception, ex:
                self.error(E_SENDFAIL % ('graphite', host, ex))

4
makedoc.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
# Derive README.txt (reStructuredText) from README.md using pandoc.
# Fail loudly (non-zero exit) if pandoc is missing or conversion fails,
# instead of silently leaving a stale README.txt behind.
set -e

pandoc -f markdown -t rst -o README.txt README.md