cul2mqtt.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. #!/usr/bin/python3 -u
  2. #
  3. # pip3 install pyyaml paho-mqtt pyserial
  4. import yaml
  5. from yaml.constructor import ConstructorError
  6. import serial
  7. import sys
  8. import time
  9. from time import sleep
  10. import datetime
  11. import paho.mqtt.client as mqtt
  12. import json
  13. import os
  14. version = 0.1
  15. verbose = True
  16. debug = False
  17. deviceConfigFile = "/home/pi/cul2mqtt_devices.yaml"
  18. mqtt_server = "mqtt.lan"
  19. mqtt_port = 1883
  20. mqtt_user = ""
  21. mqtt_password = ""
  22. TX_interface_prefer = "UART" # UART or MQTT
  23. repeat_received_commands = False # does not yet work correctly
  24. if len(sys.argv) >= 2:
  25. if sys.argv[1] == "-q":
  26. verbose = False
  27. elif sys.argv[1] == "-v":
  28. verbose = True
  29. # serial (USB) CUL device
  30. receive_from_serial_cul = True
  31. send_on_serial_cul = True
  32. serialPort = '/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0'
  33. serialBaudrate = 38400
  34. serialTimeout = 1 # s, should be not too high, otherwise program could block
  35. culInitCmd = 'X21\r\n' # CUL init command for normal operation - should not be changed normally
  36. culInitTimeout = 3 # s
  37. forceSerialCULConnected = True # if true, program exits when serial CUL cannot be connected to
  38. # MQTT CUL
  39. receive_from_mqtt_cul = False
  40. send_on_mqtt_cul = False
  41. mqtt_topic_cul_received = "MQTT-CUL/received"
  42. mqtt_topic_cul_send = "MQTT-CUL/send"
  43. filterSelfSentIncomingTimeout = 500
  44. # do not change below
  45. serialCULAvailable = False
  46. devdata = {}
  47. RXcodesToDevFunction = {}
  48. InTopicsToDevIds = {}
  49. try:
  50. from yaml import CLoader as Loader
  51. except ImportError:
  52. from yaml import Loader
  53. def no_duplicates_constructor(loader, node, deep=False):
  54. """Check for duplicate keys."""
  55. mapping = {}
  56. for key_node, value_node in node.value:
  57. key = loader.construct_object(key_node, deep=deep)
  58. value = loader.construct_object(value_node, deep=deep)
  59. if key in mapping:
  60. raise ConstructorError("while constructing a mapping", node.start_mark, "found duplicate key (%s)" % key, key_node.start_mark)
  61. mapping[key] = value
  62. return loader.construct_mapping(node, deep)
  63. yaml.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, no_duplicates_constructor)
  64. print("CULagent v" + str(version))
  65. print("=====================================")
  66. try:
  67. with open(deviceConfigFile) as devfile:
  68. devdata = yaml.load(devfile, Loader=yaml.FullLoader)
  69. if debug:
  70. print()
  71. print()
  72. print("==== parsing config file ====")
  73. print()
  74. print(devdata)
  75. print()
  76. for deviceid in devdata:
  77. if debug:
  78. print()
  79. print()
  80. print("Device: " + deviceid)
  81. print(devdata[deviceid])
  82. if "RX" in devdata[deviceid].keys():
  83. if devdata[deviceid]["RX"] != "":
  84. if debug:
  85. print("RX codes:")
  86. for key, value in devdata[deviceid]["RX"].items():
  87. if debug:
  88. print(key, "->", value)
  89. if value in RXcodesToDevFunction.keys():
  90. print()
  91. print()
  92. print("ERROR: RX-string '" + str(value) + "' is already defined for another device! Must be unique.")
  93. raise
  94. else:
  95. RXcodesToDevFunction[value] = deviceid, key
  96. if "cmdTopic" in devdata[deviceid].keys():
  97. if devdata[deviceid]["cmdTopic"] != "":
  98. cmdTopic = devdata[deviceid]["cmdTopic"]
  99. if debug:
  100. print("cmdTopic: " + cmdTopic)
  101. if cmdTopic in InTopicsToDevIds.keys():
  102. print()
  103. print()
  104. print("ERROR: cmdTopic '" + str(cmdTopic) + "' is already defined for another device! Must be unique.")
  105. raise
  106. else:
  107. InTopicsToDevIds[cmdTopic] = deviceid
  108. if debug:
  109. print()
  110. print()
  111. print()
  112. print("RXcodesToDevFunction:")
  113. print(RXcodesToDevFunction)
  114. print()
  115. print()
  116. print("InTopicsToDevIds:")
  117. print(InTopicsToDevIds)
  118. print()
  119. print()
  120. print("InTopicsToDevIds.keys():")
  121. print(InTopicsToDevIds.keys())
  122. print()
  123. print()
  124. print("==== parsing config file complete ====")
  125. print()
  126. print()
  127. print()
  128. print()
  129. except ConstructorError as err:
  130. print("ERROR on parsing configfile:")
  131. print(err)
  132. exit(1)
  133. except:
  134. print("ERROR opening configfile")
  135. exit(1)
  136. ## #mqtt_topic_in = "cul/in"
  137. ##
  138. ##
  139. lastReceived = ["","","","","","","","","",""]
  140. lastReceivedTime = [0,0,0,0,0,0,0,0,0,0]
  141. lastReceivedIndex = 0
  142. lastReceivedMaxAge = 500 # ignore repeated messages when they are younger than x ms
  143. lastSentTime = {}
  144. lastSentMinInterval = 1500 # ignore repeated messages when they are younger than x ms, should be > 1500
  145. lastSentCmd = ""
  146. lastSentCmdTime = 0
  147. lastSentDev = ""
  148. lastSentDevCmd = ""
  149. ##
  150. ##
  151. ##
  152. ## global lastAction_bwm1
  153. ## global lastAction_bwm2
  154. ## global lastAction_bwm3
  155. ## lastAction_bwm1 = 0
  156. ## lastAction_bwm2 = 0
  157. ## lastAction_bwm3 = 0
  158. def touch(fname, times=None):
  159. with open(fname, 'a'):
  160. os.utime(fname, times)
  161. def on_connect(client, userdata, flags, rc):
  162. if verbose:
  163. print("MQTT connected with result code " + str(rc))
  164. if receive_from_mqtt_cul:
  165. if mqtt_topic_cul_received != "":
  166. client.subscribe(mqtt_topic_cul_received)
  167. for in_topic in InTopicsToDevIds.keys():
  168. if in_topic != "":
  169. client.subscribe(in_topic)
  170. print("MQTT subscribed: " + in_topic)
  171. def on_disconnect(client, userdata, rc):
  172. if rc != 0:
  173. print("Unexpected MQTT disconnection. Will auto-reconnect")
  174. def on_message(client, userdata, msg):
  175. #print(msg.topic + ": " + str(msg.payload))
  176. payload = msg.payload.decode("utf-8")
  177. print(msg.topic + ": " + str(payload))
  178. #print()
  179. #print("TOPIC: ")
  180. #print(msg.topic)
  181. #print("TOPIC REC: ")
  182. #print(mqtt_topic_cul_received)
  183. # MQTT message is output from CUL
  184. if receive_from_mqtt_cul and msg.topic == mqtt_topic_cul_received:
  185. payload = payload.rstrip()
  186. print("MQTT-CUL RX: '" + payload + "'")
  187. cul_received(payload)
  188. else:
  189. for in_topic, dev in InTopicsToDevIds.items():
  190. if msg.topic == in_topic:
  191. print()
  192. print("MQTT command received - Topic: '" + msg.topic + "', Payload: '" + payload + " -> Device: '" + dev + "'")
  193. #print(devdata[dev])
  194. if 'name' in devdata[dev].keys():
  195. print('Device Name: ' + devdata[dev]['name'])
  196. if 'statTopic' in devdata[dev].keys():
  197. print('statTopic: ' + devdata[dev]['statTopic'])
  198. #mqttc.publish(devdata[dev]['statTopic'], payload, qos=0, retain=False)
  199. if 'add_statTopics_on' in devdata[dev].keys():
  200. print("add_statTopics_on:")
  201. for res in devdata[dev]['add_statTopics_on']:
  202. #print(res)
  203. if 'on_payload' in res and 'topic' in res and 'payload' in res:
  204. if payload == res['on_payload'] and payload != "" and res['topic'] != "" and res['payload'] != "":
  205. print(" on '" + payload + "' -> publish '" + res['payload'] + "' on topic '" + res['topic'] + "'")
  206. #mqttc.publish(res[topic], res[payload], qos=0, retain=False)
  207. #if 'TX' in devdata[dev].keys():
  208. # if payload in devdata[dev]['TX']:
  209. # txstring = devdata[dev]['TX'][payload]
  210. # print("TX code for '" + payload + "' is: " + txstring)
  211. TX_interface = TX_interface_prefer
  212. global lastSentDev, lastSentDevCmd, lastSentCmdTime
  213. now = int(round(time.time() * 1000))
  214. print("dev="+dev+", lastSentDevCmd="+lastSentDevCmd)
  215. if dev == lastSentDev and payload == lastSentDevCmd and (now - lastSentCmdTime) < 1000:
  216. print("MQTT: ignored command as we just sent this.")
  217. pass
  218. else:
  219. cul_send(dev, payload)
  220. #if 'TX_interface' in devdata[dev].keys():
  221. # print("TX_interface: " + devdata[dev]['TX_interface'])
  222. #
  223. #if TX_interface == "MQTT" or TX_interface == "both":
  224. # print("TX via MQTT")
  225. # cul_send(dev, payload)
  226. #
  227. #if TX_interface == "UART" or TX_interface == "both":
  228. # print("TX via UART")
  229. # cul_send(dev, payload)
  230. def publish_device_statusupdate(device, cmd):
  231. if 'statTopic' in devdata[device].keys():
  232. statTopic = devdata[device]['statTopic']
  233. print("MQTT publish statTopic: " + statTopic + " -> " + cmd)
  234. mqttc.publish(statTopic, cmd, qos=0, retain=False)
  235. if 'add_statTopics_on' in devdata[device].keys():
  236. print(" MQTT publish add_statTopics_on:")
  237. for res in devdata[device]['add_statTopics_on']:
  238. #print(res)
  239. if 'on_payload' in res and 'topic' in res and 'payload' in res:
  240. if cmd == res['on_payload']:
  241. print(" on '" + res['on_payload'] + "' -> publish '" + res['payload'] + "' on topic '" + res['topic'] + "'")
  242. mqttc.publish(res['topic'], res['payload'], qos=0, retain=False)
  243. if 'add_statTopics' in devdata[device].keys():
  244. print(" MQTT publish on add_statTopics:")
  245. for res in devdata[device]['add_statTopics']:
  246. print(" '" + cmd + "' -> '" + res + "'")
  247. mqttc.publish(res, cmd, qos=0, retain=False)
  248. def parseRXCode(rx_code):
  249. if rx_code in RXcodesToDevFunction:
  250. receivedForDevice = RXcodesToDevFunction[rx_code][0]
  251. receivedCmnd = RXcodesToDevFunction[rx_code][1]
  252. print("receivedForDevice = " + receivedForDevice + ", receivedCmnd = " + receivedCmnd)
  253. publish_device_statusupdate(receivedForDevice, receivedCmnd)
  254. def cul_received(payload):
  255. #global lastAction_bwm1, lastAction_bwm2, lastAction_bwm3
  256. global lastReceivedTime, lastReceivedIndex, lastReceivedMaxAge
  257. #actionSupported = True
  258. isSendReply = False
  259. if payload[:2] == 'is': # msg is reply from CUL to raw send command
  260. isSendReply = True
  261. elif payload[:1] == 'i': # is a IT compatible command - so look it up in the code table
  262. inCmd = payload[:-2] #strip last 2 chars, these only represent signal strength and are not needed
  263. print("inCmd: " + inCmd)
  264. ignoreCommand = False
  265. lastReceivedLength = len(lastReceived)
  266. i = 0
  267. while i < lastReceivedLength:
  268. #print(str(i) + ": " + lastReceived[i])
  269. if lastReceived[i] == inCmd:
  270. lastTime = int(lastReceivedTime[i])
  271. now = int(round(time.time() * 1000))
  272. tdelta = (now - lastTime)
  273. #print("TDELTA = " + str(tdelta))
  274. if tdelta < lastReceivedMaxAge:
  275. print("ignoring received command '" + inCmd + "' - already received " + str(tdelta) + " ms ago")
  276. ignoreCommand = True
  277. #break
  278. i += 1
  279. if not ignoreCommand:
  280. lastReceived[lastReceivedIndex] = inCmd
  281. lastReceivedTime[lastReceivedIndex] = int(round(time.time() * 1000))
  282. if lastReceivedIndex >= (lastReceivedLength - 1):
  283. lastReceivedIndex = 0
  284. else:
  285. lastReceivedIndex += 1
  286. parseRXCode(inCmd)
  287. #if repeat_received_commands:
  288. # lastSentLength = len(lastSent)
  289. # i = 0
  290. # dontRepeat = False
  291. # while i < lastSentLength:
  292. # #print(str(i) + ": " + lastReceived[i])
  293. # if lastSent[i] == decCmd:
  294. # lastTime = int(lastSentTime[i])
  295. # now = int(round(time.time() * 1000))
  296. # tdelta = (now - lastTime)
  297. # #print("TDELTA = " + str(tdelta))
  298. # if tdelta < lastSentMaxAge:
  299. # print("ignoring command as it originated from ourselfs " + inCmd + " " + str(tdelta) + " ms ago")
  300. # dontRepeat = True
  301. # #break
  302. # i += 1
  303. # #cmdToSend = culSendCmds[decCmd]
  304. # if not dontRepeat:
  305. # if device != "" and cmd != "":
  306. # print("REPEATING COMMAND: " + cmd + " TO DEVICE " + device)
  307. # cul_send(device, cmd)
  308. def IT_RXtoTXCode(itReceiveCode):
  309. if debug:
  310. statusstr = "IT_RXtoTXCode "
  311. statusstr += "RX: "
  312. statusstr += itReceiveCode
  313. #print("IT_RXtoTXCode ReceiveCode: " + itReceiveCode)
  314. itReceiveCode = itReceiveCode[1:] # remove first character "i"
  315. itReceiveCodeLengthBytes = int(len(itReceiveCode)/2)
  316. itReceiveCodeBytes = []
  317. itTransmitTristate = "is"
  318. for x in range(itReceiveCodeLengthBytes):
  319. itReceiveCodeBytes.append(bin(int(itReceiveCode[x*2:(x*2+2)],16))[2:].zfill(8))
  320. for x in range(len(itReceiveCodeBytes)):
  321. #print("IT REC byte " + str(x) + " = " + str(itReceiveCodeBytes[x]))
  322. for y in range(4):
  323. quarterbyte = str(itReceiveCodeBytes[x][y*2:y*2+2])
  324. if quarterbyte == "00":
  325. tmpTristate = "0";
  326. elif quarterbyte == "01":
  327. tmpTristate = "F";
  328. elif quarterbyte == "10":
  329. tmpTristate = "D";
  330. elif quarterbyte == "11":
  331. tmpTristate = "1";
  332. #print(quarterbyte + " -> " + tmpTristate)
  333. itTransmitTristate = itTransmitTristate + tmpTristate
  334. if debug:
  335. statusstr += " -> TX: "
  336. statusstr += itTransmitTristate
  337. #print("IT_RXtoTXCode TransmitCode: " + itTransmitTristate)
  338. print(statusstr)
  339. return(itTransmitTristate)
  340. def cul_send(device, cmd):
  341. global lastSentTime
  342. culCmd = ""
  343. culSendCmdsKeyName = device + ' ' + cmd
  344. #print culSendCmdsKeyName
  345. #statTopic = False
  346. #if 'statTopic' in devdata[device].keys():
  347. # statTopic = devdata[device]['statTopic']
  348. # #print('statTopic: ' + devdata[device]['statTopic'])
  349. #if statTopic:
  350. # mqttc.publish(statTopic, cmd, qos=0, retain=False)
  351. # print(" MQTT: published '" + cmd + "' on statTopic: '" + devdata[device]['statTopic'])
  352. publish_device_statusupdate(device, cmd)
  353. tx_code = False
  354. if 'TX' in devdata[device]:
  355. #print("TX data available, cmd="+cmd)
  356. #print(devdata[device]['TX'])
  357. if cmd in devdata[device]['TX']:
  358. tx_code = devdata[device]['TX'][cmd]
  359. print(" TX code for '" + cmd + "': " + tx_code)
  360. if not tx_code:
  361. if 'RX' in devdata[device]:
  362. #print("RX data available, cmd="+cmd)
  363. #print(devdata[device]['RX'])
  364. if cmd in devdata[device]['RX']:
  365. rx_code = devdata[device]['RX'][cmd]
  366. #print("RX code for '" + cmd + "': " + rx_code)
  367. tx_code = IT_RXtoTXCode(rx_code)
  368. print(" TX code for '" + cmd + "': " + tx_code)
  369. if not tx_code:
  370. print(" no valid TX code for this device/command")
  371. else:
  372. now = int(round(time.time() * 1000))
  373. # look if this command has been sent in the past, and when
  374. if culSendCmdsKeyName in lastSentTime.keys():
  375. lastTime = lastSentTime[culSendCmdsKeyName]
  376. else:
  377. lastTime = 0
  378. lastTimeAge = now - lastTime
  379. print(' lastTime: ' + str(lastTimeAge) + 'ms ago')
  380. if lastTimeAge > lastSentMinInterval: # only send if last time + min interval is exceeded
  381. lastSentTime[culSendCmdsKeyName] = now # save what we send, so that we dont repeat our own sent messages if repeating is enabled
  382. TX_interface = TX_interface_prefer
  383. if 'TX_interface' in devdata[device].keys():
  384. print(" TX_interface: " + devdata[device]['TX_interface'])
  385. if TX_interface == "UART" and not serialCULAvailable:
  386. TX_interface = "MQTT"
  387. global lastSentCmd, lastSentCmdTime, lastSentDev, lastSentDevCmd
  388. lastSentCmd = tx_code
  389. lastSentCmdTime = now
  390. lastSentDev = device
  391. lastSentDevCmd = cmd
  392. if send_on_mqtt_cul and (TX_interface == "MQTT" or TX_interface == "both"):
  393. print(" TX via MQTT: " + tx_code)
  394. mqttc.publish(mqtt_topic_cul_send, tx_code, qos=0, retain=False)
  395. if serialCULAvailable and send_on_serial_cul and (TX_interface == "UART" or TX_interface == "both"):
  396. print(" TX via UART: " + tx_code)
  397. culCmd = tx_code + '\r\n'
  398. ser.write(culCmd.encode('utf-8'))
  399. else:
  400. print("WARNING: CUL send command repeated too quickly.")
  401. if receive_from_serial_cul or send_on_serial_cul:
  402. if not os.path.exists(serialPort):
  403. print("ERROR opening connection to serial CUL... device '" + serialPort + "' does not exist.")
  404. if receive_from_mqtt_cul:
  405. if forceSerialCULConnected:
  406. exit(2)
  407. else:
  408. print("resuming in MQTT-CUL only mode...")
  409. TX_interface_prefer = "MQTT"
  410. receive_from_serial_cul = False
  411. send_on_serial_cul = False
  412. serialCULAvailable = False
  413. print()
  414. print()
  415. else:
  416. print("opening connection to serial CUL...")
  417. serLine = ""
  418. ser = serial.Serial(port=serialPort,baudrate=serialBaudrate,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS,timeout=serialTimeout)
  419. sleep(culInitTimeout)
  420. ser.write('V\r\n'.encode('utf-8')) # get CUL version info
  421. serLine = ser.readline()
  422. serLine = serLine.decode('utf-8').rstrip('\r\n')
  423. if serLine.startswith("V ") and serLine.find("culfw") != -1:
  424. print("connected. CUL version: " + serLine)
  425. serialCULAvailable = True
  426. sleep(0.1)
  427. print('Initializing CUL with command: ' + culInitCmd.rstrip('\r\n'))
  428. ser.write(culInitCmd.encode('utf-8')) # initialize CUL in normal receive mode
  429. sleep(0.5)
  430. else:
  431. print("WARNING: could not connect serial CUL")
  432. receive_from_serial_cul = False
  433. send_on_serial_cul = False
  434. serialCULAvailable = False
  435. TX_interface_prefer = "MQTT"
  436. if forceSerialCULConnected:
  437. exit(2)
  438. mqttc = mqtt.Client()
  439. mqttc.on_connect = on_connect
  440. mqttc.on_disconnect = on_disconnect
  441. mqttc.on_message = on_message
  442. if len(mqtt_user) > 0 and len(mqtt_password) > 0:
  443. mqttc.username_pw_set(mqtt_user, mqtt_password)
  444. mqttc.connect(mqtt_server, mqtt_port, 60)
  445. mqttc.loop_start()
  446. while True:
  447. if receive_from_serial_cul:
  448. serLine = ser.readline()
  449. if len(serLine) > 0:
  450. now = int(round(time.time() * 1000))
  451. recvCmd = serLine.decode('utf-8')
  452. recvCmd = recvCmd.rstrip('\r\n')
  453. #if debug:
  454. # print("lastSentCmd: " + lastSentCmd + ", lastSentCmdTime=" + str(lastSentCmdTime))
  455. if recvCmd == lastSentCmd and (now - lastSentCmdTime) < filterSelfSentIncomingTimeout:
  456. pass
  457. else:
  458. print()
  459. print("Serial-CUL RX: '" + recvCmd + "'")
  460. cul_received(recvCmd)
  461. sleep(0.05)
  462. ## #print "test"
  463. ## #touch("/tmp/culagent_running")
  464. #except KeyboardInterrupt:
  465. # print("\n")