Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ Changelog for package reductstore_agent
* Merge pull request `#11 <https://github.com/reductstore/reductstore_agent/issues/11>`_ from reductstore/11-support-raw-output
Implement raw output format per pipeline
* Merge pull request `#38 <https://github.com/reductstore/reductstore_agent/issues/38>`_ from reductstore/38-add-downsampling-integration-test
Add integration test for downsampling
Add integration test for downsampling
* Merge pull request `#14 <https://github.com/reductstore/reductstore_agent/issues/14>`_ from reductstore/14-support-dynamic-labels
Add dynamic labels feature
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ Each pipeline supports the following parameters:
* `"mcap"` *(default)* — Send mcap files to ReductStore
* `"cdr"` — Send binary CDR to ReductStore

* **`mode`**: Choose one of the dynamic label modes:

* `"last"` *(default)* — use the most recent message
* `"first"` — use the first message in the current file
* `"max"` — use the maximum value across all messages in the file

## Snap Package

This snap provides the **ReductStore Agent ROS 2 node**, which records ROS 2 topics and streams them into ReductStore.
Expand Down
102 changes: 102 additions & 0 deletions reductstore_agent/dynamic_labels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2025 ReductSoftware UG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

"""Label state tracker for dynamic labels."""

from collections.abc import Callable
from typing import Any

from .models import LabelMode, LabelTopicConfig, PipelineConfig
from .utils import extract_field


class LabelStateTracker:
    """Track dynamic label values extracted from incoming messages.

    For every topic listed in the pipeline's ``labels`` configuration an
    update strategy is selected from its ``LabelMode``: ``last`` keeps the
    most recent value, ``first`` keeps the first value seen, and ``max``
    keeps the largest value seen.
    """

    def __init__(self, cfg: PipelineConfig, logger=None):
        """Initialize a LabelStateTracker instance.

        Args:
            cfg: Pipeline configuration whose ``labels`` entries define the
                topics, modes and message field paths to track.
            logger: Optional logger used for diagnostics.
        """
        self._configs: dict[str, LabelTopicConfig] = {
            label_cfg.topic: label_cfg for label_cfg in cfg.labels
        }
        self.logger = logger

        # Resolve each topic's update strategy once up front so the
        # per-message hot path is a single dict lookup.
        dispatch = {
            LabelMode.LAST: self._update_last,
            LabelMode.FIRST: self._update_first,
            LabelMode.MAX: self._update_max,
        }
        self._updaters: dict[str, Callable[[str, Any], None]] = {
            # Any unrecognized mode falls back to "max", matching the
            # else branch of the previous if/elif chain.
            topic: dispatch.get(label_cfg.mode, self._update_max)
            for topic, label_cfg in self._configs.items()
        }

        # Current label values, keyed by label name.
        # NOTE(review): values are never cleared, so "first"/"max" span the
        # tracker's whole lifetime — confirm a fresh tracker is created per
        # output file if per-file semantics are intended.
        self._values: dict[str, Any] = {}

    def update(self, topic_name, msg):
        """Update label state from a single incoming message.

        Messages on topics without a label configuration are ignored
        (logged at info level when a logger is available).
        """
        cfg = self._configs.get(topic_name)
        if cfg is None:
            if self.logger:
                self.logger.info(
                    "Cannot read config for topic " f"'{topic_name}'. Returning ..."
                )
            return

        updater = self._updaters[topic_name]

        # Extract every configured field and feed it to the topic's updater.
        for label_name, field_path in cfg.fields.items():
            updater(label_name, extract_field(msg, field_path))

    def _update_last(self, label_key: str, value: Any) -> None:
        """Keep the most recent value (default mode)."""
        self._values[label_key] = value

    def _update_first(self, label_key: str, value: Any) -> None:
        """Keep the first value seen; later values are ignored."""
        self._values.setdefault(label_key, value)

    def _update_max(self, label_key: str, value: Any) -> None:
        """Keep the maximum value seen so far."""
        if label_key in self._values:
            self._values[label_key] = max(self._values[label_key], value)
        else:
            self._values[label_key] = value

    def get_labels(self) -> dict[str, str]:
        """Return the current label values, stringified for ReductStore."""
        return {k: str(v) for k, v in self._values.items()}


class NullLabelStateTracker(LabelStateTracker):
    """No-op stand-in used when a pipeline defines no dynamic labels."""

    def __init__(self):
        """Create the null tracker; no configuration is needed."""
        # Intentionally skips LabelStateTracker.__init__: there is no
        # per-topic state to build.

    def update(self, topic_name, msg):
        """Ignore the message; nothing is tracked."""

    def get_labels(self) -> dict[str, str]:
        """Return an empty label mapping."""
        return {}
17 changes: 17 additions & 0 deletions reductstore_agent/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ class OutputFormat(str, Enum):
CDR = "cdr"


class LabelMode(str, Enum):
    """Aggregation mode for a dynamic label within a pipeline."""

    # Keep the most recently seen value (default behaviour).
    LAST = "last"
    # Keep the first value seen in the current file.
    FIRST = "first"
    # Keep the maximum value seen across all messages in the file.
    MAX = "max"


