Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/pybag/encoding/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def encode(self, type_str: str, value: Any) -> None:
... # pragma: no cover

@abstractmethod
def save(self) -> bytes:
def save(self) -> Any:
... # pragma: no cover

# Primitive encoders -------------------------------------------------
Expand Down
28 changes: 25 additions & 3 deletions src/pybag/encoding/cdr.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
import logging
import struct
from dataclasses import dataclass
from typing import Any

from pybag.encoding import MessageDecoder, MessageEncoder
from pybag.io.raw_reader import BytesReader
from pybag.io.raw_writer import BytesWriter
from pybag.io.raw_writer import BaseWriter, BytesWriter

logger = logging.getLogger(__name__)

@dataclass(slots=True)
class SerializedMessage:
header: bytes
payload: memoryview

def __len__(self) -> int:
return len(self.header) + len(self.payload)

def write(self, writer: BaseWriter) -> None:
writer.write(self.header)
writer.write(self.payload)

def to_bytes(self) -> bytes:
return self.header + self.payload.tobytes()

def __bytes__(self) -> bytes: # pragma: no cover - convenience
return self.to_bytes()


class CdrDecoder(MessageDecoder):
def __init__(self, data: bytes):
Expand Down Expand Up @@ -149,13 +168,16 @@ def __init__(self, *, little_endian: bool = True) -> None:
def encoding(cls) -> str:
return "cdr"

def reset(self) -> None:
self._payload.clear()

def encode(self, type_str: str, value: Any) -> None:
"""Encode ``value`` based on ``type_str``."""
getattr(self, type_str)(value)

def save(self) -> bytes:
def save(self) -> SerializedMessage:
"""Return the encoded byte stream."""
return self._header + self._payload.as_bytes()
return SerializedMessage(self._header, self._payload.as_memoryview())

# Primitive encoders -------------------------------------------------

Expand Down
3 changes: 3 additions & 0 deletions src/pybag/io/raw_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def size(self) -> int:
def as_bytes(self) -> bytes:
return bytes(self._buffer)

def as_memoryview(self) -> memoryview:
return memoryview(self._buffer)

def clear(self) -> None:
    """Discard everything written so far.

    If a ``memoryview`` obtained from ``as_memoryview()`` is still alive, the
    buffer cannot be resized in place (``bytearray.clear()`` raises
    ``BufferError``).  In that case a fresh buffer is started instead; the
    old one stays untouched and remains valid for the outstanding views.
    """
    try:
        self._buffer.clear()
    except BufferError:
        # NOTE(review): assumes nothing else caches a reference to the old
        # buffer object -- confirm against the rest of the class.
        self._buffer = bytearray()

Expand Down
25 changes: 17 additions & 8 deletions src/pybag/mcap/record_writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import struct
from typing import Any, Callable

