read_energy_meter.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. #!/usr/bin/env python
  2. from influxdb import InfluxDBClient
  3. from datetime import datetime, timedelta
  4. from os import path
  5. import sys
  6. import os
  7. import minimalmodbus
  8. import time
  9. import yaml
  10. # Change working dir to the same dir as this script
  11. os.chdir(sys.path[0])
  12. class DataCollector:
  13. def __init__(self, influx_client, meter_yaml):
  14. self.influx_client = influx_client
  15. self.meter_yaml = meter_yaml
  16. self.max_iterations = None # run indefinitely by default
  17. self.meter_map = None
  18. self.meter_map_last_change = -1
  19. print 'Meters:'
  20. for meter in sorted(self.get_meters()):
  21. print '\t', meter['id'], '<-->', meter['name']
  22. def get_meters(self):
  23. assert path.exists(self.meter_yaml), 'Meter map not found: %s' % self.meter_yaml
  24. if path.getmtime(self.meter_yaml) != self.meter_map_last_change:
  25. try:
  26. print('Reloading meter map as file changed')
  27. new_map = yaml.load(open(self.meter_yaml))
  28. self.meter_map = new_map['meters']
  29. self.meter_map_last_change = path.getmtime(self.meter_yaml)
  30. except Exception as e:
  31. print('Failed to re-load meter map, going on with the old one. Error:')
  32. print(e)
  33. return self.meter_map
  34. def collect_and_store(self):
  35. meters = self.get_meters()
  36. t_utc = datetime.utcnow()
  37. t_str = t_utc.isoformat() + 'Z'
  38. instrument = minimalmodbus.Instrument('/dev/ttyAMA0', 1) # port name, slave address (in decimal)
  39. instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
  40. #instrument.debug = True
  41. datas = dict()
  42. meter_id_name = dict() # mapping id to name
  43. for meter in meters:
  44. meter_id_name[meter['id']] = meter['name']
  45. instrument.serial.baudrate = meter['baudrate']
  46. instrument.serial.bytesize = meter['bytesize']
  47. if meter['parity'] == 'none':
  48. instrument.serial.parity = minimalmodbus.serial.PARITY_NONE
  49. elif meter['parity'] == 'odd':
  50. instrument.serial.parity = minimalmodbus.serial.PARITY_ODD
  51. elif meter['parity'] == 'even':
  52. instrument.serial.parity = minimalmodbus.serial.PARITY_EVEN
  53. else:
  54. print('Error! No parity specified')
  55. raise
  56. instrument.serial.stopbits = meter['stopbits']
  57. instrument.serial.timeout = meter['timeout'] # seconds
  58. instrument.address = meter['id'] # this is the slave address number
  59. #print 'Reading meter %s (%s).' % (meters[meter_id], meter_id)
  60. assert path.exists(meter['type']), 'Meter model yaml file not found: %s' % meter['type']
  61. try:
  62. parameters = yaml.load(open(meter['type']))
  63. except Exception as e:
  64. print('Error! Loading model yaml file')
  65. print(e)
  66. raise
  67. start_time = time.time()
  68. datas[meter['id']] = dict()
  69. for parameter in parameters:
  70. try:
  71. datas[meter['id']][parameter] = instrument.read_float(parameters[parameter], 4, 2)
  72. pass
  73. except Exception as e:
  74. print('Reading register %i from meter %i. Error:' % parameters[parameter], meter['id'])
  75. print(e)
  76. raise
  77. datas[meter['id']]['Time to read'] = time.time() - start_time
  78. json_body = [
  79. {
  80. 'measurement': 'energy',
  81. 'tags': {
  82. 'id': meter_id,
  83. 'meter': meter['name'],
  84. },
  85. 'time': t_str,
  86. 'fields': datas[meter_id]
  87. }
  88. for meter_id in datas
  89. ]
  90. if len(json_body) > 0:
  91. try:
  92. self.influx_client.write_points(json_body)
  93. print(t_str + ' Data written for %d meters.' % len(json_body))
  94. except Exception as e:
  95. print('Data not written! Error:')
  96. print(e)
  97. raise
  98. else:
  99. print(t_str, 'No data sent.')
  100. def repeat(interval_sec, max_iter, func, *args, **kwargs):
  101. from itertools import count
  102. import time
  103. starttime = time.time()
  104. retry = False
  105. for i in count():
  106. if (retry == False) & (interval_sec > 0):
  107. time.sleep(interval_sec - ((time.time() - starttime) % interval_sec))
  108. retry = False # Reset retry flag
  109. if i % 1000 == 0:
  110. print('Collected %d readouts' % i)
  111. try:
  112. func(*args, **kwargs)
  113. except Exception as ex:
  114. print('Error!')
  115. print(ex)
  116. retry = True # Force imidiate retry, skip sleep
  117. if max_iter and i >= max_iter:
  118. return
  119. if __name__ == '__main__':
  120. import argparse
  121. parser = argparse.ArgumentParser()
  122. parser.add_argument('--interval', default=60,
  123. help='Meter readout interval (seconds), default 60')
  124. parser.add_argument('--meters', default='meters.yml',
  125. help='YAML file containing Meter ID, name, type etc. Default "meters.yml"')
  126. args = parser.parse_args()
  127. interval = int(args.interval)
  128. # Create the InfluxDB object
  129. influx_config = yaml.load(open('influx_config.yml'))
  130. client = InfluxDBClient(influx_config['host'],
  131. influx_config['port'],
  132. influx_config['user'],
  133. influx_config['password'],
  134. influx_config['dbname'])
  135. collector = DataCollector(influx_client=client,
  136. meter_yaml='meters.yml')
  137. repeat(interval,
  138. max_iter=collector.max_iterations,
  139. func=lambda: collector.collect_and_store())