class LabelTopicConfig(BaseModel):
    """Config for dynamic labels.

    Maps one topic to the labels extracted from its messages.
    """

    # Topic whose messages feed these labels.
    topic: str
    # Aggregation mode applied to extracted values (last / first / max).
    mode: LabelMode = Field(default=LabelMode.LAST)
    # Mapping of label name -> dot-separated field path inside the message.
    fields: dict[str, str]


class PipelineConfig(BaseModel):
"""Configuration for a recording pipeline."""

Expand All @@ -116,6 +132,7 @@ class PipelineConfig(BaseModel):
include_topics: list[str] = Field(default_factory=list)
exclude_topics: list[str] = Field(default_factory=list)
static_labels: dict[str, str] = Field(default_factory=dict)
labels: list[LabelTopicConfig] = Field(default_factory=list)
filename_mode: FilenameMode = FilenameMode.TIMESTAMP

downsampling_mode: DownsamplingMode = Field(default=DownsamplingMode.NONE)
Expand Down
36 changes: 36 additions & 0 deletions reductstore_agent/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,45 @@ def load_pipeline_config(self) -> dict[str, PipelineConfig]:
pipelines_raw[pipeline_name].setdefault("static_labels", {})[
label_key
] = value
elif subkey.startswith("labels."):
parts = subkey.split(".")
if len(parts) < 3:
raise ValueError(
(
f"Invalid label key '{parts} "
"Expected 'pipelines.<pipeline_name>.<labels>."
)
)
idx_str = parts[1]
rest_parts = parts[2:]
try:
idx = int(idx_str)
except ValueError:
continue
labels_dict = pipelines_raw[pipeline_name].setdefault("labels", {})
entry = labels_dict.setdefault(idx, {"fields": {}})

head = rest_parts[0]

# labels.<idx>.topic
if head == "topic" and len(rest_parts) == 1:
entry["topic"] = value
# labels.<idx>.mode
elif head == "mode" and len(rest_parts) == 1:
entry["mode"] = value
# labels.<idx>.fields.<field_name>
elif head == "fields" and len(rest_parts) == 2:
field_name = rest_parts[1]
entry["fields"][field_name] = value

else:
pipelines_raw[pipeline_name][subkey] = value

pipelines: dict[str, PipelineConfig] = {}
for name, cfg in pipelines_raw.items():
if "labels" in cfg and isinstance(cfg["labels"], dict):
label_dict: dict[int, dict[str, Any]] = cfg["labels"]
cfg["labels"] = [label_dict[i] for i in sorted(label_dict)]
pipelines[name] = PipelineConfig(**cfg)
return pipelines

Expand Down Expand Up @@ -188,6 +222,8 @@ def init_pipeline_writers(self):
for pipeline_name, cfg in self.pipeline_configs.items():
duration = cfg.split_max_duration_s
topics = self.resolve_topics(cfg, all_topics)
if not topics and cfg.include_topics:
topics = set(cfg.include_topics)

