#!/usr/bin/python3 -u
#
# pip3 install pyyaml paho-mqtt pyserial

import yaml
from yaml.constructor import ConstructorError
import serial
import sys
import time
from time import sleep
import datetime
import paho.mqtt.client as mqtt
import os

version = 0.1

verbose = True
debug = False

# YAML file describing the devices (RX/TX codes, MQTT topics)
deviceConfigFile = "/home/pi/cul2mqtt_devices.yaml"

mqtt_server = "mqtt.lan"
mqtt_port = 1883
mqtt_user = ""
mqtt_password = ""

TX_interface_prefer = "UART"  # UART or MQTT

repeat_received_commands = False  # does not yet work correctly

# Command line: -q = quiet, -v = verbose, -d = debug (implies verbose).
# Only the first argument is evaluated.
if len(sys.argv) >= 2:
    if sys.argv[1] == "-q":
        verbose = False
    elif sys.argv[1] == "-v":
        verbose = True
    elif sys.argv[1] == "-d":
        debug = True
        verbose = True

# serial (USB) CUL device
receive_from_serial_cul = True
send_on_serial_cul = True
serialPort = '/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0'
serialBaudrate = 38400
serialTimeout = 1  # s, should be not too high, otherwise program could block
culInitCmd = 'X21\r\n'  # CUL init command for normal operation - should not be changed normally
culInitTimeout = 3  # s
forceSerialCULConnected = True  # if true, program exits when serial CUL cannot be connected to

# MQTT CUL
receive_from_mqtt_cul = False
send_on_mqtt_cul = False
mqtt_topic_cul_received = "MQTT-CUL/received"
mqtt_topic_cul_send = "MQTT-CUL/send"

# ms; a line read from the serial CUL that equals the last sent command is
# dropped when it arrives within this time after sending
filterSelfSentIncomingTimeout = 500

# do not change below

serialCULAvailable = False
devdata = {}
RXcodesToDevFunction = {}
InTopicsToDevIds = {}

# NOTE(review): `Loader` is imported here but the yaml.load() call further
# down passes yaml.FullLoader - confirm whether this fallback is still needed.
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader


def no_duplicates_constructor(loader, node, deep=False):
    """Construct a YAML mapping, rejecting duplicate keys.

    Args:
        loader: the PyYAML loader that is constructing the document.
        node: the mapping node to construct.
        deep: passed through to the loader's construct calls.

    Returns:
        The mapping built by ``loader.construct_mapping(node, deep)``.

    Raises:
        ConstructorError: if the same key occurs twice in the mapping.
    """
    seen_keys = set()
    for key_node, _value_node in node.value:
        key = loader.construct_object(key_node, deep=deep)
        if key in seen_keys:
            raise ConstructorError("while constructing a mapping", node.start_mark,
                                   "found duplicate key (%s)" % key, key_node.start_mark)
        seen_keys.add(key)
    # only the keys are inspected above; the values are built here
    return loader.construct_mapping(node, deep)
# make every YAML mapping go through the duplicate-key check
yaml.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, no_duplicates_constructor)

print("CUL2MQTT v" + str(version))
print("=====================================")

