123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- #!/usr/bin/python3 -u
- #
- import serial
- #from time import sleep
- import time
- import os
- import sys
- import paho.mqtt.client as mqtt
- import json
- import yaml
- from yaml.constructor import ConstructorError
- from threading import Thread
- from influxdb import InfluxDBClient
- import configparser
- #from datetime import datetime
- import datetime
- # Change working dir to the same dir as this script
- os.chdir(sys.path[0])
- config = configparser.ConfigParser()
- config.read('s0meters.ini')
- quiet = False
- verbose = False
- debug = False
- if len(sys.argv) >= 2:
- if sys.argv[1] == "-q":
- verbose = False
- quiet = True
- debug = False
- elif sys.argv[1] == "-v":
- verbose = True
- quiet = False
- debug = False
- print("VERBOSE=ON")
- elif sys.argv[1] == "-d":
- verbose = True
- quiet = False
- debug = True
- print("DEBUG=ON")
- consoleAddTimestamp = config['main'].get('consoleAddTimestamp')
- conf_storage_path = config['filelog'].get('storage_path')
- if conf_storage_path[-1:] != '/':
- conf_storage_path += '/'
- ser = serial.Serial(port=config['hardware'].get('serialPort'),
- baudrate = config['hardware'].get('serialBaud'),
- parity=serial.PARITY_NONE,
- stopbits=serial.STOPBITS_ONE,
- bytesize=serial.EIGHTBITS,
- timeout=config['hardware'].getint('serialTout'))
- if not quiet:
- print("S0MetersLog by Flo Kra")
- print("=======================================================================")
- meters_yaml = yaml.load(open(config['main'].get('meters_config_yml')), Loader=yaml.SafeLoader)
- if not quiet:
- print("Meters configuration:")
- print(json.dumps(meters_yaml, indent=2))
-
- print("loading InfluxDB configuration...")
-
- influxdb_yaml = yaml.load(open(config['main'].get('influx_config_yml')), Loader=yaml.SafeLoader)
- if not quiet:
- print("InfluxDB Instances:")
- print(json.dumps(influxdb_yaml, indent=4))
- influxclient = dict()
- for instance in influxdb_yaml:
- i_host = influxdb_yaml[instance].get('host', None)
- i_port = int(influxdb_yaml[instance].get('port', 8086))
- i_username = influxdb_yaml[instance].get('username', None)
- i_password = influxdb_yaml[instance].get('password', None)
- i_database = influxdb_yaml[instance].get('database', None)
- if i_host != None and i_database != None:
- if i_username != None and i_password != None:
- influxclient[instance] = InfluxDBClient(i_host, i_port, i_username, i_password, i_database)
- else:
- influxclient[instance] = InfluxDBClient(i_host, i_port, i_database)
- data_energy = dict()
- data_momentary = dict()
- saved_todays_date = dict()
- influxdb_energy_lastWriteTime = dict()
- saved_energy_today_min = dict()
- saved_energy_yesterday_total = dict()
- for meter in meters_yaml:
- data_energy[meter] = dict()
- data_momentary[meter] = dict()
- MQTTenabled = config['mqtt'].getboolean('enable', False)
- if MQTTenabled:
- def on_connect(client, userdata, flags, rc):
- if not quiet:
- print("MQTT: connected with result code " + str(rc))
- client.subscribe(config['mqtt'].get('topic_cmd'))
- if not quiet:
- print("MQTT: subscribed topic:", config['mqtt'].get('topic_cmd'))
-
- def on_message(client, userdata, msg):
- if not quiet:
- print("MQTT incoming on topic", msg.topic)
- if msg.topic == config['mqtt'].get('topic_cmd'):
- if not quiet:
- print("MQTT CMD:", msg.payload)
- cmd = msg.payload.decode('ascii') + '\n'
- ser.write(cmd.encode('ascii'))
- mqttc = mqtt.Client()
- mqttc.on_connect = on_connect
- #mqttc.on_disconnect = on_disconnect
- mqttc.on_message = on_message
-
- if config['mqtt'].get('user') != "" and config['mqtt'].get('password') != "":
- mqttc.username_pw_set(config['mqtt'].get('user'), config['mqtt'].get('password'))
- mqttc.connect(config['mqtt'].get('server'), config['mqtt'].getint('port'), config['mqtt'].getint('keepalive'))
-
- mqttc.loop_start()
- def processMeterData(data):
- try:
- cJson = json.loads(data)
- except:
- cJson = False
- #print(json.dumps(cJson, indent=1))
- #print(cJson['C'])
- #print(json.dumps(meters_yaml[cJson['C']]))
- #print(json.dumps(cJson))
-
- if cJson:
- cNum = cJson.get('C')
- cReading = float(cJson.get('reading', None))
- #print(cNum)
-
- dTime = cJson.get('dTime', None)
-
- write_energy_to_influxdb = False
-
- cName = meters_yaml[cNum].get('name', False)
- statTopic = meters_yaml[cNum].get('statTopic', False)
- unit = meters_yaml[cNum].get('unit', False)
-
- momUnit = meters_yaml[cNum].get('momUnit', False)
- momType = meters_yaml[cNum].get('momType', False)
-
- impPerUnit = meters_yaml[cNum].get('impPerUnit', False)
-
- if cName:
- cJson['name'] = cName
-
- momFactor = meters_yaml[cNum].get('momFactor', False)
- if not momFactor:
- momFactor = 1
-
- momDigits = meters_yaml[cNum].get('momDigits', None)
- if momDigits == None:
- momDigits = 3
-
- digits = meters_yaml[cNum].get('digits', False)
-
- influxMinWriteInterval = meters_yaml[cNum].get('influxMinWriteInterval_energy', None)
- if influxMinWriteInterval == None: influxMinWriteInterval = 0
-
-
- # check for date rollover since last impulse
- today = datetime.date.today()
- today_str = today.strftime('%Y%m%d')
- yesterday = today - datetime.timedelta(days = 1)
- yesterday_str = yesterday.strftime('%Y%m%d')
- dateRollover = False
- savedtoday = saved_todays_date.get(cName, False)
- if not savedtoday or savedtoday != today:
- if debug:
- print("date rollover happened or no date has been saved yet for meter " + str(cName))
- if savedtoday and savedtoday == yesterday:
- # a date rollover just happened, so change todays date to current and proceed with what has to be done
- dateRollover = True
- #log.debug(savedtoday)
- saved_todays_date[cName] = today
-
-
- strformat = "{:.3f}"
- cReading_formatted = None
- if digits:
- strformat = "{:."+str(digits)+"f}"
- cReading_formatted = strformat.format(cReading)
- cJson['reading'] = round(cReading, digits)
-
- if unit:
- cJson['unit'] = unit
-
- if impPerUnit and dTime is not None:
- if dTime == 0:
- momValue = 0.0
- else:
- momValue = (3600000 / dTime / impPerUnit) * momFactor
-
- if momDigits > 0:
- momValue = round(momValue, momDigits)
- else:
- momValue = round(momValue)
-
- cJson['momValue'] = momValue
- if momType:
- cJson['momType'] = momType
- if momUnit:
- cJson['momUnit'] = momUnit
-
- if statTopic:
- if momType and momValue is not None:
- if MQTTenabled:
- subtop = momType
- mqttc.publish(statTopic + "/" + subtop, str(momValue), qos=0, retain=False)
-
- if statTopic:
- if cReading_formatted != None:
- if MQTTenabled:
- mqttc.publish(statTopic + "/reading", str(cReading_formatted), qos=0, retain=False)
-
-
- data_energy[cNum][meters_yaml[cNum].get('influxFieldName_energy', 'energyTotal')] = round(float(cReading), digits)
- data_momentary[cNum][meters_yaml[cNum].get('influxFieldName_mom', 'momentaryUsage')] = round(float(momValue), momDigits)
-
- # InfluxDB
- t_utc = datetime.datetime.utcnow()
- t_str = t_utc.isoformat() + 'Z'
-
-
-
- # InfluxDB - energy readings
- if dateRollover:
- write_energy_to_influxdb = True
-
- ts = int(time.time())
- influxEnergyWrite_elapsedTime = 0
- if influxdb_energy_lastWriteTime.get(cNum, None) != None:
- influxEnergyWrite_elapsedTime = ts - influxdb_energy_lastWriteTime.get(cNum)
- if influxEnergyWrite_elapsedTime >= influxMinWriteInterval:
- write_energy_to_influxdb = True
- influxdb_energy_lastWriteTime[cNum] = ts
- else:
- # first run - do write immediately
- write_energy_to_influxdb = True
- influxdb_energy_lastWriteTime[cNum] = ts
-
-
- influxInstance_energy = meters_yaml[cNum].get('influxInstance_energy', None)
- if influxInstance_energy is not None:
- if write_energy_to_influxdb:
- influx_measurement = meters_yaml[cNum].get('influxMeasurement_energy', None)
- if influx_measurement == None:
- influx_measurement = influxdb_yaml[influxInstance_energy].get('defaultMeasurement', 'energy')
-
- jsondata_energy = [
- {
- 'measurement': influx_measurement,
- 'tags': {
- 'meter': cName,
- },
- 'time': t_str,
- 'fields': data_energy[cNum]
- }
- ]
- if verbose:
- print("Writing ENERGY to InfluxDB:")
- print(json.dumps(jsondata_energy, indent = 4))
- try:
- #influxclient_energy.write_points(jsondata_energy)
- influxclient[influxInstance_energy].write_points(jsondata_energy)
- except Exception as e:
- print('Data not written!')
- #log.error('Data not written!')
- print(e)
- #log.error(e)
- else:
- if verbose:
- print("Writing ENERGY to InfluxDB skipped (influxMinWriteInterval="+ str(influxMinWriteInterval) + ", elapsedTime=" + str(influxEnergyWrite_elapsedTime) + ")")
-
-
- # InfluxDB - momentary values
- influxInstance_mom = meters_yaml[cNum].get('influxInstance_mom', None)
- if influxInstance_mom is not None:
- influx_measurement = meters_yaml[cNum].get('influxMeasurement_mom', None)
- if influx_measurement == None:
- influx_measurement = influxdb_yaml[influxInstance_mom].get('defaultMeasurement', 'energy')
-
- jsondata_momentary = [
- {
- 'measurement': influx_measurement,
- 'tags': {
- 'meter': cName,
- },
- 'time': t_str,
- 'fields': data_momentary[cNum]
- }
- ]
- if verbose:
- print("Writing MOMENTARY to InfluxDB:")
- print(json.dumps(jsondata_momentary, indent = 4))
- try:
- #influxclient_momentary.write_points(jsondata_momentary)
- influxclient[influxInstance_mom].write_points(jsondata_momentary)
- except Exception as e:
- print('Data not written!')
- #log.error('Data not written!')
- print(e)
- #log.error(e)
-
-
- # file log
- if config['filelog'].getboolean('enable'):
- # save and restore yesterday´s total energy to calculate today´s energy
- # check if total energy from yesterday is stored in memory, if not try to get it from saved file
-
- file_path_meter = conf_storage_path + cName + "/"
- file_today_min = file_path_meter + today_str + "_min.txt"
- file_yesterday_total = file_path_meter + yesterday_str + "_total.txt"
-
- energy_today_total = 0
- energy_yesterday_min = 0
- energy_today_min = saved_energy_today_min.get(cName, None)
-
- try:
- if dateRollover:
- energy_today_min = None
-
- if energy_today_min == None:
- exists = os.path.isfile(file_today_min)
- if exists:
- # load energy_today_min from file if exists
- f = open(file_today_min, "r")
- if f.mode == 'r':
- contents = f.read()
- f.close()
- if contents != '':
- energy_today_min = float(contents)
- saved_energy_today_min[cName] = energy_today_min
- if verbose:
- print(cName + " - Energy Today min read from file -> = " + str(energy_today_min))
- else: energy_today_min = None
- else:
- # save current Energy_total to min-file
- if not os.path.exists(file_path_meter):
- os.mkdir(file_path_meter)
- f = open(file_today_min, "w+")
- energy_today_min = cReading
- saved_energy_today_min[cName] = energy_today_min
- #f.write(str('{0:.3f}'.format(energy_today_min)))
- #f.write(str(energy_today_min))
- f.write(strformat.format(energy_today_min))
- f.close()
-
- #try:
- if energy_today_min != None:
- energy_today_total = cReading - energy_today_min
- if verbose:
- print(cName + " - Energy Today total: " + str('{0:.3f}'.format(energy_today_total)))
-
-
- energy_yesterday_total = saved_energy_yesterday_total.get(cName, None)
- if dateRollover:
- energy_yesterday_total = None
- if energy_yesterday_total == None:
- exists = os.path.isfile(file_yesterday_total)
- if exists:
- # load energy_yesterday_total from file if exists
- f = open(file_yesterday_total, "r")
- if f.mode == 'r':
- contents = f.read()
- f.close()
- if contents != '':
- energy_yesterday_total = float(contents)
- saved_energy_yesterday_total[cName] = energy_yesterday_total
- if debug:
- print(cName + " - Energy Yesterday total read from file -> = " + str(energy_yesterday_total))
- else: energy_yesterday_total = None
- else:
- file_yesterday_min = file_path_meter + yesterday_str + "_min.txt"
- exists = os.path.isfile(file_yesterday_min)
- if exists:
- # load yesterday_min from file
- #if args_output_verbose1:
- # print("file file_yesterday_min exists")
- f = open(file_yesterday_min, "r")
- if f.mode == 'r':
- contents =f.read()
- f.close()
-
- if contents != '':
- energy_yesterday_min = float(contents)
- if debug:
- print(cName + " - Energy yesterday min: " + str(energy_yesterday_min))
- else: energy_yesterday_min = None
-
- if energy_yesterday_min != None:
- energy_yesterday_total = round(energy_today_min - energy_yesterday_min, 3)
- ###log.debug(meter_id_name[meter['id']] + " - Energy yesterday total: " + str(energy_yesterday_total))
-
- if not os.path.exists(file_path_meter):
- os.mkdir(file_path_meter)
- f = open(file_yesterday_total, "w+")
- #f.write(str('{0:.3f}'.format(energy_yesterday_total)))
- f.write(strformat.format(energy_yesterday_total))
- f.close()
- #else:
- # # file yesterday_min does not exist
- except:
- e = sys.exc_info()[0]
- print( "<p>Error in file log: %s</p>" % e )
-
- if energy_today_total is not None:
- cJson['Today'] = round(energy_today_total, digits)
- if MQTTenabled:
- mqttc.publish(statTopic + "/today", str(round(energy_today_total, digits)), qos=0, retain=False)
-
- if energy_yesterday_total is not None:
- cJson['Yesterday'] = round(energy_yesterday_total, digits)
- if MQTTenabled:
- mqttc.publish(statTopic + "/yesterday", str(round(energy_yesterday_total, digits)), qos=0, retain=False)
- # END file log
-
- if verbose:
- if consoleAddTimestamp:
- print ("[" + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) + "] meterData:", json.dumps(cJson))
- else:
- print("meterData:", json.dumps(cJson))
-
- if MQTTenabled:
- mqttc.publish(statTopic + "/json", json.dumps(cJson), qos=0, retain=False)
- def publishStatMsg(data):
- if MQTTenabled:
- mqttc.publish(config['mqtt'].get('topic_stat'), data, qos=0, retain=False)
- def publishCmdResponseMsg(data):
- if MQTTenabled:
- mqttc.publish(config['mqtt'].get('topic_cmdresponse'), data, qos=0, retain=False)
- try:
- while True:
- serLine = ser.readline().strip()
- # catch exception on invalid char coming in: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf4 in position 6: ordinal not in range(128)
- try:
- serLine = serLine.decode('ascii')
- except:
- serLine = ""
- if(serLine):
- if verbose:
- if consoleAddTimestamp:
- print ("[" + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) + "] DEV_SENT: ", serLine)
- else:
- print ("DEV_SENT: ", serLine) #Echo the serial buffer bytes up to the CRLF back to screen
-
- # check if data looks like meter readings JSON - then process
- if serLine.startswith('{"C":'):
- Thread(target=processMeterData, args=(serLine,)).start()
-
- # response to a command
- elif serLine.startswith('cmd:'):
- Thread(target=publishCmdResponseMsg, args=(serLine,)).start()
-
- # other cases it is a normal "stat" message
- else:
- Thread(target=publishStatMsg, args=(serLine,)).start()
-
- except KeyboardInterrupt:
- print('\n')
- exit()
|