diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 12b3d70dd..10f402281 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -70,6 +70,23 @@ FetchContent_Declare(json ) FetchContent_MakeAvailable(json) +FetchContent_Declare( + mcap + GIT_REPOSITORY https://github.com/foxglove/mcap.git + GIT_TAG releases/cpp/v2.1.2 +) +FetchContent_MakeAvailable(mcap) + +set(LZ4_BUILD_CLI OFF CACHE BOOL "" FORCE) +set(LZ4_BUILD_LEGACY_LZ4C OFF CACHE BOOL "" FORCE) +FetchContent_Declare( + lz4 + GIT_REPOSITORY https://github.com/lz4/lz4.git + GIT_TAG v1.10.0 + SOURCE_SUBDIR build/cmake +) +FetchContent_MakeAvailable(lz4) + if(DEFINED SANITIZE) if(NOT CMAKE_CXX_COMPILER_ID MATCHES ".*Clang") message(FATAL_ERROR "Sanitizers are only supported with Clang, not ${CMAKE_CXX_COMPILER_ID}") @@ -261,3 +278,15 @@ set_property(TARGET example_message_filtering PROPERTY CXX_STANDARD_REQUIRED Tru target_compile_options(example_message_filtering PUBLIC ${SANITIZER_COMPILE_OPTIONS}) target_link_options(example_message_filtering PUBLIC ${SANITIZER_LINK_OPTIONS}) target_link_libraries(example_message_filtering foxglove_cpp_static) + +add_executable(example_ws_stream_mcap + examples/ws-stream-mcap/src/main.cpp + examples/ws-stream-mcap/src/mcap_player.cpp +) +set_property(TARGET example_ws_stream_mcap PROPERTY CXX_STANDARD 17) +set_property(TARGET example_ws_stream_mcap PROPERTY CXX_STANDARD_REQUIRED True) +target_compile_options(example_ws_stream_mcap PUBLIC ${SANITIZER_COMPILE_OPTIONS} ${STRICT_COMPILE_OPTIONS}) +target_link_options(example_ws_stream_mcap PUBLIC ${SANITIZER_LINK_OPTIONS}) +target_compile_definitions(example_ws_stream_mcap PRIVATE MCAP_COMPRESSION_NO_ZSTD) +target_include_directories(example_ws_stream_mcap SYSTEM PRIVATE ${mcap_SOURCE_DIR}/cpp/mcap/include) +target_link_libraries(example_ws_stream_mcap foxglove_cpp_static lz4_static) diff --git a/cpp/examples/ws-stream-mcap/src/main.cpp b/cpp/examples/ws-stream-mcap/src/main.cpp new file mode 100644 index 000000000..42f79308d --- /dev/null +++ b/cpp/examples/ws-stream-mcap/src/main.cpp @@ -0,0 +1,189 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mcap_player.hpp" + +using namespace std::chrono_literals; + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +static std::function sigint_handler; + +static void printUsage(const char* program) { + std::cerr << "Usage: " << program << " --file [--port ] [--host ]\n" + << " --file MCAP file to stream (required)\n" + << " --port Server port (default: 8765)\n" + << " --host Server host (default: 127.0.0.1)\n"; +} + +// NOLINTNEXTLINE(bugprone-exception-escape) +int main(int argc, char* argv[]) { + std::string file_path; + uint16_t port = 8765; + std::string host = "127.0.0.1"; + + // Parse CLI arguments + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + if ((arg == "--file" || arg == "-f") && i + 1 < argc) { + file_path = argv[++i]; + } else if ((arg == "--port" || arg == "-p") && i + 1 < argc) { + port = static_cast(std::stoi(argv[++i])); + } else if (arg == "--host" && i + 1 < argc) { + host = argv[++i]; + } else if (arg == "--help" || arg == "-h") { + printUsage(argv[0]); + return 0; + } else { + std::cerr << "Unknown argument: " << arg << '\n'; + printUsage(argv[0]); + return 1; + } + } + + if (file_path.empty()) { + std::cerr << "Error: --file is required\n"; + printUsage(argv[0]); + return 1; + } + + foxglove::setLogLevel(foxglove::LogLevel::Info); + + // Extract file name for the server name + std::string server_name = file_path; + auto slash_pos = server_name.find_last_of('/'); + if (slash_pos != std::string::npos) { + server_name = server_name.substr(slash_pos + 1); + } + + std::cerr << "Loading MCAP summary\n"; + + auto player = McapPlayer::create(file_path); + if (!player) { + return 1; + } + + auto time_range = player->timeRange(); + auto shared_player = std::make_shared(); + auto player_ptr = std::shared_ptr(std::move(player)); + + foxglove::WebSocketServerOptions options = {}; + options.name = server_name; + options.host = host; + options.port = port; + // Explicitly advertise capabilities used by this example. + // Advertise playback time range (inclusive start/end, in nanoseconds since epoch) and + // enable Time + RangedPlayback so the Foxglove playback bar can control replay. + options.capabilities = foxglove::WebSocketServerCapabilities::RangedPlayback | + foxglove::WebSocketServerCapabilities::Time; + options.playback_time_range = time_range; + + const auto& mtx = shared_player; + const auto& player_ref = player_ptr; + // Handle playback control requests from Foxglove and return the updated playback state. + options.callbacks.onPlaybackControlRequest = [mtx, player_ref]( + const foxglove::PlaybackControlRequest& request + ) -> std::optional { + std::lock_guard lock(*mtx); + + bool did_seek = request.seek_time.has_value(); + + if (request.seek_time.has_value()) { + if (!player_ref->seek(*request.seek_time)) { + did_seek = false; + } + } + + player_ref->setPlaybackSpeed(request.playback_speed); + + switch (request.playback_command) { + case foxglove::PlaybackCommand::Play: + player_ref->play(); + break; + case foxglove::PlaybackCommand::Pause: + player_ref->pause(); + break; + } + + return foxglove::PlaybackState{ + player_ref->status(), + player_ref->currentTime(), + player_ref->playbackSpeed(), + did_seek, + request.request_id, + }; + }; + + auto server_result = foxglove::WebSocketServer::create(std::move(options)); + if (!server_result.has_value()) { + std::cerr << "Failed to create server: " << foxglove::strerror(server_result.error()) << '\n'; + return 1; + } + auto server = std::move(server_result.value()); + + std::atomic_bool done{false}; + std::signal(SIGINT, [](int) { + if (sigint_handler) { + sigint_handler(); + } + }); + sigint_handler = [&] { + std::cerr << "Shutting down...\n"; + done = true; + }; + + std::cerr << "Server listening on " << host << ":" << port << '\n'; + std::cerr << "Waiting for client\n"; + std::this_thread::sleep_for(1s); + + std::cerr << "Starting stream\n"; + auto last_status = foxglove::PlaybackStatus::Paused; + foxglove::PlaybackStatus current_status = foxglove::PlaybackStatus::Paused; + + while (!done) { + { + std::lock_guard lock(*mtx); + current_status = player_ptr->status(); + + if (current_status == foxglove::PlaybackStatus::Ended && + last_status != foxglove::PlaybackStatus::Ended) { + server.broadcastPlaybackState(foxglove::PlaybackState{ + foxglove::PlaybackStatus::Ended, + player_ptr->currentTime(), + player_ptr->playbackSpeed(), + false, + std::nullopt, + }); + } + } + last_status = current_status; + + if (current_status != foxglove::PlaybackStatus::Playing) { + std::this_thread::sleep_for(10ms); + continue; + } + + std::optional sleep_duration; + { + std::lock_guard lock(*mtx); + sleep_duration = player_ptr->logNextMessage(server); + } + + if (sleep_duration.has_value()) { + auto capped = std::min(*sleep_duration, std::chrono::nanoseconds(1'000'000'000)); + std::this_thread::sleep_for(capped); + } + } + + server.stop(); + return 0; +} diff --git a/cpp/examples/ws-stream-mcap/src/mcap_player.cpp b/cpp/examples/ws-stream-mcap/src/mcap_player.cpp new file mode 100644 index 000000000..0ee4a9f2b --- /dev/null +++ b/cpp/examples/ws-stream-mcap/src/mcap_player.cpp @@ -0,0 +1,201 @@ +#define MCAP_IMPLEMENTATION +#include "mcap_player.hpp" + +#include + +#include +#include + +std::unique_ptr McapPlayer::create(const std::string& path) { + auto player = std::unique_ptr(new McapPlayer()); + + auto status = player->reader_.open(path); + if (!status.ok()) { + std::cerr << "Failed to open MCAP file: " << status.message << '\n'; + return nullptr; + } + + auto on_problem = [](const mcap::Status& problem) { + std::cerr << "MCAP read problem: " << problem.message << '\n'; + }; + + status = player->reader_.readSummary(mcap::ReadSummaryMethod::AllowFallbackScan, on_problem); + if (!status.ok()) { + std::cerr << "Failed to read MCAP summary: " << status.message << '\n'; + return nullptr; + } + + // Extract time range from statistics + auto stats = player->reader_.statistics(); + if (!stats.has_value()) { + std::cerr << "MCAP file has no statistics record\n"; + return nullptr; + } + player->time_range_ = {stats->messageStartTime, stats->messageEndTime}; + player->current_time_ = stats->messageStartTime; + + if (!player->createChannels()) { + return nullptr; + } + + player->resetMessageView(player->current_time_); + + return player; +} + +McapPlayer::~McapPlayer() { + iterator_.reset(); + message_view_.reset(); + reader_.close(); +} + +bool McapPlayer::createChannels() { + const auto& schemas = reader_.schemas(); + for (const auto& [id, channel_ptr] : reader_.channels()) { + std::optional schema; + if (channel_ptr->schemaId != 0) { + auto schema_it = schemas.find(channel_ptr->schemaId); + if (schema_it != schemas.end()) { + const auto& mcap_schema = schema_it->second; + foxglove::Schema s; + s.name = mcap_schema->name; + s.encoding = mcap_schema->encoding; + s.data = reinterpret_cast(mcap_schema->data.data()); + s.data_len = mcap_schema->data.size(); + schema = std::move(s); + } + } + + auto channel_result = foxglove::RawChannel::create( + channel_ptr->topic, channel_ptr->messageEncoding, std::move(schema) + ); + if (!channel_result.has_value()) { + std::cerr << "Failed to create channel for topic '" << channel_ptr->topic + << "': " << foxglove::strerror(channel_result.error()) << '\n'; + return false; + } + channels_.emplace(id, std::move(channel_result.value())); + } + return true; +} + +void McapPlayer::resetMessageView(uint64_t start_time) { + iterator_.reset(); + message_view_.reset(); + + mcap::ReadMessageOptions opts; + opts.startTime = start_time; + opts.endTime = time_range_.second + 1; + + message_view_ = std::make_unique(reader_.readMessages( + [](const mcap::Status& problem) { + std::cerr << "MCAP message read problem: " << problem.message << '\n'; + }, + opts + )); + iterator_ = message_view_->begin(); + + time_tracker_.reset(); +} + +std::pair McapPlayer::timeRange() const { + return time_range_; +} + +void McapPlayer::setPlaybackSpeed(float speed) { + speed = TimeTracker::clampSpeed(speed); + if (time_tracker_.has_value()) { + time_tracker_->setSpeed(speed); + } + playback_speed_ = speed; +} + +void McapPlayer::play() { + if (status_ == foxglove::PlaybackStatus::Ended) { + return; + } + if (time_tracker_.has_value()) { + time_tracker_->resume(); + } + status_ = foxglove::PlaybackStatus::Playing; +} + +void McapPlayer::pause() { + if (status_ == foxglove::PlaybackStatus::Ended) { + return; + } + if (time_tracker_.has_value()) { + time_tracker_->pause(); + } + status_ = foxglove::PlaybackStatus::Paused; +} + +bool McapPlayer::seek(uint64_t log_time) { + log_time = std::max(time_range_.first, std::min(log_time, time_range_.second)); + resetMessageView(log_time); + current_time_ = log_time; + if (status_ == foxglove::PlaybackStatus::Ended) { + status_ = foxglove::PlaybackStatus::Paused; + } + return true; +} + +foxglove::PlaybackStatus McapPlayer::status() const { + return status_; +} + +uint64_t McapPlayer::currentTime() const { + return current_time_; +} + +float McapPlayer::playbackSpeed() const { + return playback_speed_; +} + +std::optional McapPlayer::logNextMessage( + const foxglove::WebSocketServer& server +) { + if (status_ != foxglove::PlaybackStatus::Playing) { + return std::nullopt; + } + + if (!iterator_.has_value() || !message_view_ || *iterator_ == message_view_->end()) { + status_ = foxglove::PlaybackStatus::Ended; + current_time_ = time_range_.second; + return std::nullopt; + } + + const auto& msg = **iterator_; + + // Initialize the time tracker on the first message + if (!time_tracker_.has_value()) { + time_tracker_.emplace(msg.message.logTime, playback_speed_); + } + + auto wakeup = time_tracker_->wakeupFor(msg.message.logTime); + auto now = std::chrono::steady_clock::now(); + if (wakeup > now) { + auto sleep_duration = std::chrono::duration_cast(wakeup - now); + return sleep_duration; + } + + current_time_ = msg.message.logTime; + + if (auto timestamp = time_tracker_->notify(msg.message.logTime)) { + // Broadcast time with the current playback time (nanoseconds since epoch). + // Requires WebSocketServerCapabilities::Time to be advertised by the server. + server.broadcastTime(*timestamp); + } + + auto channel_it = channels_.find(static_cast(msg.message.channelId)); + if (channel_it != channels_.end()) { + channel_it->second.log( + reinterpret_cast(msg.message.data), + msg.message.dataSize, + msg.message.logTime + ); + } + + ++(*iterator_); + return std::nullopt; +} diff --git a/cpp/examples/ws-stream-mcap/src/mcap_player.hpp b/cpp/examples/ws-stream-mcap/src/mcap_player.hpp new file mode 100644 index 000000000..5ee6fa042 --- /dev/null +++ b/cpp/examples/ws-stream-mcap/src/mcap_player.hpp @@ -0,0 +1,60 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include + +#include "playback_source.hpp" +#include "time_tracker.hpp" + +/// Plays back messages from an MCAP file, implementing the PlaybackSource interface. +/// +/// Uses the high-level mcap::McapReader + mcap::LinearMessageView API for iteration. +/// The LinearMessageView::Iterator supports peek-without-advance, so we only increment +/// the iterator after actually logging a message. +class McapPlayer final : public PlaybackSource { +public: + /// Creates a new McapPlayer from the given MCAP file path. + /// Returns nullptr on failure (prints error to stderr). + static std::unique_ptr create(const std::string& path); + + ~McapPlayer() override; + + // PlaybackSource interface + [[nodiscard]] std::pair timeRange() const override; + void setPlaybackSpeed(float speed) override; + void play() override; + void pause() override; + bool seek(uint64_t log_time) override; + [[nodiscard]] foxglove::PlaybackStatus status() const override; + [[nodiscard]] uint64_t currentTime() const override; + [[nodiscard]] float playbackSpeed() const override; + std::optional logNextMessage(const foxglove::WebSocketServer& server + ) override; + +private: + McapPlayer() = default; + + /// Creates RawChannels from the MCAP channel metadata. + bool createChannels(); + + /// Resets the message view iterator to start from the given log time. + void resetMessageView(uint64_t start_time); + + mcap::McapReader reader_; + std::unordered_map channels_; + std::unique_ptr message_view_; + std::optional iterator_; + std::optional time_tracker_; + + std::pair time_range_ = {0, 0}; + foxglove::PlaybackStatus status_ = foxglove::PlaybackStatus::Paused; + uint64_t current_time_ = 0; + float playback_speed_ = 1.0F; +}; diff --git a/cpp/examples/ws-stream-mcap/src/playback_source.hpp b/cpp/examples/ws-stream-mcap/src/playback_source.hpp new file mode 100644 index 000000000..6be24cf3c --- /dev/null +++ b/cpp/examples/ws-stream-mcap/src/playback_source.hpp @@ -0,0 +1,58 @@ +#pragma once + +#include + +#include +#include +#include +#include + +/// A data source that supports ranged playback with play/pause, seek, and variable speed. +/// +/// Implementations are responsible for: +/// - Tracking playback state (playing/paused/ended) and current position +/// - Pacing message delivery according to timestamps and playback speed +/// - Logging messages to channels and broadcasting time updates to the server +class PlaybackSource { +public: + virtual ~PlaybackSource() = default; + PlaybackSource() = default; + PlaybackSource(PlaybackSource&&) = default; + PlaybackSource& operator=(PlaybackSource&&) = default; + + PlaybackSource(const PlaybackSource&) = delete; + PlaybackSource& operator=(const PlaybackSource&) = delete; + + /// Returns the inclusive (start, end) time bounds in nanoseconds since epoch. + [[nodiscard]] virtual std::pair timeRange() const = 0; + + /// Sets the playback speed multiplier (e.g., 1.0 for real-time, 2.0 for double speed). + virtual void setPlaybackSpeed(float speed) = 0; + + /// Begins or resumes playback. + virtual void play() = 0; + + /// Pauses playback. + virtual void pause() = 0; + + /// Seeks to the specified timestamp in nanoseconds. Returns true on success. + virtual bool seek(uint64_t log_time) = 0; + + /// Returns the current playback status. + [[nodiscard]] virtual foxglove::PlaybackStatus status() const = 0; + + /// Returns the current playback position in nanoseconds since epoch. + [[nodiscard]] virtual uint64_t currentTime() const = 0; + + /// Returns the current playback speed multiplier. + [[nodiscard]] virtual float playbackSpeed() const = 0; + + /// Logs the next message for playback if it's ready and broadcasts time updates. + /// + /// Returns: + /// - std::nullopt if a message was logged or playback has ended + /// - a duration to wait before trying again if the next message isn't due yet + virtual std::optional logNextMessage( + const foxglove::WebSocketServer& server + ) = 0; +}; diff --git a/cpp/examples/ws-stream-mcap/src/time_tracker.hpp b/cpp/examples/ws-stream-mcap/src/time_tracker.hpp new file mode 100644 index 000000000..798c390c7 --- /dev/null +++ b/cpp/examples/ws-stream-mcap/src/time_tracker.hpp @@ -0,0 +1,115 @@ +#pragma once + +#include +#include +#include +#include +#include + +/// Tracks the relationship between file timestamps and wall-clock time. +/// +/// Converts between "log time" (nanosecond timestamps in the MCAP file) and real wall-clock +/// time, accounting for playback speed, pause/resume, and speed changes. +class TimeTracker { +public: + static constexpr float kMinPlaybackSpeed = 0.01f; + + TimeTracker(uint64_t offset_ns, float speed) + : start_(std::chrono::steady_clock::now()) + , offset_ns_(offset_ns) + , speed_(clampSpeed(speed)) + , paused_(false) + , paused_elapsed_ns_(0) + , notify_interval_ns_(1'000'000'000 / 60) + , notify_last_(0) {} + + /// Returns the current log time based on elapsed wall time and playback speed. + uint64_t currentLogTime() const { + if (paused_) { + return offset_ns_ + paused_elapsed_ns_; + } + auto elapsed_wall = std::chrono::steady_clock::now() - start_; + auto elapsed_nanos = static_cast( + std::chrono::duration(elapsed_wall).count() * static_cast(speed_) + ); + return offset_ns_ + paused_elapsed_ns_ + elapsed_nanos; + } + + /// Returns the wall-clock time point at which a message with the given log_time should be + /// emitted. + std::chrono::steady_clock::time_point wakeupFor(uint64_t log_time) const { + uint64_t current = currentLogTime(); + if (log_time <= current) { + return std::chrono::steady_clock::now(); + } + uint64_t log_diff_ns = log_time - current; + uint64_t wall_diff_ns; + if (speed_ > 0.0f) { + wall_diff_ns = + static_cast(static_cast(log_diff_ns) / static_cast(speed_)); + } else { + wall_diff_ns = 1'000'000'000; + } + return std::chrono::steady_clock::now() + std::chrono::nanoseconds(wall_diff_ns); + } + + /// Pauses time tracking, accumulating elapsed log time. + void pause() { + if (!paused_) { + auto elapsed_wall = std::chrono::steady_clock::now() - start_; + auto elapsed_nanos = static_cast( + std::chrono::duration(elapsed_wall).count() * static_cast(speed_) + ); + paused_elapsed_ns_ += elapsed_nanos; + paused_ = true; + } + } + + /// Resumes time tracking from where it was paused. + void resume() { + if (paused_) { + start_ = std::chrono::steady_clock::now(); + paused_ = false; + } + } + + /// Changes the playback speed, accumulating elapsed time at the old speed. + void setSpeed(float speed) { + speed = clampSpeed(speed); + if (!paused_) { + auto elapsed_wall = std::chrono::steady_clock::now() - start_; + auto elapsed_nanos = static_cast( + std::chrono::duration(elapsed_wall).count() * static_cast(speed_) + ); + paused_elapsed_ns_ += elapsed_nanos; + start_ = std::chrono::steady_clock::now(); + } + speed_ = speed; + } + + /// Clamps speed to a minimum value. + static float clampSpeed(float speed) { + if (std::isfinite(speed) && speed >= kMinPlaybackSpeed) { + return speed; + } + return kMinPlaybackSpeed; + } + + /// Returns the current log time if enough time has passed since the last notification (~60 Hz). + std::optional notify(uint64_t current_ns) { + if (current_ns - notify_last_ >= notify_interval_ns_) { + notify_last_ = current_ns; + return current_ns; + } + return std::nullopt; + } + +private: + std::chrono::steady_clock::time_point start_; + uint64_t offset_ns_; + float speed_; + bool paused_; + uint64_t paused_elapsed_ns_; + uint64_t notify_interval_ns_; + uint64_t notify_last_; +};