read_energy_meter.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. #!/usr/bin/env python
  2. from influxdb import InfluxDBClient
  3. from datetime import datetime, timedelta
  4. from os import path
  5. import sys
  6. import os
  7. import minimalmodbus
  8. import time
  9. import yaml
  10. # Change working dir to the same dir as this script
  11. os.chdir(sys.path[0])
  12. class DataCollector:
  13. def __init__(self, influx_client, meter_yaml):
  14. self.influx_client = influx_client
  15. self.meter_yaml = meter_yaml
  16. self.max_iterations = None # run indefinitely by default
  17. self.meter_map = None
  18. self.meter_map_last_change = -1
  19. print 'Meters:'
  20. for meter in sorted(self.get_meters()):
  21. print '\t', meter['id'], '<-->', meter['name']
  22. def get_meters(self):
  23. assert path.exists(self.meter_yaml), 'Meter map not found: %s' % self.meter_yaml
  24. if path.getmtime(self.meter_yaml) != self.meter_map_last_change:
  25. try:
  26. print ('Reloading meter map as file changed')
  27. new_map = yaml.load(open(self.meter_yaml))
  28. self.meter_map = new_map['meters']
  29. self.meter_map_last_change = path.getmtime(self.meter_yaml)
  30. except Exception as e:
  31. print ('Failed to re-load meter map, going on with the old one. Error:')
  32. print (e)
  33. return self.meter_map
  34. def collect_and_store(self):
  35. #instrument.debug = True
  36. meters = self.get_meters()
  37. t_utc = datetime.utcnow()
  38. t_str = t_utc.isoformat() + 'Z'
  39. instrument = minimalmodbus.Instrument('/dev/ttyAMA0', 1) # port name, slave address (in decimal)
  40. instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
  41. datas = dict()
  42. meter_id_name = dict() # mapping id to name
  43. for meter in meters:
  44. meter_id_name[meter['id']] = meter['name']
  45. instrument.serial.baudrate = meter['baudrate']
  46. instrument.serial.bytesize = meter['bytesize']
  47. if meter['parity'] == 'none':
  48. instrument.serial.parity = minimalmodbus.serial.PARITY_NONE
  49. elif meter['parity'] == 'odd':
  50. instrument.serial.parity = minimalmodbus.serial.PARITY_ODD
  51. elif meter['parity'] == 'even':
  52. instrument.serial.parity = minimalmodbus.serial.PARITY_EVEN
  53. else:
  54. print ('Error: No parity specified')
  55. raise
  56. instrument.serial.stopbits = meter['stopbits']
  57. instrument.serial.timeout = meter['timeout'] # seconds
  58. instrument.address = meter['id'] # this is the slave address number
  59. #print 'Reading meter %s.' % (meter['id'])
  60. start_time = time.time()
  61. parameters = yaml.load(open(meter['type']))
  62. datas[meter['id']] = dict()
  63. for parameter in parameters:
  64. retries = 10
  65. while retries > 0:
  66. try:
  67. retries -= 1
  68. datas[meter['id']][parameter] = instrument.read_float(parameters[parameter], 4, 2)
  69. retries = 0
  70. pass
  71. except ValueError as ve:
  72. print ('Value Error while reading register {} from meter {}. Retries left {}.'
  73. .format(parameters[parameter], meter['id'], retries))
  74. print ve
  75. if retries == 0:
  76. raise RuntimeError
  77. except TypeError as te:
  78. print ('Type Error while reading register {} from meter {}. Retries left {}.'
  79. .format(parameters[parameter], meter['id'], retries))
  80. print te
  81. if retries == 0:
  82. raise RuntimeError
  83. except IOError as ie:
  84. print ('IO Error while reading register {} from meter {}. Retries left {}.'
  85. .format(parameters[parameter], meter['id'], retries))
  86. print ie
  87. if retries == 0:
  88. raise RuntimeError
  89. except:
  90. print "Unexpected error:", sys.exc_info()[0]
  91. raise
  92. datas[meter['id']]['Time to read'] = time.time() - start_time
  93. json_body = [
  94. {
  95. 'measurement': 'energy',
  96. 'tags': {
  97. 'id': meter_id,
  98. 'meter': meter['name'],
  99. },
  100. 'time': t_str,
  101. 'fields': datas[meter_id]
  102. }
  103. for meter_id in datas
  104. ]
  105. if len(json_body) > 0:
  106. try:
  107. self.influx_client.write_points(json_body)
  108. print(t_str + ' Data written for %d meters.' % len(json_body))
  109. except Exception as e:
  110. print('Data not written! Error:')
  111. print(e)
  112. raise
  113. else:
  114. print(t_str, 'No data sent.')
  115. def repeat(interval_sec, max_iter, func, *args, **kwargs):
  116. from itertools import count
  117. import time
  118. starttime = time.time()
  119. retry = False
  120. for i in count():
  121. if (retry == False) & (interval_sec > 0):
  122. time.sleep(interval_sec - ((time.time() - starttime) % interval_sec))
  123. retry = False # Reset retry flag
  124. if i % 1000 == 0:
  125. print('Collected %d readouts' % i)
  126. try:
  127. func(*args, **kwargs)
  128. except Exception as ex:
  129. print('Error!')
  130. print(ex)
  131. #retry = True # Force imidiate retry, skip sleep
  132. if max_iter and i >= max_iter:
  133. return
  134. if __name__ == '__main__':
  135. import argparse
  136. parser = argparse.ArgumentParser()
  137. parser.add_argument('--interval', default=60,
  138. help='Meter readout interval (seconds), default 60')
  139. parser.add_argument('--meters', default='meters.yml',
  140. help='YAML file containing Meter ID, name, type etc. Default "meters.yml"')
  141. args = parser.parse_args()
  142. interval = int(args.interval)
  143. # Create the InfluxDB object
  144. influx_config = yaml.load(open('influx_config.yml'))
  145. client = InfluxDBClient(influx_config['host'],
  146. influx_config['port'],
  147. influx_config['user'],
  148. influx_config['password'],
  149. influx_config['dbname'])
  150. collector = DataCollector(influx_client=client,
  151. meter_yaml=args.meters)
  152. repeat(interval,
  153. max_iter=collector.max_iterations,
  154. func=lambda: collector.collect_and_store())