writer = create_writer(cfg, self.bucket, pipeline_name, self.logger)
state = PipelineState(
Expand Down
18 changes: 18 additions & 0 deletions reductstore_agent/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,21 @@ def ns_to_us(ns: int) -> int:
def metadata_size(labels: dict) -> int:
"""Return byte size of metadata."""
return len(json.dumps(labels, separators=(",", ":")).encode("utf-8"))


def extract_field(msg, field_path: str):
    """Return the value inside *msg* addressed by a dot-separated path.

    Each path component is resolved with attribute access first
    (``getattr``) and, when that fails, with item access
    (``value[component]``), so both message objects and plain dicts can
    be traversed.

    Args:
        msg: Object to traverse (message, dict, ...).
        field_path: Dot-separated path, e.g. ``"pose.position.x"``.

    Returns:
        The value found at the end of the path.

    Raises:
        ValueError: If a path component cannot be resolved.
    """
    value = msg
    for attr in field_path.split("."):
        try:
            value = getattr(value, attr)
        except AttributeError:
            try:
                value = value[attr]
            except (KeyError, TypeError):
                # Suppress the chained AttributeError/KeyError trace so
                # callers see one clear error (`from None`, PEP 409).
                raise ValueError(
                    f"Field path '{field_path}' failed at attribute/key '{attr}'. "
                    "Key not found or object is incompatible."
                ) from None
    return value
13 changes: 12 additions & 1 deletion reductstore_agent/writer/cdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from rclpy.serialization import serialize_message
from reduct import Batch, Bucket

from ..dynamic_labels import LabelStateTracker
from ..utils import get_or_create_event_loop, metadata_size, ns_to_us
from .base import OutputWriter

Expand All @@ -43,6 +44,7 @@ def __init__(
pipeline_name: str,
flush_threshold_bytes: int = 5 * 1024 * 1024, # i.e. 2MB
logger=None,
label_tracker=LabelStateTracker,
):
"""Initialize CDROutput writer."""
self.bucket = bucket
Expand All @@ -52,6 +54,7 @@ def __init__(
self._batch_size_bytes: int = 0
self._is_flushing = False
self._batch_metadata_size: int = 0
self.label_tracker = label_tracker

self._topic_to_msg_type: Dict[str, str] = {}
if logger is None:
Expand All @@ -65,6 +68,9 @@ async def upload_to_reductstore(
self, serialized_data: bytes, timestamp_us: int, labels: Dict
):
"""Upload raw data to ReductStore with labels and timestamp."""
# Update dynamic labels
labels.update(self.label_tracker.get_labels())

await self.bucket.write(
entry_name=self.pipeline_name,
timestamp=timestamp_us,
Expand All @@ -74,6 +80,9 @@ async def upload_to_reductstore(

def write_message(self, message: Any, publish_time: int, topic: str, **kwargs):
"""Write message to batch - synchronous interface."""
# Process each message and update labels
self.label_tracker.update(topic, message)

try:
serialized_data = serialize_message(message)
except Exception as exc:
Expand All @@ -96,7 +105,6 @@ def write_message(self, message: Any, publish_time: int, topic: str, **kwargs):
loop = get_or_create_event_loop()
# Stream message if larger than 100KB
if len(serialized_data) >= KB_100:
self.logger.info("shouldnt be here")

async def upload_large():
await self.flush_and_upload_batch()
Expand All @@ -123,6 +131,9 @@ def append_record(
self, timestamp_us: int, serialized_data: bytes, labels: Dict
) -> None:
"""Append raw data to batch."""
# Add dynamic labels
labels.update(self.label_tracker.get_labels())

self._batch_size_bytes += len(serialized_data)
self._batch_metadata_size += metadata_size(labels)
self._batch.add(timestamp=timestamp_us, data=serialized_data, labels=labels)
Expand Down
11 changes: 9 additions & 2 deletions reductstore_agent/writer/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from reduct import Bucket

from ..dynamic_labels import LabelStateTracker, NullLabelStateTracker
from ..models import OutputFormat, PipelineConfig
from .base import OutputWriter
from .cdr import CdrOutputWriter
Expand All @@ -34,7 +35,7 @@ def create_writer(
"""
Create an appropriate output writer based on configuration.

Args
Args:
----
config : PipelineConfig
Pipeline configuration
Expand All @@ -44,25 +45,31 @@ def create_writer(
Name of the pipeline
logger : optional
Optional logger

Returns
-------
OutputWriter
Configured writer instance

"""
if config.labels:
tracker_instance = LabelStateTracker(config, logger)
else:
tracker_instance = NullLabelStateTracker()

if config.output_format == OutputFormat.MCAP:
return McapOutputWriter(
bucket=bucket,
pipeline_name=pipeline_name or "default",
config=config,
logger=logger,
label_tracker=tracker_instance,
)
elif config.output_format == OutputFormat.CDR:
return CdrOutputWriter(
bucket=bucket,
pipeline_name=pipeline_name or "default",
logger=logger,
label_tracker=tracker_instance,
)
else:
raise ValueError(f"Unsupported output format: {config.output_format}")
12 changes: 11 additions & 1 deletion reductstore_agent/writer/mcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from reduct import Bucket
from rosbag2_py import LocalMessageDefinitionSource

from ..dynamic_labels import LabelStateTracker
from ..models import FilenameMode, PipelineConfig
from ..utils import get_or_create_event_loop
from .base import OutputWriter
Expand All @@ -42,6 +43,7 @@ def __init__(
pipeline_name: str,
config: PipelineConfig,
logger=None,
label_tracker=LabelStateTracker,
):
"""Initialize MCAP writer."""
self.bucket = bucket
Expand All @@ -51,6 +53,7 @@ def __init__(
self.first_timestamp: int | None = None
self.current_size = 0
self._is_uploading = False
self.label_tracker = label_tracker

# Schema tracking
self.schema_by_type: Dict[str, Schema] = {}
Expand Down Expand Up @@ -118,6 +121,9 @@ def write_message(
if self.first_timestamp is None:
self.first_timestamp = publish_time

# Process each message and update labels
self.label_tracker.update(topic, message)

# Get the actual schema from our registry
actual_schema = self.schemas_by_topic.get(topic)
if not actual_schema:
Expand Down Expand Up @@ -202,14 +208,18 @@ async def _upload_to_reductstore(self, file_index: int):
"""Upload the MCAP file to ReductStore."""
content_length = self._buffer.tell()
self._buffer.seek(0)
labels: dict[str, str] = dict(self.config.static_labels)

# Attach dynamic labels
labels.update(self.label_tracker.get_labels())

await self.bucket.write(
entry_name=self.pipeline_name,
data=self._read_in_chunks(),
timestamp=file_index,
content_length=content_length,
content_type="application/mcap",
labels=self.config.static_labels,
labels=labels,
)

async def _read_in_chunks(
Expand Down
Loading