Skip to content

Improvements #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 45 additions & 16 deletions include/AWSClient/AWSClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@

#include "netsocket/TLSSocket.h"
#include "platform/Callback.h"
#include "rtos/Mutex.h"

extern "C"
{
#include "core_mqtt.h"
#include "core_json.h" // Expose return value enumeration
#include "mqtt_subscription_manager.h"
}

// Undef trace group from AWS SDK logging
Expand Down Expand Up @@ -128,12 +130,10 @@ class AWSClient {
* Initializes the SDK.
* Parses the root CAs and stores them.
*
* @param subCallback Subscription callback (topic, topic length, payload, payload length).
* @param creds Credentials containing the root CA.
* @return MBED_SUCCESS on success.
*/
int init(mbed::Callback<void(const char *, uint16_t, const void *, size_t)> subCallback,
const TLSCredentials_t &creds);
int init(const TLSCredentials_t &creds);

/**
* @brief Establish the MQTT connection.
Expand Down Expand Up @@ -191,23 +191,44 @@ class AWSClient {
/**
* @brief Subscribes to a topic filter.
*
* TODO char array variant would be more efficient in some cases
*
* @param topicFilter Topic filter.
* @param topicFilterLength Length of the topic filter.
* @param qos QoS.
* @param subCallback Subscription callback.
* @param callbackTopicFilter Topic to match on callbacks (required for jobs).
* @param callbackTopicFilterLength Length of the topic to match on callbacks (required for jobs).
* @return MBED_SUCCESS on success.
*/
int subscribe(const char *topicFilter, uint16_t topicFilterLength, const MQTTQoS qos = MQTTQoS0);
int subscribe(const char *topicFilter, uint16_t topicFilterLength,
const MQTTQoS qos, SubscriptionManagerCallback_t subCallback,
const char *callbackTopicFilter, uint16_t callbackTopicFilterLength);

inline int subscribe(const char *topicFilter, uint16_t topicFilterLength,
const MQTTQoS qos, SubscriptionManagerCallback_t subCallback)
{
return subscribe(topicFilter, topicFilterLength,
qos, subCallback,
topicFilter, topicFilterLength);
}

/**
* @brief Unsubscribes from a topic filter.
*
* @param topicFilter Topic filter.
* @param topicFilterLength Length of the topic filter.
* @param qos QoS.
* @param callbackTopicFilter Topic to match on callbacks (required for jobs).
* @param callbackTopicFilterLength Length of the topic to match on callbacks (required for jobs).
* @return MBED_SUCCESS on success.
*/
int unsubscribe(const char *topicFilter, uint16_t topicFilterLength);
int unsubscribe(const char *topicFilter, uint16_t topicFilterLength, const MQTTQoS qos,
const char *callbackTopicFilter, uint16_t callbackTopicFilterLength);

inline int unsubscribe(const char *topicFilter, uint16_t topicFilterLength, const MQTTQoS qos = MQTTQoS0)
{
return unsubscribe(topicFilter, topicFilterLength,
qos, topicFilter, topicFilterLength);
}

/**
* @brief Publishes to a topic.
Expand All @@ -222,15 +243,18 @@ class AWSClient {
int publish(const char *topic, uint16_t topic_length, const void *payload, size_t payload_length, const MQTTQoS qos = MQTTQoS0);

/**
* @brief Processes all of the pending incoming messages.
* @brief Processes pending incoming messages.
*
* Also handles keepalive.
* This must be called periodically by the application.
* Triggers application callback for received subscriptions.
*
* @param once ProcessLoop called only once if true.
* If false, ProcessLoop is called until all messages are received.
*
* @return MBED_SUCCESS on success.
*/
int processResponses();
int processResponses(bool once = false);

#if MBED_CONF_AWS_CLIENT_SHADOW

Expand Down Expand Up @@ -305,6 +329,11 @@ class AWSClient {
*/
MQTTContext_t mqttContext;

/**
* @brief Mutex for thread safety.
*/
rtos::Mutex mutex;

/**
* @brief Network context provided to the SDK.
*/
Expand All @@ -327,11 +356,6 @@ class AWSClient {
*/
bool isResponseReceived;

/**
* @brief Application callback for subscription events.
*/
mbed::Callback<void(const char *, uint16_t, const void *, size_t)> subCallback;

/**
* @brief Static callback to provide to the SDK.
* Calls the application callback when a response is received
Expand All @@ -353,8 +377,6 @@ class AWSClient {

bool shadowGetAccepted;

bool shadowUpdateAccepted;

/**
* @brief Buffer for the shadow get response.
*
Expand All @@ -363,6 +385,13 @@ class AWSClient {
*/
char shadowGetResponse[MBED_CONF_AWS_CLIENT_SHADOW_GET_RESPONSE_MAX_SIZE];

/**
* @brief Subscription callback for shadow subscriptions.
*
* @param pPublishInfo Publish info from event callback.
*/
static void shadowSubscriptionCallback(MQTTPublishInfo_t *pPublishInfo);

#endif // MBED_CONF_AWS_CLIENT_SHADOW
};

Expand Down
119 changes: 119 additions & 0 deletions include/AWSClient/mqtt_subscription_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* AWS IoT Device SDK for Embedded C 202103.00
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

/**
* @file mqtt_subscription_manager.h
* @brief The API of a subscription manager for handling subscription callbacks
* to topic filters in MQTT operations.
*/

#ifndef MQTT_SUBSCRIPTION_MANAGER_H_
#define MQTT_SUBSCRIPTION_MANAGER_H_

#include "logging_config.h"

#undef TRACE_GROUP
#define TRACE_GROUP "SubManager"

/* Include MQTT library. */
#include "core_mqtt.h"

/* Enumeration type for return status value from Subscription Manager API. */
typedef enum SubscriptionManagerStatus
{
/**
* @brief Success return value from Subscription Manager API.
*/
SUBSCRIPTION_MANAGER_SUCCESS = 1,

/**
* @brief Failure return value due to registry being full.
*/
SUBSCRIPTION_MANAGER_REGISTRY_FULL = 2,

/**
* @brief Failure return value due to an already existing record in the
* registry for a new callback registration's requested topic filter.
*/
SUBSCRIPTION_MANAGER_RECORD_EXISTS = 3
} SubscriptionManagerStatus_t;


/**
* @brief Callback type to be registered for a topic filter with the subscription manager.
*
* For incoming PUBLISH messages received on topics that match the registered topic filter,
* the callback would be invoked by the subscription manager.
*
* @param[in] pPublishInfo The incoming PUBLISH message information.
*/
typedef void (* SubscriptionManagerCallback_t )( MQTTPublishInfo_t * pPublishInfo );

/**
* @brief Dispatches the incoming PUBLISH message to the callbacks that have their
* registered topic filters matching the incoming PUBLISH topic name. The dispatch
* handler will invoke all these callbacks with matching topic filters.
*
* @param[in] pPublishInfo The incoming PUBLISH message information.
*/
void SubscriptionManager_DispatchHandler( MQTTPublishInfo_t * pPublishInfo );

/**
* @brief Utility to register a callback for a topic filter in the subscription manager.
*
* The callback will be invoked when an incoming PUBLISH message is received on
* a topic that matches the topic filter, @a pTopicFilter. The subscription manager
* accepts wildcard topic filters.
*
* @param[in] pTopicFilter The topic filter to register the callback for.
* @param[in] topicFilterLength The length of the topic filter string.
* @param[in] callback The callback to be registered for the topic filter.
*
* @note The subscription manager does not allow more than one callback to be registered
* for the same topic filter.
* @note The passed topic filter, @a pTopicFilter, is saved in the registry.
* The application must not free or alter the content of the topic filter memory
* until the callback for the topic filter is removed from the subscription manager.
*
* @return Returns one of the following:
* - #SUBSCRIPTION_MANAGER_SUCCESS if registration of the callback is successful.
* - #SUBSCRIPTION_MANAGER_REGISTRY_FULL if the registration failed due to registry
* being already full.
* - #SUBSCRIPTION_MANAGER_RECORD_EXISTS, if a registered callback already exists for
* the requested topic filter in the subscription manager.
*/
SubscriptionManagerStatus_t SubscriptionManager_RegisterCallback( const char * pTopicFilter,
uint16_t topicFilterLength,
SubscriptionManagerCallback_t pCallback );

/**
* @brief Utility to remove the callback registered for a topic filter from the
* subscription manager.
*
* @param[in] pTopicFilter The topic filter to remove from the subscription manager.
* @param[in] topicFilterLength The length of the topic filter string.
*/
void SubscriptionManager_RemoveCallback( const char * pTopicFilter,
uint16_t topicFilterLength );


#endif /* ifndef MQTT_SUBSCRIPTION_MANAGER_H_ */
Loading