read_energy_meter.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. #!/usr/bin/env python
  2. from influxdb import InfluxDBClient
  3. from datetime import datetime, timedelta
  4. from os import path
  5. import minimalmodbus
  6. import time
  7. import yaml
  8. class DataCollector:
  9. def __init__(self, influx_client, meter_yaml):
  10. self.influx_client = influx_client
  11. self.meter_yaml = meter_yaml
  12. self.max_iterations = None # run indefinitely by default
  13. self.meter_map = None
  14. self.meter_map_last_change = -1
  15. print 'Meters:'
  16. for meter in sorted(self.get_meters()):
  17. print '\t', meter['id'], '<-->', meter['name']
  18. def get_meters(self):
  19. assert path.exists(self.meter_yaml), 'Meter map not found: %s' % self.meter_yaml
  20. if path.getmtime(self.meter_yaml) != self.meter_map_last_change:
  21. try:
  22. print('Reloading meter map as file changed')
  23. new_map = yaml.load(open(self.meter_yaml))
  24. self.meter_map = new_map['meters']
  25. self.meter_map_last_change = path.getmtime(self.meter_yaml)
  26. except Exception as e:
  27. print('Failed to re-load meter map, going on with the old one. Error:')
  28. print(e)
  29. return self.meter_map
  30. def collect_and_store(self):
  31. meters = self.get_meters()
  32. t_utc = datetime.utcnow()
  33. t_str = t_utc.isoformat() + 'Z'
  34. instrument = minimalmodbus.Instrument('/dev/ttyAMA0', 1) # port name, slave address (in decimal)
  35. instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
  36. #instrument.debug = True
  37. datas = dict()
  38. meter_id_name = dict() # mapping id to name
  39. for meter in meters:
  40. meter_id_name[meter['id']] = meter['name']
  41. instrument.serial.baudrate = meter['baudrate']
  42. instrument.serial.bytesize = meter['bytesize']
  43. if meter['parity'] == 'none':
  44. instrument.serial.parity = minimalmodbus.serial.PARITY_NONE
  45. elif meter['parity'] == 'odd':
  46. instrument.serial.parity = minimalmodbus.serial.PARITY_ODD
  47. elif meter['parity'] == 'even':
  48. instrument.serial.parity = minimalmodbus.serial.PARITY_EVEN
  49. else:
  50. print('Error! No parity specified')
  51. raise
  52. instrument.serial.stopbits = meter['stopbits']
  53. instrument.serial.timeout = meter['timeout'] # seconds
  54. instrument.address = meter['id'] # this is the slave address number
  55. #print 'Reading meter %s (%s).' % (meters[meter_id], meter_id)
  56. start_time = time.time()
  57. parameters = yaml.load(open(meter['type']))
  58. datas[meter['id']] = dict()
  59. for parameter in parameters:
  60. try:
  61. datas[meter['id']][parameter] = instrument.read_float(parameters[parameter], 4, 2)
  62. pass
  63. except Exception as e:
  64. print('Reading register %i from meter %i. Error:' % parameters[parameter], meter['id'])
  65. print(e)
  66. raise
  67. datas[meter['id']]['Time to read'] = time.time() - start_time
  68. json_body = [
  69. {
  70. 'measurement': 'energy',
  71. 'tags': {
  72. 'id': meter_id,
  73. 'meter': meter['name'],
  74. },
  75. 'time': t_str,
  76. 'fields': datas[meter_id]
  77. }
  78. for meter_id in datas
  79. ]
  80. if len(json_body) > 0:
  81. try:
  82. self.influx_client.write_points(json_body)
  83. print(t_str + ' Data written for %d meters.' % len(json_body))
  84. except Exception as e:
  85. print('Data not written! Error:')
  86. print(e)
  87. raise
  88. else:
  89. print(t_str, 'No data sent.')
  90. def repeat(interval_sec, max_iter, func, *args, **kwargs):
  91. from itertools import count
  92. import time
  93. starttime = time.time()
  94. for i in count():
  95. retry = False # Reset retry flag
  96. if i % 1000 == 0:
  97. print('Collected %d readouts' % i)
  98. try:
  99. func(*args, **kwargs)
  100. except Exception as ex:
  101. print('Error!')
  102. print(ex)
  103. retry = True # Force imidiate retry, skip sleep
  104. if (retry == False) & (interval_sec > 0):
  105. time.sleep(interval_sec - ((time.time() - starttime) % interval_sec))
  106. if max_iter and i >= max_iter:
  107. return
  108. if __name__ == '__main__':
  109. import argparse
  110. parser = argparse.ArgumentParser()
  111. parser.add_argument('--interval', default=60,
  112. help='Meter readout interval (seconds), default 60')
  113. parser.add_argument('--meters', default='meters.yml',
  114. help='YAML file containing Meter ID, name, type etc. Default "meters.yml"')
  115. args = parser.parse_args()
  116. interval = int(args.interval)
  117. # Create the InfluxDB object
  118. influx_config = yaml.load(open('influx_config.yml'))
  119. client = InfluxDBClient(influx_config['host'],
  120. influx_config['port'],
  121. influx_config['user'],
  122. influx_config['password'],
  123. influx_config['dbname'])
  124. collector = DataCollector(influx_client=client,
  125. meter_yaml='meters.yml')
  126. repeat(interval,
  127. max_iter=collector.max_iterations,
  128. func=lambda: collector.collect_and_store())