|
- #!/usr/bin/env python3
- from influxdb import InfluxDBClient
- from os import path
- import configparser
- import sys
- import os
- import minimalmodbus
- import time
- import datetime
- import yaml
- import logging
- import json
- import paho.mqtt.client as mqtt
- print_errors = True
- # Change working dir to the same dir as this script
- os.chdir(sys.path[0])
- config = configparser.ConfigParser()
- config.read('modbuslog.ini')
- #print(config.sections())
- # additional conffile names
- conffile_meter_types = 'meter_types.yml'
- conffile_readings_names = 'readings_names.yml'
- # config vars used more than once or updateable via commandline argument are stored as global vars
- conf_modbus_read_retries = config['rs485'].getint('read_retries', 4)
- conf_modbus_raise_error_on_reading_failure = config['rs485'].getboolean('raise_error_on_reading_failure', False)
- conf_modbus_sleep_between_readings = config['rs485'].getfloat('sleep_between_readings', 0.1)
- conf_modbus_sleep_between_instruments = config['rs485'].getfloat('sleep_between_instruments', 0.7)
- conf_publish_on_mqtt = config['main'].getboolean('publish_on_mqtt', False)
- conf_store_in_influxdb = config['main'].getboolean('store_in_influxdb', False)
- conf_mqtt_enabled = config['mqtt'].getboolean('enable', False)
- conf_mqtt_topic_prefix = config['mqtt'].get('topic_prefix') #must NOT end with / !!
- if conf_mqtt_topic_prefix[-1:] == '/':
- conf_mqtt_topic_prefix = conf_mqtt_topic_prefix[0:-1]
-
- conf_mqtt_topic_error = config['mqtt'].get('topic_error') #must NOT end with / !!
- conf_storage_path = config['filelog'].get('storage_path')
- if conf_storage_path[-1:] != '/':
- conf_storage_path += '/'
-
- meters_interval_momentary = config['meters'].getint('interval_momentary', 1) # s - base interval for reading instruments
- meters_interval_report_momentary = config['meters'].getint('interval_report_momentary', 60) # interval for reporting momentary readings, 0 to report immediately, overruled by powerdelta settings
- meters_interval_energy = config['meters'].getint('interval_energy', 60) # s - interval for reporting kWh-readings
- # for now this is not a seperate function but based on "meters_interval_momentary"
- # easuring elapsed time since last reading, so actual interval can vary, especially when
- # high "meters_interval_momentary" is set. To avoid that set
- # "meters_interval_momentary" (= command parameter --interval) to desired value
- # and configure "meters_use_only_one_interval" to True
- meters_use_only_one_interval = config['meters'].getboolean('use_only_one_interval', False) # use only interval 1 "meters_interval_momentary"
- meters_report_on_powerdelta_low = config['meters'].getfloat('report_on_powerdelta_low', 0.95) # % in decimal notation - immediately report if power value changes by more then this %
- meters_report_on_powerdelta_high = config['meters'].getfloat('report_on_powerdelta_high', 1.05) # % in decimal notation - immediately report if power value changes by more then this %
- meters_report_on_lowpower_treshold = config['meters'].getint('report_on_lowpower_treshold', 10) # treshold under which measured power is considered "low" and different powerdeltas are used
- meters_report_on_lowpower_powerdelta_low = config['meters'].getfloat('report_on_lowpower_powerdelta_low', 0.70) # % in decimal notation - immediately report if power value changes by more then this %
- meters_report_on_lowpower_powerdelta_high = config['meters'].getfloat('report_on_lowpower_powerdelta_high', 1.30) # % in decimal notation - immediately report if power value changes by more then this %
- ##report_instrument_read_retries = config['rs485'].getboolean('report_instrument_read_retries', False)
- conf_send_meters_readTime = config['meters'].getboolean('send_readtime', True)
- conf_default_decimals = config['readings'].getint('default_decimals', 3)
- conf_readingerror_publish_after = config['rs485'].getint('readingerror_publish_after', 60) # time in s after that repeated instrument reading errors are published via MQTT
- conf_readingerror_publish_interval = config['rs485'].getint('readingerror_publish_interval', 300) # interval in s to publish repeated instrument reading errors via MQTT
- # -------------------------------------------------------------
- # global variables - not for configuration
- args_output_verbose1 = False
- args_output_verbose2 = False
- influxdb_write_energy_today_total = True
- influxdb_write_energy_yesterday_total = True
- class DataCollector:
- def __init__(self, influx_client_momentary, influx_client_energy, meter_yaml):
- self.influx_client_momentary = influx_client_momentary
- self.influx_client_energy = influx_client_energy
- self.meter_yaml = meter_yaml
- self.max_iterations = None # run indefinitely by default
- #self.meter_types = None
- self.meter_types = dict()
- self.meter_types_last_change = dict()
- #self.meter_types_last_change = -1
- self.meter_map = None
- self.meter_map_last_change = -1
- self.meter_configuration_lastchecktime = None
- self.meter_typesconfiguration_lastchecktime = None
- self.lastMomentaryReportTime = dict()
- self.lastEnergyUpdate = dict()
- self.lastReadingErrorTime = dict()
- self.lastReadingErrorPublishtime = dict()
- #self.totalEnergy = dict()
- self.saved_energy_today_min = dict()
- self.data_momentary_last = dict()
- self.saved_energy_yesterday_total = dict() # remember total energy for each meter
- self.saved_todays_date = dict() # remember today´s date for each meter, needed to check for date rollover in order to calculate energy yesterday/today
- log.info('Meters:')
- #for meter in sorted(self.get_meters()): # does not work in Python 3, so dont sort for now
-
- # reading conffile_readings_names
- self.readingsNames = yaml.load(open(conffile_readings_names), Loader=yaml.FullLoader)
-
- for meter in self.get_meters():
- log.info('\t {} <--> {}'.format( meter['id'], meter['name']))
-
- def load_meter_type(self, metertype):
- log.info("Loading meter type: " + metertype)
- conffile_meter_type = "metertype_" + metertype + ".yml"
- assert path.exists(conffile_meter_type), 'Meters configuration not found: %s' % conffile_meter_type
- lastchange = self.meter_types_last_change.get(metertype, None)
- lastchange_file = path.getmtime(conffile_meter_type)
- if lastchange == None or (lastchange and lastchange_file != lastchange):
- try:
- log.info('Reloading meter type configuration for ' + metertype + 'as file changed')
- self.meter_types[metertype] = yaml.load(open(conffile_meter_type), Loader=yaml.FullLoader)
- self.meter_types_last_change[metertype] = lastchange_file
- log.debug('Reloaded meters configuration')
- except Exception as e:
- log.warning('Failed to re-load meter type configuration, going on with the old one.')
- log.warning(e)
-
- def check_load_reload_meter_types(self):
- log.debug("")
- log.debug("checking loaded meter types...")
- for metertype in self.meter_types:
- log.debug(metertype)
- self.load_meter_type(metertype)
- log.debug("")
-
- def get_meters(self):
- reloadconf = False
- ts = int(time.time())
- if self.meter_configuration_lastchecktime == None or (ts - self.meter_configuration_lastchecktime) > 60:
- self.meter_configuration_lastchecktime = ts
- assert path.exists(self.meter_yaml), 'Meter map not found: %s' % self.meter_yaml
- if path.getmtime(self.meter_yaml) != self.meter_map_last_change:
- reloadconf = True
- if reloadconf:
- try:
- log.info('Reloading meter map as file changed')
- new_map = yaml.load(open(self.meter_yaml), Loader=yaml.FullLoader)
- self.meter_map = new_map['meters']
- self.meter_map_last_change = path.getmtime(self.meter_yaml)
- log.debug('Reloaded meter map')
- for entry in self.meter_map:
- log.debug(entry['type'])
- self.load_meter_type(entry['type'])
- except Exception as e:
- log.warning('Failed to re-load meter map, going on with the old one.')
- log.warning(e)
-
- if self.meter_typesconfiguration_lastchecktime == None or (ts - self.meter_typesconfiguration_lastchecktime) > 60:
- self.meter_typesconfiguration_lastchecktime = ts
- self.check_load_reload_meter_types()
-
- return self.meter_map
- def collect_and_store(self):
- #instrument.debug = True
- meters = self.get_meters()
-
- instrument = minimalmodbus.Instrument(config['rs485'].get('serialdevice','/dev/ttyUSB0'), config['rs485'].getfloat('serialtimeout', 1.0))
- instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
-
- data_momentary = dict()
- data_energy = dict()
- meter_id_name = dict() # mapping id to name
- for meter in meters:
- meterReadingError_momentary = False
- meterReadingError_energy = False
- if conf_modbus_sleep_between_instruments > 0:
- time.sleep(conf_modbus_sleep_between_instruments)
- meter_id_name[meter['id']] = meter['name']
- instrument.serial.baudrate = meter['baudrate']
- instrument.serial.bytesize = meter['bytesize']
- if meter['parity'] == 'none':
- instrument.serial.parity = minimalmodbus.serial.PARITY_NONE
- elif meter['parity'] == 'odd':
- instrument.serial.parity = minimalmodbus.serial.PARITY_ODD
- elif meter['parity'] == 'even':
- instrument.serial.parity = minimalmodbus.serial.PARITY_EVEN
- else:
- log.error('No parity specified')
- raise
- instrument.serial.stopbits = meter['stopbits']
- instrument.serial.timeout = meter['timeout'] # seconds
- instrument.address = meter['id'] # this is the slave address number
- log.debug('\nReading meter %s \'%s\'' % (meter['id'], meter_id_name[meter['id']]))
- start_time = time.time()
-
- #if not self.meter_types.get(meter['type'], False):
- # self.load_meter_type(meter['type'])
-
- readings = self.meter_types[meter['type']]
- if args_output_verbose2:
- log.debug("")
- log.debug("Meter Type " + meter['type'] + " - defined readings:")
- log.debug(json.dumps(readings, indent = 4))
- log.debug("")
-
- data_momentary[meter['id']] = dict()
- data_energy[meter['id']] = dict()
- reading_success_momentary = 0
- for reading in readings['momentary']:
- # to prevent random readout errors, e.g. CRC check fail, sleep for a short time between the readings
- if conf_modbus_sleep_between_readings > 0:
- time.sleep(conf_modbus_sleep_between_readings) # Sleep between readings to avoid read errors
- retries = conf_modbus_read_retries
-
- # get decimals needed from meter_types config
- decimals = readings['momentary'][reading].get('decimals', conf_default_decimals)
-
- while retries > 0:
- try:
- retries -= 1
- data_momentary[meter['id']][reading] = round(instrument.read_float(readings['momentary'][reading]['address'], 4, 2), decimals)
- log.debug('OK read meter {}, {} retries => \'{}\' = \'{}\''.format(meter['id'], conf_modbus_read_retries - retries, reading, data_momentary[meter['id']][reading]))
- retries = 0
- reading_success_momentary += 1
- pass
- except ValueError as ve:
- log.warning('Value Error while reading register {} from meter {}. Retries left {}.'
- .format(readings['momentary'][reading]['address'], meter['id'], retries))
- log.error(ve)
- if retries == 0 and conf_modbus_raise_error_on_reading_failure:
- raise RuntimeError
- except TypeError as te:
- log.warning('Type Error while reading register {} from meter {}. Retries left {}.'
- .format(readings['momentary'][reading]['address'], meter['id'], retries))
- log.error(te)
- if retries == 0 and conf_modbus_raise_error_on_reading_failure:
- raise RuntimeError
- except IOError as ie:
- log.warning('IO Error while reading register {} from meter {}. Retries left {}.'
- .format(readings['momentary'][reading]['address'], meter['id'], retries))
- log.error(ie)
- if retries == 0 and conf_modbus_raise_error_on_reading_failure:
- raise RuntimeError
- except:
- log.error("Unexpected error:", sys.exc_info()[0])
- raise
- if reading_success_momentary < len(readings['momentary']):
- log.debug("THERE WERE READING ERRORS")
- meterReadingError_momentary = True
-
- # report momentary interval
- reportMomentary = False
- if meters_interval_report_momentary > 0:
- ts = int(time.time())
- lastMomentaryReportTime = self.lastMomentaryReportTime.get(meter['id'], False)
- if lastMomentaryReportTime:
- tdiff = ts - lastMomentaryReportTime
- if (tdiff > meters_interval_report_momentary):
- log.debug('Reporting momentary readings for meter %s' % meter['id'])
- reportMomentary = True
- self.lastMomentaryReportTime[meter['id']] = ts
- else:
- log.debug('No lastMomentaryReportTime has yet been saved for meter %s' % meter['id'])
- reportMomentary = True
- self.lastMomentaryReportTime[meter['id']] = ts
- else:
- reportMomentary = True
-
-
- # override meters_interval_report_momentary if power has changed for more than configured powerdelta and no interval reporting is due in this iteration
- if config['meters'].getboolean('report_on_powerdelta_enable', False) and not reportMomentary:
- lastValues = self.data_momentary_last.get(meter['id'], None)
-
- for usedReading in data_momentary[meter['id']].keys():
- currentValue = data_momentary[meter['id']][usedReading]
- for powerreadingname in self.readingsNames['power']:
- if usedReading == powerreadingname:
- lastValue = None
- if lastValues != None:
- lastValue = lastValues.get(powerreadingname, None)
- if lastValue != None:
- if (currentValue >= meters_report_on_lowpower_treshold):
- powerdelta_high = meters_report_on_powerdelta_high
- powerdelta_low = meters_report_on_powerdelta_low
- else:
- powerdelta_high = meters_report_on_lowpower_powerdelta_high
- powerdelta_low = meters_report_on_lowpower_powerdelta_low
-
- if (currentValue > (lastValue * powerdelta_high)):
- log.debug(powerreadingname + " INCREASED by more than factor " + str(powerdelta_high) + " currentValue=" + str(currentValue) + " lastValue=" + str(lastValue))
- reportMomentary = True
- if (currentValue < (lastValue * powerdelta_low)):
- log.debug(powerreadingname + " DECREASED by more than factor " + str(powerdelta_low) + " currentValue=" + str(currentValue) + " lastValue=" + str(lastValue))
- reportMomentary = True
- if lastValues == None:
- self.data_momentary_last[meter['id']] = dict()
- self.data_momentary_last[meter['id']][powerreadingname] = data_momentary[meter['id']][powerreadingname]
-
-
- # influxdb
- t_utc = datetime.datetime.utcnow()
- t_str = t_utc.isoformat() + 'Z'
-
- if conf_store_in_influxdb and not meterReadingError_momentary and reportMomentary:
- jsondata_momentary = [
- {
- 'measurement': 'energy',
- 'tags': {
- 'meter': meter_id_name[meter['id']],
- },
- 'time': t_str,
- 'fields': data_momentary[meter['id']]
- }
- ]
- if args_output_verbose1:
- print(json.dumps(jsondata_momentary, indent = 4))
- try:
- self.influx_client_momentary.write_points(jsondata_momentary)
- except Exception as e:
- log.error('Data not written!')
- log.error(e)
-
- if conf_send_meters_readTime:
- readtime = round(time.time() - start_time, 3)
- log.debug("Read time: " + str(readtime))
- data_momentary[meter['id']]['Read time'] = readtime
- if conf_mqtt_enabled and conf_publish_on_mqtt:
- mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/ReadTime", str(readtime))
-
- if conf_mqtt_enabled and conf_publish_on_mqtt and reportMomentary:
- for reading in readings['momentary']:
- tmpreading = data_momentary[meter['id']].get(reading, None)
- if tmpreading != None:
- if tmpreading.is_integer():
- tmpreading = int(tmpreading)
- #mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading, str('{0:.3f}'.format(tmpreading)))
- log.debug("MQTT pub: '"+conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading + "' = '" + str(tmpreading) + "'")
- mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading, str(tmpreading))
-
-
-
- if meters_use_only_one_interval:
- readEnergyData = True
- else:
- readEnergyData = False
- ts = int(time.time())
- lastUpdate = self.lastEnergyUpdate.get(meter['id'], False)
- if lastUpdate:
- tdiff = ts - lastUpdate
- if (tdiff > meters_interval_energy):
- readEnergyData = True
- self.lastEnergyUpdate[meter['id']] = ts
- else:
- log.debug('No lastEnergyUpdate has yet been saved for meter %s' % meter['id'])
- readEnergyData = True
- self.lastEnergyUpdate[meter['id']] = ts
-
-
-
- # save and restore yesterday´s total energy to calculate today´s energy
- # check if total energy from yesterday is stored in memory, if not try to get it from saved file
- today = datetime.date.today()
- today_str = today.strftime('%Y%m%d')
- yesterday = today - datetime.timedelta(days = 1)
- yesterday_str = yesterday.strftime('%Y%m%d')
-
- # check for date rollover
- dateRollover = False
- savedtoday = self.saved_todays_date.get(meter['id'], False)
- if not savedtoday or savedtoday != today:
- log.debug("date rollover happened or no date has been saved yet for meter " + str(meter['id']))
- if savedtoday and savedtoday == yesterday:
- # a date rollover just happened, so change todays date to current and proceed with what has to be done
- dateRollover = True
- readEnergyData = True
- #log.debug(savedtoday)
- self.saved_todays_date[meter['id']] = today
-
-
- if readEnergyData:
- reading_success_energy = 0
- for reading in readings['energy']:
- # to prevent random readout errors, e.g. CRC check fail, sleep for a short time between the readings
- if conf_modbus_sleep_between_readings > 0:
- time.sleep(conf_modbus_sleep_between_readings) # Sleep between readings to avoid read errors
- retries = conf_modbus_read_retries
-
- # get decimals needed from meter_types config
- decimals = readings['energy'][reading].get('decimals', conf_default_decimals)
-
- while retries > 0:
- try:
- retries -= 1
- data_energy[meter['id']][reading] = round(instrument.read_float(readings['energy'][reading]['address'], 4, 2), decimals)
- log.debug('OK read meter {}, {} retries => \'{}\' = \'{}\''
- .format(meter['id'], conf_modbus_read_retries - retries, reading, data_energy[meter['id']][reading]))
- reading_success_energy += 1
- retries = 0
- pass
- except ValueError as ve:
- log.warning('Value Error while reading register {} from meter {}. Retries left {}.'
- .format(readings['energy'][reading]['address'], meter['id'], retries))
- log.error(ve)
- if retries == 0 and conf_modbus_raise_error_on_reading_failure:
- raise RuntimeError
- except TypeError as te:
- log.warning('Type Error while reading register {} from meter {}. Retries left {}.'
- .format(readings['energy'][reading]['address'], meter['id'], retries))
- log.error(te)
- if retries == 0 and conf_modbus_raise_error_on_reading_failure:
- raise RuntimeError
- except IOError as ie:
- log.warning('IO Error while reading register {} from meter {}. Retries left {}.'
- .format(readings['energy'][reading]['address'], meter['id'], retries))
- log.error(ie)
- if retries == 0 and conf_modbus_raise_error_on_reading_failure:
- raise RuntimeError
- except:
- log.error("Unexpected error:", sys.exc_info()[0])
- if conf_modbus_raise_error_on_reading_failure:
- raise
- if reading_success_energy < len(readings['energy']):
- log.debug("THERE WERE READING ERRORS")
- meterReadingError_energy = True
-
-
- file_path_meter = conf_storage_path + meter_id_name[meter['id']] + "/"
- file_today_min = file_path_meter + today_str + "_min.txt"
- file_yesterday_total = file_path_meter + yesterday_str + "_total.txt"
-
- energy_today_total = 0
- energy_yesterday_min = 0
- energy_today_min = self.saved_energy_today_min.get(meter['id'], None)
-
- if dateRollover:
- energy_today_min = None
- if energy_today_min == None:
- exists = os.path.isfile(file_today_min)
- if exists:
- # load energy_today_min from file if exists
- f = open(file_today_min, "r")
- if f.mode == 'r':
- contents = f.read()
- f.close()
- energy_today_min = float(contents)
- self.saved_energy_today_min[meter['id']] = energy_today_min
- log.debug(meter_id_name[meter['id']] + " - Energy Today min read from file -> = " + str(energy_today_min) + " kWh")
- else:
- # save current Energy_total to min-file
- if not os.path.exists(file_path_meter):
- os.mkdir(file_path_meter)
- f = open(file_today_min, "w+")
- energy_today_min = data_energy[meter['id']][self.readingsNames['energy_total']]
- self.saved_energy_today_min[meter['id']] = energy_today_min
- f.write(str('{0:.3f}'.format(energy_today_min)))
- f.close()
- log.debug(meter_id_name[meter['id']] + " - Energy Today Min: " + str(energy_today_min) + " kWh")
-
- try:
- energy_today_total = data_energy[meter['id']][self.readingsNames['energy_total']] - energy_today_min
- log.debug(meter_id_name[meter['id']] + " - Energy Today total: " + str('{0:.3f}'.format(energy_today_total)) + " kWh")
- except:
- pass
-
-
-
- energy_yesterday_total = self.saved_energy_yesterday_total.get(meter['id'], None)
- if dateRollover:
- energy_yesterday_total = None
- if energy_yesterday_total == None:
- exists = os.path.isfile(file_yesterday_total)
- if exists:
- # load energy_yesterday_total from file if exists
- f = open(file_yesterday_total, "r")
- if f.mode == 'r':
- contents = f.read()
- f.close()
- energy_yesterday_total = float(contents)
- self.saved_energy_yesterday_total[meter['id']] = energy_yesterday_total
- log.debug(meter_id_name[meter['id']] + " - Energy Yesterday total read from file -> = " + str(energy_yesterday_total) + " kWh")
- else:
- file_yesterday_min = file_path_meter + yesterday_str + "_min.txt"
- exists = os.path.isfile(file_yesterday_min)
- if exists:
- # load yesterday_min from file
- #if args_output_verbose1:
- # print("file file_yesterday_min exists")
- f = open(file_yesterday_min, "r")
- if f.mode == 'r':
- contents =f.read()
- f.close()
- energy_yesterday_min = float(contents)
- log.debug(meter_id_name[meter['id']] + " - Energy yesterday min: " + str(energy_yesterday_min) + " kWh")
-
- energy_yesterday_total = round(energy_today_min - energy_yesterday_min, 3)
- ###log.debug(meter_id_name[meter['id']] + " - Energy yesterday total: " + str(energy_yesterday_total))
-
- if not os.path.exists(file_path_meter):
- os.mkdir(file_path_meter)
- f = open(file_yesterday_total, "w+")
- f.write(str('{0:.3f}'.format(energy_yesterday_total)))
- f.close()
- #else:
- # # file yesterday_min does not exist
- log.debug(meter_id_name[meter['id']] + " - Energy Yesterday Total: " + str(energy_yesterday_total) + " kWh")
-
- if influxdb_write_energy_today_total:
- data_energy[meter['id']][self.readingsNames['energy_today']] = energy_today_total
- if influxdb_write_energy_yesterday_total:
- data_energy[meter['id']][self.readingsNames['energy_yesterday']] = energy_yesterday_total
-
- t_utc = datetime.datetime.utcnow()
- t_str = t_utc.isoformat() + 'Z'
-
- if conf_store_in_influxdb and not meterReadingError_energy:
- jsondata_energy = [
- {
- 'measurement': 'energy',
- 'tags': {
- 'meter': meter_id_name[meter['id']],
- },
- 'time': t_str,
- 'fields': data_energy[meter['id']]
- }
- ]
- if args_output_verbose1:
- print(json.dumps(jsondata_energy, indent = 4))
- try:
- self.influx_client_energy.write_points(jsondata_energy)
- except Exception as e:
- log.error('Data not written!')
- log.error(e)
-
- if conf_send_meters_readTime:
- readtime = round(time.time() - start_time, 3)
- log.debug("Read time: " + str(readtime))
- data_energy[meter['id']]['Read time'] = readtime
- if conf_mqtt_enabled and conf_publish_on_mqtt:
- mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/ReadTime", str(readtime))
-
- if conf_mqtt_enabled and conf_publish_on_mqtt:
- for reading in readings['energy']:
- tmpreading = data_energy[meter['id']].get(reading, None)
- if tmpreading != None:
- if tmpreading.is_integer():
- tmpreading = int(tmpreading)
- #mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading, str('{0:.3f}'.format(tmpreading)))
- log.debug("MQTT pub: '"+conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading + "' = '" + str(tmpreading) + "'")
- mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + reading, str(tmpreading))
- mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + self.readingsNames['energy_today'], str('{0:.3f}'.format(energy_today_total)))
- mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/" + self.readingsNames['energy_yesterday'], str('{0:.3f}'.format(energy_yesterday_total)))
-
- if meterReadingError_momentary or meterReadingError_energy:
- ts = int(time.time())
- lasterrortime = self.lastReadingErrorTime.get(meter['id'], 0)
- if lasterrortime == 0:
- self.lastReadingErrorTime[meter['id']] = ts
- elif (ts - lasterrortime) > conf_readingerror_publish_after:
- lasterrorpubtime = self.lastReadingErrorPublishtime.get(meter['id'], 0)
- if lasterrorpubtime == 0 or (lasterrorpubtime > 0 and (ts - lasterrorpubtime) > conf_readingerror_publish_interval):
- self.lastReadingErrorPublishtime[meter['id']] = ts
- if conf_mqtt_enabled and conf_publish_on_mqtt:
- lasterrortime_str = datetime.datetime.fromtimestamp(lasterrortime).strftime("%Y-%m-%d %H:%M:%S")
- mqttc.publish(conf_mqtt_topic_prefix + "/" + meter_id_name[meter['id']] + "/STATE", "ERROR: could not read MODBUS meter " + meter_id_name[meter['id']] + " with ID=" + str(meter['id']) + " since " + str(lasterrortime_str))
- mqttc.publish(conf_mqtt_topic_error, "ERROR: could not read MODBUS meter " + meter_id_name[meter['id']] + " with ID=" + str(meter['id']) + " since " + str(lasterrortime_str))
- else:
- self.lastReadingErrorTime[meter['id']] = 0
-
- # END class DataCollector
- ################################
- def mqtt_on_connect(client, userdata, flags, rc):
- if args_output_verbose1:
- print("MQTT connected with result code " + str(rc))
- #client.subscribe("some/topic")
- def mqtt_on_disconnect(client, userdata, rc):
- if rc != 0:
- if print_errors:
- print("Unexpected MQTT disconnection. Will auto-reconnect")
- def repeat(interval_sec, max_iter, func, *args, **kwargs):
- from itertools import count
- starttime = 0
- for i in count():
- if i > 0 and interval_sec > 0: # do not wait for interval time on first run
- if ((time.time() - starttime) < interval_sec):
- sleeptime = interval_sec - (time.time() - starttime)
- print("\nsleep " + str(sleeptime) + " s")
- time.sleep(sleeptime)
- try:
- starttime = time.time()
- func(*args, **kwargs)
- except Exception as ex:
- log.error(ex)
- if max_iter and i >= max_iter:
- return
- if __name__ == '__main__':
- import argparse
- parser = argparse.ArgumentParser()
- parser.add_argument('--interval', default=meters_interval_momentary,
- help='Meter readout interval for momentary values i.e. power, current... - in seconds, default 1s')
- parser.add_argument('--energyinterval', default=meters_interval_energy,
- help='Meter readout interval for energy values, i.e. total kWh - in seconds, default 60s')
- parser.add_argument('--use-only-one-interval', default=False,
- help='Meter readout interval for energy values, i.e. total kWh - in seconds, default 60s', action='store_true')
- parser.add_argument('--meters', default='meters.yml',
- help='YAML file containing Meter ID, name, type etc. Default "meters.yml"')
- #parser.add_argument('--verbose', '-v', default=0, help='print read data from the instruments to console', action='store_true')
- parser.add_argument('--verbose', '-v', type=int, default=0, choices=[1, 2], help='print read data from the instruments to console')
- parser.add_argument('--log', default='CRITICAL',
- help='Log levels, DEBUG, INFO, WARNING, ERROR or CRITICAL')
- parser.add_argument('--logfile', default='',
- help='Specify log file, if not specified the log is streamed to console')
-
-
- args = parser.parse_args()
-
- loglevel = args.log.upper()
- logfile = args.logfile
-
- # Setup logging
- log = logging.getLogger('energy-logger')
- log.setLevel(getattr(logging, loglevel))
- if logfile:
- loghandle = logging.FileHandler(logfile, 'w')
- formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
- loghandle.setFormatter(formatter)
- else:
- loghandle = logging.StreamHandler()
- log.addHandler(loghandle)
-
- log.info('Started app')
-
- #if args.verbose:
- if int(args.verbose) == 1 or int(args.verbose) == None:
- args_output_verbose1 = True
- args_output_verbose2 = False
- log.info("Verbose Level 1 ON - printing read data to console.")
- elif int(args.verbose) == 2:
- args_output_verbose1 = True
- args_output_verbose2 = True
- log.info("Verbose Level 2 ON - printing read data and more to console.")
-
- interval = int(args.interval)
- log.info("Interval 1 (for MOMENTARY readings): " + str(interval) + " s")
-
- if args.use_only_one_interval:
- meters_use_only_one_interval = True
- log.info("Using only Interval 1")
- else:
- meters_interval_energy = int(args.energyinterval)
- log.info("Interval 2 (for ENERGY readings): " + str(meters_interval_energy) + " s")
-
- # create MQTT client object
- if conf_mqtt_enabled:
- mqttc = mqtt.Client()
- mqttc.on_connect = mqtt_on_connect
- mqttc.on_disconnect = mqtt_on_disconnect
- ##mqttc.on_message = on_message # callback for incoming msg (unused)
- if len(config['mqtt'].get('password')) > 0 or len(config['mqtt'].get('server')) > 0:
- mqttc.username_pw_set(config['mqtt'].get('user'), config['mqtt'].get('password'))
- mqttc.connect(config['mqtt'].get('server'), config['mqtt'].getint('port', 1883), 60)
- mqttc.loop_start()
- #mqttc.loop_forever()
- # Create the InfluxDB object
- if config['influxdb'].getboolean('separate_momentary_database', False):
- influxclient_energy = InfluxDBClient(config['influxdb'].get('host'),
- config['influxdb'].getint('port', 8086),
- config['influxdb'].get('user'),
- config['influxdb'].get('password'),
- config['influxdb'].get('database'))
- influxclient_momentary = InfluxDBClient(config['influxdb_momentary'].get('host'),
- config['influxdb_momentary'].getint('port', 8086),
- config['influxdb_momentary'].get('user'),
- config['influxdb_momentary'].get('password'),
- config['influxdb_momentary'].get('database'))
- else:
- influxclient_energy = InfluxDBClient(config['influxdb'].get('host'),
- config['influxdb'].getint('port', 8086),
- config['influxdb'].get('user'),
- config['influxdb'].get('password'),
- config['influxdb'].get('database'))
- influxclient_momentary = InfluxDBClient(config['influxdb'].get('host'),
- config['influxdb'].getint('port', 8086),
- config['influxdb'].get('user'),
- config['influxdb'].get('password'),
- config['influxdb'].get('database'))
-
- collector = DataCollector(influx_client_momentary=influxclient_momentary,
- influx_client_energy=influxclient_energy,
- meter_yaml=args.meters)
-
- repeat(interval,
- max_iter=collector.max_iterations,
- func=lambda: collector.collect_and_store())
|