From c178729fe5c4b6bc813a37b4de8e9cf05d20f43b Mon Sep 17 00:00:00 2001 From: TzuHuanTai Date: Mon, 19 May 2025 22:59:35 -0700 Subject: [PATCH 1/3] chore: add livekit protocol v1.38.0 as submodule --- .gitmodules | 3 +++ external/livekit-protocol | 1 + 2 files changed, 4 insertions(+) create mode 100644 .gitmodules create mode 160000 external/livekit-protocol diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..087f4d6 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "external/livekit-protocol"] + path = external/livekit-protocol + url = https://github.com/livekit/protocol.git diff --git a/external/livekit-protocol b/external/livekit-protocol new file mode 160000 index 0000000..499c17c --- /dev/null +++ b/external/livekit-protocol @@ -0,0 +1 @@ +Subproject commit 499c17c48063582ac2af0a021827fab18356cc29 From 3bd661b9496c5683cda29be338e240695fdd30ec Mon Sep 17 00:00:00 2001 From: TzuHuanTai Date: Sat, 24 May 2025 03:49:33 -0700 Subject: [PATCH 2/3] refactor: rename data channel class --- src/rtc/conductor.cpp | 16 +++---- src/rtc/conductor.h | 8 ++-- ...ta_channel_subject.cpp => rtc_channel.cpp} | 46 +++++++++---------- .../{data_channel_subject.h => rtc_channel.h} | 12 ++--- src/rtc/rtc_peer.cpp | 4 +- src/rtc/rtc_peer.h | 10 ++-- 6 files changed, 48 insertions(+), 48 deletions(-) rename src/rtc/{data_channel_subject.cpp => rtc_channel.cpp} (79%) rename src/rtc/{data_channel_subject.h => rtc_channel.h} (88%) diff --git a/src/rtc/conductor.cpp b/src/rtc/conductor.cpp index 9fd493f..74b71b7 100644 --- a/src/rtc/conductor.cpp +++ b/src/rtc/conductor.cpp @@ -151,22 +151,22 @@ rtc::scoped_refptr Conductor::CreatePeerConnection(PeerConfig config) { auto cmd_channel = peer->CreateDataChannel(ChannelMode::Command); cmd_channel->RegisterHandler( CommandType::SNAPSHOT, - [this](std::shared_ptr datachannel, const std::string &msg) { + [this](std::shared_ptr datachannel, const std::string &msg) { OnSnapshot(datachannel, msg); }); cmd_channel->RegisterHandler( CommandType::METADATA, - [this](std::shared_ptr datachannel, const std::string &msg) { + [this](std::shared_ptr datachannel, const std::string &msg) { OnMetadata(datachannel, msg); }); cmd_channel->RegisterHandler( CommandType::RECORDING, - [this](std::shared_ptr datachannel, const std::string &msg) { + [this](std::shared_ptr datachannel, const std::string &msg) { OnRecord(datachannel, msg); }); cmd_channel->RegisterHandler( CommandType::CAMERA_OPTION, - [this](std::shared_ptr datachannel, const std::string &msg) { + [this](std::shared_ptr datachannel, const std::string &msg) { OnCameraOption(datachannel, msg); }); @@ -176,7 +176,7 @@ rtc::scoped_refptr Conductor::CreatePeerConnection(PeerConfig config) { return peer; } -void Conductor::OnSnapshot(std::shared_ptr datachannel, +void Conductor::OnSnapshot(std::shared_ptr datachannel, const std::string &msg) { try { std::stringstream ss(msg); @@ -193,7 +193,7 @@ void Conductor::OnSnapshot(std::shared_ptr datachannel, } } -void Conductor::OnMetadata(std::shared_ptr datachannel, +void Conductor::OnMetadata(std::shared_ptr datachannel, const std::string &msg) { DEBUG_PRINT("OnMetadata msg: %s", msg.c_str()); json jsonObj = json::parse(msg.c_str()); @@ -225,7 +225,7 @@ void Conductor::OnMetadata(std::shared_ptr datachannel, } } -void Conductor::OnRecord(std::shared_ptr datachannel, const std::string &path) { +void Conductor::OnRecord(std::shared_ptr datachannel, const std::string &path) { if (args.record_path.empty()) { return; } @@ -242,7 +242,7 @@ void Conductor::OnRecord(std::shared_ptr datachannel, const } } -void Conductor::OnCameraOption(std::shared_ptr datachannel, +void Conductor::OnCameraOption(std::shared_ptr datachannel, const std::string &msg) { DEBUG_PRINT("OnCameraControl msg: %s", msg.c_str()); json jsonObj = json::parse(msg.c_str()); diff --git a/src/rtc/conductor.h b/src/rtc/conductor.h index d6bd3d6..64cdf89 100644 --- a/src/rtc/conductor.h +++ b/src/rtc/conductor.h @@ -37,10 +37,10 @@ class Conductor { void SetupIpcDataChannel(rtc::scoped_refptr peer, ChannelMode mode); void AddTracks(rtc::scoped_refptr peer_connection); - void OnSnapshot(std::shared_ptr datachannel, const std::string &msg); - void OnMetadata(std::shared_ptr datachannel, const std::string &path); - void OnRecord(std::shared_ptr datachannel, const std::string &path); - void OnCameraOption(std::shared_ptr datachannel, const std::string &msg); + void OnSnapshot(std::shared_ptr datachannel, const std::string &msg); + void OnMetadata(std::shared_ptr datachannel, const std::string &path); + void OnRecord(std::shared_ptr datachannel, const std::string &path); + void OnCameraOption(std::shared_ptr datachannel, const std::string &msg); std::unique_ptr network_thread_; std::unique_ptr worker_thread_; diff --git a/src/rtc/data_channel_subject.cpp b/src/rtc/rtc_channel.cpp similarity index 79% rename from src/rtc/data_channel_subject.cpp rename to src/rtc/rtc_channel.cpp index aeb5be8..663b863 100644 --- a/src/rtc/data_channel_subject.cpp +++ b/src/rtc/rtc_channel.cpp @@ -1,34 +1,34 @@ -#include "rtc/data_channel_subject.h" +#include "rtc/rtc_channel.h" #include "common/logging.h" const int CHUNK_SIZE = 65536; -std::shared_ptr -DataChannelSubject::Create(rtc::scoped_refptr data_channel) { - return std::make_shared(std::move(data_channel)); +std::shared_ptr +RtcChannel::Create(rtc::scoped_refptr data_channel) { + return std::make_shared(std::move(data_channel)); } -DataChannelSubject::DataChannelSubject( +RtcChannel::RtcChannel( rtc::scoped_refptr data_channel) { data_channel_ = std::move(data_channel); label_ = data_channel_->label(); data_channel_->RegisterObserver(this); } -DataChannelSubject::~DataChannelSubject() { +RtcChannel::~RtcChannel() { DEBUG_PRINT("datachannel (%s) is released!", label_.c_str()); } -std::string DataChannelSubject::label() const { return label_; } +std::string RtcChannel::label() const { return label_; } -void DataChannelSubject::OnStateChange() { +void RtcChannel::OnStateChange() { webrtc::DataChannelInterface::DataState state = data_channel_->state(); DEBUG_PRINT("[%s] OnStateChange => %s", data_channel_->label().c_str(), webrtc::DataChannelInterface::DataStateString(state)); } -void DataChannelSubject::Terminate() { +void RtcChannel::Terminate() { UnSubscribe(); data_channel_->UnregisterObserver(); data_channel_->Close(); @@ -37,11 +37,11 @@ void DataChannelSubject::Terminate() { } } -void DataChannelSubject::OnClosed(std::function func) { +void RtcChannel::OnClosed(std::function func) { on_closed_func_ = std::move(func); } -void DataChannelSubject::OnMessage(const webrtc::DataBuffer &buffer) { +void RtcChannel::OnMessage(const webrtc::DataBuffer &buffer) { const uint8_t *data = buffer.data.data(); size_t length = buffer.data.size(); std::string message(reinterpret_cast(data), length); @@ -49,7 +49,7 @@ void DataChannelSubject::OnMessage(const webrtc::DataBuffer &buffer) { Next(message); } -void DataChannelSubject::RegisterHandler(CommandType type, ChannelCommandHandler func) { +void RtcChannel::RegisterHandler(CommandType type, ChannelCommandHandler func) { auto observer = AsObservable(type); observer->Subscribe([self = shared_from_this(), func](std::string message) { if (!message.empty()) { @@ -58,7 +58,7 @@ void DataChannelSubject::RegisterHandler(CommandType type, ChannelCommandHandler }); } -void DataChannelSubject::RegisterHandler(CommandType type, PayloadHandler func) { +void RtcChannel::RegisterHandler(CommandType type, PayloadHandler func) { auto observer = AsObservable(type); observer->Subscribe([func](std::string message) { if (!message.empty()) { @@ -67,7 +67,7 @@ void DataChannelSubject::RegisterHandler(CommandType type, PayloadHandler func) }); } -void DataChannelSubject::Next(std::string message) { +void RtcChannel::Next(std::string message) { try { json jsonObj = json::parse(message.c_str()); @@ -92,26 +92,26 @@ void DataChannelSubject::Next(std::string message) { } } -std::shared_ptr> DataChannelSubject::AsObservable() { +std::shared_ptr> RtcChannel::AsObservable() { auto observer = std::make_shared>(); observers_map_[CommandType::CUSTOM].push_back(observer); return observer; } -std::shared_ptr> DataChannelSubject::AsObservable(CommandType type) { +std::shared_ptr> RtcChannel::AsObservable(CommandType type) { auto observer = std::make_shared>(); observers_map_[type].push_back(observer); return observer; } -void DataChannelSubject::UnSubscribe() { +void RtcChannel::UnSubscribe() { for (auto &[type, observers] : observers_map_) { observers.clear(); } observers_map_.clear(); } -void DataChannelSubject::Send(CommandType type, const uint8_t *data, size_t size) { +void RtcChannel::Send(CommandType type, const uint8_t *data, size_t size) { int bytes_read = 0; const size_t header_size = sizeof(CommandType); @@ -139,7 +139,7 @@ void DataChannelSubject::Send(CommandType type, const uint8_t *data, size_t size } } -void DataChannelSubject::Send(const uint8_t *data, size_t size) { +void RtcChannel::Send(const uint8_t *data, size_t size) { if (data_channel_->state() != webrtc::DataChannelInterface::kOpen) { return; } @@ -148,7 +148,7 @@ void DataChannelSubject::Send(const uint8_t *data, size_t size) { data_channel_->Send(data_buffer); } -void DataChannelSubject::Send(MetaMessage metadata) { +void RtcChannel::Send(MetaMessage metadata) { if (metadata.path.empty()) { return; } @@ -164,7 +164,7 @@ void DataChannelSubject::Send(MetaMessage metadata) { Send(type, nullptr, 0); } -void DataChannelSubject::Send(Buffer image) { +void RtcChannel::Send(Buffer image) { auto type = CommandType::SNAPSHOT; const int file_size = image.length; std::string size_str = std::to_string(file_size); @@ -175,7 +175,7 @@ void DataChannelSubject::Send(Buffer image) { DEBUG_PRINT("Image sent: %d bytes", file_size); } -void DataChannelSubject::Send(std::ifstream &file) { +void RtcChannel::Send(std::ifstream &file) { std::vector buffer(CHUNK_SIZE); int bytes_read = 0; int count = 0; @@ -205,7 +205,7 @@ void DataChannelSubject::Send(std::ifstream &file) { Send(type, nullptr, 0); } -void DataChannelSubject::Send(const std::string &message) { +void RtcChannel::Send(const std::string &message) { auto type = CommandType::CUSTOM; auto body = message; int body_size = message.length(); diff --git a/src/rtc/data_channel_subject.h b/src/rtc/rtc_channel.h similarity index 88% rename from src/rtc/data_channel_subject.h rename to src/rtc/rtc_channel.h index 0648730..4f17870 100644 --- a/src/rtc/data_channel_subject.h +++ b/src/rtc/rtc_channel.h @@ -70,19 +70,19 @@ struct MetaMessage { } }; -class DataChannelSubject : public webrtc::DataChannelObserver, +class RtcChannel : public webrtc::DataChannelObserver, public Subject, - public std::enable_shared_from_this { + public std::enable_shared_from_this { public: using ChannelCommandHandler = - std::function, const std::string &)>; + std::function, const std::string &)>; using PayloadHandler = std::function; - static std::shared_ptr + static std::shared_ptr Create(rtc::scoped_refptr data_channel); - DataChannelSubject(rtc::scoped_refptr data_channel); - ~DataChannelSubject(); + RtcChannel(rtc::scoped_refptr data_channel); + ~RtcChannel(); std::string label() const; diff --git a/src/rtc/rtc_peer.cpp b/src/rtc/rtc_peer.cpp index ac7ef31..240b798 100644 --- a/src/rtc/rtc_peer.cpp +++ b/src/rtc/rtc_peer.cpp @@ -70,7 +70,7 @@ void RtcPeer::SetPeer(rtc::scoped_refptr peer) rtc::scoped_refptr RtcPeer::GetPeer() { return peer_connection_; } -std::shared_ptr RtcPeer::CreateDataChannel(ChannelMode mode) { +std::shared_ptr RtcPeer::CreateDataChannel(ChannelMode mode) { struct webrtc::DataChannelInit init; init.ordered = true; init.id = static_cast(mode); @@ -89,7 +89,7 @@ std::shared_ptr RtcPeer::CreateDataChannel(ChannelMode mode) return nullptr; } - auto channel = DataChannelSubject::Create(result.MoveValue()); + auto channel = RtcChannel::Create(result.MoveValue()); if (mode == ChannelMode::Command) { DEBUG_PRINT("The Command data channel is established successfully."); diff --git a/src/rtc/rtc_peer.h b/src/rtc/rtc_peer.h index f04d152..e7f8058 100644 --- a/src/rtc/rtc_peer.h +++ b/src/rtc/rtc_peer.h @@ -10,7 +10,7 @@ #include "args.h" #include "common/logging.h" -#include "rtc/data_channel_subject.h" +#include "rtc/rtc_channel.h" enum ChannelMode { Command, @@ -108,7 +108,7 @@ class RtcPeer : public webrtc::PeerConnectionObserver, void SetSink(rtc::VideoSinkInterface *video_sink_obj); void SetPeer(rtc::scoped_refptr peer); rtc::scoped_refptr GetPeer(); - std::shared_ptr CreateDataChannel(ChannelMode mode); + std::shared_ptr CreateDataChannel(ChannelMode mode); std::string RestartIce(std::string ice_ufrag, std::string ice_pwd); // SignalingMessageObserver implementation. @@ -146,9 +146,9 @@ class RtcPeer : public webrtc::PeerConnectionObserver, webrtc::PeerConnectionInterface::SignalingState signaling_state_; std::unique_ptr modified_desc_; - std::shared_ptr cmd_channel_; - std::shared_ptr lossy_channel_; - std::shared_ptr reliable_channel_; + std::shared_ptr cmd_channel_; + std::shared_ptr lossy_channel_; + std::shared_ptr reliable_channel_; rtc::scoped_refptr peer_connection_; rtc::VideoSinkInterface *custom_video_sink_; }; From b6a1073a1338baec11e18312f272367552a1e411 Mon Sep 17 00:00:00 2001 From: TzuHuanTai Date: Sat, 24 May 2025 05:25:09 -0700 Subject: [PATCH 3/3] feat: relay ipc msg via sfu datachannel --- .gitignore | 2 +- .vscode/c_cpp_properties.json | 20 ++--- doc/BUILD.md | 2 +- example/unix_socket_client.py | 0 scripts/gen_proto.sh | 8 ++ src/args.h | 2 + src/parser.cpp | 10 +++ src/rtc/CMakeLists.txt | 50 ++++++++++++- src/rtc/conductor.cpp | 109 ++++++++++++++++++++-------- src/rtc/conductor.h | 7 +- src/rtc/rtc_channel.cpp | 38 +++++----- src/rtc/rtc_channel.h | 19 +++-- src/rtc/rtc_peer.cpp | 54 +++++++++++--- src/rtc/rtc_peer.h | 17 ++++- src/rtc/sfu_channel.cpp | 63 ++++++++++++++++ src/rtc/sfu_channel.h | 35 +++++++++ src/signaling/mqtt_service.cpp | 8 +- src/signaling/signaling_service.h | 6 +- src/signaling/websocket_service.cpp | 22 +++--- 19 files changed, 369 insertions(+), 103 deletions(-) mode change 100644 => 100755 example/unix_socket_client.py create mode 100755 scripts/gen_proto.sh create mode 100644 src/rtc/sfu_channel.cpp create mode 100644 src/rtc/sfu_channel.h diff --git a/.gitignore b/.gitignore index b9bf34b..d41abfb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ -node_modules/ *.o *.tmp *.jpg @@ -13,5 +12,6 @@ Makefile # folder build +proto !/doc/*.jpg diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index 498d10a..71f0471 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -3,17 +3,19 @@ { "name": "Linux", "includePath": [ - "${workspaceFolder}/**", + "${workspaceFolder}/src", + "${workspaceFolder}/proto", + "${workspaceFolder}/external/livekit-protocol/protobufs", "/usr/include", "/usr/local/include", - "/usr/include/libcamera/**", - "/usr/local/include/webrtc/*", - "/usr/local/include/webrtc/third_party/abseil-cpp/*", - "/usr/local/include/webrtc/third_party/libyuv/include/*", - "/usr/local/include/webrtc/tools/json_schema_compiler/*", - "/usr/local/include/webrtc/third_party/boringssl/src/include/*", - "/usr/include/boost/program_options/*", - "${workspaceFolder}/src" + "/usr/include/libcamera", + "/usr/local/include/webrtc", + "/usr/local/include/webrtc/third_party/abseil-cpp", + "/usr/local/include/webrtc/third_party/libyuv/include", + "/usr/local/include/webrtc/tools/json_schema_compiler", + "/usr/local/include/webrtc/third_party/boringssl/src/include", + "/usr/include/boost/program_options", + "${workspaceFolder}" ], "defines": [], "compilerPath": "/usr/bin/clang", diff --git a/doc/BUILD.md b/doc/BUILD.md index 9b0ec76..d2451f8 100644 --- a/doc/BUILD.md +++ b/doc/BUILD.md @@ -8,7 +8,7 @@ * Install the lib from official repo [[tutorial](https://repo.mosquitto.org/debian/README.txt)]. (recommended) 4. Install essential packages ```bash - sudo apt install cmake clang clang-format mosquitto-dev libboost-program-options-dev libavformat-dev libavcodec-dev libavutil-dev libswscale-dev libpulse-dev libasound2-dev libjpeg-dev libcamera-dev libmosquitto-dev + sudo apt install cmake clang clang-format mosquitto-dev libboost-program-options-dev libavformat-dev libavcodec-dev libavutil-dev libswscale-dev libpulse-dev libasound2-dev libjpeg-dev libcamera-dev libmosquitto-dev protobuf-compiler libprotobuf-dev ``` 5. Copy the [nlohmann/json.hpp](https://github.com/nlohmann/json/blob/develop/single_include/nlohmann/json.hpp) to `/usr/local/include` ```bash diff --git a/example/unix_socket_client.py b/example/unix_socket_client.py old mode 100644 new mode 100755 diff --git a/scripts/gen_proto.sh b/scripts/gen_proto.sh new file mode 100755 index 0000000..fbed6e0 --- /dev/null +++ b/scripts/gen_proto.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +PROTO_SRC_DIR=external/livekit-protocol/protobufs +PROTO_OUT_DIR=proto + +mkdir -p $PROTO_OUT_DIR + +protoc -I=$PROTO_SRC_DIR --cpp_out=$PROTO_OUT_DIR $PROTO_SRC_DIR/*.proto diff --git a/src/args.h b/src/args.h index 6d4eebd..5f79a73 100644 --- a/src/args.h +++ b/src/args.h @@ -105,6 +105,8 @@ struct Args { // ipc bool enable_ipc = false; std::string socket_path = "/tmp/pi-webrtc-ipc.sock"; + std::string ipc_channel = "both"; + int ipc_channel_mode = -1; // webrtc int jpeg_quality = 30; diff --git a/src/parser.cpp b/src/parser.cpp index bfe2646..73e3ca7 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -16,6 +16,12 @@ static const std::unordered_map v4l2_fmt_table = { {"yuyv", V4L2_PIX_FMT_YUYV}, }; +static const std::unordered_map ipc_mode_table = { + {"both", -1}, + {"lossy", ChannelMode::Lossy}, + {"reliable", ChannelMode::Reliable}, +}; + static const std::unordered_map ae_metering_table = { {"centre", libcamera::controls::MeteringCentreWeighted}, {"spot", libcamera::controls::MeteringSpot}, @@ -144,6 +150,8 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) { "the output resolution will remain fixed regardless of network or device conditions.") ("enable-ipc", bpo::bool_switch(&args.enable_ipc)->default_value(args.enable_ipc), "Enable IPC relay using a WebRTC DataChannel, lossy (UDP-like) or reliable (TCP-like) based on client preference.") + ("ipc-channel", bpo::value(&args.ipc_channel)->default_value(args.ipc_channel), + "IPC channel mode: both, lossy, reliable") ("socket-path", bpo::value(&args.socket_path)->default_value(args.socket_path), "Specifies the Unix domain socket path used to bridge messages between " "the WebRTC DataChannel and local IPC applications.") @@ -254,6 +262,8 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) { args.jpeg_quality = std::clamp(args.jpeg_quality, 0, 100); + args.ipc_channel_mode = ParseEnum(ipc_mode_table, args.ipc_channel); + ParseDevice(args); } diff --git a/src/rtc/CMakeLists.txt b/src/rtc/CMakeLists.txt index 2ceb698..8f935d0 100644 --- a/src/rtc/CMakeLists.txt +++ b/src/rtc/CMakeLists.txt @@ -1,7 +1,53 @@ project(rtc) +find_package(Protobuf REQUIRED) + +# path of the proto files +set(PROTO_SRC_DIR "${CMAKE_SOURCE_DIR}/external/livekit-protocol/protobufs") +set(PROTO_OUT_DIR "${CMAKE_SOURCE_DIR}/proto") + +file(MAKE_DIRECTORY ${PROTO_OUT_DIR}) + +# collect all .proto +file(GLOB PROTO_FILES "${PROTO_SRC_DIR}/*.proto") + +# generate `*.pb.cc` and `*.pb.h` to `proto/` +foreach(proto_file ${PROTO_FILES}) + get_filename_component(proto_name ${proto_file} NAME_WE) + + set(proto_cc "${PROTO_OUT_DIR}/${proto_name}.pb.cc") + set(proto_h "${PROTO_OUT_DIR}/${proto_name}.pb.h") + + add_custom_command( + OUTPUT ${proto_cc} ${proto_h} + COMMAND ${Protobuf_PROTOC_EXECUTABLE} + ARGS --cpp_out=${PROTO_OUT_DIR} + -I ${PROTO_SRC_DIR} + ${proto_file} + DEPENDS ${proto_file} + ) + + list(APPEND PROTO_SRCS ${proto_cc}) + list(APPEND PROTO_HDRS ${proto_h}) +endforeach() + aux_source_directory(${PROJECT_SOURCE_DIR} RTC_FILES) -add_library(${PROJECT_NAME} ${RTC_FILES}) +add_library(${PROJECT_NAME} + ${RTC_FILES} + ${PROTO_SRCS} +) + +target_include_directories(${PROJECT_NAME} PUBLIC + ${CMAKE_SOURCE_DIR} + ${PROTO_OUT_DIR} +) -target_link_libraries(${PROJECT_NAME} PUBLIC common track capturer v4l2_codecs ipc) +target_link_libraries(${PROJECT_NAME} PUBLIC + common + track + capturer + v4l2_codecs + ipc + protobuf::libprotobuf +) diff --git a/src/rtc/conductor.cpp b/src/rtc/conductor.cpp index 74b71b7..b774076 100644 --- a/src/rtc/conductor.cpp +++ b/src/rtc/conductor.cpp @@ -137,17 +137,49 @@ rtc::scoped_refptr Conductor::CreatePeerConnection(PeerConfig config) { peer->SetPeer(result.MoveValue()); - if (config.is_sfu_peer) { - if (!config.is_publisher) { - return peer; + InitializeDataChannels(peer); + + AddTracks(peer->GetPeer()); + + DEBUG_PRINT("Peer connection(%s) is created! ", peer->GetId().c_str()); + return peer; +} + +void Conductor::InitializeDataChannels(rtc::scoped_refptr peer) { + if (peer->isSfuPeer() && !peer->isPublisher()) { + peer->SetOnDataChannelCallback([this](std::shared_ptr channel) { + DEBUG_PRINT("Remote channel (%s) from sfu subscriber peer [%s]", + channel->label().c_str(), channel->id().c_str()); + BindDataChannelToIpcReceiver(channel); + }); + return; + } + + // Essential data channels(Lossy / Reliable / Command) + auto lossy_channel = peer->CreateDataChannel(ChannelMode::Lossy); + auto reliable_channel = peer->CreateDataChannel(ChannelMode::Reliable); + + if (args.enable_ipc) { + switch (args.ipc_channel_mode) { + case ChannelMode::Lossy: + BindIpcToDataChannel(lossy_channel); + break; + case ChannelMode::Reliable: + BindIpcToDataChannel(reliable_channel); + break; + default: + BindIpcToDataChannel(lossy_channel); + BindIpcToDataChannel(reliable_channel); + break; } - peer->CreateDataChannel(ChannelMode::Lossy); - peer->CreateDataChannel(ChannelMode::Reliable); - } else if (args.enable_ipc) { - SetupIpcDataChannel(peer, ChannelMode::Lossy); - SetupIpcDataChannel(peer, ChannelMode::Reliable); } + if (!peer->isSfuPeer()) { + InitializeCommandChannel(peer); + } +} + +void Conductor::InitializeCommandChannel(rtc::scoped_refptr peer) { auto cmd_channel = peer->CreateDataChannel(ChannelMode::Command); cmd_channel->RegisterHandler( CommandType::SNAPSHOT, @@ -169,15 +201,9 @@ rtc::scoped_refptr Conductor::CreatePeerConnection(PeerConfig config) { [this](std::shared_ptr datachannel, const std::string &msg) { OnCameraOption(datachannel, msg); }); - - AddTracks(peer->GetPeer()); - - DEBUG_PRINT("Peer connection(%s) is created! ", peer->GetId().c_str()); - return peer; } -void Conductor::OnSnapshot(std::shared_ptr datachannel, - const std::string &msg) { +void Conductor::OnSnapshot(std::shared_ptr datachannel, const std::string &msg) { try { std::stringstream ss(msg); int num; @@ -193,8 +219,7 @@ void Conductor::OnSnapshot(std::shared_ptr datachannel, } } -void Conductor::OnMetadata(std::shared_ptr datachannel, - const std::string &msg) { +void Conductor::OnMetadata(std::shared_ptr datachannel, const std::string &msg) { DEBUG_PRINT("OnMetadata msg: %s", msg.c_str()); json jsonObj = json::parse(msg.c_str()); @@ -242,8 +267,7 @@ void Conductor::OnRecord(std::shared_ptr datachannel, const std::str } } -void Conductor::OnCameraOption(std::shared_ptr datachannel, - const std::string &msg) { +void Conductor::OnCameraOption(std::shared_ptr datachannel, const std::string &msg) { DEBUG_PRINT("OnCameraControl msg: %s", msg.c_str()); json jsonObj = json::parse(msg.c_str()); @@ -325,19 +349,40 @@ void Conductor::InitializeIpcServer() { } } -void Conductor::SetupIpcDataChannel(rtc::scoped_refptr peer, ChannelMode mode) { - auto channel = peer->CreateDataChannel(mode); - if (channel && ipc_server_) { - ipc_server_->RegisterPeerCallback(channel->label(), [channel](const std::string &msg) { - channel->Send(msg); - }); - - channel->OnClosed([this](const std::string &label) { - ipc_server_->UnregisterPeerCallback(label); - }); +void Conductor::BindIpcToDataChannel(std::shared_ptr channel) { + BindIpcToDataChannelSender(channel); + BindDataChannelToIpcReceiver(channel); +} - channel->RegisterHandler(CommandType::CUSTOM, [this](const std::string &msg) { - ipc_server_->Write(msg); - }); +void Conductor::BindIpcToDataChannelSender(std::shared_ptr channel) { + if (!channel || !ipc_server_) { + ERROR_PRINT("IPC or DataChannel is not found!"); + return; } + + const auto id = channel->id(); + const auto label = channel->label(); + + ipc_server_->RegisterPeerCallback(id, [channel](const std::string &msg) { + channel->Send(msg); + }); + DEBUG_PRINT("[%s] DataChannel (%s) registered to IPC server for sending.", id.c_str(), + label.c_str()); + + channel->OnClosed([this, id, label]() { + ipc_server_->UnregisterPeerCallback(id); + DEBUG_PRINT("[%s] DataChannel (%s) unregistered from IPC server.", id.c_str(), + label.c_str()); + }); +} + +void Conductor::BindDataChannelToIpcReceiver(std::shared_ptr channel) { + if (!channel || !ipc_server_) + return; + + channel->RegisterHandler(CommandType::CUSTOM, [this](const std::string &msg) { + ipc_server_->Write(msg); + }); + DEBUG_PRINT("DataChannel (%s) connected to IPC server for receiving.", + channel->label().c_str()); } diff --git a/src/rtc/conductor.h b/src/rtc/conductor.h index 64cdf89..20476ca 100644 --- a/src/rtc/conductor.h +++ b/src/rtc/conductor.h @@ -34,8 +34,13 @@ class Conductor { void InitializePeerConnectionFactory(); void InitializeTracks(); void InitializeIpcServer(); + void InitializeDataChannels(rtc::scoped_refptr peer); + void InitializeCommandChannel(rtc::scoped_refptr peer); + + void BindIpcToDataChannel(std::shared_ptr channel); + void BindIpcToDataChannelSender(std::shared_ptr channel); + void BindDataChannelToIpcReceiver(std::shared_ptr channel); - void SetupIpcDataChannel(rtc::scoped_refptr peer, ChannelMode mode); void AddTracks(rtc::scoped_refptr peer_connection); void OnSnapshot(std::shared_ptr datachannel, const std::string &msg); void OnMetadata(std::shared_ptr datachannel, const std::string &path); diff --git a/src/rtc/rtc_channel.cpp b/src/rtc/rtc_channel.cpp index 663b863..98e3475 100644 --- a/src/rtc/rtc_channel.cpp +++ b/src/rtc/rtc_channel.cpp @@ -9,37 +9,34 @@ RtcChannel::Create(rtc::scoped_refptr data_channel return std::make_shared(std::move(data_channel)); } -RtcChannel::RtcChannel( - rtc::scoped_refptr data_channel) { - data_channel_ = std::move(data_channel); - label_ = data_channel_->label(); - data_channel_->RegisterObserver(this); +RtcChannel::RtcChannel(rtc::scoped_refptr data_channel) + : data_channel(data_channel), + id_(Utils::GenerateUuid()), + label_(data_channel->label()) { + data_channel->RegisterObserver(this); } +RtcChannel::~RtcChannel() { DEBUG_PRINT("datachannel (%s) is released!", label_.c_str()); } -RtcChannel::~RtcChannel() { - DEBUG_PRINT("datachannel (%s) is released!", label_.c_str()); -} +std::string RtcChannel::id() const { return id_; } std::string RtcChannel::label() const { return label_; } void RtcChannel::OnStateChange() { - webrtc::DataChannelInterface::DataState state = data_channel_->state(); - DEBUG_PRINT("[%s] OnStateChange => %s", data_channel_->label().c_str(), + webrtc::DataChannelInterface::DataState state = data_channel->state(); + DEBUG_PRINT("[%s] OnStateChange => %s", data_channel->label().c_str(), webrtc::DataChannelInterface::DataStateString(state)); } void RtcChannel::Terminate() { UnSubscribe(); - data_channel_->UnregisterObserver(); - data_channel_->Close(); + data_channel->UnregisterObserver(); + data_channel->Close(); if (on_closed_func_) { - on_closed_func_(label()); + on_closed_func_(); } } -void RtcChannel::OnClosed(std::function func) { - on_closed_func_ = std::move(func); -} +void RtcChannel::OnClosed(std::function func) { on_closed_func_ = std::move(func); } void RtcChannel::OnMessage(const webrtc::DataBuffer &buffer) { const uint8_t *data = buffer.data.data(); @@ -124,7 +121,7 @@ void RtcChannel::Send(CommandType type, const uint8_t *data, size_t size) { } while (bytes_read < size) { - if (data_channel_->buffered_amount() + CHUNK_SIZE > data_channel_->MaxSendQueueSize()) { + if (data_channel->buffered_amount() + CHUNK_SIZE > data_channel->MaxSendQueueSize()) { usleep(100); DEBUG_PRINT("Sleeping for 100 microsecond due to MaxSendQueueSize reached."); continue; @@ -140,12 +137,13 @@ void RtcChannel::Send(CommandType type, const uint8_t *data, size_t size) { } void RtcChannel::Send(const uint8_t *data, size_t size) { - if (data_channel_->state() != webrtc::DataChannelInterface::kOpen) { + if (data_channel->state() != webrtc::DataChannelInterface::kOpen) { return; } + rtc::CopyOnWriteBuffer buffer(data, size); webrtc::DataBuffer data_buffer(buffer, true); - data_channel_->Send(data_buffer); + data_channel->Send(data_buffer); } void RtcChannel::Send(MetaMessage metadata) { @@ -189,7 +187,7 @@ void RtcChannel::Send(std::ifstream &file) { Send(type, (uint8_t *)size_str.c_str(), size_str.length()); while (bytes_read < file_size) { - if (data_channel_->buffered_amount() + CHUNK_SIZE > data_channel_->MaxSendQueueSize()) { + if (data_channel->buffered_amount() + CHUNK_SIZE > data_channel->MaxSendQueueSize()) { sleep(1); DEBUG_PRINT("Sleeping for 1 second due to MaxSendQueueSize reached."); continue; diff --git a/src/rtc/rtc_channel.h b/src/rtc/rtc_channel.h index 4f17870..9d72028 100644 --- a/src/rtc/rtc_channel.h +++ b/src/rtc/rtc_channel.h @@ -71,8 +71,8 @@ struct MetaMessage { }; class RtcChannel : public webrtc::DataChannelObserver, - public Subject, - public std::enable_shared_from_this { + public Subject, + public std::enable_shared_from_this { public: using ChannelCommandHandler = std::function, const std::string &)>; @@ -84,12 +84,13 @@ class RtcChannel : public webrtc::DataChannelObserver, RtcChannel(rtc::scoped_refptr data_channel); ~RtcChannel(); + std::string id() const; std::string label() const; // webrtc::DataChannelObserver void OnStateChange() override; void OnMessage(const webrtc::DataBuffer &buffer) override; - void OnClosed(std::function func); + void OnClosed(std::function func); void Terminate(); void RegisterHandler(CommandType type, ChannelCommandHandler func); @@ -100,19 +101,23 @@ class RtcChannel : public webrtc::DataChannelObserver, void Send(std::ifstream &file); void Send(const std::string &message); + protected: + rtc::scoped_refptr data_channel; + + virtual void Send(const uint8_t *data, size_t size); + void Next(std::string message) override final; + private: + std::string id_; std::string label_; - std::function on_closed_func_; - rtc::scoped_refptr data_channel_; + std::function on_closed_func_; std::map>>> observers_map_; // Subject - void Next(std::string message) override; std::shared_ptr> AsObservable() override; std::shared_ptr> AsObservable(CommandType type); void UnSubscribe() override; - void Send(const uint8_t *data, size_t size); void Send(CommandType type, const uint8_t *data, size_t size); }; diff --git a/src/rtc/rtc_peer.cpp b/src/rtc/rtc_peer.cpp index 240b798..61001c4 100644 --- a/src/rtc/rtc_peer.cpp +++ b/src/rtc/rtc_peer.cpp @@ -3,13 +3,18 @@ #include #include +#include "rtc/sfu_channel.h" + rtc::scoped_refptr RtcPeer::Create(PeerConfig config) { return rtc::make_ref_counted(std::move(config)); } RtcPeer::RtcPeer(PeerConfig config) : id_(Utils::GenerateUuid()), - config_(std::move(config)), + timeout_(config.timeout), + is_sfu_peer_(config.is_sfu_peer), + is_publisher_(config.is_publisher), + has_candidates_in_sdp_(config.has_candidates_in_sdp), is_connected_(false), is_complete_(false) {} @@ -56,9 +61,13 @@ void RtcPeer::Terminate() { } } -std::string RtcPeer::GetId() const { return id_; } +std::string RtcPeer::id() const { return id_; } + +bool RtcPeer::isSfuPeer() const { return is_sfu_peer_; } + +bool RtcPeer::isPublisher() const { return is_publisher_; } -bool RtcPeer::IsConnected() const { return is_connected_.load(); } +bool RtcPeer::isConnected() const { return is_connected_.load(); } void RtcPeer::SetSink(rtc::VideoSinkInterface *video_sink_obj) { custom_video_sink_ = std::move(video_sink_obj); @@ -74,7 +83,7 @@ std::shared_ptr RtcPeer::CreateDataChannel(ChannelMode mode) { struct webrtc::DataChannelInit init; init.ordered = true; init.id = static_cast(mode); - if (!config_.is_sfu_peer) { + if (!is_sfu_peer_) { init.negotiated = true; } if (mode == ChannelMode::Lossy) { @@ -85,11 +94,14 @@ std::shared_ptr RtcPeer::CreateDataChannel(ChannelMode mode) { auto result = peer_connection_->CreateDataChannelOrError(label, &init); if (!result.ok()) { - ERROR_PRINT("Failed to create data channel."); + ERROR_PRINT("Failed to create data channel: %s", label.c_str()); return nullptr; } - auto channel = RtcChannel::Create(result.MoveValue()); + auto dc = result.MoveValue(); + + std::shared_ptr channel = + is_sfu_peer_ ? SfuChannel::Create(dc) : RtcChannel::Create(dc); if (mode == ChannelMode::Command) { DEBUG_PRINT("The Command data channel is established successfully."); @@ -128,13 +140,17 @@ std::string RtcPeer::RestartIce(std::string ice_ufrag, std::string ice_pwd) { return local_sdp; } +void RtcPeer::SetOnDataChannelCallback(OnRtcChannelCallback callback) { + on_data_channel_ = std::move(callback); +} + void RtcPeer::OnSignalingChange(webrtc::PeerConnectionInterface::SignalingState new_state) { signaling_state_ = new_state; auto state = webrtc::PeerConnectionInterface::AsString(new_state); DEBUG_PRINT("OnSignalingChange => %s", std::string(state).c_str()); if (new_state == webrtc::PeerConnectionInterface::SignalingState::kHaveRemoteOffer) { peer_timeout_ = std::thread([this]() { - std::this_thread::sleep_for(std::chrono::seconds(config_.timeout)); + std::this_thread::sleep_for(std::chrono::seconds(timeout_)); if (peer_connection_ && !is_complete_.load() && !is_connected_.load()) { DEBUG_PRINT("Connection timeout after kConnecting. Closing connection."); peer_connection_->Close(); @@ -144,7 +160,25 @@ void RtcPeer::OnSignalingChange(webrtc::PeerConnectionInterface::SignalingState } void RtcPeer::OnDataChannel(rtc::scoped_refptr channel) { - DEBUG_PRINT("Connected to data channel => %s", channel->label().c_str()); + DEBUG_PRINT("On remote DataChannel => %s", channel->label().c_str()); + + if (!on_data_channel_) { + return; + } + + if (channel->label() == ChannelModeToString(ChannelMode::Command)) { + cmd_channel_ = RtcChannel::Create(channel); + on_data_channel_(cmd_channel_); + DEBUG_PRINT("Command data channel is established successfully."); + } else if (channel->label() == ChannelModeToString(ChannelMode::Lossy)) { + lossy_channel_ = SfuChannel::Create(channel); + on_data_channel_(lossy_channel_); + DEBUG_PRINT("Lossy data channel is established successfully."); + } else if (channel->label() == ChannelModeToString(ChannelMode::Reliable)) { + reliable_channel_ = SfuChannel::Create(channel); + on_data_channel_(reliable_channel_); + DEBUG_PRINT("Reliable data channel is established successfully."); + } } void RtcPeer::OnIceGatheringChange(webrtc::PeerConnectionInterface::IceGatheringState new_state) { @@ -169,7 +203,7 @@ void RtcPeer::OnConnectionChange(webrtc::PeerConnectionInterface::PeerConnection } void RtcPeer::OnIceCandidate(const webrtc::IceCandidateInterface *candidate) { - if (config_.has_candidates_in_sdp && modified_desc_) { + if (has_candidates_in_sdp_ && modified_desc_) { modified_desc_->AddCandidate(candidate); } @@ -210,7 +244,7 @@ void RtcPeer::OnSuccess(webrtc::SessionDescriptionInterface *desc) { peer_connection_->SetLocalDescription(SetSessionDescription::Create(nullptr, nullptr).get(), modified_desc_.get()); - if (config_.has_candidates_in_sdp) { + if (has_candidates_in_sdp_) { EmitLocalSdp(1); } else { EmitLocalSdp(); diff --git a/src/rtc/rtc_peer.h b/src/rtc/rtc_peer.h index e7f8058..78daa1c 100644 --- a/src/rtc/rtc_peer.h +++ b/src/rtc/rtc_peer.h @@ -97,19 +97,26 @@ class RtcPeer : public webrtc::PeerConnectionObserver, public webrtc::CreateSessionDescriptionObserver, public SignalingMessageObserver { public: + using OnRtcChannelCallback = std::function)>; + static rtc::scoped_refptr Create(PeerConfig config); RtcPeer(PeerConfig config); ~RtcPeer(); void CreateOffer(); void Terminate(); - bool IsConnected() const; - std::string GetId() const; + + bool isSfuPeer() const; + bool isPublisher() const; + bool isConnected() const; + std::string id() const; + void SetSink(rtc::VideoSinkInterface *video_sink_obj); void SetPeer(rtc::scoped_refptr peer); rtc::scoped_refptr GetPeer(); std::shared_ptr CreateDataChannel(ChannelMode mode); std::string RestartIce(std::string ice_ufrag, std::string ice_pwd); + void SetOnDataChannelCallback(OnRtcChannelCallback callback); // SignalingMessageObserver implementation. void SetRemoteSdp(const std::string &sdp, const std::string &type) override; @@ -134,8 +141,11 @@ class RtcPeer : public webrtc::PeerConnectionObserver, std::string ModifySetupAttribute(const std::string &sdp, const std::string &new_setup); void EmitLocalSdp(int delay_sec = 0); + int timeout_; std::string id_; - PeerConfig config_; + bool is_sfu_peer_; + bool is_publisher_; + bool has_candidates_in_sdp_; std::atomic is_connected_; std::atomic is_complete_; std::thread peer_timeout_; @@ -146,6 +156,7 @@ class RtcPeer : public webrtc::PeerConnectionObserver, webrtc::PeerConnectionInterface::SignalingState signaling_state_; std::unique_ptr modified_desc_; + OnRtcChannelCallback on_data_channel_; std::shared_ptr cmd_channel_; std::shared_ptr lossy_channel_; std::shared_ptr reliable_channel_; diff --git a/src/rtc/sfu_channel.cpp b/src/rtc/sfu_channel.cpp new file mode 100644 index 0000000..bb881a7 --- /dev/null +++ b/src/rtc/sfu_channel.cpp @@ -0,0 +1,63 @@ +#include "rtc/sfu_channel.h" + +#include "proto/livekit_models.pb.h" + +#include "common/logging.h" + +std::shared_ptr +SfuChannel::Create(rtc::scoped_refptr data_channel) { + return std::make_shared(std::move(data_channel)); +} + +SfuChannel::SfuChannel(rtc::scoped_refptr data_channel) + : RtcChannel(data_channel), + topic_("ipc_topic") {} + +SfuChannel::~SfuChannel() { DEBUG_PRINT("sfu datachannel (%s) is released!", label().c_str()); } + +void SfuChannel::OnMessage(const webrtc::DataBuffer &buffer) { + livekit::DataPacket packet; + if (!packet.ParseFromArray(buffer.data.data(), buffer.data.size())) { + DEBUG_PRINT("(%s) Failed to parse DataPacket", label().c_str()); + return; + } + + if (!packet.has_user()) { + DEBUG_PRINT("(%s) Unknown DataPacket type", label().c_str()); + return; + } + + const auto &user = packet.user(); + std::string topic = user.topic(); + const std::string &payload = user.payload(); + DEBUG_PRINT("(%s) Received USER packet: participant_sid=%s, payload=%s, topic=%s", + label().c_str(), user.participant_sid().c_str(), payload.c_str(), topic.c_str()); + + Next(payload); +} + +void SfuChannel::Send(const uint8_t *data, size_t size) { SendUserData(topic_, data, size); } + +void SfuChannel::SendUserData(const std::string &topic, const uint8_t *data, size_t size) { + if (data_channel->state() != webrtc::DataChannelInterface::kOpen) { + return; + } + + livekit::UserPacket user; + user.set_payload(data, size); + user.set_topic(topic); + + livekit::DataPacket packet; + packet.set_allocated_user(&user); + + std::string serialized; + if (!packet.SerializeToString(&serialized)) { + return; + } + + rtc::CopyOnWriteBuffer buffer(serialized.data(), serialized.size()); + webrtc::DataBuffer data_buffer(buffer, true); + data_channel->Send(data_buffer); + + (void)packet.release_user(); +} diff --git a/src/rtc/sfu_channel.h b/src/rtc/sfu_channel.h new file mode 100644 index 0000000..4d9db37 --- /dev/null +++ b/src/rtc/sfu_channel.h @@ -0,0 +1,35 @@ +#ifndef SFU_CHANNEL_H_ +#define SFU_CHANNEL_H_ + +#include +#include +#include + +#include +#include + +#include "common/interface/subject.h" +#include "common/utils.h" +#include "ipc/unix_socket_server.h" +#include "rtc/rtc_channel.h" + +class SfuChannel : public RtcChannel { + public: + static std::shared_ptr + Create(rtc::scoped_refptr data_channel); + + SfuChannel(rtc::scoped_refptr data_channel); + ~SfuChannel(); + + void OnMessage(const webrtc::DataBuffer &buffer) override; + + protected: + void Send(const uint8_t *data, size_t size) override; + + private: + std::string topic_; + + void SendUserData(const std::string &topic, const uint8_t *data, size_t size); +}; + +#endif // SFU_CHANNEL_H_ diff --git a/src/signaling/mqtt_service.cpp b/src/signaling/mqtt_service.cpp index 0a8e2b9..b657f9b 100644 --- a/src/signaling/mqtt_service.cpp +++ b/src/signaling/mqtt_service.cpp @@ -166,8 +166,8 @@ void MqttService::OnMessage(struct mosquitto *mosq, void *obj, AnswerLocalIce(peer_id, sdp_mid, sdp_mline_index, candidate); }); - client_id_to_peer_id_[client_id] = peer->GetId(); - peer_id_to_client_id_[peer->GetId()] = client_id; + client_id_to_peer_id_[client_id] = peer->id(); + peer_id_to_client_id_[peer->id()] = client_id; OnRemoteSdp(client_id_to_peer_id_[client_id], payload); } else if (topic.starts_with(ice_base_topic_)) { @@ -203,9 +203,9 @@ void MqttService::RefreshPeerMap() { auto peer = GetPeer(peer_id); DEBUG_PRINT("Found peer_id key: %s, connected value: %d", peer_id.c_str(), - peer->IsConnected()); + peer->isConnected()); - if (!peer->IsConnected()) { + if (!peer->isConnected()) { auto client_id = peer_id_to_client_id_[peer_id]; peer_id_to_client_id_.erase(peer_id); pm_it = map.erase(pm_it); diff --git a/src/signaling/signaling_service.h b/src/signaling/signaling_service.h index d2c7551..5f78978 100644 --- a/src/signaling/signaling_service.h +++ b/src/signaling/signaling_service.h @@ -27,7 +27,7 @@ class SignalingService { auto peer = conductor->CreatePeerConnection(config); if (!config.is_sfu_peer) { - peer_map_[peer->GetId()] = peer; + peer_map_[peer->id()] = peer; } return peer; } @@ -48,9 +48,9 @@ class SignalingService { virtual void RefreshPeerMap() { auto pm_it = peer_map_.begin(); while (pm_it != peer_map_.end()) { - auto peer_id = pm_it->second->GetId(); + auto peer_id = pm_it->second->id(); - if (pm_it->second && !pm_it->second->IsConnected()) { + if (pm_it->second && !pm_it->second->isConnected()) { pm_it = peer_map_.erase(pm_it); DEBUG_PRINT("peer_map (%s) was erased.", peer_id.c_str()); } else { diff --git a/src/signaling/websocket_service.cpp b/src/signaling/websocket_service.cpp index c185a22..12d6ccc 100644 --- a/src/signaling/websocket_service.cpp +++ b/src/signaling/websocket_service.cpp @@ -132,10 +132,11 @@ void WebsocketService::OnConnect(beast::error_code ec) { } void WebsocketService::OnHandshake(websocket::stream &ws) { - std::string target = BuildWebSocketTarget("/rtc", {{"apiKey", args_.ws_key}, - {"roomId", args_.ws_room}, - {"userId", args_.uid}, - {"canSubscribe", "0"}}); + std::string target = + BuildWebSocketTarget("/rtc", {{"apiKey", args_.ws_key}, + {"roomId", args_.ws_room}, + {"userId", args_.uid}, + {"canSubscribe", args_.enable_ipc ? "1" : "0"}}); ws.async_handshake(args_.ws_host, target, [this](boost::system::error_code ec) { OnHandshake(ec); }); @@ -147,10 +148,11 @@ void WebsocketService::OnHandshake(websocket::stream> & if (ec) { ERROR_PRINT("Failed to tls handshake: %s", ec.message().c_str()); } - std::string target = BuildWebSocketTarget("/rtc", {{"apiKey", args_.ws_key}, - {"roomId", args_.ws_room}, - {"userId", args_.uid}, - {"canSubscribe", "0"}}); + std::string target = + BuildWebSocketTarget("/rtc", {{"apiKey", args_.ws_key}, + {"roomId", args_.ws_room}, + {"userId", args_.uid}, + {"canSubscribe", args_.enable_ipc ? "1" : "0"}}); ws.async_handshake(args_.ws_host, target, [this](boost::system::error_code ec) { OnHandshake(ec); }); @@ -246,9 +248,9 @@ void WebsocketService::OnMessage(const std::string &req) { pub_peer_->CreateOffer(); - } else if (action == "offer" && sub_peer_ && !sub_peer_->IsConnected()) { + } else if (action == "offer" && sub_peer_ && !sub_peer_->isConnected()) { sub_peer_->SetRemoteSdp(message, "offer"); - } else if (action == "answer" && pub_peer_ && !pub_peer_->IsConnected()) { + } else if (action == "answer" && pub_peer_ && !pub_peer_->isConnected()) { pub_peer_->SetRemoteSdp(message, "answer"); } else if (action == "trickle") { OnRemoteIce(message);