123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
/*
  PubSubClient.cpp - A simple client for MQTT.
  Nick O'Leary
  http://knolleary.net
*/
- #include "PubSubClient.h"
- #include "Arduino.h"
- PubSubClient::PubSubClient() {
- this->_state = MQTT_DISCONNECTED;
- this->_client = NULL;
- this->stream = NULL;
- setCallback(NULL);
- }
- PubSubClient::PubSubClient(Client& client) {
- this->_state = MQTT_DISCONNECTED;
- setClient(client);
- this->stream = NULL;
- }
- PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
- this->_state = MQTT_DISCONNECTED;
- setServer(addr, port);
- setClient(client);
- this->stream = NULL;
- }
- PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
- this->_state = MQTT_DISCONNECTED;
- setServer(addr,port);
- setClient(client);
- setStream(stream);
- }
- PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
- this->_state = MQTT_DISCONNECTED;
- setServer(addr, port);
- setCallback(callback);
- setClient(client);
- this->stream = NULL;
- }
- PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
- this->_state = MQTT_DISCONNECTED;
- setServer(addr,port);
- setCallback(callback);
- setClient(client);
- setStream(stream);
- }
- PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
- this->_state = MQTT_DISCONNECTED;
- setServer(ip, port);
- setClient(client);
- this->stream = NULL;
- }
- PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
- this->_state = MQTT_DISCONNECTED;
- setServer(ip,port);
- setClient(client);
- setStream(stream);
- }
- PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
- this->_state = MQTT_DISCONNECTED;
- setServer(ip, port);
- setCallback(callback);
- setClient(client);
- this->stream = NULL;
- }
- PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
- this->_state = MQTT_DISCONNECTED;
- setServer(ip,port);
- setCallback(callback);
- setClient(client);
- setStream(stream);
- }
- PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
- this->_state = MQTT_DISCONNECTED;
- setServer(domain,port);
- setClient(client);
- this->stream = NULL;
- }
- PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
- this->_state = MQTT_DISCONNECTED;
- setServer(domain,port);
- setClient(client);
- setStream(stream);
- }
- PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
- this->_state = MQTT_DISCONNECTED;
- setServer(domain,port);
- setCallback(callback);
- setClient(client);
- this->stream = NULL;
- }
- PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
- this->_state = MQTT_DISCONNECTED;
- setServer(domain,port);
- setCallback(callback);
- setClient(client);
- setStream(stream);
- }
- boolean PubSubClient::connect(const char *id) {
- return connect(id,NULL,NULL,0,0,0,0);
- }
- boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
- return connect(id,user,pass,0,0,0,0);
- }
- boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
- return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
- }
- boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
- if (!connected()) {
- int result = 0;
- if (domain != NULL) {
- result = _client->connect(this->domain, this->port);
- } else {
- result = _client->connect(this->ip, this->port);
- }
- if (result == 1) {
- nextMsgId = 1;
- // Leave room in the buffer for header and variable length field
- uint16_t length = 5;
- unsigned int j;
- #if MQTT_VERSION == MQTT_VERSION_3_1
- uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
- #define MQTT_HEADER_VERSION_LENGTH 9
- #elif MQTT_VERSION == MQTT_VERSION_3_1_1
- uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
- #define MQTT_HEADER_VERSION_LENGTH 7
- #endif
- for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
- buffer[length++] = d[j];
- }
- uint8_t v;
- if (willTopic) {
- v = 0x06|(willQos<<3)|(willRetain<<5);
- } else {
- v = 0x02;
- }
- if(user != NULL) {
- v = v|0x80;
- if(pass != NULL) {
- v = v|(0x80>>1);
- }
- }
- buffer[length++] = v;
- buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
- buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
- length = writeString(id,buffer,length);
- if (willTopic) {
- length = writeString(willTopic,buffer,length);
- length = writeString(willMessage,buffer,length);
- }
- if(user != NULL) {
- length = writeString(user,buffer,length);
- if(pass != NULL) {
- length = writeString(pass,buffer,length);
- }
- }
- write(MQTTCONNECT,buffer,length-5);
- lastInActivity = lastOutActivity = millis();
- while (!_client->available()) {
- unsigned long t = millis();
- if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
- _state = MQTT_CONNECTION_TIMEOUT;
- _client->stop();
- return false;
- }
- }
- uint8_t llen;
- uint16_t len = readPacket(&llen);
- if (len == 4) {
- if (buffer[3] == 0) {
- lastInActivity = millis();
- pingOutstanding = false;
- _state = MQTT_CONNECTED;
- return true;
- } else {
- _state = buffer[3];
- }
- }
- _client->stop();
- } else {
- _state = MQTT_CONNECT_FAILED;
- }
- return false;
- }
- return true;
- }
- // reads a byte into result
- boolean PubSubClient::readByte(uint8_t * result) {
- uint32_t previousMillis = millis();
- while(!_client->available()) {
- uint32_t currentMillis = millis();
- if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){
- return false;
- }
- }
- *result = _client->read();
- return true;
- }
- // reads a byte into result[*index] and increments index
- boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
- uint16_t current_index = *index;
- uint8_t * write_address = &(result[current_index]);
- if(readByte(write_address)){
- *index = current_index + 1;
- return true;
- }
- return false;
- }
- uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
- uint16_t len = 0;
- if(!readByte(buffer, &len)) return 0;
- bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
- uint32_t multiplier = 1;
- uint16_t length = 0;
- uint8_t digit = 0;
- uint16_t skip = 0;
- uint8_t start = 0;
- do {
- if(!readByte(&digit)) return 0;
- buffer[len++] = digit;
- length += (digit & 127) * multiplier;
- multiplier *= 128;
- } while ((digit & 128) != 0);
- *lengthLength = len-1;
- if (isPublish) {
- // Read in topic length to calculate bytes to skip over for Stream writing
- if(!readByte(buffer, &len)) return 0;
- if(!readByte(buffer, &len)) return 0;
- skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
- start = 2;
- if (buffer[0]&MQTTQOS1) {
- // skip message id
- skip += 2;
- }
- }
- for (uint16_t i = start;i<length;i++) {
- if(!readByte(&digit)) return 0;
- if (this->stream) {
- if (isPublish && len-*lengthLength-2>skip) {
- this->stream->write(digit);
- }
- }
- if (len < MQTT_MAX_PACKET_SIZE) {
- buffer[len] = digit;
- }
- len++;
- }
- if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
- len = 0; // This will cause the packet to be ignored.
- }
- return len;
- }
- boolean PubSubClient::loop() {
- if (connected()) {
- unsigned long t = millis();
- if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
- if (pingOutstanding) {
- this->_state = MQTT_CONNECTION_TIMEOUT;
- _client->stop();
- return false;
- } else {
- buffer[0] = MQTTPINGREQ;
- buffer[1] = 0;
- _client->write(buffer,2);
- lastOutActivity = t;
- lastInActivity = t;
- pingOutstanding = true;
- }
- }
- if (_client->available()) {
- uint8_t llen;
- uint16_t len = readPacket(&llen);
- uint16_t msgId = 0;
- uint8_t *payload;
- if (len > 0) {
- lastInActivity = t;
- uint8_t type = buffer[0]&0xF0;
- if (type == MQTTPUBLISH) {
- if (callback) {
- uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
- char topic[tl+1];
- for (uint16_t i=0;i<tl;i++) {
- topic[i] = buffer[llen+3+i];
- }
- topic[tl] = 0;
- // msgId only present for QOS>0
- if ((buffer[0]&0x06) == MQTTQOS1) {
- msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
- payload = buffer+llen+3+tl+2;
- callback(topic,payload,len-llen-3-tl-2);
- buffer[0] = MQTTPUBACK;
- buffer[1] = 2;
- buffer[2] = (msgId >> 8);
- buffer[3] = (msgId & 0xFF);
- _client->write(buffer,4);
- lastOutActivity = t;
- } else {
- payload = buffer+llen+3+tl;
- callback(topic,payload,len-llen-3-tl);
- }
- }
- } else if (type == MQTTPINGREQ) {
- buffer[0] = MQTTPINGRESP;
- buffer[1] = 0;
- _client->write(buffer,2);
- } else if (type == MQTTPINGRESP) {
- pingOutstanding = false;
- }
- }
- }
- return true;
- }
- return false;
- }
- boolean PubSubClient::publish(const char* topic, const char* payload) {
- return publish(topic,(const uint8_t*)payload,strlen(payload),false);
- }
- boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
- return publish(topic,(const uint8_t*)payload,strlen(payload),retained);
- }
- boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
- return publish(topic, payload, plength, false);
- }
- boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
- if (connected()) {
- if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) {
- // Too long
- return false;
- }
- // Leave room in the buffer for header and variable length field
- uint16_t length = 5;
- length = writeString(topic,buffer,length);
- uint16_t i;
- for (i=0;i<plength;i++) {
- buffer[length++] = payload[i];
- }
- uint8_t header = MQTTPUBLISH;
- if (retained) {
- header |= 1;
- }
- return write(header,buffer,length-5);
- }
- return false;
- }
- boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
- uint8_t llen = 0;
- uint8_t digit;
- unsigned int rc = 0;
- uint16_t tlen;
- unsigned int pos = 0;
- unsigned int i;
- uint8_t header;
- unsigned int len;
- if (!connected()) {
- return false;
- }
- tlen = strlen(topic);
- header = MQTTPUBLISH;
- if (retained) {
- header |= 1;
- }
- buffer[pos++] = header;
- len = plength + 2 + tlen;
- do {
- digit = len % 128;
- len = len / 128;
- if (len > 0) {
- digit |= 0x80;
- }
- buffer[pos++] = digit;
- llen++;
- } while(len>0);
- pos = writeString(topic,buffer,pos);
- rc += _client->write(buffer,pos);
- for (i=0;i<plength;i++) {
- rc += _client->write((char)pgm_read_byte_near(payload + i));
- }
- lastOutActivity = millis();
- return rc == tlen + 4 + plength;
- }
- boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
- uint8_t lenBuf[4];
- uint8_t llen = 0;
- uint8_t digit;
- uint8_t pos = 0;
- uint16_t rc;
- uint16_t len = length;
- do {
- digit = len % 128;
- len = len / 128;
- if (len > 0) {
- digit |= 0x80;
- }
- lenBuf[pos++] = digit;
- llen++;
- } while(len>0);
- buf[4-llen] = header;
- for (int i=0;i<llen;i++) {
- buf[5-llen+i] = lenBuf[i];
- }
- #ifdef MQTT_MAX_TRANSFER_SIZE
- uint8_t* writeBuf = buf+(4-llen);
- uint16_t bytesRemaining = length+1+llen; //Match the length type
- uint8_t bytesToWrite;
- boolean result = true;
- while((bytesRemaining > 0) && result) {
- bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
- rc = _client->write(writeBuf,bytesToWrite);
- result = (rc == bytesToWrite);
- bytesRemaining -= rc;
- writeBuf += rc;
- }
- return result;
- #else
- rc = _client->write(buf+(4-llen),length+1+llen);
- lastOutActivity = millis();
- return (rc == 1+llen+length);
- #endif
- }
- boolean PubSubClient::subscribe(const char* topic) {
- return subscribe(topic, 0);
- }
- boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
- if (qos < 0 || qos > 1) {
- return false;
- }
- if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
- // Too long
- return false;
- }
- if (connected()) {
- // Leave room in the buffer for header and variable length field
- uint16_t length = 5;
- nextMsgId++;
- if (nextMsgId == 0) {
- nextMsgId = 1;
- }
- buffer[length++] = (nextMsgId >> 8);
- buffer[length++] = (nextMsgId & 0xFF);
- length = writeString((char*)topic, buffer,length);
- buffer[length++] = qos;
- return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
- }
- return false;
- }
- boolean PubSubClient::unsubscribe(const char* topic) {
- if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
- // Too long
- return false;
- }
- if (connected()) {
- uint16_t length = 5;
- nextMsgId++;
- if (nextMsgId == 0) {
- nextMsgId = 1;
- }
- buffer[length++] = (nextMsgId >> 8);
- buffer[length++] = (nextMsgId & 0xFF);
- length = writeString(topic, buffer,length);
- return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
- }
- return false;
- }
- void PubSubClient::disconnect() {
- buffer[0] = MQTTDISCONNECT;
- buffer[1] = 0;
- _client->write(buffer,2);
- _state = MQTT_DISCONNECTED;
- _client->stop();
- lastInActivity = lastOutActivity = millis();
- }
- uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
- const char* idp = string;
- uint16_t i = 0;
- pos += 2;
- while (*idp) {
- buf[pos++] = *idp++;
- i++;
- }
- buf[pos-i-2] = (i >> 8);
- buf[pos-i-1] = (i & 0xFF);
- return pos;
- }
- boolean PubSubClient::connected() {
- boolean rc;
- if (_client == NULL ) {
- rc = false;
- } else {
- rc = (int)_client->connected();
- if (!rc) {
- if (this->_state == MQTT_CONNECTED) {
- this->_state = MQTT_CONNECTION_LOST;
- _client->flush();
- _client->stop();
- }
- }
- }
- return rc;
- }
- PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
- IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
- return setServer(addr,port);
- }
- PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
- this->ip = ip;
- this->port = port;
- this->domain = NULL;
- return *this;
- }
- PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
- this->domain = domain;
- this->port = port;
- return *this;
- }
- PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
- this->callback = callback;
- return *this;
- }
- PubSubClient& PubSubClient::setClient(Client& client){
- this->_client = &client;
- return *this;
- }
- PubSubClient& PubSubClient::setStream(Stream& stream){
- this->stream = &stream;
- return *this;
- }
- int PubSubClient::state() {
- return this->_state;
- }
|