Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions include/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,9 @@ enum MQTTConnectFlags {
MQTT_CONNECT_RESERVED = 1u,
MQTT_CONNECT_CLEAN_SESSION = 2u,
MQTT_CONNECT_WILL_FLAG = 4u,
MQTT_CONNECT_WILL_QOS_0 = (0u & 0x03) << 3,
MQTT_CONNECT_WILL_QOS_1 = (1u & 0x03) << 3,
MQTT_CONNECT_WILL_QOS_2 = (2u & 0x03) << 3,
MQTT_CONNECT_WILL_QOS_0 = 0u << 3,
MQTT_CONNECT_WILL_QOS_1 = 1u << 3,
MQTT_CONNECT_WILL_QOS_2 = 2u << 3,
MQTT_CONNECT_WILL_RETAIN = 32u,
MQTT_CONNECT_PASSWORD = 64u,
MQTT_CONNECT_USER_NAME = 128u
Expand Down Expand Up @@ -742,10 +742,10 @@ ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz,
*/
enum MQTTPublishFlags {
MQTT_PUBLISH_DUP = 8u,
MQTT_PUBLISH_QOS_0 = ((0u << 1) & 0x06),
MQTT_PUBLISH_QOS_1 = ((1u << 1) & 0x06),
MQTT_PUBLISH_QOS_2 = ((2u << 1) & 0x06),
MQTT_PUBLISH_QOS_MASK = ((3u << 1) & 0x06),
MQTT_PUBLISH_QOS_0 = 0u << 1,
MQTT_PUBLISH_QOS_1 = 1u << 1,
MQTT_PUBLISH_QOS_2 = 2u << 1,
MQTT_PUBLISH_QOS_MASK = 3u << 1,
MQTT_PUBLISH_RETAIN = 0x01
};

Expand Down Expand Up @@ -973,10 +973,10 @@ struct mqtt_message_queue {
*
* @warning This member should \em not be manually changed.
*/
void *mem_start;
uint8_t *mem_start;

/** @brief The end of the message queue's memory block. */
void *mem_end;
struct mqtt_queued_message *mem_end;

/**
* @brief A pointer to the position in the buffer you can pack bytes at.
Expand Down Expand Up @@ -1014,7 +1014,7 @@ struct mqtt_message_queue {
*
* @relates mqtt_message_queue
*/
void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz);
void mqtt_mq_init(struct mqtt_message_queue *mq, uint8_t *buf, size_t bufsz);

/**
* @brief Clear as many messages from the front of the queue as possible.
Expand Down Expand Up @@ -1069,19 +1069,19 @@ struct mqtt_queued_message* mqtt_mq_find(const struct mqtt_message_queue *mq, en
*
* @returns The mqtt_queued_message at \p index.
*/
#define mqtt_mq_get(mq_ptr, index) (((struct mqtt_queued_message*) ((mq_ptr)->mem_end)) - 1 - index)
#define mqtt_mq_get(mq_ptr, index) ((mq_ptr)->mem_end - 1 - index)

/**
* @brief Returns the number of messages in the message queue, \p mq_ptr.
* @ingroup details
*/
#define mqtt_mq_length(mq_ptr) (((struct mqtt_queued_message*) ((mq_ptr)->mem_end)) - (mq_ptr)->queue_tail)
#define mqtt_mq_length(mq_ptr) ((mq_ptr)->mem_end - (mq_ptr)->queue_tail)

/**
* @brief Used internally to recalculate the \c curr_sz.
* @ingroup details
*/
#define mqtt_mq_currsz(mq_ptr) (((mq_ptr)->curr >= (uint8_t*) ((mq_ptr)->queue_tail - 1)) ? 0 : ((uint8_t*) ((mq_ptr)->queue_tail - 1)) - (mq_ptr)->curr)
#define mqtt_mq_currsz(mq_ptr) (((mq_ptr)->curr >= (uint8_t*) ((mq_ptr)->queue_tail - 1)) ? 0 : (size_t) (((uint8_t*) ((mq_ptr)->queue_tail - 1)) - (mq_ptr)->curr))

/* CLIENT */

Expand Down Expand Up @@ -1190,7 +1190,7 @@ struct mqtt_client {
* This member is always initialized to NULL but it can be manually set at any
* time.
*/
enum MQTTErrors (*inspector_callback)(struct mqtt_client*);
enum MQTTErrors (*inspector_callback)(struct mqtt_client* client);

/**
* @brief A callback that is called whenever the client is in an error state.
Expand All @@ -1199,7 +1199,7 @@ struct mqtt_client {
* previous sockets, and reestabilishing the connection to the broker and
* session configurations (i.e. subscriptions).
*/
void (*reconnect_callback)(struct mqtt_client*, void**);
void (*reconnect_callback)(struct mqtt_client* client, void** state);

/**
* @brief A pointer to some state. A pointer to this member is passed to
Expand Down
58 changes: 31 additions & 27 deletions src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client,
}

void mqtt_init_reconnect(struct mqtt_client *client,
void (*reconnect)(struct mqtt_client *, void**),
void (*reconnect_callback)(struct mqtt_client *client, void** state),
void *reconnect_state,
void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish))
{
Expand All @@ -176,7 +176,7 @@ void mqtt_init_reconnect(struct mqtt_client *client,
client->send_offset = 0;

client->inspector_callback = NULL;
client->reconnect_callback = reconnect;
client->reconnect_callback = reconnect_callback;
client->reconnect_state = reconnect_state;
}

Expand Down Expand Up @@ -214,14 +214,16 @@ void mqtt_reinit(struct mqtt_client* client,
client->error = (enum MQTTErrors)tmp; \
if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
return (enum MQTTErrors)tmp; \
} else if (tmp == 0) { \
} \
if (tmp == 0) { \
mqtt_mq_clean(&client->mq); \
tmp = pack_call; \
if (tmp < 0) { \
client->error = (enum MQTTErrors)tmp; \
if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
return (enum MQTTErrors)tmp; \
} else if(tmp == 0) { \
} \
if(tmp == 0) { \
client->error = MQTT_ERROR_SEND_BUFFER_IS_FULL; \
if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
return (enum MQTTErrors)MQTT_ERROR_SEND_BUFFER_IS_FULL; \
Expand Down Expand Up @@ -676,7 +678,8 @@ ssize_t __mqtt_recv(struct mqtt_client *client)
client->error = (enum MQTTErrors)consumed;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return consumed;
} else if (consumed == 0) {
}
if (consumed == 0) {
/* if curr_sz is 0 then the buffer is too small to ever fit the message */
if (client->recv_buffer.curr_sz == 0) {
client->error = MQTT_ERROR_RECV_BUFFER_TOO_SMALL;
Expand Down Expand Up @@ -1284,7 +1287,7 @@ ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz,
uint8_t inspected_qos;

/* check for null pointers */
if(buf == NULL || topic_name == NULL) {
if (topic_name == NULL) {
return MQTT_ERROR_NULLPTR;
}

Expand Down Expand Up @@ -1391,9 +1394,6 @@ ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz,
const uint8_t *const start = buf;
struct mqtt_fixed_header fixed_header;
ssize_t rv;
if (buf == NULL) {
return MQTT_ERROR_NULLPTR;
}

/* pack fixed header */
fixed_header.control_type = control_type;
Expand Down Expand Up @@ -1478,7 +1478,7 @@ ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, unsigned int pac
unsigned int num_subs = 0;
unsigned int i;
const char *topic[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
uint8_t max_qos[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
uint8_t max_qos[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS] = { 0 };

/* parse all subscriptions */
va_start(args, packet_id);
Expand Down Expand Up @@ -1613,13 +1613,13 @@ ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, unsigned int p
}

/* MESSAGE QUEUE */
void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz)
void mqtt_mq_init(struct mqtt_message_queue *mq, uint8_t *buf, size_t bufsz)
{
mq->mem_start = buf;
mq->mem_end = (uint8_t *)buf + bufsz;
mq->curr = (uint8_t *)buf;
mq->queue_tail = (struct mqtt_queued_message *)mq->mem_end;
mq->curr_sz = buf == NULL ? 0 : mqtt_mq_currsz(mq);
mq->mem_end = (struct mqtt_queued_message *)(mq->mem_start + bufsz);
mq->curr = mq->mem_start;
mq->queue_tail = mq->mem_end;
mq->curr_sz = (buf == NULL) ? 0 : mqtt_mq_currsz(mq);
}

struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes)
Expand All @@ -1632,7 +1632,7 @@ struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size

/* move curr and recalculate curr_sz */
mq->curr += nbytes;
mq->curr_sz = (size_t) (mqtt_mq_currsz(mq));
mq->curr_sz = mqtt_mq_currsz(mq);

return mq->queue_tail;
}
Expand All @@ -1646,27 +1646,28 @@ void mqtt_mq_clean(struct mqtt_message_queue *mq) {

/* check if everything can be removed */
if (new_head < mq->queue_tail) {
mq->curr = (uint8_t *)mq->mem_start;
mq->queue_tail = (struct mqtt_queued_message *)mq->mem_end;
mq->curr_sz = (size_t) (mqtt_mq_currsz(mq));
mq->curr = mq->mem_start;
mq->queue_tail = mq->mem_end;
mq->curr_sz = mqtt_mq_currsz(mq);
return;
} else if (new_head == mqtt_mq_get(mq, 0)) {
}
if (new_head == mqtt_mq_get(mq, 0)) {
/* do nothing */
return;
}

/* move buffered data */
{
size_t n = (size_t) (mq->curr - new_head->start);
size_t removing = (size_t) (new_head->start - (uint8_t*) mq->mem_start);
size_t removing = (size_t) (new_head->start - mq->mem_start);
memmove(mq->mem_start, new_head->start, n);
mq->curr = (unsigned char*)mq->mem_start + n;
mq->curr = mq->mem_start + n;


/* move queue */
{
ssize_t new_tail_idx = new_head - mq->queue_tail;
memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, sizeof(struct mqtt_queued_message) * (size_t) ((new_tail_idx + 1)));
memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, sizeof(struct mqtt_queued_message) * (size_t) (new_tail_idx + 1));
mq->queue_tail = mqtt_mq_get(mq, new_tail_idx);

{
Expand All @@ -1680,7 +1681,7 @@ void mqtt_mq_clean(struct mqtt_message_queue *mq) {
}

/* get curr_sz */
mq->curr_sz = (size_t) (mqtt_mq_currsz(mq));
mq->curr_sz = mqtt_mq_currsz(mq);
}

struct mqtt_queued_message* mqtt_mq_find(const struct mqtt_message_queue *mq, enum MQTTControlPacketType control_type, const uint16_t *packet_id)
Expand All @@ -1703,7 +1704,7 @@ ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf,
const uint8_t *const start = buf;
ssize_t rv = mqtt_unpack_fixed_header(response, buf, bufsz);
if (rv <= 0) return rv;
else buf += rv;
buf += rv;
switch(response->fixed_header.control_type) {
case MQTT_CONTROL_CONNACK:
rv = mqtt_unpack_connack_response(response, buf);
Expand All @@ -1730,9 +1731,12 @@ ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf,
rv = mqtt_unpack_unsuback_response(response, buf);
break;
case MQTT_CONTROL_PINGRESP:
return rv;
/* nothing to unpack */
rv = 0;
break;
default:
return MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE;
rv = MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE;
break;
}

if (rv < 0) return rv;
Expand Down