Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
node_modules/
*.o
*.tmp
*.jpg
Expand All @@ -13,5 +12,6 @@ Makefile

# folder
build
proto

!/doc/*.jpg
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "external/livekit-protocol"]
path = external/livekit-protocol
url = https://github.com/livekit/protocol.git
20 changes: 11 additions & 9 deletions .vscode/c_cpp_properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
{
"name": "Linux",
"includePath": [
"${workspaceFolder}/**",
"${workspaceFolder}/src",
"${workspaceFolder}/proto",
"${workspaceFolder}/external/livekit-protocol/protobufs",
"/usr/include",
"/usr/local/include",
"/usr/include/libcamera/**",
"/usr/local/include/webrtc/*",
"/usr/local/include/webrtc/third_party/abseil-cpp/*",
"/usr/local/include/webrtc/third_party/libyuv/include/*",
"/usr/local/include/webrtc/tools/json_schema_compiler/*",
"/usr/local/include/webrtc/third_party/boringssl/src/include/*",
"/usr/include/boost/program_options/*",
"${workspaceFolder}/src"
"/usr/include/libcamera",
"/usr/local/include/webrtc",
"/usr/local/include/webrtc/third_party/abseil-cpp",
"/usr/local/include/webrtc/third_party/libyuv/include",
"/usr/local/include/webrtc/tools/json_schema_compiler",
"/usr/local/include/webrtc/third_party/boringssl/src/include",
"/usr/include/boost/program_options",
"${workspaceFolder}"
],
"defines": [],
"compilerPath": "/usr/bin/clang",
Expand Down
2 changes: 1 addition & 1 deletion doc/BUILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* Install the lib from official repo [[tutorial](https://repo.mosquitto.org/debian/README.txt)]. (recommended)
4. Install essential packages
```bash
sudo apt install cmake clang clang-format mosquitto-dev libboost-program-options-dev libavformat-dev libavcodec-dev libavutil-dev libswscale-dev libpulse-dev libasound2-dev libjpeg-dev libcamera-dev libmosquitto-dev
sudo apt install cmake clang clang-format mosquitto-dev libboost-program-options-dev libavformat-dev libavcodec-dev libavutil-dev libswscale-dev libpulse-dev libasound2-dev libjpeg-dev libcamera-dev libmosquitto-dev protobuf-compiler libprotobuf-dev
```
5. Copy the [nlohmann/json.hpp](https://github.com/nlohmann/json/blob/develop/single_include/nlohmann/json.hpp) to `/usr/local/include`
```bash
Expand Down
Empty file modified example/unix_socket_client.py
100644 → 100755
Empty file.
1 change: 1 addition & 0 deletions external/livekit-protocol
Submodule livekit-protocol added at 499c17
8 changes: 8 additions & 0 deletions scripts/gen_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

PROTO_SRC_DIR=external/livekit-protocol/protobufs
PROTO_OUT_DIR=proto

mkdir -p $PROTO_OUT_DIR

protoc -I=$PROTO_SRC_DIR --cpp_out=$PROTO_OUT_DIR $PROTO_SRC_DIR/*.proto
2 changes: 2 additions & 0 deletions src/args.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ struct Args {
// ipc
bool enable_ipc = false;
std::string socket_path = "/tmp/pi-webrtc-ipc.sock";
std::string ipc_channel = "both";
int ipc_channel_mode = -1;

// webrtc
int jpeg_quality = 30;
Expand Down
10 changes: 10 additions & 0 deletions src/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ static const std::unordered_map<std::string, int> v4l2_fmt_table = {
{"yuyv", V4L2_PIX_FMT_YUYV},
};

static const std::unordered_map<std::string, int> ipc_mode_table = {
{"both", -1},
{"lossy", ChannelMode::Lossy},
{"reliable", ChannelMode::Reliable},
};

static const std::unordered_map<std::string, int> ae_metering_table = {
{"centre", libcamera::controls::MeteringCentreWeighted},
{"spot", libcamera::controls::MeteringSpot},
Expand Down Expand Up @@ -144,6 +150,8 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) {
"the output resolution will remain fixed regardless of network or device conditions.")
("enable-ipc", bpo::bool_switch(&args.enable_ipc)->default_value(args.enable_ipc),
"Enable IPC relay using a WebRTC DataChannel, lossy (UDP-like) or reliable (TCP-like) based on client preference.")
("ipc-channel", bpo::value<std::string>(&args.ipc_channel)->default_value(args.ipc_channel),
"IPC channel mode: both, lossy, reliable")
("socket-path", bpo::value<std::string>(&args.socket_path)->default_value(args.socket_path),
"Specifies the Unix domain socket path used to bridge messages between "
"the WebRTC DataChannel and local IPC applications.")
Expand Down Expand Up @@ -254,6 +262,8 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) {

args.jpeg_quality = std::clamp(args.jpeg_quality, 0, 100);

args.ipc_channel_mode = ParseEnum(ipc_mode_table, args.ipc_channel);

ParseDevice(args);
}

Expand Down
50 changes: 48 additions & 2 deletions src/rtc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,53 @@
project(rtc)

find_package(Protobuf REQUIRED)

# path of the proto files
set(PROTO_SRC_DIR "${CMAKE_SOURCE_DIR}/external/livekit-protocol/protobufs")
set(PROTO_OUT_DIR "${CMAKE_SOURCE_DIR}/proto")

file(MAKE_DIRECTORY ${PROTO_OUT_DIR})

# collect all .proto
file(GLOB PROTO_FILES "${PROTO_SRC_DIR}/*.proto")

# generate `*.pb.cc` and `*.pb.h` to `proto/`
foreach(proto_file ${PROTO_FILES})
get_filename_component(proto_name ${proto_file} NAME_WE)

set(proto_cc "${PROTO_OUT_DIR}/${proto_name}.pb.cc")
set(proto_h "${PROTO_OUT_DIR}/${proto_name}.pb.h")

add_custom_command(
OUTPUT ${proto_cc} ${proto_h}
COMMAND ${Protobuf_PROTOC_EXECUTABLE}
ARGS --cpp_out=${PROTO_OUT_DIR}
-I ${PROTO_SRC_DIR}
${proto_file}
DEPENDS ${proto_file}
)

list(APPEND PROTO_SRCS ${proto_cc})
list(APPEND PROTO_HDRS ${proto_h})
endforeach()

aux_source_directory(${PROJECT_SOURCE_DIR} RTC_FILES)

add_library(${PROJECT_NAME} ${RTC_FILES})
add_library(${PROJECT_NAME}
${RTC_FILES}
${PROTO_SRCS}
)

target_include_directories(${PROJECT_NAME} PUBLIC
${CMAKE_SOURCE_DIR}
${PROTO_OUT_DIR}
)

target_link_libraries(${PROJECT_NAME} PUBLIC common track capturer v4l2_codecs ipc)
target_link_libraries(${PROJECT_NAME} PUBLIC
common
track
capturer
v4l2_codecs
ipc
protobuf::libprotobuf
)
119 changes: 82 additions & 37 deletions src/rtc/conductor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,47 +137,73 @@ rtc::scoped_refptr<RtcPeer> Conductor::CreatePeerConnection(PeerConfig config) {

peer->SetPeer(result.MoveValue());

if (config.is_sfu_peer) {
if (!config.is_publisher) {
return peer;
InitializeDataChannels(peer);

AddTracks(peer->GetPeer());

DEBUG_PRINT("Peer connection(%s) is created! ", peer->GetId().c_str());
return peer;
}

void Conductor::InitializeDataChannels(rtc::scoped_refptr<RtcPeer> peer) {
if (peer->isSfuPeer() && !peer->isPublisher()) {
peer->SetOnDataChannelCallback([this](std::shared_ptr<RtcChannel> channel) {
DEBUG_PRINT("Remote channel (%s) from sfu subscriber peer [%s]",
channel->label().c_str(), channel->id().c_str());
BindDataChannelToIpcReceiver(channel);
});
return;
}

// Essential data channels(Lossy / Reliable / Command)
auto lossy_channel = peer->CreateDataChannel(ChannelMode::Lossy);
auto reliable_channel = peer->CreateDataChannel(ChannelMode::Reliable);

if (args.enable_ipc) {
switch (args.ipc_channel_mode) {
case ChannelMode::Lossy:
BindIpcToDataChannel(lossy_channel);
break;
case ChannelMode::Reliable:
BindIpcToDataChannel(reliable_channel);
break;
default:
BindIpcToDataChannel(lossy_channel);
BindIpcToDataChannel(reliable_channel);
break;
}
peer->CreateDataChannel(ChannelMode::Lossy);
peer->CreateDataChannel(ChannelMode::Reliable);
} else if (args.enable_ipc) {
SetupIpcDataChannel(peer, ChannelMode::Lossy);
SetupIpcDataChannel(peer, ChannelMode::Reliable);
}

if (!peer->isSfuPeer()) {
InitializeCommandChannel(peer);
}
}

void Conductor::InitializeCommandChannel(rtc::scoped_refptr<RtcPeer> peer) {
auto cmd_channel = peer->CreateDataChannel(ChannelMode::Command);
cmd_channel->RegisterHandler(
CommandType::SNAPSHOT,
[this](std::shared_ptr<DataChannelSubject> datachannel, const std::string &msg) {
[this](std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
OnSnapshot(datachannel, msg);
});
cmd_channel->RegisterHandler(
CommandType::METADATA,
[this](std::shared_ptr<DataChannelSubject> datachannel, const std::string &msg) {
[this](std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
OnMetadata(datachannel, msg);
});
cmd_channel->RegisterHandler(
CommandType::RECORDING,
[this](std::shared_ptr<DataChannelSubject> datachannel, const std::string &msg) {
[this](std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
OnRecord(datachannel, msg);
});
cmd_channel->RegisterHandler(
CommandType::CAMERA_OPTION,
[this](std::shared_ptr<DataChannelSubject> datachannel, const std::string &msg) {
[this](std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
OnCameraOption(datachannel, msg);
});

AddTracks(peer->GetPeer());

DEBUG_PRINT("Peer connection(%s) is created! ", peer->GetId().c_str());
return peer;
}

void Conductor::OnSnapshot(std::shared_ptr<DataChannelSubject> datachannel,
const std::string &msg) {
void Conductor::OnSnapshot(std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
try {
std::stringstream ss(msg);
int num;
Expand All @@ -193,8 +219,7 @@ void Conductor::OnSnapshot(std::shared_ptr<DataChannelSubject> datachannel,
}
}

void Conductor::OnMetadata(std::shared_ptr<DataChannelSubject> datachannel,
const std::string &msg) {
void Conductor::OnMetadata(std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
DEBUG_PRINT("OnMetadata msg: %s", msg.c_str());
json jsonObj = json::parse(msg.c_str());

Expand Down Expand Up @@ -225,7 +250,7 @@ void Conductor::OnMetadata(std::shared_ptr<DataChannelSubject> datachannel,
}
}

void Conductor::OnRecord(std::shared_ptr<DataChannelSubject> datachannel, const std::string &path) {
void Conductor::OnRecord(std::shared_ptr<RtcChannel> datachannel, const std::string &path) {
if (args.record_path.empty()) {
return;
}
Expand All @@ -242,8 +267,7 @@ void Conductor::OnRecord(std::shared_ptr<DataChannelSubject> datachannel, const
}
}

void Conductor::OnCameraOption(std::shared_ptr<DataChannelSubject> datachannel,
const std::string &msg) {
void Conductor::OnCameraOption(std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
DEBUG_PRINT("OnCameraControl msg: %s", msg.c_str());
json jsonObj = json::parse(msg.c_str());

Expand Down Expand Up @@ -325,19 +349,40 @@ void Conductor::InitializeIpcServer() {
}
}

void Conductor::SetupIpcDataChannel(rtc::scoped_refptr<RtcPeer> peer, ChannelMode mode) {
auto channel = peer->CreateDataChannel(mode);
if (channel && ipc_server_) {
ipc_server_->RegisterPeerCallback(channel->label(), [channel](const std::string &msg) {
channel->Send(msg);
});

channel->OnClosed([this](const std::string &label) {
ipc_server_->UnregisterPeerCallback(label);
});
void Conductor::BindIpcToDataChannel(std::shared_ptr<RtcChannel> channel) {
BindIpcToDataChannelSender(channel);
BindDataChannelToIpcReceiver(channel);
}

channel->RegisterHandler(CommandType::CUSTOM, [this](const std::string &msg) {
ipc_server_->Write(msg);
});
void Conductor::BindIpcToDataChannelSender(std::shared_ptr<RtcChannel> channel) {
if (!channel || !ipc_server_) {
ERROR_PRINT("IPC or DataChannel is not found!");
return;
}

const auto id = channel->id();
const auto label = channel->label();

ipc_server_->RegisterPeerCallback(id, [channel](const std::string &msg) {
channel->Send(msg);
});
DEBUG_PRINT("[%s] DataChannel (%s) registered to IPC server for sending.", id.c_str(),
label.c_str());

channel->OnClosed([this, id, label]() {
ipc_server_->UnregisterPeerCallback(id);
DEBUG_PRINT("[%s] DataChannel (%s) unregistered from IPC server.", id.c_str(),
label.c_str());
});
}

void Conductor::BindDataChannelToIpcReceiver(std::shared_ptr<RtcChannel> channel) {
if (!channel || !ipc_server_)
return;

channel->RegisterHandler(CommandType::CUSTOM, [this](const std::string &msg) {
ipc_server_->Write(msg);
});
DEBUG_PRINT("DataChannel (%s) connected to IPC server for receiving.",
channel->label().c_str());
}
15 changes: 10 additions & 5 deletions src/rtc/conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ class Conductor {
void InitializePeerConnectionFactory();
void InitializeTracks();
void InitializeIpcServer();
void InitializeDataChannels(rtc::scoped_refptr<RtcPeer> peer);
void InitializeCommandChannel(rtc::scoped_refptr<RtcPeer> peer);

void BindIpcToDataChannel(std::shared_ptr<RtcChannel> channel);
void BindIpcToDataChannelSender(std::shared_ptr<RtcChannel> channel);
void BindDataChannelToIpcReceiver(std::shared_ptr<RtcChannel> channel);

void SetupIpcDataChannel(rtc::scoped_refptr<RtcPeer> peer, ChannelMode mode);
void AddTracks(rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection);
void OnSnapshot(std::shared_ptr<DataChannelSubject> datachannel, const std::string &msg);
void OnMetadata(std::shared_ptr<DataChannelSubject> datachannel, const std::string &path);
void OnRecord(std::shared_ptr<DataChannelSubject> datachannel, const std::string &path);
void OnCameraOption(std::shared_ptr<DataChannelSubject> datachannel, const std::string &msg);
void OnSnapshot(std::shared_ptr<RtcChannel> datachannel, const std::string &msg);
void OnMetadata(std::shared_ptr<RtcChannel> datachannel, const std::string &path);
void OnRecord(std::shared_ptr<RtcChannel> datachannel, const std::string &path);
void OnCameraOption(std::shared_ptr<RtcChannel> datachannel, const std::string &msg);

std::unique_ptr<rtc::Thread> network_thread_;
std::unique_ptr<rtc::Thread> worker_thread_;
Expand Down
Loading