ESP_IOT v2.5
IOT ESP Coding
PubSubClient.cpp
Go to the documentation of this file.
1#ifdef NOT_FOR_SHOW_ONLY
2/*
3
4 PubSubClient.cpp - A simple client for MQTT.
5 Nick O'Leary
6 http://knolleary.net
7*/
8
9#include "PubSubClient.h"
10#include "Arduino.h"
11
12PubSubClient::PubSubClient() {
13 this->_state = MQTT_DISCONNECTED;
14 this->_client = NULL;
15 this->stream = NULL;
16 setCallback(NULL);
17 this->bufferSize = 0;
18 setBufferSize(MQTT_MAX_PACKET_SIZE);
19 setKeepAlive(MQTT_KEEPALIVE);
20 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
21}
22
23PubSubClient::PubSubClient(Client& client) {
24 this->_state = MQTT_DISCONNECTED;
25 setClient(client);
26 this->stream = NULL;
27 this->bufferSize = 0;
28 setBufferSize(MQTT_MAX_PACKET_SIZE);
29 setKeepAlive(MQTT_KEEPALIVE);
30 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
31}
32
33PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
34 this->_state = MQTT_DISCONNECTED;
35 setServer(addr, port);
36 setClient(client);
37 this->stream = NULL;
38 this->bufferSize = 0;
39 setBufferSize(MQTT_MAX_PACKET_SIZE);
40 setKeepAlive(MQTT_KEEPALIVE);
41 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
42}
43PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
44 this->_state = MQTT_DISCONNECTED;
45 setServer(addr,port);
46 setClient(client);
47 setStream(stream);
48 this->bufferSize = 0;
49 setBufferSize(MQTT_MAX_PACKET_SIZE);
50 setKeepAlive(MQTT_KEEPALIVE);
51 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
52}
53PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
54 this->_state = MQTT_DISCONNECTED;
55 setServer(addr, port);
56 setCallback(callback);
57 setClient(client);
58 this->stream = NULL;
59 this->bufferSize = 0;
60 setBufferSize(MQTT_MAX_PACKET_SIZE);
61 setKeepAlive(MQTT_KEEPALIVE);
62 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
63}
64PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
65 this->_state = MQTT_DISCONNECTED;
66 setServer(addr,port);
67 setCallback(callback);
68 setClient(client);
69 setStream(stream);
70 this->bufferSize = 0;
71 setBufferSize(MQTT_MAX_PACKET_SIZE);
72 setKeepAlive(MQTT_KEEPALIVE);
73 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
74}
75
76PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
77 this->_state = MQTT_DISCONNECTED;
78 setServer(ip, port);
79 setClient(client);
80 this->stream = NULL;
81 this->bufferSize = 0;
82 setBufferSize(MQTT_MAX_PACKET_SIZE);
83 setKeepAlive(MQTT_KEEPALIVE);
84 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
85}
86PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
87 this->_state = MQTT_DISCONNECTED;
88 setServer(ip,port);
89 setClient(client);
90 setStream(stream);
91 this->bufferSize = 0;
92 setBufferSize(MQTT_MAX_PACKET_SIZE);
93 setKeepAlive(MQTT_KEEPALIVE);
94 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
95}
96PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
97 this->_state = MQTT_DISCONNECTED;
98 setServer(ip, port);
99 setCallback(callback);
100 setClient(client);
101 this->stream = NULL;
102 this->bufferSize = 0;
103 setBufferSize(MQTT_MAX_PACKET_SIZE);
104 setKeepAlive(MQTT_KEEPALIVE);
105 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
106}
107PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
108 this->_state = MQTT_DISCONNECTED;
109 setServer(ip,port);
110 setCallback(callback);
111 setClient(client);
112 setStream(stream);
113 this->bufferSize = 0;
114 setBufferSize(MQTT_MAX_PACKET_SIZE);
115 setKeepAlive(MQTT_KEEPALIVE);
116 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
117}
118
119PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
120 this->_state = MQTT_DISCONNECTED;
121 setServer(domain,port);
122 setClient(client);
123 this->stream = NULL;
124 this->bufferSize = 0;
125 setBufferSize(MQTT_MAX_PACKET_SIZE);
126 setKeepAlive(MQTT_KEEPALIVE);
127 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
128}
129PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
130 this->_state = MQTT_DISCONNECTED;
131 setServer(domain,port);
132 setClient(client);
133 setStream(stream);
134 this->bufferSize = 0;
135 setBufferSize(MQTT_MAX_PACKET_SIZE);
136 setKeepAlive(MQTT_KEEPALIVE);
137 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
138}
139PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
140 this->_state = MQTT_DISCONNECTED;
141 setServer(domain,port);
142 setCallback(callback);
143 setClient(client);
144 this->stream = NULL;
145 this->bufferSize = 0;
146 setBufferSize(MQTT_MAX_PACKET_SIZE);
147 setKeepAlive(MQTT_KEEPALIVE);
148 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
149}
150PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
151 this->_state = MQTT_DISCONNECTED;
152 setServer(domain,port);
153 setCallback(callback);
154 setClient(client);
155 setStream(stream);
156 this->bufferSize = 0;
157 setBufferSize(MQTT_MAX_PACKET_SIZE);
158 setKeepAlive(MQTT_KEEPALIVE);
159 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
160}
161
162PubSubClient::~PubSubClient() {
163 free(this->buffer);
164}
165
166boolean PubSubClient::connect(const char *id) {
167 return connect(id,NULL,NULL,0,0,0,0,1);
168}
169
170boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
171 return connect(id,user,pass,0,0,0,0,1);
172}
173
174boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
175 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
176}
177
178boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
179 return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1);
180}
181
182boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) {
183 if (!connected()) {
184 int result = 0;
185
186
187 if(_client->connected()) {
188 result = 1;
189 } else {
190 if (domain != NULL) {
191 result = _client->connect(this->domain, this->port);
192 } else {
193 result = _client->connect(this->ip, this->port);
194 }
195 }
196
197 if (result == 1) {
198 nextMsgId = 1;
199 // Leave room in the buffer for header and variable length field
200 uint16_t length = MQTT_MAX_HEADER_SIZE;
201 unsigned int j;
202
203#if MQTT_VERSION == MQTT_VERSION_3_1
204 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
205#define MQTT_HEADER_VERSION_LENGTH 9
206#elif MQTT_VERSION == MQTT_VERSION_3_1_1
207 uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
208#define MQTT_HEADER_VERSION_LENGTH 7
209#endif
210 for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
211 this->buffer[length++] = d[j];
212 }
213
214 uint8_t v;
215 if (willTopic) {
216 v = 0x04|(willQos<<3)|(willRetain<<5);
217 } else {
218 v = 0x00;
219 }
220 if (cleanSession) {
221 v = v|0x02;
222 }
223
224 if(user != NULL) {
225 v = v|0x80;
226
227 if(pass != NULL) {
228 v = v|(0x80>>1);
229 }
230 }
231 this->buffer[length++] = v;
232
233 this->buffer[length++] = ((this->keepAlive) >> 8);
234 this->buffer[length++] = ((this->keepAlive) & 0xFF);
235
236 CHECK_STRING_LENGTH(length,id)
237 length = writeString(id,this->buffer,length);
238 if (willTopic) {
239 CHECK_STRING_LENGTH(length,willTopic)
240 length = writeString(willTopic,this->buffer,length);
241 CHECK_STRING_LENGTH(length,willMessage)
242 length = writeString(willMessage,this->buffer,length);
243 }
244
245 if(user != NULL) {
246 CHECK_STRING_LENGTH(length,user)
247 length = writeString(user,this->buffer,length);
248 if(pass != NULL) {
249 CHECK_STRING_LENGTH(length,pass)
250 length = writeString(pass,this->buffer,length);
251 }
252 }
253
254 write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE);
255
256 lastInActivity = lastOutActivity = millis();
257
258 while (!_client->available()) {
259 unsigned long t = millis();
260 if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) {
261 _state = MQTT_CONNECTION_TIMEOUT;
262 _client->stop();
263 return false;
264 }
265 }
266 uint8_t llen;
267 uint32_t len = readPacket(&llen);
268
269 if (len == 4) {
270 if (buffer[3] == 0) {
271 lastInActivity = millis();
272 pingOutstanding = false;
273 _state = MQTT_CONNECTED;
274 return true;
275 } else {
276 _state = buffer[3];
277 }
278 }
279 _client->stop();
280 } else {
281 _state = MQTT_CONNECT_FAILED;
282 }
283 return false;
284 }
285 return true;
286}
287
288// reads a byte into result
289boolean PubSubClient::readByte(uint8_t * result) {
290 uint32_t previousMillis = millis();
291 while(!_client->available()) {
292 yield();
293 uint32_t currentMillis = millis();
294 if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){
295 return false;
296 }
297 }
298 *result = _client->read();
299 return true;
300}
301
302// reads a byte into result[*index] and increments index
303boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
304 uint16_t current_index = *index;
305 uint8_t * write_address = &(result[current_index]);
306 if(readByte(write_address)){
307 *index = current_index + 1;
308 return true;
309 }
310 return false;
311}
312
313uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
314 uint16_t len = 0;
315 if(!readByte(this->buffer, &len)) return 0;
316 bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH;
317 uint32_t multiplier = 1;
318 uint32_t length = 0;
319 uint8_t digit = 0;
320 uint16_t skip = 0;
321 uint32_t start = 0;
322
323 do {
324 if (len == 5) {
325 // Invalid remaining length encoding - kill the connection
326 _state = MQTT_DISCONNECTED;
327 _client->stop();
328 return 0;
329 }
330 if(!readByte(&digit)) return 0;
331 this->buffer[len++] = digit;
332 length += (digit & 127) * multiplier;
333 multiplier <<=7; //multiplier *= 128
334 } while ((digit & 128) != 0);
335 *lengthLength = len-1;
336
337 if (isPublish) {
338 // Read in topic length to calculate bytes to skip over for Stream writing
339 if(!readByte(this->buffer, &len)) return 0;
340 if(!readByte(this->buffer, &len)) return 0;
341 skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2];
342 start = 2;
343 if (this->buffer[0]&MQTTQOS1) {
344 // skip message id
345 skip += 2;
346 }
347 }
348 uint32_t idx = len;
349
350 for (uint32_t i = start;i<length;i++) {
351 if(!readByte(&digit)) return 0;
352 if (this->stream) {
353 if (isPublish && idx-*lengthLength-2>skip) {
354 this->stream->write(digit);
355 }
356 }
357
358 if (len < this->bufferSize) {
359 this->buffer[len] = digit;
360 len++;
361 }
362 idx++;
363 }
364
365 if (!this->stream && idx > this->bufferSize) {
366 len = 0; // This will cause the packet to be ignored.
367 }
368 return len;
369}
370
371boolean PubSubClient::loop() {
372 if (connected()) {
373 unsigned long t = millis();
374 if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
375 if (pingOutstanding) {
376 this->_state = MQTT_CONNECTION_TIMEOUT;
377 _client->stop();
378 return false;
379 } else {
380 this->buffer[0] = MQTTPINGREQ;
381 this->buffer[1] = 0;
382 _client->write(this->buffer,2);
383 lastOutActivity = t;
384 lastInActivity = t;
385 pingOutstanding = true;
386 }
387 }
388 if (_client->available()) {
389 uint8_t llen;
390 uint16_t len = readPacket(&llen);
391 uint16_t msgId = 0;
392 uint8_t *payload;
393 if (len > 0) {
394 lastInActivity = t;
395 uint8_t type = this->buffer[0]&0xF0;
396 if (type == MQTTPUBLISH) {
397 if (callback) {
398 uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */
399 memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
400 this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
401 char *topic = (char*) this->buffer+llen+2;
402 // msgId only present for QOS>0
403 if ((this->buffer[0]&0x06) == MQTTQOS1) {
404 msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1];
405 payload = this->buffer+llen+3+tl+2;
406 callback(topic,payload,len-llen-3-tl-2);
407
408 this->buffer[0] = MQTTPUBACK;
409 this->buffer[1] = 2;
410 this->buffer[2] = (msgId >> 8);
411 this->buffer[3] = (msgId & 0xFF);
412 _client->write(this->buffer,4);
413 lastOutActivity = t;
414
415 } else {
416 payload = this->buffer+llen+3+tl;
417 callback(topic,payload,len-llen-3-tl);
418 }
419 }
420 } else if (type == MQTTPINGREQ) {
421 this->buffer[0] = MQTTPINGRESP;
422 this->buffer[1] = 0;
423 _client->write(this->buffer,2);
424 } else if (type == MQTTPINGRESP) {
425 pingOutstanding = false;
426 }
427 } else if (!connected()) {
428 // readPacket has closed the connection
429 return false;
430 }
431 }
432 return true;
433 }
434 return false;
435}
436
437boolean PubSubClient::publish(const char* topic, const char* payload) {
438 return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false);
439}
440
441boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
442 return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained);
443}
444
445boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
446 return publish(topic, payload, plength, false);
447}
448
449boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
450 if (connected()) {
451 if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) {
452 // Too long
453 return false;
454 }
455 // Leave room in the buffer for header and variable length field
456 uint16_t length = MQTT_MAX_HEADER_SIZE;
457 length = writeString(topic,this->buffer,length);
458
459 // Add payload
460 uint16_t i;
461 for (i=0;i<plength;i++) {
462 this->buffer[length++] = payload[i];
463 }
464
465 // Write the header
466 uint8_t header = MQTTPUBLISH;
467 if (retained) {
468 header |= 1;
469 }
470 return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE);
471 }
472 return false;
473}
474
475boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) {
476 return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained);
477}
478
479boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
480 uint8_t llen = 0;
481 uint8_t digit;
482 unsigned int rc = 0;
483 uint16_t tlen;
484 unsigned int pos = 0;
485 unsigned int i;
486 uint8_t header;
487 unsigned int len;
488 int expectedLength;
489
490 if (!connected()) {
491 return false;
492 }
493
494 tlen = strnlen(topic, this->bufferSize);
495
496 header = MQTTPUBLISH;
497 if (retained) {
498 header |= 1;
499 }
500 this->buffer[pos++] = header;
501 len = plength + 2 + tlen;
502 do {
503 digit = len & 127; //digit = len %128
504 len >>= 7; //len = len / 128
505 if (len > 0) {
506 digit |= 0x80;
507 }
508 this->buffer[pos++] = digit;
509 llen++;
510 } while(len>0);
511
512 pos = writeString(topic,this->buffer,pos);
513
514 rc += _client->write(this->buffer,pos);
515
516 for (i=0;i<plength;i++) {
517 rc += _client->write((char)pgm_read_byte_near(payload + i));
518 }
519
520 lastOutActivity = millis();
521
522 expectedLength = 1 + llen + 2 + tlen + plength;
523
524 return (rc == expectedLength);
525}
526
527boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
528 if (connected()) {
529 // Send the header and variable length field
530 uint16_t length = MQTT_MAX_HEADER_SIZE;
531 length = writeString(topic,this->buffer,length);
532 uint8_t header = MQTTPUBLISH;
533 if (retained) {
534 header |= 1;
535 }
536 size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE);
537 uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
538 lastOutActivity = millis();
539 return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
540 }
541 return false;
542}
543
544int PubSubClient::endPublish() {
545 return 1;
546}
547
548size_t PubSubClient::write(uint8_t data) {
549 lastOutActivity = millis();
550 return _client->write(data);
551}
552
553size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
554 lastOutActivity = millis();
555 return _client->write(buffer,size);
556}
557
558size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
559 uint8_t lenBuf[4];
560 uint8_t llen = 0;
561 uint8_t digit;
562 uint8_t pos = 0;
563 uint16_t len = length;
564 do {
565
566 digit = len & 127; //digit = len %128
567 len >>= 7; //len = len / 128
568 if (len > 0) {
569 digit |= 0x80;
570 }
571 lenBuf[pos++] = digit;
572 llen++;
573 } while(len>0);
574
575 buf[4-llen] = header;
576 for (int i=0;i<llen;i++) {
577 buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
578 }
579 return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
580}
581
582boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
583 uint16_t rc;
584 uint8_t hlen = buildHeader(header, buf, length);
585
586#ifdef MQTT_MAX_TRANSFER_SIZE
587 uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
588 uint16_t bytesRemaining = length+hlen; //Match the length type
589 uint8_t bytesToWrite;
590 boolean result = true;
591 while((bytesRemaining > 0) && result) {
592 bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
593 rc = _client->write(writeBuf,bytesToWrite);
594 result = (rc == bytesToWrite);
595 bytesRemaining -= rc;
596 writeBuf += rc;
597 }
598 return result;
599#else
600 rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
601 lastOutActivity = millis();
602 return (rc == hlen+length);
603#endif
604}
605
606boolean PubSubClient::subscribe(const char* topic) {
607 return subscribe(topic, 0);
608}
609
610boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
611 size_t topicLength = strnlen(topic, this->bufferSize);
612 if (topic == 0) {
613 return false;
614 }
615 if (qos > 1) {
616 return false;
617 }
618 if (this->bufferSize < 9 + topicLength) {
619 // Too long
620 return false;
621 }
622 if (connected()) {
623 // Leave room in the buffer for header and variable length field
624 uint16_t length = MQTT_MAX_HEADER_SIZE;
625 nextMsgId++;
626 if (nextMsgId == 0) {
627 nextMsgId = 1;
628 }
629 this->buffer[length++] = (nextMsgId >> 8);
630 this->buffer[length++] = (nextMsgId & 0xFF);
631 length = writeString((char*)topic, this->buffer,length);
632 this->buffer[length++] = qos;
633 return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
634 }
635 return false;
636}
637
638boolean PubSubClient::unsubscribe(const char* topic) {
639 size_t topicLength = strnlen(topic, this->bufferSize);
640 if (topic == 0) {
641 return false;
642 }
643 if (this->bufferSize < 9 + topicLength) {
644 // Too long
645 return false;
646 }
647 if (connected()) {
648 uint16_t length = MQTT_MAX_HEADER_SIZE;
649 nextMsgId++;
650 if (nextMsgId == 0) {
651 nextMsgId = 1;
652 }
653 this->buffer[length++] = (nextMsgId >> 8);
654 this->buffer[length++] = (nextMsgId & 0xFF);
655 length = writeString(topic, this->buffer,length);
656 return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
657 }
658 return false;
659}
660
661void PubSubClient::disconnect() {
662 this->buffer[0] = MQTTDISCONNECT;
663 this->buffer[1] = 0;
664 _client->write(this->buffer,2);
665 _state = MQTT_DISCONNECTED;
666 _client->flush();
667 _client->stop();
668 lastInActivity = lastOutActivity = millis();
669}
670
671uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
672 const char* idp = string;
673 uint16_t i = 0;
674 pos += 2;
675 while (*idp) {
676 buf[pos++] = *idp++;
677 i++;
678 }
679 buf[pos-i-2] = (i >> 8);
680 buf[pos-i-1] = (i & 0xFF);
681 return pos;
682}
683
684
685boolean PubSubClient::connected() {
686 boolean rc;
687 if (_client == NULL ) {
688 rc = false;
689 } else {
690 rc = (int)_client->connected();
691 if (!rc) {
692 if (this->_state == MQTT_CONNECTED) {
693 this->_state = MQTT_CONNECTION_LOST;
694 _client->flush();
695 _client->stop();
696 }
697 } else {
698 return this->_state == MQTT_CONNECTED;
699 }
700 }
701 return rc;
702}
703
704PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
705 IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
706 return setServer(addr,port);
707}
708
709PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
710 this->ip = ip;
711 this->port = port;
712 this->domain = NULL;
713 return *this;
714}
715
716PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
717 this->domain = domain;
718 this->port = port;
719 return *this;
720}
721
722PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
723 this->callback = callback;
724 return *this;
725}
726
727PubSubClient& PubSubClient::setClient(Client& client){
728 this->_client = &client;
729 return *this;
730}
731
732PubSubClient& PubSubClient::setStream(Stream& stream){
733 this->stream = &stream;
734 return *this;
735}
736
737int PubSubClient::state() {
738 return this->_state;
739}
740
741boolean PubSubClient::setBufferSize(uint16_t size) {
742 if (size == 0) {
743 // Cannot set it back to 0
744 return false;
745 }
746 if (this->bufferSize == 0) {
747 this->buffer = (uint8_t*)malloc(size);
748 } else {
749 uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size);
750 if (newBuffer != NULL) {
751 this->buffer = newBuffer;
752 } else {
753 return false;
754 }
755 }
756 this->bufferSize = size;
757 return (this->buffer != NULL);
758}
759
760uint16_t PubSubClient::getBufferSize() {
761 return this->bufferSize;
762}
763PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
764 this->keepAlive = keepAlive;
765 return *this;
766}
767PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) {
768 this->socketTimeout = timeout;
769 return *this;
770}
771#endif
void loop()
main loop() of the Arduino runtime
Definition: ESP_IOT.ino:272
WiFiClient _client