Add transmission to a carbon server.

This commit is contained in:
Mikaël Ates 2016-03-10 09:13:36 +01:00
parent 8b8b6a0778
commit e43bb895f4
1 changed files with 109 additions and 0 deletions

View File

@ -417,3 +417,112 @@ class Reader:
sleep(reader_sleep_long)
self.inventory(delay=4000, power=power, logic_port=logic_port, silent=silent,
epcs_csv=epcs_csv, metrics_csv=metrics_csv, metrics_db=metrics_db)
class Transmitter():
def __init__(self, *args, **kwargs):
logger.debug('Transmitter initialization.')
self.config = config.Config()
for metric in METRICS:
fn = "{}.wsp".format(metric)
setattr(self, "fn_{}".format(metric), fn)
logger.debug('Metric {} : filename {}.'.format(metric, fn))
self.config.set_setting('exit_transmitter', False)
def now(self):
return int(utils.unix_time(datetime.utcnow()))
def set_last_push(self, metric, date):
with open('last_push_{}'.format(metric), 'w') as f:
f.write(str(date))
def get_last_push(self, metric):
fn = 'last_push_{}'.format(metric)
if not os.path.isfile(fn):
with open(fn, 'w') as f:
f.write(str(0))
with open(fn, 'r') as f:
line = None
for line in f:
pass
if line:
return int(line)
return None
def send(self):
while True:
(exit_transmitter,
pause_transmitter,
transmitter_sleep,
patient_id,
carbon_hostname,
carbon_port,
self.temperature,
self.heartrate,
metrics_path) = self.config.transmitter_config()
if exit_transmitter:
logger.info('Exit transmitter asked.')
exit(0)
if pause_transmitter:
logger.debug('Tranmsitter off.')
logger.debug('Sleep {}ms.'.format(transmitter_sleep))
sleep(transmitter_sleep)
continue
logger.debug('Transmitter on.')
if not patient_id:
logger.debug('No patient ID configured.')
sleep(transmitter_sleep)
continue
logger.debug('Patient ID : {}'.format(patient_id))
logger.debug('Carbon host : {}:{}'.format(carbon_hostname, carbon_port))
for metric in METRICS:
if getattr(self, metric):
logger.debug('Reading metric : {}.'.format(metric))
fn = getattr(self, "fn_{}".format(metric))
logger.debug('Filename : {}.'.format(fn))
if os.path.isfile(fn):
last_push = _from = self.get_last_push(metric)
logger.debug('Last read is : {}.'.format(_from))
until = self.now()
logger.debug('Until now : {}.'.format(until))
data = whisper.fetch(fn, _from, until)
if data:
start, end, step = data[0]
cleaned = [(start + i * step, value)
for i, value in enumerate(data[1]) if value]
logger.debug('Data to send : {}.'.format(cleaned))
if cleaned and start == last_push:
logger.debug("First value already pushed, "
"we don't send it : {}.".format(cleaned[0]))
# We remove the first item if pushed last time
cleaned = cleaned[1:]
remote_path = "{}.{}.{}".format(metrics_path, patient_id, metric)
for date, value in cleaned:
content = "{} {} {}\n".format(remote_path, value, date)
try:
logger.debug("Send command : {}.".format(content))
utils.netcat(carbon_hostname, carbon_port, content)
last_push = date
except Exception, e:
logger.debug("Error executing send command : '{}', "
"we stopped at : {}.".format(e, last_push))
break
logger.debug("Write last push at : {}.".format(last_push))
self.set_last_push(metric, last_push)
else:
logger.debug('No data to send.')
else:
logger.debug('No whisper file found.')
else:
logger.debug('Sending metric {} off.'.format(metric))
logger.debug('Sleep {} s.'.format(transmitter_sleep))
sleep(transmitter_sleep)