#!/usr/bin/python3 -u
"""S0MetersLog: read S0 pulse-meter lines from a serial device and fan them out.

Each line received on the serial port is classified by its prefix:

* ``{"C":`` - a meter reading in JSON; enriched with the per-meter settings
  from the meters YAML file, published via MQTT, written to InfluxDB and
  (optionally) tracked in small per-day text files.
* ``cmd:`` - a command response; forwarded to the MQTT command-response topic.
* anything else - forwarded to the MQTT status topic.

Configuration is read from ``s0meters.ini`` located next to this script.
Command line: ``-q`` quiet, ``-v`` verbose, ``-d`` debug (implies verbose).
"""
import configparser
import datetime
import json
import os
import sys
import time
from threading import Thread

import paho.mqtt.client as mqtt
import serial
import yaml
from influxdb import InfluxDBClient
from yaml.constructor import ConstructorError

# Change working dir to the same dir as this script
os.chdir(sys.path[0])

config = configparser.ConfigParser()
config.read('s0meters.ini')

quiet = False
verbose = False
debug = False

if len(sys.argv) >= 2:
    if sys.argv[1] == "-q":
        verbose = False
        quiet = True
        debug = False
    elif sys.argv[1] == "-v":
        verbose = True
        quiet = False
        debug = False
        print("VERBOSE=ON")
    elif sys.argv[1] == "-d":
        verbose = True
        quiet = False
        debug = True
        print("DEBUG=ON")

# Parsed as a boolean: a plain .get() returns a string, and a configured
# "false" / "0" would be truthy.
consoleAddTimestamp = config['main'].getboolean('consoleAddTimestamp', fallback=False)

conf_storage_path = config['filelog'].get('storage_path')
if conf_storage_path[-1:] != '/':
    conf_storage_path += '/'

ser = serial.Serial(port=config['hardware'].get('serialPort'),
                    baudrate=config['hardware'].getint('serialBaud'),
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    bytesize=serial.EIGHTBITS,
                    timeout=config['hardware'].getint('serialTout'))

if not quiet:
    print("S0MetersLog by Flo Kra")
    print("=======================================================================")

with open(config['main'].get('meters_config_yml')) as yaml_file:
    meters_yaml = yaml.load(yaml_file, Loader=yaml.SafeLoader)
if not quiet:
    print("Meters configuration:")
    print(json.dumps(meters_yaml, indent=2))
    print("loading InfluxDB configuration...")

with open(config['main'].get('influx_config_yml')) as yaml_file:
    influxdb_yaml = yaml.load(yaml_file, Loader=yaml.SafeLoader)
if not quiet:
    print("InfluxDB Instances:")
    print(json.dumps(influxdb_yaml, indent=4))

# One InfluxDB client per configured instance, keyed by instance name.
influxclient = dict()
for instance in influxdb_yaml:
    i_host = influxdb_yaml[instance].get('host', None)
    i_port = int(influxdb_yaml[instance].get('port', 8086))
    i_username = influxdb_yaml[instance].get('username', None)
    i_password = influxdb_yaml[instance].get('password', None)
    i_database = influxdb_yaml[instance].get('database', None)
    if i_host is not None and i_database is not None:
        if i_username is not None and i_password is not None:
            influxclient[instance] = InfluxDBClient(i_host, i_port, i_username, i_password, i_database)
        else:
            # The third positional parameter of InfluxDBClient is the
            # username, so the database must be passed by keyword here.
            influxclient[instance] = InfluxDBClient(i_host, i_port, database=i_database)

# Per-meter state shared between the worker threads.
data_energy = dict()
data_momentary = dict()
saved_todays_date = dict()
influxdb_energy_lastWriteTime = dict()
saved_energy_today_min = dict()
saved_energy_yesterday_total = dict()

for meter in meters_yaml:
    data_energy[meter] = dict()
    data_momentary[meter] = dict()

MQTTenabled = config['mqtt'].getboolean('enable', False)

