s0meters.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. #!/usr/bin/python3 -u
  2. #
  3. import serial
  4. #from time import sleep
  5. import time
  6. import os
  7. import sys
  8. import paho.mqtt.client as mqtt
  9. import json
  10. import yaml
  11. from yaml.constructor import ConstructorError
  12. from threading import Thread
  13. from influxdb import InfluxDBClient
  14. import configparser
  15. #from datetime import datetime
  16. import datetime
  17. # Change working dir to the same dir as this script
  18. os.chdir(sys.path[0])
  19. config = configparser.ConfigParser()
  20. config.read('s0meters.ini')
  21. quiet = False
  22. verbose = False
  23. debug = False
  24. if len(sys.argv) >= 2:
  25. if sys.argv[1] == "-q":
  26. verbose = False
  27. quiet = True
  28. debug = False
  29. elif sys.argv[1] == "-v":
  30. verbose = True
  31. quiet = False
  32. debug = False
  33. print("VERBOSE=ON")
  34. elif sys.argv[1] == "-d":
  35. verbose = True
  36. quiet = False
  37. debug = True
  38. print("DEBUG=ON")
  39. consoleAddTimestamp = config['main'].get('consoleAddTimestamp')
  40. conf_storage_path = config['filelog'].get('storage_path')
  41. if conf_storage_path[-1:] != '/':
  42. conf_storage_path += '/'
  43. ser = serial.Serial(port=config['hardware'].get('serialPort'),
  44. baudrate = config['hardware'].get('serialBaud'),
  45. parity=serial.PARITY_NONE,
  46. stopbits=serial.STOPBITS_ONE,
  47. bytesize=serial.EIGHTBITS,
  48. timeout=config['hardware'].getint('serialTout'))
  49. if not quiet:
  50. print("S0MetersLog by Flo Kra")
  51. print("=======================================================================")
  52. meters_yaml = yaml.load(open(config['main'].get('meters_config_yml')), Loader=yaml.SafeLoader)
  53. if not quiet:
  54. print("Meters configuration:")
  55. print(json.dumps(meters_yaml, indent=2))
  56. print("loading InfluxDB configuration...")
  57. influxdb_yaml = yaml.load(open(config['main'].get('influx_config_yml')), Loader=yaml.SafeLoader)
  58. if not quiet:
  59. print("InfluxDB Instances:")
  60. print(json.dumps(influxdb_yaml, indent=4))
  61. influxclient = dict()
  62. for instance in influxdb_yaml:
  63. i_host = influxdb_yaml[instance].get('host', None)
  64. i_port = int(influxdb_yaml[instance].get('port', 8086))
  65. i_username = influxdb_yaml[instance].get('username', None)
  66. i_password = influxdb_yaml[instance].get('password', None)
  67. i_database = influxdb_yaml[instance].get('database', None)
  68. if i_host != None and i_database != None:
  69. if i_username != None and i_password != None:
  70. influxclient[instance] = InfluxDBClient(i_host, i_port, i_username, i_password, i_database)
  71. else:
  72. influxclient[instance] = InfluxDBClient(i_host, i_port, i_database)
  73. data_energy = dict()
  74. data_momentary = dict()
  75. saved_todays_date = dict()
  76. influxdb_energy_lastWriteTime = dict()
  77. saved_energy_today_min = dict()
  78. saved_energy_yesterday_total = dict()
  79. for meter in meters_yaml:
  80. data_energy[meter] = dict()
  81. data_momentary[meter] = dict()
  82. MQTTenabled = config['mqtt'].getboolean('enable', False)
  83. if MQTTenabled:
  84. def on_connect(client, userdata, flags, rc):
  85. if not quiet:
  86. print("MQTT: connected with result code " + str(rc))
  87. client.subscribe(config['mqtt'].get('topic_cmd'))
  88. if not quiet:
  89. print("MQTT: subscribed topic:", config['mqtt'].get('topic_cmd'))
  90. def on_message(client, userdata, msg):
  91. if not quiet:
  92. print("MQTT incoming on topic", msg.topic)
  93. if msg.topic == config['mqtt'].get('topic_cmd'):
  94. if not quiet:
  95. print("MQTT CMD:", msg.payload)
  96. cmd = msg.payload.decode('ascii') + '\n'
  97. ser.write(cmd.encode('ascii'))
  98. mqttc = mqtt.Client()
  99. mqttc.on_connect = on_connect
  100. #mqttc.on_disconnect = on_disconnect
  101. mqttc.on_message = on_message
  102. if config['mqtt'].get('user') != "" and config['mqtt'].get('password') != "":
  103. mqttc.username_pw_set(config['mqtt'].get('user'), config['mqtt'].get('password'))
  104. mqttc.connect(config['mqtt'].get('server'), config['mqtt'].getint('port'), config['mqtt'].getint('keepalive'))
  105. mqttc.loop_start()
  106. def processMeterData(data):
  107. try:
  108. cJson = json.loads(data)
  109. except:
  110. cJson = False
  111. #print(json.dumps(cJson, indent=1))
  112. #print(cJson['C'])
  113. #print(json.dumps(meters_yaml[cJson['C']]))
  114. #print(json.dumps(cJson))
  115. if cJson:
  116. cNum = cJson.get('C')
  117. cReading = float(cJson.get('reading', None))
  118. #print(cNum)
  119. dTime = cJson.get('dTime', None)
  120. write_energy_to_influxdb = False
  121. cName = meters_yaml[cNum].get('name', False)
  122. statTopic = meters_yaml[cNum].get('statTopic', False)
  123. unit = meters_yaml[cNum].get('unit', False)
  124. momUnit = meters_yaml[cNum].get('momUnit', False)
  125. momType = meters_yaml[cNum].get('momType', False)
  126. impPerUnit = meters_yaml[cNum].get('impPerUnit', False)
  127. if cName:
  128. cJson['name'] = cName
  129. momFactor = meters_yaml[cNum].get('momFactor', False)
  130. if not momFactor:
  131. momFactor = 1
  132. momDigits = meters_yaml[cNum].get('momDigits', None)
  133. if momDigits == None:
  134. momDigits = 3
  135. digits = meters_yaml[cNum].get('digits', False)
  136. influxMinWriteInterval = meters_yaml[cNum].get('influxMinWriteInterval_energy', None)
  137. if influxMinWriteInterval == None: influxMinWriteInterval = 0
  138. # check for date rollover since last impulse
  139. today = datetime.date.today()
  140. today_str = today.strftime('%Y%m%d')
  141. yesterday = today - datetime.timedelta(days = 1)
  142. yesterday_str = yesterday.strftime('%Y%m%d')
  143. dateRollover = False
  144. savedtoday = saved_todays_date.get(cName, False)
  145. if not savedtoday or savedtoday != today:
  146. if debug:
  147. print("date rollover happened or no date has been saved yet for meter " + str(cName))
  148. if savedtoday and savedtoday == yesterday:
  149. # a date rollover just happened, so change todays date to current and proceed with what has to be done
  150. dateRollover = True
  151. #log.debug(savedtoday)
  152. saved_todays_date[cName] = today
  153. strformat = "{:.3f}"
  154. cReading_formatted = None
  155. if digits:
  156. strformat = "{:."+str(digits)+"f}"
  157. cReading_formatted = strformat.format(cReading)
  158. cJson['reading'] = round(cReading, digits)
  159. if unit:
  160. cJson['unit'] = unit
  161. if impPerUnit and dTime is not None:
  162. if dTime == 0:
  163. momValue = 0.0
  164. else:
  165. momValue = (3600000 / dTime / impPerUnit) * momFactor
  166. if momDigits > 0:
  167. momValue = round(momValue, momDigits)
  168. else:
  169. momValue = round(momValue)
  170. cJson['momValue'] = momValue
  171. if momType:
  172. cJson['momType'] = momType
  173. if momUnit:
  174. cJson['momUnit'] = momUnit
  175. if statTopic:
  176. if momType and momValue is not None:
  177. if MQTTenabled:
  178. subtop = momType
  179. mqttc.publish(statTopic + "/" + subtop, str(momValue), qos=0, retain=False)
  180. if statTopic:
  181. if cReading_formatted != None:
  182. if MQTTenabled:
  183. mqttc.publish(statTopic + "/reading", str(cReading_formatted), qos=0, retain=False)
  184. data_energy[cNum][meters_yaml[cNum].get('influxFieldName_energy', 'energyTotal')] = round(float(cReading), digits)
  185. data_momentary[cNum][meters_yaml[cNum].get('influxFieldName_mom', 'momentaryUsage')] = round(float(momValue), momDigits)
  186. # InfluxDB
  187. t_utc = datetime.datetime.utcnow()
  188. t_str = t_utc.isoformat() + 'Z'
  189. # InfluxDB - energy readings
  190. if dateRollover:
  191. write_energy_to_influxdb = True
  192. ts = int(time.time())
  193. influxEnergyWrite_elapsedTime = 0
  194. if influxdb_energy_lastWriteTime.get(cNum, None) != None:
  195. influxEnergyWrite_elapsedTime = ts - influxdb_energy_lastWriteTime.get(cNum)
  196. if influxEnergyWrite_elapsedTime >= influxMinWriteInterval:
  197. write_energy_to_influxdb = True
  198. influxdb_energy_lastWriteTime[cNum] = ts
  199. else:
  200. # first run - do write immediately
  201. write_energy_to_influxdb = True
  202. influxdb_energy_lastWriteTime[cNum] = ts
  203. influxInstance_energy = meters_yaml[cNum].get('influxInstance_energy', None)
  204. if influxInstance_energy is not None:
  205. if write_energy_to_influxdb:
  206. influx_measurement = meters_yaml[cNum].get('influxMeasurement_energy', None)
  207. if influx_measurement == None:
  208. influx_measurement = influxdb_yaml[influxInstance_energy].get('defaultMeasurement', 'energy')
  209. jsondata_energy = [
  210. {
  211. 'measurement': influx_measurement,
  212. 'tags': {
  213. 'meter': cName,
  214. },
  215. 'time': t_str,
  216. 'fields': data_energy[cNum]
  217. }
  218. ]
  219. if verbose:
  220. print("Writing ENERGY to InfluxDB:")
  221. print(json.dumps(jsondata_energy, indent = 4))
  222. try:
  223. #influxclient_energy.write_points(jsondata_energy)
  224. influxclient[influxInstance_energy].write_points(jsondata_energy)
  225. except Exception as e:
  226. print('Data not written!')
  227. #log.error('Data not written!')
  228. print(e)
  229. #log.error(e)
  230. else:
  231. if verbose:
  232. print("Writing ENERGY to InfluxDB skipped (influxMinWriteInterval="+ str(influxMinWriteInterval) + ", elapsedTime=" + str(influxEnergyWrite_elapsedTime) + ")")
  233. # InfluxDB - momentary values
  234. influxInstance_mom = meters_yaml[cNum].get('influxInstance_mom', None)
  235. if influxInstance_mom is not None:
  236. influx_measurement = meters_yaml[cNum].get('influxMeasurement_mom', None)
  237. if influx_measurement == None:
  238. influx_measurement = influxdb_yaml[influxInstance_mom].get('defaultMeasurement', 'energy')
  239. jsondata_momentary = [
  240. {
  241. 'measurement': influx_measurement,
  242. 'tags': {
  243. 'meter': cName,
  244. },
  245. 'time': t_str,
  246. 'fields': data_momentary[cNum]
  247. }
  248. ]
  249. if verbose:
  250. print("Writing MOMENTARY to InfluxDB:")
  251. print(json.dumps(jsondata_momentary, indent = 4))
  252. try:
  253. #influxclient_momentary.write_points(jsondata_momentary)
  254. influxclient[influxInstance_mom].write_points(jsondata_momentary)
  255. except Exception as e:
  256. print('Data not written!')
  257. #log.error('Data not written!')
  258. print(e)
  259. #log.error(e)
  260. # file log
  261. if config['filelog'].getboolean('enable'):
  262. # save and restore yesterday´s total energy to calculate today´s energy
  263. # check if total energy from yesterday is stored in memory, if not try to get it from saved file
  264. file_path_meter = conf_storage_path + cName + "/"
  265. file_today_min = file_path_meter + today_str + "_min.txt"
  266. file_yesterday_total = file_path_meter + yesterday_str + "_total.txt"
  267. energy_today_total = 0
  268. energy_yesterday_min = 0
  269. energy_today_min = saved_energy_today_min.get(cName, None)
  270. try:
  271. if dateRollover:
  272. energy_today_min = None
  273. if energy_today_min == None:
  274. exists = os.path.isfile(file_today_min)
  275. if exists:
  276. # load energy_today_min from file if exists
  277. f = open(file_today_min, "r")
  278. if f.mode == 'r':
  279. contents = f.read()
  280. f.close()
  281. if contents != '':
  282. energy_today_min = float(contents)
  283. saved_energy_today_min[cName] = energy_today_min
  284. if verbose:
  285. print(cName + " - Energy Today min read from file -> = " + str(energy_today_min))
  286. else: energy_today_min = None
  287. else:
  288. # save current Energy_total to min-file
  289. if not os.path.exists(file_path_meter):
  290. os.mkdir(file_path_meter)
  291. f = open(file_today_min, "w+")
  292. energy_today_min = cReading
  293. saved_energy_today_min[cName] = energy_today_min
  294. #f.write(str('{0:.3f}'.format(energy_today_min)))
  295. #f.write(str(energy_today_min))
  296. f.write(strformat.format(energy_today_min))
  297. f.close()
  298. #try:
  299. if energy_today_min != None:
  300. energy_today_total = cReading - energy_today_min
  301. if verbose:
  302. print(cName + " - Energy Today total: " + str('{0:.3f}'.format(energy_today_total)))
  303. energy_yesterday_total = saved_energy_yesterday_total.get(cName, None)
  304. if dateRollover:
  305. energy_yesterday_total = None
  306. if energy_yesterday_total == None:
  307. exists = os.path.isfile(file_yesterday_total)
  308. if exists:
  309. # load energy_yesterday_total from file if exists
  310. f = open(file_yesterday_total, "r")
  311. if f.mode == 'r':
  312. contents = f.read()
  313. f.close()
  314. if contents != '':
  315. energy_yesterday_total = float(contents)
  316. saved_energy_yesterday_total[cName] = energy_yesterday_total
  317. if debug:
  318. print(cName + " - Energy Yesterday total read from file -> = " + str(energy_yesterday_total))
  319. else: energy_yesterday_total = None
  320. else:
  321. file_yesterday_min = file_path_meter + yesterday_str + "_min.txt"
  322. exists = os.path.isfile(file_yesterday_min)
  323. if exists:
  324. # load yesterday_min from file
  325. #if args_output_verbose1:
  326. # print("file file_yesterday_min exists")
  327. f = open(file_yesterday_min, "r")
  328. if f.mode == 'r':
  329. contents =f.read()
  330. f.close()
  331. if contents != '':
  332. energy_yesterday_min = float(contents)
  333. if debug:
  334. print(cName + " - Energy yesterday min: " + str(energy_yesterday_min))
  335. else: energy_yesterday_min = None
  336. if energy_yesterday_min != None:
  337. energy_yesterday_total = round(energy_today_min - energy_yesterday_min, 3)
  338. ###log.debug(meter_id_name[meter['id']] + " - Energy yesterday total: " + str(energy_yesterday_total))
  339. if not os.path.exists(file_path_meter):
  340. os.mkdir(file_path_meter)
  341. f = open(file_yesterday_total, "w+")
  342. #f.write(str('{0:.3f}'.format(energy_yesterday_total)))
  343. f.write(strformat.format(energy_yesterday_total))
  344. f.close()
  345. #else:
  346. # # file yesterday_min does not exist
  347. except:
  348. e = sys.exc_info()[0]
  349. print( "<p>Error in file log: %s</p>" % e )
  350. if energy_today_total is not None:
  351. cJson['Today'] = round(energy_today_total, digits)
  352. if MQTTenabled:
  353. mqttc.publish(statTopic + "/today", str(round(energy_today_total, digits)), qos=0, retain=False)
  354. if energy_yesterday_total is not None:
  355. cJson['Yesterday'] = round(energy_yesterday_total, digits)
  356. if MQTTenabled:
  357. mqttc.publish(statTopic + "/yesterday", str(round(energy_yesterday_total, digits)), qos=0, retain=False)
  358. # END file log
  359. if verbose:
  360. if consoleAddTimestamp:
  361. print ("[" + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) + "] meterData:", json.dumps(cJson))
  362. else:
  363. print("meterData:", json.dumps(cJson))
  364. if MQTTenabled:
  365. mqttc.publish(statTopic + "/json", json.dumps(cJson), qos=0, retain=False)
  366. def publishStatMsg(data):
  367. if MQTTenabled:
  368. mqttc.publish(config['mqtt'].get('topic_stat'), data, qos=0, retain=False)
  369. def publishCmdResponseMsg(data):
  370. if MQTTenabled:
  371. mqttc.publish(config['mqtt'].get('topic_cmdresponse'), data, qos=0, retain=False)
  372. try:
  373. while True:
  374. serLine = ser.readline().strip()
  375. # catch exception on invalid char coming in: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf4 in position 6: ordinal not in range(128)
  376. try:
  377. serLine = serLine.decode('ascii')
  378. except:
  379. serLine = ""
  380. if(serLine):
  381. if verbose:
  382. if consoleAddTimestamp:
  383. print ("[" + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) + "] DEV_SENT: ", serLine)
  384. else:
  385. print ("DEV_SENT: ", serLine) #Echo the serial buffer bytes up to the CRLF back to screen
  386. # check if data looks like meter readings JSON - then process
  387. if serLine.startswith('{"C":'):
  388. Thread(target=processMeterData, args=(serLine,)).start()
  389. # response to a command
  390. elif serLine.startswith('cmd:'):
  391. Thread(target=publishCmdResponseMsg, args=(serLine,)).start()
  392. # other cases it is a normal "stat" message
  393. else:
  394. Thread(target=publishStatMsg, args=(serLine,)).start()
  395. except KeyboardInterrupt:
  396. print('\n')
  397. exit()