summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- .gitignore 5
-rw-r--r-- README.md 20
l--------- README.txt 1
-rw-r--r-- gstatsd/__init__.py 4
-rw-r--r-- gstatsd/client.py 43
-rw-r--r-- gstatsd/core.py 4
-rw-r--r-- gstatsd/service.py 276
-rw-r--r-- gstatsd/test.py 22
-rw-r--r-- setup.py 44
9 files changed, 419 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c69a9bb
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+*.pyc
+*.egg-info
+.coverage
+build
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..88b70b9
--- /dev/null
+++ b/README.md
@@ -0,0 +1,20 @@
+
+gstatsd - A statsd service implementation in Python + gevent.
+
+License: Apache 2.0
+
+Usage:
+------
+
+Show gstatsd help:
+
+    % gstatsd -h
+
+Start gstatsd and send stats to port 9100 every 5 seconds:
+
+    % gstatsd -d :9100 -f 5
+
+Bind listener to host 'hostname' port 8126:
+
+    % gstatsd -b hostname:8126 -d :9100 -f 5
+
diff --git a/README.txt b/README.txt
new file mode 120000
index 0000000..42061c0
--- /dev/null
+++ b/README.txt
@@ -0,0 +1 @@
+README.md
\ No newline at end of file
diff --git a/gstatsd/__init__.py b/gstatsd/__init__.py
new file mode 100644
index 0000000..bb2c6f9
--- /dev/null
+++ b/gstatsd/__init__.py
@@ -0,0 +1,4 @@
+
+from core import __version__
+
+
diff --git a/gstatsd/client.py b/gstatsd/client.py
new file mode 100644
index 0000000..5d5f34f
--- /dev/null
+++ b/gstatsd/client.py
@@ -0,0 +1,43 @@
+
+
+import random
+import socket
+
+
+class StatsClient(object):
+
+ "Simple client to exercise the statsd server."
+
+ HOSTPORT = ('', 8125)
+
+ def __init__(self, hostport=None):
+ if hostport is None:
+ hostport = StatsClient.HOSTPORT
+ self._hostport = hostport
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+ def timer(self, key, timestamp, sample_rate=1):
+ self._send('%s:%d|ms' % (key, timestamp), sample_rate)
+
+ def increment(self, key, sample_rate=1):
+ return self.counter(key, 1, sample_rate)
+
+ def decrement(self, key, sample_rate=1):
+ return self.counter(key, -1, sample_rate)
+
+ def counter(self, keys, magnitude=1, sample_rate=1):
+ if not isinstance(keys, (list, tuple)):
+ keys = [keys]
+ for key in keys:
+ self._send('%s:%s|c' % (key, magnitude), sample_rate)
+
+ def _send(self, data, sample_rate=1):
+ packet = None
+ if sample_rate < 1.0:
+ if random.random() < sample_rate:
+ packet = data + '|@%s' % sample_rate
+ else:
+ packet = data
+ if packet:
+ self._sock.sendto(packet, self._hostport)
+
diff --git a/gstatsd/core.py b/gstatsd/core.py
new file mode 100644
index 0000000..0b9ed6e
--- /dev/null
+++ b/gstatsd/core.py
@@ -0,0 +1,4 @@
+
+
+__version__ = '0.1'
+
diff --git a/gstatsd/service.py b/gstatsd/service.py
new file mode 100644
index 0000000..c4ef565
--- /dev/null
+++ b/gstatsd/service.py
@@ -0,0 +1,276 @@
+
+# standard
+import cStringIO
+import optparse
+import os
+import resource
+import signal
+import string
+import sys
+import time
+import traceback
+from collections import defaultdict
+
+# local
+from core import __version__
+
+# vendor
+import gevent, gevent.socket
+socket = gevent.socket
+
+
+# constants
+INTERVAL = 10.0
+PERCENT = 90.0
+MAX_PACKET = 2048
+
+DESCRIPTION = '''
+A statsd service in Python + gevent.
+'''
+
+# table to remove invalid characters from keys
+KEY_VALIDCHARS = string.uppercase + string.lowercase + string.digits + '_-.'
+KEY_SANITIZE = string.maketrans(KEY_VALIDCHARS + '/', KEY_VALIDCHARS + '_')
+
+# error messages
+E_BADADDR = 'invalid bind address specified %r'
+E_BADTARGET = 'invalid target specified %r'
+E_BADBACKEND = 'invalid backend specified %r'
+E_NOTARGETS = 'you must specify at least one stats destination'
+E_SENDFAIL = 'failed to send stats to %s %s: %s'
+
+
+def daemonize(umask=0027):
+ if gevent.fork():
+ os._exit(0)
+ os.setsid()
+ if gevent.fork():
+ os._exit(0)
+ os.umask(umask)
+ fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+ if fd_limit == resource.RLIM_INFINITY:
+ fd_limit = 1024
+ for fd in xrange(0, fd_limit):
+ try:
+ os.close(fd)
+ except:
+ pass
+ os.open(os.devnull, os.O_RDWR)
+ os.dup2(0, 1)
+ os.dup2(0, 2)
+ gevent.reinit()
+
+
+def parse_addr(text):
+ "Parse a 1- to 3-part address spec."
+ if text:
+ parts = text.split(':')
+ length = len(parts)
+ if length== 3:
+ return parts[0], parts[1], int(parts[2])
+ elif length == 2:
+ return None, parts[0], int(parts[1])
+ elif length == 1:
+ return None, '', int(parts[0])
+ return None, None, None
+
+
+class StatsDaemon(object):
+
+ """
+ A statsd service implementation in Python + gevent.
+ """
+
+ def __init__(self, bindaddr, targets, interval, percent, debug=0):
+ # parse and validate
+ _, host, port = parse_addr(bindaddr)
+ if port is None:
+ self._exit(E_BADADDR % bindaddr)
+ self._bindaddr = (host, port)
+
+ # parse backend targets
+ if not targets:
+ self._exit(E_NOTARGETS)
+ self._targets = []
+ for target in targets:
+ backend, host, port = parse_addr(target)
+ if backend is None:
+ backend = 'graphite'
+ if port is None:
+ self._exit(E_BADTARGET % target)
+ func = getattr(self, '_send_%s' % backend, None)
+ if not func:
+ self._exit(E_BADBACKEND % backend)
+ self._targets.append((func, (host, port)))
+
+ self._interval = float(interval)
+ self._percent = float(percent)
+ self._debug = debug
+ self._timers = defaultdict(list)
+ self._counts = defaultdict(float)
+ self._sock = None
+ self._flush_task = None
+
+ def _exit(self, msg, code=1):
+ self._error(msg)
+ sys.exit(code)
+
+ def _error(self, msg):
+ sys.stderr.write(msg + '\n')
+
+ def start(self):
+ "Start the service"
+ # register signals
+ gevent.signal(signal.SIGINT, self._shutdown)
+
+ # spawn the flush trigger
+ def _flush_impl():
+ while 1:
+ gevent.sleep(self._interval)
+ for func, hostport in self._targets:
+ try:
+ func(hostport)
+ except Exception, ex:
+ self._error(traceback.format_tb(sys.exc_info()[-1]))
+
+ self._flush_task = gevent.spawn(_flush_impl)
+
+ # start accepting connections
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
+ socket.IPPROTO_UDP)
+ self._sock.bind(self._bindaddr)
+ while 1:
+ try:
+ self._process(*self._sock.recvfrom(MAX_PACKET))
+ except Exception, ex:
+ self._error(str(ex))
+
+ def _shutdown(self):
+ "Shutdown the server"
+ self._exit("service exiting", code=0)
+
+ def _process(self, data, _):
+ "Process a single packet"
+ parts = data.split(':')
+ if self._debug:
+ self._error('packet: %r' % data)
+ if not parts:
+ return
+ key = parts[0].translate(KEY_SANITIZE, string.whitespace)
+ for part in parts[1:]:
+ srate = 1.0
+ fields = part.split('|')
+ length = len(fields)
+ if length < 2:
+ continue
+ value = fields[0]
+ stype = fields[1].strip()
+
+ # timer (milliseconds)
+ if stype == 'ms':
+ self._timers[key].append(float(value if value else 0))
+
+ # counter with optional sample rate
+ elif stype == 'c':
+ if length == 3 and fields[2].startswith('@'):
+ srate = float(fields[2][1:])
+ value = float(value if value else 1) * (1 / srate)
+ self._counts[key] += value
+
+ def _send_graphite(self, dest):
+ "Send blob of stats data to graphite server"
+ buf = cStringIO.StringIO()
+ now = int(time.time())
+ num_stats = 0
+
+ # timer stats
+ pct = self._percent
+ timers = self._timers
+ for key, vals in timers.iteritems():
+ if not vals:
+ continue
+
+ # compute statistics
+ num = len(vals)
+ vals = sorted(vals)
+ vmin = vals[0]
+ vmax = vals[-1]
+ mean = vmin
+ max_at_thresh = vmax
+ if num > 1:
+ idx = round((pct / 100.0) * num)
+ tmp = vals[:int(idx)]
+ if tmp:
+ max_at_thresh = tmp[-1]
+ mean = sum(tmp) / idx
+
+ key = 'stats.timers.%s' % key
+ buf.write('%s.mean %f %d\n' % (key, mean, now))
+ buf.write('%s.upper %f %d\n' % (key, vmax, now))
+ buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
+ buf.write('%s.lower %f %d\n' % (key, vmin, now))
+ buf.write('%s.count %d %d\n' % (key, num, now))
+ num_stats += 1
+
+ # counter stats
+ counts = self._counts
+ for key, val in counts.iteritems():
+ buf.write('stats.%s %f %d\n' % (key, val / self._interval, now))
+ buf.write('stats_counts.%s %f %d\n' % (key, val, now))
+ num_stats += 1
+
+ buf.write('statsd.numStats %d %d\n' % (num_stats, now))
+
+ # reset
+ self._timers = defaultdict(list)
+ self._counts = defaultdict(float)
+ del timers
+ del counts
+
+ # XXX: add support for N retries
+
+ # flush stats to graphite
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(dest)
+ sock.sendall(buf.getvalue())
+ sock.close()
+ except Exception, ex:
+ self._error(E_SENDFAIL % ('graphite', dest, ex))
+
+
+def main():
+ opts = optparse.OptionParser(description=DESCRIPTION, version=__version__)
+ opts.add_option('-b', '--bind', dest='bind_addr', default=':8125',
+ help="bind [host]:port (host defaults to '')")
+ opts.add_option('-d', '--dest', dest='dest_addr', action='append',
+ default=[],
+ help="receiver [backend:]host:port (backend defaults to 'graphite')")
+ opts.add_option('-v', dest='verbose', action='count', default=0)
+ opts.add_option('-f', '--flush', dest='interval', default=INTERVAL,
+ help="flush interval, in seconds")
+ opts.add_option('-p', '--percent', dest='percent', default=PERCENT,
+ help="percent threshold")
+ opts.add_option('-l', '--list', dest='list_backends', action='store_true',
+ help="list supported backends")
+ opts.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
+ help='daemonize the service')
+
+ (options, args) = opts.parse_args()
+
+ if options.list_backends:
+ for key in [k for k in dir(StatsDaemon) if k.startswith('_send_')]:
+ print(key[6:])
+ sys.exit()
+
+ if options.daemonize:
+ daemonize()
+
+ sd = StatsDaemon(options.bind_addr, options.dest_addr, options.interval,
+ options.percent, options.verbose)
+ sd.start()
+
+
+if __name__ == '__main__':
+ main()
+
diff --git a/gstatsd/test.py b/gstatsd/test.py
new file mode 100644
index 0000000..824a323
--- /dev/null
+++ b/gstatsd/test.py
@@ -0,0 +1,22 @@
+
+from client import StatsClient
+
+
+def main():
+ cli = StatsClient()
+ for num in range(1, 11):
+ cli.timer('foo', num)
+ return
+ cli.increment('baz', 0.5)
+ cli.increment('baz', 0.5)
+ cli.timer('t3', 100, 0.5)
+ return
+
+ cli.increment('foo')
+ cli.counter(['foo', 'bar'], 2)
+ cli.timer('t1', 12)
+ cli.timer('t2', 30)
+
+if __name__ == '__main__':
+ main()
+
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..b615b2d
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,44 @@
+
+import os
+from setuptools import setup
+
+from gstatsd import __version__
+
+
+def main():
+ cwd = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(cwd, 'README.txt')
+ readme = open(path, 'rb').read()
+
+ setup(
+ name = 'gstatsd',
+ version = __version__,
+ description = 'A statsd service and client in Python + gevent',
+ license = 'Apache 2.0',
+ author = 'Patrick Hensley',
+ author_email = 'spaceboy@indirect.com',
+ keywords = ['stats', 'graphite', 'statsd', 'gevent'],
+ url = 'http://github.com/phensley/gstatsd',
+ packages = ['gstatsd'],
+ entry_points = {
+ 'console_scripts': ['gstatsd=gstatsd.service:main']
+ },
+ classifiers = [
+ "Development Status :: 2 - Pre-Alpha",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: Apache 2.0",
+ "Operating System :: MacOS :: MacOS X",
+ "Operating System :: POSIX :: Linux",
+ "Operating System :: Unix",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 2.6",
+ "Programming Language :: Python :: 2.7",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ ],
+ long_description = readme
+ )
+
+
+if __name__ == '__main__':
+ main()
+