PubSubClient.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. /*
  2. PubSubClient.cpp - A simple client for MQTT.
  3. Nick O'Leary
  4. http://knolleary.net
  5. */
  6. #include "PubSubClient.h"
  7. #include "Arduino.h"
  8. PubSubClient::PubSubClient() {
  9. this->_state = MQTT_DISCONNECTED;
  10. this->_client = NULL;
  11. this->stream = NULL;
  12. setCallback(NULL);
  13. }
  14. PubSubClient::PubSubClient(Client& client) {
  15. this->_state = MQTT_DISCONNECTED;
  16. setClient(client);
  17. this->stream = NULL;
  18. }
  19. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
  20. this->_state = MQTT_DISCONNECTED;
  21. setServer(addr, port);
  22. setClient(client);
  23. this->stream = NULL;
  24. }
  25. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
  26. this->_state = MQTT_DISCONNECTED;
  27. setServer(addr,port);
  28. setClient(client);
  29. setStream(stream);
  30. }
  31. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
  32. this->_state = MQTT_DISCONNECTED;
  33. setServer(addr, port);
  34. setCallback(callback);
  35. setClient(client);
  36. this->stream = NULL;
  37. }
  38. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
  39. this->_state = MQTT_DISCONNECTED;
  40. setServer(addr,port);
  41. setCallback(callback);
  42. setClient(client);
  43. setStream(stream);
  44. }
  45. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
  46. this->_state = MQTT_DISCONNECTED;
  47. setServer(ip, port);
  48. setClient(client);
  49. this->stream = NULL;
  50. }
  51. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
  52. this->_state = MQTT_DISCONNECTED;
  53. setServer(ip,port);
  54. setClient(client);
  55. setStream(stream);
  56. }
  57. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
  58. this->_state = MQTT_DISCONNECTED;
  59. setServer(ip, port);
  60. setCallback(callback);
  61. setClient(client);
  62. this->stream = NULL;
  63. }
  64. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
  65. this->_state = MQTT_DISCONNECTED;
  66. setServer(ip,port);
  67. setCallback(callback);
  68. setClient(client);
  69. setStream(stream);
  70. }
  71. PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
  72. this->_state = MQTT_DISCONNECTED;
  73. setServer(domain,port);
  74. setClient(client);
  75. this->stream = NULL;
  76. }
  77. PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
  78. this->_state = MQTT_DISCONNECTED;
  79. setServer(domain,port);
  80. setClient(client);
  81. setStream(stream);
  82. }
  83. PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
  84. this->_state = MQTT_DISCONNECTED;
  85. setServer(domain,port);
  86. setCallback(callback);
  87. setClient(client);
  88. this->stream = NULL;
  89. }
  90. PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
  91. this->_state = MQTT_DISCONNECTED;
  92. setServer(domain,port);
  93. setCallback(callback);
  94. setClient(client);
  95. setStream(stream);
  96. }
  97. boolean PubSubClient::connect(const char *id) {
  98. return connect(id,NULL,NULL,0,0,0,0,1);
  99. }
  100. boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
  101. return connect(id,user,pass,0,0,0,0,1);
  102. }
  103. boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
  104. return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
  105. }
  106. boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
  107. return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1);
  108. }
  109. boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) {
  110. if (!connected()) {
  111. int result = 0;
  112. if (domain != NULL) {
  113. result = _client->connect(this->domain, this->port);
  114. } else {
  115. result = _client->connect(this->ip, this->port);
  116. }
  117. if (result == 1) {
  118. nextMsgId = 1;
  119. // Leave room in the buffer for header and variable length field
  120. uint16_t length = MQTT_MAX_HEADER_SIZE;
  121. unsigned int j;
  122. #if MQTT_VERSION == MQTT_VERSION_3_1
  123. uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
  124. #define MQTT_HEADER_VERSION_LENGTH 9
  125. #elif MQTT_VERSION == MQTT_VERSION_3_1_1
  126. uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
  127. #define MQTT_HEADER_VERSION_LENGTH 7
  128. #endif
  129. for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
  130. buffer[length++] = d[j];
  131. }
  132. uint8_t v;
  133. if (willTopic) {
  134. v = 0x04|(willQos<<3)|(willRetain<<5);
  135. } else {
  136. v = 0x00;
  137. }
  138. if (cleanSession) {
  139. v = v|0x02;
  140. }
  141. if(user != NULL) {
  142. v = v|0x80;
  143. if(pass != NULL) {
  144. v = v|(0x80>>1);
  145. }
  146. }
  147. buffer[length++] = v;
  148. buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
  149. buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
  150. CHECK_STRING_LENGTH(length,id)
  151. length = writeString(id,buffer,length);
  152. if (willTopic) {
  153. CHECK_STRING_LENGTH(length,willTopic)
  154. length = writeString(willTopic,buffer,length);
  155. CHECK_STRING_LENGTH(length,willMessage)
  156. length = writeString(willMessage,buffer,length);
  157. }
  158. if(user != NULL) {
  159. CHECK_STRING_LENGTH(length,user)
  160. length = writeString(user,buffer,length);
  161. if(pass != NULL) {
  162. CHECK_STRING_LENGTH(length,pass)
  163. length = writeString(pass,buffer,length);
  164. }
  165. }
  166. write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE);
  167. lastInActivity = lastOutActivity = millis();
  168. while (!_client->available()) {
  169. unsigned long t = millis();
  170. if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
  171. _state = MQTT_CONNECTION_TIMEOUT;
  172. _client->stop();
  173. return false;
  174. }
  175. }
  176. uint8_t llen;
  177. uint16_t len = readPacket(&llen);
  178. if (len == 4) {
  179. if (buffer[3] == 0) {
  180. lastInActivity = millis();
  181. pingOutstanding = false;
  182. _state = MQTT_CONNECTED;
  183. return true;
  184. } else {
  185. _state = buffer[3];
  186. }
  187. }
  188. _client->stop();
  189. } else {
  190. _state = MQTT_CONNECT_FAILED;
  191. }
  192. return false;
  193. }
  194. return true;
  195. }
  196. // reads a byte into result
  197. boolean PubSubClient::readByte(uint8_t * result) {
  198. uint32_t previousMillis = millis();
  199. while(!_client->available()) {
  200. yield();
  201. uint32_t currentMillis = millis();
  202. if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){
  203. return false;
  204. }
  205. }
  206. *result = _client->read();
  207. return true;
  208. }
  209. // reads a byte into result[*index] and increments index
  210. boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
  211. uint16_t current_index = *index;
  212. uint8_t * write_address = &(result[current_index]);
  213. if(readByte(write_address)){
  214. *index = current_index + 1;
  215. return true;
  216. }
  217. return false;
  218. }
  219. uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
  220. uint16_t len = 0;
  221. if(!readByte(buffer, &len)) return 0;
  222. boolean isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
  223. uint32_t multiplier = 1;
  224. uint16_t length = 0;
  225. uint8_t digit = 0;
  226. uint16_t skip = 0;
  227. uint8_t start = 0;
  228. do {
  229. if (len == 5) {
  230. // Invalid remaining length encoding - kill the connection
  231. _state = MQTT_DISCONNECTED;
  232. _client->stop();
  233. return 0;
  234. }
  235. if(!readByte(&digit)) return 0;
  236. buffer[len++] = digit;
  237. length += (digit & 127) * multiplier;
  238. multiplier *= 128;
  239. } while ((digit & 128) != 0);
  240. *lengthLength = len-1;
  241. if (isPublish) {
  242. // Read in topic length to calculate bytes to skip over for Stream writing
  243. if(!readByte(buffer, &len)) return 0;
  244. if(!readByte(buffer, &len)) return 0;
  245. skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
  246. start = 2;
  247. if (buffer[0]&MQTTQOS1) {
  248. // skip message id
  249. skip += 2;
  250. }
  251. }
  252. for (uint16_t i = start;i<length;i++) {
  253. if(!readByte(&digit)) return 0;
  254. if (this->stream) {
  255. if (isPublish && len-*lengthLength-2>skip) {
  256. this->stream->write(digit);
  257. }
  258. }
  259. if (len < MQTT_MAX_PACKET_SIZE) {
  260. buffer[len] = digit;
  261. }
  262. len++;
  263. }
  264. if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
  265. len = 0; // This will cause the packet to be ignored.
  266. }
  267. return len;
  268. }
  269. boolean PubSubClient::loop() {
  270. if (connected()) {
  271. unsigned long t = millis();
  272. if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
  273. if (pingOutstanding) {
  274. this->_state = MQTT_CONNECTION_TIMEOUT;
  275. _client->stop();
  276. return false;
  277. } else {
  278. buffer[0] = MQTTPINGREQ;
  279. buffer[1] = 0;
  280. _client->write(buffer,2);
  281. lastOutActivity = t;
  282. lastInActivity = t;
  283. pingOutstanding = true;
  284. }
  285. }
  286. if (_client->available()) {
  287. uint8_t llen;
  288. uint16_t len = readPacket(&llen);
  289. uint16_t msgId = 0;
  290. uint8_t *payload;
  291. if (len > 0) {
  292. lastInActivity = t;
  293. uint8_t type = buffer[0]&0xF0;
  294. if (type == MQTTPUBLISH) {
  295. if (callback) {
  296. uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
  297. memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
  298. buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
  299. char *topic = (char*) buffer+llen+2;
  300. // msgId only present for QOS>0
  301. if ((buffer[0]&0x06) == MQTTQOS1) {
  302. msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
  303. payload = buffer+llen+3+tl+2;
  304. callback(topic,payload,len-llen-3-tl-2);
  305. buffer[0] = MQTTPUBACK;
  306. buffer[1] = 2;
  307. buffer[2] = (msgId >> 8);
  308. buffer[3] = (msgId & 0xFF);
  309. _client->write(buffer,4);
  310. lastOutActivity = t;
  311. } else {
  312. payload = buffer+llen+3+tl;
  313. callback(topic,payload,len-llen-3-tl);
  314. }
  315. }
  316. } else if (type == MQTTPINGREQ) {
  317. buffer[0] = MQTTPINGRESP;
  318. buffer[1] = 0;
  319. _client->write(buffer,2);
  320. } else if (type == MQTTPINGRESP) {
  321. pingOutstanding = false;
  322. }
  323. } else if (!connected()) {
  324. // readPacket has closed the connection
  325. return false;
  326. }
  327. }
  328. return true;
  329. }
  330. return false;
  331. }
  332. boolean PubSubClient::publish(const char* topic, const char* payload) {
  333. return publish(topic,(const uint8_t*)payload,strlen(payload),false);
  334. }
  335. boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
  336. return publish(topic,(const uint8_t*)payload,strlen(payload),retained);
  337. }
  338. boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
  339. return publish(topic, payload, plength, false);
  340. }
  341. boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
  342. if (connected()) {
  343. if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) {
  344. // Too long
  345. return false;
  346. }
  347. // Leave room in the buffer for header and variable length field
  348. uint16_t length = MQTT_MAX_HEADER_SIZE;
  349. length = writeString(topic,buffer,length);
  350. uint16_t i;
  351. for (i=0;i<plength;i++) {
  352. buffer[length++] = payload[i];
  353. }
  354. uint8_t header = MQTTPUBLISH;
  355. if (retained) {
  356. header |= 1;
  357. }
  358. return write(header,buffer,length-MQTT_MAX_HEADER_SIZE);
  359. }
  360. return false;
  361. }
  362. boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) {
  363. return publish_P(topic, (const uint8_t*)payload, strlen(payload), retained);
  364. }
  365. boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
  366. uint8_t llen = 0;
  367. uint8_t digit;
  368. unsigned int rc = 0;
  369. uint16_t tlen;
  370. unsigned int pos = 0;
  371. unsigned int i;
  372. uint8_t header;
  373. unsigned int len;
  374. if (!connected()) {
  375. return false;
  376. }
  377. tlen = strlen(topic);
  378. header = MQTTPUBLISH;
  379. if (retained) {
  380. header |= 1;
  381. }
  382. buffer[pos++] = header;
  383. len = plength + 2 + tlen;
  384. do {
  385. digit = len % 128;
  386. len = len / 128;
  387. if (len > 0) {
  388. digit |= 0x80;
  389. }
  390. buffer[pos++] = digit;
  391. llen++;
  392. } while(len>0);
  393. pos = writeString(topic,buffer,pos);
  394. rc += _client->write(buffer,pos);
  395. for (i=0;i<plength;i++) {
  396. rc += _client->write((char)pgm_read_byte_near(payload + i));
  397. }
  398. lastOutActivity = millis();
  399. return rc == tlen + 4 + plength;
  400. }
  401. boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
  402. if (connected()) {
  403. // Send the header and variable length field
  404. uint16_t length = MQTT_MAX_HEADER_SIZE;
  405. length = writeString(topic,buffer,length);
  406. uint16_t i;
  407. uint8_t header = MQTTPUBLISH;
  408. if (retained) {
  409. header |= 1;
  410. }
  411. size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
  412. uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
  413. lastOutActivity = millis();
  414. return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
  415. }
  416. return false;
  417. }
  418. int PubSubClient::endPublish() {
  419. return 1;
  420. }
  421. size_t PubSubClient::write(uint8_t data) {
  422. lastOutActivity = millis();
  423. return _client->write(data);
  424. }
  425. size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
  426. lastOutActivity = millis();
  427. return _client->write(buffer,size);
  428. }
  429. size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
  430. uint8_t lenBuf[4];
  431. uint8_t llen = 0;
  432. uint8_t digit;
  433. uint8_t pos = 0;
  434. uint16_t len = length;
  435. do {
  436. digit = len % 128;
  437. len = len / 128;
  438. if (len > 0) {
  439. digit |= 0x80;
  440. }
  441. lenBuf[pos++] = digit;
  442. llen++;
  443. } while(len>0);
  444. buf[4-llen] = header;
  445. for (int i=0;i<llen;i++) {
  446. buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
  447. }
  448. return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
  449. }
  450. boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
  451. uint16_t rc;
  452. uint8_t hlen = buildHeader(header, buf, length);
  453. #ifdef MQTT_MAX_TRANSFER_SIZE
  454. uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
  455. uint16_t bytesRemaining = length+hlen; //Match the length type
  456. uint8_t bytesToWrite;
  457. boolean result = true;
  458. while((bytesRemaining > 0) && result) {
  459. bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
  460. rc = _client->write(writeBuf,bytesToWrite);
  461. result = (rc == bytesToWrite);
  462. bytesRemaining -= rc;
  463. writeBuf += rc;
  464. }
  465. return result;
  466. #else
  467. rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
  468. lastOutActivity = millis();
  469. return (rc == hlen+length);
  470. #endif
  471. }
  472. boolean PubSubClient::subscribe(const char* topic) {
  473. return subscribe(topic, 0);
  474. }
  475. boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
  476. if (qos > 1) {
  477. return false;
  478. }
  479. if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
  480. // Too long
  481. return false;
  482. }
  483. if (connected()) {
  484. // Leave room in the buffer for header and variable length field
  485. uint16_t length = MQTT_MAX_HEADER_SIZE;
  486. nextMsgId++;
  487. if (nextMsgId == 0) {
  488. nextMsgId = 1;
  489. }
  490. buffer[length++] = (nextMsgId >> 8);
  491. buffer[length++] = (nextMsgId & 0xFF);
  492. length = writeString((char*)topic, buffer,length);
  493. buffer[length++] = qos;
  494. return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
  495. }
  496. return false;
  497. }
  498. boolean PubSubClient::unsubscribe(const char* topic) {
  499. if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
  500. // Too long
  501. return false;
  502. }
  503. if (connected()) {
  504. uint16_t length = MQTT_MAX_HEADER_SIZE;
  505. nextMsgId++;
  506. if (nextMsgId == 0) {
  507. nextMsgId = 1;
  508. }
  509. buffer[length++] = (nextMsgId >> 8);
  510. buffer[length++] = (nextMsgId & 0xFF);
  511. length = writeString(topic, buffer,length);
  512. return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
  513. }
  514. return false;
  515. }
  516. void PubSubClient::disconnect() {
  517. buffer[0] = MQTTDISCONNECT;
  518. buffer[1] = 0;
  519. _client->write(buffer,2);
  520. _state = MQTT_DISCONNECTED;
  521. _client->flush();
  522. _client->stop();
  523. lastInActivity = lastOutActivity = millis();
  524. }
  525. uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
  526. const char* idp = string;
  527. uint16_t i = 0;
  528. pos += 2;
  529. while (*idp) {
  530. buf[pos++] = *idp++;
  531. i++;
  532. }
  533. buf[pos-i-2] = (i >> 8);
  534. buf[pos-i-1] = (i & 0xFF);
  535. return pos;
  536. }
  537. boolean PubSubClient::connected() {
  538. boolean rc;
  539. if (_client == NULL ) {
  540. rc = false;
  541. } else {
  542. rc = (int)_client->connected();
  543. if (!rc) {
  544. if (this->_state == MQTT_CONNECTED) {
  545. this->_state = MQTT_CONNECTION_LOST;
  546. _client->flush();
  547. _client->stop();
  548. }
  549. }
  550. }
  551. return rc;
  552. }
  553. PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
  554. IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
  555. return setServer(addr,port);
  556. }
  557. PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
  558. this->ip = ip;
  559. this->port = port;
  560. this->domain = NULL;
  561. return *this;
  562. }
  563. PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
  564. this->domain = domain;
  565. this->port = port;
  566. return *this;
  567. }
  568. PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
  569. this->callback = callback;
  570. return *this;
  571. }
  572. PubSubClient& PubSubClient::setClient(Client& client){
  573. this->_client = &client;
  574. return *this;
  575. }
  576. PubSubClient& PubSubClient::setStream(Stream& stream){
  577. this->stream = &stream;
  578. return *this;
  579. }
  580. int PubSubClient::state() {
  581. return this->_state;
  582. }