# ---------------------------------------------------------------------------
# Load the device configuration and build the two lookup tables:
#   RXcodesToDevFunction: received code -> (device id, command)
#   InTopicsToDevIds:     MQTT command topic -> device id
# ---------------------------------------------------------------------------
try:
    with open(deviceConfigFile) as devfile:
        devdata = yaml.load(devfile, Loader=yaml.FullLoader)

    if not isinstance(devdata, dict):
        raise ValueError("configfile does not contain a mapping of devices.")

    if debug:
        print()
        print()
        print("==== parsing config file ====")
        print()
        print(devdata)
        print()

    for deviceid, devconf in devdata.items():
        if debug:
            print()
            print()
            print("Device: " + deviceid)
            print(devconf)

        if not isinstance(devconf, dict):
            raise ValueError("entry for device '" + str(deviceid) + "' is not a mapping.")

        rx_codes = devconf.get("RX")
        if rx_codes:
            if not isinstance(rx_codes, dict):
                raise ValueError("'RX' of device '" + str(deviceid) + "' is not a mapping.")
            if debug:
                print("RX codes:")
            for key, value in rx_codes.items():
                if debug:
                    print(key, "->", value)
                if value in RXcodesToDevFunction:
                    raise ValueError("RX-string '" + str(value) + "' is already defined for another device! Must be unique.")
                RXcodesToDevFunction[value] = deviceid, key

        cmdTopic = devconf.get("cmdTopic")
        if cmdTopic:
            if debug:
                print("cmdTopic: " + cmdTopic)
            if cmdTopic in InTopicsToDevIds:
                raise ValueError("cmdTopic '" + str(cmdTopic) + "' is already defined for another device! Must be unique.")
            InTopicsToDevIds[cmdTopic] = deviceid

    if debug:
        print()
        print()
        print()
        print("RXcodesToDevFunction:")
        print(RXcodesToDevFunction)
        print()
        print()
        print("InTopicsToDevIds:")
        print(InTopicsToDevIds)
        print()
        print()
        print("InTopicsToDevIds.keys():")
        print(InTopicsToDevIds.keys())
        print()
        print()
        print("==== parsing config file complete ====")
        print()
        print()
        print()
        print()

except OSError as err:
    print("ERROR opening configfile")
    print(err)
    sys.exit(1)
except yaml.YAMLError as err:
    # includes the ConstructorError raised for duplicate keys
    print("ERROR on parsing configfile:")
    print(err)
    sys.exit(1)
except ValueError as err:
    # invalid or non-unique content detected above
    print()
    print()
    print("ERROR: " + str(err))
    sys.exit(1)

# ring buffer of the last received codes and their reception times (ms)
lastReceived = [""] * 10
lastReceivedTime = [0] * 10
lastReceivedIndex = 0
lastReceivedMaxAge = 500  # ignore repeated messages when they are younger than x ms

lastSentTime = {}  # "<device> <cmd>" -> time of last transmission (ms)
lastSentMinInterval = 1500  # ignore repeated messages when they are younger than x ms, should be > 1500

lastSentCmd = ""
lastSentCmdTime = 0
lastSentDev = ""
lastSentDevCmd = ""

# handle of the serial CUL; stays None unless the port gets opened below
ser = None


