From e43bb895f49f47fa757503dcedbfffaa57a4d712 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mika=C3=ABl=20Ates?= Date: Thu, 10 Mar 2016 09:13:36 +0100 Subject: [PATCH] Add transmission to a carbon server. --- rfiddriver.py | 109 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/rfiddriver.py b/rfiddriver.py index 1808b85..e03dbe0 100644 --- a/rfiddriver.py +++ b/rfiddriver.py @@ -417,3 +417,112 @@ class Reader: sleep(reader_sleep_long) self.inventory(delay=4000, power=power, logic_port=logic_port, silent=silent, epcs_csv=epcs_csv, metrics_csv=metrics_csv, metrics_db=metrics_db) + + +class Transmitter(): + + def __init__(self, *args, **kwargs): + logger.debug('Transmitter initialization.') + self.config = config.Config() + for metric in METRICS: + fn = "{}.wsp".format(metric) + setattr(self, "fn_{}".format(metric), fn) + logger.debug('Metric {} : filename {}.'.format(metric, fn)) + self.config.set_setting('exit_transmitter', False) + + def now(self): + return int(utils.unix_time(datetime.utcnow())) + + def set_last_push(self, metric, date): + with open('last_push_{}'.format(metric), 'w') as f: + f.write(str(date)) + + def get_last_push(self, metric): + fn = 'last_push_{}'.format(metric) + if not os.path.isfile(fn): + with open(fn, 'w') as f: + f.write(str(0)) + with open(fn, 'r') as f: + line = None + for line in f: + pass + if line: + return int(line) + return None + + def send(self): + while True: + (exit_transmitter, + pause_transmitter, + transmitter_sleep, + patient_id, + carbon_hostname, + carbon_port, + self.temperature, + self.heartrate, + metrics_path) = self.config.transmitter_config() + + if exit_transmitter: + logger.info('Exit transmitter asked.') + exit(0) + + if pause_transmitter: + logger.debug('Tranmsitter off.') + logger.debug('Sleep {}ms.'.format(transmitter_sleep)) + sleep(transmitter_sleep) + continue + + logger.debug('Transmitter on.') + + if not patient_id: + logger.debug('No patient ID configured.') + sleep(transmitter_sleep) + continue + + logger.debug('Patient ID : {}'.format(patient_id)) + + logger.debug('Carbon host : {}:{}'.format(carbon_hostname, carbon_port)) + + for metric in METRICS: + if getattr(self, metric): + logger.debug('Reading metric : {}.'.format(metric)) + fn = getattr(self, "fn_{}".format(metric)) + logger.debug('Filename : {}.'.format(fn)) + if os.path.isfile(fn): + last_push = _from = self.get_last_push(metric) + logger.debug('Last read is : {}.'.format(_from)) + until = self.now() + logger.debug('Until now : {}.'.format(until)) + data = whisper.fetch(fn, _from, until) + if data: + start, end, step = data[0] + cleaned = [(start + i * step, value) + for i, value in enumerate(data[1]) if value] + logger.debug('Data to send : {}.'.format(cleaned)) + if cleaned and start == last_push: + logger.debug("First value already pushed, " + "we don't send it : {}.".format(cleaned[0])) + # We remove the first item if pushed last time + cleaned = cleaned[1:] + remote_path = "{}.{}.{}".format(metrics_path, patient_id, metric) + for date, value in cleaned: + content = "{} {} {}\n".format(remote_path, value, date) + try: + logger.debug("Send command : {}.".format(content)) + utils.netcat(carbon_hostname, carbon_port, content) + last_push = date + except Exception, e: + logger.debug("Error executing send command : '{}', " + "we stopped at : {}.".format(e, last_push)) + break + logger.debug("Write last push at : {}.".format(last_push)) + self.set_last_push(metric, last_push) + else: + logger.debug('No data to send.') + else: + logger.debug('No whisper file found.') + else: + logger.debug('Sending metric {} off.'.format(metric)) + + logger.debug('Sleep {} s.'.format(transmitter_sleep)) + sleep(transmitter_sleep)