PubSubClient.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. /*
  2. PubSubClient.cpp - A simple client for MQTT.
  3. Nick O'Leary
  4. http://knolleary.net
  5. */
  6. #include "PubSubClient.h"
  7. #include "Arduino.h"
  8. PubSubClient::PubSubClient() {
  9. this->_state = MQTT_DISCONNECTED;
  10. this->_client = NULL;
  11. this->stream = NULL;
  12. setCallback(NULL);
  13. }
  14. PubSubClient::PubSubClient(Client& client) {
  15. this->_state = MQTT_DISCONNECTED;
  16. setClient(client);
  17. this->stream = NULL;
  18. }
  19. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
  20. this->_state = MQTT_DISCONNECTED;
  21. setServer(addr, port);
  22. setClient(client);
  23. this->stream = NULL;
  24. }
  25. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
  26. this->_state = MQTT_DISCONNECTED;
  27. setServer(addr,port);
  28. setClient(client);
  29. setStream(stream);
  30. }
  31. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
  32. this->_state = MQTT_DISCONNECTED;
  33. setServer(addr, port);
  34. setCallback(callback);
  35. setClient(client);
  36. this->stream = NULL;
  37. }
  38. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
  39. this->_state = MQTT_DISCONNECTED;
  40. setServer(addr,port);
  41. setCallback(callback);
  42. setClient(client);
  43. setStream(stream);
  44. }
  45. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
  46. this->_state = MQTT_DISCONNECTED;
  47. setServer(ip, port);
  48. setClient(client);
  49. this->stream = NULL;
  50. }
  51. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
  52. this->_state = MQTT_DISCONNECTED;
  53. setServer(ip,port);
  54. setClient(client);
  55. setStream(stream);
  56. }
  57. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
  58. this->_state = MQTT_DISCONNECTED;
  59. setServer(ip, port);
  60. setCallback(callback);
  61. setClient(client);
  62. this->stream = NULL;
  63. }
  64. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
  65. this->_state = MQTT_DISCONNECTED;
  66. setServer(ip,port);
  67. setCallback(callback);
  68. setClient(client);
  69. setStream(stream);
  70. }
  71. PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
  72. this->_state = MQTT_DISCONNECTED;
  73. setServer(domain,port);
  74. setClient(client);
  75. this->stream = NULL;
  76. }
  77. PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
  78. this->_state = MQTT_DISCONNECTED;
  79. setServer(domain,port);
  80. setClient(client);
  81. setStream(stream);
  82. }
  83. PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
  84. this->_state = MQTT_DISCONNECTED;
  85. setServer(domain,port);
  86. setCallback(callback);
  87. setClient(client);
  88. this->stream = NULL;
  89. }
  90. PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
  91. this->_state = MQTT_DISCONNECTED;
  92. setServer(domain,port);
  93. setCallback(callback);
  94. setClient(client);
  95. setStream(stream);
  96. }
  97. boolean PubSubClient::connect(const char *id) {
  98. return connect(id,NULL,NULL,0,0,0,0);
  99. }
  100. boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
  101. return connect(id,user,pass,0,0,0,0);
  102. }
  103. boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
  104. return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
  105. }
  106. boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
  107. if (!connected()) {
  108. int result = 0;
  109. if (domain != NULL) {
  110. result = _client->connect(this->domain, this->port);
  111. } else {
  112. result = _client->connect(this->ip, this->port);
  113. }
  114. if (result == 1) {
  115. nextMsgId = 1;
  116. // Leave room in the buffer for header and variable length field
  117. uint16_t length = 5;
  118. unsigned int j;
  119. #if MQTT_VERSION == MQTT_VERSION_3_1
  120. uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
  121. #define MQTT_HEADER_VERSION_LENGTH 9
  122. #elif MQTT_VERSION == MQTT_VERSION_3_1_1
  123. uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
  124. #define MQTT_HEADER_VERSION_LENGTH 7
  125. #endif
  126. for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
  127. buffer[length++] = d[j];
  128. }
  129. uint8_t v;
  130. if (willTopic) {
  131. v = 0x06|(willQos<<3)|(willRetain<<5);
  132. } else {
  133. v = 0x02;
  134. }
  135. if(user != NULL) {
  136. v = v|0x80;
  137. if(pass != NULL) {
  138. v = v|(0x80>>1);
  139. }
  140. }
  141. buffer[length++] = v;
  142. buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
  143. buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
  144. length = writeString(id,buffer,length);
  145. if (willTopic) {
  146. length = writeString(willTopic,buffer,length);
  147. length = writeString(willMessage,buffer,length);
  148. }
  149. if(user != NULL) {
  150. length = writeString(user,buffer,length);
  151. if(pass != NULL) {
  152. length = writeString(pass,buffer,length);
  153. }
  154. }
  155. write(MQTTCONNECT,buffer,length-5);
  156. lastInActivity = lastOutActivity = millis();
  157. while (!_client->available()) {
  158. unsigned long t = millis();
  159. if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
  160. _state = MQTT_CONNECTION_TIMEOUT;
  161. _client->stop();
  162. return false;
  163. }
  164. }
  165. uint8_t llen;
  166. uint16_t len = readPacket(&llen);
  167. if (len == 4) {
  168. if (buffer[3] == 0) {
  169. lastInActivity = millis();
  170. pingOutstanding = false;
  171. _state = MQTT_CONNECTED;
  172. return true;
  173. } else {
  174. _state = buffer[3];
  175. }
  176. }
  177. _client->stop();
  178. } else {
  179. _state = MQTT_CONNECT_FAILED;
  180. }
  181. return false;
  182. }
  183. return true;
  184. }
  185. // reads a byte into result
  186. boolean PubSubClient::readByte(uint8_t * result) {
  187. uint32_t previousMillis = millis();
  188. while(!_client->available()) {
  189. uint32_t currentMillis = millis();
  190. if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){
  191. return false;
  192. }
  193. }
  194. *result = _client->read();
  195. return true;
  196. }
  197. // reads a byte into result[*index] and increments index
  198. boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
  199. uint16_t current_index = *index;
  200. uint8_t * write_address = &(result[current_index]);
  201. if(readByte(write_address)){
  202. *index = current_index + 1;
  203. return true;
  204. }
  205. return false;
  206. }
  207. uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
  208. uint16_t len = 0;
  209. if(!readByte(buffer, &len)) return 0;
  210. bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
  211. uint32_t multiplier = 1;
  212. uint16_t length = 0;
  213. uint8_t digit = 0;
  214. uint16_t skip = 0;
  215. uint8_t start = 0;
  216. do {
  217. if(!readByte(&digit)) return 0;
  218. buffer[len++] = digit;
  219. length += (digit & 127) * multiplier;
  220. multiplier *= 128;
  221. } while ((digit & 128) != 0);
  222. *lengthLength = len-1;
  223. if (isPublish) {
  224. // Read in topic length to calculate bytes to skip over for Stream writing
  225. if(!readByte(buffer, &len)) return 0;
  226. if(!readByte(buffer, &len)) return 0;
  227. skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
  228. start = 2;
  229. if (buffer[0]&MQTTQOS1) {
  230. // skip message id
  231. skip += 2;
  232. }
  233. }
  234. for (uint16_t i = start;i<length;i++) {
  235. if(!readByte(&digit)) return 0;
  236. if (this->stream) {
  237. if (isPublish && len-*lengthLength-2>skip) {
  238. this->stream->write(digit);
  239. }
  240. }
  241. if (len < MQTT_MAX_PACKET_SIZE) {
  242. buffer[len] = digit;
  243. }
  244. len++;
  245. }
  246. if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
  247. len = 0; // This will cause the packet to be ignored.
  248. }
  249. return len;
  250. }
  251. boolean PubSubClient::loop() {
  252. if (connected()) {
  253. unsigned long t = millis();
  254. if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
  255. if (pingOutstanding) {
  256. this->_state = MQTT_CONNECTION_TIMEOUT;
  257. _client->stop();
  258. return false;
  259. } else {
  260. buffer[0] = MQTTPINGREQ;
  261. buffer[1] = 0;
  262. _client->write(buffer,2);
  263. lastOutActivity = t;
  264. lastInActivity = t;
  265. pingOutstanding = true;
  266. }
  267. }
  268. if (_client->available()) {
  269. uint8_t llen;
  270. uint16_t len = readPacket(&llen);
  271. uint16_t msgId = 0;
  272. uint8_t *payload;
  273. if (len > 0) {
  274. lastInActivity = t;
  275. uint8_t type = buffer[0]&0xF0;
  276. if (type == MQTTPUBLISH) {
  277. if (callback) {
  278. uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
  279. char topic[tl+1];
  280. for (uint16_t i=0;i<tl;i++) {
  281. topic[i] = buffer[llen+3+i];
  282. }
  283. topic[tl] = 0;
  284. // msgId only present for QOS>0
  285. if ((buffer[0]&0x06) == MQTTQOS1) {
  286. msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
  287. payload = buffer+llen+3+tl+2;
  288. callback(topic,payload,len-llen-3-tl-2);
  289. buffer[0] = MQTTPUBACK;
  290. buffer[1] = 2;
  291. buffer[2] = (msgId >> 8);
  292. buffer[3] = (msgId & 0xFF);
  293. _client->write(buffer,4);
  294. lastOutActivity = t;
  295. } else {
  296. payload = buffer+llen+3+tl;
  297. callback(topic,payload,len-llen-3-tl);
  298. }
  299. }
  300. } else if (type == MQTTPINGREQ) {
  301. buffer[0] = MQTTPINGRESP;
  302. buffer[1] = 0;
  303. _client->write(buffer,2);
  304. } else if (type == MQTTPINGRESP) {
  305. pingOutstanding = false;
  306. }
  307. }
  308. }
  309. return true;
  310. }
  311. return false;
  312. }
  313. boolean PubSubClient::publish(const char* topic, const char* payload) {
  314. return publish(topic,(const uint8_t*)payload,strlen(payload),false);
  315. }
  316. boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
  317. return publish(topic,(const uint8_t*)payload,strlen(payload),retained);
  318. }
  319. boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
  320. return publish(topic, payload, plength, false);
  321. }
  322. boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
  323. if (connected()) {
  324. if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) {
  325. // Too long
  326. return false;
  327. }
  328. // Leave room in the buffer for header and variable length field
  329. uint16_t length = 5;
  330. length = writeString(topic,buffer,length);
  331. uint16_t i;
  332. for (i=0;i<plength;i++) {
  333. buffer[length++] = payload[i];
  334. }
  335. uint8_t header = MQTTPUBLISH;
  336. if (retained) {
  337. header |= 1;
  338. }
  339. return write(header,buffer,length-5);
  340. }
  341. return false;
  342. }
  343. boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
  344. uint8_t llen = 0;
  345. uint8_t digit;
  346. unsigned int rc = 0;
  347. uint16_t tlen;
  348. unsigned int pos = 0;
  349. unsigned int i;
  350. uint8_t header;
  351. unsigned int len;
  352. if (!connected()) {
  353. return false;
  354. }
  355. tlen = strlen(topic);
  356. header = MQTTPUBLISH;
  357. if (retained) {
  358. header |= 1;
  359. }
  360. buffer[pos++] = header;
  361. len = plength + 2 + tlen;
  362. do {
  363. digit = len % 128;
  364. len = len / 128;
  365. if (len > 0) {
  366. digit |= 0x80;
  367. }
  368. buffer[pos++] = digit;
  369. llen++;
  370. } while(len>0);
  371. pos = writeString(topic,buffer,pos);
  372. rc += _client->write(buffer,pos);
  373. for (i=0;i<plength;i++) {
  374. rc += _client->write((char)pgm_read_byte_near(payload + i));
  375. }
  376. lastOutActivity = millis();
  377. return rc == tlen + 4 + plength;
  378. }
  379. boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
  380. uint8_t lenBuf[4];
  381. uint8_t llen = 0;
  382. uint8_t digit;
  383. uint8_t pos = 0;
  384. uint16_t rc;
  385. uint16_t len = length;
  386. do {
  387. digit = len % 128;
  388. len = len / 128;
  389. if (len > 0) {
  390. digit |= 0x80;
  391. }
  392. lenBuf[pos++] = digit;
  393. llen++;
  394. } while(len>0);
  395. buf[4-llen] = header;
  396. for (int i=0;i<llen;i++) {
  397. buf[5-llen+i] = lenBuf[i];
  398. }
  399. #ifdef MQTT_MAX_TRANSFER_SIZE
  400. uint8_t* writeBuf = buf+(4-llen);
  401. uint16_t bytesRemaining = length+1+llen; //Match the length type
  402. uint8_t bytesToWrite;
  403. boolean result = true;
  404. while((bytesRemaining > 0) && result) {
  405. bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
  406. rc = _client->write(writeBuf,bytesToWrite);
  407. result = (rc == bytesToWrite);
  408. bytesRemaining -= rc;
  409. writeBuf += rc;
  410. }
  411. return result;
  412. #else
  413. rc = _client->write(buf+(4-llen),length+1+llen);
  414. lastOutActivity = millis();
  415. return (rc == 1+llen+length);
  416. #endif
  417. }
  418. boolean PubSubClient::subscribe(const char* topic) {
  419. return subscribe(topic, 0);
  420. }
  421. boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
  422. if (qos < 0 || qos > 1) {
  423. return false;
  424. }
  425. if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
  426. // Too long
  427. return false;
  428. }
  429. if (connected()) {
  430. // Leave room in the buffer for header and variable length field
  431. uint16_t length = 5;
  432. nextMsgId++;
  433. if (nextMsgId == 0) {
  434. nextMsgId = 1;
  435. }
  436. buffer[length++] = (nextMsgId >> 8);
  437. buffer[length++] = (nextMsgId & 0xFF);
  438. length = writeString((char*)topic, buffer,length);
  439. buffer[length++] = qos;
  440. return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
  441. }
  442. return false;
  443. }
  444. boolean PubSubClient::unsubscribe(const char* topic) {
  445. if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
  446. // Too long
  447. return false;
  448. }
  449. if (connected()) {
  450. uint16_t length = 5;
  451. nextMsgId++;
  452. if (nextMsgId == 0) {
  453. nextMsgId = 1;
  454. }
  455. buffer[length++] = (nextMsgId >> 8);
  456. buffer[length++] = (nextMsgId & 0xFF);
  457. length = writeString(topic, buffer,length);
  458. return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
  459. }
  460. return false;
  461. }
  462. void PubSubClient::disconnect() {
  463. buffer[0] = MQTTDISCONNECT;
  464. buffer[1] = 0;
  465. _client->write(buffer,2);
  466. _state = MQTT_DISCONNECTED;
  467. _client->stop();
  468. lastInActivity = lastOutActivity = millis();
  469. }
  470. uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
  471. const char* idp = string;
  472. uint16_t i = 0;
  473. pos += 2;
  474. while (*idp) {
  475. buf[pos++] = *idp++;
  476. i++;
  477. }
  478. buf[pos-i-2] = (i >> 8);
  479. buf[pos-i-1] = (i & 0xFF);
  480. return pos;
  481. }
  482. boolean PubSubClient::connected() {
  483. boolean rc;
  484. if (_client == NULL ) {
  485. rc = false;
  486. } else {
  487. rc = (int)_client->connected();
  488. if (!rc) {
  489. if (this->_state == MQTT_CONNECTED) {
  490. this->_state = MQTT_CONNECTION_LOST;
  491. _client->flush();
  492. _client->stop();
  493. }
  494. }
  495. }
  496. return rc;
  497. }
  498. PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
  499. IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
  500. return setServer(addr,port);
  501. }
  502. PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
  503. this->ip = ip;
  504. this->port = port;
  505. this->domain = NULL;
  506. return *this;
  507. }
  508. PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
  509. this->domain = domain;
  510. this->port = port;
  511. return *this;
  512. }
  513. PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
  514. this->callback = callback;
  515. return *this;
  516. }
  517. PubSubClient& PubSubClient::setClient(Client& client){
  518. this->_client = &client;
  519. return *this;
  520. }
  521. PubSubClient& PubSubClient::setStream(Stream& stream){
  522. this->stream = &stream;
  523. return *this;
  524. }
  525. int PubSubClient::state() {
  526. return this->_state;
  527. }