diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..bd902b9 --- /dev/null +++ b/.clang-format @@ -0,0 +1,13 @@ +--- +BasedOnStyle: LLVM +IndentWidth: 2 +TabWidth: 2 +UseTab: Never +ColumnLimit: 100 +PointerAlignment: Left +SortIncludes: CaseSensitive +AlignTrailingComments: true +AllowShortFunctionsOnASingleLine: Empty +BreakBeforeBraces: Attach +Standard: c++17 +... diff --git a/.clang-tidy b/.clang-tidy new file mode 100644 index 0000000..681ef34 --- /dev/null +++ b/.clang-tidy @@ -0,0 +1,10 @@ +Checks: > + -*, + bugprone-*, + performance-*, + readability-*, + modernize-*, + cppcoreguidelines-* +WarningsAsErrors: '' +HeaderFilterRegex: '.*' +FormatStyle: file diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..25f6fdb --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +indent_style = space +indent_size = 2 +trim_trailing_whitespace = true + +[*.{cpp,hpp,py,xml,yaml,yml,cmake,txt,md}] +indent_size = 2 + +[Makefile] +indent_style = tab diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..6e4bd87 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,59 @@ +name: ros2-gst-video-bridge-ci + +on: + pull_request: + push: + branches: ["main", "ft/1stversion_w_srt"] + schedule: + - cron: "0 3 * * *" + +jobs: + build-and-test: + runs-on: ubuntu-22.04 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup ROS 2 Humble + uses: ros-tooling/setup-ros@v0.7 + with: + required-ros-distributions: humble + + - name: Build + run: | + source /opt/ros/humble/setup.bash + colcon build --event-handlers console_direct+ + + - name: Test + run: | + source /opt/ros/humble/setup.bash + colcon test --event-handlers console_direct+ + colcon test-result --all --verbose + + nightly-matrix: + if: github.event_name == 'schedule' + runs-on: ubuntu-22.04 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup ROS 2 Humble + uses: ros-tooling/setup-ros@v0.7 + with: + required-ros-distributions: humble + + - name: Build + run: | + source /opt/ros/humble/setup.bash + colcon build --event-handlers console_direct+ + + - name: Run matrix + run: | + chmod +x src/ros2_gst_video_bridge/scripts/run_transport_codec_matrix.zsh + src/ros2_gst_video_bridge/scripts/run_transport_codec_matrix.zsh "$PWD" /tmp/matrix.csv + + - name: Upload report + uses: actions/upload-artifact@v4 + with: + name: transport-codec-matrix-report + path: /tmp/matrix.csv diff --git a/.gitignore b/.gitignore index 35d74bb..b060c90 100644 --- a/.gitignore +++ b/.gitignore @@ -1,51 +1,25 @@ -devel/ -logs/ +# ROS 2 / colcon build/ -bin/ -lib/ -msg_gen/ -srv_gen/ -msg/*Action.msg -msg/*ActionFeedback.msg -msg/*ActionGoal.msg -msg/*ActionResult.msg -msg/*Feedback.msg -msg/*Goal.msg -msg/*Result.msg -msg/_*.py -build_isolated/ -devel_isolated/ +install/ +log/ -# Generated by dynamic reconfigure -*.cfgc -/cfg/cpp/ -/cfg/*.py - -# Ignore generated docs -*.dox -*.wikidoc - -# eclipse stuff -.project -.cproject - -# qcreator stuff -CMakeLists.txt.user - -srv/_*.py -*.pcd +# Python +__pycache__/ *.pyc -qtcreator-* -*.user -/planning/cfg -/planning/docs -/planning/src +# C/C++ build systems +*.o +*.so +*.a +*.dSYM/ +compile_commands.json +# Editors / IDEs +.vscode/ +.idea/ +*.user +*.swp *~ -# Emacs -.#* - -# Catkin custom files -CATKIN_IGNORE +# Local planning +TODO.md diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..452ab17 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,140 @@ +cmake_minimum_required(VERSION 3.8) +project(ros2_gst_video_bridge) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) + +option(ROS2_GST_VIDEO_BRIDGE_ENABLE_CLANG_TIDY "Enable clang-tidy during C++ compilation" ON) +option(ROS2_GST_VIDEO_BRIDGE_ENABLE_CLANG_FORMAT_CHECK "Enable clang-format check target" ON) +option(ROS2_GST_VIDEO_BRIDGE_ENABLE_AMENT_LINT "Enable ament_lint_auto checks" OFF) + +if(ROS2_GST_VIDEO_BRIDGE_ENABLE_CLANG_TIDY) + find_program(CLANG_TIDY_EXE NAMES clang-tidy clang-tidy-18 clang-tidy-17 clang-tidy-16 clang-tidy-15) + if(CLANG_TIDY_EXE) + set(CMAKE_CXX_CLANG_TIDY ${CLANG_TIDY_EXE}) + message(STATUS "clang-tidy enabled: ${CLANG_TIDY_EXE}") + else() + message(WARNING "clang-tidy requested but not found. Continuing without clang-tidy.") + endif() +endif() + +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(sensor_msgs REQUIRED) +find_package(std_msgs REQUIRED) +find_package(ros2_gst_video_bridge_msgs REQUIRED) +find_package(PkgConfig REQUIRED) +pkg_check_modules(GSTREAMER REQUIRED gstreamer-1.0 gstreamer-app-1.0) + +if(ROS2_GST_VIDEO_BRIDGE_ENABLE_CLANG_FORMAT_CHECK) + find_program(CLANG_FORMAT_EXE NAMES clang-format clang-format-18 clang-format-17 clang-format-16 clang-format-15) + if(CLANG_FORMAT_EXE) + file(GLOB_RECURSE ROS2_GST_VIDEO_BRIDGE_FORMAT_FILES CONFIGURE_DEPENDS + "${CMAKE_CURRENT_SOURCE_DIR}/include/*.h" + "${CMAKE_CURRENT_SOURCE_DIR}/include/*.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc" + ) + add_custom_target(clang_format_check + COMMAND ${CLANG_FORMAT_EXE} --dry-run --Werror ${ROS2_GST_VIDEO_BRIDGE_FORMAT_FILES} + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + COMMENT "Running clang-format check on C++ source files" + VERBATIM + ) + else() + message(WARNING "clang-format check requested but clang-format was not found.") + endif() +endif() + +add_executable(gst_video_bridge_node + src/main.cc + src/gst_video_bridge_node.cc + src/node/core.cc + src/node/lifecycle.cc + src/node/modes.cc + src/node/streaming.cc + src/node/recovery.cc + src/node/fallback.cc + src/node/observability.cc + src/core/config_loader.cc + src/core/pipeline_builder.cc + src/runtime/capability_probe.cc + src/runtime/topic_introspector.cc + src/runtime/stream_engine.cc + src/runtime/metrics_publisher.cc +) + +target_include_directories(gst_video_bridge_node + PRIVATE + include + ${GSTREAMER_INCLUDE_DIRS} +) + +target_link_libraries(gst_video_bridge_node + ${GSTREAMER_LIBRARIES} +) + +target_compile_options(gst_video_bridge_node PRIVATE ${GSTREAMER_CFLAGS_OTHER}) + +ament_target_dependencies(gst_video_bridge_node + rclcpp + sensor_msgs + std_msgs + ros2_gst_video_bridge_msgs +) + +install(TARGETS gst_video_bridge_node + DESTINATION lib/${PROJECT_NAME} +) + +install(DIRECTORY launch config + DESTINATION share/${PROJECT_NAME} +) + +if(BUILD_TESTING) + find_package(ament_cmake_gtest REQUIRED) + + ament_add_gtest(test_config_loader test/test_config_loader.cpp src/core/config_loader.cc) + target_include_directories(test_config_loader PRIVATE include) + ament_target_dependencies(test_config_loader rclcpp) + + ament_add_gtest(test_pipeline_builder test/test_pipeline_builder.cpp src/core/pipeline_builder.cc) + target_include_directories(test_pipeline_builder PRIVATE include) + + add_test( + NAME smoke_runtime_validate_config + COMMAND $ --ros-args -p runtime.mode:=validate_config) + + add_test( + NAME smoke_runtime_list_capabilities + COMMAND $ --ros-args -p runtime.mode:=list_capabilities) + + add_test( + NAME smoke_launch_minimal_show_args + COMMAND ros2 launch ros2_gst_video_bridge gst_video_bridge_minimal.launch.py --show-args) + + add_test( + NAME smoke_launch_advanced_show_args + COMMAND ros2 launch ros2_gst_video_bridge gst_video_bridge_advanced.launch.py --show-args) + + add_test( + NAME smoke_launch_compat_show_args + COMMAND ros2 launch ros2_gst_video_bridge gst_video_bridge.launch.py --show-args) + + set_tests_properties(smoke_runtime_validate_config PROPERTIES TIMEOUT 20) + set_tests_properties(smoke_runtime_list_capabilities PROPERTIES TIMEOUT 20) + set_tests_properties(smoke_launch_minimal_show_args PROPERTIES TIMEOUT 20) + set_tests_properties(smoke_launch_advanced_show_args PROPERTIES TIMEOUT 20) + set_tests_properties(smoke_launch_compat_show_args PROPERTIES TIMEOUT 20) + + if(ROS2_GST_VIDEO_BRIDGE_ENABLE_AMENT_LINT) + find_package(ament_lint_auto REQUIRED) + ament_lint_auto_find_test_dependencies() + endif() +endif() + +ament_package() diff --git a/README.md b/README.md index 12aafcd..f11c2d3 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,318 @@ # ros2_gst_video_bridge -Generic ROS 2 video bridge that subscribes to Image or CompressedImage topics and forwards them through configurable GStreamer pipelines (SRT, RTSP, RTP/UDP, files, and other sinks). +Generic ROS 2 video bridge that subscribes to a raw `sensor_msgs/Image` topic and forwards frames through configurable GStreamer pipelines (SRT, RTSP, RTP/UDP, files, and other sinks). + +## Goals + +- Keep transport generic by delegating output to configurable GStreamer pipelines. +- Keep ROS-side ingestion simple: raw image input, topic name configurable. +- Be production-friendly for Jetson/edge deployment and GCS streaming. + +## Configuration Contract (Phase 2) + +- Profiles (layered defaults): + - `profile.machine` (`generic`, `jetson`, `x86`, `raspi`) + - `profile.stream` (`default`, `low_latency`, `low_bandwidth`, `high_quality`, `monitoring_udp`) + +- ROS side (minimal and stable): + - `input_topic` + +- Transport parameters: + - `transport.kind` (`srt`, `rtsp`, `udp`, `file`) + - `transport.sink_uri` + - `transport.latency_ms` + - `transport.reconnect.enabled` + - `transport.reconnect.interval_ms` + - `transport.reconnect.max_attempts` + +- Codec parameters: + - `codec.name` (`auto`, `h264`, `h265`, `mjpeg`) + - `codec.profile` + - `codec.tune` + - `codec.rate_control` + - `codec.bitrate_kbps` + - `codec.gop` + +- Runtime parameters: + - `max_fps` + - `use_wall_clock_timestamps` + - `runtime.mode` (`stream`, `list_topics`, `list_capabilities`, `validate_config`, `discover`) + - `runtime.print_effective_config` + +- Escape hatch: + - `gst.pipeline_override` (when set, bypasses preset generation) + +## Repository Structure + +``` +ros2_gst_video_bridge/ + config/ # Runtime parameters (pipeline, topics, fps, etc.) + include/ros2_gst_video_bridge/ + launch/ + src/ + .clang-format + .clang-tidy + .editorconfig + CMakeLists.txt + package.xml +``` + +## Build + +```bash +cd +colcon build --packages-select ros2_gst_video_bridge +source install/setup.bash +``` + +## Run + +```bash +ros2 launch ros2_gst_video_bridge gst_video_bridge.launch.py +``` + +## End-to-End Basler USB Test (SRT Listener -> Caller) + +This section documents the validated launch flow for a Basler USB camera with: + +- camera node publishing ROS images, +- bridge node running SRT in listener mode, +- receiver running as SRT caller. + +### 0) One-time USB udev rule for Basler + +```bash +sudo tee /etc/udev/rules.d/99-basler-usb.rules >/dev/null <<'EOF' +SUBSYSTEM=="usb", ATTR{idVendor}=="2676", MODE="0666", GROUP="plugdev" +EOF + +sudo udevadm control --reload-rules +sudo udevadm trigger +``` + +Reconnect the USB camera (or reboot) after applying the rule. + +### 1) Verify camera detection + +```bash +cd /home/ccu-001/ws_dev +source /opt/ros/humble/setup.zsh +source install/setup.zsh + +ros2 run camera_aravis2 camera_finder +``` + +### 2) Launch camera node (Terminal A) + +```bash +source /opt/ros/humble/setup.zsh +source /home/ccu-001/ws_dev/install/setup.zsh +ros2 launch camera_aravis2 camera_driver_uv_example.launch.py guid:=Basler-2676016350B6-23285942 +``` + +### 3) Launch bridge node in SRT listener mode (Terminal B) + +```bash +source /opt/ros/humble/setup.zsh +source /home/ccu-001/ws_dev/install/setup.zsh +ros2 launch ros2_gst_video_bridge gst_video_bridge_minimal.launch.py \ + input_topic:=/camera_driver_uv_example/vis/image_raw \ + "sink_uri:=srt://0.0.0.0:9000?mode=listener" +``` + +For Bayer cameras, enable optional debayer (color output before bridge): + +```bash +ros2 launch ros2_gst_video_bridge gst_video_bridge_minimal.launch.py \ + input_topic:=/camera_driver_uv_example/vis/image_raw \ + enable_debayer:=true \ + debayer_output_topic:=/camera_driver_uv_example/vis/image_color \ + "sink_uri:=srt://0.0.0.0:9000?mode=listener" +``` + +### 4) Launch SRT receiver as caller (Terminal C) + +```bash +gst-launch-1.0 -v srtsrc uri="srt://1.0.0.22:9000?mode=caller" latency=60 ! tsdemux ! h264parse ! avdec_h264 ! videoconvert ! autovideosink sync=false +``` + +### 5) Quick runtime checks + +Check that the camera topic is publishing: + +```bash +source /opt/ros/humble/setup.zsh +source /home/ccu-001/ws_dev/install/setup.zsh +ros2 topic hz /camera_driver_uv_example/vis/image_raw +``` + +Check bridge runtime metrics: + +```bash +source /opt/ros/humble/setup.zsh +source /home/ccu-001/ws_dev/install/setup.zsh +ros2 topic echo /gst_video_bridge/runtime_metrics --once +``` + +If `fps_in` and `fps_out` are both greater than zero, the bridge is actively forwarding frames. + +### Launch files + +- `gst_video_bridge_minimal.launch.py`: + - essential arguments only (`profile_machine`, `profile_stream`, `input_topic`, `sink_uri`) +- `gst_video_bridge_advanced.launch.py`: + - full override surface for transport/codec/runtime plus `params_file` +- `gst_video_bridge.launch.py`: + - compatibility wrapper to the advanced launch + +Examples: + +```bash +ros2 launch ros2_gst_video_bridge gst_video_bridge_minimal.launch.py \ + profile_machine:=jetson profile_stream:=low_latency \ + input_topic:=/camera_driver_uv/vis/image_raw \ + sink_uri:=srt://127.0.0.1:9000?mode=listener + +ros2 launch ros2_gst_video_bridge gst_video_bridge_advanced.launch.py \ + params_file:=/home/ccu-001/ws_dev/src/ros2_gst_video_bridge/config/profiles/jetson_monitoring_udp.yaml +``` + +### Curated profile files + +Under `config/profiles/`: + +- `jetson_low_latency_srt.yaml` +- `x86_low_latency_srt.yaml` +- `raspi_low_latency_srt.yaml` +- `jetson_monitoring_udp.yaml` +- `x86_monitoring_udp.yaml` +- `raspi_monitoring_udp.yaml` +- `recording_file_sink.yaml` + +### Runtime metrics + +The node publishes runtime metrics on `~/runtime_metrics` (`std_msgs/msg/String`) for backward compatibility, and typed control-plane topics via `ros2_gst_video_bridge_msgs`: + +- `~/runtime_status` (`ros2_gst_video_bridge_msgs/msg/RuntimeStatus`) +- `~/runtime_events` (`ros2_gst_video_bridge_msgs/msg/RuntimeEvent`) +- `~/set_streaming_profile` (`ros2_gst_video_bridge_msgs/srv/SetStreamingProfile`) + +Fields include: + +- state (`connecting|streaming|degraded|reconnecting|failed`) +- `fps_in`, `fps_out` +- dropped frame counters +- reconnect counter +- latency estimate in milliseconds +- selected codec/encoder and fallback flags +- adaptation profile and current adaptation level + +Operator runtime command example: + +```bash +ros2 service call /gst_video_bridge/set_streaming_profile ros2_gst_video_bridge_msgs/srv/SetStreamingProfile "{adaptation_profile: balanced, reset_counters: false}" +``` + +First-failure snapshot: + +- On the first streaming failure in a session, the bridge publishes `FIRST_FAILURE_SNAPSHOT` on `~/runtime_events`. +- Payload includes session/stream IDs, runtime state, selected codec/encoder, sink URI, fallback status, and effective pipeline string. + +### Automatic codec selection (`codec.name:=auto`) + +When `codec.name` is set to `auto`, the node inspects available encoder implementations via +`gst-inspect-1.0` and resolves to the best codec for the selected machine profile. + +Selection order by machine profile: + +| Machine profile | Preferred implementation classes | +|---|---| +| `jetson` | `hw:nvidia-v4l2` -> `hw:nvidia` -> other HW (`v4l2`/`omx`/`vaapi`) -> `sw` | +| `x86` | `hw:vaapi` -> `hw:v4l2` -> NVIDIA HW -> `sw` | +| `raspi` | `hw:v4l2` -> `hw:omx` -> `sw` | +| `generic` | `hw:vaapi` -> `hw:v4l2` -> NVIDIA HW -> `hw:omx` -> `sw` | + +Codec tie-break preference is stable: `h264` -> `h265` -> `mjpeg`. + +If `codec.name:=auto` picks a hardware encoder and runtime fails repeatedly while streaming, +the bridge now falls back automatically to a software encoder for the same codec. +This fallback is cross-platform (Jetson/x86/Raspberry/generic) and uses detected encoders from +`gst-inspect-1.0`, not Jetson-only logic. + +Fallback sensitivity can be tuned with: + +```bash +-p runtime.hw_fallback_failures:=3 +``` + +Adaptive resilience controls: + +```bash +-p runtime.adaptation.enabled:=true \ +-p runtime.adaptation.profile:=balanced \ +-p runtime.adaptation.interval_ms:=2000 \ +-p runtime.adaptation.cooldown_ms:=5000 +``` + +Supported adaptation profiles: + +- `conservative` +- `balanced` +- `aggressive` + +### Validation automation + +Codec/transport matrix script: + +```bash +chmod +x /home/ccu-001/ws_dev/src/ros2_gst_video_bridge/scripts/run_transport_codec_matrix.zsh +/home/ccu-001/ws_dev/src/ros2_gst_video_bridge/scripts/run_transport_codec_matrix.zsh /home/ccu-001/ws_dev /tmp/matrix.csv +``` + +Soak run script: + +```bash +chmod +x /home/ccu-001/ws_dev/src/ros2_gst_video_bridge/scripts/run_soak_profile.zsh +/home/ccu-001/ws_dev/src/ros2_gst_video_bridge/scripts/run_soak_profile.zsh /home/ccu-001/ws_dev 1800 generic low_latency /camera/image_raw +``` + +Release/versioning policy documents: + +- `docs/VERSIONING.md` +- `docs/RELEASE.md` + +Example: + +```bash +ros2 run ros2_gst_video_bridge gst_video_bridge_node --ros-args -p codec.name:=auto +``` + +### Discoverability modes (Phase 3) + +- List ROS image topics visible on the host: + +```bash +ros2 run ros2_gst_video_bridge gst_video_bridge_node --ros-args -p runtime.mode:=list_topics +``` + +- List detected GStreamer plugins, encoders, and sinks: + +```bash +ros2 run ros2_gst_video_bridge gst_video_bridge_node --ros-args -p runtime.mode:=list_capabilities +``` + +- Validate effective configuration and exit: + +```bash +ros2 run ros2_gst_video_bridge gst_video_bridge_node --ros-args -p runtime.mode:=validate_config +``` + +- Run full discoverability report (topics + capabilities) and exit: + +```bash +ros2 run ros2_gst_video_bridge gst_video_bridge_node --ros-args -p runtime.mode:=discover +``` + +## Current Status + +The node now supports ROS image ingestion into a real GStreamer `appsrc` pipeline with runtime modes, +reconnection policy, and profile-driven launch/configuration. diff --git a/config/default_params.yaml b/config/default_params.yaml new file mode 100644 index 0000000..8e2c820 --- /dev/null +++ b/config/default_params.yaml @@ -0,0 +1,34 @@ +gst_video_bridge: + ros__parameters: + profile.machine: generic + profile.stream: low_latency + + input_topic: /camera_driver_uv/vis/image_raw + + transport.kind: srt + transport.sink_uri: srt://127.0.0.1:9000?mode=listener + transport.latency_ms: 60 + transport.reconnect.enabled: true + transport.reconnect.interval_ms: 1000 + transport.reconnect.max_attempts: 0 + + codec.name: h264 + codec.profile: baseline + codec.tune: zerolatency + codec.rate_control: cbr + codec.bitrate_kbps: 2000 + codec.gop: 30 + + # Backward-compatible aliases. Prefer transport.* and codec.* above. + gst.transport: srt + gst.codec: h264 + gst.profile: baseline + gst.sink_uri: srt://127.0.0.1:9000?mode=listener + gst.bitrate_kbps: 2000 + gst.latency_ms: 60 + gst.pipeline_override: "" + + max_fps: 30.0 + use_wall_clock_timestamps: false + runtime.mode: stream + runtime.print_effective_config: true diff --git a/config/profiles/jetson_low_latency_srt.yaml b/config/profiles/jetson_low_latency_srt.yaml new file mode 100644 index 0000000..cfc81d3 --- /dev/null +++ b/config/profiles/jetson_low_latency_srt.yaml @@ -0,0 +1,25 @@ +gst_video_bridge: + ros__parameters: + profile.machine: jetson + profile.stream: low_latency + + input_topic: /camera_driver_uv/vis/image_raw + + transport.kind: srt + transport.sink_uri: srt://127.0.0.1:9000?mode=listener + transport.latency_ms: 60 + transport.reconnect.enabled: true + transport.reconnect.interval_ms: 1000 + transport.reconnect.max_attempts: 0 + + codec.name: h264 + codec.profile: baseline + codec.tune: zerolatency + codec.rate_control: cbr + codec.bitrate_kbps: 2500 + codec.gop: 30 + + max_fps: 30.0 + use_wall_clock_timestamps: false + runtime.mode: stream + runtime.print_effective_config: true diff --git a/config/profiles/jetson_monitoring_udp.yaml b/config/profiles/jetson_monitoring_udp.yaml new file mode 100644 index 0000000..00f2ed2 --- /dev/null +++ b/config/profiles/jetson_monitoring_udp.yaml @@ -0,0 +1,25 @@ +gst_video_bridge: + ros__parameters: + profile.machine: jetson + profile.stream: monitoring_udp + + input_topic: /camera_driver_uv/vis/image_raw + + transport.kind: udp + transport.sink_uri: udp://127.0.0.1:5000 + transport.latency_ms: 40 + transport.reconnect.enabled: false + transport.reconnect.interval_ms: 1000 + transport.reconnect.max_attempts: 0 + + codec.name: h264 + codec.profile: baseline + codec.tune: zerolatency + codec.rate_control: cbr + codec.bitrate_kbps: 2000 + codec.gop: 30 + + max_fps: 20.0 + use_wall_clock_timestamps: false + runtime.mode: stream + runtime.print_effective_config: true diff --git a/config/profiles/raspi_low_latency_srt.yaml b/config/profiles/raspi_low_latency_srt.yaml new file mode 100644 index 0000000..ecd0fe5 --- /dev/null +++ b/config/profiles/raspi_low_latency_srt.yaml @@ -0,0 +1,25 @@ +gst_video_bridge: + ros__parameters: + profile.machine: raspi + profile.stream: low_latency + + input_topic: /camera/image_raw + + transport.kind: srt + transport.sink_uri: srt://127.0.0.1:9000?mode=listener + transport.latency_ms: 80 + transport.reconnect.enabled: true + transport.reconnect.interval_ms: 1000 + transport.reconnect.max_attempts: 0 + + codec.name: h264 + codec.profile: main + codec.tune: zerolatency + codec.rate_control: cbr + codec.bitrate_kbps: 1800 + codec.gop: 30 + + max_fps: 25.0 + use_wall_clock_timestamps: false + runtime.mode: stream + runtime.print_effective_config: true diff --git a/config/profiles/raspi_monitoring_udp.yaml b/config/profiles/raspi_monitoring_udp.yaml new file mode 100644 index 0000000..b4d03ba --- /dev/null +++ b/config/profiles/raspi_monitoring_udp.yaml @@ -0,0 +1,25 @@ +gst_video_bridge: + ros__parameters: + profile.machine: raspi + profile.stream: monitoring_udp + + input_topic: /camera/image_raw + + transport.kind: udp + transport.sink_uri: udp://127.0.0.1:5000 + transport.latency_ms: 60 + transport.reconnect.enabled: false + transport.reconnect.interval_ms: 1000 + transport.reconnect.max_attempts: 0 + + codec.name: h264 + codec.profile: main + codec.tune: zerolatency + codec.rate_control: cbr + codec.bitrate_kbps: 1500 + codec.gop: 30 + + max_fps: 15.0 + use_wall_clock_timestamps: false + runtime.mode: stream + runtime.print_effective_config: true diff --git a/config/profiles/recording_file_sink.yaml b/config/profiles/recording_file_sink.yaml new file mode 100644 index 0000000..d2662a0 --- /dev/null +++ b/config/profiles/recording_file_sink.yaml @@ -0,0 +1,25 @@ +gst_video_bridge: + ros__parameters: + profile.machine: generic + profile.stream: high_quality + + input_topic: /camera/image_raw + + transport.kind: file + transport.sink_uri: /tmp/ros2_gst_video_bridge_recording.ts + transport.latency_ms: 0 + transport.reconnect.enabled: false + transport.reconnect.interval_ms: 1000 + transport.reconnect.max_attempts: 0 + + codec.name: h264 + codec.profile: high + codec.tune: zerolatency + codec.rate_control: cbr + codec.bitrate_kbps: 5000 + codec.gop: 60 + + max_fps: 30.0 + use_wall_clock_timestamps: false + runtime.mode: stream + runtime.print_effective_config: true diff --git a/config/profiles/x86_low_latency_srt.yaml b/config/profiles/x86_low_latency_srt.yaml new file mode 100644 index 0000000..b0dad3c --- /dev/null +++ b/config/profiles/x86_low_latency_srt.yaml @@ -0,0 +1,25 @@ +gst_video_bridge: + ros__parameters: + profile.machine: x86 + profile.stream: low_latency + + input_topic: /camera/image_raw + + transport.kind: srt + transport.sink_uri: srt://127.0.0.1:9000?mode=listener + transport.latency_ms: 70 + transport.reconnect.enabled: true + transport.reconnect.interval_ms: 1000 + transport.reconnect.max_attempts: 0 + + codec.name: h264 + codec.profile: high + codec.tune: zerolatency + codec.rate_control: cbr + codec.bitrate_kbps: 4000 + codec.gop: 30 + + max_fps: 30.0 + use_wall_clock_timestamps: false + runtime.mode: stream + runtime.print_effective_config: true diff --git a/config/profiles/x86_monitoring_udp.yaml b/config/profiles/x86_monitoring_udp.yaml new file mode 100644 index 0000000..b057f62 --- /dev/null +++ b/config/profiles/x86_monitoring_udp.yaml @@ -0,0 +1,25 @@ +gst_video_bridge: + ros__parameters: + profile.machine: x86 + profile.stream: monitoring_udp + + input_topic: /camera/image_raw + + transport.kind: udp + transport.sink_uri: udp://127.0.0.1:5000 + transport.latency_ms: 50 + transport.reconnect.enabled: false + transport.reconnect.interval_ms: 1000 + transport.reconnect.max_attempts: 0 + + codec.name: h264 + codec.profile: baseline + codec.tune: zerolatency + codec.rate_control: cbr + codec.bitrate_kbps: 2500 + codec.gop: 30 + + max_fps: 20.0 + use_wall_clock_timestamps: false + runtime.mode: stream + runtime.print_effective_config: true diff --git a/docs/RELEASE.md b/docs/RELEASE.md new file mode 100644 index 0000000..74b687b --- /dev/null +++ b/docs/RELEASE.md @@ -0,0 +1,25 @@ +# Release Gate + +## Required Gates + +1. PR gate +- Build + unit tests + smoke tests must pass. + +2. Nightly gate +- Transport/codec matrix script must run and generate report. + +3. Release gate +- Soak run >= 30 minutes per target machine class (Jetson/x86/Raspi/generic as available). + +## KPIs + +Release candidate is blocked if any threshold regresses vs previous stable: +- reconnects/hour +- dropped frames ratio +- startup success ratio +- median and p95 latency estimate + +## Rollback + +- Keep previous release profile bundle and parameter files. +- Revert to last stable tag and documented config migration pair. diff --git a/docs/VERSIONING.md b/docs/VERSIONING.md new file mode 100644 index 0000000..7f3345a --- /dev/null +++ b/docs/VERSIONING.md @@ -0,0 +1,20 @@ +# Versioning Policy + +## Scope + +This policy applies to: +- Runtime node behavior and parameters +- Pipeline profile semantics +- Interface contracts in ros2_gst_video_bridge_msgs + +## SemVer Rules + +- MAJOR: incompatible contract change (message field removals, parameter removals/renames without migration support). +- MINOR: backward-compatible additions (new parameters with defaults, new message fields preserving old behavior). +- PATCH: bug fixes and performance/stability improvements without contract break. + +## Contract Stability + +- `runtime_status` and `runtime_events` topics are versioned by package version. +- Config schema is tracked as `v1alpha`, then `v1` on first production release. +- Deprecated fields require at least one MINOR release overlap before removal. diff --git a/include/ros2_gst_video_bridge/core/config_loader.hpp b/include/ros2_gst_video_bridge/core/config_loader.hpp new file mode 100644 index 0000000..9b6ed6b --- /dev/null +++ b/include/ros2_gst_video_bridge/core/config_loader.hpp @@ -0,0 +1,29 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#ifndef ROS2_GST_VIDEO_BRIDGE__CORE__CONFIG_LOADER_HPP_ +#define ROS2_GST_VIDEO_BRIDGE__CORE__CONFIG_LOADER_HPP_ + +#include "ros2_gst_video_bridge/core/gst_bridge_config.hpp" + +#include + +#include +#include + +namespace ros2_gst_video_bridge { + +class ConfigLoader { +public: + static GstBridgeConfig loadFromNode(rclcpp::Node& node); + static std::vector validate(const GstBridgeConfig& config); + static std::string toDebugString(const GstBridgeConfig& config); + +private: + static void applyMachineProfileDefaults(GstBridgeConfig& config); + static void applyStreamProfileDefaults(GstBridgeConfig& config); +}; + +} // namespace ros2_gst_video_bridge + +#endif // ROS2_GST_VIDEO_BRIDGE__CORE__CONFIG_LOADER_HPP_ diff --git a/include/ros2_gst_video_bridge/core/gst_bridge_config.hpp b/include/ros2_gst_video_bridge/core/gst_bridge_config.hpp new file mode 100644 index 0000000..c05c3d5 --- /dev/null +++ b/include/ros2_gst_video_bridge/core/gst_bridge_config.hpp @@ -0,0 +1,63 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#ifndef ROS2_GST_VIDEO_BRIDGE__CORE__GST_BRIDGE_CONFIG_HPP_ +#define ROS2_GST_VIDEO_BRIDGE__CORE__GST_BRIDGE_CONFIG_HPP_ + +#include + +namespace ros2_gst_video_bridge { + +struct ProfileConfig { + std::string machine{"generic"}; + std::string stream{"default"}; +}; + +struct SourceConfig { + std::string input_topic{"/camera/image_raw"}; +}; + +struct TransportConfig { + std::string kind{"srt"}; + std::string sink_uri{"srt://127.0.0.1:9000?mode=listener"}; + int latency_ms{60}; + + bool reconnect_enabled{true}; + int reconnect_interval_ms{1000}; + int reconnect_max_attempts{0}; // 0 means unlimited +}; + +struct CodecConfig { + std::string name{"h264"}; + std::string encoder{""}; + std::string profile{"baseline"}; + std::string tune{"zerolatency"}; + std::string rate_control{"cbr"}; + + int bitrate_kbps{2000}; + int gop{30}; +}; + +struct RuntimeConfig { + double max_fps{30.0}; + bool use_wall_clock_timestamps{false}; + std::string mode{"stream"}; + bool print_effective_config{true}; +}; + +struct GstConfig { + std::string pipeline_override{}; +}; + +struct GstBridgeConfig { + ProfileConfig profile; + SourceConfig source; + TransportConfig transport; + CodecConfig codec; + RuntimeConfig runtime; + GstConfig gst; +}; + +} // namespace ros2_gst_video_bridge + +#endif // ROS2_GST_VIDEO_BRIDGE__CORE__GST_BRIDGE_CONFIG_HPP_ diff --git a/include/ros2_gst_video_bridge/core/pipeline_builder.hpp b/include/ros2_gst_video_bridge/core/pipeline_builder.hpp new file mode 100644 index 0000000..b1c3136 --- /dev/null +++ b/include/ros2_gst_video_bridge/core/pipeline_builder.hpp @@ -0,0 +1,20 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#ifndef ROS2_GST_VIDEO_BRIDGE__CORE__PIPELINE_BUILDER_HPP_ +#define ROS2_GST_VIDEO_BRIDGE__CORE__PIPELINE_BUILDER_HPP_ + +#include "ros2_gst_video_bridge/core/gst_bridge_config.hpp" + +#include + +namespace ros2_gst_video_bridge { + +class PipelineBuilder { +public: + static std::string build(const GstBridgeConfig& config); +}; + +} // namespace ros2_gst_video_bridge + +#endif // ROS2_GST_VIDEO_BRIDGE__CORE__PIPELINE_BUILDER_HPP_ diff --git a/include/ros2_gst_video_bridge/gst_video_bridge_node.hpp b/include/ros2_gst_video_bridge/gst_video_bridge_node.hpp new file mode 100644 index 0000000..434edb7 --- /dev/null +++ b/include/ros2_gst_video_bridge/gst_video_bridge_node.hpp @@ -0,0 +1,116 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#ifndef ROS2_GST_VIDEO_BRIDGE__GST_VIDEO_BRIDGE_NODE_HPP_ +#define ROS2_GST_VIDEO_BRIDGE__GST_VIDEO_BRIDGE_NODE_HPP_ + +#include "ros2_gst_video_bridge/core/gst_bridge_config.hpp" + +#include "ros2_gst_video_bridge/runtime/capability_probe.hpp" +#include "ros2_gst_video_bridge/runtime/metrics_publisher.hpp" +#include "ros2_gst_video_bridge/runtime/stream_engine.hpp" +#include "ros2_gst_video_bridge/runtime/topic_introspector.hpp" + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace ros2_gst_video_bridge { + +class GstVideoBridgeNode : public rclcpp::Node { +public: + explicit GstVideoBridgeNode(const rclcpp::NodeOptions& options = rclcpp::NodeOptions()); + ~GstVideoBridgeNode() override; + + bool hasImmediateExit() const; + int immediateExitCode() const; + +private: + enum class RuntimeState : uint8_t { Connecting, Streaming, Degraded, Reconnecting, Failed }; + + bool handleRuntimeMode(); + void printImageTopics() const; + void printGstCapabilities() const; + bool validateConfiguration() const; + + void initializeModules(); + void initializeSubscriptions(); + void initializeHealthMonitoring(); + void onImage(const sensor_msgs::msg::Image::SharedPtr msg); + void runHealthCheck(); + void evaluateAdaptationPolicy(); + void applyAdaptationLevel(uint8_t level, const std::string& reason); + bool requestPipelineReconfigure(const std::string& reason); + void scheduleReconnect(const std::string& reason); + bool tryReconnect(); + bool requestSoftwareFallback(const std::string& reason); + std::string selectSoftwareEncoderForCodec() const; + void setRuntimeState(RuntimeState new_state, const std::string& reason = ""); + static const char* runtimeStateToString(RuntimeState state); + + void logConfiguration() const; + void logRuntimeCapabilities() const; + void publishRuntimeEvent(const std::string& severity, const std::string& code, + const std::string& message) const; + void publishFirstFailureSnapshot(const std::string& reason); + void handleSetStreamingProfile( + const std::shared_ptr request, + std::shared_ptr response); + static bool isValidAdaptationProfile(const std::string& profile); + std::string buildSessionId() const; + + GstBridgeConfig config_; + std::string effective_pipeline_; + int immediate_exit_code_{-1}; + + std::unique_ptr topic_introspector_; + std::unique_ptr capability_probe_; + std::unique_ptr stream_engine_; + std::unique_ptr metrics_publisher_; + rclcpp::Subscription::SharedPtr image_subscription_; + rclcpp::Service::SharedPtr + set_profile_service_; + rclcpp::TimerBase::SharedPtr health_timer_; + + std::chrono::steady_clock::time_point last_frame_steady_time_{}; + std::chrono::steady_clock::time_point last_reconnect_attempt_{}; + RuntimeState runtime_state_{RuntimeState::Failed}; + bool reconnect_requested_{false}; + int reconnect_attempts_{0}; + uint64_t reconnect_count_{0}; + + std::string session_id_; + std::string stream_id_; + + std::vector encoder_implementations_; + bool auto_codec_requested_{false}; + bool hw_encoder_selected_{false}; + bool sw_fallback_applied_{false}; + bool sw_fallback_requested_{false}; + int consecutive_stream_failures_{0}; + int hw_fallback_failure_threshold_{3}; + + bool adaptation_enabled_{true}; + int adaptation_interval_ms_{2000}; + int adaptation_cooldown_ms_{5000}; + std::string adaptation_profile_{"balanced"}; + uint8_t adaptation_level_{0}; + uint64_t last_adaptation_ns_{0}; + + int base_bitrate_kbps_{2000}; + int base_gop_{30}; + double base_max_fps_{30.0}; + + bool pipeline_reconfigure_requested_{false}; + bool first_failure_snapshot_published_{false}; +}; + +} // namespace ros2_gst_video_bridge + +#endif // ROS2_GST_VIDEO_BRIDGE__GST_VIDEO_BRIDGE_NODE_HPP_ diff --git a/include/ros2_gst_video_bridge/runtime/capability_probe.hpp b/include/ros2_gst_video_bridge/runtime/capability_probe.hpp new file mode 100644 index 0000000..3c2ab9a --- /dev/null +++ b/include/ros2_gst_video_bridge/runtime/capability_probe.hpp @@ -0,0 +1,28 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#ifndef ROS2_GST_VIDEO_BRIDGE__RUNTIME__CAPABILITY_PROBE_HPP_ +#define ROS2_GST_VIDEO_BRIDGE__RUNTIME__CAPABILITY_PROBE_HPP_ + +#include +#include + +namespace ros2_gst_video_bridge { + +class CapabilityProbe { +public: + bool hasGstInspect() const; + std::vector detectPlugins() const; + std::vector detectEncoders() const; + std::vector detectSinks() const; + + std::vector detectHostProfile() const; + std::vector detectEncoderImplementations() const; + + std::vector detectTransports() const; + std::vector detectCodecs() const; +}; + +} // namespace ros2_gst_video_bridge + +#endif // ROS2_GST_VIDEO_BRIDGE__RUNTIME__CAPABILITY_PROBE_HPP_ diff --git a/include/ros2_gst_video_bridge/runtime/metrics_publisher.hpp b/include/ros2_gst_video_bridge/runtime/metrics_publisher.hpp new file mode 100644 index 0000000..f35e49e --- /dev/null +++ b/include/ros2_gst_video_bridge/runtime/metrics_publisher.hpp @@ -0,0 +1,84 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#ifndef ROS2_GST_VIDEO_BRIDGE__RUNTIME__METRICS_PUBLISHER_HPP_ +#define ROS2_GST_VIDEO_BRIDGE__RUNTIME__METRICS_PUBLISHER_HPP_ + +#include +#include +#include +#include + +#include +#include +#include + +namespace ros2_gst_video_bridge { + +class MetricsPublisher { +public: + explicit MetricsPublisher(rclcpp::Node& node); + + void setSessionMetadata(const std::string& session_id, const std::string& stream_id, + const std::string& source_topic, const std::string& codec_name, + const std::string& encoder_name, const std::string& transport_kind, + const std::string& sink_uri); + void setEffectiveEncoding(uint32_t bitrate_kbps, uint32_t gop, float max_fps); + void setAdaptationState(const std::string& profile, uint8_t level); + void setCodecSelectionFlags(bool auto_codec_requested, bool hw_selected, + bool sw_fallback_applied); + void publishHeartbeat(); + void recordFrameIn(uint64_t now_ns, uint64_t source_timestamp_ns); + void recordFrameOut(); + void recordFrameDropped(); + void recordReconnect(); + void updateRuntimeState(const std::string& runtime_state); + void publishEvent(const std::string& severity, const std::string& code, + const std::string& message); + +private: + ros2_gst_video_bridge_msgs::msg::RuntimeStatus buildStatusSnapshot(uint64_t now_ns, + double elapsed_s); + std::string buildLegacyPayload(const ros2_gst_video_bridge_msgs::msg::RuntimeStatus& status, + uint64_t drops_since_last) const; + void publishSnapshot(); + + rclcpp::Node& node_; + rclcpp::Publisher::SharedPtr metrics_pub_; + rclcpp::Publisher::SharedPtr status_pub_; + rclcpp::Publisher::SharedPtr event_pub_; + rclcpp::TimerBase::SharedPtr publish_timer_; + + std::atomic frames_in_{0}; + std::atomic frames_out_{0}; + std::atomic frames_dropped_{0}; + std::atomic reconnect_count_{0}; + + uint64_t last_frames_in_{0}; + uint64_t last_frames_out_{0}; + uint64_t last_frames_dropped_{0}; + uint64_t last_publish_time_ns_{0}; + std::atomic latency_estimate_ms_{0.0}; + std::string runtime_state_{"connecting"}; + + std::string session_id_; + std::string stream_id_{"default"}; + std::string source_topic_; + std::string codec_name_; + std::string encoder_name_; + std::string transport_kind_; + std::string sink_uri_; + std::string adaptation_profile_{"balanced"}; + uint8_t adaptation_level_{0}; + bool auto_codec_requested_{false}; + bool hw_encoder_selected_{false}; + bool sw_fallback_applied_{false}; + + uint32_t bitrate_kbps_effective_{0}; + uint32_t gop_effective_{0}; + float max_fps_effective_{0.0F}; +}; + +} // namespace ros2_gst_video_bridge + +#endif // ROS2_GST_VIDEO_BRIDGE__RUNTIME__METRICS_PUBLISHER_HPP_ diff --git a/include/ros2_gst_video_bridge/runtime/stream_engine.hpp b/include/ros2_gst_video_bridge/runtime/stream_engine.hpp new file mode 100644 index 0000000..0e32e82 --- /dev/null +++ b/include/ros2_gst_video_bridge/runtime/stream_engine.hpp @@ -0,0 +1,55 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#ifndef ROS2_GST_VIDEO_BRIDGE__RUNTIME__STREAM_ENGINE_HPP_ +#define ROS2_GST_VIDEO_BRIDGE__RUNTIME__STREAM_ENGINE_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace ros2_gst_video_bridge { + +class StreamEngine { +public: + explicit StreamEngine(std::string pipeline); + ~StreamEngine(); + + bool start(); + void stop(); + bool isRunning() const; + const std::string& pipeline() const; + bool pushFrame(const uint8_t* data, size_t size, int width, int height, + const std::string& gst_format, uint64_t timestamp_ns); + std::string lastError() const; + +private: + bool setAppSrcCaps(int width, int height, const std::string& gst_format); + void processBusMessages(GstClockTime timeout); + void busLoop(std::stop_token stop_token); + + std::string pipeline_; + std::string last_error_; + GstElement* pipeline_element_{nullptr}; + GstElement* appsrc_{nullptr}; + GstBus* bus_{nullptr}; + + int caps_width_{0}; + int caps_height_{0}; + std::string caps_format_; + + std::atomic_bool running_{false}; + bool gst_initialized_{false}; + std::jthread bus_thread_; + mutable std::mutex mutex_; +}; + +} // namespace ros2_gst_video_bridge + +#endif // ROS2_GST_VIDEO_BRIDGE__RUNTIME__STREAM_ENGINE_HPP_ diff --git a/include/ros2_gst_video_bridge/runtime/topic_introspector.hpp b/include/ros2_gst_video_bridge/runtime/topic_introspector.hpp new file mode 100644 index 0000000..9bc886d --- /dev/null +++ b/include/ros2_gst_video_bridge/runtime/topic_introspector.hpp @@ -0,0 +1,28 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#ifndef ROS2_GST_VIDEO_BRIDGE__RUNTIME__TOPIC_INTROSPECTOR_HPP_ +#define ROS2_GST_VIDEO_BRIDGE__RUNTIME__TOPIC_INTROSPECTOR_HPP_ + +#include + +#include +#include +#include + +namespace ros2_gst_video_bridge { + +class TopicIntrospector { +public: + explicit TopicIntrospector(rclcpp::Node& node); + + std::map> listTopics() const; + std::map> listImageTopics() const; + +private: + rclcpp::Node& node_; +}; + +} // namespace ros2_gst_video_bridge + +#endif // ROS2_GST_VIDEO_BRIDGE__RUNTIME__TOPIC_INTROSPECTOR_HPP_ diff --git a/launch/gst_video_bridge.launch.py b/launch/gst_video_bridge.launch.py new file mode 100644 index 0000000..90c4768 --- /dev/null +++ b/launch/gst_video_bridge.launch.py @@ -0,0 +1,19 @@ +from launch import LaunchDescription +from launch.actions import IncludeLaunchDescription +from launch.launch_description_sources import PythonLaunchDescriptionSource +from launch.substitutions import PathJoinSubstitution +from launch_ros.substitutions import FindPackageShare + + +def generate_launch_description(): + return LaunchDescription([ + IncludeLaunchDescription( + PythonLaunchDescriptionSource( + PathJoinSubstitution([ + FindPackageShare('ros2_gst_video_bridge'), + 'launch', + 'gst_video_bridge_advanced.launch.py', + ]) + ) + ) + ]) diff --git a/launch/gst_video_bridge_advanced.launch.py b/launch/gst_video_bridge_advanced.launch.py new file mode 100644 index 0000000..423c194 --- /dev/null +++ b/launch/gst_video_bridge_advanced.launch.py @@ -0,0 +1,99 @@ +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.conditions import IfCondition +from launch.substitutions import LaunchConfiguration, PathJoinSubstitution, PythonExpression +from launch_ros.actions import Node +from launch_ros.substitutions import FindPackageShare + + +def generate_launch_description(): + params_file = LaunchConfiguration('params_file') + input_topic = LaunchConfiguration('input_topic') + enable_debayer = LaunchConfiguration('enable_debayer') + debayer_output_topic = LaunchConfiguration('debayer_output_topic') + bridge_input_topic = PythonExpression([ + "'", debayer_output_topic, "' if '", enable_debayer, + "' == 'true' else '", input_topic, "'", + ]) + + return LaunchDescription([ + DeclareLaunchArgument( + 'params_file', + default_value=PathJoinSubstitution([ + FindPackageShare('ros2_gst_video_bridge'), + 'config', + 'default_params.yaml', + ]), + description='YAML parameter file with full transport/codec/runtime overrides', + ), + DeclareLaunchArgument('profile_machine', default_value='generic'), + DeclareLaunchArgument('profile_stream', default_value='low_latency'), + DeclareLaunchArgument('input_topic', default_value='/camera/image_raw'), + DeclareLaunchArgument('enable_debayer', default_value='false'), + DeclareLaunchArgument('debayer_output_topic', default_value='/camera/image_color'), + DeclareLaunchArgument('transport_kind', default_value='srt'), + DeclareLaunchArgument('sink_uri', default_value='srt://127.0.0.1:9000?mode=listener'), + DeclareLaunchArgument('transport_latency_ms', default_value='60'), + DeclareLaunchArgument('reconnect_enabled', default_value='true'), + DeclareLaunchArgument('reconnect_interval_ms', default_value='1000'), + DeclareLaunchArgument('reconnect_max_attempts', default_value='0'), + DeclareLaunchArgument('codec_name', default_value='auto'), + DeclareLaunchArgument('codec_profile', default_value='baseline'), + DeclareLaunchArgument('codec_tune', default_value='zerolatency'), + DeclareLaunchArgument('codec_rate_control', default_value='cbr'), + DeclareLaunchArgument('codec_bitrate_kbps', default_value='2000'), + DeclareLaunchArgument('codec_gop', default_value='30'), + DeclareLaunchArgument('max_fps', default_value='30.0'), + DeclareLaunchArgument('use_wall_clock_timestamps', default_value='false'), + DeclareLaunchArgument('runtime_mode', default_value='stream'), + DeclareLaunchArgument('print_effective_config', default_value='true'), + DeclareLaunchArgument('pipeline_override', default_value=''), + Node( + package='image_proc', + executable='debayer_node', + name='gst_video_bridge_debayer', + output='screen', + condition=IfCondition(enable_debayer), + remappings=[ + ('image_raw', input_topic), + ('image_color', debayer_output_topic), + ], + ), + Node( + package='ros2_gst_video_bridge', + executable='gst_video_bridge_node', + name='gst_video_bridge', + output='screen', + parameters=[ + params_file, + { + 'profile.machine': LaunchConfiguration('profile_machine'), + 'profile.stream': LaunchConfiguration('profile_stream'), + 'input_topic': bridge_input_topic, + 'transport.kind': LaunchConfiguration('transport_kind'), + 'transport.sink_uri': LaunchConfiguration('sink_uri'), + 'transport.latency_ms': LaunchConfiguration('transport_latency_ms'), + 'transport.reconnect.enabled': LaunchConfiguration('reconnect_enabled'), + 'transport.reconnect.interval_ms': LaunchConfiguration( + 'reconnect_interval_ms' + ), + 'transport.reconnect.max_attempts': LaunchConfiguration( + 'reconnect_max_attempts' + ), + 'codec.name': LaunchConfiguration('codec_name'), + 'codec.profile': LaunchConfiguration('codec_profile'), + 'codec.tune': LaunchConfiguration('codec_tune'), + 'codec.rate_control': LaunchConfiguration('codec_rate_control'), + 'codec.bitrate_kbps': LaunchConfiguration('codec_bitrate_kbps'), + 'codec.gop': LaunchConfiguration('codec_gop'), + 'max_fps': LaunchConfiguration('max_fps'), + 'use_wall_clock_timestamps': LaunchConfiguration('use_wall_clock_timestamps'), + 'runtime.mode': LaunchConfiguration('runtime_mode'), + 'runtime.print_effective_config': LaunchConfiguration( + 'print_effective_config' + ), + 'gst.pipeline_override': LaunchConfiguration('pipeline_override'), + }, + ], + ), + ]) diff --git a/launch/gst_video_bridge_minimal.launch.py b/launch/gst_video_bridge_minimal.launch.py new file mode 100644 index 0000000..da27ea9 --- /dev/null +++ b/launch/gst_video_bridge_minimal.launch.py @@ -0,0 +1,85 @@ +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.conditions import IfCondition +from launch.substitutions import LaunchConfiguration, PythonExpression +from launch_ros.actions import Node + + +def generate_launch_description(): + profile_machine = LaunchConfiguration('profile_machine') + profile_stream = LaunchConfiguration('profile_stream') + input_topic = LaunchConfiguration('input_topic') + enable_debayer = LaunchConfiguration('enable_debayer') + debayer_output_topic = LaunchConfiguration('debayer_output_topic') + sink_uri = LaunchConfiguration('sink_uri') + codec_name = LaunchConfiguration('codec_name') + bridge_input_topic = PythonExpression([ + "'", debayer_output_topic, "' if '", enable_debayer, + "' == 'true' else '", input_topic, "'", + ]) + + return LaunchDescription([ + DeclareLaunchArgument( + 'profile_machine', + default_value='generic', + description='Machine profile: generic|jetson|x86|raspi', + ), + DeclareLaunchArgument( + 'profile_stream', + default_value='low_latency', + description=( + 'Stream profile: ' + 'default|low_latency|low_bandwidth|high_quality|monitoring_udp' + ), + ), + DeclareLaunchArgument( + 'input_topic', + default_value='/camera/image_raw', + description='ROS image input topic', + ), + DeclareLaunchArgument( + 'enable_debayer', + default_value='false', + description='Enable image_proc debayer node before bridge (true|false)', + ), + DeclareLaunchArgument( + 'debayer_output_topic', + default_value='/camera/image_color', + description='Debayered image topic used by the bridge when enable_debayer=true', + ), + DeclareLaunchArgument( + 'sink_uri', + default_value='srt://127.0.0.1:9000?mode=listener', + description='Transport URI (SRT/UDP/RTSP/file depending on profile/transport)', + ), + DeclareLaunchArgument( + 'codec_name', + default_value='auto', + description='Codec selection: auto|h264|h265|mjpeg', + ), + Node( + package='image_proc', + executable='debayer_node', + name='gst_video_bridge_debayer', + output='screen', + condition=IfCondition(enable_debayer), + remappings=[ + ('image_raw', input_topic), + ('image_color', debayer_output_topic), + ], + ), + Node( + package='ros2_gst_video_bridge', + executable='gst_video_bridge_node', + name='gst_video_bridge', + output='screen', + parameters=[{ + 'profile.machine': profile_machine, + 'profile.stream': profile_stream, + 'input_topic': bridge_input_topic, + 'codec.name': codec_name, + 'transport.sink_uri': sink_uri, + 'runtime.mode': 'stream', + }], + ), + ]) diff --git a/package.xml b/package.xml new file mode 100644 index 0000000..9096bc3 --- /dev/null +++ b/package.xml @@ -0,0 +1,25 @@ + + + ros2_gst_video_bridge + 0.1.0 + Generic ROS 2 video bridge backed by configurable GStreamer pipelines. + + Mouhsine Kassimi Farhaoui + AGPL-3.0-only + + ament_cmake + + rclcpp + sensor_msgs + std_msgs + ros2_gst_video_bridge_msgs + image_proc + + ament_lint_auto + ament_lint_common + ament_cmake_gtest + + + ament_cmake + + diff --git a/scripts/run_soak_profile.zsh b/scripts/run_soak_profile.zsh new file mode 100644 index 0000000..5c80aa6 --- /dev/null +++ b/scripts/run_soak_profile.zsh @@ -0,0 +1,24 @@ +#!/usr/bin/env zsh +set -euo pipefail + +source /opt/ros/humble/setup.zsh +source "${1:-/home/ccu-001/ws_dev}/install/setup.zsh" + +duration_sec="${2:-1800}" +profile_machine="${3:-generic}" +profile_stream="${4:-low_latency}" +input_topic="${5:-/camera/image_raw}" +sink_uri="${6:-srt://127.0.0.1:9000?mode=listener}" + +log_file="/tmp/bridge_soak_${profile_machine}_${profile_stream}_$(date +%Y%m%d_%H%M%S).log" + +timeout "${duration_sec}s" ros2 launch ros2_gst_video_bridge gst_video_bridge_minimal.launch.py \ + profile_machine:="${profile_machine}" \ + profile_stream:="${profile_stream}" \ + input_topic:="${input_topic}" \ + sink_uri:="${sink_uri}" \ + codec_name:=auto \ + runtime_mode:=stream \ + >"${log_file}" 2>&1 || true + +echo "Soak run finished. Log: ${log_file}" diff --git a/scripts/run_transport_codec_matrix.zsh b/scripts/run_transport_codec_matrix.zsh new file mode 100644 index 0000000..1a5c056 --- /dev/null +++ b/scripts/run_transport_codec_matrix.zsh @@ -0,0 +1,40 @@ +#!/usr/bin/env zsh +set -euo pipefail + +source /opt/ros/humble/setup.zsh +source "${1:-/home/ccu-001/ws_dev}/install/setup.zsh" + +declare -a codecs=(h264 h265 mjpeg) +declare -a transports=(srt udp rtsp file) + +report="${2:-/tmp/ros2_gst_video_bridge_matrix.csv}" +echo "codec,transport,result,details" > "$report" + +for codec in "${codecs[@]}"; do + for transport in "${transports[@]}"; do + sink_uri="" + case "$transport" in + srt) sink_uri="srt://127.0.0.1:9000?mode=listener" ;; + udp) sink_uri="udp://127.0.0.1:5000" ;; + rtsp) sink_uri="rtsp://127.0.0.1:8554/live" ;; + file) + ext="ts" + [[ "$codec" == "mjpeg" ]] && ext="mkv" + sink_uri="/tmp/bridge_${codec}.${ext}" + ;; + esac + + if timeout 10s ros2 run ros2_gst_video_bridge gst_video_bridge_node --ros-args \ + -p runtime.mode:=validate_config \ + -p codec.name:="$codec" \ + -p transport.kind:="$transport" \ + -p transport.sink_uri:="$sink_uri" >/tmp/bridge_matrix_case.log 2>&1; then + echo "${codec},${transport},PASS,validate_config" >> "$report" + else + detail=$(tr '\n' ' ' > "$report" + fi + done +done + +echo "Matrix report written to: $report" diff --git a/src/core/config_loader.cc b/src/core/config_loader.cc new file mode 100644 index 0000000..4b9111e --- /dev/null +++ b/src/core/config_loader.cc @@ -0,0 +1,280 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/core/config_loader.hpp" + +#include +#include + +namespace ros2_gst_video_bridge { + +namespace { + +bool inSet(const std::string& value, const std::set& allowed) { + return allowed.find(value) != allowed.end(); +} + +bool startsWith(const std::string& value, const std::string& prefix) { + return value.rfind(prefix, 0) == 0; +} + +std::string forceSrtListenerMode(const std::string& sink_uri) { + if (sink_uri.rfind("srt://", 0) != 0) { + return sink_uri; + } + + std::string normalized = sink_uri; + const auto mode_pos = normalized.find("mode="); + if (mode_pos == std::string::npos) { + const char separator = normalized.find('?') == std::string::npos ? '?' : '&'; + normalized += separator; + normalized += "mode=listener"; + return normalized; + } + + const auto value_begin = mode_pos + 5; + const auto value_end = normalized.find('&', value_begin); + normalized.replace(value_begin, + value_end == std::string::npos ? std::string::npos : value_end - value_begin, + "listener"); + return normalized; +} + +} // namespace + +GstBridgeConfig ConfigLoader::loadFromNode(rclcpp::Node& node) { + GstBridgeConfig config; + + config.profile.machine = + node.declare_parameter("profile.machine", config.profile.machine); + config.profile.stream = + node.declare_parameter("profile.stream", config.profile.stream); + + applyMachineProfileDefaults(config); + applyStreamProfileDefaults(config); + + config.source.input_topic = + node.declare_parameter("input_topic", config.source.input_topic); + + config.transport.kind = + node.declare_parameter("transport.kind", config.transport.kind); + config.transport.kind = + node.declare_parameter("gst.transport", config.transport.kind); + config.transport.sink_uri = + node.declare_parameter("transport.sink_uri", config.transport.sink_uri); + config.transport.sink_uri = + node.declare_parameter("gst.sink_uri", config.transport.sink_uri); + if (config.transport.kind == "srt") { + config.transport.sink_uri = forceSrtListenerMode(config.transport.sink_uri); + } + config.transport.latency_ms = + node.declare_parameter("transport.latency_ms", config.transport.latency_ms); + config.transport.latency_ms = + node.declare_parameter("gst.latency_ms", config.transport.latency_ms); + config.transport.reconnect_enabled = node.declare_parameter( + "transport.reconnect.enabled", config.transport.reconnect_enabled); + config.transport.reconnect_interval_ms = node.declare_parameter( + "transport.reconnect.interval_ms", config.transport.reconnect_interval_ms); + config.transport.reconnect_max_attempts = node.declare_parameter( + "transport.reconnect.max_attempts", config.transport.reconnect_max_attempts); + + config.codec.name = node.declare_parameter("codec.name", config.codec.name); + config.codec.name = node.declare_parameter("gst.codec", config.codec.name); + config.codec.encoder = node.declare_parameter("codec.encoder", config.codec.encoder); + config.codec.profile = node.declare_parameter("codec.profile", config.codec.profile); + config.codec.profile = node.declare_parameter("gst.profile", config.codec.profile); + config.codec.tune = node.declare_parameter("codec.tune", config.codec.tune); + config.codec.rate_control = + node.declare_parameter("codec.rate_control", config.codec.rate_control); + config.codec.bitrate_kbps = + node.declare_parameter("codec.bitrate_kbps", config.codec.bitrate_kbps); + config.codec.bitrate_kbps = + node.declare_parameter("gst.bitrate_kbps", config.codec.bitrate_kbps); + config.codec.gop = node.declare_parameter("codec.gop", config.codec.gop); + + config.runtime.max_fps = node.declare_parameter("max_fps", config.runtime.max_fps); + config.runtime.use_wall_clock_timestamps = node.declare_parameter( + "use_wall_clock_timestamps", config.runtime.use_wall_clock_timestamps); + config.runtime.mode = node.declare_parameter("runtime.mode", config.runtime.mode); + config.runtime.print_effective_config = node.declare_parameter( + "runtime.print_effective_config", config.runtime.print_effective_config); + + config.gst.pipeline_override = + node.declare_parameter("gst.pipeline_override", config.gst.pipeline_override); + + return config; +} + +void ConfigLoader::applyMachineProfileDefaults(GstBridgeConfig& config) { + if (config.profile.machine == "jetson") { + config.codec.name = "h264"; + config.codec.tune = "zerolatency"; + config.codec.bitrate_kbps = 2500; + config.codec.gop = 30; + config.transport.latency_ms = 70; + } else if (config.profile.machine == "x86") { + config.codec.name = "h264"; + config.codec.bitrate_kbps = 4000; + config.codec.gop = 60; + config.transport.latency_ms = 90; + } else if (config.profile.machine == "raspi") { + config.codec.name = "h264"; + config.codec.profile = "main"; + config.codec.tune = "zerolatency"; + config.codec.bitrate_kbps = 1800; + config.codec.gop = 30; + config.runtime.max_fps = 25.0; + config.transport.latency_ms = 80; + } +} + +void ConfigLoader::applyStreamProfileDefaults(GstBridgeConfig& config) { + if (config.profile.stream == "low_bandwidth") { + config.runtime.max_fps = 15.0; + config.codec.bitrate_kbps = 1200; + config.codec.gop = 30; + config.transport.kind = "srt"; + config.transport.latency_ms = 120; + } else if (config.profile.stream == "low_latency") { + config.runtime.max_fps = 30.0; + config.codec.bitrate_kbps = 2000; + config.codec.gop = 30; + config.codec.tune = "zerolatency"; + config.transport.kind = "srt"; + config.transport.sink_uri = "srt://127.0.0.1:9000?mode=listener"; + config.transport.latency_ms = 60; + } else if (config.profile.stream == "high_quality") { + config.runtime.max_fps = 30.0; + config.codec.bitrate_kbps = 6000; + config.codec.gop = 60; + config.transport.kind = "srt"; + config.transport.sink_uri = "srt://127.0.0.1:9000?mode=listener"; + config.transport.latency_ms = 100; + } else if (config.profile.stream == "monitoring_udp") { + config.transport.kind = "udp"; + config.transport.sink_uri = "udp://127.0.0.1:5000"; + config.codec.bitrate_kbps = 2500; + } +} + +std::vector ConfigLoader::validate(const GstBridgeConfig& config) { + std::vector errors; + + const std::set valid_machines{"generic", "jetson", "x86", "raspi"}; + const std::set valid_stream_profiles{"default", "low_latency", "low_bandwidth", + "high_quality", "monitoring_udp"}; + const std::set valid_modes{"stream", "list_topics", "list_capabilities", + "validate_config", "discover"}; + const std::set valid_transports{"srt", "rtsp", "udp", "file"}; + const std::set valid_codecs{"auto", "h264", "h265", "mjpeg"}; + + if (!inSet(config.profile.machine, valid_machines)) { + errors.emplace_back( + "profile.machine must be one of [generic, jetson, x86, raspi]. Current value: '" + + config.profile.machine + "'."); + } + + if (!inSet(config.profile.stream, valid_stream_profiles)) { + errors.emplace_back( + "profile.stream must be one of [default, low_latency, low_bandwidth, high_quality, " + "monitoring_udp]. Current value: '" + + config.profile.stream + "'."); + } + + if (!inSet(config.runtime.mode, valid_modes)) { + errors.emplace_back( + "runtime.mode must be one of [stream, list_topics, list_capabilities, validate_config, " + "discover]. Current value: '" + + config.runtime.mode + "'."); + } + + if (config.source.input_topic.empty() || config.source.input_topic.front() != '/') { + errors.emplace_back("input_topic must be an absolute ROS topic starting with '/'. Example: " + "'/camera/image_raw'."); + } + + if (!inSet(config.transport.kind, valid_transports)) { + errors.emplace_back("transport.kind must be one of [srt, rtsp, udp, file]. Current value: '" + + config.transport.kind + "'."); + } + + if (config.gst.pipeline_override.empty() && config.transport.sink_uri.empty()) { + errors.emplace_back( + "transport.sink_uri cannot be empty unless gst.pipeline_override is provided."); + } + + if (!config.gst.pipeline_override.empty()) { + // Custom pipeline override takes full responsibility for compatibility checks. + } else if (config.transport.kind == "srt" && !startsWith(config.transport.sink_uri, "srt://")) { + errors.emplace_back("transport.sink_uri must start with 'srt://' when transport.kind=srt."); + } else if (config.transport.kind == "rtsp" && !startsWith(config.transport.sink_uri, "rtsp://")) { + errors.emplace_back("transport.sink_uri must start with 'rtsp://' when transport.kind=rtsp."); + } else if (config.transport.kind == "udp" && !startsWith(config.transport.sink_uri, "udp://")) { + errors.emplace_back("transport.sink_uri must start with 'udp://' when transport.kind=udp."); + } else if (config.transport.kind == "file" && startsWith(config.transport.sink_uri, "udp://")) { + errors.emplace_back( + "transport.sink_uri for transport.kind=file must be a file path, not UDP URI."); + } + + if (config.transport.latency_ms < 0) { + errors.emplace_back("transport.latency_ms must be >= 0."); + } + + if (config.transport.reconnect_enabled && config.transport.reconnect_interval_ms <= 0) { + errors.emplace_back("transport.reconnect.interval_ms must be > 0 when reconnect is enabled."); + } + + if (config.transport.reconnect_max_attempts < 0) { + errors.emplace_back("transport.reconnect.max_attempts must be >= 0 (0 means unlimited)."); + } + + if (!inSet(config.codec.name, valid_codecs)) { + errors.emplace_back("codec.name must be one of [auto, h264, h265, mjpeg]. Current value: '" + + config.codec.name + "'."); + } + + if (config.codec.bitrate_kbps <= 0) { + errors.emplace_back("codec.bitrate_kbps must be > 0."); + } + + if (config.codec.gop <= 0) { + errors.emplace_back("codec.gop must be > 0."); + } + + if (config.runtime.max_fps <= 0.0) { + errors.emplace_back("max_fps must be > 0."); + } + + return errors; +} + +std::string ConfigLoader::toDebugString(const GstBridgeConfig& config) { + std::ostringstream out; + out << "profile.machine=" << config.profile.machine << '\n'; + out << "profile.stream=" << config.profile.stream << '\n'; + out << "input_topic=" << config.source.input_topic << '\n'; + out << "transport.kind=" << config.transport.kind << '\n'; + out << "transport.sink_uri=" << config.transport.sink_uri << '\n'; + out << "transport.latency_ms=" << config.transport.latency_ms << '\n'; + out << "transport.reconnect.enabled=" << (config.transport.reconnect_enabled ? "true" : "false") + << '\n'; + out << "transport.reconnect.interval_ms=" << config.transport.reconnect_interval_ms << '\n'; + out << "transport.reconnect.max_attempts=" << config.transport.reconnect_max_attempts << '\n'; + out << "codec.name=" << config.codec.name << '\n'; + out << "codec.encoder=" << config.codec.encoder << '\n'; + out << "codec.profile=" << config.codec.profile << '\n'; + out << "codec.tune=" << config.codec.tune << '\n'; + out << "codec.rate_control=" << config.codec.rate_control << '\n'; + out << "codec.bitrate_kbps=" << config.codec.bitrate_kbps << '\n'; + out << "codec.gop=" << config.codec.gop << '\n'; + out << "max_fps=" << config.runtime.max_fps << '\n'; + out << "use_wall_clock_timestamps=" + << (config.runtime.use_wall_clock_timestamps ? "true" : "false") << '\n'; + out << "runtime.mode=" << config.runtime.mode << '\n'; + out << "runtime.print_effective_config=" + << (config.runtime.print_effective_config ? "true" : "false") << '\n'; + out << "gst.pipeline_override=" << config.gst.pipeline_override; + return out.str(); +} + +} // namespace ros2_gst_video_bridge diff --git a/src/core/pipeline_builder.cc b/src/core/pipeline_builder.cc new file mode 100644 index 0000000..d557307 --- /dev/null +++ b/src/core/pipeline_builder.cc @@ -0,0 +1,186 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/core/pipeline_builder.hpp" + +#include +#include +#include + +namespace ros2_gst_video_bridge { + +namespace { + +std::string toLower(std::string value) { + std::transform(value.begin(), value.end(), value.begin(), + [](unsigned char c) { return static_cast(std::tolower(c)); }); + return value; +} + +std::pair parseUdpHostPort(const std::string& sink_uri) { + constexpr const char* kPrefix = "udp://"; + if (sink_uri.rfind(kPrefix, 0) != 0) { + return {"127.0.0.1", "5000"}; + } + + const auto host_port = sink_uri.substr(std::char_traits::length(kPrefix)); + const auto separator = host_port.find(':'); + if (separator == std::string::npos) { + return {host_port, "5000"}; + } + + const auto host = host_port.substr(0, separator); + const auto port = host_port.substr(separator + 1); + if (host.empty() || port.empty()) { + return {"127.0.0.1", "5000"}; + } + + return {host, port}; +} + +bool isOneOf(const std::string& value, const std::initializer_list& items) { + for (const auto* item : items) { + if (value == item) { + return true; + } + } + return false; +} + +std::string buildEncoderChain(const GstBridgeConfig& config, const std::string& codec, + const std::string& selected_encoder) { + const std::string encoder = + selected_encoder.empty() + ? (codec == "h265" ? "x265enc" : (codec == "mjpeg" ? "jpegenc" : "x264enc")) + : selected_encoder; + + std::ostringstream chain; + + const bool is_nv_v4l2_h264 = encoder == "nvv4l2h264enc"; + const bool is_nv_v4l2_h265 = encoder == "nvv4l2h265enc"; + const bool is_nv_v4l2 = is_nv_v4l2_h264 || is_nv_v4l2_h265; + + if (is_nv_v4l2) { + // nvv4l2 encoders on Jetson require NVMM input; convert from appsrc CPU memory first. + chain << "videoconvert ! " + << "video/x-raw,format=I420 ! " + << "nvvidconv ! " + << "video/x-raw(memory:NVMM),format=NV12 ! " << encoder << " " + << "bitrate=" << (config.codec.bitrate_kbps * 1000) << " " + << "control-rate=" << (config.codec.rate_control == "vbr" ? 0 : 1) << " " + << "iframeinterval=" << config.codec.gop << " " + << "insert-sps-pps=true " + << "insert-vui=true " + << "maxperf-enable=true " + << "! "; + + if (is_nv_v4l2_h265 || codec == "h265") { + chain << "h265parse config-interval=1 ! "; + } else { + chain << "h264parse config-interval=1 ! "; + } + + return chain.str(); + } + + if (encoder == "x264enc") { + chain << "videoconvert ! " + << "video/x-raw,format=I420 ! " + << "x264enc " + << "tune=" << config.codec.tune << " " + << "bitrate=" << config.codec.bitrate_kbps << " " + << "key-int-max=" << config.codec.gop << " " + << "speed-preset=ultrafast " + << "! h264parse config-interval=1 ! "; + return chain.str(); + } + + if (encoder == "x265enc") { + chain << "videoconvert ! " + << "video/x-raw,format=I420 ! " + << "x265enc " + << "tune=" << config.codec.tune << " " + << "bitrate=" << config.codec.bitrate_kbps << " " + << "key-int-max=" << config.codec.gop << " " + << "! h265parse config-interval=1 ! "; + return chain.str(); + } + + if (encoder == "nvjpegenc") { + chain << "videoconvert ! " + << "video/x-raw,format=I420 ! " + << "nvvidconv ! " + << "video/x-raw(memory:NVMM),format=I420 ! " + << "nvjpegenc ! "; + return chain.str(); + } + + if (isOneOf(encoder, {"jpegenc", "vaapijpegenc", "v4l2jpegenc"})) { + chain << "videoconvert ! " + << "video/x-raw,format=I420 ! " << encoder << " ! "; + return chain.str(); + } + + // Conservative generic branch for unknown encoders. + chain << "videoconvert ! " << encoder << " ! "; + return chain.str(); +} + +void appendPayloadOrMux(std::ostringstream& pipeline, const std::string& transport, + const std::string& codec) { + if (transport == "udp" || transport == "rtsp") { + if (codec == "h265") { + pipeline << "rtph265pay config-interval=1 pt=96 ! "; + } else if (codec == "mjpeg") { + pipeline << "rtpjpegpay pt=26 ! "; + } else { + pipeline << "rtph264pay config-interval=1 pt=96 ! "; + } + return; + } + + if (codec == "mjpeg") { + pipeline << "matroskamux streamable=true ! "; + } else { + pipeline << "mpegtsmux alignment=7 ! "; + } +} + +} // namespace + +std::string PipelineBuilder::build(const GstBridgeConfig& config) { + if (!config.gst.pipeline_override.empty()) { + return config.gst.pipeline_override; + } + + const auto codec = toLower(config.codec.name); + const auto encoder = toLower(config.codec.encoder); + const auto transport = toLower(config.transport.kind); + + std::ostringstream pipeline; + pipeline << "appsrc name=bridge_appsrc is-live=true format=time do-timestamp=true " + << "block=true ! " + << "queue leaky=downstream max-size-buffers=2 max-size-time=0 max-size-bytes=0 ! " + << buildEncoderChain(config, codec, encoder); + + appendPayloadOrMux(pipeline, transport, codec); + + if (transport == "udp") { + const auto [host, port] = parseUdpHostPort(config.transport.sink_uri); + pipeline << "udpsink host=" << host << " port=" << port << " sync=false async=false"; + } else if (transport == "rtsp") { + pipeline << "rtspclientsink location=" << config.transport.sink_uri << " protocols=tcp"; + } else if (transport == "file") { + pipeline << "filesink location=" << config.transport.sink_uri; + } else { + pipeline << "srtsink " + << "uri=" << config.transport.sink_uri << " " + << "latency=" << config.transport.latency_ms << " " + << "wait-for-connection=false " + << "sync=false async=false"; + } + + return pipeline.str(); +} + +} // namespace ros2_gst_video_bridge diff --git a/src/gst_video_bridge_node.cc b/src/gst_video_bridge_node.cc new file mode 100644 index 0000000..631cf98 --- /dev/null +++ b/src/gst_video_bridge_node.cc @@ -0,0 +1,11 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +namespace ros2_gst_video_bridge { + +// Implementations are split across dedicated translation units to keep files +// focused and below the project line-limit policy. + +} // namespace ros2_gst_video_bridge diff --git a/src/main.cc b/src/main.cc new file mode 100644 index 0000000..4f28e96 --- /dev/null +++ b/src/main.cc @@ -0,0 +1,21 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +#include + +int main(int argc, char** argv) { + rclcpp::init(argc, argv); + auto node = std::make_shared(); + + if (node->hasImmediateExit()) { + const int code = node->immediateExitCode(); + rclcpp::shutdown(); + return code; + } + + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} diff --git a/src/node/core.cc b/src/node/core.cc new file mode 100644 index 0000000..2591acf --- /dev/null +++ b/src/node/core.cc @@ -0,0 +1,281 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +#include "ros2_gst_video_bridge/core/config_loader.hpp" +#include "ros2_gst_video_bridge/core/pipeline_builder.hpp" + +#include +#include +#include +namespace ros2_gst_video_bridge { +namespace { + +bool startsWith(const std::string& value, const std::string_view prefix) { + return value.rfind(std::string(prefix), 0) == 0; +} + +std::string valueOf(const std::vector& entries, const std::string& key) { + for (const auto& entry : entries) { + const auto sep = entry.find('='); + if (sep == std::string::npos) { + continue; + } + if (entry.substr(0, sep) == key) { + return entry.substr(sep + 1); + } + } + return ""; +} + +std::string detectMachineProfile(const std::string& configured_machine, + const std::vector& host_profile) { + if (configured_machine != "generic") { + return configured_machine; + } + + const std::string platform_hint = valueOf(host_profile, "platform_hint"); + const std::string runtime_arch = valueOf(host_profile, "runtime_arch"); + + if (platform_hint == "jetson_or_nvidia") { + return "jetson"; + } + + if (runtime_arch == "x86_64" || runtime_arch == "amd64" || runtime_arch == "x86" || + runtime_arch == "i386") { + return "x86"; + } + + if (runtime_arch == "aarch64" || startsWith(runtime_arch, "arm")) { + return "raspi"; + } + + return "generic"; +} + +std::string extractElementName(const std::string& implementation) { + const auto arrow = implementation.find(" -> "); + if (arrow == std::string::npos) { + return ""; + } + + const auto name_start = arrow + 4; + const auto bracket = implementation.find(" [", name_start); + if (bracket == std::string::npos || bracket <= name_start) { + return ""; + } + + return implementation.substr(name_start, bracket - name_start); +} + +bool isHardwareImplementation(const std::string& implementation) { + return implementation.find("[hw:") != std::string::npos; +} + +int implementationScore(const std::string& machine, const std::string& impl) { + const bool is_sw = impl.find("[sw]") != std::string::npos; + const bool is_nv_v4l2 = impl.find("[hw:nvidia-v4l2]") != std::string::npos; + const bool is_nv = impl.find("[hw:nvidia]") != std::string::npos; + const bool is_vaapi = impl.find("[hw:vaapi]") != std::string::npos; + const bool is_v4l2 = impl.find("[hw:v4l2]") != std::string::npos; + const bool is_omx = impl.find("[hw:omx]") != std::string::npos; + + const int hw_base = 200; + const int sw_base = 100; + + if (machine == "jetson") { + if (is_nv_v4l2) { + return hw_base + 30; + } + if (is_nv) { + return hw_base + 25; + } + if (is_v4l2 || is_omx || is_vaapi) { + return hw_base + 10; + } + return is_sw ? sw_base : 0; + } + + if (machine == "x86") { + if (is_vaapi) { + return hw_base + 30; + } + if (is_v4l2) { + return hw_base + 20; + } + if (is_nv_v4l2 || is_nv) { + return hw_base + 15; + } + return is_sw ? sw_base : 0; + } + + if (machine == "raspi") { + if (is_v4l2) { + return hw_base + 30; + } + if (is_omx) { + return hw_base + 25; + } + if (is_nv_v4l2 || is_nv || is_vaapi) { + return hw_base + 10; + } + return is_sw ? sw_base : 0; + } + + if (is_vaapi) { + return hw_base + 25; + } + if (is_v4l2) { + return hw_base + 20; + } + if (is_nv_v4l2 || is_nv) { + return hw_base + 15; + } + if (is_omx) { + return hw_base + 10; + } + return is_sw ? sw_base : 0; +} + +std::string selectAutoCodec(const std::string& machine, + const std::vector& implementations, std::string& reason) { + const std::vector> codec_priority{ + {"h264", 3}, {"h265", 2}, {"mjpeg", 1}}; + + int best_score = -1; + int best_priority = -1; + std::string best_codec = "h264"; + std::string best_impl = "fallback"; + + for (const auto& codec_item : codec_priority) { + const std::string& codec = codec_item.first; + const int priority = codec_item.second; + + for (const auto& impl : implementations) { + if (!startsWith(impl, codec + " -> ")) { + continue; + } + + const int score = implementationScore(machine, impl); + if (score > best_score || (score == best_score && priority > best_priority)) { + best_score = score; + best_priority = priority; + best_codec = codec; + best_impl = impl; + } + } + } + + reason = best_impl; + return best_codec; +} + +} // namespace +GstVideoBridgeNode::GstVideoBridgeNode(const rclcpp::NodeOptions& options) + : Node("gst_video_bridge", options) { + session_id_ = buildSessionId(); + stream_id_ = this->declare_parameter("runtime.stream_id", "default"); + + hw_fallback_failure_threshold_ = this->declare_parameter("runtime.hw_fallback_failures", 3); + if (hw_fallback_failure_threshold_ < 1) { + hw_fallback_failure_threshold_ = 1; + } + + adaptation_enabled_ = this->declare_parameter("runtime.adaptation.enabled", true); + adaptation_interval_ms_ = this->declare_parameter("runtime.adaptation.interval_ms", 2000); + adaptation_cooldown_ms_ = this->declare_parameter("runtime.adaptation.cooldown_ms", 5000); + adaptation_profile_ = + this->declare_parameter("runtime.adaptation.profile", "balanced"); + + if (!isValidAdaptationProfile(adaptation_profile_)) { + RCLCPP_WARN(this->get_logger(), + "invalid runtime.adaptation.profile='%s', forcing to 'balanced'", + adaptation_profile_.c_str()); + adaptation_profile_ = "balanced"; + } + + if (adaptation_interval_ms_ < 500) { + adaptation_interval_ms_ = 500; + } + if (adaptation_cooldown_ms_ < adaptation_interval_ms_) { + adaptation_cooldown_ms_ = adaptation_interval_ms_; + } + + config_ = ConfigLoader::loadFromNode(*this); + base_bitrate_kbps_ = config_.codec.bitrate_kbps; + base_gop_ = config_.codec.gop; + base_max_fps_ = config_.runtime.max_fps; + + topic_introspector_ = std::make_unique(*this); + capability_probe_ = std::make_unique(); + + const auto host_profile = capability_probe_->detectHostProfile(); + const std::string machine_for_auto = detectMachineProfile(config_.profile.machine, host_profile); + + if (machine_for_auto != config_.profile.machine) { + RCLCPP_INFO(this->get_logger(), + "profile.machine '%s' auto-resolved to '%s' from host detection", + config_.profile.machine.c_str(), machine_for_auto.c_str()); + } + + auto_codec_requested_ = config_.codec.name == "auto"; + + if (config_.codec.name == "auto") { + encoder_implementations_ = capability_probe_->detectEncoderImplementations(); + std::string selection_reason; + const std::string selected_codec = + selectAutoCodec(machine_for_auto, encoder_implementations_, selection_reason); + config_.codec.name = selected_codec; + config_.codec.encoder = extractElementName(selection_reason); + hw_encoder_selected_ = isHardwareImplementation(selection_reason); + + RCLCPP_INFO( + this->get_logger(), + "codec.name=auto resolved to '%s' with encoder '%s' using '%s' (effective machine: %s)", + config_.codec.name.c_str(), config_.codec.encoder.c_str(), selection_reason.c_str(), + machine_for_auto.c_str()); + } else { + hw_encoder_selected_ = false; + } + + effective_pipeline_ = PipelineBuilder::build(config_); + + if (config_.runtime.print_effective_config) { + RCLCPP_INFO(this->get_logger(), "Effective runtime configuration:\n%s", + ConfigLoader::toDebugString(config_).c_str()); + } + + if (!validateConfiguration()) { + RCLCPP_ERROR(this->get_logger(), "Configuration validation failed. Node will shutdown."); + publishRuntimeEvent("error", "CONFIG_INVALID", "Configuration validation failed"); + immediate_exit_code_ = 1; + return; + } + + if (handleRuntimeMode()) { + immediate_exit_code_ = 0; + return; + } + + initializeModules(); + initializeSubscriptions(); + initializeHealthMonitoring(); + + if (immediate_exit_code_ >= 0) { + return; + } + + logConfiguration(); + logRuntimeCapabilities(); + + RCLCPP_INFO(this->get_logger(), + "Streaming node ready. Subscribed to %s and forwarding frames to GStreamer appsrc.", + config_.source.input_topic.c_str()); + + if (stream_engine_ && stream_engine_->isRunning()) { + setRuntimeState(RuntimeState::Streaming, "pipeline running and subscriptions active"); + } +} + +} // namespace ros2_gst_video_bridge diff --git a/src/node/fallback.cc b/src/node/fallback.cc new file mode 100644 index 0000000..739cd2a --- /dev/null +++ b/src/node/fallback.cc @@ -0,0 +1,94 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +#include "ros2_gst_video_bridge/core/pipeline_builder.hpp" + +#include + +namespace ros2_gst_video_bridge { + +namespace { + +bool startsWith(const std::string& value, const std::string_view prefix) { + return value.rfind(std::string(prefix), 0) == 0; +} + +std::string extractElementName(const std::string& implementation) { + const auto arrow = implementation.find(" -> "); + if (arrow == std::string::npos) { + return ""; + } + + const auto name_start = arrow + 4; + const auto bracket = implementation.find(" [", name_start); + if (bracket == std::string::npos || bracket <= name_start) { + return ""; + } + + return implementation.substr(name_start, bracket - name_start); +} + +} // namespace + +std::string GstVideoBridgeNode::selectSoftwareEncoderForCodec() const { + const std::string prefix = config_.codec.name + " -> "; + for (const auto& implementation : encoder_implementations_) { + if (!startsWith(implementation, prefix)) { + continue; + } + if (implementation.find("[sw]") == std::string::npos) { + continue; + } + + const std::string encoder = extractElementName(implementation); + if (!encoder.empty()) { + return encoder; + } + } + return ""; +} + +bool GstVideoBridgeNode::requestSoftwareFallback(const std::string& reason) { + if (!auto_codec_requested_ || !hw_encoder_selected_ || sw_fallback_applied_) { + return false; + } + + ++consecutive_stream_failures_; + if (consecutive_stream_failures_ < hw_fallback_failure_threshold_) { + return false; + } + + const std::string sw_encoder = selectSoftwareEncoderForCodec(); + if (sw_encoder.empty()) { + RCLCPP_WARN(this->get_logger(), + "runtime fallback requested after repeated failures but no SW encoder was found " + "for codec '%s'", + config_.codec.name.c_str()); + return false; + } + + config_.codec.encoder = sw_encoder; + effective_pipeline_ = PipelineBuilder::build(config_); + sw_fallback_applied_ = true; + sw_fallback_requested_ = true; + + if (metrics_publisher_) { + metrics_publisher_->setCodecSelectionFlags(auto_codec_requested_, hw_encoder_selected_, + sw_fallback_applied_); + metrics_publisher_->setSessionMetadata(session_id_, stream_id_, config_.source.input_topic, + config_.codec.name, config_.codec.encoder, + config_.transport.kind, config_.transport.sink_uri); + } + + RCLCPP_WARN(this->get_logger(), + "Switching encoder fallback from HW to SW after %d failures. codec=%s hw_reason=%s " + "sw_encoder=%s", + consecutive_stream_failures_, config_.codec.name.c_str(), reason.c_str(), + sw_encoder.c_str()); + publishRuntimeEvent("warning", "ENCODER_FALLBACK_SW", reason); + return true; +} + +} // namespace ros2_gst_video_bridge diff --git a/src/node/lifecycle.cc b/src/node/lifecycle.cc new file mode 100644 index 0000000..94310fc --- /dev/null +++ b/src/node/lifecycle.cc @@ -0,0 +1,22 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +namespace ros2_gst_video_bridge { + +GstVideoBridgeNode::~GstVideoBridgeNode() { + if (stream_engine_) { + stream_engine_->stop(); + } +} + +bool GstVideoBridgeNode::hasImmediateExit() const { + return immediate_exit_code_ >= 0; +} + +int GstVideoBridgeNode::immediateExitCode() const { + return immediate_exit_code_; +} + +} // namespace ros2_gst_video_bridge diff --git a/src/node/modes.cc b/src/node/modes.cc new file mode 100644 index 0000000..474d0e3 --- /dev/null +++ b/src/node/modes.cc @@ -0,0 +1,112 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +#include "ros2_gst_video_bridge/core/config_loader.hpp" + +#include + +namespace ros2_gst_video_bridge { + +bool GstVideoBridgeNode::handleRuntimeMode() { + if (config_.runtime.mode == "stream") { + return false; + } + + if (config_.runtime.mode == "list_topics") { + printImageTopics(); + return true; + } + + if (config_.runtime.mode == "list_capabilities") { + printGstCapabilities(); + return true; + } + + if (config_.runtime.mode == "validate_config") { + RCLCPP_INFO(this->get_logger(), "Configuration is valid."); + return true; + } + + if (config_.runtime.mode == "discover") { + printImageTopics(); + printGstCapabilities(); + RCLCPP_INFO(this->get_logger(), "Discovery mode completed successfully."); + return true; + } + + RCLCPP_ERROR(this->get_logger(), "Unsupported runtime.mode value: %s", + config_.runtime.mode.c_str()); + return true; +} + +void GstVideoBridgeNode::printImageTopics() const { + const auto image_topics = topic_introspector_->listImageTopics(); + RCLCPP_INFO(this->get_logger(), "Detected image topics: %zu", image_topics.size()); + + for (const auto& topic : image_topics) { + std::ostringstream types; + for (size_t i = 0; i < topic.second.size(); ++i) { + types << topic.second[i]; + if (i + 1 < topic.second.size()) { + types << ", "; + } + } + RCLCPP_INFO(this->get_logger(), " - %s [%s]", topic.first.c_str(), types.str().c_str()); + } +} + +void GstVideoBridgeNode::printGstCapabilities() const { + if (!capability_probe_->hasGstInspect()) { + RCLCPP_WARN(this->get_logger(), + "gst-inspect-1.0 was not found on PATH. Reporting static fallback capabilities."); + } + + const auto host_profile = capability_probe_->detectHostProfile(); + const auto plugins = capability_probe_->detectPlugins(); + const auto encoders = capability_probe_->detectEncoders(); + const auto encoder_impls = capability_probe_->detectEncoderImplementations(); + const auto sinks = capability_probe_->detectSinks(); + + RCLCPP_INFO(this->get_logger(), "Detected host profile: %zu", host_profile.size()); + for (const auto& item : host_profile) { + RCLCPP_INFO(this->get_logger(), " - %s", item.c_str()); + } + + RCLCPP_INFO(this->get_logger(), "Detected GStreamer plugins: %zu", plugins.size()); + for (const auto& plugin : plugins) { + RCLCPP_INFO(this->get_logger(), " - %s", plugin.c_str()); + } + + RCLCPP_INFO(this->get_logger(), "Detected encoders: %zu", encoders.size()); + for (const auto& encoder : encoders) { + RCLCPP_INFO(this->get_logger(), " - %s", encoder.c_str()); + } + + RCLCPP_INFO(this->get_logger(), "Detected encoder implementations (CPU/HW): %zu", + encoder_impls.size()); + for (const auto& encoder_impl : encoder_impls) { + RCLCPP_INFO(this->get_logger(), " - %s", encoder_impl.c_str()); + } + + RCLCPP_INFO(this->get_logger(), "Detected sinks/transports: %zu", sinks.size()); + for (const auto& sink : sinks) { + RCLCPP_INFO(this->get_logger(), " - %s", sink.c_str()); + } +} + +bool GstVideoBridgeNode::validateConfiguration() const { + const auto errors = ConfigLoader::validate(config_); + if (errors.empty()) { + return true; + } + + for (const auto& error : errors) { + RCLCPP_ERROR(this->get_logger(), "Config error: %s", error.c_str()); + } + + return false; +} + +} // namespace ros2_gst_video_bridge diff --git a/src/node/observability.cc b/src/node/observability.cc new file mode 100644 index 0000000..a8d32c1 --- /dev/null +++ b/src/node/observability.cc @@ -0,0 +1,212 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +#include +#include + +namespace ros2_gst_video_bridge { + +void GstVideoBridgeNode::setRuntimeState(RuntimeState new_state, const std::string& reason) { + if (runtime_state_ == new_state) { + return; + } + + runtime_state_ = new_state; + if (metrics_publisher_) { + metrics_publisher_->updateRuntimeState(runtimeStateToString(runtime_state_)); + } + + if (reason.empty()) { + RCLCPP_INFO(this->get_logger(), "runtime.state=%s", runtimeStateToString(runtime_state_)); + return; + } + + if (new_state == RuntimeState::Degraded || new_state == RuntimeState::Failed) { + RCLCPP_WARN(this->get_logger(), "runtime.state=%s reason=%s", + runtimeStateToString(runtime_state_), reason.c_str()); + } else { + RCLCPP_INFO(this->get_logger(), "runtime.state=%s reason=%s", + runtimeStateToString(runtime_state_), reason.c_str()); + } +} + +void GstVideoBridgeNode::publishRuntimeEvent(const std::string& severity, const std::string& code, + const std::string& message) const { + if (!metrics_publisher_) { + return; + } + metrics_publisher_->publishEvent(severity, code, message); +} + +void GstVideoBridgeNode::publishFirstFailureSnapshot(const std::string& reason) { + if (first_failure_snapshot_published_) { + return; + } + + std::ostringstream snapshot; + snapshot << "session_id=" << session_id_ << " stream_id=" << stream_id_ + << " state=" << runtimeStateToString(runtime_state_) << " reason=" << reason + << " codec=" << config_.codec.name << " encoder=" << config_.codec.encoder + << " transport=" << config_.transport.kind << " sink_uri=" << config_.transport.sink_uri + << " sw_fallback_applied=" << (sw_fallback_applied_ ? "true" : "false") << " pipeline='" + << effective_pipeline_ << "'"; + + publishRuntimeEvent("error", "FIRST_FAILURE_SNAPSHOT", snapshot.str()); + first_failure_snapshot_published_ = true; +} + +void GstVideoBridgeNode::handleSetStreamingProfile( + const std::shared_ptr request, + std::shared_ptr response) { + if (!request || !response) { + return; + } + + if (!request->adaptation_profile.empty()) { + if (!isValidAdaptationProfile(request->adaptation_profile)) { + response->success = false; + response->message = "invalid adaptation profile (allowed: conservative|balanced|aggressive)"; + return; + } + adaptation_profile_ = request->adaptation_profile; + } + + if (request->reset_counters) { + adaptation_level_ = 0; + reconnect_count_ = 0; + reconnect_attempts_ = 0; + consecutive_stream_failures_ = 0; + first_failure_snapshot_published_ = false; + config_.codec.bitrate_kbps = base_bitrate_kbps_; + config_.codec.gop = base_gop_; + config_.runtime.max_fps = base_max_fps_; + } + + if (metrics_publisher_) { + metrics_publisher_->setAdaptationState(adaptation_profile_, adaptation_level_); + metrics_publisher_->setEffectiveEncoding(static_cast(config_.codec.bitrate_kbps), + static_cast(config_.codec.gop), + static_cast(config_.runtime.max_fps)); + metrics_publisher_->publishHeartbeat(); + } + + publishRuntimeEvent("info", "OPERATOR_PROFILE_UPDATE", + "runtime profile updated via service request"); + + response->success = true; + response->message = "streaming profile updated"; +} + +bool GstVideoBridgeNode::isValidAdaptationProfile(const std::string& profile) { + return profile == "conservative" || profile == "balanced" || profile == "aggressive"; +} + +std::string GstVideoBridgeNode::buildSessionId() const { + const auto now = this->now(); + std::ostringstream oss; + oss << "bridge-" << std::hex << now.nanoseconds(); + return oss.str(); +} + +const char* GstVideoBridgeNode::runtimeStateToString(RuntimeState state) { + switch (state) { + case RuntimeState::Connecting: + return "connecting"; + case RuntimeState::Streaming: + return "streaming"; + case RuntimeState::Degraded: + return "degraded"; + case RuntimeState::Reconnecting: + return "reconnecting"; + case RuntimeState::Failed: + return "failed"; + default: + return "unknown"; + } +} + +void GstVideoBridgeNode::logConfiguration() const { + RCLCPP_INFO(this->get_logger(), "profile.machine: %s", config_.profile.machine.c_str()); + RCLCPP_INFO(this->get_logger(), "profile.stream: %s", config_.profile.stream.c_str()); + RCLCPP_INFO(this->get_logger(), "input_topic: %s", config_.source.input_topic.c_str()); + + RCLCPP_INFO(this->get_logger(), "transport.kind: %s", config_.transport.kind.c_str()); + RCLCPP_INFO(this->get_logger(), "transport.sink_uri: %s", config_.transport.sink_uri.c_str()); + RCLCPP_INFO(this->get_logger(), "transport.latency_ms: %d", config_.transport.latency_ms); + RCLCPP_INFO(this->get_logger(), "transport.reconnect.enabled: %s", + config_.transport.reconnect_enabled ? "true" : "false"); + RCLCPP_INFO(this->get_logger(), "transport.reconnect.interval_ms: %d", + config_.transport.reconnect_interval_ms); + RCLCPP_INFO(this->get_logger(), "transport.reconnect.max_attempts: %d", + config_.transport.reconnect_max_attempts); + + RCLCPP_INFO(this->get_logger(), "codec.name: %s", config_.codec.name.c_str()); + RCLCPP_INFO(this->get_logger(), "codec.encoder: %s", config_.codec.encoder.c_str()); + RCLCPP_INFO(this->get_logger(), "codec.profile: %s", config_.codec.profile.c_str()); + RCLCPP_INFO(this->get_logger(), "codec.tune: %s", config_.codec.tune.c_str()); + RCLCPP_INFO(this->get_logger(), "codec.rate_control: %s", config_.codec.rate_control.c_str()); + RCLCPP_INFO(this->get_logger(), "codec.bitrate_kbps: %d", config_.codec.bitrate_kbps); + RCLCPP_INFO(this->get_logger(), "codec.gop: %d", config_.codec.gop); + + RCLCPP_INFO(this->get_logger(), "gst.pipeline_override: %s", + config_.gst.pipeline_override.c_str()); + + RCLCPP_INFO(this->get_logger(), "max_fps: %.2f", config_.runtime.max_fps); + RCLCPP_INFO(this->get_logger(), "use_wall_clock_timestamps: %s", + config_.runtime.use_wall_clock_timestamps ? "true" : "false"); + RCLCPP_INFO(this->get_logger(), "effective_pipeline: %s", effective_pipeline_.c_str()); +} + +void GstVideoBridgeNode::logRuntimeCapabilities() const { + const auto host_profile = capability_probe_->detectHostProfile(); + const auto transports = capability_probe_->detectTransports(); + const auto codecs = capability_probe_->detectCodecs(); + const auto encoder_impls = capability_probe_->detectEncoderImplementations(); + + std::ostringstream host_stream; + for (size_t i = 0; i < host_profile.size(); ++i) { + host_stream << host_profile[i]; + if (i + 1 < host_profile.size()) { + host_stream << ", "; + } + } + + std::ostringstream transport_stream; + for (size_t i = 0; i < transports.size(); ++i) { + transport_stream << transports[i]; + if (i + 1 < transports.size()) { + transport_stream << ", "; + } + } + + std::ostringstream codec_stream; + for (size_t i = 0; i < codecs.size(); ++i) { + codec_stream << codecs[i]; + if (i + 1 < codecs.size()) { + codec_stream << ", "; + } + } + + std::ostringstream encoder_impl_stream; + for (size_t i = 0; i < encoder_impls.size(); ++i) { + encoder_impl_stream << encoder_impls[i]; + if (i + 1 < encoder_impls.size()) { + encoder_impl_stream << ", "; + } + } + + RCLCPP_INFO(this->get_logger(), "detected_host: %s", host_stream.str().c_str()); + RCLCPP_INFO(this->get_logger(), "detected_transports: %s", transport_stream.str().c_str()); + RCLCPP_INFO(this->get_logger(), "detected_codecs: %s", codec_stream.str().c_str()); + RCLCPP_INFO(this->get_logger(), "detected_encoder_implementations: %s", + encoder_impl_stream.str().c_str()); + + const auto topics = topic_introspector_->listTopics(); + RCLCPP_INFO(this->get_logger(), "visible_topics_count: %zu", topics.size()); + RCLCPP_INFO(this->get_logger(), "stream_engine_running: %s", + stream_engine_->isRunning() ? "true" : "false"); +} + +} // namespace ros2_gst_video_bridge diff --git a/src/node/recovery.cc b/src/node/recovery.cc new file mode 100644 index 0000000..711edbc --- /dev/null +++ b/src/node/recovery.cc @@ -0,0 +1,209 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +#include "ros2_gst_video_bridge/core/pipeline_builder.hpp" + +#include +#include +#include +#include + +namespace ros2_gst_video_bridge { + +void GstVideoBridgeNode::runHealthCheck() { + if (!stream_engine_ || runtime_state_ == RuntimeState::Failed) { + return; + } + + if (reconnect_requested_) { + (void)tryReconnect(); + return; + } + + evaluateAdaptationPolicy(); + + const auto engine_error = stream_engine_->lastError(); + if (!engine_error.empty()) { + scheduleReconnect(engine_error); + } +} + +void GstVideoBridgeNode::evaluateAdaptationPolicy() { + if (!adaptation_enabled_) { + return; + } + + const uint64_t now_ns = static_cast(this->now().nanoseconds()); + if (last_adaptation_ns_ != 0) { + const uint64_t since_last_ms = (now_ns - last_adaptation_ns_) / 1000000ULL; + if (since_last_ms < static_cast(adaptation_interval_ms_)) { + return; + } + } + + uint8_t target_level = adaptation_level_; + if (runtime_state_ == RuntimeState::Degraded || runtime_state_ == RuntimeState::Reconnecting) { + target_level = std::min(2, static_cast(adaptation_level_ + 1)); + } else if (runtime_state_ == RuntimeState::Streaming && reconnect_count_ == 0 && + adaptation_level_ > 0) { + target_level = static_cast(adaptation_level_ - 1); + } + + if (target_level != adaptation_level_) { + applyAdaptationLevel(target_level, "runtime state transition"); + last_adaptation_ns_ = now_ns; + } +} + +void GstVideoBridgeNode::applyAdaptationLevel(uint8_t level, const std::string& reason) { + adaptation_level_ = std::min(2, level); + + double fps_scale = 1.0; + double bitrate_scale = 1.0; + + if (adaptation_profile_ == "conservative") { + fps_scale = adaptation_level_ == 0 ? 1.0 : (adaptation_level_ == 1 ? 0.85 : 0.70); + bitrate_scale = adaptation_level_ == 0 ? 1.0 : (adaptation_level_ == 1 ? 0.75 : 0.55); + } else if (adaptation_profile_ == "aggressive") { + fps_scale = adaptation_level_ == 0 ? 1.0 : (adaptation_level_ == 1 ? 0.75 : 0.55); + bitrate_scale = adaptation_level_ == 0 ? 1.0 : (adaptation_level_ == 1 ? 0.60 : 0.40); + } else { + fps_scale = adaptation_level_ == 0 ? 1.0 : (adaptation_level_ == 1 ? 0.80 : 0.60); + bitrate_scale = adaptation_level_ == 0 ? 1.0 : (adaptation_level_ == 1 ? 0.65 : 0.45); + } + + const int next_bitrate = std::max(400, static_cast(base_bitrate_kbps_ * bitrate_scale)); + const int next_gop = + std::max(10, static_cast(base_gop_ * (adaptation_level_ == 0 ? 1.0 : 0.8))); + const double next_fps = std::max(5.0, base_max_fps_ * fps_scale); + + bool changed = false; + if (next_bitrate != config_.codec.bitrate_kbps) { + config_.codec.bitrate_kbps = next_bitrate; + changed = true; + } + if (next_gop != config_.codec.gop) { + config_.codec.gop = next_gop; + changed = true; + } + if (std::abs(next_fps - config_.runtime.max_fps) > 0.01) { + config_.runtime.max_fps = next_fps; + changed = true; + } + + if (!changed) { + return; + } + + if (metrics_publisher_) { + metrics_publisher_->setAdaptationState(adaptation_profile_, adaptation_level_); + metrics_publisher_->setEffectiveEncoding(static_cast(config_.codec.bitrate_kbps), + static_cast(config_.codec.gop), + static_cast(config_.runtime.max_fps)); + } + + std::ostringstream message; + message << "adaptation_level=" << static_cast(adaptation_level_) << " reason=" << reason + << " bitrate_kbps=" << config_.codec.bitrate_kbps << " gop=" << config_.codec.gop + << " max_fps=" << config_.runtime.max_fps; + + publishRuntimeEvent("warning", "ADAPTATION_APPLIED", message.str()); + (void)requestPipelineReconfigure("adaptation policy updated encoding knobs"); +} + +bool GstVideoBridgeNode::requestPipelineReconfigure(const std::string& reason) { + effective_pipeline_ = PipelineBuilder::build(config_); + pipeline_reconfigure_requested_ = true; + + if (metrics_publisher_) { + metrics_publisher_->setSessionMetadata(session_id_, stream_id_, config_.source.input_topic, + config_.codec.name, config_.codec.encoder, + config_.transport.kind, config_.transport.sink_uri); + } + + if (!reconnect_requested_) { + reconnect_requested_ = true; + setRuntimeState(RuntimeState::Degraded, "pipeline reconfigure requested"); + } + + publishRuntimeEvent("info", "PIPELINE_RECONFIGURE", reason); + return true; +} + +void GstVideoBridgeNode::scheduleReconnect(const std::string& reason) { + if (runtime_state_ == RuntimeState::Failed) { + return; + } + + publishFirstFailureSnapshot(reason); + + if (!config_.transport.reconnect_enabled) { + setRuntimeState(RuntimeState::Failed, "reconnect disabled: " + reason); + return; + } + + if (!reconnect_requested_) { + reconnect_requested_ = true; + setRuntimeState(RuntimeState::Degraded, reason); + publishRuntimeEvent("warning", "PIPELINE_DEGRADED", reason); + (void)requestSoftwareFallback(reason); + } +} + +bool GstVideoBridgeNode::tryReconnect() { + if (!stream_engine_) { + return false; + } + + const auto now = std::chrono::steady_clock::now(); + const auto reconnect_interval = + std::chrono::milliseconds(config_.transport.reconnect_interval_ms); + if (last_reconnect_attempt_.time_since_epoch().count() != 0 && + (now - last_reconnect_attempt_) < reconnect_interval) { + return false; + } + + if (config_.transport.reconnect_max_attempts > 0 && + reconnect_attempts_ >= config_.transport.reconnect_max_attempts) { + setRuntimeState(RuntimeState::Failed, "reconnect attempts exhausted"); + reconnect_requested_ = false; + return false; + } + + last_reconnect_attempt_ = now; + ++reconnect_attempts_; + setRuntimeState(RuntimeState::Reconnecting, "attempt " + std::to_string(reconnect_attempts_)); + + ++reconnect_count_; + stream_engine_->stop(); + + if (sw_fallback_requested_ || pipeline_reconfigure_requested_) { + stream_engine_ = std::make_unique(effective_pipeline_); + sw_fallback_requested_ = false; + pipeline_reconfigure_requested_ = false; + } + + const bool started = stream_engine_->start(); + if (!started) { + setRuntimeState(RuntimeState::Degraded, stream_engine_->lastError()); + publishRuntimeEvent("error", "RECONNECT_FAILED", stream_engine_->lastError()); + publishFirstFailureSnapshot(stream_engine_->lastError()); + (void)requestSoftwareFallback(stream_engine_->lastError()); + return false; + } + + if (metrics_publisher_) { + metrics_publisher_->recordReconnect(); + } + + reconnect_requested_ = false; + reconnect_attempts_ = 0; + consecutive_stream_failures_ = 0; + last_frame_steady_time_ = std::chrono::steady_clock::time_point{}; + setRuntimeState(RuntimeState::Streaming, "reconnected successfully"); + return true; +} + +} // namespace ros2_gst_video_bridge diff --git a/src/node/streaming.cc b/src/node/streaming.cc new file mode 100644 index 0000000..3672073 --- /dev/null +++ b/src/node/streaming.cc @@ -0,0 +1,228 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/gst_video_bridge_node.hpp" + +#include "ros2_gst_video_bridge/core/pipeline_builder.hpp" + +#include + +#include +#include +#include +#include + +namespace ros2_gst_video_bridge { + +namespace { + +consteval int64_t nanosecondsPerSecond() { + return 1000000000LL; +} + +bool resolveGstFormat(const std::string& encoding, std::string& gst_format) { + if (encoding == sensor_msgs::image_encodings::BGR8 || + encoding == sensor_msgs::image_encodings::TYPE_8UC3 || + encoding == sensor_msgs::image_encodings::TYPE_8SC3) { + gst_format = "BGR"; + return true; + } + + if (encoding == sensor_msgs::image_encodings::RGB8) { + gst_format = "RGB"; + return true; + } + + if (encoding == sensor_msgs::image_encodings::BGRA8 || + encoding == sensor_msgs::image_encodings::TYPE_8UC4 || + encoding == sensor_msgs::image_encodings::TYPE_8SC4) { + gst_format = "BGRA"; + return true; + } + + if (encoding == sensor_msgs::image_encodings::RGBA8) { + gst_format = "RGBA"; + return true; + } + + if (encoding == sensor_msgs::image_encodings::MONO8 || + encoding == sensor_msgs::image_encodings::TYPE_8UC1 || + encoding == sensor_msgs::image_encodings::TYPE_8SC1) { + gst_format = "GRAY8"; + return true; + } + + if (encoding == sensor_msgs::image_encodings::MONO16 || + encoding == sensor_msgs::image_encodings::TYPE_16UC1 || + encoding == sensor_msgs::image_encodings::TYPE_16SC1) { + gst_format = "GRAY16_LE"; + return true; + } + + if (encoding == sensor_msgs::image_encodings::BAYER_RGGB8 || + encoding == sensor_msgs::image_encodings::BAYER_BGGR8 || + encoding == sensor_msgs::image_encodings::BAYER_GBRG8 || + encoding == sensor_msgs::image_encodings::BAYER_GRBG8) { + gst_format = "GRAY8"; + return true; + } + + if (encoding == sensor_msgs::image_encodings::BAYER_RGGB16 || + encoding == sensor_msgs::image_encodings::BAYER_BGGR16 || + encoding == sensor_msgs::image_encodings::BAYER_GBRG16 || + encoding == sensor_msgs::image_encodings::BAYER_GRBG16) { + gst_format = "GRAY16_LE"; + return true; + } + + return false; +} + +bool isBayerEncoding(const std::string& encoding) { + return encoding == sensor_msgs::image_encodings::BAYER_RGGB8 || + encoding == sensor_msgs::image_encodings::BAYER_BGGR8 || + encoding == sensor_msgs::image_encodings::BAYER_GBRG8 || + encoding == sensor_msgs::image_encodings::BAYER_GRBG8 || + encoding == sensor_msgs::image_encodings::BAYER_RGGB16 || + encoding == sensor_msgs::image_encodings::BAYER_BGGR16 || + encoding == sensor_msgs::image_encodings::BAYER_GBRG16 || + encoding == sensor_msgs::image_encodings::BAYER_GRBG16; +} + +} // namespace + +void GstVideoBridgeNode::initializeModules() { + topic_introspector_ = std::make_unique(*this); + capability_probe_ = std::make_unique(); + stream_engine_ = std::make_unique(effective_pipeline_); + metrics_publisher_ = std::make_unique(*this); + + metrics_publisher_->setSessionMetadata(session_id_, stream_id_, config_.source.input_topic, + config_.codec.name, config_.codec.encoder, + config_.transport.kind, config_.transport.sink_uri); + metrics_publisher_->setCodecSelectionFlags(auto_codec_requested_, hw_encoder_selected_, + sw_fallback_applied_); + metrics_publisher_->setEffectiveEncoding(static_cast(config_.codec.bitrate_kbps), + static_cast(config_.codec.gop), + static_cast(config_.runtime.max_fps)); + metrics_publisher_->setAdaptationState(adaptation_profile_, adaptation_level_); + + set_profile_service_ = this->create_service( + "~/set_streaming_profile", std::bind(&GstVideoBridgeNode::handleSetStreamingProfile, this, + std::placeholders::_1, std::placeholders::_2)); + + if (!stream_engine_->start()) { + const auto initial_error = stream_engine_->lastError(); + if (requestSoftwareFallback(initial_error)) { + stream_engine_ = std::make_unique(effective_pipeline_); + if (stream_engine_->start()) { + setRuntimeState(RuntimeState::Connecting, "pipeline restarted with software fallback"); + metrics_publisher_->publishHeartbeat(); + return; + } + } + + RCLCPP_ERROR(this->get_logger(), "Failed to start stream engine: %s", + stream_engine_->lastError().c_str()); + publishRuntimeEvent("error", "PIPELINE_START_FAILED", stream_engine_->lastError()); + publishFirstFailureSnapshot(stream_engine_->lastError()); + + if (config_.transport.reconnect_enabled) { + setRuntimeState(RuntimeState::Degraded, stream_engine_->lastError()); + reconnect_requested_ = true; + } else { + setRuntimeState(RuntimeState::Failed, stream_engine_->lastError()); + immediate_exit_code_ = 1; + } + return; + } + + setRuntimeState(RuntimeState::Connecting, "pipeline started"); + publishRuntimeEvent("info", "PIPELINE_STARTED", "Pipeline transitioned to PLAYING"); + metrics_publisher_->publishHeartbeat(); +} + +void GstVideoBridgeNode::initializeSubscriptions() { + if (immediate_exit_code_ >= 0) { + return; + } + + image_subscription_ = this->create_subscription( + config_.source.input_topic, rclcpp::SensorDataQoS(), + std::bind(&GstVideoBridgeNode::onImage, this, std::placeholders::_1)); +} + +void GstVideoBridgeNode::initializeHealthMonitoring() { + if (immediate_exit_code_ >= 0) { + return; + } + + health_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), + std::bind(&GstVideoBridgeNode::runHealthCheck, this)); +} + +void GstVideoBridgeNode::onImage(const sensor_msgs::msg::Image::SharedPtr msg) { + if (!stream_engine_ || !stream_engine_->isRunning()) { + return; + } + + if (config_.runtime.max_fps > 0.0) { + const auto now = std::chrono::steady_clock::now(); + const auto min_period = std::chrono::duration(1.0 / config_.runtime.max_fps); + if (last_frame_steady_time_.time_since_epoch().count() != 0 && + (now - last_frame_steady_time_) < min_period) { + return; + } + last_frame_steady_time_ = now; + } + + std::string gst_format; + if (!resolveGstFormat(msg->encoding, gst_format)) { + RCLCPP_WARN_THROTTLE( + this->get_logger(), *this->get_clock(), 5000, + "Unsupported image encoding '%s'. Supported encodings include mono8/mono16, rgb8/bgr8, " + "rgba8/bgra8, type_8/16 C1/C3/C4, and bayer_*8/*16 (grayscale fallback).", + msg->encoding.c_str()); + return; + } + + if (isBayerEncoding(msg->encoding)) { + RCLCPP_WARN_THROTTLE(this->get_logger(), *this->get_clock(), 10000, + "Input encoding '%s' is being streamed as grayscale Bayer fallback. Use " + "image_proc/debayer for color output.", + msg->encoding.c_str()); + } + + uint64_t timestamp_ns = 0; + if (config_.runtime.use_wall_clock_timestamps) { + timestamp_ns = static_cast(this->now().nanoseconds()); + } else { + timestamp_ns = + static_cast(static_cast(msg->header.stamp.sec) * nanosecondsPerSecond() + + msg->header.stamp.nanosec); + } + + if (metrics_publisher_) { + const uint64_t now_ns = static_cast(this->now().nanoseconds()); + metrics_publisher_->recordFrameIn(now_ns, timestamp_ns); + } + + const bool pushed = + stream_engine_->pushFrame(msg->data.data(), msg->data.size(), static_cast(msg->width), + static_cast(msg->height), gst_format, timestamp_ns); + + if (!pushed) { + if (metrics_publisher_) { + metrics_publisher_->recordFrameDropped(); + } + scheduleReconnect(stream_engine_->lastError()); + RCLCPP_WARN_THROTTLE(this->get_logger(), *this->get_clock(), 2000, + "Failed to push frame to GStreamer: %s", + stream_engine_->lastError().c_str()); + } else if (metrics_publisher_) { + metrics_publisher_->recordFrameOut(); + consecutive_stream_failures_ = 0; + } +} + +} // namespace ros2_gst_video_bridge diff --git a/src/runtime/capability_probe.cc b/src/runtime/capability_probe.cc new file mode 100644 index 0000000..eab4c74 --- /dev/null +++ b/src/runtime/capability_probe.cc @@ -0,0 +1,259 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/runtime/capability_probe.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace ros2_gst_video_bridge { + +namespace { + +#if defined(__aarch64__) +constexpr const char* kBuildArch = "aarch64"; +#elif defined(__arm__) +constexpr const char* kBuildArch = "arm"; +#elif defined(__x86_64__) +constexpr const char* kBuildArch = "x86_64"; +#elif defined(__i386__) +constexpr const char* kBuildArch = "x86"; +#else +constexpr const char* kBuildArch = "unknown"; +#endif + +std::string runCommand(const std::string& command) { + std::array buffer{}; + std::string result; + + FILE* pipe = popen(command.c_str(), "r"); + if (pipe == nullptr) { + return result; + } + + while (fgets(buffer.data(), static_cast(buffer.size()), pipe) != nullptr) { + result += buffer.data(); + } + + pclose(pipe); + return result; +} + +bool commandHasAnyOutput(const std::string& command) { + return !runCommand(command).empty(); +} + +std::string trim(std::string value) { + while (!value.empty() && (value.back() == '\n' || value.back() == '\r' || value.back() == ' ')) { + value.pop_back(); + } + + size_t start = 0; + while (start < value.size() && value[start] == ' ') { + ++start; + } + + return value.substr(start); +} + +std::string readFirstCpuModelLine() { + std::ifstream cpuinfo("/proc/cpuinfo"); + if (!cpuinfo.is_open()) { + return "unknown"; + } + + std::string line; + while (std::getline(cpuinfo, line)) { + const auto model_pos = line.find("model name"); + const auto hardware_pos = line.find("Hardware"); + if (model_pos == 0 || hardware_pos == 0) { + const auto sep = line.find(':'); + if (sep != std::string::npos) { + return trim(line.substr(sep + 1)); + } + } + } + + return "unknown"; +} + +bool fileExists(const char* path) { + std::ifstream f(path); + return f.good(); +} + +std::vector filterAvailableCodecElements( + const std::vector>& candidates) { + std::vector available; + for (const auto& candidate : candidates) { + const std::string& element = std::get<0>(candidate); + const std::string& codec = std::get<1>(candidate); + const std::string& accel = std::get<2>(candidate); + const std::string command = "gst-inspect-1.0 " + element + " 2>/dev/null"; + if (commandHasAnyOutput(command)) { + available.push_back(codec + " -> " + element + " [" + accel + "]"); + } + } + return available; +} + +std::vector +filterAvailableElements(const std::vector>& candidates) { + std::vector available; + for (const auto& candidate : candidates) { + const std::string command = "gst-inspect-1.0 " + candidate.first + " 2>/dev/null"; + if (commandHasAnyOutput(command)) { + available.push_back(candidate.second + " (" + candidate.first + ")"); + } + } + return available; +} + +} // namespace + +bool CapabilityProbe::hasGstInspect() const { + return commandHasAnyOutput("command -v gst-inspect-1.0"); +} + +std::vector CapabilityProbe::detectPlugins() const { + if (!hasGstInspect()) { + return {}; + } + + std::vector plugins; + const auto core_plugins = filterAvailableElements({{"appsrc", "appsrc"}, + {"videoconvert", "videoconvert"}, + {"videoscale", "videoscale"}, + {"queue", "queue"}}); + plugins.insert(plugins.end(), core_plugins.begin(), core_plugins.end()); + + const auto transport_plugins = filterAvailableElements({{"srtsink", "SRT sink"}, + {"rtspclientsink", "RTSP client sink"}, + {"udpsink", "UDP sink"}, + {"filesink", "File sink"}}); + plugins.insert(plugins.end(), transport_plugins.begin(), transport_plugins.end()); + + const auto codec_plugins = filterAvailableElements( + {{"x264enc", "H264 encoder"}, {"x265enc", "H265 encoder"}, {"jpegenc", "MJPEG encoder"}}); + plugins.insert(plugins.end(), codec_plugins.begin(), codec_plugins.end()); + + return plugins; +} + +std::vector CapabilityProbe::detectEncoders() const { + const auto implementations = detectEncoderImplementations(); + if (implementations.empty()) { + return {"h264", "h265", "mjpeg"}; + } + + bool has_h264 = false; + bool has_h265 = false; + bool has_mjpeg = false; + for (const auto& impl : implementations) { + if (impl.rfind("h264", 0) == 0) { + has_h264 = true; + } else if (impl.rfind("h265", 0) == 0) { + has_h265 = true; + } else if (impl.rfind("mjpeg", 0) == 0) { + has_mjpeg = true; + } + } + + std::vector encoders; + if (has_h264) { + encoders.push_back("h264"); + } + if (has_h265) { + encoders.push_back("h265"); + } + if (has_mjpeg) { + encoders.push_back("mjpeg"); + } + return encoders; +} + +std::vector CapabilityProbe::detectSinks() const { + if (!hasGstInspect()) { + return {"srt", "rtsp", "udp", "file"}; + } + + std::vector sinks; + + if (commandHasAnyOutput("gst-inspect-1.0 srtsink 2>/dev/null")) { + sinks.push_back("srt"); + } + if (commandHasAnyOutput("gst-inspect-1.0 rtspclientsink 2>/dev/null")) { + sinks.push_back("rtsp"); + } + if (commandHasAnyOutput("gst-inspect-1.0 udpsink 2>/dev/null")) { + sinks.push_back("udp"); + } + if (commandHasAnyOutput("gst-inspect-1.0 filesink 2>/dev/null")) { + sinks.push_back("file"); + } + + if (sinks.empty()) { + return {"srt", "rtsp", "udp", "file"}; + } + + return sinks; +} + +std::vector CapabilityProbe::detectTransports() const { + return detectSinks(); +} + +std::vector CapabilityProbe::detectCodecs() const { + return detectEncoders(); +} + +std::vector CapabilityProbe::detectHostProfile() const { + std::vector profile; + profile.push_back(std::string("build_arch=") + kBuildArch); + + const std::string runtime_arch = trim(runCommand("uname -m 2>/dev/null")); + const std::string runtime_os = trim(runCommand("uname -s 2>/dev/null")); + const std::string kernel = trim(runCommand("uname -r 2>/dev/null")); + profile.push_back("runtime_arch=" + + (runtime_arch.empty() ? std::string("unknown") : runtime_arch)); + profile.push_back("runtime_os=" + (runtime_os.empty() ? std::string("unknown") : runtime_os)); + profile.push_back("kernel=" + (kernel.empty() ? std::string("unknown") : kernel)); + profile.push_back("cpu_model=" + readFirstCpuModelLine()); + + const bool jetson_hint = fileExists("/etc/nv_tegra_release") || fileExists("/dev/nvhost-ctrl") || + fileExists("/proc/driver/nvidia/version"); + profile.push_back(std::string("platform_hint=") + (jetson_hint ? "jetson_or_nvidia" : "generic")); + + return profile; +} + +std::vector CapabilityProbe::detectEncoderImplementations() const { + if (!hasGstInspect()) { + return {"h264 -> x264enc [sw]", "h265 -> x265enc [sw]", "mjpeg -> jpegenc [sw]"}; + } + + return filterAvailableCodecElements({{"x264enc", "h264", "sw"}, + {"openh264enc", "h264", "sw"}, + {"x265enc", "h265", "sw"}, + {"jpegenc", "mjpeg", "sw"}, + {"nvv4l2h264enc", "h264", "hw:nvidia-v4l2"}, + {"nvv4l2h265enc", "h265", "hw:nvidia-v4l2"}, + {"nvh264enc", "h264", "hw:nvidia"}, + {"nvh265enc", "h265", "hw:nvidia"}, + {"nvjpegenc", "mjpeg", "hw:nvidia"}, + {"vaapih264enc", "h264", "hw:vaapi"}, + {"vaapih265enc", "h265", "hw:vaapi"}, + {"vaapijpegenc", "mjpeg", "hw:vaapi"}, + {"v4l2h264enc", "h264", "hw:v4l2"}, + {"v4l2h265enc", "h265", "hw:v4l2"}, + {"v4l2jpegenc", "mjpeg", "hw:v4l2"}, + {"omxh264enc", "h264", "hw:omx"}, + {"omxh265enc", "h265", "hw:omx"}}); +} + +} // namespace ros2_gst_video_bridge diff --git a/src/runtime/metrics_publisher.cc b/src/runtime/metrics_publisher.cc new file mode 100644 index 0000000..b621c23 --- /dev/null +++ b/src/runtime/metrics_publisher.cc @@ -0,0 +1,185 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/runtime/metrics_publisher.hpp" + +#include +#include +#include + +namespace ros2_gst_video_bridge { + +MetricsPublisher::MetricsPublisher(rclcpp::Node& node) : node_(node) { + metrics_pub_ = node_.create_publisher("~/runtime_metrics", + rclcpp::SystemDefaultsQoS()); + status_pub_ = node_.create_publisher( + "~/runtime_status", rclcpp::SystemDefaultsQoS()); + event_pub_ = node_.create_publisher( + "~/runtime_events", rclcpp::SystemDefaultsQoS()); + + last_publish_time_ns_ = static_cast(node_.now().nanoseconds()); + publish_timer_ = + node_.create_wall_timer(std::chrono::seconds(1), [this]() { publishSnapshot(); }); +} + +void MetricsPublisher::setSessionMetadata( + const std::string& session_id, const std::string& stream_id, const std::string& source_topic, + const std::string& codec_name, const std::string& encoder_name, + const std::string& transport_kind, const std::string& sink_uri) { + session_id_ = session_id; + stream_id_ = stream_id; + source_topic_ = source_topic; + codec_name_ = codec_name; + encoder_name_ = encoder_name; + transport_kind_ = transport_kind; + sink_uri_ = sink_uri; +} + +void MetricsPublisher::setEffectiveEncoding(uint32_t bitrate_kbps, uint32_t gop, float max_fps) { + bitrate_kbps_effective_ = bitrate_kbps; + gop_effective_ = gop; + max_fps_effective_ = max_fps; +} + +void MetricsPublisher::setAdaptationState(const std::string& profile, uint8_t level) { + adaptation_profile_ = profile; + adaptation_level_ = level; +} + +void MetricsPublisher::setCodecSelectionFlags(bool auto_codec_requested, bool hw_selected, + bool sw_fallback_applied) { + auto_codec_requested_ = auto_codec_requested; + hw_encoder_selected_ = hw_selected; + sw_fallback_applied_ = sw_fallback_applied; +} + +void MetricsPublisher::publishHeartbeat() { + RCLCPP_DEBUG(node_.get_logger(), "metrics heartbeat"); + publishSnapshot(); +} + +void MetricsPublisher::recordFrameIn(uint64_t now_ns, uint64_t source_timestamp_ns) { + frames_in_.fetch_add(1, std::memory_order_relaxed); + + if (source_timestamp_ns == 0 || now_ns < source_timestamp_ns) { + return; + } + + const uint64_t latency_ns = now_ns - source_timestamp_ns; + constexpr double kNsToMs = 1.0 / 1000000.0; + const double sample_ms = static_cast(latency_ns) * kNsToMs; + + if (std::isfinite(sample_ms) && sample_ms >= 0.0 && sample_ms < 10000.0) { + constexpr double alpha = 0.2; + const double previous = latency_estimate_ms_.load(std::memory_order_relaxed); + const double next = + previous <= 0.0 ? sample_ms : ((1.0 - alpha) * previous + alpha * sample_ms); + latency_estimate_ms_.store(next, std::memory_order_relaxed); + } +} + +void MetricsPublisher::recordFrameOut() { + frames_out_.fetch_add(1, std::memory_order_relaxed); +} + +void MetricsPublisher::recordFrameDropped() { + frames_dropped_.fetch_add(1, std::memory_order_relaxed); +} + +void MetricsPublisher::recordReconnect() { + reconnect_count_.fetch_add(1, std::memory_order_relaxed); +} + +void MetricsPublisher::updateRuntimeState(const std::string& runtime_state) { + runtime_state_ = runtime_state; +} + +void MetricsPublisher::publishEvent(const std::string& severity, const std::string& code, + const std::string& message) { + ros2_gst_video_bridge_msgs::msg::RuntimeEvent event; + event.stamp = node_.now(); + event.session_id = session_id_; + event.stream_id = stream_id_; + event.severity = severity; + event.code = code; + event.message = message; + event.state = runtime_state_; + event.codec_name = codec_name_; + event.encoder_name = encoder_name_; + event.transport_kind = transport_kind_; + event.sink_uri = sink_uri_; + event.adaptation_level = adaptation_level_; + event.software_fallback_applied = sw_fallback_applied_; + event_pub_->publish(event); +} + +ros2_gst_video_bridge_msgs::msg::RuntimeStatus +MetricsPublisher::buildStatusSnapshot(uint64_t now_ns, double elapsed_s) { + ros2_gst_video_bridge_msgs::msg::RuntimeStatus status; + status.stamp = node_.now(); + status.session_id = session_id_; + status.stream_id = stream_id_; + status.state = runtime_state_; + status.source_topic = source_topic_; + status.codec_name = codec_name_; + status.encoder_name = encoder_name_; + status.transport_kind = transport_kind_; + status.sink_uri = sink_uri_; + + const uint64_t in_count = frames_in_.load(std::memory_order_relaxed); + const uint64_t out_count = frames_out_.load(std::memory_order_relaxed); + const uint64_t drop_count = frames_dropped_.load(std::memory_order_relaxed); + const uint64_t reconnects = reconnect_count_.load(std::memory_order_relaxed); + + status.fps_in = static_cast(static_cast(in_count - last_frames_in_) / elapsed_s); + status.fps_out = + static_cast(static_cast(out_count - last_frames_out_) / elapsed_s); + status.dropped_total = drop_count; + status.dropped_since_last = drop_count - last_frames_dropped_; + status.reconnect_count = reconnects; + status.latency_estimate_ms = + static_cast(latency_estimate_ms_.load(std::memory_order_relaxed)); + status.max_fps_effective = max_fps_effective_; + status.bitrate_kbps_effective = bitrate_kbps_effective_; + status.gop_effective = gop_effective_; + status.auto_codec_requested = auto_codec_requested_; + status.hardware_encoder_selected = hw_encoder_selected_; + status.software_fallback_applied = sw_fallback_applied_; + status.adaptation_profile = adaptation_profile_; + status.adaptation_level = adaptation_level_; + + (void)now_ns; + return status; +} + +std::string +MetricsPublisher::buildLegacyPayload(const ros2_gst_video_bridge_msgs::msg::RuntimeStatus& status, + uint64_t drops_since_last) const { + std::ostringstream payload; + payload << "state=" << status.state << " fps_in=" << status.fps_in + << " fps_out=" << status.fps_out << " dropped_total=" << status.dropped_total + << " dropped_since_last=" << drops_since_last + << " reconnect_count=" << status.reconnect_count + << " latency_estimate_ms=" << status.latency_estimate_ms; + return payload.str(); +} + +void MetricsPublisher::publishSnapshot() { + const uint64_t now_ns = static_cast(node_.now().nanoseconds()); + const uint64_t elapsed_ns = std::max(1, now_ns - last_publish_time_ns_); + const double elapsed_s = static_cast(elapsed_ns) / 1000000000.0; + + const auto status = buildStatusSnapshot(now_ns, elapsed_s); + status_pub_->publish(status); + + std_msgs::msg::String msg; + msg.data = buildLegacyPayload(status, status.dropped_since_last); + metrics_pub_->publish(msg); + + last_frames_in_ = frames_in_.load(std::memory_order_relaxed); + last_frames_out_ = frames_out_.load(std::memory_order_relaxed); + last_frames_dropped_ = frames_dropped_.load(std::memory_order_relaxed); + last_publish_time_ns_ = now_ns; +} + +} // namespace ros2_gst_video_bridge diff --git a/src/runtime/stream_engine.cc b/src/runtime/stream_engine.cc new file mode 100644 index 0000000..ff435de --- /dev/null +++ b/src/runtime/stream_engine.cc @@ -0,0 +1,249 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/runtime/stream_engine.hpp" + +#include + +#include +#include +#include + +namespace ros2_gst_video_bridge { + +StreamEngine::StreamEngine(std::string pipeline) : pipeline_(std::move(pipeline)) {} + +StreamEngine::~StreamEngine() { + stop(); +} + +bool StreamEngine::start() { + std::lock_guard lock(mutex_); + last_error_.clear(); + + if (running_.load()) { + return true; + } + + if (!gst_initialized_) { + gst_init(nullptr, nullptr); + gst_initialized_ = true; + } + + GError* parse_error = nullptr; + pipeline_element_ = gst_parse_launch(pipeline_.c_str(), &parse_error); + if (parse_error != nullptr) { + std::ostringstream ss; + ss << "gst_parse_launch failed: " << parse_error->message; + last_error_ = ss.str(); + g_error_free(parse_error); + return false; + } + + if (pipeline_element_ == nullptr) { + last_error_ = "gst_parse_launch returned a null pipeline."; + return false; + } + + appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_element_), "bridge_appsrc"); + if (appsrc_ == nullptr) { + last_error_ = "Could not find appsrc element named bridge_appsrc in pipeline."; + gst_object_unref(pipeline_element_); + pipeline_element_ = nullptr; + return false; + } + + bus_ = gst_element_get_bus(pipeline_element_); + gst_app_src_set_stream_type(GST_APP_SRC(appsrc_), GST_APP_STREAM_TYPE_STREAM); + g_object_set(G_OBJECT(appsrc_), "is-live", TRUE, "format", GST_FORMAT_TIME, nullptr); + + const GstStateChangeReturn state_ret = + gst_element_set_state(pipeline_element_, GST_STATE_PLAYING); + if (state_ret == GST_STATE_CHANGE_FAILURE) { + last_error_ = "Failed to transition GStreamer pipeline to PLAYING state."; + if (bus_ != nullptr) { + gst_object_unref(bus_); + bus_ = nullptr; + } + gst_object_unref(appsrc_); + appsrc_ = nullptr; + gst_object_unref(pipeline_element_); + pipeline_element_ = nullptr; + running_.store(false); + return false; + } + + caps_width_ = 0; + caps_height_ = 0; + caps_format_.clear(); + running_.store(true); + bus_thread_ = std::jthread([this](std::stop_token stop_token) { busLoop(stop_token); }); + processBusMessages(0); + return true; +} + +void StreamEngine::stop() { + running_.store(false); + + if (bus_thread_.joinable()) { + bus_thread_.request_stop(); + bus_thread_.join(); + } + + std::lock_guard lock(mutex_); + + if (pipeline_element_ != nullptr) { + gst_element_set_state(pipeline_element_, GST_STATE_NULL); + } + + if (bus_ != nullptr) { + gst_object_unref(bus_); + bus_ = nullptr; + } + + if (appsrc_ != nullptr) { + gst_object_unref(appsrc_); + appsrc_ = nullptr; + } + + if (pipeline_element_ != nullptr) { + gst_object_unref(pipeline_element_); + pipeline_element_ = nullptr; + } +} + +bool StreamEngine::isRunning() const { + return running_.load(); +} + +const std::string& StreamEngine::pipeline() const { + return pipeline_; +} + +bool StreamEngine::pushFrame(const uint8_t* data, size_t size, int width, int height, + const std::string& gst_format, uint64_t timestamp_ns) { + std::lock_guard lock(mutex_); + + if (!running_.load() || appsrc_ == nullptr || pipeline_element_ == nullptr || data == nullptr || + size == 0) { + return false; + } + + if (!setAppSrcCaps(width, height, gst_format)) { + return false; + } + + GstBuffer* buffer = gst_buffer_new_allocate(nullptr, size, nullptr); + if (buffer == nullptr) { + last_error_ = "Failed to allocate GstBuffer."; + return false; + } + + GstMapInfo map_info; + if (!gst_buffer_map(buffer, &map_info, GST_MAP_WRITE)) { + last_error_ = "Failed to map GstBuffer for writing."; + gst_buffer_unref(buffer); + return false; + } + + std::memcpy(map_info.data, data, size); + gst_buffer_unmap(buffer, &map_info); + + GST_BUFFER_PTS(buffer) = static_cast(timestamp_ns); + GST_BUFFER_DTS(buffer) = GST_CLOCK_TIME_NONE; + + const GstFlowReturn flow_ret = gst_app_src_push_buffer(GST_APP_SRC(appsrc_), buffer); + processBusMessages(0); + if (flow_ret != GST_FLOW_OK) { + std::ostringstream ss; + ss << "appsrc push failed with flow status " << flow_ret; + last_error_ = ss.str(); + return false; + } + + return true; +} + +std::string StreamEngine::lastError() const { + std::lock_guard lock(mutex_); + return last_error_; +} + +bool StreamEngine::setAppSrcCaps(int width, int height, const std::string& gst_format) { + if (appsrc_ == nullptr) { + last_error_ = "Cannot set appsrc caps because appsrc is null."; + return false; + } + + if (width == caps_width_ && height == caps_height_ && gst_format == caps_format_) { + return true; + } + + GstCaps* caps = gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, gst_format.c_str(), + "width", G_TYPE_INT, width, "height", G_TYPE_INT, height, + "framerate", GST_TYPE_FRACTION, 0, 1, nullptr); + + if (caps == nullptr) { + last_error_ = "Failed to create GstCaps for appsrc."; + return false; + } + + gst_app_src_set_caps(GST_APP_SRC(appsrc_), caps); + gst_caps_unref(caps); + + caps_width_ = width; + caps_height_ = height; + caps_format_ = gst_format; + return true; +} + +void StreamEngine::processBusMessages(GstClockTime timeout) { + if (bus_ == nullptr) { + return; + } + + const auto filter = + static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_WARNING | GST_MESSAGE_EOS); + GstMessage* message = timeout == 0 ? gst_bus_pop_filtered(bus_, filter) + : gst_bus_timed_pop_filtered(bus_, timeout, filter); + + while (true) { + if (message == nullptr) { + break; + } + + if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) { + GError* error = nullptr; + gchar* debug_info = nullptr; + gst_message_parse_error(message, &error, &debug_info); + if (error != nullptr) { + last_error_ = error->message; + g_error_free(error); + } + if (debug_info != nullptr) { + g_free(debug_info); + } + } else if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_EOS) { + last_error_ = "GStreamer pipeline reached EOS."; + } + + gst_message_unref(message); + message = gst_bus_pop_filtered(bus_, filter); + } +} + +void StreamEngine::busLoop(std::stop_token stop_token) { + using namespace std::chrono_literals; + + while (!stop_token.stop_requested()) { + { + std::lock_guard lock(mutex_); + if (running_.load() && bus_ != nullptr) { + processBusMessages(100 * GST_MSECOND); + } + } + std::this_thread::sleep_for(10ms); + } +} + +} // namespace ros2_gst_video_bridge diff --git a/src/runtime/topic_introspector.cc b/src/runtime/topic_introspector.cc new file mode 100644 index 0000000..2470d0c --- /dev/null +++ b/src/runtime/topic_introspector.cc @@ -0,0 +1,30 @@ +// Author: Mouhsine Kassimi Farhaoui +// Contact: mouhsine98@gmail.com + +#include "ros2_gst_video_bridge/runtime/topic_introspector.hpp" + +namespace ros2_gst_video_bridge { + +TopicIntrospector::TopicIntrospector(rclcpp::Node& node) : node_(node) {} + +std::map> TopicIntrospector::listTopics() const { + return node_.get_topic_names_and_types(); +} + +std::map> TopicIntrospector::listImageTopics() const { + const auto all_topics = node_.get_topic_names_and_types(); + std::map> image_topics; + + for (const auto& topic : all_topics) { + for (const auto& type : topic.second) { + if (type == "sensor_msgs/msg/Image" || type == "sensor_msgs/msg/CompressedImage") { + image_topics[topic.first] = topic.second; + break; + } + } + } + + return image_topics; +} + +} // namespace ros2_gst_video_bridge diff --git a/test/test_config_loader.cpp b/test/test_config_loader.cpp new file mode 100644 index 0000000..c2ff9af --- /dev/null +++ b/test/test_config_loader.cpp @@ -0,0 +1,67 @@ +#include "ros2_gst_video_bridge/core/config_loader.hpp" + +#include +#include + +#include + +namespace +{ + +class ConfigLoaderTest : public ::testing::Test +{ +protected: + static void SetUpTestSuite() + { + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr); + } + } + + static void TearDownTestSuite() + { + if (rclcpp::ok()) { + rclcpp::shutdown(); + } + } +}; + +TEST_F(ConfigLoaderTest, AppliesJetsonLowLatencyProfileDefaults) +{ + rclcpp::NodeOptions options; + options.parameter_overrides(std::vector{ + rclcpp::Parameter("profile.machine", "jetson"), + rclcpp::Parameter("profile.stream", "low_latency") + }); + + auto node = std::make_shared("config_loader_profile_test", options); + const auto cfg = ros2_gst_video_bridge::ConfigLoader::loadFromNode(*node); + + EXPECT_EQ(cfg.profile.machine, "jetson"); + EXPECT_EQ(cfg.profile.stream, "low_latency"); + EXPECT_EQ(cfg.transport.kind, "srt"); + EXPECT_EQ(cfg.transport.latency_ms, 60); + EXPECT_EQ(cfg.codec.name, "h264"); + EXPECT_EQ(cfg.codec.tune, "zerolatency"); + EXPECT_GT(cfg.codec.bitrate_kbps, 0); +} + +TEST_F(ConfigLoaderTest, ValidateRejectsInvalidMode) +{ + ros2_gst_video_bridge::GstBridgeConfig cfg; + cfg.runtime.mode = "invalid_mode"; + + const auto errors = ros2_gst_video_bridge::ConfigLoader::validate(cfg); + EXPECT_FALSE(errors.empty()); +} + +TEST_F(ConfigLoaderTest, ValidateAcceptsAutoCodec) +{ + ros2_gst_video_bridge::GstBridgeConfig cfg; + cfg.codec.name = "auto"; + + const auto errors = ros2_gst_video_bridge::ConfigLoader::validate(cfg); + EXPECT_TRUE(errors.empty()); +} + +} // namespace diff --git a/test/test_pipeline_builder.cpp b/test/test_pipeline_builder.cpp new file mode 100644 index 0000000..f44a1bf --- /dev/null +++ b/test/test_pipeline_builder.cpp @@ -0,0 +1,125 @@ +#include "ros2_gst_video_bridge/core/pipeline_builder.hpp" + +#include + +namespace +{ + +TEST(PipelineBuilderTest, BuildsSrtH264Pipeline) +{ + ros2_gst_video_bridge::GstBridgeConfig cfg; + cfg.transport.kind = "srt"; + cfg.transport.sink_uri = "srt://127.0.0.1:9000?mode=listener"; + cfg.codec.name = "h264"; + cfg.codec.bitrate_kbps = 2000; + cfg.codec.gop = 30; + + const auto pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + + EXPECT_NE(pipeline.find("appsrc name=bridge_appsrc"), std::string::npos); + EXPECT_NE(pipeline.find("queue leaky=downstream"), std::string::npos); + EXPECT_NE(pipeline.find("x264enc"), std::string::npos); + EXPECT_NE(pipeline.find("h264parse"), std::string::npos); + EXPECT_NE(pipeline.find("srtsink"), std::string::npos); +} + +TEST(PipelineBuilderTest, BuildsUdpPipelineWithParsedHostAndPort) +{ + ros2_gst_video_bridge::GstBridgeConfig cfg; + cfg.transport.kind = "udp"; + cfg.transport.sink_uri = "udp://192.168.1.10:6000"; + cfg.codec.name = "h264"; + + const auto pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + + EXPECT_NE(pipeline.find("rtph264pay"), std::string::npos); + EXPECT_NE(pipeline.find("h264parse"), std::string::npos); + EXPECT_NE(pipeline.find("udpsink host=192.168.1.10 port=6000"), std::string::npos); +} + +TEST(PipelineBuilderTest, BuildsHardwareEncoderPipelineWithoutX26xOnlyProperties) +{ + ros2_gst_video_bridge::GstBridgeConfig cfg; + cfg.transport.kind = "srt"; + cfg.transport.sink_uri = "srt://127.0.0.1:9000?mode=listener"; + cfg.codec.name = "h264"; + cfg.codec.encoder = "nvv4l2h264enc"; + cfg.codec.tune = "zerolatency"; + cfg.codec.bitrate_kbps = 2000; + cfg.codec.gop = 30; + + const auto pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + + EXPECT_NE(pipeline.find("nvv4l2h264enc"), std::string::npos); + EXPECT_NE(pipeline.find("nvvidconv"), std::string::npos); + EXPECT_NE(pipeline.find("video/x-raw(memory:NVMM),format=NV12"), std::string::npos); + EXPECT_NE(pipeline.find("h264parse"), std::string::npos); + EXPECT_EQ(pipeline.find(" tune="), std::string::npos); + EXPECT_EQ(pipeline.find(" key-int-max="), std::string::npos); + EXPECT_EQ(pipeline.find(" speed-preset="), std::string::npos); +} + +TEST(PipelineBuilderTest, BuildsH265PipelinesAcrossTransports) +{ + ros2_gst_video_bridge::GstBridgeConfig cfg; + cfg.codec.name = "h265"; + + cfg.transport.kind = "srt"; + cfg.transport.sink_uri = "srt://127.0.0.1:9000?mode=listener"; + auto pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + EXPECT_NE(pipeline.find("x265enc"), std::string::npos); + EXPECT_NE(pipeline.find("h265parse"), std::string::npos); + EXPECT_NE(pipeline.find("mpegtsmux"), std::string::npos); + EXPECT_NE(pipeline.find("srtsink"), std::string::npos); + + cfg.transport.kind = "udp"; + cfg.transport.sink_uri = "udp://10.1.2.3:7000"; + pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + EXPECT_NE(pipeline.find("rtph265pay"), std::string::npos); + EXPECT_NE(pipeline.find("udpsink host=10.1.2.3 port=7000"), std::string::npos); + + cfg.transport.kind = "rtsp"; + cfg.transport.sink_uri = "rtsp://127.0.0.1:8554/live"; + pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + EXPECT_NE(pipeline.find("rtph265pay"), std::string::npos); + EXPECT_NE(pipeline.find("rtspclientsink location=rtsp://127.0.0.1:8554/live protocols=tcp"), std::string::npos); + + cfg.transport.kind = "file"; + cfg.transport.sink_uri = "/tmp/out_h265.ts"; + pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + EXPECT_NE(pipeline.find("mpegtsmux"), std::string::npos); + EXPECT_NE(pipeline.find("filesink location=/tmp/out_h265.ts"), std::string::npos); +} + +TEST(PipelineBuilderTest, BuildsMjpegPipelinesAcrossTransports) +{ + ros2_gst_video_bridge::GstBridgeConfig cfg; + cfg.codec.name = "mjpeg"; + + cfg.transport.kind = "srt"; + cfg.transport.sink_uri = "srt://127.0.0.1:9000?mode=listener"; + auto pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + EXPECT_NE(pipeline.find("jpegenc"), std::string::npos); + EXPECT_NE(pipeline.find("matroskamux"), std::string::npos); + EXPECT_NE(pipeline.find("srtsink"), std::string::npos); + + cfg.transport.kind = "udp"; + cfg.transport.sink_uri = "udp://10.1.2.3:7100"; + pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + EXPECT_NE(pipeline.find("rtpjpegpay"), std::string::npos); + EXPECT_NE(pipeline.find("udpsink host=10.1.2.3 port=7100"), std::string::npos); + + cfg.transport.kind = "rtsp"; + cfg.transport.sink_uri = "rtsp://127.0.0.1:8554/mjpeg"; + pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + EXPECT_NE(pipeline.find("rtpjpegpay"), std::string::npos); + EXPECT_NE(pipeline.find("rtspclientsink location=rtsp://127.0.0.1:8554/mjpeg protocols=tcp"), std::string::npos); + + cfg.transport.kind = "file"; + cfg.transport.sink_uri = "/tmp/out_mjpeg.mkv"; + pipeline = ros2_gst_video_bridge::PipelineBuilder::build(cfg); + EXPECT_NE(pipeline.find("matroskamux"), std::string::npos); + EXPECT_NE(pipeline.find("filesink location=/tmp/out_mjpeg.mkv"), std::string::npos); +} + +} // namespace