cul2mqtt.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. #!/usr/bin/python3 -u
  2. #
  3. # pip3 install pyyaml paho-mqtt pyserial
  4. import yaml
  5. from yaml.constructor import ConstructorError
  6. import serial
  7. import sys
  8. import time
  9. from time import sleep
  10. import datetime
  11. import paho.mqtt.client as mqtt
  12. import os
  13. version = 0.1
  14. verbose = True
  15. debug = False
  16. deviceConfigFile = "/home/pi/cul2mqtt_devices.yaml"
  17. mqtt_server = "mqtt.lan"
  18. mqtt_port = 1883
  19. mqtt_user = ""
  20. mqtt_password = ""
  21. TX_interface_prefer = "UART" # UART or MQTT
  22. repeat_received_commands = False # does not yet work correctly
  23. if len(sys.argv) >= 2:
  24. if sys.argv[1] == "-q":
  25. verbose = False
  26. elif sys.argv[1] == "-v":
  27. verbose = True
  28. elif sys.argv[1] == "-d":
  29. debug = True
  30. verbose = True
  31. # serial (USB) CUL device
  32. receive_from_serial_cul = True
  33. send_on_serial_cul = True
  34. serialPort = '/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0'
  35. serialBaudrate = 38400
  36. serialTimeout = 1 # s, should be not too high, otherwise program could block
  37. culInitCmd = 'X21\r\n' # CUL init command for normal operation - should not be changed normally
  38. culInitTimeout = 3 # s
  39. forceSerialCULConnected = True # if true, program exits when serial CUL cannot be connected to
  40. # MQTT CUL
  41. receive_from_mqtt_cul = False
  42. send_on_mqtt_cul = False
  43. mqtt_topic_cul_received = "MQTT-CUL/received"
  44. mqtt_topic_cul_send = "MQTT-CUL/send"
  45. filterSelfSentIncomingTimeout = 500
  46. # do not change below
  47. serialCULAvailable = False
  48. devdata = {}
  49. RXcodesToDevFunction = {}
  50. InTopicsToDevIds = {}
  51. try:
  52. from yaml import CLoader as Loader
  53. except ImportError:
  54. from yaml import Loader
  55. def no_duplicates_constructor(loader, node, deep=False):
  56. """Check for duplicate keys."""
  57. mapping = {}
  58. for key_node, value_node in node.value:
  59. key = loader.construct_object(key_node, deep=deep)
  60. value = loader.construct_object(value_node, deep=deep)
  61. if key in mapping:
  62. raise ConstructorError("while constructing a mapping", node.start_mark, "found duplicate key (%s)" % key, key_node.start_mark)
  63. mapping[key] = value
  64. return loader.construct_mapping(node, deep)
  65. yaml.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, no_duplicates_constructor)
  66. print("CUL2MQTT v" + str(version))
  67. print("=====================================")
  68. try:
  69. with open(deviceConfigFile) as devfile:
  70. devdata = yaml.load(devfile, Loader=yaml.FullLoader)
  71. if debug:
  72. print()
  73. print()
  74. print("==== parsing config file ====")
  75. print()
  76. print(devdata)
  77. print()
  78. for deviceid in devdata:
  79. if debug:
  80. print()
  81. print()
  82. print("Device: " + deviceid)
  83. print(devdata[deviceid])
  84. if "RX" in devdata[deviceid].keys():
  85. if devdata[deviceid]["RX"] != "":
  86. if debug:
  87. print("RX codes:")
  88. for key, value in devdata[deviceid]["RX"].items():
  89. if debug:
  90. print(key, "->", value)
  91. if value in RXcodesToDevFunction.keys():
  92. print()
  93. print()
  94. print("ERROR: RX-string '" + str(value) + "' is already defined for another device! Must be unique.")
  95. raise
  96. else:
  97. RXcodesToDevFunction[value] = deviceid, key
  98. if "cmdTopic" in devdata[deviceid].keys():
  99. if devdata[deviceid]["cmdTopic"] != "":
  100. cmdTopic = devdata[deviceid]["cmdTopic"]
  101. if debug:
  102. print("cmdTopic: " + cmdTopic)
  103. if cmdTopic in InTopicsToDevIds.keys():
  104. print()
  105. print()
  106. print("ERROR: cmdTopic '" + str(cmdTopic) + "' is already defined for another device! Must be unique.")
  107. raise
  108. else:
  109. InTopicsToDevIds[cmdTopic] = deviceid
  110. if debug:
  111. print()
  112. print()
  113. print()
  114. print("RXcodesToDevFunction:")
  115. print(RXcodesToDevFunction)
  116. print()
  117. print()
  118. print("InTopicsToDevIds:")
  119. print(InTopicsToDevIds)
  120. print()
  121. print()
  122. print("InTopicsToDevIds.keys():")
  123. print(InTopicsToDevIds.keys())
  124. print()
  125. print()
  126. print("==== parsing config file complete ====")
  127. print()
  128. print()
  129. print()
  130. print()
  131. except ConstructorError as err:
  132. print("ERROR on parsing configfile:")
  133. print(err)
  134. exit(1)
  135. except:
  136. print("ERROR opening configfile")
  137. exit(1)
  138. ## #mqtt_topic_in = "cul/in"
  139. ##
  140. ##
  141. lastReceived = ["","","","","","","","","",""]
  142. lastReceivedTime = [0,0,0,0,0,0,0,0,0,0]
  143. lastReceivedIndex = 0
  144. lastReceivedMaxAge = 500 # ignore repeated messages when they are younger than x ms
  145. lastSentTime = {}
  146. lastSentMinInterval = 1500 # ignore repeated messages when they are younger than x ms, should be > 1500
  147. lastSentCmd = ""
  148. lastSentCmdTime = 0
  149. lastSentDev = ""
  150. lastSentDevCmd = ""
  151. ##
  152. ##
  153. ##
  154. ## global lastAction_bwm1
  155. ## global lastAction_bwm2
  156. ## global lastAction_bwm3
  157. ## lastAction_bwm1 = 0
  158. ## lastAction_bwm2 = 0
  159. ## lastAction_bwm3 = 0
  160. def touch(fname, times=None):
  161. with open(fname, 'a'):
  162. os.utime(fname, times)
  163. def on_connect(client, userdata, flags, rc):
  164. if verbose:
  165. print("MQTT connected with result code " + str(rc))
  166. if receive_from_mqtt_cul:
  167. if mqtt_topic_cul_received != "":
  168. client.subscribe(mqtt_topic_cul_received)
  169. for in_topic in InTopicsToDevIds.keys():
  170. if in_topic != "":
  171. client.subscribe(in_topic)
  172. print("MQTT subscribed: " + in_topic)
  173. def on_disconnect(client, userdata, rc):
  174. if rc != 0:
  175. print("Unexpected MQTT disconnection. Will auto-reconnect")
  176. def on_message(client, userdata, msg):
  177. #print(msg.topic + ": " + str(msg.payload))
  178. payload = msg.payload.decode("utf-8")
  179. print(msg.topic + ": " + str(payload))
  180. #print()
  181. #print("TOPIC: ")
  182. #print(msg.topic)
  183. #print("TOPIC REC: ")
  184. #print(mqtt_topic_cul_received)
  185. # MQTT message is output from CUL
  186. if receive_from_mqtt_cul and msg.topic == mqtt_topic_cul_received:
  187. payload = payload.rstrip()
  188. print("MQTT-CUL RX: '" + payload + "'")
  189. cul_received(payload)
  190. else:
  191. for in_topic, dev in InTopicsToDevIds.items():
  192. if msg.topic == in_topic:
  193. print()
  194. print("MQTT command received - Topic: '" + msg.topic + "', Payload: '" + payload + " -> Device: '" + dev + "'")
  195. #print(devdata[dev])
  196. if 'name' in devdata[dev].keys():
  197. print('Device Name: ' + devdata[dev]['name'])
  198. if 'statTopic' in devdata[dev].keys():
  199. print('statTopic: ' + devdata[dev]['statTopic'])
  200. #mqttc.publish(devdata[dev]['statTopic'], payload, qos=0, retain=False)
  201. if 'add_statTopics_on' in devdata[dev].keys():
  202. print("add_statTopics_on:")
  203. for res in devdata[dev]['add_statTopics_on']:
  204. #print(res)
  205. if 'on_payload' in res and 'topic' in res and 'payload' in res:
  206. if payload == res['on_payload'] and payload != "" and res['topic'] != "" and res['payload'] != "":
  207. print(" on '" + payload + "' -> publish '" + res['payload'] + "' on topic '" + res['topic'] + "'")
  208. #mqttc.publish(res[topic], res[payload], qos=0, retain=False)
  209. #if 'TX' in devdata[dev].keys():
  210. # if payload in devdata[dev]['TX']:
  211. # txstring = devdata[dev]['TX'][payload]
  212. # print("TX code for '" + payload + "' is: " + txstring)
  213. TX_interface = TX_interface_prefer
  214. global lastSentDev, lastSentDevCmd, lastSentCmdTime
  215. now = int(round(time.time() * 1000))
  216. print("dev="+dev+", lastSentDevCmd="+lastSentDevCmd)
  217. if dev == lastSentDev and payload == lastSentDevCmd and (now - lastSentCmdTime) < 1000:
  218. print("MQTT: ignored command as we just sent this.")
  219. pass
  220. else:
  221. cul_send(dev, payload)
  222. #if 'TX_interface' in devdata[dev].keys():
  223. # print("TX_interface: " + devdata[dev]['TX_interface'])
  224. #
  225. #if TX_interface == "MQTT" or TX_interface == "both":
  226. # print("TX via MQTT")
  227. # cul_send(dev, payload)
  228. #
  229. #if TX_interface == "UART" or TX_interface == "both":
  230. # print("TX via UART")
  231. # cul_send(dev, payload)
  232. def publish_device_statusupdate(device, cmd):
  233. if 'statTopic' in devdata[device].keys():
  234. statTopic = devdata[device]['statTopic']
  235. print("MQTT publish statTopic: " + statTopic + " -> " + cmd)
  236. mqttc.publish(statTopic, cmd, qos=0, retain=False)
  237. if 'add_statTopics_on' in devdata[device].keys():
  238. print(" MQTT publish add_statTopics_on:")
  239. for res in devdata[device]['add_statTopics_on']:
  240. #print(res)
  241. if 'on_payload' in res and 'topic' in res and 'payload' in res:
  242. if cmd == res['on_payload']:
  243. print(" on '" + res['on_payload'] + "' -> publish '" + res['payload'] + "' on topic '" + res['topic'] + "'")
  244. mqttc.publish(res['topic'], res['payload'], qos=0, retain=False)
  245. if 'add_statTopics' in devdata[device].keys():
  246. print(" MQTT publish on add_statTopics:")
  247. for res in devdata[device]['add_statTopics']:
  248. print(" '" + cmd + "' -> '" + res + "'")
  249. mqttc.publish(res, cmd, qos=0, retain=False)
  250. def parseRXCode(rx_code):
  251. if rx_code in RXcodesToDevFunction:
  252. receivedForDevice = RXcodesToDevFunction[rx_code][0]
  253. receivedCmnd = RXcodesToDevFunction[rx_code][1]
  254. print("receivedForDevice = " + receivedForDevice + ", receivedCmnd = " + receivedCmnd)
  255. publish_device_statusupdate(receivedForDevice, receivedCmnd)
  256. def cul_received(payload):
  257. #global lastAction_bwm1, lastAction_bwm2, lastAction_bwm3
  258. global lastReceivedTime, lastReceivedIndex, lastReceivedMaxAge
  259. #actionSupported = True
  260. isSendReply = False
  261. if payload[:2] == 'is': # msg is reply from CUL to raw send command
  262. isSendReply = True
  263. elif payload[:1] == 'i': # is a IT compatible command - so look it up in the code table
  264. inCmd = payload[:-2] #strip last 2 chars, these only represent signal strength and are not needed
  265. print("inCmd: " + inCmd)
  266. ignoreCommand = False
  267. lastReceivedLength = len(lastReceived)
  268. i = 0
  269. while i < lastReceivedLength:
  270. #print(str(i) + ": " + lastReceived[i])
  271. if lastReceived[i] == inCmd:
  272. lastTime = int(lastReceivedTime[i])
  273. now = int(round(time.time() * 1000))
  274. tdelta = (now - lastTime)
  275. #print("TDELTA = " + str(tdelta))
  276. if tdelta < lastReceivedMaxAge:
  277. print("ignoring received command '" + inCmd + "' - already received " + str(tdelta) + " ms ago")
  278. ignoreCommand = True
  279. #break
  280. i += 1
  281. if not ignoreCommand:
  282. lastReceived[lastReceivedIndex] = inCmd
  283. lastReceivedTime[lastReceivedIndex] = int(round(time.time() * 1000))
  284. if lastReceivedIndex >= (lastReceivedLength - 1):
  285. lastReceivedIndex = 0
  286. else:
  287. lastReceivedIndex += 1
  288. parseRXCode(inCmd)
  289. #if repeat_received_commands:
  290. # lastSentLength = len(lastSent)
  291. # i = 0
  292. # dontRepeat = False
  293. # while i < lastSentLength:
  294. # #print(str(i) + ": " + lastReceived[i])
  295. # if lastSent[i] == decCmd:
  296. # lastTime = int(lastSentTime[i])
  297. # now = int(round(time.time() * 1000))
  298. # tdelta = (now - lastTime)
  299. # #print("TDELTA = " + str(tdelta))
  300. # if tdelta < lastSentMaxAge:
  301. # print("ignoring command as it originated from ourselfs " + inCmd + " " + str(tdelta) + " ms ago")
  302. # dontRepeat = True
  303. # #break
  304. # i += 1
  305. # #cmdToSend = culSendCmds[decCmd]
  306. # if not dontRepeat:
  307. # if device != "" and cmd != "":
  308. # print("REPEATING COMMAND: " + cmd + " TO DEVICE " + device)
  309. # cul_send(device, cmd)
  310. def IT_RXtoTXCode(itReceiveCode):
  311. if debug:
  312. statusstr = "IT_RXtoTXCode "
  313. statusstr += "RX: "
  314. statusstr += itReceiveCode
  315. #print("IT_RXtoTXCode ReceiveCode: " + itReceiveCode)
  316. itReceiveCode = itReceiveCode[1:] # remove first character "i"
  317. itReceiveCodeLengthBytes = int(len(itReceiveCode)/2)
  318. itReceiveCodeBytes = []
  319. itTransmitTristate = "is"
  320. for x in range(itReceiveCodeLengthBytes):
  321. itReceiveCodeBytes.append(bin(int(itReceiveCode[x*2:(x*2+2)],16))[2:].zfill(8))
  322. for x in range(len(itReceiveCodeBytes)):
  323. #print("IT REC byte " + str(x) + " = " + str(itReceiveCodeBytes[x]))
  324. for y in range(4):
  325. quarterbyte = str(itReceiveCodeBytes[x][y*2:y*2+2])
  326. if quarterbyte == "00":
  327. tmpTristate = "0";
  328. elif quarterbyte == "01":
  329. tmpTristate = "F";
  330. elif quarterbyte == "10":
  331. tmpTristate = "D";
  332. elif quarterbyte == "11":
  333. tmpTristate = "1";
  334. #print(quarterbyte + " -> " + tmpTristate)
  335. itTransmitTristate = itTransmitTristate + tmpTristate
  336. if debug:
  337. statusstr += " -> TX: "
  338. statusstr += itTransmitTristate
  339. #print("IT_RXtoTXCode TransmitCode: " + itTransmitTristate)
  340. print(statusstr)
  341. return(itTransmitTristate)
  342. def cul_send(device, cmd):
  343. global lastSentTime
  344. culCmd = ""
  345. culSendCmdsKeyName = device + ' ' + cmd
  346. #print culSendCmdsKeyName
  347. #statTopic = False
  348. #if 'statTopic' in devdata[device].keys():
  349. # statTopic = devdata[device]['statTopic']
  350. # #print('statTopic: ' + devdata[device]['statTopic'])
  351. #if statTopic:
  352. # mqttc.publish(statTopic, cmd, qos=0, retain=False)
  353. # print(" MQTT: published '" + cmd + "' on statTopic: '" + devdata[device]['statTopic'])
  354. publish_device_statusupdate(device, cmd)
  355. tx_code = False
  356. if 'TX' in devdata[device]:
  357. #print("TX data available, cmd="+cmd)
  358. #print(devdata[device]['TX'])
  359. if cmd in devdata[device]['TX']:
  360. tx_code = devdata[device]['TX'][cmd]
  361. print(" TX code for '" + cmd + "': " + tx_code)
  362. if not tx_code:
  363. if 'RX' in devdata[device]:
  364. #print("RX data available, cmd="+cmd)
  365. #print(devdata[device]['RX'])
  366. if cmd in devdata[device]['RX']:
  367. rx_code = devdata[device]['RX'][cmd]
  368. #print("RX code for '" + cmd + "': " + rx_code)
  369. tx_code = IT_RXtoTXCode(rx_code)
  370. print(" TX code for '" + cmd + "': " + tx_code)
  371. if not tx_code:
  372. print(" no valid TX code for this device/command")
  373. else:
  374. now = int(round(time.time() * 1000))
  375. # look if this command has been sent in the past, and when
  376. if culSendCmdsKeyName in lastSentTime.keys():
  377. lastTime = lastSentTime[culSendCmdsKeyName]
  378. else:
  379. lastTime = 0
  380. lastTimeAge = now - lastTime
  381. print(' lastTime: ' + str(lastTimeAge) + 'ms ago')
  382. if lastTimeAge > lastSentMinInterval: # only send if last time + min interval is exceeded
  383. lastSentTime[culSendCmdsKeyName] = now # save what we send, so that we dont repeat our own sent messages if repeating is enabled
  384. TX_interface = TX_interface_prefer
  385. if 'TX_interface' in devdata[device].keys():
  386. print(" TX_interface: " + devdata[device]['TX_interface'])
  387. if TX_interface == "UART" and not serialCULAvailable:
  388. TX_interface = "MQTT"
  389. global lastSentCmd, lastSentCmdTime, lastSentDev, lastSentDevCmd
  390. lastSentCmd = tx_code
  391. lastSentCmdTime = now
  392. lastSentDev = device
  393. lastSentDevCmd = cmd
  394. if send_on_mqtt_cul and (TX_interface == "MQTT" or TX_interface == "both"):
  395. print(" TX via MQTT: " + tx_code)
  396. mqttc.publish(mqtt_topic_cul_send, tx_code, qos=0, retain=False)
  397. if serialCULAvailable and send_on_serial_cul and (TX_interface == "UART" or TX_interface == "both"):
  398. print(" TX via UART: " + tx_code)
  399. culCmd = tx_code + '\r\n'
  400. ser.write(culCmd.encode('utf-8'))
  401. else:
  402. print("WARNING: CUL send command repeated too quickly.")
  403. if receive_from_serial_cul or send_on_serial_cul:
  404. if not os.path.exists(serialPort):
  405. print("ERROR opening connection to serial CUL... device '" + serialPort + "' does not exist.")
  406. if receive_from_mqtt_cul:
  407. if forceSerialCULConnected:
  408. exit(2)
  409. else:
  410. print("resuming in MQTT-CUL only mode...")
  411. TX_interface_prefer = "MQTT"
  412. receive_from_serial_cul = False
  413. send_on_serial_cul = False
  414. serialCULAvailable = False
  415. print()
  416. print()
  417. else:
  418. print("opening connection to serial CUL...")
  419. serLine = ""
  420. ser = serial.Serial(port=serialPort,baudrate=serialBaudrate,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS,timeout=serialTimeout)
  421. sleep(culInitTimeout)
  422. ser.write('V\r\n'.encode('utf-8')) # get CUL version info
  423. serLine = ser.readline()
  424. serLine = serLine.decode('utf-8').rstrip('\r\n')
  425. if serLine.startswith("V ") and serLine.find("culfw") != -1:
  426. print("connected. CUL version: " + serLine)
  427. serialCULAvailable = True
  428. sleep(0.1)
  429. print('Initializing CUL with command: ' + culInitCmd.rstrip('\r\n'))
  430. ser.write(culInitCmd.encode('utf-8')) # initialize CUL in normal receive mode
  431. sleep(0.5)
  432. else:
  433. print("WARNING: could not connect serial CUL")
  434. receive_from_serial_cul = False
  435. send_on_serial_cul = False
  436. serialCULAvailable = False
  437. TX_interface_prefer = "MQTT"
  438. if forceSerialCULConnected:
  439. exit(2)
  440. mqttc = mqtt.Client()
  441. mqttc.on_connect = on_connect
  442. mqttc.on_disconnect = on_disconnect
  443. mqttc.on_message = on_message
  444. if len(mqtt_user) > 0 and len(mqtt_password) > 0:
  445. mqttc.username_pw_set(mqtt_user, mqtt_password)
  446. mqttc.connect(mqtt_server, mqtt_port, 60)
  447. mqttc.loop_start()
  448. while True:
  449. if receive_from_serial_cul:
  450. serLine = ser.readline()
  451. if len(serLine) > 0:
  452. now = int(round(time.time() * 1000))
  453. recvCmd = serLine.decode('utf-8')
  454. recvCmd = recvCmd.rstrip('\r\n')
  455. #if debug:
  456. # print("lastSentCmd: " + lastSentCmd + ", lastSentCmdTime=" + str(lastSentCmdTime))
  457. if recvCmd == lastSentCmd and (now - lastSentCmdTime) < filterSelfSentIncomingTimeout:
  458. pass
  459. else:
  460. print()
  461. print("Serial-CUL RX: '" + recvCmd + "'")
  462. cul_received(recvCmd)
  463. sleep(0.05)
  464. ## #print "test"
  465. ## #touch("/tmp/culagent_running")
  466. #except KeyboardInterrupt:
  467. # print("\n")