
feat(mcap): add robotics telemetry read planning #7074

Open
RitwijParmar wants to merge 3 commits into Eventual-Inc:main from RitwijParmar:codex/daft-mcap-telemetry-planner

Conversation

@RitwijParmar

Contributor

Summary

Adds a lightweight planning layer for large MCAP robotics logs, so users can inspect topic/time coverage and plan sharded or resumable reads before decoding payloads.

This adds:

  • daft.inspect_mcap(...) for topic manifests with schema/message metadata, time bounds, message counts, and payload-size estimates
  • daft.plan_mcap_reads(...) for per-topic time windows sized by max_messages_per_task
  • optional read_mcap(..., include_record_metadata=True) columns for schema/message encoding and payload size
  • docs for inspecting manifests, planning distributed reads, and feeding planned windows back into read_mcap
  • synthetic camera/IMU/LiDAR-style MCAP tests that cover sharding and resume workflows

Why

Large robotics logs often mix high-rate IMU data, bursty camera frames, and LiDAR topics in one MCAP file. For distributed processing, replay, or debugging, it helps to cheaply answer: what topics exist, what time ranges do they cover, and how should the work be split by topic/time before reading payloads?

inspect_mcap gives that manifest without decoding messages. plan_mcap_reads turns it into concrete read windows that can be resumed or scheduled independently.
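
As a rough illustration of what such a manifest aggregates, the sketch below folds a stream of message headers into per-topic statistics without looking at payload contents. It is plain Python and not the PR's implementation: the `TopicStats` class, the `build_manifest` helper, and the `(topic, log_time, payload_size)` tuple layout are assumptions made for the example. Only the field names are borrowed from the manifest columns.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple


@dataclass
class TopicStats:
    """Hypothetical per-topic accumulator; field names mirror the manifest columns."""

    message_count: int = 0
    first_log_time: Optional[int] = None
    last_log_time: Optional[int] = None
    total_message_size: int = 0

    def add(self, log_time: int, payload_size: int) -> None:
        # Only header-level facts are used; the payload itself is never decoded.
        self.message_count += 1
        self.total_message_size += payload_size
        if self.first_log_time is None or log_time < self.first_log_time:
            self.first_log_time = log_time
        if self.last_log_time is None or log_time > self.last_log_time:
            self.last_log_time = log_time


def build_manifest(headers: Iterable[Tuple[str, int, int]]) -> Dict[str, TopicStats]:
    """Group (topic, log_time, payload_size) tuples into one TopicStats per topic."""
    manifest: Dict[str, TopicStats] = {}
    for topic, log_time, payload_size in headers:
        manifest.setdefault(topic, TopicStats()).add(log_time, payload_size)
    return manifest


# Synthetic headers: a high-rate IMU topic mixed with one large camera frame.
headers = [
    ("/imu", 100, 48),
    ("/camera", 105, 2_000_000),
    ("/imu", 110, 48),
    ("/imu", 120, 48),
]
manifest = build_manifest(headers)
```

A planner would then only need the count and time bounds of each entry to decide how to split the work.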

Testing

  • python3 -m compileall daft/io/mcap/_mcap.py daft/io/__init__.py daft/__init__.py tests/io/mcap/test_mcap.py
  • python3 -m ruff check daft/io/mcap/_mcap.py daft/io/__init__.py tests/io/mcap/test_mcap.py
  • python3 -m ruff format daft/io/mcap/_mcap.py daft/io/__init__.py daft/__init__.py tests/io/mcap/test_mcap.py --check
  • git diff --check
  • DAFT_RUNNER=native .venv/bin/python -m pytest tests/io/mcap/test_mcap.py -q

Focused pytest result: 13 passed, 1 skipped.

@github-actions bot added the feat label on Jun 4, 2026
@greptile-apps

greptile-apps Bot commented Jun 4, 2026

Contributor

Greptile Summary

This PR adds inspect_mcap and plan_mcap_reads as new public APIs on top of the existing read_mcap connector, plus an include_record_metadata flag on read_mcap itself. The inspect path scans all MCAP message records (without running payload decoders) to build a per-topic statistics manifest; the plan path turns that manifest into time-windowed read tasks for distributed or resumable processing.

  • inspect_mcap iterates every message in every matched file to collect per-topic time bounds and payload-size statistics, returning one row per file/topic/schema/encoding combination.
  • plan_mcap_reads calls inspect_mcap internally and splits each topic's time span into windows sized by max_messages_per_task; time-uniform splitting can produce degenerate or empty windows when messages cluster at a single timestamp.
  • The PR silently removes a try/except guard around topic_start_time_resolver calls in MCAPSource.get_tasks, changing the failure mode for existing resolver users from a graceful fallback to a hard exception.

Confidence Score: 3/5

Two real defects in the core implementation need attention before merging: the window-degeneration bug and the unannounced behavioral change to topic_start_time_resolver.

The window-planning logic produces incorrect output when all messages in a topic share the same log_time — estimated counts are non-zero for windows that return nothing from read_mcap. Separately, the removal of the try/except around topic_start_time_resolver silently changes a public contract, turning graceful degradation into hard failures for existing callers. Both defects are in the changed code path.

daft/io/mcap/_mcap.py — specifically the window-count clamping in plan_mcap_reads and the get_tasks error handling change.

Important Files Changed

| Filename | Overview |
|----------|----------|
| `daft/io/mcap/_mcap.py` | Core implementation of inspect_mcap and plan_mcap_reads; contains a breaking change in topic_start_time_resolver error handling and a window-degeneration bug when all messages share the same log timestamp. |
| `daft/io/__init__.py` | Adds inspect_mcap and plan_mcap_reads to the daft.io public namespace and `__all__`; straightforward plumbing. |
| `daft/__init__.py` | Exposes inspect_mcap and plan_mcap_reads at the top-level daft namespace; no issues. |
| `tests/io/mcap/test_mcap.py` | Adds 8 new tests covering inspect_mcap, plan_mcap_reads, include_record_metadata, and topic_start_time_resolver error propagation; good coverage, but resume test only verifies total row count without checking for duplication or gaps at window boundaries. |
| `docs/connectors/mcap.md` | Adds inspect and plan documentation sections; plan output table is missing schema_name, schema_encoding, and message_encoding columns that are present in the actual DataFrame. |
| `docs/connectors/index.md` | Adds index entries for inspect_mcap and plan_mcap_reads; no issues. |
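
The boundary concern noted for the resume test could be checked structurally rather than by row count alone. The sketch below is one way to do that, under the assumption that each planned window is a half-open `[start_time, end_time)` pair as the docs table describes. The `windows_tile_span` helper is hypothetical and not part of the PR.

```python
from typing import List, Tuple


def windows_tile_span(windows: List[Tuple[int, int]]) -> bool:
    """Return True if half-open [start, end) windows are non-empty and contiguous.

    Contiguous means each window starts exactly where the previous one ends,
    so no message can be read twice (overlap) or skipped (gap).
    """
    ordered = sorted(windows)
    for start, end in ordered:
        if end <= start:
            return False  # empty or inverted window
    return all(prev[1] == nxt[0] for prev, nxt in zip(ordered, ordered[1:]))


contiguous = windows_tile_span([(0, 10), (10, 20), (20, 25)])
with_gap = windows_tile_span([(0, 10), (11, 20)])
with_overlap = windows_tile_span([(0, 10), (9, 20)])
```

A test could apply this per file/topic group before comparing row counts, which would catch both duplicated and dropped boundary messages.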

Sequence Diagram

sequenceDiagram
    participant User
    participant inspect_mcap
    participant plan_mcap_reads
    participant _inspect_mcap_rows
    participant read_mcap
    participant MCAPSource
    participant MCAPSourceTask

    User->>inspect_mcap: inspect_mcap(path, topics, start_time, end_time)
    inspect_mcap->>_inspect_mcap_rows: _inspect_mcap_rows(path, ...)
    _inspect_mcap_rows->>_inspect_mcap_rows: list_files then open each MCAP
    _inspect_mcap_rows->>_inspect_mcap_rows: iter_messages (no decoders) then accumulate _MCAPTopicStats
    _inspect_mcap_rows-->>inspect_mcap: list of dict rows
    inspect_mcap-->>User: DataFrame (topic manifest)

    User->>plan_mcap_reads: plan_mcap_reads(path, max_messages_per_task)
    plan_mcap_reads->>_inspect_mcap_rows: _inspect_mcap_rows(path, ...)
    _inspect_mcap_rows-->>plan_mcap_reads: manifest rows
    plan_mcap_reads->>plan_mcap_reads: split each topic span into time windows
    plan_mcap_reads-->>User: DataFrame (read plan rows)

    User->>read_mcap: read_mcap(file_path, topics, start_time, end_time)
    read_mcap->>MCAPSource: MCAPSource(include_record_metadata)
    MCAPSource->>MCAPSource: topic_start_time_resolver(file_path) if set
    MCAPSource->>MCAPSourceTask: yield MCAPSourceTask per file/topic
    MCAPSourceTask->>MCAPSourceTask: iter_messages then buffer rows then yield RecordBatch
    MCAPSourceTask-->>User: DataFrame (messages)

Reviews (1): Last reviewed commit: "feat(mcap): add robotics telemetry read ..."

Comment thread daft/io/mcap/_mcap.py
Comment on lines 487 to 490
         if self._topic_start_time_resolver is not None:
-            try:
-                keyframes = self._topic_start_time_resolver(file_path)
-            except Exception:
-                keyframes = None
+            keyframes = self._topic_start_time_resolver(file_path)

             if not keyframes:

Contributor

P1 Silent error swallowing removed — behavior-breaking for existing callers

The previous code wrapped self._topic_start_time_resolver(file_path) in a try/except Exception that fell back to keyframes = None on any failure, so a resolver that threw would degrade gracefully to a full-file read. Removing that guard means any exception from the resolver now propagates and aborts the entire read, which is a change in public contract for users who had resolvers that occasionally fail. If this is intentional, the PR title should carry ! per the project's conventional commit convention (e.g., feat!(mcap): ...).
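
For reference, the graceful-fallback contract described here can be sketched in isolation. This is a minimal stand-alone illustration, not the code in `MCAPSource.get_tasks`: the `resolve_keyframes` helper is hypothetical, and the resolver's return type (a topic-to-start-time mapping) is an assumption made for the example.

```python
from typing import Callable, Dict, Optional

# Assumed shape: a resolver maps a file path to {topic: start_time}, or None.
Resolver = Callable[[str], Optional[Dict[str, int]]]


def resolve_keyframes(resolver: Optional[Resolver], file_path: str) -> Optional[Dict[str, int]]:
    """Return the resolver's result, or None to signal an unscoped full-file read."""
    if resolver is None:
        return None
    try:
        return resolver(file_path)
    except Exception:
        # Graceful fallback: a failing resolver degrades to a full-file read
        # instead of aborting the whole scan.
        return None


def flaky_resolver(file_path: str) -> Dict[str, int]:
    raise RuntimeError("index service unavailable")


def good_resolver(file_path: str) -> Dict[str, int]:
    return {"/imu": 100}


fallback = resolve_keyframes(flaky_resolver, "log.mcap")
scoped = resolve_keyframes(good_resolver, "log.mcap")
```

Dropping the `try`/`except` would make the first call raise `RuntimeError` instead of returning `None`, which is the contract change at issue.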

Comment thread daft/io/mcap/_mcap.py
Comment on lines +298 to +310
    Args:
        path: MCAP file or directory path.
        io_config: IO configuration to use for remote storage.
        start_time: Optional lower log-time bound.
        end_time: Optional upper log-time bound.
        topics: Optional topics to plan.
        max_messages_per_task: Target maximum messages per planned read window.

    Returns:
        DataFrame: One row per planned file/topic/time window.
    """
    import math

Contributor

P1 Degenerate windows when all messages share the same log timestamp

When first_log_time == last_log_time (all messages on a single timestamp), end_bound - start_bound == 1 so window_width is always 1 regardless of how many windows window_count requests. Window 0 covers [T, T+1) and contains every message; all subsequent windows cover [T+1, …) and are empty. Meanwhile, estimated_message_count distributes the total evenly across all windows using divmod, so callers get non-zero estimates for windows that actually return nothing from read_mcap. A guard like if window_count > end_bound - start_bound: window_count = end_bound - start_bound (minimum 1) would avoid emitting unreachable windows.
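
To make the suggested guard concrete, here is a minimal sketch of time-uniform window planning with the clamp applied. It is reconstructed from the description above and is not the PR's code. The `plan_windows` and `_ceil_div` helpers are hypothetical, and the exclusive upper bound of `last_log_time + 1` is an assumption inferred from the `end_bound - start_bound == 1` case.

```python
from typing import Dict, List


def _ceil_div(a: int, b: int) -> int:
    """Integer ceiling division; avoids float rounding on nanosecond timestamps."""
    return -(-a // b)


def plan_windows(
    first_log_time: int,
    last_log_time: int,
    message_count: int,
    max_messages_per_task: int,
) -> List[Dict[str, int]]:
    """Split [first_log_time, last_log_time] into half-open, non-empty time windows."""
    if message_count <= 0 or max_messages_per_task <= 0:
        return []
    start_bound = first_log_time
    end_bound = last_log_time + 1  # exclusive upper bound (assumed)
    span = end_bound - start_bound

    window_count = _ceil_div(message_count, max_messages_per_task)
    # Guard: never request more windows than there are distinct timestamps.
    window_count = max(1, min(window_count, span))
    window_width = _ceil_div(span, window_count)
    # Rounding the width up can leave trailing windows empty; drop them.
    window_count = _ceil_div(span, window_width)

    base, extra = divmod(message_count, window_count)
    windows = []
    for index in range(window_count):
        start = start_bound + index * window_width
        windows.append(
            {
                "window_index": index,
                "start_time": start,
                "end_time": min(start + window_width, end_bound),
                "estimated_message_count": base + (1 if index < extra else 0),
            }
        )
    return windows


# 500 messages on a single timestamp collapse into one window.
single = plan_windows(1000, 1000, 500, 100)
# 250 messages over 100 ticks split into three windows.
spread = plan_windows(0, 99, 250, 100)
```

Note that a collapsed window can exceed `max_messages_per_task`, since a time-based window cannot subdivide messages that share one timestamp.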

Comment thread daft/io/mcap/_mcap.py
Comment on lines +149 to +184
def _empty_mcap_inspection() -> DataFrame:
    import pyarrow as pa

    from daft.convert import from_arrow

    schema = pa.schema(
        [
            pa.field("file_path", pa.string()),
            pa.field("topic", pa.string()),
            pa.field("schema_name", pa.string()),
            pa.field("schema_encoding", pa.string()),
            pa.field("message_encoding", pa.string()),
            pa.field("message_count", pa.int64()),
            pa.field("first_log_time", pa.int64()),
            pa.field("last_log_time", pa.int64()),
            pa.field("first_publish_time", pa.int64()),
            pa.field("last_publish_time", pa.int64()),
            pa.field("min_message_size", pa.int64()),
            pa.field("max_message_size", pa.int64()),
            pa.field("total_message_size", pa.int64()),
            pa.field("avg_message_size", pa.float64()),
        ]
    )
    return from_arrow(pa.Table.from_pylist([], schema=schema))


def _inspect_mcap_rows(
    path: str,
    io_config: IOConfig | None = None,
    start_time: int | None = None,
    end_time: int | None = None,
    topics: list[str] | None = None,
) -> list[dict[str, object]]:
    import importlib

    make_reader = importlib.import_module("mcap.reader").make_reader

Contributor

P2 Inline imports violate project import convention

import pyarrow as pa, from daft.convert import from_arrow, import importlib, from daft.convert import from_pylist, and import math are all scattered inside the new functions. The project rule requires import statements at the top of the file. math and importlib are standard-library modules with no optional-dependency concern; the stdlib and internal daft imports should move to the module header.
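
One possible layout that satisfies this rule while keeping an optional dependency lazy is sketched below. It is a generic pattern, not the project's actual module: `load_optional` and `windows_needed` are hypothetical helpers, and the error message wording is illustrative.

```python
# Standard-library imports live in the module header.
import importlib
import math
from types import ModuleType


def load_optional(module_name: str) -> ModuleType:
    """Import an optional dependency on first use, with an actionable error."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is required for this feature; install it to continue"
        ) from exc


def windows_needed(message_count: int, max_messages_per_task: int) -> int:
    """Uses the header-level math import; no function-local import is needed."""
    return math.ceil(message_count / max_messages_per_task)


# "json" stands in here for an optional third-party module such as mcap.reader.
json_module = load_optional("json")
```

Only the lookup of the optional package stays inside a function; `importlib` and `math` themselves are imported once at the top.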

Rule Used: Import statements should be placed at the top of t... (source)

Learned From
Eventual-Inc/Daft#5078

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment thread docs/connectors/mcap.md
Comment on lines +130 to +139
| Column | Type | Description |
|--------|------|-------------|
| `file_path` | `string` | MCAP file path to read |
| `topic` | `string` | Topic to read |
| `window_index` | `int64` | Zero-based window index for this file/topic |
| `window_count` | `int64` | Number of windows planned for this file/topic |
| `start_time` | `int64` | Inclusive log-time lower bound |
| `end_time` | `int64` | Exclusive log-time upper bound |
| `estimated_message_count` | `int64` | Approximate number of messages in this window |
| `estimated_payload_bytes` | `int64` | Approximate raw payload bytes in this window |

Contributor

P2 The plan_mcap_reads output table is missing three columns that are actually present in every output row: schema_name, schema_encoding, and message_encoding. These come directly from the _empty_mcap_read_plan schema and the plan_rows dict.

Suggested change
| Column | Type | Description |
|--------|------|-------------|
| `file_path` | `string` | MCAP file path to read |
| `topic` | `string` | Topic to read |
| `schema_name` | `string` | MCAP schema name, such as a ROS2 message type |
| `schema_encoding` | `string` | Schema encoding stored in the MCAP file |
| `message_encoding` | `string` | Channel message encoding |
| `window_index` | `int64` | Zero-based window index for this file/topic |
| `window_count` | `int64` | Number of windows planned for this file/topic |
| `start_time` | `int64` | Inclusive log-time lower bound |
| `end_time` | `int64` | Exclusive log-time upper bound |
| `estimated_message_count` | `int64` | Approximate number of messages in this window |
| `estimated_payload_bytes` | `int64` | Approximate raw payload bytes in this window |

@RitwijParmar

Contributor Author

Looks like the remaining red checks are from external Hugging Face rate limits rather than this MCAP change. Both native/ray IO jobs are failing on HTTP 429 for Eventual-Inc/sample-parquet / stanfordnlp/imdb, while style and the focused MCAP tests are green locally.

I don’t have permission to rerun the failed jobs from the fork. Could someone rerun those IO jobs when convenient?

@RitwijParmar

Contributor Author

Quick follow-up on the Greptile review: the two core issues it flagged are covered in the latest PR head.

What changed after the reviewed commit c48d270:

  • c24f079 adds stricter integer validation around planner manifest fields.
  • cd4abd7 handles degenerate/single-timestamp topics by collapsing them into one replayable window instead of producing time windows that estimate messages but read back empty.

The resolver behavior is also back to graceful fallback: topic_start_time_resolver exceptions are caught and the file falls back to the unscoped read path. There is regression coverage in test_mcap_topic_start_time_resolver_errors_fall_back_to_unscoped_reads.

Relevant tests now in the PR:

  • test_mcap_plan_reads_collapses_single_timestamp_windows
  • test_mcap_topic_start_time_resolver_errors_fall_back_to_unscoped_reads
  • test_mcap_plan_reads_can_resume_from_planned_windows
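
As a rough picture of the resume workflow these tests exercise, the sketch below filters a plan down to the windows that have not been processed yet. It works on plain dictionaries rather than a Daft DataFrame. The `remaining_windows` helper and the idea of tracking completed work by `(file_path, topic, window_index)` are assumptions for illustration. Only the column names come from the plan table.

```python
from typing import Dict, List, Set, Tuple

# Assumed bookkeeping key for a finished window.
WindowKey = Tuple[str, str, int]  # (file_path, topic, window_index)


def remaining_windows(
    plan_rows: List[Dict[str, object]],
    completed: Set[WindowKey],
) -> List[Dict[str, object]]:
    """Return the planned windows whose key is not in the completed set."""
    return [
        row
        for row in plan_rows
        if (row["file_path"], row["topic"], row["window_index"]) not in completed
    ]


plan_rows = [
    {"file_path": "run.mcap", "topic": "/imu", "window_index": 0, "start_time": 0, "end_time": 50},
    {"file_path": "run.mcap", "topic": "/imu", "window_index": 1, "start_time": 50, "end_time": 100},
    {"file_path": "run.mcap", "topic": "/lidar", "window_index": 0, "start_time": 0, "end_time": 100},
]
completed = {("run.mcap", "/imu", 0)}
todo = remaining_windows(plan_rows, completed)
```

Each remaining row carries the `file_path`, `topic`, `start_time`, and `end_time` needed to scope a follow-up read.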

Docs also include the plan output metadata columns schema_name, schema_encoding, and message_encoding in the planning table.

I can’t run the focused Daft pytest in this local checkout because the compiled extension cannot be imported here, but git diff --check is clean and the PR branch includes the regression coverage above. A Greptile/CI re-review should be looking at cd4abd7 rather than the earlier c48d270 commit.

@RitwijParmar force-pushed the codex/daft-mcap-telemetry-planner branch from cd4abd7 to 23f5fb6 on June 18, 2026 21:13