Initial release of gstatsd 0.1.
This commit is contained in:
commit
29bb10a1b7
|
@ -0,0 +1,5 @@
|
||||||
|
*.pyc
|
||||||
|
*.egg-info
|
||||||
|
.coverage
|
||||||
|
build
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
|
||||||
|
gstatsd - A statsd service implementation in Python + gevent.
|
||||||
|
|
||||||
|
License: Apache 2.0
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
------
|
||||||
|
|
||||||
|
Show gstatsd help:
|
||||||
|
|
||||||
|
% gstatsd -h
|
||||||
|
|
||||||
|
Start gstatsd and send stats to port 9100 every 5 seconds:
|
||||||
|
|
||||||
|
% gstatsd -d :9100 -f 5
|
||||||
|
|
||||||
|
Bind listener to host 'hostname' port 8126:
|
||||||
|
|
||||||
|
% gstatsd -b hostname:8126 -d :9100 -f 5
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
README.md
|
|
@ -0,0 +1,4 @@
|
||||||
|
|
||||||
|
from core import __version__
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
|
||||||
|
|
||||||
|
import random
|
||||||
|
import socket
|
||||||
|
|
||||||
|
|
||||||
|
class StatsClient(object):
|
||||||
|
|
||||||
|
"Simple client to exercise the statsd server."
|
||||||
|
|
||||||
|
HOSTPORT = ('', 8125)
|
||||||
|
|
||||||
|
def __init__(self, hostport=None):
|
||||||
|
if hostport is None:
|
||||||
|
hostport = StatsClient.HOSTPORT
|
||||||
|
self._hostport = hostport
|
||||||
|
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
|
||||||
|
def timer(self, key, timestamp, sample_rate=1):
|
||||||
|
self._send('%s:%d|ms' % (key, timestamp), sample_rate)
|
||||||
|
|
||||||
|
def increment(self, key, sample_rate=1):
|
||||||
|
return self.counter(key, 1, sample_rate)
|
||||||
|
|
||||||
|
def decrement(self, key, sample_rate=1):
|
||||||
|
return self.counter(key, -1, sample_rate)
|
||||||
|
|
||||||
|
def counter(self, keys, magnitude=1, sample_rate=1):
|
||||||
|
if not isinstance(keys, (list, tuple)):
|
||||||
|
keys = [keys]
|
||||||
|
for key in keys:
|
||||||
|
self._send('%s:%s|c' % (key, magnitude), sample_rate)
|
||||||
|
|
||||||
|
def _send(self, data, sample_rate=1):
|
||||||
|
packet = None
|
||||||
|
if sample_rate < 1.0:
|
||||||
|
if random.random() < sample_rate:
|
||||||
|
packet = data + '|@%s' % sample_rate
|
||||||
|
else:
|
||||||
|
packet = data
|
||||||
|
if packet:
|
||||||
|
self._sock.sendto(packet, self._hostport)
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
|
||||||
|
|
||||||
|
__version__ = '0.1'
|
||||||
|
|
|
@ -0,0 +1,276 @@
|
||||||
|
|
||||||
|
# standard
|
||||||
|
import cStringIO
|
||||||
|
import optparse
|
||||||
|
import os
|
||||||
|
import resource
|
||||||
|
import signal
|
||||||
|
import string
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
# local
|
||||||
|
from core import __version__
|
||||||
|
|
||||||
|
# vendor
|
||||||
|
import gevent, gevent.socket
|
||||||
|
socket = gevent.socket
|
||||||
|
|
||||||
|
|
||||||
|
# constants
|
||||||
|
INTERVAL = 10.0
|
||||||
|
PERCENT = 90.0
|
||||||
|
MAX_PACKET = 2048
|
||||||
|
|
||||||
|
DESCRIPTION = '''
|
||||||
|
A statsd service in Python + gevent.
|
||||||
|
'''
|
||||||
|
|
||||||
|
# table to remove invalid characters from keys
|
||||||
|
KEY_VALIDCHARS = string.uppercase + string.lowercase + string.digits + '_-.'
|
||||||
|
KEY_SANITIZE = string.maketrans(KEY_VALIDCHARS + '/', KEY_VALIDCHARS + '_')
|
||||||
|
|
||||||
|
# error messages
|
||||||
|
E_BADADDR = 'invalid bind address specified %r'
|
||||||
|
E_BADTARGET = 'invalid target specified %r'
|
||||||
|
E_BADBACKEND = 'invalid backend specified %r'
|
||||||
|
E_NOTARGETS = 'you must specify at least one stats destination'
|
||||||
|
E_SENDFAIL = 'failed to send stats to %s %s: %s'
|
||||||
|
|
||||||
|
|
||||||
|
def daemonize(umask=0027):
|
||||||
|
if gevent.fork():
|
||||||
|
os._exit(0)
|
||||||
|
os.setsid()
|
||||||
|
if gevent.fork():
|
||||||
|
os._exit(0)
|
||||||
|
os.umask(umask)
|
||||||
|
fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
|
||||||
|
if fd_limit == resource.RLIM_INFINITY:
|
||||||
|
fd_limit = 1024
|
||||||
|
for fd in xrange(0, fd_limit):
|
||||||
|
try:
|
||||||
|
os.close(fd)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
os.open(os.devnull, os.O_RDWR)
|
||||||
|
os.dup2(0, 1)
|
||||||
|
os.dup2(0, 2)
|
||||||
|
gevent.reinit()
|
||||||
|
|
||||||
|
|
||||||
|
def parse_addr(text):
|
||||||
|
"Parse a 1- to 3-part address spec."
|
||||||
|
if text:
|
||||||
|
parts = text.split(':')
|
||||||
|
length = len(parts)
|
||||||
|
if length== 3:
|
||||||
|
return parts[0], parts[1], int(parts[2])
|
||||||
|
elif length == 2:
|
||||||
|
return None, parts[0], int(parts[1])
|
||||||
|
elif length == 1:
|
||||||
|
return None, '', int(parts[0])
|
||||||
|
return None, None, None
|
||||||
|
|
||||||
|
|
||||||
|
class StatsDaemon(object):
|
||||||
|
|
||||||
|
"""
|
||||||
|
A statsd service implementation in Python + gevent.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, bindaddr, targets, interval, percent, debug=0):
|
||||||
|
# parse and validate
|
||||||
|
_, host, port = parse_addr(bindaddr)
|
||||||
|
if port is None:
|
||||||
|
self._exit(E_BADADDR % bindaddr)
|
||||||
|
self._bindaddr = (host, port)
|
||||||
|
|
||||||
|
# parse backend targets
|
||||||
|
if not targets:
|
||||||
|
self._exit(E_NOTARGETS)
|
||||||
|
self._targets = []
|
||||||
|
for target in targets:
|
||||||
|
backend, host, port = parse_addr(target)
|
||||||
|
if backend is None:
|
||||||
|
backend = 'graphite'
|
||||||
|
if port is None:
|
||||||
|
self._exit(E_BADTARGET % target)
|
||||||
|
func = getattr(self, '_send_%s' % backend, None)
|
||||||
|
if not func:
|
||||||
|
self._exit(E_BADBACKEND % backend)
|
||||||
|
self._targets.append((func, (host, port)))
|
||||||
|
|
||||||
|
self._interval = float(interval)
|
||||||
|
self._percent = float(percent)
|
||||||
|
self._debug = debug
|
||||||
|
self._timers = defaultdict(list)
|
||||||
|
self._counts = defaultdict(float)
|
||||||
|
self._sock = None
|
||||||
|
self._flush_task = None
|
||||||
|
|
||||||
|
def _exit(self, msg, code=1):
|
||||||
|
self._error(msg)
|
||||||
|
sys.exit(code)
|
||||||
|
|
||||||
|
def _error(self, msg):
|
||||||
|
sys.stderr.write(msg + '\n')
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"Start the service"
|
||||||
|
# register signals
|
||||||
|
gevent.signal(signal.SIGINT, self._shutdown)
|
||||||
|
|
||||||
|
# spawn the flush trigger
|
||||||
|
def _flush_impl():
|
||||||
|
while 1:
|
||||||
|
gevent.sleep(self._interval)
|
||||||
|
for func, hostport in self._targets:
|
||||||
|
try:
|
||||||
|
func(hostport)
|
||||||
|
except Exception, ex:
|
||||||
|
self._error(traceback.format_tb(sys.exc_info()[-1]))
|
||||||
|
|
||||||
|
self._flush_task = gevent.spawn(_flush_impl)
|
||||||
|
|
||||||
|
# start accepting connections
|
||||||
|
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
|
||||||
|
socket.IPPROTO_UDP)
|
||||||
|
self._sock.bind(self._bindaddr)
|
||||||
|
while 1:
|
||||||
|
try:
|
||||||
|
self._process(*self._sock.recvfrom(MAX_PACKET))
|
||||||
|
except Exception, ex:
|
||||||
|
self._error(str(ex))
|
||||||
|
|
||||||
|
def _shutdown(self):
|
||||||
|
"Shutdown the server"
|
||||||
|
self._exit("service exiting", code=0)
|
||||||
|
|
||||||
|
def _process(self, data, _):
|
||||||
|
"Process a single packet"
|
||||||
|
parts = data.split(':')
|
||||||
|
if self._debug:
|
||||||
|
self._error('packet: %r' % data)
|
||||||
|
if not parts:
|
||||||
|
return
|
||||||
|
key = parts[0].translate(KEY_SANITIZE, string.whitespace)
|
||||||
|
for part in parts[1:]:
|
||||||
|
srate = 1.0
|
||||||
|
fields = part.split('|')
|
||||||
|
length = len(fields)
|
||||||
|
if length < 2:
|
||||||
|
continue
|
||||||
|
value = fields[0]
|
||||||
|
stype = fields[1].strip()
|
||||||
|
|
||||||
|
# timer (milliseconds)
|
||||||
|
if stype == 'ms':
|
||||||
|
self._timers[key].append(float(value if value else 0))
|
||||||
|
|
||||||
|
# counter with optional sample rate
|
||||||
|
elif stype == 'c':
|
||||||
|
if length == 3 and fields[2].startswith('@'):
|
||||||
|
srate = float(fields[2][1:])
|
||||||
|
value = float(value if value else 1) * (1 / srate)
|
||||||
|
self._counts[key] += value
|
||||||
|
|
||||||
|
def _send_graphite(self, dest):
|
||||||
|
"Send blob of stats data to graphite server"
|
||||||
|
buf = cStringIO.StringIO()
|
||||||
|
now = int(time.time())
|
||||||
|
num_stats = 0
|
||||||
|
|
||||||
|
# timer stats
|
||||||
|
pct = self._percent
|
||||||
|
timers = self._timers
|
||||||
|
for key, vals in timers.iteritems():
|
||||||
|
if not vals:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# compute statistics
|
||||||
|
num = len(vals)
|
||||||
|
vals = sorted(vals)
|
||||||
|
vmin = vals[0]
|
||||||
|
vmax = vals[-1]
|
||||||
|
mean = vmin
|
||||||
|
max_at_thresh = vmax
|
||||||
|
if num > 1:
|
||||||
|
idx = round((pct / 100.0) * num)
|
||||||
|
tmp = vals[:int(idx)]
|
||||||
|
if tmp:
|
||||||
|
max_at_thresh = tmp[-1]
|
||||||
|
mean = sum(tmp) / idx
|
||||||
|
|
||||||
|
key = 'stats.timers.%s' % key
|
||||||
|
buf.write('%s.mean %f %d\n' % (key, mean, now))
|
||||||
|
buf.write('%s.upper %f %d\n' % (key, vmax, now))
|
||||||
|
buf.write('%s.upper_%d %f %d\n' % (key, pct, max_at_thresh, now))
|
||||||
|
buf.write('%s.lower %f %d\n' % (key, vmin, now))
|
||||||
|
buf.write('%s.count %d %d\n' % (key, num, now))
|
||||||
|
num_stats += 1
|
||||||
|
|
||||||
|
# counter stats
|
||||||
|
counts = self._counts
|
||||||
|
for key, val in counts.iteritems():
|
||||||
|
buf.write('stats.%s %f %d\n' % (key, val / self._interval, now))
|
||||||
|
buf.write('stats_counts.%s %f %d\n' % (key, val, now))
|
||||||
|
num_stats += 1
|
||||||
|
|
||||||
|
buf.write('statsd.numStats %d %d\n' % (num_stats, now))
|
||||||
|
|
||||||
|
# reset
|
||||||
|
self._timers = defaultdict(list)
|
||||||
|
self._counts = defaultdict(float)
|
||||||
|
del timers
|
||||||
|
del counts
|
||||||
|
|
||||||
|
# XXX: add support for N retries
|
||||||
|
|
||||||
|
# flush stats to graphite
|
||||||
|
try:
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.connect(dest)
|
||||||
|
sock.sendall(buf.getvalue())
|
||||||
|
sock.close()
|
||||||
|
except Exception, ex:
|
||||||
|
self._error(E_SENDFAIL % ('graphite', dest, ex))
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
opts = optparse.OptionParser(description=DESCRIPTION, version=__version__)
|
||||||
|
opts.add_option('-b', '--bind', dest='bind_addr', default=':8125',
|
||||||
|
help="bind [host]:port (host defaults to '')")
|
||||||
|
opts.add_option('-d', '--dest', dest='dest_addr', action='append',
|
||||||
|
default=[],
|
||||||
|
help="receiver [backend:]host:port (backend defaults to 'graphite')")
|
||||||
|
opts.add_option('-v', dest='verbose', action='count', default=0)
|
||||||
|
opts.add_option('-f', '--flush', dest='interval', default=INTERVAL,
|
||||||
|
help="flush interval, in seconds")
|
||||||
|
opts.add_option('-p', '--percent', dest='percent', default=PERCENT,
|
||||||
|
help="percent threshold")
|
||||||
|
opts.add_option('-l', '--list', dest='list_backends', action='store_true',
|
||||||
|
help="list supported backends")
|
||||||
|
opts.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
|
||||||
|
help='daemonize the service')
|
||||||
|
|
||||||
|
(options, args) = opts.parse_args()
|
||||||
|
|
||||||
|
if options.list_backends:
|
||||||
|
for key in [k for k in dir(StatsDaemon) if k.startswith('_send_')]:
|
||||||
|
print(key[6:])
|
||||||
|
sys.exit()
|
||||||
|
|
||||||
|
if options.daemonize:
|
||||||
|
daemonize()
|
||||||
|
|
||||||
|
sd = StatsDaemon(options.bind_addr, options.dest_addr, options.interval,
|
||||||
|
options.percent, options.verbose)
|
||||||
|
sd.start()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
|
||||||
|
from client import StatsClient
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
cli = StatsClient()
|
||||||
|
for num in range(1, 11):
|
||||||
|
cli.timer('foo', num)
|
||||||
|
return
|
||||||
|
cli.increment('baz', 0.5)
|
||||||
|
cli.increment('baz', 0.5)
|
||||||
|
cli.timer('t3', 100, 0.5)
|
||||||
|
return
|
||||||
|
|
||||||
|
cli.increment('foo')
|
||||||
|
cli.counter(['foo', 'bar'], 2)
|
||||||
|
cli.timer('t1', 12)
|
||||||
|
cli.timer('t2', 30)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
|
||||||
|
import os
|
||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
from gstatsd import __version__
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
cwd = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
path = os.path.join(cwd, 'README.txt')
|
||||||
|
readme = open(path, 'rb').read()
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name = 'gstatsd',
|
||||||
|
version = __version__,
|
||||||
|
description = 'A statsd service and client in Python + gevent',
|
||||||
|
license = 'Apache 2.0',
|
||||||
|
author = 'Patrick Hensley',
|
||||||
|
author_email = 'spaceboy@indirect.com',
|
||||||
|
keywords = ['stats', 'graphite', 'statsd', 'gevent'],
|
||||||
|
url = 'http://github.com/phensley/gstatsd',
|
||||||
|
packages = ['gstatsd'],
|
||||||
|
entry_points = {
|
||||||
|
'console_scripts': ['gstatsd=gstatsd.service:main']
|
||||||
|
},
|
||||||
|
classifiers = [
|
||||||
|
"Development Status :: 2 - Pre-Alpha",
|
||||||
|
"Intended Audience :: Developers",
|
||||||
|
"License :: OSI Approved :: Apache 2.0",
|
||||||
|
"Operating System :: MacOS :: MacOS X",
|
||||||
|
"Operating System :: POSIX :: Linux",
|
||||||
|
"Operating System :: Unix",
|
||||||
|
"Programming Language :: Python",
|
||||||
|
"Programming Language :: Python :: 2.6",
|
||||||
|
"Programming Language :: Python :: 2.7",
|
||||||
|
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||||
|
],
|
||||||
|
long_description = readme
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
Loading…
Reference in New Issue