Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion up_client_socket/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ cmake_minimum_required(VERSION 3.20.1)
project(up_client_socket VERSION 0.1.0 LANGUAGES CXX DESCRIPTION "C++ socket transport")

find_package(up-cpp REQUIRED)
find_package(protobuf REQUIRED)
find_package(up-core-api REQUIRED)
find_package(spdlog REQUIRED)
add_definitions(-DSPDLOG_FMT_EXTERNAL)
find_package(fmt REQUIRED CONFIG)
Expand All @@ -39,6 +41,8 @@ target_include_directories(${PROJECT_NAME}
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}>
${rapidjson_INCLUDE_DIRS}
${up-cpp_INCLUDE_DIR}
${up-core-api_INCLUDE_DIR}
${protobuf_INCLUDE_DIR}
${spdlog_INCLUDE_DIR})

set_property(TARGET ${PROJECT_NAME} PROPERTY POSITION_INDEPENDENT_CODE ON)
Expand All @@ -50,9 +54,12 @@ target_link_libraries(${PROJECT_NAME}
dl
spdlog::spdlog
up-cpp::up-cpp
up-core-api::up-core-api
protobuf::libprotobuf
fmt::fmt
rapidjson)

# Specify the install location for the library
INSTALL(TARGETS ${PROJECT_NAME})
INSTALL(DIRECTORY include DESTINATION .)
INSTALL(DIRECTORY include DESTINATION .)

73 changes: 0 additions & 73 deletions up_client_socket/cpp/conanfile.py

This file was deleted.

5 changes: 3 additions & 2 deletions up_client_socket/cpp/conanfile.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[requires]
up-core-api/1.6.0
protobuf/3.21.12
up-cpp/0.1.2-dev
up-cpp/1.0.0
rapidjson/cci.20230929
spdlog/1.13.0
fmt/10.2.1
Expand All @@ -11,4 +12,4 @@ CMakeDeps
CMakeToolchain

[layout]
cmake_layout
cmake_layout
43 changes: 43 additions & 0 deletions up_client_socket/cpp/include/SafeTupleMap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

#include "TupleOfOptionals.h"

template <typename KEY, typename VALUE>
class SafeTupleMap {
std::unordered_map<KEY, std::shared_ptr<VALUE>,
tuple_of_optionals::hash<KEY>>
map_;
std::mutex mtx;

public:
using Key = KEY;

SafeTupleMap() = default;

std::shared_ptr<VALUE> find(const KEY& key, bool create = false) {
std::unique_lock<std::mutex> lock(mtx);
auto it = map_.find(key);
if (!create) {
return (it != map_.end()) ? it->second : nullptr;
} else {
if (it != map_.end())
return it->second;
auto ptr = std::make_shared<VALUE>();
map_.emplace(key, ptr);
return ptr;
}
}

void erase(std::function<void(std::shared_ptr<VALUE>)> fn) {
std::unique_lock<std::mutex> lock(mtx);
for (auto [key, ptr] : map_) {
fn(ptr);
}
}
};
141 changes: 19 additions & 122 deletions up_client_socket/cpp/include/SocketUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@
#ifndef _SOCKET_UTRANSPORT_H_
#define _SOCKET_UTRANSPORT_H_

#include <unistd.h>
#include <up-core-api/umessage.pb.h>
#include <up-cpp/rpc/RpcClient.h>
#include <up-cpp/transport/UTransport.h>
#include <up-cpp/transport/builder/UAttributesBuilder.h>
#include <up-cpp/uri/builder/BuildUUri.h>
#include <up-cpp/utils/ThreadPool.h>
#include <up-cpp/uuid/factory/Uuidv8Factory.h>
#include <up-cpp/uuid/serializer/UuidSerializer.h>

#include <memory>
#include <string>
// #include <string_view>