from pybag.encoding.cdr import SerializedMessage
from pybag.io.raw_writer import BaseWriter
from pybag.mcap.records import (
AttachmentIndexRecord,
Expand Down Expand Up @@ -137,14 +138,22 @@ def write_channel(cls, writer: BaseWriter, record: ChannelRecord) -> None:

@classmethod
def write_message(cls, writer: BaseWriter, record: MessageRecord) -> None:
    """Write a message record without copying the message data.

    The fixed-size part is packed little-endian in one call: a one-byte
    opcode, a uint64 payload length, then the uint16 channel id, uint32
    sequence and the uint64 log and publish times.  The message data is
    written straight after it, so no combined payload buffer is built.

    Args:
        writer: Destination for the record bytes.
        record: The message record; ``record.data`` is either raw bytes or a
            ``SerializedMessage``.
    """
    fields_format = "<HIQQ"  # channel_id, sequence, log_time, publish_time
    # The length field covers the fixed fields plus the message data, but not
    # the opcode byte or the length field itself.
    payload_length = struct.calcsize(fields_format) + len(record.data)
    header = struct.pack(
        "<BQHIQQ",
        int(RecordType.MESSAGE),
        payload_length,
        record.channel_id,
        record.sequence,
        record.log_time,
        record.publish_time,
    )
    writer.write(header)
    if isinstance(record.data, SerializedMessage):
        # Let the message write its header and payload view directly.
        record.data.write(writer)
    else:
        writer.write(record.data)

@classmethod
def write_chunk(cls, writer: BaseWriter, record: ChunkRecord) -> None:
Expand Down
4 changes: 3 additions & 1 deletion src/pybag/mcap/records.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from enum import IntEnum

from pybag.encoding.cdr import SerializedMessage


@dataclass
class HeaderRecord:
Expand Down Expand Up @@ -38,7 +40,7 @@ class MessageRecord:
sequence: int
log_time: int
publish_time: int
data: bytes
data: bytes | SerializedMessage


@dataclass
Expand Down
13 changes: 8 additions & 5 deletions src/pybag/mcap_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

logger = logging.getLogger(__name__)

DEFAULT_CHUNK_SIZE = 4 * 1024 * 1024


class McapFileWriter:
"""High level writer for producing MCAP files."""
Expand All @@ -35,7 +37,7 @@ def __init__(
writer: BaseWriter,
*,
profile: str = "ros2",
chunk_size: int | None = None,
chunk_size: int | None = DEFAULT_CHUNK_SIZE,
chunk_compression: Literal["lz4", "zstd"] | None = None,
) -> None:
self._writer = CrcWriter(writer)
Expand Down Expand Up @@ -86,16 +88,17 @@ def open(
file_path: str | Path,
*,
profile: str = "ros2",
chunk_size: int | None = None,
chunk_compression: Literal["lz4", "zstd"] | None = "lz4",
chunk_size: int | None = DEFAULT_CHUNK_SIZE,
chunk_compression: Literal["lz4", "zstd"] | None = None,
) -> "McapFileWriter":
"""Create a writer backed by a file on disk.

Args:
file_path: The path to the file to write to.
profile: The profile to use for the MCAP file.
chunk_size: The size of the chunk to write to in bytes.
chunk_compression: The compression to use for the chunk.
chunk_size: The target chunk size in bytes. Pass ``None`` to disable
chunking.
chunk_compression: Optional compression to apply to each chunk.

Returns:
A writer backed by a file on disk.
Expand Down
22 changes: 20 additions & 2 deletions src/pybag/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Callable

from pybag.encoding import MessageEncoder
from pybag.encoding.cdr import CdrEncoder
from pybag.encoding.cdr import CdrEncoder, SerializedMessage
from pybag.mcap.records import ChannelRecord, SchemaRecord
from pybag.schema import SchemaEncoder
from pybag.schema.compiler import compile_serializer
Expand All @@ -21,6 +21,7 @@ def __init__(
self._schema_encoder = schema_encoder
self._message_encoder = message_encoder
self._compiled: dict[type[Message], Callable[[MessageEncoder, Message], None]] = {}
self._encoders: dict[tuple[type[Message], bool], CdrEncoder] = {}

@property
def schema_encoding(self) -> str:
Expand All @@ -31,6 +32,15 @@ def message_encoding(self) -> str:
return self._message_encoder.encoding()

def serialize_message(self, message: Message, *, little_endian: bool = True) -> bytes:
serialized = self.serialize_message_view(message, little_endian=little_endian)
return serialized.to_bytes()

def serialize_message_view(
self,
message: Message,
*,
little_endian: bool = True,
) -> SerializedMessage:
if not is_dataclass(message): # pragma: no cover - defensive programming
raise TypeError("Expected a dataclass instance")

Expand All @@ -40,7 +50,15 @@ def serialize_message(self, message: Message, *, little_endian: bool = True) ->
serializer = compile_serializer(schema, sub_schemas)
self._compiled[message_type] = serializer

encoder = self._message_encoder(little_endian=little_endian)
key = (message_type, little_endian)
encoder = self._encoders.get(key)
if encoder is None:
encoder = self._message_encoder(little_endian=little_endian) # type: ignore[call-arg]
self._encoders[key] = encoder
# Cache is keyed by endianness, so the header stays valid.
else:
encoder.reset()
Comment on lines +53 to +60

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid clearing encoder buffer while SerializedMessage holds memoryview

The encoder cache introduced in serialize_message_view reuses a single CdrEncoder per message type and immediately calls encoder.reset() when the encoder is reused (lines 53‑60). However, CdrEncoder.save() now returns a SerializedMessage containing a memoryview of the encoder’s internal bytearray. When serialize_message_view is called again while the previous SerializedMessage is still alive, BytesWriter.clear() attempts to resize the buffer and Python raises BufferError: Existing exports of data: object cannot be re-sized. Even simple sequential calls like serialize_message_view(msg1); serialize_message_view(msg2) crash, making the zero-copy API unusable. The encoder should not be cleared until the previous view is released or the data should be copied before reset.

Useful? React with 👍 / 👎.


serializer(encoder, message)
return encoder.save()

Expand Down
2 changes: 1 addition & 1 deletion tests/test_mcap_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class Example:

with tempfile.TemporaryDirectory() as tmpdir:
file_path = Path(tmpdir) / "test.mcap"
with McapFileWriter.open(file_path, profile="ros2") as mcap:
with McapFileWriter.open(file_path, profile="ros2", chunk_size=None) as mcap:
channel_id = mcap.add_channel("/example", Example)
mcap.write_message("/example", 1, Example(5))
reader = CrcReader(BytesReader(file_path.read_bytes()))
Expand Down
Loading