123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- from influxdb import InfluxDBClient
- from datetime import datetime, timedelta
- from os import path
- import minimalmodbus
- import time
- import yaml
- class DataCollector:
- def __init__(self, influx_client, meter_yaml):
- self.influx_client = influx_client
- self.meter_yaml = meter_yaml
- self.max_iterations = None
- self.meter_map = None
- self.meter_map_last_change = -1
- print 'Meters:'
- for meter in sorted(self.get_meters()):
- print '\t', meter['id'], '<-->', meter['name']
- def get_meters(self):
- assert path.exists(self.meter_yaml), 'Meter map not found: %s' % self.meter_yaml
- if path.getmtime(self.meter_yaml) != self.meter_map_last_change:
- try:
- print('Reloading meter map as file changed')
- new_map = yaml.load(open(self.meter_yaml))
- self.meter_map = new_map['meters']
- self.meter_map_last_change = path.getmtime(self.meter_yaml)
- except Exception as e:
- print('Failed to re-load meter map, going on with the old one. Error:')
- print(e)
- return self.meter_map
- def collect_and_store(self):
- meters = self.get_meters()
- t_utc = datetime.utcnow()
- t_str = t_utc.isoformat() + 'Z'
- instrument = minimalmodbus.Instrument('/dev/ttyAMA0', 1)
- instrument.mode = minimalmodbus.MODE_RTU
-
- datas = dict()
- meter_id_name = dict()
- for meter in meters:
- meter_id_name[meter['id']] = meter['name']
- instrument.serial.baudrate = meter['baudrate']
- instrument.serial.bytesize = meter['bytesize']
- if meter['parity'] == 'none':
- instrument.serial.parity = minimalmodbus.serial.PARITY_NONE
- elif meter['parity'] == 'odd':
- instrument.serial.parity = minimalmodbus.serial.PARITY_ODD
- elif meter['parity'] == 'even':
- instrument.serial.parity = minimalmodbus.serial.PARITY_EVEN
- else:
- print('Error! No parity specified')
- raise
- instrument.serial.stopbits = meter['stopbits']
- instrument.serial.timeout = meter['timeout']
- instrument.address = meter['id']
-
- start_time = time.time()
- parameters = yaml.load(open(meter['type']))
- datas[meter['id']] = dict()
- for parameter in parameters:
- try:
- datas[meter['id']][parameter] = instrument.read_float(parameters[parameter], 4, 2)
- pass
- except Exception as e:
- print('Reading register %i from meter %i. Error:' % parameters[parameter], meter['id'])
- print(e)
- raise
- datas[meter['id']]['Time to read'] = time.time() - start_time
- json_body = [
- {
- 'measurement': 'energy',
- 'tags': {
- 'id': meter_id,
- 'meter': meter['name'],
- },
- 'time': t_str,
- 'fields': datas[meter_id]
- }
- for meter_id in datas
- ]
- if len(json_body) > 0:
- try:
- self.influx_client.write_points(json_body)
- print(t_str + ' Data written for %d meters.' % len(json_body))
- except Exception as e:
- print('Data not written! Error:')
- print(e)
- raise
- else:
- print(t_str, 'No data sent.')
- def repeat(interval_sec, max_iter, func, *args, **kwargs):
- from itertools import count
- import time
- starttime = time.time()
- for i in count():
- retry = False
- if i % 1000 == 0:
- print('Collected %d readouts' % i)
- try:
- func(*args, **kwargs)
- except Exception as ex:
- print('Error!')
- print(ex)
- retry = True
- if (retry == False) & (interval_sec > 0):
- time.sleep(interval_sec - ((time.time() - starttime) % interval_sec))
- if max_iter and i >= max_iter:
- return
- if __name__ == '__main__':
- import argparse
- parser = argparse.ArgumentParser()
- parser.add_argument('--interval', default=60,
- help='Meter readout interval (seconds), default 60')
- parser.add_argument('--meters', default='meters.yml',
- help='YAML file containing Meter ID, name, type etc. Default "meters.yml"')
- args = parser.parse_args()
- interval = int(args.interval)
-
- influx_config = yaml.load(open('influx_config.yml'))
- client = InfluxDBClient(influx_config['host'],
- influx_config['port'],
- influx_config['user'],
- influx_config['password'],
- influx_config['dbname'])
- collector = DataCollector(influx_client=client,
- meter_yaml='meters.yml')
- repeat(interval,
- max_iter=collector.max_iterations,
- func=lambda: collector.collect_and_store())
|