summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrick Hensley <spaceboy@indirect.com>2011-06-30 14:22:50 (GMT)
committerPatrick Hensley <spaceboy@indirect.com>2011-06-30 21:20:53 (GMT)
commit2b8948ae0a92ab447ae78ded8e7e1eebe6c4d3fc (patch)
treedcdc419ab690c1399e612866dcdfa6c7e5a4d0b4
parente83d53f948df502b14119e4faf6dac88806c21ba (diff)
downloadgstatsd-2b8948ae0a92ab447ae78ded8e7e1eebe6c4d3fc.zip
gstatsd-2b8948ae0a92ab447ae78ded8e7e1eebe6c4d3fc.tar.gz
gstatsd-2b8948ae0a92ab447ae78ded8e7e1eebe6c4d3fc.tar.bz2
Release 0.40.4
* Generalized the stats formatting and delivery into GraphiteSink class. Move the stats tables (timers/counts) into a Stats class. * Updated README files, added script to derive README.txt from README.md, changed '-d / --dest' option to '-s / --sink'.
-rw-r--r--README.md22
-rw-r--r--README.txt24
-rw-r--r--debian/changelog2
-rw-r--r--gstatsd/core.py2
-rw-r--r--gstatsd/service.py187
-rw-r--r--gstatsd/service_test.py32
-rw-r--r--gstatsd/sink.py102
-rwxr-xr-xmakedoc.sh4
8 files changed, 225 insertions, 150 deletions
diff --git a/README.md b/README.md
index 5e258fb..cd0d08c 100644
--- a/README.md
+++ b/README.md
@@ -29,31 +29,31 @@ Options:
Options:
--version show program's version number and exit
- -h, --help show this help message and exit
-b BIND_ADDR, --bind=BIND_ADDR
bind [host]:port (host defaults to '')
- -d DEST_ADDR, --dest=DEST_ADDR
- receiver [backend:]host:port (backend defaults to
- 'graphite')
+ -s SINK, --sink=SINK a graphite service to which stats are sent
+ ([host]:port).
-v increase verbosity (currently used for debugging)
-f INTERVAL, --flush=INTERVAL
flush interval, in seconds (default 10)
-p PERCENT, --percent=PERCENT
percent threshold (default 90)
- -l, --list list supported backends
-D, --daemonize daemonize the service
+ -h, --help
-Start gstatsd and send stats to port 9100 every 5 seconds:
+Start gstatsd listening on the default port 8125, and send stats to graphite
+server on port 2003 every 5 seconds:
- % gstatsd -d :9100 -f 5
+ % gstatsd -s 2003 -f 5
-Bind listener to host 'hostname' port 8126:
+Bind listener to host 'foo' port 8126, and send stats to the Graphite server
+on host 'bar' port 2003 every 20 seconds:
- % gstatsd -b hostname:8126 -d :9100 -f 5
+ % gstatsd -b foo:8126 -s bar:2003 -f 20
-To send the stats to multiple graphite servers, specify multiple destinations:
+To send the stats to multiple graphite servers, specify '-s' multiple times:
- % gstatsd -b :8125 -d stats1:9100 stats2:9100
+ % gstatsd -b :8125 -s stats1:2003 -s stats2:2004
Using the client
diff --git a/README.txt b/README.txt
index 64cde23..c901cb1 100644
--- a/README.txt
+++ b/README.txt
@@ -37,38 +37,38 @@ Options:
Options:
--version show program's version number and exit
- -h, --help show this help message and exit
-b BIND_ADDR, --bind=BIND_ADDR
bind [host]:port (host defaults to '')
- -d DEST_ADDR, --dest=DEST_ADDR
- receiver [backend:]host:port (backend defaults to
- 'graphite')
+ -s SINK, --sink=SINK a graphite service to which stats are sent
+ ([host]:port).
-v increase verbosity (currently used for debugging)
-f INTERVAL, --flush=INTERVAL
flush interval, in seconds (default 10)
-p PERCENT, --percent=PERCENT
percent threshold (default 90)
- -l, --list list supported backends
-D, --daemonize daemonize the service
+ -h, --help
-Start gstatsd and send stats to port 9100 every 5 seconds:
+Start gstatsd listening on the default port 8125, and send stats to
+graphite server on port 2003 every 5 seconds:
::
- % gstatsd -d :9100 -f 5
+ % gstatsd -s 2003 -f 5
-Bind listener to host 'hostname' port 8126:
+Bind listener to host 'foo' port 8126, and send stats to the
+Graphite server on host 'bar' port 2003 every 20 seconds:
::
- % gstatsd -b hostname:8126 -d :9100 -f 5
+ % gstatsd -b foo:8126 -s bar:2003 -f 20
-To send the stats to multiple graphite servers, specify multiple
-destinations:
+To send the stats to multiple graphite servers, specify '-s'
+multiple times:
::
- % gstatsd -b :8125 -d stats1:9100 stats2:9100
+ % gstatsd -b :8125 -s stats1:2003 -s stats2:2004
Using the client
----------------
diff --git a/debian/changelog b/debian/changelog
index 89b969b..69bac0a 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,4 +1,4 @@
-gstatsd (0.3.1) lucid; urgency=low
+gstatsd (0.4) lucid; urgency=low
* Release.
diff --git a/gstatsd/core.py b/gstatsd/core.py
index c78aaf0..2370051 100644
--- a/gstatsd/core.py
+++ b/gstatsd/core.py
@@ -1,3 +1,3 @@
-__version__ = '0.3.1'
+__version__ = '0.4'
diff --git a/gstatsd/service.py b/gstatsd/service.py
index ee9ef99..ae009ea 100644
--- a/gstatsd/service.py
+++ b/gstatsd/service.py
@@ -12,6 +12,7 @@ import traceback
from collections import defaultdict
# local
+import sink
from core import __version__
# vendor
@@ -27,6 +28,9 @@ MAX_PACKET = 2048
DESCRIPTION = '''
A statsd service in Python + gevent.
'''
+EPILOG = '''
+
+'''
# table to remove invalid characters from keys
ALL_ASCII = set(chr(c) for c in range(256))
@@ -36,10 +40,16 @@ KEY_DELETIONS = ''.join(ALL_ASCII.difference(KEY_VALID + '/'))
# error messages
E_BADADDR = 'invalid bind address specified %r'
-E_BADTARGET = 'invalid target specified %r'
-E_BADBACKEND = 'invalid backend specified %r'
-E_NOTARGETS = 'you must specify at least one stats destination'
-E_SENDFAIL = 'failed to send stats to %s %s: %s'
+E_NOSINKS = 'you must specify at least one stats sink'
+
+
+class Stats(object):
+
+ def __init__(self):
+ self.timers = defaultdict(list)
+ self.counts = defaultdict(float)
+ self.percent = PERCENT
+ self.interval = INTERVAL
def daemonize(umask=0027):
@@ -83,41 +93,49 @@ class StatsDaemon(object):
A statsd service implementation in Python + gevent.
"""
- def __init__(self, bindaddr, targets, interval, percent, debug=0):
- # parse and validate
+ def __init__(self, bindaddr, sinkspecs, interval, percent, debug=0):
_, host, port = parse_addr(bindaddr)
if port is None:
- self._exit(E_BADADDR % bindaddr)
+ self.exit(E_BADADDR % bindaddr)
self._bindaddr = (host, port)
- # parse backend targets
- if not targets:
- self._exit(E_NOTARGETS)
- self._targets = []
- for target in targets:
- backend, host, port = parse_addr(target)
- if backend is None:
- backend = 'graphite'
- if port is None:
- self._exit(E_BADTARGET % target)
- func = getattr(self, '_send_%s' % backend, None)
- if not func:
- self._exit(E_BADBACKEND % backend)
- self._targets.append((func, (host, port)))
+ # TODO: generalize to support more than one sink type. currently
+ # only the graphite backend is present, but we may want to write
+ # stats to hbase, redis, etc. - ph
+
+ # construct the sink and add hosts to it
+ if not sinkspecs:
+ self.exit(E_NOSINKS)
+ self._sink = sink.GraphiteSink()
+ errors = []
+ for spec in sinkspecs:
+ try:
+ self._sink.add(spec)
+ except ValueError, ex:
+ errors.append(ex)
+ if errors:
+ for err in errors:
+ self.error(str(err))
+ self.exit('exiting.')
- self._interval = float(interval)
self._percent = float(percent)
+ self._interval = float(interval)
self._debug = debug
- self._timers = defaultdict(list)
- self._counts = defaultdict(float)
self._sock = None
self._flush_task = None
- def _exit(self, msg, code=1):
- self._error(msg)
+ self._reset_stats()
+
+ def _reset_stats(self):
+ self._stats = Stats()
+ self._stats.percent = self._percent
+ self._stats.interval = self._interval
+
+ def exit(self, msg, code=1):
+ self.error(msg)
sys.exit(code)
- def _error(self, msg):
+ def error(self, msg):
sys.stderr.write(msg + '\n')
def start(self):
@@ -128,12 +146,19 @@ class StatsDaemon(object):
# spawn the flush trigger
def _flush_impl():
while 1:
- gevent.sleep(self._interval)
- for func, hostport in self._targets:
- try:
- func(hostport)
- except Exception, ex:
- self._error(traceback.format_tb(sys.exc_info()[-1]))
+ gevent.sleep(self._stats.interval)
+
+ # rotate stats
+ stats = self._stats
+ self._reset_stats()
+
+ # send the stats to the sink which in turn broadcasts
+ # the stats packet to one or more hosts.
+ try:
+ self._sink.send(stats)
+ except Exception, ex:
+ trace = traceback.format_tb(sys.exc_info()[-1])
+ self.error(''.join(trace))
self._flush_task = gevent.spawn(_flush_impl)
@@ -145,19 +170,22 @@ class StatsDaemon(object):
try:
self._process(*self._sock.recvfrom(MAX_PACKET))
except Exception, ex:
- self._error(str(ex))
+ self.error(str(ex))
def _shutdown(self):
"Shutdown the server"
- self._exit("service exiting", code=0)
+ self.exit("service exiting", code=0)
def _process(self, data, _):
- "Process a single packet"
+ "Process a single packet and update the internal tables."
parts = data.split(':')
if self._debug:
- self._error('packet: %r' % data)
+ self.error('packet: %r' % data)
if not parts:
return
+
+ # interpret the packet and update stats
+ stats = self._stats
key = parts[0].translate(KEY_TABLE, KEY_DELETIONS)
for part in parts[1:]:
srate = 1.0
@@ -170,106 +198,45 @@ class StatsDaemon(object):
# timer (milliseconds)
if stype == 'ms':
- self._timers[key].append(float(value if value else 0))
+ stats.timers[key].append(float(value if value else 0))
# counter with optional sample rate
elif stype == 'c':
if length == 3 and fields[2].startswith('@'):
srate = float(fields[2][1:])
value = float(value if value else 1) * (1 / srate)
- self._counts[key] += value
-
- def _send_graphite(self, dest):
- "Send blob of stats data to graphite server"
- buf = cStringIO.StringIO()
- now = int(time.time())
- num_stats = 0
-
- # timer stats
- pct = self._percent
- timers = self._timers
- for key, vals in timers.iteritems():
- if not vals:
- continue
-
- # compute statistics
- num = len(vals)
- vals = sorted(vals)
- vmin = vals[0]
- vmax = vals[-1]
- mean = vmin
- max_at_thresh = vmax
- if num > 1:
- idx = round((pct / 100.0) * num)
- tmp = vals[:int(idx)]
- if tmp:
- max_at_thresh = tmp[-1]
- mean = sum(tmp) / idx
-
- key = 'stats.timers.%s' % key
- buf.write('%s.mean %f %d\n' % (key, mean, now))
- buf.write('%s.upper %f %d\n' % (key, vmax, now))
- buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
- buf.write('%s.lower %f %d\n' % (key, vmin, now))
- buf.write('%s.count %d %d\n' % (key, num, now))
- num_stats += 1
-
- # counter stats
- counts = self._counts
- for key, val in counts.iteritems():
- buf.write('stats.%s %f %d\n' % (key, val / self._interval, now))
- buf.write('stats_counts.%s %f %d\n' % (key, val, now))
- num_stats += 1
-
- buf.write('statsd.numStats %d %d\n' % (num_stats, now))
-
- # reset
- self._timers = defaultdict(list)
- self._counts = defaultdict(float)
- del timers
- del counts
-
- # XXX: add support for N retries
-
- # flush stats to graphite
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(dest)
- sock.sendall(buf.getvalue())
- sock.close()
- except Exception, ex:
- self._error(E_SENDFAIL % ('graphite', dest, ex))
+ stats.counts[key] += value
def main():
- opts = optparse.OptionParser(description=DESCRIPTION, version=__version__)
+ opts = optparse.OptionParser(description=DESCRIPTION, version=__version__,
+ add_help_option=False)
opts.add_option('-b', '--bind', dest='bind_addr', default=':8125',
help="bind [host]:port (host defaults to '')")
- opts.add_option('-d', '--dest', dest='dest_addr', action='append',
- default=[],
- help="receiver [backend:]host:port (backend defaults to 'graphite')")
+ opts.add_option('-s', '--sink', dest='sink', action='append', default=[],
+ help="a graphite service to which stats are sent ([host]:port).")
opts.add_option('-v', dest='verbose', action='count', default=0,
help="increase verbosity (currently used for debugging)")
opts.add_option('-f', '--flush', dest='interval', default=INTERVAL,
help="flush interval, in seconds (default 10)")
opts.add_option('-p', '--percent', dest='percent', default=PERCENT,
help="percent threshold (default 90)")
- opts.add_option('-l', '--list', dest='list_backends', action='store_true',
- help="list supported backends")
opts.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
help='daemonize the service')
+ opts.add_option('-h', '--help', dest='usage', action='store_true')
(options, args) = opts.parse_args()
- if options.list_backends:
- for key in [k for k in dir(StatsDaemon) if k.startswith('_send_')]:
- print(key[6:])
+ if options.usage:
+ # TODO: write epilog. usage is manually output since optparse will
+ # wrap the epilog and we want pre-formatted output. - ph
+ print(opts.format_help())
sys.exit()
if options.daemonize:
daemonize()
- sd = StatsDaemon(options.bind_addr, options.dest_addr, options.interval,
+ sd = StatsDaemon(options.bind_addr, options.sink, options.interval,
options.percent, options.verbose)
sd.start()
diff --git a/gstatsd/service_test.py b/gstatsd/service_test.py
index 1b6bc2d..6f46ff8 100644
--- a/gstatsd/service_test.py
+++ b/gstatsd/service_test.py
@@ -9,54 +9,56 @@ from gstatsd import service
class StatsServiceTest(unittest.TestCase):
def setUp(self):
- args = (':8125', [':9100'], 5, 90, 0)
+ args = (':8125', [':2003'], 5, 90, 0)
self.svc = service.StatsDaemon(*args)
+ self.stats = self.svc._stats
def test_construct(self):
- svc = service.StatsDaemon('8125', ['9100'], 5, 90, 0)
+ svc = service.StatsDaemon('8125', ['2003'], 5, 90, 0)
+ stats = svc._stats
self.assertEquals(svc._bindaddr, ('', 8125))
self.assertEquals(svc._interval, 5.0)
- self.assertEquals(svc._percent, 90.0)
self.assertEquals(svc._debug, 0)
- self.assertEquals(svc._targets[0], (svc._send_graphite, ('', 9100)))
+ self.assertEquals(stats.percent, 90.0)
+ self.assertEquals(svc._sink._hosts, [('', 2003)])
- svc = service.StatsDaemon('bar:8125', ['foo:9100'], 5, 90, 1)
+ svc = service.StatsDaemon('bar:8125', ['foo:2003'], 5, 90, 1)
self.assertEquals(svc._bindaddr, ('bar', 8125))
- self.assertEquals(svc._targets[0], (svc._send_graphite, ('foo', 9100)))
+ self.assertEquals(svc._sink._hosts, [('foo', 2003)])
self.assertEquals(svc._debug, 1)
def test_backend(self):
service.StatsDaemon._send_foo = lambda self, x, y: None
- svc = service.StatsDaemon('8125', ['foo:bar:9100'], 5, 90, 0)
- self.assertEquals(svc._targets[0], (svc._send_foo, ('bar', 9100)))
+ svc = service.StatsDaemon('8125', ['bar:2003'], 5, 90, 0)
+ self.assertEquals(svc._sink._hosts, [('bar', 2003)])
def test_counters(self):
pkt = 'foo:1|c'
self.svc._process(pkt, None)
- self.assertEquals(self.svc._counts, {'foo': 1})
+ self.assertEquals(self.stats.counts, {'foo': 1})
self.svc._process(pkt, None)
- self.assertEquals(self.svc._counts, {'foo': 2})
+ self.assertEquals(self.stats.counts, {'foo': 2})
pkt = 'foo:-1|c'
self.svc._process(pkt, None)
- self.assertEquals(self.svc._counts, {'foo': 1})
+ self.assertEquals(self.stats.counts, {'foo': 1})
def test_counters_sampled(self):
pkt = 'foo:1|c|@.5'
self.svc._process(pkt, None)
- self.assertEquals(self.svc._counts, {'foo': 2})
+ self.assertEquals(self.stats.counts, {'foo': 2})
def test_timers(self):
pkt = 'foo:20|ms'
self.svc._process(pkt, None)
- self.assertEquals(self.svc._timers, {'foo': [20.0]})
+ self.assertEquals(self.stats.timers, {'foo': [20.0]})
pkt = 'foo:10|ms'
self.svc._process(pkt, None)
- self.assertEquals(self.svc._timers, {'foo': [20.0, 10.0]})
+ self.assertEquals(self.stats.timers, {'foo': [20.0, 10.0]})
def test_key_sanitize(self):
pkt = '\t\n#! foo . bar \0 ^:1|c'
self.svc._process(pkt, None)
- self.assertEquals(self.svc._counts, {'foo.bar': 1})
+ self.assertEquals(self.stats.counts, {'foo.bar': 1})
def main():
diff --git a/gstatsd/sink.py b/gstatsd/sink.py
new file mode 100644
index 0000000..588c9d3
--- /dev/null
+++ b/gstatsd/sink.py
@@ -0,0 +1,102 @@
+
+# standard
+import cStringIO
+import sys
+import time
+
+# vendor
+from gevent import socket
+
+E_BADSPEC = "bad sink spec %r: %s"
+E_SENDFAIL = 'failed to send stats to %s %s: %s'
+
+
+class Sink(object):
+
+ """
+ A resource to which stats will be sent.
+ """
+
+ def error(self, msg):
+ sys.stderr.write(msg + '\n')
+
+ def _parse_hostport(self, spec):
+ try:
+ parts = spec.split(':')
+ if len(parts) == 2:
+ return (parts[0], int(parts[1]))
+ if len(parts) == 1:
+ return ('', int(parts[0]))
+ except ValueError, ex:
+ raise ValueError(E_BADSPEC % (spec, ex))
+ raise ValueError("expected '[host]:port' but got %r" % spec)
+
+
+class GraphiteSink(Sink):
+
+ """
+ Sends stats to one or more Graphite servers.
+ """
+
+ def __init__(self):
+ self._hosts = []
+
+ def add(self, spec):
+ self._hosts.append(self._parse_hostport(spec))
+
+ def send(self, stats):
+ "Format stats and send to one or more Graphite hosts"
+ buf = cStringIO.StringIO()
+ now = int(time.time())
+ num_stats = 0
+
+ # timer stats
+ pct = stats.percent
+ timers = stats.timers
+ for key, vals in timers.iteritems():
+ if not vals:
+ continue
+
+ # compute statistics
+ num = len(vals)
+ vals = sorted(vals)
+ vmin = vals[0]
+ vmax = vals[-1]
+ mean = vmin
+ max_at_thresh = vmax
+ if num > 1:
+ idx = round((pct / 100.0) * num)
+ tmp = vals[:int(idx)]
+ if tmp:
+ max_at_thresh = tmp[-1]
+ mean = sum(tmp) / idx
+
+ key = 'stats.timers.%s' % key
+ buf.write('%s.mean %f %d\n' % (key, mean, now))
+ buf.write('%s.upper %f %d\n' % (key, vmax, now))
+ buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
+ buf.write('%s.lower %f %d\n' % (key, vmin, now))
+ buf.write('%s.count %d %d\n' % (key, num, now))
+ num_stats += 1
+
+ # counter stats
+ counts = stats.counts
+ for key, val in counts.iteritems():
+ buf.write('stats.%s %f %d\n' % (key, val / stats.interval, now))
+ buf.write('stats_counts.%s %f %d\n' % (key, val, now))
+ num_stats += 1
+
+ buf.write('statsd.numStats %d %d\n' % (num_stats, now))
+
+ # TODO: add support for N retries
+
+ for host in self._hosts:
+ # flush stats to graphite
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(host)
+ sock.sendall(buf.getvalue())
+ sock.close()
+ except Exception, ex:
+ self.error(E_SENDFAIL % ('graphite', host, ex))
+
diff --git a/makedoc.sh b/makedoc.sh
new file mode 100755
index 0000000..118b271
--- /dev/null
+++ b/makedoc.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+pandoc -f markdown -t rst -o README.txt README.md
+