read_energy_meter.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #!/usr/bin/env python
  2. from influxdb import InfluxDBClient
  3. from datetime import datetime, timedelta
  4. from os import path
  5. import sys
  6. import os
  7. import minimalmodbus
  8. import time
  9. import yaml
  10. import logging
  11. # Change working dir to the same dir as this script
  12. os.chdir(sys.path[0])
  13. class DataCollector:
  14. def __init__(self, influx_client, meter_yaml):
  15. self.influx_client = influx_client
  16. self.meter_yaml = meter_yaml
  17. self.max_iterations = None # run indefinitely by default
  18. self.meter_map = None
  19. self.meter_map_last_change = -1
  20. log.info('Meters:')
  21. for meter in sorted(self.get_meters()):
  22. log.info('\t {} <--> {}'.format( meter['id'], meter['name']))
  23. def get_meters(self):
  24. assert path.exists(self.meter_yaml), 'Meter map not found: %s' % self.meter_yaml
  25. if path.getmtime(self.meter_yaml) != self.meter_map_last_change:
  26. try:
  27. log.info('Reloading meter map as file changed')
  28. new_map = yaml.load(open(self.meter_yaml), Loader=yaml.FullLoader)
  29. self.meter_map = new_map['meters']
  30. self.meter_map_last_change = path.getmtime(self.meter_yaml)
  31. except Exception as e:
  32. log.warning('Failed to re-load meter map, going on with the old one.')
  33. log.warning(e)
  34. return self.meter_map
  35. def collect_and_store(self):
  36. #instrument.debug = True
  37. meters = self.get_meters()
  38. t_utc = datetime.utcnow()
  39. t_str = t_utc.isoformat() + 'Z'
  40. instrument = minimalmodbus.Instrument('/dev/ttyAMA0', 1) # port name, slave address (in decimal)
  41. instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
  42. datas = dict()
  43. meter_id_name = dict() # mapping id to name
  44. for meter in meters:
  45. meter_id_name[meter['id']] = meter['name']
  46. instrument.serial.baudrate = meter['baudrate']
  47. instrument.serial.bytesize = meter['bytesize']
  48. if meter['parity'] == 'none':
  49. instrument.serial.parity = minimalmodbus.serial.PARITY_NONE
  50. elif meter['parity'] == 'odd':
  51. instrument.serial.parity = minimalmodbus.serial.PARITY_ODD
  52. elif meter['parity'] == 'even':
  53. instrument.serial.parity = minimalmodbus.serial.PARITY_EVEN
  54. else:
  55. log.error('No parity specified')
  56. raise
  57. instrument.serial.stopbits = meter['stopbits']
  58. instrument.serial.timeout = meter['timeout'] # seconds
  59. instrument.address = meter['id'] # this is the slave address number
  60. log.debug('Reading meter %s.' % (meter['id']))
  61. start_time = time.time()
  62. parameters = yaml.load(open(meter['type']), Loader=yaml.FullLoader)
  63. datas[meter['id']] = dict()
  64. for parameter in parameters:
  65. # If random readout errors occour, e.g. CRC check fail, test to uncomment the following row
  66. #time.sleep(0.01) # Sleep for 10 ms between each parameter read to avoid errors
  67. retries = 10
  68. while retries > 0:
  69. try:
  70. retries -= 1
  71. datas[meter['id']][parameter] = instrument.read_float(parameters[parameter], 4, 2)
  72. retries = 0
  73. pass
  74. except ValueError as ve:
  75. log.warning('Value Error while reading register {} from meter {}. Retries left {}.'
  76. .format(parameters[parameter], meter['id'], retries))
  77. log.error(ve)
  78. if retries == 0:
  79. raise RuntimeError
  80. except TypeError as te:
  81. log.warning('Type Error while reading register {} from meter {}. Retries left {}.'
  82. .format(parameters[parameter], meter['id'], retries))
  83. log.error(te)
  84. if retries == 0:
  85. raise RuntimeError
  86. except IOError as ie:
  87. log.warning('IO Error while reading register {} from meter {}. Retries left {}.'
  88. .format(parameters[parameter], meter['id'], retries))
  89. log.error(ie)
  90. if retries == 0:
  91. raise RuntimeError
  92. except:
  93. log.error("Unexpected error:", sys.exc_info()[0])
  94. raise
  95. datas[meter['id']]['Read time'] = time.time() - start_time
  96. json_body = [
  97. {
  98. 'measurement': 'energy',
  99. 'tags': {
  100. 'id': meter_id,
  101. 'meter': meter_id_name[meter_id],
  102. },
  103. 'time': t_str,
  104. 'fields': datas[meter_id]
  105. }
  106. for meter_id in datas
  107. ]
  108. if len(json_body) > 0:
  109. try:
  110. self.influx_client.write_points(json_body)
  111. log.info(t_str + ' Data written for %d meters.' % len(json_body))
  112. except Exception as e:
  113. log.error('Data not written!')
  114. log.error(e)
  115. raise
  116. else:
  117. log.warning(t_str, 'No data sent.')
  118. def repeat(interval_sec, max_iter, func, *args, **kwargs):
  119. from itertools import count
  120. import time
  121. starttime = time.time()
  122. for i in count():
  123. if interval_sec > 0:
  124. time.sleep(interval_sec - ((time.time() - starttime) % interval_sec))
  125. if i % 1000 == 0:
  126. log.info('Collected %d readouts' % i)
  127. try:
  128. func(*args, **kwargs)
  129. except Exception as ex:
  130. log.error(ex)
  131. if max_iter and i >= max_iter:
  132. return
  133. if __name__ == '__main__':
  134. import argparse
  135. parser = argparse.ArgumentParser()
  136. parser.add_argument('--interval', default=60,
  137. help='Meter readout interval (seconds), default 60')
  138. parser.add_argument('--meters', default='meters.yml',
  139. help='YAML file containing Meter ID, name, type etc. Default "meters.yml"')
  140. parser.add_argument('--log', default='CRITICAL',
  141. help='Log levels, DEBUG, INFO, WARNING, ERROR or CRITICAL')
  142. parser.add_argument('--logfile', default='',
  143. help='Specify log file, if not specified the log is streamed to console')
  144. args = parser.parse_args()
  145. interval = int(args.interval)
  146. loglevel = args.log.upper()
  147. logfile = args.logfile
  148. # Setup logging
  149. log = logging.getLogger('energy-logger')
  150. log.setLevel(getattr(logging, loglevel))
  151. if logfile:
  152. loghandle = logging.FileHandler(logfile, 'w')
  153. formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
  154. loghandle.setFormatter(formatter)
  155. else:
  156. loghandle = logging.StreamHandler()
  157. log.addHandler(loghandle)
  158. log.info('Started app')
  159. # Create the InfluxDB object
  160. influx_config = yaml.load(open('influx_config.yml'), Loader=yaml.FullLoader)
  161. client = InfluxDBClient(influx_config['host'],
  162. influx_config['port'],
  163. influx_config['user'],
  164. influx_config['password'],
  165. influx_config['dbname'])
  166. collector = DataCollector(influx_client=client,
  167. meter_yaml=args.meters)
  168. repeat(interval,
  169. max_iter=collector.max_iterations,
  170. func=lambda: collector.collect_and_store())