123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #!/usr/bin/env python
- from influxdb import InfluxDBClient
- from datetime import datetime, timedelta
- from os import path
- import sys
- import os
- import minimalmodbus
- import time
- import yaml
- import logging
- # Change working dir to the same dir as this script
- os.chdir(sys.path[0])
- class DataCollector:
- def __init__(self, influx_client, meter_yaml):
- self.influx_client = influx_client
- self.meter_yaml = meter_yaml
- self.max_iterations = None # run indefinitely by default
- self.meter_map = None
- self.meter_map_last_change = -1
- log.info('Meters:')
- for meter in sorted(self.get_meters()):
- log.info('\t {} <--> {}'.format( meter['id'], meter['name']))
- def get_meters(self):
- assert path.exists(self.meter_yaml), 'Meter map not found: %s' % self.meter_yaml
- if path.getmtime(self.meter_yaml) != self.meter_map_last_change:
- try:
- log.info('Reloading meter map as file changed')
- new_map = yaml.load(open(self.meter_yaml))
- self.meter_map = new_map['meters']
- self.meter_map_last_change = path.getmtime(self.meter_yaml)
- except Exception as e:
- log.warning('Failed to re-load meter map, going on with the old one.')
- log.warning(e)
- return self.meter_map
- def collect_and_store(self):
- #instrument.debug = True
- meters = self.get_meters()
- t_utc = datetime.utcnow()
- t_str = t_utc.isoformat() + 'Z'
- instrument = minimalmodbus.Instrument('/dev/ttyAMA0', 1) # port name, slave address (in decimal)
- instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
- datas = dict()
- meter_id_name = dict() # mapping id to name
- for meter in meters:
- meter_id_name[meter['id']] = meter['name']
- instrument.serial.baudrate = meter['baudrate']
- instrument.serial.bytesize = meter['bytesize']
- if meter['parity'] == 'none':
- instrument.serial.parity = minimalmodbus.serial.PARITY_NONE
- elif meter['parity'] == 'odd':
- instrument.serial.parity = minimalmodbus.serial.PARITY_ODD
- elif meter['parity'] == 'even':
- instrument.serial.parity = minimalmodbus.serial.PARITY_EVEN
- else:
- log.error('No parity specified')
- raise
- instrument.serial.stopbits = meter['stopbits']
- instrument.serial.timeout = meter['timeout'] # seconds
- instrument.address = meter['id'] # this is the slave address number
- log.debug('Reading meter %s.' % (meter['id']))
- start_time = time.time()
- parameters = yaml.load(open(meter['type']))
- datas[meter['id']] = dict()
- for parameter in parameters:
- # If random readout errors occour, e.g. CRC check fail, test to uncomment the following row
- #time.sleep(0.01) # Sleep for 10 ms between each parameter read to avoid errors
- retries = 10
- while retries > 0:
- try:
- retries -= 1
- datas[meter['id']][parameter] = instrument.read_float(parameters[parameter], 4, 2)
- retries = 0
- pass
- except ValueError as ve:
- log.warning('Value Error while reading register {} from meter {}. Retries left {}.'
- .format(parameters[parameter], meter['id'], retries))
- log.error(ve)
- if retries == 0:
- raise RuntimeError
- except TypeError as te:
- log.warning('Type Error while reading register {} from meter {}. Retries left {}.'
- .format(parameters[parameter], meter['id'], retries))
- log.error(te)
- if retries == 0:
- raise RuntimeError
- except IOError as ie:
- log.warning('IO Error while reading register {} from meter {}. Retries left {}.'
- .format(parameters[parameter], meter['id'], retries))
- log.error(ie)
- if retries == 0:
- raise RuntimeError
- except:
- log.error("Unexpected error:", sys.exc_info()[0])
- raise
- datas[meter['id']]['Time to read'] = time.time() - start_time
- json_body = [
- {
- 'measurement': 'energy',
- 'tags': {
- 'id': meter_id,
- 'meter': meter['name'],
- },
- 'time': t_str,
- 'fields': datas[meter_id]
- }
- for meter_id in datas
- ]
- if len(json_body) > 0:
- try:
- self.influx_client.write_points(json_body)
- log.info(t_str + ' Data written for %d meters.' % len(json_body))
- except Exception as e:
- log.error('Data not written!')
- log.error(e)
- raise
- else:
- log.warning(t_str, 'No data sent.')
- def repeat(interval_sec, max_iter, func, *args, **kwargs):
- from itertools import count
- import time
- starttime = time.time()
- for i in count():
- if interval_sec > 0:
- time.sleep(interval_sec - ((time.time() - starttime) % interval_sec))
- if i % 1000 == 0:
- log.info('Collected %d readouts' % i)
- try:
- func(*args, **kwargs)
- except Exception as ex:
- log.error(ex)
- if max_iter and i >= max_iter:
- return
- if __name__ == '__main__':
- import argparse
- parser = argparse.ArgumentParser()
- parser.add_argument('--interval', default=60,
- help='Meter readout interval (seconds), default 60')
- parser.add_argument('--meters', default='meters.yml',
- help='YAML file containing Meter ID, name, type etc. Default "meters.yml"')
- parser.add_argument('--log', default='CRITICAL',
- help='Log levels, DEBUG, INFO, WARNING, ERROR or CRITICAL')
- parser.add_argument('--logfile', default='',
- help='Specify log file, if not specified the log is streamed to console')
- args = parser.parse_args()
- interval = int(args.interval)
- loglevel = args.log.upper()
- logfile = args.logfile
- # Setup logging
- log = logging.getLogger('energy-logger')
- log.setLevel(getattr(logging, loglevel))
- if logfile:
- loghandle = logging.FileHandler(logfile, 'w')
- formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
- loghandle.setFormatter(formatter)
- else:
- loghandle = logging.StreamHandler()
- log.addHandler(loghandle)
- log.info('Started app')
- # Create the InfluxDB object
- influx_config = yaml.load(open('influx_config.yml'))
- client = InfluxDBClient(influx_config['host'],
- influx_config['port'],
- influx_config['user'],
- influx_config['password'],
- influx_config['dbname'])
- collector = DataCollector(influx_client=client,
- meter_yaml=args.meters)
- repeat(interval,
- max_iter=collector.max_iterations,
- func=lambda: collector.collect_and_store())
|