modbuslog.py 39 KB


  1. #!/usr/bin/env python3
  2. from influxdb import InfluxDBClient
  3. from os import path
  4. import configparser
  5. import sys
  6. import os
  7. import minimalmodbus
  8. import time
  9. import datetime
  10. import yaml
  11. import logging
  12. import json
  13. import paho.mqtt.client as mqtt
  14. print_errors = True
  15. # Change working dir to the same dir as this script
  16. os.chdir(sys.path[0])
  17. config = configparser.ConfigParser()
  18. config.read('modbuslog.ini')
  19. #print(config.sections())
  20. # additional conffile names
  21. conffile_meter_types = 'meter_types.yml'
  22. conffile_readings_names = 'readings_names.yml'
  23. # config vars used more than once or updateable via commandline argument are stored as global vars
  24. conf_modbus_read_retries = config['rs485'].getint('read_retries', 4)
  25. conf_modbus_raise_error_on_reading_failure = config['rs485'].getboolean('raise_error_on_reading_failure', False)
  26. conf_modbus_sleep_between_readings = config['rs485'].getfloat('sleep_between_readings', 0.1)
  27. conf_modbus_sleep_between_instruments = config['rs485'].getfloat('sleep_between_instruments', 0.7)
  28. conf_publish_on_mqtt = config['main'].getboolean('publish_on_mqtt', False)
  29. conf_store_in_influxdb = config['main'].getboolean('store_in_influxdb', False)
  30. conf_mqtt_enabled = config['mqtt'].getboolean('enable', False)
  31. conf_mqtt_topic_prefix = config['mqtt'].get('topic_prefix') #must NOT end with / !!
  32. if conf_mqtt_topic_prefix[-1:] == '/':
  33. conf_mqtt_topic_prefix = conf_mqtt_topic_prefix[0:-1]
  34. conf_mqtt_topic_error = config['mqtt'].get('topic_error') #must NOT end with / !!
  35. conf_storage_path = config['filelog'].get('storage_path')
  36. if conf_storage_path[-1:] != '/':
  37. conf_storage_path += '/'
  38. meters_interval_momentary = config['meters'].getint('interval_momentary', 1) # s - base interval for reading instruments
  39. meters_interval_report_momentary = config['meters'].getint('interval_report_momentary', 60) # interval for reporting momentary readings, 0 to report immediately, overruled by powerdelta settings
  40. meters_interval_energy = config['meters'].getint('interval_energy', 60) # s - interval for reporting kWh-readings
  41. # for now this is not a seperate function but based on "meters_interval_momentary"
  42. # easuring elapsed time since last reading, so actual interval can vary, especially when
  43. # high "meters_interval_momentary" is set. To avoid that set
  44. # "meters_interval_momentary" (= command parameter --interval) to desired value
  45. # and configure "meters_use_only_one_interval" to True
  46. meters_use_only_one_interval = config['meters'].getboolean('use_only_one_interval', False) # use only interval 1 "meters_interval_momentary"
  47. meters_report_on_powerdelta_low = config['meters'].getfloat('report_on_powerdelta_low', 0.95) # % in decimal notation - immediately report if power value changes by more then this %
  48. meters_report_on_powerdelta_high = config['meters'].getfloat('report_on_powerdelta_high', 1.05) # % in decimal notation - immediately report if power value changes by more then this %
  49. meters_report_on_lowpower_treshold = config['meters'].getint('report_on_lowpower_treshold', 10) # treshold under which measured power is considered "low" and different powerdeltas are used
  50. meters_report_on_lowpower_powerdelta_low = config['meters'].getfloat('report_on_lowpower_powerdelta_low', 0.70) # % in decimal notation - immediately report if power value changes by more then this %
  51. meters_report_on_lowpower_powerdelta_high = config['meters'].getfloat('report_on_lowpower_powerdelta_high', 1.30) # % in decimal notation - immediately report if power value changes by more then this %
  52. ##report_instrument_read_retries = config['rs485'].getboolean('report_instrument_read_retries', False)
  53. conf_send_meters_readTime = config['meters'].getboolean('send_readtime', True)
  54. conf_default_decimals = config['readings'].getint('default_decimals', 3)
  55. conf_readingerror_publish_after = config['rs485'].getint('readingerror_publish_after', 60) # time in s after that repeated instrument reading errors are published via MQTT
  56. conf_readingerror_publish_interval = config['rs485'].getint('readingerror_publish_interval', 300) # interval in s to publish repeated instrument reading errors via MQTT
  57. # -------------------------------------------------------------
  58. # global variables - not for configuration
  59. args_output_verbose1 = False
  60. args_output_verbose2 = False
  61. influxdb_write_energy_today_total = True
  62. influxdb_write_energy_yesterday_total = True
  63. class DataCollector:
  64. def __init__(self, influx_client_momentary, influx_client_energy, meter_yaml):
  65. self.influx_client_momentary = influx_client_momentary
  66. self.influx_client_energy = influx_client_energy
  67. self.meter_yaml = meter_yaml
  68. self.max_iterations = None # run indefinitely by default
  69. #self.meter_types = None
  70. self.meter_types = dict()
  71. self.meter_types_last_change = dict()
  72. #self.meter_types_last_change = -1
  73. self.meter_map = None
  74. self.meter_map_last_change = -1
  75. self.meter_configuration_lastchecktime = None
  76. self.meter_typesconfiguration_lastchecktime = None
  77. self.lastMomentaryReportTime = dict()
  78. self.lastEnergyUpdate = dict()
  79. self.lastReadingErrorTime = dict()
  80. self.lastReadingErrorPublishtime = dict()
  81. #self.totalEnergy = dict()
  82. self.saved_energy_today_min = dict()
  83. self.data_momentary_last = dict()
  84. self.saved_energy_yesterday_total = dict() # remember total energy for each meter
  85. self.saved_todays_date = dict() # remember today´s date for each meter, needed to check for date rollover in order to calculate energy yesterday/today
  86. log.info('Meters:')
  87. #for meter in sorted(self.get_meters()): # does not work in Python 3, so dont sort for now
  88. # reading conffile_readings_names
  89. self.readingsNames = yaml.load(open(conffile_readings_names), Loader=yaml.FullLoader)
  90. for meter in self.get_meters():
  91. log.info('\t {} <--> {}'.format( meter['id'], meter['name']))
  92. def load_meter_type(self, metertype):
  93. log.info("Loading meter type: " + metertype)
  94. conffile_meter_type = "metertype_" + metertype + ".yml"
  95. assert path.exists(conffile_meter_type), 'Meters configuration not found: %s' % conffile_meter_type
  96. lastchange = self.meter_types_last_change.get(metertype, None)
  97. lastchange_file = path.getmtime(conffile_meter_type)
  98. if lastchange == None or (lastchange and lastchange_file != lastchange):
  99. try:
  100. log.info('Reloading meter type configuration for ' + metertype + 'as file changed')
  101. self.meter_types[metertype] = yaml.load(open(conffile_meter_type), Loader=yaml.FullLoader)
  102. self.meter_types_last_change[metertype] = lastchange_file
  103. log.debug('Reloaded meters configuration')
  104. except Exception as e:
  105. log.warning('Failed to re-load meter type configuration, going on with the old one.')
  106. log.warning(e)
  107. def check_load_reload_meter_types(self):
  108. log.debug("")
  109. log.debug("checking loaded meter types...")
  110. for metertype in self.meter_types:
  111. log.debug(metertype)
  112. self.load_meter_type(metertype)
  113. log.debug("")
  114. def get_meters(self):
  115. reloadconf = False
  116. ts = int(time.time())
  117. if self.meter_configuration_lastchecktime == None or (ts - self.meter_configuration_lastchecktime) > 60:
  118. self.meter_configuration_lastchecktime = ts
  119. assert path.exists(self.meter_yaml), 'Meter map not found: %s' % self.meter_yaml
  120. if path.getmtime(self.meter_yaml) != self.meter_map_last_change:
  121. reloadconf = True
  122. if reloadconf:
  123. try:
  124. log.info('Reloading meter map as file changed')
  125. new_map = yaml.load(open(self.meter_yaml), Loader=yaml.FullLoader)
  126. self.meter_map = new_map['meters']
  127. self.meter_map_last_change = path.getmtime(self.meter_yaml)
  128. log.debug('Reloaded meter map')
  129. for entry in self.meter_map:
  130. log.debug(entry['type'])
  131. self.load_meter_type(entry['type'])
  132. except Exception as e:
  133. log.warning('Failed to re-load meter map, going on with the old one.')
  134. log.warning(e)
  135. if self.meter_typesconfiguration_lastchecktime == None or (ts - self.meter_typesconfiguration_lastchecktime) > 60:
  136. self.meter_typesconfiguration_lastchecktime = ts
  137. self.check_load_reload_meter_types()
  138. return self.meter_map
  139. def collect_and_store(self):
  140. #instrument.debug = True
  141. meters = self.get_meters()
  142. instrument = minimalmodbus.Instrument(config['rs485'].get('serialdevice','/dev/ttyUSB0'), config['rs485'].getfloat('serialtimeout', 1.0))
  143. instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
  144. data_momentary = dict()
  145. data_energy = dict()
  146. meter_id_name = dict() # mapping id to name
  147. for meter in meters:
  148. meterReadingError_momentary = False
  149. meterReadingError_energy = False
  150. if conf_modbus_sleep_between_instruments > 0:
  151. time.sleep(conf_modbus_sleep_between_instruments)
  152. meter_id_name[meter['id']] = meter['name']
  153. instrument.serial.baudrate = meter['baudrate']
  154. instrument.serial.bytesize = meter['bytesize']
  155. if meter['parity'] == 'none':
  156. instrument.serial.parity = minimalmodbus.serial.PARITY_NONE
  157. elif meter['parity'] == 'odd':
  158. instrument.serial.parity = minimalmodbus.serial.PARITY_ODD
  159. elif meter['parity'] == 'even':
  160. instrument.serial.parity = minimalmodbus.serial.PARITY_EVEN
  161. else:
  162. log.error('No parity specified')
  163. raise
  164. instrument.serial.stopbits = meter['stopbits']
  165. instrument.serial.timeout = meter['timeout'] # seconds
  166. instrument.address = meter['id'] # this is the slave address number
  167. log.debug('\nReading meter %s \'%s\'' % (meter['id'], meter_id_name[meter['id']]))
  168. start_time = time.time()
  169. #if not self.meter_types.get(meter['type'], False):
  170. # self.load_meter_type(meter['type'])
  171. readings = self.meter_types[meter['type']]
  172. if args_output_verbose2:
  173. log.debug("")
  174. log.debug("Meter Type " + meter['type'] + " - defined readings:")
  175. log.debug(json.dumps(readings, indent = 4))
  176. log.debug("")
  177. data_momentary[meter['id']] = dict()
  178. data_energy[meter['id']] = dict()
  179. reading_success_momentary = 0
  180. for reading in readings['momentary']:
  181. # to prevent random readout errors, e.g. CRC check fail, sleep for a short time between the readings
  182. if conf_modbus_sleep_between_readings > 0:
  183. time.sleep(conf_modbus_sleep_between_readings) # Sleep between readings to avoid read errors
  184. retries = conf_modbus_read_retries
  185. # get decimals needed from meter_types config
  186. decimals = readings['momentary'][reading].get('decimals', conf_default_decimals)
  187. while retries > 0:
  188. try:
  189. retries -= 1
  190. data_momentary[meter['id']][reading] = round(instrument.read_float(readings['momentary'][reading]['address'], 4, 2), decimals)
  191. log.debug('OK read meter {}, {} retries => \'{}\' = \'{}\''.format(meter['id'], conf_modbus_read_retries - retries, reading, data_momentary[meter['id']][reading]))
  192. retries = 0
  193. reading_success_momentary += 1
  194. pass
  195. except ValueError as ve:
  196. log.warning('Value Error while reading register {} from meter {}. Retries left {}.'
  197. .format(readings['momentary'][reading]['address'], meter['id'], retries))
  198. log.error(ve)
  199. if retries == 0 and conf_modbus_raise_error_on_reading_failure:
  200. raise RuntimeError
  201. except TypeError as te:
  202. log.warning('Type Error while reading register {} from meter {}. Retries left {}.'
  203. .format(readings['momentary'][reading]['address'], meter['id'], retries))
  204. log.error(te)
  205. if retries == 0 and conf_modbus_raise_error_on_reading_failure:
  206. raise RuntimeError
  207. except IOError as ie:
  208. log.warning('IO Error while reading register {} from meter {}. Retries left {}.'
  209. .format(readings['momentary'][reading]['address'], meter['id'], retries))
  210. log.error(ie)
  211. if retries == 0 and conf_modbus_raise_error_on_reading_failure:
  212. raise RuntimeError
  213. except:
  214. log.error("Unexpected error:", sys.exc_info()[0])
  215. raise
  216. if reading_success_momentary < len(readings['momentary']):
  217. log.debug("THERE WERE READING ERRORS")
  218. meterReadingError_momentary = True
  219. # report momentary interval
  220. reportMomentary = False
  221. if meters_interval_report_momentary > 0:
  222. ts = int(time.time())
  223. lastMomentaryReportTime = self.lastMomentaryReportTime.get(meter['id'], False)
  224. if lastMomentaryReportTime:
  225. tdiff = ts - lastMomentaryReportTime
  226. if (tdiff > meters_interval_report_momentary):
  227. log.debug('Reporting momentary readings for meter %s' % meter['id'])
  228. reportMomentary = True
  229. self.lastMomentaryReportTime[meter['id']] = ts
  230. else:
  231. log.debug('No lastMomentaryReportTime has yet been saved for meter %s' % meter['id'])
  232. reportMomentary = True
  233. self.lastMomentaryReportTime[meter['id']] = ts
  234. else:
  235. reportMomentary = True
  236. # override meters_interval_report_momentary if power has changed for more than configured powerdelta and no interval reporting is due in this iteration
  237. if config['meters'].getboolean('report_on_powerdelta_enable', False) and not reportMomentary:
  238. lastValues = self.data_momentary_last.get(meter['id'], None)
  239. for usedReading in data_momentary[meter['id']].keys():
  240. currentValue = data_momentary[meter['id']][usedReading]
  241. for powerreadingname in self.readingsNames['power']:
  242. if usedReading == powerreadingname:
  243. lastValue = None
  244. if lastValues != None:
  245. lastValue = lastValues.get(powerreadingname, None)
  246. if lastValue != None:
  247. if (currentValue >= meters_report_on_lowpower_treshold):
  248. powerdelta_high = meters_report_on_powerdelta_high
  249. powerdelta_low = meters_report_on_powerdelta_low
  250. else:
  251. powerdelta_high = meters_report_on_lowpower_powerdelta_high
  252. powerdelta_low = meters_report_on_lowpower_powerdelta_low
  253. if (currentValue > (lastValue * powerdelta_high)):
  254. log.debug(powerreadingname + " INCREASED by more than factor " + str(powerdelta_high) + " currentValue=" + str(currentValue) + " lastValue=" + str(lastValue))
  255. reportMomentary = True
  256. if (currentValue < (lastValue * powerdelta_low)):
  257. log.debug(powerreadingname + " DECREASED by more than factor " + str(powerdelta_low) + " currentValue=" + str(currentValue) + " lastValue=" + str(lastValue))
  258. reportMomentary = True
  259. if lastValues == None:
  260. self.data_momentary_last[meter['id']] = dict()
  261. self.data_momentary_last[meter['id']][powerreadingname] = data_momentary[meter['id']][powerreadingname]
  262. # influxdb
  263. t_utc = datetime.datetime.utcnow()
  264. t_str = t_utc.isoformat() + 'Z'
  265. if conf_store_in_influxdb and not meterReadingError_momentary and reportMomentary:
  266. jsondata_momentary = [
  267. {
  268. 'measurement': 'energy',
  269. 'tags': {
  270. 'meter': meter_id_name[meter['id']],
  271. },
  272. 'time': t_str,
  273. 'fields': data_momentary[meter['id']]
  274. }
  275. ]
  276. if args_output_verbose1:
  277. print(json.dumps(jsondata_momentary, indent = 4))
  278. try:
  279. self.influx_client_momentary.write_points(jsondata_momentary)
  280. except Exception as e:
  281. log.error('Data not written!')
  282. log.error(e)
  283. if conf_send_meters_readTime:
  284. readtime = round(time.time() - start_time, 3)
  285. log.debug("Read time: " + str(readtime))
  286. data_momentary[meter['id']]['Read time'] = readtime
  287. if conf_mqtt_enabled and conf_publish_on_mqtt:
  288. mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/ReadTime", str(readtime))
  289. if conf_mqtt_enabled and conf_publish_on_mqtt and reportMomentary:
  290. for reading in readings['momentary']:
  291. tmpreading = data_momentary[meter['id']].get(reading, None)
  292. if tmpreading != None:
  293. if tmpreading.is_integer():
  294. tmpreading = int(tmpreading)
  295. #mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading, str('{0:.3f}'.format(tmpreading)))
  296. log.debug("MQTT pub: '"+conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading + "' = '" + str(tmpreading) + "'")
  297. mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading, str(tmpreading))
  298. if meters_use_only_one_interval:
  299. readEnergyData = True
  300. else:
  301. readEnergyData = False
  302. ts = int(time.time())
  303. lastUpdate = self.lastEnergyUpdate.get(meter['id'], False)
  304. if lastUpdate:
  305. tdiff = ts - lastUpdate
  306. if (tdiff > meters_interval_energy):
  307. readEnergyData = True
  308. self.lastEnergyUpdate[meter['id']] = ts
  309. else:
  310. log.debug('No lastEnergyUpdate has yet been saved for meter %s' % meter['id'])
  311. readEnergyData = True
  312. self.lastEnergyUpdate[meter['id']] = ts
  313. # save and restore yesterday´s total energy to calculate today´s energy
  314. # check if total energy from yesterday is stored in memory, if not try to get it from saved file
  315. today = datetime.date.today()
  316. today_str = today.strftime('%Y%m%d')
  317. yesterday = today - datetime.timedelta(days = 1)
  318. yesterday_str = yesterday.strftime('%Y%m%d')
  319. # check for date rollover
  320. dateRollover = False
  321. savedtoday = self.saved_todays_date.get(meter['id'], False)
  322. if not savedtoday or savedtoday != today:
  323. log.debug("date rollover happened or no date has been saved yet for meter " + str(meter['id']))
  324. if savedtoday and savedtoday == yesterday:
  325. # a date rollover just happened, so change todays date to current and proceed with what has to be done
  326. dateRollover = True
  327. readEnergyData = True
  328. #log.debug(savedtoday)
  329. self.saved_todays_date[meter['id']] = today
  330. if readEnergyData:
  331. reading_success_energy = 0
  332. for reading in readings['energy']:
  333. # to prevent random readout errors, e.g. CRC check fail, sleep for a short time between the readings
  334. if conf_modbus_sleep_between_readings > 0:
  335. time.sleep(conf_modbus_sleep_between_readings) # Sleep between readings to avoid read errors
  336. retries = conf_modbus_read_retries
  337. # get decimals needed from meter_types config
  338. decimals = readings['energy'][reading].get('decimals', conf_default_decimals)
  339. while retries > 0:
  340. try:
  341. retries -= 1
  342. data_energy[meter['id']][reading] = round(instrument.read_float(readings['energy'][reading]['address'], 4, 2), decimals)
  343. log.debug('OK read meter {}, {} retries => \'{}\' = \'{}\''
  344. .format(meter['id'], conf_modbus_read_retries - retries, reading, data_energy[meter['id']][reading]))
  345. reading_success_energy += 1
  346. retries = 0
  347. pass
  348. except ValueError as ve:
  349. log.warning('Value Error while reading register {} from meter {}. Retries left {}.'
  350. .format(readings['energy'][reading]['address'], meter['id'], retries))
  351. log.error(ve)
  352. if retries == 0 and conf_modbus_raise_error_on_reading_failure:
  353. raise RuntimeError
  354. except TypeError as te:
  355. log.warning('Type Error while reading register {} from meter {}. Retries left {}.'
  356. .format(readings['energy'][reading]['address'], meter['id'], retries))
  357. log.error(te)
  358. if retries == 0 and conf_modbus_raise_error_on_reading_failure:
  359. raise RuntimeError
  360. except IOError as ie:
  361. log.warning('IO Error while reading register {} from meter {}. Retries left {}.'
  362. .format(readings['energy'][reading]['address'], meter['id'], retries))
  363. log.error(ie)
  364. if retries == 0 and conf_modbus_raise_error_on_reading_failure:
  365. raise RuntimeError
  366. except:
  367. log.error("Unexpected error:", sys.exc_info()[0])
  368. if conf_modbus_raise_error_on_reading_failure:
  369. raise
  370. if reading_success_energy < len(readings['energy']):
  371. log.debug("THERE WERE READING ERRORS")
  372. meterReadingError_energy = True
  373. file_path_meter = conf_storage_path + meter_id_name[meter['id']] + "/"
  374. file_today_min = file_path_meter + today_str + "_min.txt"
  375. file_yesterday_total = file_path_meter + yesterday_str + "_total.txt"
  376. energy_today_total = 0
  377. energy_yesterday_min = 0
  378. energy_today_min = self.saved_energy_today_min.get(meter['id'], None)
  379. if dateRollover:
  380. energy_today_min = None
  381. if energy_today_min == None:
  382. exists = os.path.isfile(file_today_min)
  383. if exists:
  384. # load energy_today_min from file if exists
  385. f = open(file_today_min, "r")
  386. if f.mode == 'r':
  387. contents = f.read()
  388. f.close()
  389. energy_today_min = float(contents)
  390. self.saved_energy_today_min[meter['id']] = energy_today_min
  391. log.debug(meter_id_name[meter['id']] + " - Energy Today min read from file -> = " + str(energy_today_min) + " kWh")
  392. else:
  393. # save current Energy_total to min-file
  394. if not os.path.exists(file_path_meter):
  395. os.mkdir(file_path_meter)
  396. f = open(file_today_min, "w+")
  397. energy_today_min = data_energy[meter['id']][self.readingsNames['energy_total']]
  398. self.saved_energy_today_min[meter['id']] = energy_today_min
  399. f.write(str('{0:.3f}'.format(energy_today_min)))
  400. f.close()
  401. log.debug(meter_id_name[meter['id']] + " - Energy Today Min: " + str(energy_today_min) + " kWh")
  402. try:
  403. energy_today_total = data_energy[meter['id']][self.readingsNames['energy_total']] - energy_today_min
  404. log.debug(meter_id_name[meter['id']] + " - Energy Today total: " + str('{0:.3f}'.format(energy_today_total)) + " kWh")
  405. except:
  406. pass
  407. energy_yesterday_total = self.saved_energy_yesterday_total.get(meter['id'], None)
  408. if dateRollover:
  409. energy_yesterday_total = None
  410. if energy_yesterday_total == None:
  411. exists = os.path.isfile(file_yesterday_total)
  412. if exists:
  413. # load energy_yesterday_total from file if exists
  414. f = open(file_yesterday_total, "r")
  415. if f.mode == 'r':
  416. contents = f.read()
  417. f.close()
  418. energy_yesterday_total = float(contents)
  419. self.saved_energy_yesterday_total[meter['id']] = energy_yesterday_total
  420. log.debug(meter_id_name[meter['id']] + " - Energy Yesterday total read from file -> = " + str(energy_yesterday_total) + " kWh")
  421. else:
  422. file_yesterday_min = file_path_meter + yesterday_str + "_min.txt"
  423. exists = os.path.isfile(file_yesterday_min)
  424. if exists:
  425. # load yesterday_min from file
  426. #if args_output_verbose1:
  427. # print("file file_yesterday_min exists")
  428. f = open(file_yesterday_min, "r")
  429. if f.mode == 'r':
  430. contents =f.read()
  431. f.close()
  432. energy_yesterday_min = float(contents)
  433. log.debug(meter_id_name[meter['id']] + " - Energy yesterday min: " + str(energy_yesterday_min) + " kWh")
  434. energy_yesterday_total = round(energy_today_min - energy_yesterday_min, 3)
  435. ###log.debug(meter_id_name[meter['id']] + " - Energy yesterday total: " + str(energy_yesterday_total))
  436. if not os.path.exists(file_path_meter):
  437. os.mkdir(file_path_meter)
  438. f = open(file_yesterday_total, "w+")
  439. f.write(str('{0:.3f}'.format(energy_yesterday_total)))
  440. f.close()
  441. #else:
  442. # # file yesterday_min does not exist
  443. log.debug(meter_id_name[meter['id']] + " - Energy Yesterday Total: " + str(energy_yesterday_total) + " kWh")
  444. if influxdb_write_energy_today_total:
  445. data_energy[meter['id']][self.readingsNames['energy_today']] = energy_today_total
  446. if influxdb_write_energy_yesterday_total:
  447. data_energy[meter['id']][self.readingsNames['energy_yesterday']] = energy_yesterday_total
  448. t_utc = datetime.datetime.utcnow()
  449. t_str = t_utc.isoformat() + 'Z'
  450. if conf_store_in_influxdb and not meterReadingError_energy:
  451. jsondata_energy = [
  452. {
  453. 'measurement': 'energy',
  454. 'tags': {
  455. 'meter': meter_id_name[meter['id']],
  456. },
  457. 'time': t_str,
  458. 'fields': data_energy[meter['id']]
  459. }
  460. ]
  461. if args_output_verbose1:
  462. print(json.dumps(jsondata_energy, indent = 4))
  463. try:
  464. self.influx_client_energy.write_points(jsondata_energy)
  465. except Exception as e:
  466. log.error('Data not written!')
  467. log.error(e)
  468. if conf_send_meters_readTime:
  469. readtime = round(time.time() - start_time, 3)
  470. log.debug("Read time: " + str(readtime))
  471. data_energy[meter['id']]['Read time'] = readtime
  472. if conf_mqtt_enabled and conf_publish_on_mqtt:
  473. mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/ReadTime", str(readtime))
  474. if conf_mqtt_enabled and conf_publish_on_mqtt:
  475. for reading in readings['energy']:
  476. tmpreading = data_energy[meter['id']].get(reading, None)
  477. if tmpreading != None:
  478. if tmpreading.is_integer():
  479. tmpreading = int(tmpreading)
  480. #mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading, str('{0:.3f}'.format(tmpreading)))
  481. log.debug("MQTT pub: '"+conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading + "' = '" + str(tmpreading) + "'")
  482. mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading, str(tmpreading))
  483. mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + self.readingsNames['energy_today'], str('{0:.3f}'.format(energy_today_total)))
  484. mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + self.readingsNames['energy_yesterday'], str('{0:.3f}'.format(energy_yesterday_total)))
  485. if meterReadingError_momentary or meterReadingError_energy:
  486. ts = int(time.time())
  487. lasterrortime = self.lastReadingErrorTime.get(meter['id'], 0)
  488. if lasterrortime == 0:
  489. self.lastReadingErrorTime[meter['id']] = ts
  490. elif (ts - lasterrortime) > conf_readingerror_publish_after:
  491. lasterrorpubtime = self.lastReadingErrorPublishtime.get(meter['id'], 0)
  492. if lasterrorpubtime == 0 or (lasterrorpubtime > 0 and (ts - lasterrorpubtime) > conf_readingerror_publish_interval):
  493. self.lastReadingErrorPublishtime[meter['id']] = ts
  494. if conf_mqtt_enabled and conf_publish_on_mqtt:
  495. lasterrortime_str = datetime.datetime.fromtimestamp(lasterrortime).strftime("%Y-%m-%d %H:%M:%S")
  496. mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/STATE", "ERROR: could not read MODBUS meter " + meter_id_name[meter['id']] + " with ID=" + str(meter['id']) + " since " + str(lasterrortime_str))
  497. mqttc.publish(conf_mqtt_topic_error, "ERROR: could not read MODBUS meter " + meter_id_name[meter['id']] + " with ID=" + str(meter['id']) + " since " + str(lasterrortime_str))
  498. else:
  499. self.lastReadingErrorTime[meter['id']] = 0
  500. # END class DataCollector
  501. ################################
  502. def mqtt_on_connect(client, userdata, flags, rc):
  503. if args_output_verbose1:
  504. print("MQTT connected with result code " + str(rc))
  505. #client.subscribe("some/topic")
  506. def mqtt_on_disconnect(client, userdata, rc):
  507. if rc != 0:
  508. if print_errors:
  509. print("Unexpected MQTT disconnection. Will auto-reconnect")
  510. def repeat(interval_sec, max_iter, func, *args, **kwargs):
  511. from itertools import count
  512. starttime = 0
  513. for i in count():
  514. if i > 0 and interval_sec > 0: # do not wait for interval time on first run
  515. if ((time.time() - starttime) < interval_sec):
  516. sleeptime = interval_sec - (time.time() - starttime)
  517. print("\nsleep " + str(sleeptime) + " s")
  518. time.sleep(sleeptime)
  519. try:
  520. starttime = time.time()
  521. func(*args, **kwargs)
  522. except Exception as ex:
  523. log.error(ex)
  524. if max_iter and i >= max_iter:
  525. return
  526. if __name__ == '__main__':
  527. import argparse
  528. parser = argparse.ArgumentParser()
  529. parser.add_argument('--interval', default=meters_interval_momentary,
  530. help='Meter readout interval for momentary values i.e. power, current... - in seconds, default 1s')
  531. parser.add_argument('--energyinterval', default=meters_interval_energy,
  532. help='Meter readout interval for energy values, i.e. total kWh - in seconds, default 60s')
  533. parser.add_argument('--use-only-one-interval', default=False,
  534. help='Meter readout interval for energy values, i.e. total kWh - in seconds, default 60s', action='store_true')
  535. parser.add_argument('--meters', default='meters.yml',
  536. help='YAML file containing Meter ID, name, type etc. Default "meters.yml"')
  537. #parser.add_argument('--verbose', '-v', default=0, help='print read data from the instruments to console', action='store_true')
  538. parser.add_argument('--verbose', '-v', type=int, default=0, choices=[1, 2], help='print read data from the instruments to console')
  539. parser.add_argument('--log', default='CRITICAL',
  540. help='Log levels, DEBUG, INFO, WARNING, ERROR or CRITICAL')
  541. parser.add_argument('--logfile', default='',
  542. help='Specify log file, if not specified the log is streamed to console')
  543. args = parser.parse_args()
  544. loglevel = args.log.upper()
  545. logfile = args.logfile
  546. # Setup logging
  547. log = logging.getLogger('energy-logger')
  548. log.setLevel(getattr(logging, loglevel))
  549. if logfile:
  550. loghandle = logging.FileHandler(logfile, 'w')
  551. formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
  552. loghandle.setFormatter(formatter)
  553. else:
  554. loghandle = logging.StreamHandler()
  555. log.addHandler(loghandle)
  556. log.info('Started app')
  557. #if args.verbose:
  558. if int(args.verbose) == 1 or int(args.verbose) == None:
  559. args_output_verbose1 = True
  560. args_output_verbose2 = False
  561. log.info("Verbose Level 1 ON - printing read data to console.")
  562. elif int(args.verbose) == 2:
  563. args_output_verbose1 = True
  564. args_output_verbose2 = True
  565. log.info("Verbose Level 2 ON - printing read data and more to console.")
  566. interval = int(args.interval)
  567. log.info("Interval 1 (for MOMENTARY readings): " + str(interval) + " s")
  568. if args.use_only_one_interval:
  569. meters_use_only_one_interval = True
  570. log.info("Using only Interval 1")
  571. else:
  572. meters_interval_energy = int(args.energyinterval)
  573. log.info("Interval 2 (for ENERGY readings): " + str(meters_interval_energy) + " s")
  574. # create MQTT client object
  575. if conf_mqtt_enabled:
  576. mqttc = mqtt.Client()
  577. mqttc.on_connect = mqtt_on_connect
  578. mqttc.on_disconnect = mqtt_on_disconnect
  579. ##mqttc.on_message = on_message # callback for incoming msg (unused)
  580. if len(config['mqtt'].get('password')) > 0 or len(config['mqtt'].get('server')) > 0:
  581. mqttc.username_pw_set(config['mqtt'].get('user'), config['mqtt'].get('password'))
  582. mqttc.connect(config['mqtt'].get('server'), config['mqtt'].getint('port', 1883), 60)
  583. mqttc.loop_start()
  584. #mqttc.loop_forever()
  585. # Create the InfluxDB object
  586. if config['influxdb'].getboolean('separate_momentary_database', False):
  587. influxclient_energy = InfluxDBClient(config['influxdb'].get('host'),
  588. config['influxdb'].getint('port', 8086),
  589. config['influxdb'].get('user'),
  590. config['influxdb'].get('password'),
  591. config['influxdb'].get('database'))
  592. influxclient_momentary = InfluxDBClient(config['influxdb_momentary'].get('host'),
  593. config['influxdb_momentary'].getint('port', 8086),
  594. config['influxdb_momentary'].get('user'),
  595. config['influxdb_momentary'].get('password'),
  596. config['influxdb_momentary'].get('database'))
  597. else:
  598. influxclient_energy = InfluxDBClient(config['influxdb'].get('host'),
  599. config['influxdb'].getint('port', 8086),
  600. config['influxdb'].get('user'),
  601. config['influxdb'].get('password'),
  602. config['influxdb'].get('database'))
  603. influxclient_momentary = InfluxDBClient(config['influxdb'].get('host'),
  604. config['influxdb'].getint('port', 8086),
  605. config['influxdb'].get('user'),
  606. config['influxdb'].get('password'),
  607. config['influxdb'].get('database'))
  608. collector = DataCollector(influx_client_momentary=influxclient_momentary,
  609. influx_client_energy=influxclient_energy,
  610. meter_yaml=args.meters)
  611. repeat(interval,
  612. max_iter=collector.max_iterations,
  613. func=lambda: collector.collect_and_store())