Contributor

@hungngyenn hungngyenn commented Oct 15, 2025

Closes #10

Please check if the PR fulfills these requirements

  • Tests for the changes have been added (for bug fixes / features)
  • Docs have been added / updated (for bug fixes / features)
  • CHANGELOG.md has been updated (for bug fixes / features / docs)

What kind of change does this PR introduce?

This PR introduces per-pipeline downsampling.

What was changed?

Configuration: Updated PipelineConfig to include three new optional parameters: downsampling_mode, stride_n, and max_rate_hz.

Logic: Implemented the down_sampling method to act as a gate function, enforcing message skips based on the configured mode.

stride Mode: Skips messages based on a counter (stride_n), so that only every N-th message is recorded.

max_rate Mode: Skips messages based on time (max_rate_hz), so that the interval between recorded messages is at least 1 / max_rate_hz seconds.
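
For readers who want to see the two modes side by side, here is a minimal sketch of such a gate. It is not the code from this PR: the class layout, the `should_keep` method name, and the nanosecond timestamps are assumptions made for illustration, and the sketch assumes that stride mode keeps the first message and then every N-th one after it.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Downsampler:
    """Hypothetical gate: should_keep() is True when a message should be recorded."""

    mode: str = "none"  # "none", "stride", or "max_rate"
    stride_n: int | None = None
    max_rate_hz: float | None = None
    _count: int = field(default=0, init=False)
    _last_kept_ns: int | None = field(default=None, init=False)

    def should_keep(self, timestamp_ns: int) -> bool:
        if self.mode == "stride":
            if not self.stride_n or self.stride_n < 1:
                raise ValueError("stride mode needs stride_n >= 1")
            # Assumption: keep messages 0, N, 2N, ... and skip the rest.
            keep = self._count % self.stride_n == 0
            self._count += 1
            return keep
        if self.mode == "max_rate":
            if not self.max_rate_hz or self.max_rate_hz <= 0:
                raise ValueError("max_rate mode needs max_rate_hz > 0")
            min_interval_ns = int(1e9 / self.max_rate_hz)
            last = self._last_kept_ns
            if last is not None and timestamp_ns - last < min_interval_ns:
                return False  # too soon after the last recorded message
            self._last_kept_ns = timestamp_ns
            return True
        return True  # "none": record everything


stride = Downsampler(mode="stride", stride_n=3)
stride_decisions = [stride.should_keep(i) for i in range(7)]

rate = Downsampler(mode="max_rate", max_rate_hz=2.0)
arrivals_ns = [0, 100_000_000, 500_000_000, 900_000_000, 1_000_000_000]
rate_decisions = [rate.should_keep(t) for t in arrivals_ns]
```

With `stride_n=3` the gate lets through one message in three. With `max_rate_hz=2.0` it drops anything that arrives less than 0.5 s after the last recorded message.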

Related issues

(Add links to related issues)

Does this PR introduce a breaking change?

No.

Other information:

Member

@AnthonyCvn AnthonyCvn left a comment

Nice progress! I added a few comments for your review, and I'd also like to avoid having the state object updated in a static function (let's update it in the process_message function).
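
One way to read this request, sketched with hypothetical names (`PipelineState`, `stride_allows`, and this simplified `process_message` signature are illustrative assumptions, not the recorder's real API): the static helper only decides, and the caller owns every state update.

```python
from dataclasses import dataclass


@dataclass
class PipelineState:
    """Hypothetical stand-in for the recorder's per-pipeline state."""

    msg_count: int = 0
    recorded: int = 0


def stride_allows(msg_count: int, stride_n: int) -> bool:
    """Pure decision: reads its arguments and mutates nothing."""
    return msg_count % stride_n == 0


def process_message(state: PipelineState, stride_n: int) -> bool:
    """All state updates happen here, next to the code that records messages."""
    keep = stride_allows(state.msg_count, stride_n)
    state.msg_count += 1
    if keep:
        state.recorded += 1
    return keep


state = PipelineState()
decisions = [process_message(state, stride_n=2) for _ in range(5)]
```

Keeping the helper free of side effects makes it easy to unit test on its own, and the state changes stay visible in one place.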

static_labels: dict[str, str] = Field(default_factory=dict)
filename_mode: FilenameMode = FilenameMode.TIMESTAMP

downsampling_mode: str = Field("none", pattern=r"^(none|max_rate|stride)$")
Member

You can use an Enum for that field.
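
A minimal sketch of what that could look like, using only the standard library. The names `DownsamplingMode` and `parse_mode` are assumptions for illustration; in the real model, the field would presumably be typed with the Enum so that Pydantic validates input against the member values in place of the regex pattern.

```python
from enum import Enum


class DownsamplingMode(str, Enum):
    """Allowed values, mirroring the regex ^(none|max_rate|stride)$."""

    NONE = "none"
    MAX_RATE = "max_rate"
    STRIDE = "stride"


def parse_mode(raw: str) -> DownsamplingMode:
    """Convert a config string to the Enum, with a readable error message."""
    try:
        return DownsamplingMode(raw)
    except ValueError:
        allowed = ", ".join(m.value for m in DownsamplingMode)
        raise ValueError(f"downsampling_mode must be one of: {allowed}") from None


mode = parse_mode("stride")
```

Because the class also subclasses `str`, members still compare equal to the plain strings used in existing configs (`DownsamplingMode.STRIDE == "stride"`).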

Member

AnthonyCvn commented Oct 17, 2025

@hungngyenn I checked your integration test. The problem is that the publisher runs at 10 Hz and the recorder has max_duration_s set to 1 s, so after 1 s it creates a ReductStore record whose MCAP file contains at most 10 messages.

I also saw that other tests are broken and need your attention, some of them because of formatting and linting.
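
To make the arithmetic concrete, here is a small sketch of how many messages a test could expect per record. The helper name is hypothetical, and it assumes ideal timing (exactly `publish_hz` messages per second, segments cut exactly at `max_duration_s`) and a stride counter that keeps the first message of each segment.

```python
def segment_message_count(publish_hz: int, max_duration_s: int, stride_n: int = 1) -> int:
    """Upper bound on messages in one record under the assumptions above."""
    published = publish_hz * max_duration_s
    # Keeping indices 0, n, 2n, ... yields ceil(published / n) messages.
    return -(-published // stride_n)


no_downsampling = segment_message_count(10, 1)
with_stride_3 = segment_message_count(10, 1, stride_n=3)
```

Real runs can land below these numbers because of timer jitter, so an assertion on an upper bound is safer than one on an exact count.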

For formatting, you can run:

black .

And sort the imports with:

isort . 

For linting, check the results of test_pep257 and test_flake8.

I hope that helps!

@atimin atimin self-requested a review October 23, 2025 14:35
Member

@atimin atimin left a comment

Looks good to me. Just move the Downsampler into a separate file and use an Enum for the downsampling_mode field, as @AnthonyCvn asked. Good job!

Member

@AnthonyCvn AnthonyCvn left a comment

Looking good! Added small suggestions and then you can merge.

timer: Timer | None = None
current_size: int = 0
is_uploading: bool = False
downsampler: Optional["Downsampler"] = Field(default=None)
Member

Suggested change
- downsampler: Optional["Downsampler"] = Field(default=None)
+ downsampler: Downsampler | None = None

topics=topics,
buffer=buffer,
writer=writer,
downsampler=downsampler_instance,
Member

Suggested change
- downsampler=downsampler_instance,
+ downsampler=Downsampler(cfg),

@atimin atimin self-requested a review October 27, 2025 11:32
@hungngyenn hungngyenn merged commit 73769c1 into main Oct 29, 2025
5 checks passed
Development

Successfully merging this pull request may close these issues.

Add downsampling options max_rate and stride

5 participants