1#ifdef NOT_FOR_SHOW_ONLY
12PubSubClient::PubSubClient() {
13 this->_state = MQTT_DISCONNECTED;
18 setBufferSize(MQTT_MAX_PACKET_SIZE);
19 setKeepAlive(MQTT_KEEPALIVE);
20 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
23PubSubClient::PubSubClient(Client& client) {
24 this->_state = MQTT_DISCONNECTED;
28 setBufferSize(MQTT_MAX_PACKET_SIZE);
29 setKeepAlive(MQTT_KEEPALIVE);
30 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
33PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
34 this->_state = MQTT_DISCONNECTED;
35 setServer(addr, port);
39 setBufferSize(MQTT_MAX_PACKET_SIZE);
40 setKeepAlive(MQTT_KEEPALIVE);
41 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
43PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
44 this->_state = MQTT_DISCONNECTED;
49 setBufferSize(MQTT_MAX_PACKET_SIZE);
50 setKeepAlive(MQTT_KEEPALIVE);
51 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
53PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
54 this->_state = MQTT_DISCONNECTED;
55 setServer(addr, port);
56 setCallback(callback);
60 setBufferSize(MQTT_MAX_PACKET_SIZE);
61 setKeepAlive(MQTT_KEEPALIVE);
62 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
64PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
65 this->_state = MQTT_DISCONNECTED;
67 setCallback(callback);
71 setBufferSize(MQTT_MAX_PACKET_SIZE);
72 setKeepAlive(MQTT_KEEPALIVE);
73 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
76PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
77 this->_state = MQTT_DISCONNECTED;
82 setBufferSize(MQTT_MAX_PACKET_SIZE);
83 setKeepAlive(MQTT_KEEPALIVE);
84 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
86PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
87 this->_state = MQTT_DISCONNECTED;
92 setBufferSize(MQTT_MAX_PACKET_SIZE);
93 setKeepAlive(MQTT_KEEPALIVE);
94 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
96PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
97 this->_state = MQTT_DISCONNECTED;
99 setCallback(callback);
102 this->bufferSize = 0;
103 setBufferSize(MQTT_MAX_PACKET_SIZE);
104 setKeepAlive(MQTT_KEEPALIVE);
105 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
107PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
108 this->_state = MQTT_DISCONNECTED;
110 setCallback(callback);
113 this->bufferSize = 0;
114 setBufferSize(MQTT_MAX_PACKET_SIZE);
115 setKeepAlive(MQTT_KEEPALIVE);
116 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
119PubSubClient::PubSubClient(
const char* domain, uint16_t port, Client& client) {
120 this->_state = MQTT_DISCONNECTED;
121 setServer(domain,port);
124 this->bufferSize = 0;
125 setBufferSize(MQTT_MAX_PACKET_SIZE);
126 setKeepAlive(MQTT_KEEPALIVE);
127 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
129PubSubClient::PubSubClient(
const char* domain, uint16_t port, Client& client, Stream& stream) {
130 this->_state = MQTT_DISCONNECTED;
131 setServer(domain,port);
134 this->bufferSize = 0;
135 setBufferSize(MQTT_MAX_PACKET_SIZE);
136 setKeepAlive(MQTT_KEEPALIVE);
137 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
139PubSubClient::PubSubClient(
const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
140 this->_state = MQTT_DISCONNECTED;
141 setServer(domain,port);
142 setCallback(callback);
145 this->bufferSize = 0;
146 setBufferSize(MQTT_MAX_PACKET_SIZE);
147 setKeepAlive(MQTT_KEEPALIVE);
148 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
150PubSubClient::PubSubClient(
const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
151 this->_state = MQTT_DISCONNECTED;
152 setServer(domain,port);
153 setCallback(callback);
156 this->bufferSize = 0;
157 setBufferSize(MQTT_MAX_PACKET_SIZE);
158 setKeepAlive(MQTT_KEEPALIVE);
159 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
162PubSubClient::~PubSubClient() {
166boolean PubSubClient::connect(
const char *
id) {
167 return connect(
id,NULL,NULL,0,0,0,0,1);
170boolean PubSubClient::connect(
const char *
id,
const char *user,
const char *pass) {
171 return connect(
id,user,pass,0,0,0,0,1);
174boolean PubSubClient::connect(
const char *
id,
const char* willTopic, uint8_t willQos,
boolean willRetain,
const char* willMessage) {
175 return connect(
id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
178boolean PubSubClient::connect(
const char *
id,
const char *user,
const char *pass,
const char* willTopic, uint8_t willQos,
boolean willRetain,
const char* willMessage) {
179 return connect(
id,user,pass,willTopic,willQos,willRetain,willMessage,1);
182boolean PubSubClient::connect(
const char *
id,
const char *user,
const char *pass,
const char* willTopic, uint8_t willQos,
boolean willRetain,
const char* willMessage,
boolean cleanSession) {
190 if (domain != NULL) {
191 result =
_client->connect(this->domain, this->port);
193 result =
_client->connect(this->ip, this->port);
200 uint16_t length = MQTT_MAX_HEADER_SIZE;
203#if MQTT_VERSION == MQTT_VERSION_3_1
204 uint8_t d[9] = {0x00,0x06,
'M',
'Q',
'I',
's',
'd',
'p', MQTT_VERSION};
205#define MQTT_HEADER_VERSION_LENGTH 9
206#elif MQTT_VERSION == MQTT_VERSION_3_1_1
207 uint8_t d[7] = {0x00,0x04,
'M',
'Q',
'T',
'T',MQTT_VERSION};
208#define MQTT_HEADER_VERSION_LENGTH 7
210 for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
211 this->buffer[length++] = d[j];
216 v = 0x04|(willQos<<3)|(willRetain<<5);
231 this->buffer[length++] = v;
233 this->buffer[length++] = ((this->keepAlive) >> 8);
234 this->buffer[length++] = ((this->keepAlive) & 0xFF);
236 CHECK_STRING_LENGTH(length,
id)
237 length = writeString(
id,this->buffer,length);
239 CHECK_STRING_LENGTH(length,willTopic)
240 length = writeString(willTopic,this->buffer,length);
241 CHECK_STRING_LENGTH(length,willMessage)
242 length = writeString(willMessage,this->buffer,length);
246 CHECK_STRING_LENGTH(length,user)
247 length = writeString(user,this->buffer,length);
249 CHECK_STRING_LENGTH(length,pass)
250 length = writeString(pass,this->buffer,length);
254 write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE);
256 lastInActivity = lastOutActivity = millis();
258 while (!
_client->available()) {
259 unsigned long t = millis();
260 if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) {
261 _state = MQTT_CONNECTION_TIMEOUT;
267 uint32_t len = readPacket(&llen);
270 if (buffer[3] == 0) {
271 lastInActivity = millis();
272 pingOutstanding =
false;
273 _state = MQTT_CONNECTED;
281 _state = MQTT_CONNECT_FAILED;
289boolean PubSubClient::readByte(uint8_t * result) {
290 uint32_t previousMillis = millis();
293 uint32_t currentMillis = millis();
294 if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){
303boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
304 uint16_t current_index = *index;
305 uint8_t * write_address = &(result[current_index]);
306 if(readByte(write_address)){
307 *index = current_index + 1;
313uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
315 if(!readByte(this->buffer, &len))
return 0;
316 bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH;
317 uint32_t multiplier = 1;
326 _state = MQTT_DISCONNECTED;
330 if(!readByte(&digit))
return 0;
331 this->buffer[len++] = digit;
332 length += (digit & 127) * multiplier;
334 }
while ((digit & 128) != 0);
335 *lengthLength = len-1;
339 if(!readByte(this->buffer, &len))
return 0;
340 if(!readByte(this->buffer, &len))
return 0;
341 skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2];
343 if (this->buffer[0]&MQTTQOS1) {
350 for (uint32_t i = start;i<length;i++) {
351 if(!readByte(&digit))
return 0;
353 if (isPublish && idx-*lengthLength-2>skip) {
354 this->stream->write(digit);
358 if (len < this->bufferSize) {
359 this->buffer[len] = digit;
365 if (!this->stream && idx > this->bufferSize) {
373 unsigned long t = millis();
374 if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
375 if (pingOutstanding) {
376 this->_state = MQTT_CONNECTION_TIMEOUT;
380 this->buffer[0] = MQTTPINGREQ;
382 _client->write(this->buffer,2);
385 pingOutstanding =
true;
390 uint16_t len = readPacket(&llen);
395 uint8_t type = this->buffer[0]&0xF0;
396 if (type == MQTTPUBLISH) {
398 uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2];
399 memmove(this->buffer+llen+2,this->buffer+llen+3,tl);
400 this->buffer[llen+2+tl] = 0;
401 char *topic = (
char*) this->buffer+llen+2;
403 if ((this->buffer[0]&0x06) == MQTTQOS1) {
404 msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1];
405 payload = this->buffer+llen+3+tl+2;
406 callback(topic,payload,len-llen-3-tl-2);
408 this->buffer[0] = MQTTPUBACK;
410 this->buffer[2] = (msgId >> 8);
411 this->buffer[3] = (msgId & 0xFF);
412 _client->write(this->buffer,4);
416 payload = this->buffer+llen+3+tl;
417 callback(topic,payload,len-llen-3-tl);
420 }
else if (type == MQTTPINGREQ) {
421 this->buffer[0] = MQTTPINGRESP;
423 _client->write(this->buffer,2);
424 }
else if (type == MQTTPINGRESP) {
425 pingOutstanding =
false;
427 }
else if (!connected()) {
437boolean PubSubClient::publish(
const char* topic,
const char* payload) {
438 return publish(topic,(
const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,
false);
441boolean PubSubClient::publish(
const char* topic,
const char* payload,
boolean retained) {
442 return publish(topic,(
const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained);
445boolean PubSubClient::publish(
const char* topic,
const uint8_t* payload,
unsigned int plength) {
446 return publish(topic, payload, plength,
false);
449boolean PubSubClient::publish(
const char* topic,
const uint8_t* payload,
unsigned int plength,
boolean retained) {
451 if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) {
456 uint16_t length = MQTT_MAX_HEADER_SIZE;
457 length = writeString(topic,this->buffer,length);
461 for (i=0;i<plength;i++) {
462 this->buffer[length++] = payload[i];
466 uint8_t header = MQTTPUBLISH;
470 return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE);
475boolean PubSubClient::publish_P(
const char* topic,
const char* payload,
boolean retained) {
476 return publish_P(topic, (
const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained);
479boolean PubSubClient::publish_P(
const char* topic,
const uint8_t* payload,
unsigned int plength,
boolean retained) {
484 unsigned int pos = 0;
494 tlen = strnlen(topic, this->bufferSize);
496 header = MQTTPUBLISH;
500 this->buffer[pos++] = header;
501 len = plength + 2 + tlen;
508 this->buffer[pos++] = digit;
512 pos = writeString(topic,this->buffer,pos);
514 rc +=
_client->write(this->buffer,pos);
516 for (i=0;i<plength;i++) {
517 rc +=
_client->write((
char)pgm_read_byte_near(payload + i));
520 lastOutActivity = millis();
522 expectedLength = 1 + llen + 2 + tlen + plength;
524 return (rc == expectedLength);
527boolean PubSubClient::beginPublish(
const char* topic,
unsigned int plength,
boolean retained) {
530 uint16_t length = MQTT_MAX_HEADER_SIZE;
531 length = writeString(topic,this->buffer,length);
532 uint8_t header = MQTTPUBLISH;
536 size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE);
537 uint16_t rc =
_client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
538 lastOutActivity = millis();
539 return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
544int PubSubClient::endPublish() {
548size_t PubSubClient::write(uint8_t data) {
549 lastOutActivity = millis();
553size_t PubSubClient::write(
const uint8_t *buffer,
size_t size) {
554 lastOutActivity = millis();
555 return _client->write(buffer,size);
558size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
563 uint16_t len = length;
571 lenBuf[pos++] = digit;
575 buf[4-llen] = header;
576 for (
int i=0;i<llen;i++) {
577 buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
582boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
584 uint8_t hlen = buildHeader(header, buf, length);
586#ifdef MQTT_MAX_TRANSFER_SIZE
587 uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
588 uint16_t bytesRemaining = length+hlen;
589 uint8_t bytesToWrite;
590 boolean result =
true;
591 while((bytesRemaining > 0) && result) {
592 bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
593 rc =
_client->write(writeBuf,bytesToWrite);
594 result = (rc == bytesToWrite);
595 bytesRemaining -= rc;
600 rc =
_client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
601 lastOutActivity = millis();
602 return (rc == hlen+length);
606boolean PubSubClient::subscribe(
const char* topic) {
607 return subscribe(topic, 0);
610boolean PubSubClient::subscribe(
const char* topic, uint8_t qos) {
611 size_t topicLength = strnlen(topic, this->bufferSize);
618 if (this->bufferSize < 9 + topicLength) {
624 uint16_t length = MQTT_MAX_HEADER_SIZE;
626 if (nextMsgId == 0) {
629 this->buffer[length++] = (nextMsgId >> 8);
630 this->buffer[length++] = (nextMsgId & 0xFF);
631 length = writeString((
char*)topic, this->buffer,length);
632 this->buffer[length++] = qos;
633 return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
638boolean PubSubClient::unsubscribe(
const char* topic) {
639 size_t topicLength = strnlen(topic, this->bufferSize);
643 if (this->bufferSize < 9 + topicLength) {
648 uint16_t length = MQTT_MAX_HEADER_SIZE;
650 if (nextMsgId == 0) {
653 this->buffer[length++] = (nextMsgId >> 8);
654 this->buffer[length++] = (nextMsgId & 0xFF);
655 length = writeString(topic, this->buffer,length);
656 return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
661void PubSubClient::disconnect() {
662 this->buffer[0] = MQTTDISCONNECT;
664 _client->write(this->buffer,2);
665 _state = MQTT_DISCONNECTED;
668 lastInActivity = lastOutActivity = millis();
671uint16_t PubSubClient::writeString(
const char*
string, uint8_t* buf, uint16_t pos) {
672 const char* idp = string;
679 buf[pos-i-2] = (i >> 8);
680 buf[pos-i-1] = (i & 0xFF);
685boolean PubSubClient::connected() {
690 rc = (int)
_client->connected();
692 if (this->_state == MQTT_CONNECTED) {
693 this->_state = MQTT_CONNECTION_LOST;
698 return this->_state == MQTT_CONNECTED;
704PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
705 IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
706 return setServer(addr,port);
709PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
716PubSubClient& PubSubClient::setServer(
const char * domain, uint16_t port) {
717 this->domain = domain;
722PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
723 this->callback = callback;
727PubSubClient& PubSubClient::setClient(Client& client){
732PubSubClient& PubSubClient::setStream(Stream& stream){
733 this->stream = &stream;
737int PubSubClient::state() {
741boolean PubSubClient::setBufferSize(uint16_t size) {
746 if (this->bufferSize == 0) {
747 this->buffer = (uint8_t*)malloc(size);
749 uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size);
750 if (newBuffer != NULL) {
751 this->buffer = newBuffer;
756 this->bufferSize = size;
757 return (this->buffer != NULL);
760uint16_t PubSubClient::getBufferSize() {
761 return this->bufferSize;
763PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
764 this->keepAlive = keepAlive;
767PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) {
768 this->socketTimeout = timeout;
void loop()
main loop() of the Arduino runtime