if MQTTenabled:
    def on_connect(client, userdata, flags, rc):
        """Subscribe to the command topic once the broker connection is up."""
        if not quiet:
            print("MQTT: connected with result code " + str(rc))
        client.subscribe(config['mqtt'].get('topic_cmd'))
        if not quiet:
            print("MQTT: subscribed topic:", config['mqtt'].get('topic_cmd'))

    def on_message(client, userdata, msg):
        """Forward payloads received on the command topic to the serial device."""
        if not quiet:
            print("MQTT incoming on topic", msg.topic)
        if msg.topic == config['mqtt'].get('topic_cmd'):
            if not quiet:
                print("MQTT CMD:", msg.payload)
            cmd = msg.payload.decode('ascii') + '\n'
            ser.write(cmd.encode('ascii'))

    mqttc = mqtt.Client()
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    if config['mqtt'].get('user') != "" and config['mqtt'].get('password') != "":
        mqttc.username_pw_set(config['mqtt'].get('user'), config['mqtt'].get('password'))
    mqttc.connect(config['mqtt'].get('server'),
                  config['mqtt'].getint('port'),
                  config['mqtt'].getint('keepalive'))
    mqttc.loop_start()


def _mqtt_publish(topic, payload):
    """Publish payload on topic (qos 0, not retained); no-op when MQTT is disabled."""
    if MQTTenabled:
        mqttc.publish(topic, payload, qos=0, retain=False)


def _console_print(label, text):
    """Print label and text, prefixed with a millisecond timestamp if configured."""
    if consoleAddTimestamp:
        stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        print("[" + stamp + "] " + label, text)
    else:
        print(label, text)


def _read_float_file(path):
    """Return the float stored in the text file at path, or None if it is empty."""
    with open(path, "r") as f:
        contents = f.read()
    if contents != '':
        return float(contents)
    return None


def _write_float_file(directory, path, value, strformat):
    """Write value, formatted with strformat, to path; create directory if needed."""
    # exist_ok avoids a failure when two worker threads create it concurrently.
    os.makedirs(directory, exist_ok=True)
    with open(path, "w+") as f:
        f.write(strformat.format(value))


def _write_influx(instance, measurement, label, meter_name, timestamp, fields):
    """Write one point to the named InfluxDB instance; failures are only printed.

    measurement falls back to the instance's 'defaultMeasurement' setting
    (default 'energy') when None. label is used in the verbose console output.
    """
    if measurement is None:
        instance_cfg = influxdb_yaml.get(instance) or {}
        measurement = instance_cfg.get('defaultMeasurement', 'energy')
    points = [
        {
            'measurement': measurement,
            'tags': {
                'meter': meter_name,
            },
            'time': timestamp,
            'fields': fields
        }
    ]
    if verbose:
        print("Writing " + label + " to InfluxDB:")
        print(json.dumps(points, indent = 4))
    try:
        influxclient[instance].write_points(points)
    except Exception as e:
        # Best effort: a database problem must not stop MQTT publishing.
        print('Data not written!')
        print(e)


