summaryrefslogtreecommitdiffstats
path: root/gstatsd/sink.py
diff options
context:
space:
mode:
authorPatrick Hensley <spaceboy@indirect.com>2011-06-30 14:22:50 (GMT)
committerPatrick Hensley <spaceboy@indirect.com>2011-06-30 21:20:53 (GMT)
commit2b8948ae0a92ab447ae78ded8e7e1eebe6c4d3fc (patch)
treedcdc419ab690c1399e612866dcdfa6c7e5a4d0b4 /gstatsd/sink.py
parente83d53f948df502b14119e4faf6dac88806c21ba (diff)
downloadgstatsd-2b8948ae0a92ab447ae78ded8e7e1eebe6c4d3fc.zip
gstatsd-2b8948ae0a92ab447ae78ded8e7e1eebe6c4d3fc.tar.gz
gstatsd-2b8948ae0a92ab447ae78ded8e7e1eebe6c4d3fc.tar.bz2
Release 0.40.4
* Generalized the stats formatting and delivery into GraphiteSink class. Move the stats tables (timers/counts) into a Stats class. * Updated README files, added script to derive README.txt from README.md, changed '-d / --dest' option to '-s / --sink'.
Diffstat (limited to 'gstatsd/sink.py')
-rw-r--r--gstatsd/sink.py102
1 files changed, 102 insertions, 0 deletions
diff --git a/gstatsd/sink.py b/gstatsd/sink.py
new file mode 100644
index 0000000..588c9d3
--- /dev/null
+++ b/gstatsd/sink.py
@@ -0,0 +1,102 @@
+
+# standard
+import cStringIO
+import sys
+import time
+
+# vendor
+from gevent import socket
+
+E_BADSPEC = "bad sink spec %r: %s"
+E_SENDFAIL = 'failed to send stats to %s %s: %s'
+
+
+class Sink(object):
+
+ """
+ A resource to which stats will be sent.
+ """
+
+ def error(self, msg):
+ sys.stderr.write(msg + '\n')
+
+ def _parse_hostport(self, spec):
+ try:
+ parts = spec.split(':')
+ if len(parts) == 2:
+ return (parts[0], int(parts[1]))
+ if len(parts) == 1:
+ return ('', int(parts[0]))
+ except ValueError, ex:
+ raise ValueError(E_BADSPEC % (spec, ex))
+ raise ValueError("expected '[host]:port' but got %r" % spec)
+
+
+class GraphiteSink(Sink):
+
+ """
+ Sends stats to one or more Graphite servers.
+ """
+
+ def __init__(self):
+ self._hosts = []
+
+ def add(self, spec):
+ self._hosts.append(self._parse_hostport(spec))
+
+ def send(self, stats):
+ "Format stats and send to one or more Graphite hosts"
+ buf = cStringIO.StringIO()
+ now = int(time.time())
+ num_stats = 0
+
+ # timer stats
+ pct = stats.percent
+ timers = stats.timers
+ for key, vals in timers.iteritems():
+ if not vals:
+ continue
+
+ # compute statistics
+ num = len(vals)
+ vals = sorted(vals)
+ vmin = vals[0]
+ vmax = vals[-1]
+ mean = vmin
+ max_at_thresh = vmax
+ if num > 1:
+ idx = round((pct / 100.0) * num)
+ tmp = vals[:int(idx)]
+ if tmp:
+ max_at_thresh = tmp[-1]
+ mean = sum(tmp) / idx
+
+ key = 'stats.timers.%s' % key
+ buf.write('%s.mean %f %d\n' % (key, mean, now))
+ buf.write('%s.upper %f %d\n' % (key, vmax, now))
+ buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
+ buf.write('%s.lower %f %d\n' % (key, vmin, now))
+ buf.write('%s.count %d %d\n' % (key, num, now))
+ num_stats += 1
+
+ # counter stats
+ counts = stats.counts
+ for key, val in counts.iteritems():
+ buf.write('stats.%s %f %d\n' % (key, val / stats.interval, now))
+ buf.write('stats_counts.%s %f %d\n' % (key, val, now))
+ num_stats += 1
+
+ buf.write('statsd.numStats %d %d\n' % (num_stats, now))
+
+ # TODO: add support for N retries
+
+ for host in self._hosts:
+ # flush stats to graphite
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(host)
+ sock.sendall(buf.getvalue())
+ sock.close()
+ except Exception, ex:
+ self.error(E_SENDFAIL % ('graphite', host, ex))
+