/*#include */ #define MQTT_RECONNECT_INTERVAL_3_TRIES 15000 #define MQTT_RECONNECT_INTERVAL_10_TRIES 60000 #define MQTT_RECONNECT_INTERVAL_LONG 300000 bool mqttHeartbeatReceived = false; // MQTT callback void mqttCallback(char *topic, unsigned char *payload, uint16_t length) { // Serial.print("MQTT payload arrived ["); // Serial.print(topic); // Serial.print("] "); // for (int i = 0; i < length; i++) { // Serial.print((char)payload[i]); // } // Serial.println(); //char tmp_topic_pub[51]; //char tmp_payload_pub[51]; if (strcmp(topic, mqtt_topic_in_cmd) == 0) { //if topic = mqtt_topic_in_cmd int len; if (length < sizeof(cmdPayload)) len = length; // if input is bigger than dest buffer, cut else len = sizeof(cmdPayload) - 1; for (int i = 0; i < len; i++) { cmdPayload[i] = (char)payload[i]; } cmdPayload[len] = '\0'; // Serial.print("cmdPayload:"); // Serial.println(cmdPayload); //if (serialdebug) //{ // Serial.print("MQTT: received '"); // Serial.print(mqtt_topic_in_cmd); // Serial.print("' - '"); // Serial.print(cmdPayload); // Serial.println("'"); //} //if (mqttdebug) //{ // sprintf(tmp_topic_pub, "%s/%s", confMqtt.mqtt_topic_out, "mqtt_received"); // sprintf(tmp_payload_pub, "%s: %s", mqtt_topic_in_cmd, cmdPayload); // mqttclient.publish(tmp_topic_pub, tmp_payload_pub); //} if (strncmp(cmdPayload, "HEARTBEAT", 9) == 0) { mqttLastHeartbeat = millis(); mqttHeartbeatReceived = true; mqttConnected = true; } else { cmdInQueue = true; // payload is processed in "commands" } } //if topic = mqtt_topic_in_cmd if (strcmp(topic, mqtt_topic_in_setTemp) == 0) { //if topic = mqtt_topic_in_setTemp int len; if (length < sizeof(cmdPayload)) len = length; // if input is bigger than dest buffer, cut else len = sizeof(cmdPayload) - 1; for (int i = 0; i < len; i++) { cmdPayload[i] = (char)payload[i]; } //cmdPayload[len + 1] = '\0'; cmdPayload[len] = '\0'; // Serial.print("cmdPayload:"); // Serial.println(cmdPayload); char tmpPayload[11]; strlcpy(tmpPayload, cmdPayload, 10); float valueFloat; valueFloat = round(atof(tmpPayload) * 2.0) / 2.0; setTempTo(valueFloat); //if (serialdebug) //{ // Serial.print("MQTT: received '"); // Serial.print(mqtt_topic_in_setTemp); // Serial.print("' - '"); // Serial.print(cmdPayload); // Serial.println("'"); //} //if (mqttdebug) //{ // sprintf(tmp_topic_pub, "%s/%s", confMqtt.mqtt_topic_out, "mqtt_received"); // sprintf(tmp_payload_pub, "%s: %s", mqtt_topic_in_setTemp, cmdPayload); // mqttclient.publish(tmp_topic_pub, tmp_payload_pub); //} char buf[50]; sprintf(buf, "MQTT IN: setTemp to %2.1f", valueFloat); sendLog(buf, LOGLEVEL_INFO); cmdInQueue = false; // payload is processed in "commands" } //if topic = mqtt_topic_in_setTemp if (strcmp(topic, mqtt_topic_in_setMode) == 0) { //if topic = mqtt_topic_in_setMode uint8_t len; if (length < sizeof(cmdPayload)) len = length; // if input is bigger than dest buffer, cut else len = sizeof(cmdPayload) - 1; for (int i = 0; i < len; i++) { cmdPayload[i] = (char)payload[i]; } //cmdPayload[len + 1] = '\0'; cmdPayload[len] = '\0'; // Serial.print("cmdPayload:"); // Serial.println(cmdPayload); char tmpPayload[21]; strlcpy(tmpPayload, cmdPayload, sizeof(tmpPayload)); strlwr(tmpPayload); unsigned char tmpHeatMode = 100; char tmpModename0[15]; char tmpModename1[15]; strlcpy(tmpModename0, confAdv.modeName0, sizeof(tmpModename0)); strlcpy(tmpModename1, confAdv.modeName1, sizeof(tmpModename1)); strlwr(tmpModename0); strlwr(tmpModename1); if (strcmp(tmpPayload, tmpModename0) == 0) tmpHeatMode = 0; else if (strcmp(tmpPayload, tmpModename1) == 0) tmpHeatMode = 1; else if (strcmp(tmpPayload, "off") == 0) tmpHeatMode = 0; else if (strcmp(tmpPayload, "aus") == 0) tmpHeatMode = 0; else if (strcmp(tmpPayload, "on") == 0) tmpHeatMode = 1; else if (strcmp(tmpPayload, "ein") == 0) tmpHeatMode = 1; else if (atoi(tmpPayload) == 0) tmpHeatMode = 0; else if (atoi(tmpPayload) == 1) tmpHeatMode = 1; if (tmpHeatMode == 0 || tmpHeatMode == 1) { setHeatingmodeTo(tmpHeatMode); //if (serialdebug) //{ // Serial.print(F("set heatmode to: ")); // Serial.println(tmpHeatMode); //} //if (serialdebug) //{ // Serial.print("MQTT: received '"); // Serial.print(mqtt_topic_in_setMode); // Serial.print("' - '"); // Serial.print(cmdPayload); // Serial.println("'"); //} //if (mqttdebug) //{ // sprintf(tmp_topic_pub, "%s/%s", confMqtt.mqtt_topic_out, "mqtt_received"); // sprintf(tmp_payload_pub, "%s: %s", mqtt_topic_in_setMode, cmdPayload); // mqttclient.publish(tmp_topic_pub, tmp_payload_pub); //} char buf[50]; sprintf(buf, "MQTT IN: setMode to %u", tmpHeatMode); sendLog(buf, LOGLEVEL_INFO); } cmdInQueue = false; // payload is processed in "commands" } //if topic = mqtt_topic_in_setMode if (strcmp(topic, mqtt_topic_in_setPreset) == 0) { //if topic = mqtt_topic_in_setPreset char tmpPayload[21]; char tmpPresetName0[15]; char tmpPresetName1[15]; char tmpPresetName2[15]; uint16_t len; unsigned char tmpPreset = 100; if (length < sizeof(cmdPayload)) len = length; // if input is bigger than dest buffer, cut else len = sizeof(cmdPayload) - 1; for (unsigned char i = 0; i < len; i++) { cmdPayload[i] = (char)payload[i]; } //cmdPayload[len + 1] = '\0'; cmdPayload[len] = '\0'; // Serial.print("cmdPayload:"); // Serial.println(cmdPayload); strlcpy(tmpPayload, cmdPayload, sizeof(tmpPayload)); strlwr(tmpPayload); strlcpy(tmpPresetName0, confAdv.psetName0, sizeof(tmpPresetName0)); strlcpy(tmpPresetName1, confAdv.psetName1, sizeof(tmpPresetName1)); strlcpy(tmpPresetName2, confAdv.psetName2, sizeof(tmpPresetName2)); strlwr(tmpPresetName0); strlwr(tmpPresetName1); strlwr(tmpPresetName2); if (strcmp(tmpPayload, tmpPresetName0) == 0) tmpPreset = 0; else if (strcmp(tmpPayload, tmpPresetName1) == 0) tmpPreset = 1; else if (strcmp(tmpPayload, tmpPresetName2) == 0) tmpPreset = 2; else if (strcmp(tmpPayload, "none") == 0) tmpPreset = 0; else if (strcmp(tmpPayload, "red1") == 0) tmpPreset = 1; else if (strcmp(tmpPayload, "red2") == 0) tmpPreset = 2; else if (atoi(tmpPayload) == 0) tmpPreset = 0; else if (atoi(tmpPayload) == 1) tmpPreset = 1; else if (atoi(tmpPayload) == 2) tmpPreset = 2; if (tmpPreset <= 2) { setPresetTo(tmpPreset); //if (serialdebug) //{ // Serial.print(F("set preset to: ")); // Serial.println(tmpPreset); //} //if (serialdebug) //{ // Serial.print(F("MQTT: received '")); // Serial.print(mqtt_topic_in_setPreset); // Serial.print("' - '"); // Serial.print(cmdPayload); // Serial.println("'"); //} //if (mqttdebug) //{ // sprintf(tmp_topic_pub, "%s/%s", confMqtt.mqtt_topic_out, "mqtt_received"); // sprintf(tmp_payload_pub, "%s: %s", mqtt_topic_in_setPreset, cmdPayload); // mqttclient.publish(tmp_topic_pub, tmp_payload_pub); //} char buf[50]; sprintf(buf, "MQTT IN: setPreset to %s", cmdPayload); sendLog(buf, LOGLEVEL_INFO); } cmdInQueue = false; // payload is processed in "commands" } //if topic = mqtt_topic_in_setPreset if (strcmp(topic, confAdd.outTemp_topic_in) == 0) { //if topic = outTemp_topic_in int len; if (length < 6) len = length; // if input is bigger than dest buffer, cut else len = 5; for (int i = 0; i < len; i++) { outTemp_newValue[i] = (char)payload[i]; } //outTemp_newValue[len + 1] = '\0'; outTemp_newValue[len] = '\0'; outTemp_parseNewValue = true; } //if topic = outTemp_topic_in if (strcmp(topic, confAdd.outHum_topic_in) == 0) { //if topic = outHum_topic_in int len; if (length < 4) len = length; // if input is bigger than dest buffer, cut else len = 3; for (int i = 0; i < len; i++) { outHum_newValue[i] = (char)payload[i]; } //outHum_newValue[len + 1] = '\0'; outHum_newValue[len] = '\0'; outHum_parseNewValue = true; } //if topic = outHum_topic_in } //mqttCallback void mqttPrepareSubscribeTopics() { //char tmp_topic_out[50]; sprintf(mqtt_topic_in_cmd, "%s/%s", confMqtt.mqtt_topic_in, "cmd"); sprintf(mqtt_topic_in_setTemp, "%s/%s", mqtt_topic_in_cmd, "setTemp"); sprintf(mqtt_topic_in_setMode, "%s/%s", mqtt_topic_in_cmd, "setMode"); sprintf(mqtt_topic_in_setPreset, "%s/%s", mqtt_topic_in_cmd, "setPreset"); } bool mqttReconnect(void) { mqttclient.disconnect(); delay(10); // Create MQTT client ID from device name //String mqttClientId = "ESP8266Client-"; String mqttClientId = FIRMWARE_SHORTNAME; mqttClientId += "-"; //mqttClientId += String(random(0xffff), HEX); //or random mqttClientId += String(confDevWiFi.deviceName); //if (serialdebug) // Serial.print(F("MQTT: connecting to broker '")); // Serial.print(confMqtt.mqtt_server); // Serial.print(F("' with ")); boolean connRes; mqttReconnectAttempts++; char connectMode[20]; if (strlen(confMqtt.mqtt_user) > 0 && strlen(confMqtt.mqtt_willTopic) == 0) { // user and password, no Last Will //if (serialdebug) // Serial.print(F("Auth, no LWT")); strlcpy(connectMode, "Auth, no LWT", sizeof(connectMode)); connRes = mqttclient.connect(mqttClientId.c_str(), confMqtt.mqtt_user, confMqtt.mqtt_pass); } else if (strlen(confMqtt.mqtt_user) > 0 && strlen(confMqtt.mqtt_willTopic) > 0) { // user, password and Last Will //if (serialdebug) // Serial.print(F("Auth and LWT")); strlcpy(connectMode, "Auth and LWT", sizeof(connectMode)); connRes = mqttclient.connect(mqttClientId.c_str(), confMqtt.mqtt_user, confMqtt.mqtt_pass, confMqtt.mqtt_willTopic, confMqtt.mqtt_willQos, confMqtt.mqtt_willRetain, confMqtt.mqtt_willMsg); } else if (strlen(confMqtt.mqtt_user) == 0 && strlen(confMqtt.mqtt_willTopic) > 0) { // Last Will but no user and password //if (serialdebug) // Serial.print(F("LWT, no Auth")); strlcpy(connectMode, "LWT, no Auth", sizeof(connectMode)); connRes = mqttclient.connect(mqttClientId.c_str(), confMqtt.mqtt_willTopic, confMqtt.mqtt_willQos, confMqtt.mqtt_willRetain, confMqtt.mqtt_willMsg); } else { // no user, password and no Last Will //if (serialdebug) // Serial.print(F("no Auth, no LWT")); strlcpy(connectMode, "no Auth, no LWT", sizeof(connectMode)); connRes = mqttclient.connect(mqttClientId.c_str()); } //if (serialdebug) //{ // Serial.print(F(", attempt: ")); // Serial.print(mqttReconnectAttempts); // Serial.println(); //} char logBuf[70]; sprintf(logBuf, "MQTT: connecting to %s:%u w/ %s, attempt: %u", confMqtt.mqtt_server, confMqtt.mqtt_port, connectMode, mqttReconnectAttempts); sendLog(logBuf, LOGLEVEL_INFO); if (connRes) { // if connection was successful // if (serialdebug) // { // Serial.print(F("MQTT: connected. Reconnects: ")); // Serial.println(mqttReconnects); // } //char logBuf2[50]; //sprintf(logBuf2, "MQTT: connected to %s:%u, Reconnects: %u", confMqtt.mqtt_server, confMqtt.mqtt_port, mqttReconnects); //sendLog(logBuf2, LOGLEVEL_INFO); //sendLog(F("MQTT: connected. Reconnects: "), LOGLEVEL_INFO); mqttConnected = true; mqttReconnects++; mqttOnConnect(); //if (serialdebug) //Serial.println("MQTT: subscribed topics:"); sendLog("MQTT: subscribed topics:", LOGLEVEL_INFO); if (strlen(mqtt_topic_in_cmd) > 0) { char mqtt_topic_in_subscribe[52]; sprintf(mqtt_topic_in_subscribe, "%s/%s", mqtt_topic_in_cmd, "#"); if (mqttclient.subscribe(mqtt_topic_in_subscribe)) { //if (serialdebug) { // Serial.print(" "); // Serial.println(mqtt_topic_in_subscribe); //} char buf[60]; sprintf(buf, " %s", mqtt_topic_in_subscribe); sendLog(buf, LOGLEVEL_INFO); mqttInTopicSubscribed = true; } } if (strlen(confAdd.outTemp_topic_in) > 0) { if (mqttclient.subscribe(confAdd.outTemp_topic_in)) { //if (serialdebug) { // Serial.print(" "); // Serial.println(confAdd.outTemp_topic_in); //} char buf[60]; sprintf(buf, " %s", confAdd.outTemp_topic_in); sendLog(buf, LOGLEVEL_INFO); } } if (strlen(confAdd.outHum_topic_in) > 0) { if (mqttclient.subscribe(confAdd.outHum_topic_in)) { //if (serialdebug) { // Serial.print(" "); // Serial.println(confAdd.outHum_topic_in); //} char buf[60]; sprintf(buf, " %s", confAdd.outHum_topic_in); sendLog(buf, LOGLEVEL_INFO); } } mqttReconnectAttempts = 0; return mqttclient.connected(); } else { int mqttConnRes = mqttclient.state(); char logBuf[70]; mqtt_updateCurrentStateName(); sprintf_P(logBuf, "%s: %s, rc=%d (%s)", PGMStr_MQTT, PGMStr_connectedFailed, mqttConnRes, mqttCurrentStateName); sendLog(logBuf, LOGLEVEL_INFO); if (mqttReconnectAttempts >= 3 && (mqttConnRes == 4 || mqttConnRes == 5)) { // MQTT credentials are invalid/rejected from the server - stop trying to reconnect until reboot mqtt_tempDisabled_credentialError = true; if (confLog.logLevelSerial >= LOGLEVEL_VERBOSE) { //Serial.println(F("MQTT disabled until reboot due to credential error")); sendLog(F("MQTT disabled until reboot due to credential error"), LOGLEVEL_ERROR); } } mqttConnected = false; mqttOnDisconnect(); return mqttclient.connected(); } } //mqttReconnect void mqttClientInit() { if (confMqtt.mqtt_enable) { mqttclient.setServer(confMqtt.mqtt_server, confMqtt.mqtt_port); mqttclient.setCallback(mqttCallback); mqttLastReconnectAttempt = 0; mqttReconnectAttempts = 0; mqtt_tempDisabled_credentialError = false; } } int lastWifiStatus; void mqttHandleConnection() { if (mqttHeartbeatReceived) { mqttHeartbeatReceived = false; sendLog(F("MQTT: received HEARTBEAT - resetting timer..."), LOGLEVEL_VERBOSE); } int currWifiStatus = WiFi.status(); if (currWifiStatus != lastWifiStatus) { lastWifiStatus = currWifiStatus; //Serial.print("WiFi status changed to: "); //Serial.println(currWifiStatus); } if (confMqtt.mqtt_enable && !mqtt_tempDisabled_credentialError && currWifiStatus == WL_CONNECTED) { // MQTT reconnect if not connected (nonblocking) boolean doReconnect = false; if (!mqttclient.connected()) doReconnect = true; if (mqttInTopicSubscribed && confMqtt.mqtt_heartbeat_maxage_reconnect > 0) { // if mqtt_topic_in is subscribed if (confMqtt.mqtt_enable_heartbeat && confMqtt.mqtt_heartbeat_maxage_reconnect > 0 && ((millis() - mqttLastHeartbeat) > confMqtt.mqtt_heartbeat_maxage_reconnect)) { // also reconnect if no HEARTBEAT was received for some time doReconnect = true; mqttConnected = false; mqttLastHeartbeat = millis(); //Serial.println(F("MQTT: HEARTBEAT overdue. Force-Reconnecting MQTT...")); sendLog(F("MQTT: HEARTBEAT overdue. Force-Reconnecting MQTT..."), LOGLEVEL_WARN); } } if (doReconnect) { unsigned long mqttReconnectAttemptDelay; if (mqttReconnectAttempts < 3) mqttReconnectAttemptDelay = MQTT_RECONNECT_INTERVAL_3_TRIES; // if this is the 1-3rd attempt, try again in 15s else if (mqttReconnectAttempts < 10) mqttReconnectAttemptDelay = MQTT_RECONNECT_INTERVAL_10_TRIES; // if more than 3 attempts failed, try again every min else mqttReconnectAttemptDelay = MQTT_RECONNECT_INTERVAL_LONG; // if more than 10 attempts failed, try again every 5 min if (confMqtt.mqtt_enable_heartbeat && confMqtt.mqtt_heartbeat_maxage_reboot > 0 && ((millis() - mqttLastHeartbeat) > confMqtt.mqtt_heartbeat_maxage_reboot)) { // if no heartbeat was received for set time, reboot the ESP //Serial.println(F("MQTT: HEARTBEAT_MAXAGE_REBOOT overdue. Restarting...")); sendLog(F("MQTT: HEARTBEAT_MAXAGE_REBOOT overdue. Restarting..."), LOGLEVEL_WARN); restart(); } if ((millis() - mqttLastReconnectAttempt) > mqttReconnectAttemptDelay) { mqttLastReconnectAttempt = millis(); //#ifdef USE_MQTT_TLS // verifytls(); //#endif mqttReconnect(); } mqttclient.loop(); } else { mqttclient.loop(); } } } void mqttPublishStatus() { if (confMqtt.mqtt_enable && mqttclient.state() == 0) { char outMsg[60]; sprintf_P(outMsg, "%s: %s: %s:%u, %s: %d", PGMStr_MQTT, PGMStr_connectedTo, confMqtt.mqtt_server, confMqtt.mqtt_port, PGMStr_reconnects, mqttReconnects - 1); //mqttclient.publish(confMqtt.mqtt_topic_out, outMsg, confMqtt.mqtt_outRetain); sendLog(outMsg, LOGLEVEL_INFO); } } void mqttOnConnect() { mqttPublishConnectMsg(); mqttPublishStatus(); displayShowMQTTConnected(); } void mqttOnDisconnect() { displayShowMQTTConnectionError(); } void mqttPublishConnectMsg() { if (strlen(confMqtt.mqtt_willTopic) > 4) { if (confMqtt.mqtt_enable && mqttclient.state() == 0) { mqttclient.publish(confMqtt.mqtt_willTopic, confMqtt.mqtt_connMsg, confMqtt.mqtt_willRetain); //sendLog(F("MQTT: connected"), LOGLEVEL_INFO); } } } void mqttPublishHeartbeat() { if (confMqtt.mqtt_enable && confMqtt.mqtt_enable_heartbeat && mqttclient.state() == 0) { mqttclient.publish(mqtt_topic_in_cmd, "HEARTBEAT", false); // publishes to IN-topic. MQTT will force-reconnect if it does not get a HEARTBEAT for 2 min } } void mqtt_updateCurrentStateName() { if (confMqtt.mqtt_enable) { sprintf_P(mqttCurrentStateName, "%s", PGMStr_MQTTStates[mqttclient.state() + 4]); } else sprintf_P(mqttCurrentStateName, "%s", PGMStr_MQTTState_DIS); } void mqttResetConnection() { sendLog(F("MQTT: resetting connection...")); if (!confMqtt.mqtt_enable) mqttclient.disconnect(); else { mqttclient.disconnect(); mqttPrepareSubscribeTopics(); mqttClientInit(); } configChangedMqttConnResetRequired = false; }