def _daily_energy_totals(cName, cReading, strformat, today_str, yesterday_str, dateRollover):
    """Return (energy_today_total, energy_yesterday_total) for meter cName.

    The first reading of each day is stored in '<date>_min.txt'; today's total
    is the current reading minus that value. Yesterday's total is taken from
    '<date>_total.txt' or, if that file is missing, derived from the two
    '_min' files and then stored. Values are cached in memory and the cache is
    bypassed when dateRollover is set. Errors are printed, not raised;
    today's total then stays 0 and yesterday's total may be None.
    """
    file_path_meter = conf_storage_path + cName + "/"
    file_today_min = file_path_meter + today_str + "_min.txt"
    file_yesterday_min = file_path_meter + yesterday_str + "_min.txt"
    file_yesterday_total = file_path_meter + yesterday_str + "_total.txt"

    energy_today_total = 0
    energy_yesterday_total = None
    try:
        energy_today_min = None if dateRollover else saved_energy_today_min.get(cName, None)
        if energy_today_min is None:
            if os.path.isfile(file_today_min):
                energy_today_min = _read_float_file(file_today_min)
                if energy_today_min is not None:
                    saved_energy_today_min[cName] = energy_today_min
                    if verbose:
                        print(cName + " - Energy Today min read from file -> = " + str(energy_today_min))
            else:
                # first reading of the day: it becomes today's minimum
                _write_float_file(file_path_meter, file_today_min, cReading, strformat)
                energy_today_min = cReading
                saved_energy_today_min[cName] = energy_today_min

        if energy_today_min is not None:
            energy_today_total = cReading - energy_today_min
            if verbose:
                print(cName + " - Energy Today total: " + str('{0:.3f}'.format(energy_today_total)))

        if not dateRollover:
            energy_yesterday_total = saved_energy_yesterday_total.get(cName, None)
        if energy_yesterday_total is None:
            if os.path.isfile(file_yesterday_total):
                energy_yesterday_total = _read_float_file(file_yesterday_total)
                if energy_yesterday_total is not None:
                    saved_energy_yesterday_total[cName] = energy_yesterday_total
                    if debug:
                        print(cName + " - Energy Yesterday total read from file -> = " + str(energy_yesterday_total))
            elif os.path.isfile(file_yesterday_min):
                energy_yesterday_min = _read_float_file(file_yesterday_min)
                if energy_yesterday_min is not None:
                    if debug:
                        print(cName + " - Energy yesterday min: " + str(energy_yesterday_min))
                    if energy_today_min is not None:
                        energy_yesterday_total = round(energy_today_min - energy_yesterday_min, 3)
                        _write_float_file(file_path_meter, file_yesterday_total,
                                          energy_yesterday_total, strformat)
    except Exception as e:
        # Best effort: the file log must not prevent publishing the reading.
        print("\n\nError in file log: %s\n\n" % e)
    return energy_today_total, energy_yesterday_total