def touch(fname, times=None):
    """Create `fname` if it does not exist and set its access/modification times."""
    with open(fname, 'a'):
        os.utime(fname, times)


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to the MQTT-CUL topic and all command topics."""
    if verbose:
        print("MQTT connected with result code " + str(rc))

    if receive_from_mqtt_cul and mqtt_topic_cul_received != "":
        client.subscribe(mqtt_topic_cul_received)

    for in_topic in InTopicsToDevIds:
        if in_topic != "":
            client.subscribe(in_topic)
            print("MQTT subscribed: " + in_topic)


def on_disconnect(client, userdata, rc):
    """MQTT disconnect callback: report unexpected disconnects."""
    if rc != 0:
        print("Unexpected MQTT disconnection. Will auto-reconnect")


def on_message(client, userdata, msg):
    """MQTT message callback.

    A message on the MQTT-CUL receive topic is treated as a code received by
    the CUL. A message on a device command topic is transmitted via
    cul_send(), unless the same device/command was sent less than 1000 ms ago.
    """
    # undecodable bytes must not raise inside the MQTT callback
    payload = msg.payload.decode("utf-8", errors="replace")
    print(msg.topic + ": " + str(payload))

    # MQTT message is output from CUL
    if receive_from_mqtt_cul and msg.topic == mqtt_topic_cul_received:
        payload = payload.rstrip()
        print("MQTT-CUL RX: '" + payload + "'")
        cul_received(payload)
        return

    # command topics are unique, so a direct lookup replaces the scan
    dev = InTopicsToDevIds.get(msg.topic)
    if dev is None:
        return

    devconf = devdata[dev]
    print()
    print("MQTT command received - Topic: '" + msg.topic + "', Payload: '" + payload + "' -> Device: '" + dev + "'")

    # informational output only - publishing is done in cul_send()
    if 'name' in devconf:
        print('Device Name: ' + devconf['name'])
    if 'statTopic' in devconf:
        print('statTopic: ' + devconf['statTopic'])
    if 'add_statTopics_on' in devconf:
        print("add_statTopics_on:")
        for res in devconf['add_statTopics_on']:
            if 'on_payload' in res and 'topic' in res and 'payload' in res:
                if payload == res['on_payload'] and payload != "" and res['topic'] != "" and res['payload'] != "":
                    print("  on '" + payload + "' -> publish '" + res['payload'] + "' on topic '" + res['topic'] + "'")

    now = int(round(time.time() * 1000))
    print("dev=" + dev + ", lastSentDevCmd=" + lastSentDevCmd)
    if dev == lastSentDev and payload == lastSentDevCmd and (now - lastSentCmdTime) < 1000:
        print("MQTT: ignored command as we just sent this.")
    else:
        cul_send(dev, payload)


def publish_device_statusupdate(device, cmd):
    """Publish `cmd` as the new state of `device`.

    Publishes to the device's 'statTopic', to every 'add_statTopics_on' entry
    whose 'on_payload' equals `cmd`, and to every topic in 'add_statTopics',
    as far as these keys are configured for the device.
    """
    devconf = devdata[device]

    if 'statTopic' in devconf:
        statTopic = devconf['statTopic']
        print("MQTT publish statTopic: " + statTopic + " -> " + cmd)
        mqttc.publish(statTopic, cmd, qos=0, retain=False)

    if 'add_statTopics_on' in devconf:
        print("    MQTT publish add_statTopics_on:")
        for res in devconf['add_statTopics_on']:
            if 'on_payload' in res and 'topic' in res and 'payload' in res:
                if cmd == res['on_payload']:
                    print("    on '" + res['on_payload'] + "' -> publish '" + res['payload'] + "' on topic '" + res['topic'] + "'")
                    mqttc.publish(res['topic'], res['payload'], qos=0, retain=False)

    if 'add_statTopics' in devconf:
        print("    MQTT publish on add_statTopics:")
        for res in devconf['add_statTopics']:
            print("    '" + cmd + "' -> '" + res + "'")
            mqttc.publish(res, cmd, qos=0, retain=False)


def parseRXCode(rx_code):
    """Look up a received code and publish the matching device status, if any."""
    if rx_code in RXcodesToDevFunction:
        receivedForDevice, receivedCmnd = RXcodesToDevFunction[rx_code]
        print("receivedForDevice = " + receivedForDevice + ", receivedCmnd = " + receivedCmnd)
        publish_device_statusupdate(receivedForDevice, receivedCmnd)


def cul_received(payload):
    """Handle one line received from a CUL (serial or MQTT).

    Lines starting with 'is' are replies to our own raw send commands and are
    ignored. Other lines starting with 'i' are looked up in the RX code table,
    unless the same code was already handled less than lastReceivedMaxAge ms
    ago. All other lines are ignored.
    """
    global lastReceivedIndex

    if payload[:2] == 'is':
        # msg is reply from CUL to raw send command
        return
    if payload[:1] != 'i':
        return

    # is a IT compatible command - so look it up in the code table
    inCmd = payload[:-2]  # strip last 2 chars, these only represent signal strength and are not needed
    print("inCmd: " + inCmd)

    now = int(round(time.time() * 1000))
    for code, received_at in zip(lastReceived, lastReceivedTime):
        if code != inCmd:
            continue
        tdelta = now - int(received_at)
        if tdelta < lastReceivedMaxAge:
            print("ignoring received command '" + inCmd + "' - already received " + str(tdelta) + " ms ago")
            return

    # remember the code in the ring buffer, then act on it
    lastReceived[lastReceivedIndex] = inCmd
    lastReceivedTime[lastReceivedIndex] = now
    lastReceivedIndex = (lastReceivedIndex + 1) % len(lastReceived)

    parseRXCode(inCmd)

    # TODO: repeating received commands (repeat_received_commands) is not implemented


# maps each pair of received bits to one character of the transmit code
_IT_BITPAIR_TO_TRISTATE = {"00": "0", "01": "F", "10": "D", "11": "1"}


def IT_RXtoTXCode(itReceiveCode):
    """Convert a received code ('i' + hex digits) into a send command.

    Every hex byte is expanded to 8 bits and each pair of bits is mapped to
    one character (00->0, 01->F, 10->D, 11->1). The result is prefixed with
    'is'. A trailing odd hex digit is ignored.

    Raises:
        ValueError: if the part after the first character is not valid hex.
    """
    hex_digits = itReceiveCode[1:]  # remove first character "i"

    pieces = ["is"]
    for byte_no in range(len(hex_digits) // 2):
        bits = format(int(hex_digits[byte_no * 2:byte_no * 2 + 2], 16), "08b")
        for pos in range(0, 8, 2):
            pieces.append(_IT_BITPAIR_TO_TRISTATE[bits[pos:pos + 2]])
    itTransmitTristate = "".join(pieces)

    if debug:
        print("IT_RXtoTXCode RX: " + itReceiveCode + " -> TX: " + itTransmitTristate)

    return itTransmitTristate


def cul_send(device, cmd):
    """Publish the new status of `device` and transmit `cmd` via the CUL.

    The status update is published first, regardless of whether a code is
    transmitted afterwards. The code is taken from the device's 'TX' table or,
    if there is none, derived from its 'RX' table. The same device/command is
    transmitted at most once per lastSentMinInterval ms.
    """
    global lastSentCmd, lastSentCmdTime, lastSentDev, lastSentDevCmd

    culSendCmdsKeyName = device + ' ' + cmd
    devconf = devdata[device]

    publish_device_statusupdate(device, cmd)

    # prefer an explicit TX code, fall back to converting the RX code
    tx_code = (devconf.get('TX') or {}).get(cmd)
    if tx_code:
        print("    TX code for '" + cmd + "': " + tx_code)
    else:
        rx_code = (devconf.get('RX') or {}).get(cmd)
        if rx_code is not None:
            tx_code = IT_RXtoTXCode(rx_code)
            print("    TX code for '" + cmd + "': " + tx_code)

    if not tx_code:
        print("    no valid TX code for this device/command")
        return

    now = int(round(time.time() * 1000))

    # look if this command has been sent in the past, and when
    lastTimeAge = now - lastSentTime.get(culSendCmdsKeyName, 0)
    print('    lastTime: ' + str(lastTimeAge) + 'ms ago')

    if lastTimeAge <= lastSentMinInterval:
        print("WARNING: CUL send command repeated too quickly.")
        return

    # only send if last time + min interval is exceeded
    lastSentTime[culSendCmdsKeyName] = now

    TX_interface = TX_interface_prefer
    if 'TX_interface' in devconf:
        # NOTE(review): the per-device value is only logged here, it does not
        # override TX_interface_prefer - confirm whether that is intended
        print("    TX_interface: " + devconf['TX_interface'])

    if TX_interface == "UART" and not serialCULAvailable:
        TX_interface = "MQTT"

    # save what we send, so that our own transmission can be recognized
    # when it comes back in
    lastSentCmd = tx_code
    lastSentCmdTime = now
    lastSentDev = device
    lastSentDevCmd = cmd

    if send_on_mqtt_cul and TX_interface in ("MQTT", "both"):
        print("    TX via MQTT: " + tx_code)
        mqttc.publish(mqtt_topic_cul_send, tx_code, qos=0, retain=False)

    if serialCULAvailable and send_on_serial_cul and TX_interface in ("UART", "both"):
        print("    TX via UART: " + tx_code)
        culCmd = tx_code + '\r\n'
        ser.write(culCmd.encode('utf-8'))


# ---------------------------------------------------------------------------
# Serial CUL setup
# ---------------------------------------------------------------------------
if receive_from_serial_cul or send_on_serial_cul:
    if not os.path.exists(serialPort):
        print("ERROR opening connection to serial CUL... device '" + serialPort + "' does not exist.")
        if forceSerialCULConnected:
            sys.exit(2)
        # without the port the serial flags must be cleared, otherwise the
        # main loop would read from a serial handle that was never opened
        print("resuming in MQTT-CUL only mode...")
        TX_interface_prefer = "MQTT"
        receive_from_serial_cul = False
        send_on_serial_cul = False
        serialCULAvailable = False
        print()
        print()
    else:
        print("opening connection to serial CUL...")
        serLine = ""
        try:
            ser = serial.Serial(port=serialPort, baudrate=serialBaudrate, parity=serial.PARITY_NONE,
                                stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=serialTimeout)
            sleep(culInitTimeout)
            ser.write('V\r\n'.encode('utf-8'))  # get CUL version info
            serLine = ser.readline().decode('utf-8', errors='replace').rstrip('\r\n')
        except serial.SerialException as err:
            print("ERROR opening connection to serial CUL: " + str(err))

        if serLine.startswith("V ") and serLine.find("culfw") != -1:
            print("connected. CUL version: " + serLine)
            serialCULAvailable = True
            sleep(0.1)
            print('Initializing CUL with command: ' + culInitCmd.rstrip('\r\n'))
            ser.write(culInitCmd.encode('utf-8'))  # initialize CUL in normal receive mode
            sleep(0.5)
        else:
            print("WARNING: could not connect serial CUL")
            receive_from_serial_cul = False
            send_on_serial_cul = False
            serialCULAvailable = False
            TX_interface_prefer = "MQTT"
            if ser is not None:
                ser.close()
                ser = None
            if forceSerialCULConnected:
                sys.exit(2)

# ---------------------------------------------------------------------------
# MQTT setup
# ---------------------------------------------------------------------------
# paho-mqtt 2.x wants the callback API version to be named; the callbacks in
# this file use the version 1 signatures. Older releases have no such enum.
if hasattr(mqtt, "CallbackAPIVersion"):
    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    mqttc = mqtt.Client()
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_message = on_message

if len(mqtt_user) > 0 and len(mqtt_password) > 0:
    mqttc.username_pw_set(mqtt_user, mqtt_password)

mqttc.connect(mqtt_server, mqtt_port, 60)
mqttc.loop_start()

# ---------------------------------------------------------------------------
# Main loop: poll the serial CUL; MQTT is handled by the client's own thread
# ---------------------------------------------------------------------------
try:
    while True:
        if receive_from_serial_cul:
            serLine = ser.readline()
            if len(serLine) > 0:
                now = int(round(time.time() * 1000))
                # line noise must not stop the program with a decode error
                recvCmd = serLine.decode('utf-8', errors='replace').rstrip('\r\n')

                if recvCmd == lastSentCmd and (now - lastSentCmdTime) < filterSelfSentIncomingTimeout:
                    # this is the command we have just sent ourselves
                    pass
                else:
                    print()
                    print("Serial-CUL RX: '" + recvCmd + "'")
                    cul_received(recvCmd)

        sleep(0.05)
except KeyboardInterrupt:
    print("\n")
finally:
    mqttc.loop_stop()
    mqttc.disconnect()
    if ser is not None:
        ser.close()