#!/usr/bin/python3 -u # import serial #from time import sleep import time import os import sys import paho.mqtt.client as mqtt import json import yaml from yaml.constructor import ConstructorError from threading import Thread from influxdb import InfluxDBClient import configparser #from datetime import datetime import datetime # Change working dir to the same dir as this script os.chdir(sys.path[0]) config = configparser.ConfigParser() config.read('s0meters.ini') quiet = False verbose = False debug = False if len(sys.argv) >= 2: if sys.argv[1] == "-q": verbose = False quiet = True debug = False elif sys.argv[1] == "-v": verbose = True quiet = False debug = False print("VERBOSE=ON") elif sys.argv[1] == "-d": verbose = True quiet = False debug = True print("DEBUG=ON") consoleAddTimestamp = config['main'].get('consoleAddTimestamp') conf_storage_path = config['filelog'].get('storage_path') if conf_storage_path[-1:] != '/': conf_storage_path += '/' ser = serial.Serial(port=config['hardware'].get('serialPort'), baudrate = config['hardware'].get('serialBaud'), parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=config['hardware'].getint('serialTout')) if not quiet: print("S0MetersLog by Flo Kra") print("=======================================================================") meters_yaml = yaml.load(open(config['main'].get('meters_config_yml')), Loader=yaml.SafeLoader) if not quiet: print("Meters configuration:") print(json.dumps(meters_yaml, indent=2)) print("loading InfluxDB configuration...") influxdb_yaml = yaml.load(open(config['main'].get('influx_config_yml')), Loader=yaml.SafeLoader) if not quiet: print("InfluxDB Instances:") print(json.dumps(influxdb_yaml, indent=4)) influxclient = dict() for instance in influxdb_yaml: i_host = influxdb_yaml[instance].get('host', None) i_port = int(influxdb_yaml[instance].get('port', 8086)) i_username = influxdb_yaml[instance].get('username', None) i_password = influxdb_yaml[instance].get('password', None) i_database = influxdb_yaml[instance].get('database', None) if i_host != None and i_database != None: if i_username != None and i_password != None: influxclient[instance] = InfluxDBClient(i_host, i_port, i_username, i_password, i_database) else: influxclient[instance] = InfluxDBClient(i_host, i_port, i_database) data_energy = dict() data_momentary = dict() saved_todays_date = dict() influxdb_energy_lastWriteTime = dict() saved_energy_today_min = dict() saved_energy_yesterday_total = dict() for meter in meters_yaml: data_energy[meter] = dict() data_momentary[meter] = dict() MQTTenabled = config['mqtt'].getboolean('enable', False) if MQTTenabled: def on_connect(client, userdata, flags, rc): if not quiet: print("MQTT: connected with result code " + str(rc)) client.subscribe(config['mqtt'].get('topic_cmd')) if not quiet: print("MQTT: subscribed topic:", config['mqtt'].get('topic_cmd')) def on_message(client, userdata, msg): if not quiet: print("MQTT incoming on topic", msg.topic) if msg.topic == config['mqtt'].get('topic_cmd'): if not quiet: print("MQTT CMD:", msg.payload) cmd = msg.payload.decode('ascii') + '\n' ser.write(cmd.encode('ascii')) mqttc = mqtt.Client() mqttc.on_connect = on_connect #mqttc.on_disconnect = on_disconnect mqttc.on_message = on_message if config['mqtt'].get('user') != "" and config['mqtt'].get('password') != "": mqttc.username_pw_set(config['mqtt'].get('user'), config['mqtt'].get('password')) mqttc.connect(config['mqtt'].get('server'), config['mqtt'].getint('port'), config['mqtt'].getint('keepalive')) mqttc.loop_start() def processMeterData(data): try: cJson = json.loads(data) except: cJson = False #print(json.dumps(cJson, indent=1)) #print(cJson['C']) #print(json.dumps(meters_yaml[cJson['C']])) #print(json.dumps(cJson)) if cJson: cNum = cJson.get('C') cReading = float(cJson.get('reading', None)) #print(cNum) dTime = cJson.get('dTime', None) write_energy_to_influxdb = False cName = meters_yaml[cNum].get('name', False) statTopic = meters_yaml[cNum].get('statTopic', False) unit = meters_yaml[cNum].get('unit', False) momUnit = meters_yaml[cNum].get('momUnit', False) momType = meters_yaml[cNum].get('momType', False) impPerUnit = meters_yaml[cNum].get('impPerUnit', False) if cName: cJson['name'] = cName momFactor = meters_yaml[cNum].get('momFactor', False) if not momFactor: momFactor = 1 momDigits = meters_yaml[cNum].get('momDigits', None) if momDigits == None: momDigits = 3 digits = meters_yaml[cNum].get('digits', False) influxMinWriteInterval = meters_yaml[cNum].get('influxMinWriteInterval_energy', None) if influxMinWriteInterval == None: influxMinWriteInterval = 0 # check for date rollover since last impulse today = datetime.date.today() today_str = today.strftime('%Y%m%d') yesterday = today - datetime.timedelta(days = 1) yesterday_str = yesterday.strftime('%Y%m%d') dateRollover = False savedtoday = saved_todays_date.get(cName, False) if not savedtoday or savedtoday != today: if debug: print("date rollover happened or no date has been saved yet for meter " + str(cName)) if savedtoday and savedtoday == yesterday: # a date rollover just happened, so change todays date to current and proceed with what has to be done dateRollover = True #log.debug(savedtoday) saved_todays_date[cName] = today strformat = "{:.3f}" cReading_formatted = None if digits: strformat = "{:."+str(digits)+"f}" cReading_formatted = strformat.format(cReading) cJson['reading'] = round(cReading, digits) if unit: cJson['unit'] = unit if impPerUnit and dTime is not None: if dTime == 0: momValue = 0.0 else: momValue = (3600000 / dTime / impPerUnit) * momFactor if momDigits > 0: momValue = round(momValue, momDigits) else: momValue = round(momValue) cJson['momValue'] = momValue if momType: cJson['momType'] = momType if momUnit: cJson['momUnit'] = momUnit if statTopic: if momType and momValue is not None: if MQTTenabled: subtop = momType mqttc.publish(statTopic + "/" + subtop, str(momValue), qos=0, retain=False) if statTopic: if cReading_formatted != None: if MQTTenabled: mqttc.publish(statTopic + "/reading", str(cReading_formatted), qos=0, retain=False) data_energy[cNum][meters_yaml[cNum].get('influxFieldName_energy', 'energyTotal')] = round(float(cReading), digits) data_momentary[cNum][meters_yaml[cNum].get('influxFieldName_mom', 'momentaryUsage')] = round(float(momValue), momDigits) # InfluxDB t_utc = datetime.datetime.utcnow() t_str = t_utc.isoformat() + 'Z' # InfluxDB - energy readings if dateRollover: write_energy_to_influxdb = True ts = int(time.time()) influxEnergyWrite_elapsedTime = 0 if influxdb_energy_lastWriteTime.get(cNum, None) != None: influxEnergyWrite_elapsedTime = ts - influxdb_energy_lastWriteTime.get(cNum) if influxEnergyWrite_elapsedTime >= influxMinWriteInterval: write_energy_to_influxdb = True influxdb_energy_lastWriteTime[cNum] = ts else: # first run - do write immediately write_energy_to_influxdb = True influxdb_energy_lastWriteTime[cNum] = ts influxInstance_energy = meters_yaml[cNum].get('influxInstance_energy', None) if influxInstance_energy is not None: if write_energy_to_influxdb: influx_measurement = meters_yaml[cNum].get('influxMeasurement_energy', None) if influx_measurement == None: influx_measurement = influxdb_yaml[influxInstance_energy].get('defaultMeasurement', 'energy') jsondata_energy = [ { 'measurement': influx_measurement, 'tags': { 'meter': cName, }, 'time': t_str, 'fields': data_energy[cNum] } ] if verbose: print("Writing ENERGY to InfluxDB:") print(json.dumps(jsondata_energy, indent = 4)) try: #influxclient_energy.write_points(jsondata_energy) influxclient[influxInstance_energy].write_points(jsondata_energy) except Exception as e: print('Data not written!') #log.error('Data not written!') print(e) #log.error(e) else: if verbose: print("Writing ENERGY to InfluxDB skipped (influxMinWriteInterval="+ str(influxMinWriteInterval) + ", elapsedTime=" + str(influxEnergyWrite_elapsedTime) + ")") # InfluxDB - momentary values influxInstance_mom = meters_yaml[cNum].get('influxInstance_mom', None) if influxInstance_mom is not None: influx_measurement = meters_yaml[cNum].get('influxMeasurement_mom', None) if influx_measurement == None: influx_measurement = influxdb_yaml[influxInstance_mom].get('defaultMeasurement', 'energy') jsondata_momentary = [ { 'measurement': influx_measurement, 'tags': { 'meter': cName, }, 'time': t_str, 'fields': data_momentary[cNum] } ] if verbose: print("Writing MOMENTARY to InfluxDB:") print(json.dumps(jsondata_momentary, indent = 4)) try: #influxclient_momentary.write_points(jsondata_momentary) influxclient[influxInstance_mom].write_points(jsondata_momentary) except Exception as e: print('Data not written!') #log.error('Data not written!') print(e) #log.error(e) # file log if config['filelog'].getboolean('enable'): # save and restore yesterday´s total energy to calculate today´s energy # check if total energy from yesterday is stored in memory, if not try to get it from saved file file_path_meter = conf_storage_path + cName + "/" file_today_min = file_path_meter + today_str + "_min.txt" file_yesterday_total = file_path_meter + yesterday_str + "_total.txt" energy_today_total = 0 energy_yesterday_min = 0 energy_today_min = saved_energy_today_min.get(cName, None) try: if dateRollover: energy_today_min = None if energy_today_min == None: exists = os.path.isfile(file_today_min) if exists: # load energy_today_min from file if exists f = open(file_today_min, "r") if f.mode == 'r': contents = f.read() f.close() if contents != '': energy_today_min = float(contents) saved_energy_today_min[cName] = energy_today_min if verbose: print(cName + " - Energy Today min read from file -> = " + str(energy_today_min)) else: energy_today_min = None else: # save current Energy_total to min-file if not os.path.exists(file_path_meter): os.mkdir(file_path_meter) f = open(file_today_min, "w+") energy_today_min = cReading saved_energy_today_min[cName] = energy_today_min #f.write(str('{0:.3f}'.format(energy_today_min))) #f.write(str(energy_today_min)) f.write(strformat.format(energy_today_min)) f.close() #try: if energy_today_min != None: energy_today_total = cReading - energy_today_min if verbose: print(cName + " - Energy Today total: " + str('{0:.3f}'.format(energy_today_total))) energy_yesterday_total = saved_energy_yesterday_total.get(cName, None) if dateRollover: energy_yesterday_total = None if energy_yesterday_total == None: exists = os.path.isfile(file_yesterday_total) if exists: # load energy_yesterday_total from file if exists f = open(file_yesterday_total, "r") if f.mode == 'r': contents = f.read() f.close() if contents != '': energy_yesterday_total = float(contents) saved_energy_yesterday_total[cName] = energy_yesterday_total if debug: print(cName + " - Energy Yesterday total read from file -> = " + str(energy_yesterday_total)) else: energy_yesterday_total = None else: file_yesterday_min = file_path_meter + yesterday_str + "_min.txt" exists = os.path.isfile(file_yesterday_min) if exists: # load yesterday_min from file #if args_output_verbose1: # print("file file_yesterday_min exists") f = open(file_yesterday_min, "r") if f.mode == 'r': contents =f.read() f.close() if contents != '': energy_yesterday_min = float(contents) if debug: print(cName + " - Energy yesterday min: " + str(energy_yesterday_min)) else: energy_yesterday_min = None if energy_yesterday_min != None: energy_yesterday_total = round(energy_today_min - energy_yesterday_min, 3) ###log.debug(meter_id_name[meter['id']] + " - Energy yesterday total: " + str(energy_yesterday_total)) if not os.path.exists(file_path_meter): os.mkdir(file_path_meter) f = open(file_yesterday_total, "w+") #f.write(str('{0:.3f}'.format(energy_yesterday_total))) f.write(strformat.format(energy_yesterday_total)) f.close() #else: # # file yesterday_min does not exist except: e = sys.exc_info()[0] print( "
Error in file log: %s
" % e ) if energy_today_total is not None: cJson['Today'] = round(energy_today_total, digits) if MQTTenabled: mqttc.publish(statTopic + "/today", str(round(energy_today_total, digits)), qos=0, retain=False) if energy_yesterday_total is not None: cJson['Yesterday'] = round(energy_yesterday_total, digits) if MQTTenabled: mqttc.publish(statTopic + "/yesterday", str(round(energy_yesterday_total, digits)), qos=0, retain=False) # END file log if verbose: if consoleAddTimestamp: print ("[" + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) + "] meterData:", json.dumps(cJson)) else: print("meterData:", json.dumps(cJson)) if MQTTenabled: mqttc.publish(statTopic + "/json", json.dumps(cJson), qos=0, retain=False) def publishStatMsg(data): if MQTTenabled: mqttc.publish(config['mqtt'].get('topic_stat'), data, qos=0, retain=False) def publishCmdResponseMsg(data): if MQTTenabled: mqttc.publish(config['mqtt'].get('topic_cmdresponse'), data, qos=0, retain=False) try: while True: serLine = ser.readline().strip() # catch exception on invalid char coming in: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf4 in position 6: ordinal not in range(128) try: serLine = serLine.decode('ascii') except: serLine = "" if(serLine): if verbose: if consoleAddTimestamp: print ("[" + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) + "] DEV_SENT: ", serLine) else: print ("DEV_SENT: ", serLine) #Echo the serial buffer bytes up to the CRLF back to screen # check if data looks like meter readings JSON - then process if serLine.startswith('{"C":'): Thread(target=processMeterData, args=(serLine,)).start() # response to a command elif serLine.startswith('cmd:'): Thread(target=publishCmdResponseMsg, args=(serLine,)).start() # other cases it is a normal "stat" message else: Thread(target=publishStatMsg, args=(serLine,)).start() except KeyboardInterrupt: print('\n') exit()