Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions cpp/include/ggl/ipc/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define GGL_IPC_CLIENT_HPP

#include <ggl/buffer.hpp>
#include <ggl/list.hpp>
#include <ggl/map.hpp>
#include <ggl/object.hpp>
#include <ggl/sdk.hpp>
Expand All @@ -23,6 +24,8 @@

namespace ggl::ipc {

class Subscription;

/// Heap-allocated object. The result of some IPC operations.
class AllocatedObject {
private:
Expand Down Expand Up @@ -63,6 +66,32 @@ class AuthToken {
static std::optional<AuthToken> from_environment() noexcept;
};

class LocalTopicCallback {
public:
virtual ~LocalTopicCallback() noexcept = default;
virtual void operator()(
std::string_view topic, ggl::Object payload, Subscription &handle
) = 0;
};

class IotTopicCallback {
public:
virtual ~IotTopicCallback() noexcept = default;
virtual void operator()(
std::string_view topic, ggl::Buffer payload, Subscription &handle
) = 0;
};

class ConfigurationUpdateCallback {
public:
virtual ~ConfigurationUpdateCallback() noexcept = default;
virtual void operator()(
std::string_view component_name,
ggl::List key_path,
Subscription &handle
) = 0;
};

class Client {
private:
constexpr Client() noexcept = default;
Expand All @@ -89,10 +118,23 @@ class Client {
std::string_view topic, const Map &json
) noexcept;

std::error_code subscribe_to_topic(
std::string_view topic,
LocalTopicCallback &callback,
Subscription *handle = nullptr
) noexcept;

std::error_code publish_to_iot_core(
std::string_view topic, Buffer bytes, uint8_t qos
) noexcept;

std::error_code subscribe_to_iot_core(
std::string_view topic_filter,
std::uint8_t qos,
IotTopicCallback &callback,
Subscription *handle = nullptr
) noexcept;

std::error_code update_config(
std::span<const Buffer> key_path,
const Object &value,
Expand Down Expand Up @@ -132,6 +174,13 @@ class Client {
std::optional<std::string_view> component_name,
bool &value
) noexcept;

std::error_code subscribe_to_configuration_update(
std::span<const Buffer> key_path,
std::optional<std::string_view> component_name,
ConfigurationUpdateCallback &callback,
Subscription *handle = nullptr
) noexcept;
};

}
Expand Down
21 changes: 4 additions & 17 deletions cpp/include/ggl/ipc/subscription.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#ifndef GGL_IPC_SUBSCRIPTION_HPP
#define GGL_IPC_SUBSCRIPTION_HPP

#include <ggl/error.hpp>
#include <ggl/ipc/client_c_api.hpp>
#include <ggl/types.hpp>
#include <cstddef>
#include <cstdint>
#include <compare>
#include <functional>
#include <utility>
Expand All @@ -13,20 +14,6 @@ void ggipc_close_subscription(GgIpcSubscriptionHandle handle) noexcept;

namespace ggl::ipc {

/// Holds a bound function associated with a subscription. Instances of
/// SubscriptionCallback must outlive their associated subscription(s) (i.e.
/// close() on the Subscription must be called before destroying its
/// SubscriptionCallback). The subscription callback function for a bound
/// subscription is called by exactly one other thread.
class SubscriptionCallback {
private:
/// TODO:
std::function<void(void)> callback;

public:
/// TODO:
};

/// Subscription handle with std::unique_ptr semantics. The underlying
/// handle has two copies; one is returned by a Client subscription method, and
/// another is passed to SubscriptionCallbacks, which are called by a
Expand Down Expand Up @@ -137,7 +124,7 @@ class [[nodiscard]] Subscription {
/// Subscription hashing
template <> class std::hash<ggl::ipc::Subscription> {
public:
constexpr std::size_t operator()(
std::size_t operator()(
const ggl::ipc::Subscription &subscription
) const noexcept {
return std::hash<std::uint32_t> {}(subscription.get().val);
Expand Down
4 changes: 4 additions & 0 deletions cpp/include/ggl/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ typedef enum {
GGL_TYPE_MAP,
} GglObjectType;

typedef struct {
uint32_t val;
} GgIpcSubscriptionHandle;

enum class GglComponentState {
RUNNING,
ERRORED
Expand Down
19 changes: 15 additions & 4 deletions cpp/priv_include/ggl/ipc/client_c_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ GglError ggipc_connect_with_token(
GglBuffer socket_path, GglBuffer auth_token
) noexcept;

typedef struct {
uint32_t val;
} GgIpcSubscriptionHandle;

GglError ggipc_publish_to_topic_json(GglBuffer topic, GglMap payload) noexcept;

GglError ggipc_publish_to_topic_binary(
Expand Down Expand Up @@ -84,6 +80,21 @@ GglError ggipc_update_config(
GglError ggipc_update_state(GglComponentState state) noexcept;

GglError ggipc_restart_component(GglBuffer component_name);

typedef void GgIpcSubscribeToConfigurationUpdateCallback(
void *ctx,
GglBuffer component_name,
GglList key_path,
GgIpcSubscriptionHandle handle
);

GglError ggipc_subscribe_to_configuration_update(
const GglBuffer *component_name,
GglBufList key_path,
GgIpcSubscribeToConfigurationUpdateCallback *callback,
void *ctx,
GgIpcSubscriptionHandle *handle
);
}

#endif
71 changes: 71 additions & 0 deletions cpp/samples/pubsub_cpp/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#include <ggl/buffer.hpp>
#include <ggl/ipc/client.hpp>
#include <ggl/object.hpp>
#include <ggl/types.hpp>
#include <chrono>
#include <iostream>
#include <string_view>
#include <system_error>
#include <thread>

namespace {
std::ostream &operator<<(std::ostream &os, const ggl::Buffer &buffer) {
os.write(
reinterpret_cast<const char *>(buffer.data()),
static_cast<std::streamsize>(buffer.size())
);
return os;
}
}

class PubsubHandler : public ggl::ipc::LocalTopicCallback {
void operator()(
std::string_view topic,
ggl::Object payload,
ggl::ipc::Subscription &handle
) override {
(void) handle;
std::cout << "Message received on " << topic << "\n";
if (payload.index() == GGL_TYPE_MAP) {
std::cout << "(ggl::Map of unknown schema)\n";
return;
}
std::cout << get<ggl::Buffer>(payload) << '\n';
}
};

int main(int argc, char *argv[]) {
if (argc < 3) {
std::cerr << "Too few args\n";
return 1;
}
std::string_view topic = argv[1];
std::string_view payload = argv[2];

auto &client = ggl::ipc::Client::get();

auto error = client.connect();

if (error) {
std::cerr << "Failed to connect (" << error << ")\n";
return 1;
}

// handlers must be static lifetime if subscription handle is not held
static PubsubHandler handler;
error = client.subscribe_to_topic(topic, handler);

std::cout << "Attempting to publish to local topic: \"" << topic << "\"\n";

while (true) {
error = client.publish_to_topic(topic, payload);

if (error) {
std::cerr << "Failed to publish to local topic (" << error << ")\n";
return 1;
}

using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
}
}
75 changes: 75 additions & 0 deletions cpp/src/ipc/subscribe_to_configuration_update.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#include <ggl/buffer.hpp>
#include <ggl/error.hpp>
#include <ggl/ipc/client.hpp>
#include <ggl/ipc/client_c_api.hpp>
#include <ggl/ipc/subscription.hpp>
#include <ggl/types.hpp>
#include <exception>
#include <functional>
#include <iostream>
#include <optional>
#include <source_location>
#include <span>
#include <string_view>
#include <system_error>

namespace ggl::ipc {
extern "C" {
namespace {
void subscribe_to_configuration_update_callback(
void *ctx,
GglBuffer component_name,
GglList key_path,
GgIpcSubscriptionHandle handle
) noexcept try {
Subscription locked { handle };
std::invoke(
*static_cast<ConfigurationUpdateCallback *>(ctx),
std::string_view { reinterpret_cast<char *>(component_name.data),
component_name.len },
key_path,
locked
);
(void) locked.release();
} catch (const std::exception &e) {
std::cerr << "Exception caught in "
<< std::source_location {}.function_name() << '\n'
<< e.what() << '\n';
} catch (...) {
std::cerr << "Exception caught in "
<< std::source_location {}.function_name() << '\n';
}
}
}

// singleton interface class.
// NOLINTBEGIN(readability-convert-member-functions-to-static)

std::error_code Client::subscribe_to_configuration_update(
std::span<const Buffer> key_path,
std::optional<std::string_view> component_name,
ConfigurationUpdateCallback &callback,
Subscription *handle
) noexcept {
ggl::Buffer component_name_buf {
component_name.value_or(std::string_view {})
};

GgIpcSubscriptionHandle raw_handle;
GglError ret = ggipc_subscribe_to_configuration_update(
component_name.has_value() ? &component_name_buf : nullptr,
GglBufList { .bufs = const_cast<ggl::Buffer *>(key_path.data()),
.len = key_path.size() },
subscribe_to_configuration_update_callback,
&callback,
(handle != nullptr) ? &raw_handle : nullptr
);
if ((handle != nullptr) && (ret == GGL_ERR_OK)) {
handle->reset(raw_handle);
}
return ret;
}

// NOLINTEND(readability-convert-member-functions-to-static)

}
69 changes: 69 additions & 0 deletions cpp/src/ipc/subscribe_to_iot_core.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include <ggl/buffer.hpp>
#include <ggl/error.hpp>
#include <ggl/ipc/client.hpp>
#include <ggl/ipc/client_c_api.hpp>
#include <ggl/ipc/subscription.hpp>
#include <ggl/types.hpp>
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <source_location>
#include <string_view>
#include <system_error>

namespace ggl::ipc {
extern "C" {
namespace {
void subscribe_to_iot_core_callback(
void *ctx,
GglBuffer topic,
GglBuffer payload,
GgIpcSubscriptionHandle handle
) noexcept try {
Subscription locked { handle };
std::invoke(
*static_cast<IotTopicCallback *>(ctx),
std::string_view { reinterpret_cast<char *>(topic.data),
topic.len },
payload,
locked
);
(void) locked.release();
} catch (const std::exception &e) {
std::cerr << "Exception caught in "
<< std::source_location {}.function_name() << '\n'
<< e.what() << '\n';
} catch (...) {
std::cerr << "Exception caught in "
<< std::source_location {}.function_name() << '\n';
}
}
}

// singleton interface class.
// NOLINTBEGIN(readability-convert-member-functions-to-static)

std::error_code Client::subscribe_to_iot_core(
std::string_view topic_filter,
std::uint8_t qos,
IotTopicCallback &callback,
Subscription *handle
) noexcept {
GgIpcSubscriptionHandle raw_handle;
GglError ret = ggipc_subscribe_to_iot_core(
ggl::Buffer { topic_filter },
qos,
subscribe_to_iot_core_callback,
&callback,
(handle != nullptr) ? &raw_handle : nullptr
);
if ((handle != nullptr) && (ret == GGL_ERR_OK)) {
handle->reset(raw_handle);
}
return ret;
}

// NOLINTEND(readability-convert-member-functions-to-static)

}
Loading