/// @class SocketUTransport
/// @brief Represents a socket-based implementation of the UTransport interface
Expand All @@ -29,127 +25,28 @@
/// The SocketUTransport class provides functionality for sending messages,
/// registering and unregistering listeners, and invoking remote methods over a
/// socket connection. It inherits from the UTransport and RpcClient classes.
class SocketUTransport : public uprotocol::utransport::UTransport,
public uprotocol::rpc::RpcClient {
class SocketUTransport : public uprotocol::transport::UTransport {
public:
/// @brief Constructs a SocketUTransport object.
SocketUTransport();

/// @brief Destroys the SocketUTransport object.
~SocketUTransport();

/// UTransport API's

/// @brief Sends a UMessage over the transport.
/// @param[in] transportUMessage The UMessage to send.
/// @return The status of the send operation.
uprotocol::v1::UStatus send(
const uprotocol::utransport::UMessage& transportUMessage) override;

/// @brief Registers a listener for a specific topic.
/// @param[in] topic The topic to register the listener for.
/// @param[in] listener The listener to register.
/// @return The status of the registration operation.
uprotocol::v1::UStatus registerListener(
const uprotocol::v1::UUri& topic,
const uprotocol::utransport::UListener& listener) override;

/// @brief Unregisters a listener for a specific topic.
/// @param[in] topic The topic to unregister the listener from.
/// @param[in] listener The listener to unregister.
/// @return The status of the unregistration operation.
uprotocol::v1::UStatus unregisterListener(
const uprotocol::v1::UUri& topic,
const uprotocol::utransport::UListener& listener) override;

/// @brief Invokes a remote method asynchronously and returns a future for
/// the response.
/// @param[in] topic The topic of the remote method.
/// @param[in] payload The payload of the remote method.
/// @param[in] options The call options for the remote method.
/// @return A future for the response of the remote method.
std::future<uprotocol::rpc::RpcResponse> invokeMethod(
const uprotocol::v1::UUri& topic,
const uprotocol::utransport::UPayload& payload,
const uprotocol::v1::CallOptions& options) override;
static constexpr const char* default_dispatcher_ip = "127.0.0.1";
static constexpr int default_dispatcher_port = 44444;

/// @brief Invokes a remote method asynchronously and registers a callback
/// for the response.
/// @param[in] topic The topic of the remote method.
/// @param[in] payload The payload of the remote method.
/// @param[in] options The call options for the remote method.
/// @param[in] callback The callback to be invoked when the response is
/// received.
/// @return The status of the invocation operation.
uprotocol::v1::UStatus invokeMethod(
const uprotocol::v1::UUri& topic,
const uprotocol::utransport::UPayload& payload,
const uprotocol::v1::CallOptions& options,
const uprotocol::utransport::UListener& callback) override;
/// @brief Constructs a SocketUTransport object.
SocketUTransport(const uprotocol::v1::UUri&,
const std::string& dispatcher_ip = default_dispatcher_ip,
int dispatcher_port = default_dispatcher_port);

private:
// The IP address of the dispatcher.
constexpr static const char* DISPATCHER_IP = "127.0.0.1";
// The port number of the dispatcher.
constexpr static const int DISPATCHER_PORT = 44444;
// The maximum length of a message in bytes.
constexpr static const int BYTES_MSG_LENGTH = 32767;

static const uprotocol::v1::UUri RESPONSE_URI; // The URI for responses.
std::thread processThread; // The thread for processing messages.
std::thread timeoutThread; // The thread for handling timeouts.
int socketFd; // The file descriptor for the socket.
std::mutex mutex_; // A mutex for thread synchronization.
std::mutex mutex_promise; // A mutex for synchronizing access to promises.

// A type alias for the key used in the uriToListener map.
using uuriKey = size_t;
// A type alias for the key used in the reqidToFutureUMessage map.
using uuidStr = std::string;

// A map from URIs to listeners. Each URI can have multiple listeners.
std::unordered_map<uuriKey,
std::vector<const uprotocol::utransport::UListener*>>
uriToListener;

// A map from request IDs to futures. Each request ID corresponds to a
// future for a UMessage.
std::unordered_map<uuidStr, std::promise<uprotocol::rpc::RpcResponse>>
reqidToFutureUMessage;

/// @brief Listens for incoming messages on the socket.
void listen();

/// @brief Handles a publish message received on the socket.
/// @param[in] umsg The UMessage representing the publish message.
void handlePublishMessage(const uprotocol::v1::UMessage umsg);

/// @brief Handles a request message received on the socket.
/// @param[in] umsg The UMessage representing the request message.
void handleRequestMessage(const uprotocol::v1::UMessage umsg);
[[nodiscard]] uprotocol::v1::UStatus sendImpl(
const uprotocol::v1::UMessage& message) override;

/// @brief Handles a response message received on the socket.
/// @param[in] umsg The UMessage representing the response message.
void handleResponseMessage(const uprotocol::v1::UMessage umsg);
[[nodiscard]] uprotocol::v1::UStatus registerListenerImpl(
const uprotocol::v1::UUri& sink_filter, CallableConn&& listener,
std::optional<uprotocol::v1::UUri>&& source_filter) override;

/// @brief Notifies the registered listeners for a specific URI about a
/// received message.
/// @param[in] uri The URI of the received message.
/// @param[in] umsg The UMessage representing the received message.
void notifyListeners(const uprotocol::v1::UUri uri,
const uprotocol::v1::UMessage umsg);
void cleanupListener(CallableConn listener) override;

/// @brief Counts the timeout for a request and handles the future and
/// promise accordingly.
/// @param[in] req_id The UUID of the request.
/// @param[in] resFuture The future for the response.
/// @param[in,out] promise The promise for the response.
/// @param[in] timeout The timeout value in milliseconds.
void timeout_counter(
const uprotocol::uuid::UUID& req_id,
const std::future<uprotocol::rpc::RpcResponse>& resFuture,
std::promise<uprotocol::rpc::RpcResponse>& promise,
const std::chrono::milliseconds timeout);
struct Impl;
std::shared_ptr<Impl> pImpl;
};

#endif // _SOCKET_UTRANSPORT_H_
Loading