def processMeterData(data):
    """Process one meter-reading JSON line received from the serial device.

    data is the decoded line. It is parsed as JSON; key 'C' selects the meter
    entry in meters_yaml, 'reading' is the counter value and 'dTime' (if
    present) is used together with the meter's 'impPerUnit' to compute a
    momentary value. The enriched dict is published via MQTT below the
    meter's 'statTopic', readings are written to the configured InfluxDB
    instances and, when the file log is enabled, today's and yesterday's
    totals are added.

    Lines that are not valid JSON, reference an unconfigured meter or carry
    no numeric 'reading' are dropped. Returns None.
    """
    try:
        cJson = json.loads(data)
    except ValueError:
        return
    if not cJson or not isinstance(cJson, dict):
        return

    cNum = cJson.get('C')
    meter_cfg = meters_yaml.get(cNum)
    if meter_cfg is None:
        if not quiet:
            print("meterData: no configuration for meter " + str(cNum) + " - ignored")
        return
    try:
        cReading = float(cJson.get('reading', None))
    except (TypeError, ValueError):
        if not quiet:
            print("meterData: missing or invalid reading for meter " + str(cNum) + " - ignored")
        return
    dTime = cJson.get('dTime', None)

    cfg = meter_cfg.get
    cName = cfg('name', False)
    statTopic = cfg('statTopic', None)
    unit = cfg('unit', None)
    digits = cfg('digits', None)
    conv_unit = cfg('conv_unit', None)
    conv_factor = cfg('conv_factor', None)
    conv_digits = cfg('conv_digits', None)
    if conv_digits is None:
        conv_digits = 2
    cost_unit = cfg('cost_unit', None)
    cost_per_unit = cfg('cost_per_unit', None)
    cost_from_conv = cfg('cost_from_conv', False)
    impPerUnit = cfg('impPerUnit', None)
    momType = cfg('momType', None)
    momType_conv1 = cfg('momType_conv1', None)
    momType_conv2 = cfg('momType_conv2', None)
    # a missing or zero factor means "no scaling"
    momFactor = cfg('momFactor', None) or 1
    momFactor_conv1 = cfg('momFactor_conv1', None) or 1
    momFactor_conv2 = cfg('momFactor_conv2', None) or 1
    momDigits = cfg('momDigits', None)
    if momDigits is None:
        momDigits = 3
    momDigits_conv1 = cfg('momDigits_conv1', None)
    if momDigits_conv1 is None:
        momDigits_conv1 = 3
    momDigits_conv2 = cfg('momDigits_conv2', None)
    if momDigits_conv2 is None:
        momDigits_conv2 = 3
    influxMinWriteInterval = cfg('influxMinWriteInterval_energy', None)
    if influxMinWriteInterval is None:
        influxMinWriteInterval = 0

    if cName:
        cJson['name'] = cName

    def publish_stat(subtopic, payload):
        """Publish below this meter's statTopic; skipped if none is configured."""
        if statTopic:
            _mqtt_publish(statTopic + "/" + subtopic, payload)

    # check for date rollover since last impulse
    today = datetime.date.today()
    today_str = today.strftime('%Y%m%d')
    yesterday_str = (today - datetime.timedelta(days = 1)).strftime('%Y%m%d')
    dateRollover = False
    savedtoday = saved_todays_date.get(cName, False)
    if not savedtoday or savedtoday != today:
        if debug:
            print("date rollover happened or no date has been saved yet for meter " + str(cName))
        # Any change of date counts, not only "saved date was yesterday":
        # after more than a day without impulses the cached daily values
        # are just as stale.
        if savedtoday:
            dateRollover = True
        saved_todays_date[cName] = today

    strformat = "{:.3f}"
    cReading_formatted = None
    if digits:
        strformat = "{:." + str(digits) + "f}"
        cReading_formatted = strformat.format(cReading)
        cJson['reading'] = round(cReading, digits)

    if unit:
        cJson['unit'] = unit

    # momentary value derived from the time between two impulses;
    # stays None when it cannot be computed for this message
    momValue = None
    momValue_conv1 = 0.0
    momValue_conv2 = 0.0
    if impPerUnit and dTime is not None:
        if dTime == 0:
            momValue = 0.0
        else:
            momValue = (3600000 / dTime / impPerUnit) * momFactor

        # conversions use the unrounded momValue
        if momType_conv1 is not None:
            momValue_conv1 = momValue * momFactor_conv1
        if momType_conv2 is not None:
            momValue_conv2 = momValue * momFactor_conv2

        momValue = round(momValue, momDigits) if momDigits > 0 else round(momValue)
        if momDigits_conv1 > 0:
            momValue_conv1 = round(momValue_conv1, momDigits_conv1)
        else:
            momValue_conv1 = round(momValue_conv1)
        if momDigits_conv2 > 0:
            momValue_conv2 = round(momValue_conv2, momDigits_conv2)
        else:
            momValue_conv2 = round(momValue_conv2)

        if momType:
            cJson[momType] = momValue
            publish_stat(momType, str(momValue))
        if momType_conv1:
            cJson[momType_conv1] = momValue_conv1
            publish_stat(momType_conv1, str(momValue_conv1))
        if momType_conv2:
            cJson[momType_conv2] = momValue_conv2
            publish_stat(momType_conv2, str(momValue_conv2))

    if cReading_formatted is not None:
        publish_stat("reading", str(cReading_formatted))

    data_energy[cNum][cfg('influxFieldName_energy', 'energyTotal')] = round(float(cReading), digits)
    if momValue is not None:
        data_momentary[cNum][cfg('influxFieldName_mom', 'momentaryUsage')] = round(float(momValue), momDigits)
        if momType_conv1 is not None and cfg('influxFieldName_mom_conv1', None) is not None:
            data_momentary[cNum][cfg('influxFieldName_mom_conv1')] = round(momValue_conv1, momDigits_conv1)
        if momType_conv2 is not None and cfg('influxFieldName_mom_conv2', None) is not None:
            data_momentary[cNum][cfg('influxFieldName_mom_conv2')] = round(momValue_conv2, momDigits_conv2)

    # InfluxDB
    t_str = datetime.datetime.utcnow().isoformat() + 'Z'

    # InfluxDB - energy readings: written on a date rollover, on the first
    # reading after start and whenever the minimum write interval has elapsed
    write_energy_to_influxdb = dateRollover
    ts = int(time.time())
    influxEnergyWrite_elapsedTime = 0
    last_write = influxdb_energy_lastWriteTime.get(cNum, None)
    if last_write is not None:
        influxEnergyWrite_elapsedTime = ts - last_write
        if influxEnergyWrite_elapsedTime >= influxMinWriteInterval:
            write_energy_to_influxdb = True
            influxdb_energy_lastWriteTime[cNum] = ts
    else:
        # first run - do write immediately
        write_energy_to_influxdb = True
        influxdb_energy_lastWriteTime[cNum] = ts

    influxInstance_energy = cfg('influxInstance_energy', None)
    if influxInstance_energy is not None:
        if write_energy_to_influxdb:
            _write_influx(influxInstance_energy, cfg('influxMeasurement_energy', None),
                          "ENERGY", cName, t_str, data_energy[cNum])
        elif verbose:
            print("Writing ENERGY to InfluxDB skipped (influxMinWriteInterval="+ str(influxMinWriteInterval) + ", elapsedTime=" + str(influxEnergyWrite_elapsedTime) + ")")

    # InfluxDB - momentary values (only when one was computed for this message)
    influxInstance_mom = cfg('influxInstance_mom', None)
    if influxInstance_mom is not None and momValue is not None:
        _write_influx(influxInstance_mom, cfg('influxMeasurement_mom', None),
                      "MOMENTARY", cName, t_str, data_momentary[cNum])

    def publish_day_totals(day, energy_total):
        """Add the total, converted total and cost for day ('today'/'yesterday')."""
        if energy_total is None:
            return
        if unit:
            value = round(energy_total, digits)
            cJson[day.capitalize() + '__' + unit] = value
            publish_stat(day + "__" + unit, str(value))
        conv_value = None
        if conv_unit and conv_factor is not None:
            conv_value = energy_total * conv_factor
            cJson[day.capitalize() + '__' + conv_unit] = round(conv_value, conv_digits)
            publish_stat(day + "__" + conv_unit, str(round(conv_value, conv_digits)))
        if cost_unit and cost_per_unit is not None:
            if cost_from_conv:
                if conv_value is None:
                    # cost is based on the converted value, which is not configured
                    return
                cost_value = round(conv_value * cost_per_unit, 2)
            else:
                cost_value = round(energy_total * cost_per_unit, 2)
            cJson['cost_' + day + '__' + cost_unit] = cost_value
            publish_stat("cost_" + day + "__" + cost_unit, str(cost_value))

    # file log - needs a meter name, which is used as the directory name
    if config['filelog'].getboolean('enable') and cName:
        energy_today_total, energy_yesterday_total = _daily_energy_totals(
            cName, cReading, strformat, today_str, yesterday_str, dateRollover)
        publish_day_totals("today", energy_today_total)
        publish_day_totals("yesterday", energy_yesterday_total)
    # END file log

    if verbose:
        _console_print("meterData:", json.dumps(cJson))

    publish_stat("json", json.dumps(cJson))


def publishStatMsg(data):
    """Publish a plain status line from the device on the MQTT status topic."""
    _mqtt_publish(config['mqtt'].get('topic_stat'), data)


def publishCmdResponseMsg(data):
    """Publish a command response from the device on the MQTT response topic."""
    _mqtt_publish(config['mqtt'].get('topic_cmdresponse'), data)


try:
    while True:
        serLine = ser.readline().strip()

        # an invalid (non-ASCII) byte on the line makes the whole line unusable
        try:
            serLine = serLine.decode('ascii')
        except UnicodeDecodeError:
            serLine = ""

        if serLine:
            if verbose:
                _console_print("DEV_SENT: ", serLine)

            # Each line is handled in its own thread so that slow network
            # writes do not delay reading the serial port.
            # check if data looks like meter readings JSON - then process
            if serLine.startswith('{"C":'):
                Thread(target=processMeterData, args=(serLine,)).start()
            # response to a command
            elif serLine.startswith('cmd:'):
                Thread(target=publishCmdResponseMsg, args=(serLine,)).start()
            # other cases it is a normal "stat" message
            else:
                Thread(target=publishStatMsg, args=(serLine,)).start()

except KeyboardInterrupt:
    print('\n')
    sys.exit()