Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 68 additions & 9 deletions crates/store/re_log_encoding/src/codec/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,87 @@ pub(crate) fn read_arrow_from_bytes<R: std::io::Read>(
}

#[cfg(feature = "encoder")]
pub(crate) struct Payload {
pub(crate) struct Payload<'a> {
pub uncompressed_size: usize,
pub data: Vec<u8>,
data: &'a [u8],
capacity: usize,
}

#[cfg(feature = "encoder")]
pub(crate) fn encode_arrow(
impl Payload<'_> {
/// This version does not copy the data, but instead produces the `Vec`
/// from raw parts.
///
/// The safe version of this is [`Payload::to_vec`].
///
/// # Safety
///
/// The returned `Vec` must NOT be dropped! Pass it to `mem::forget` before that happens.
#[allow(unsafe_code)]
pub(crate) unsafe fn to_fake_temp_vec(&self) -> Vec<u8> {
// SAFETY: User is required to uphold safety invariants.
unsafe {
Vec::from_raw_parts(
self.data.as_ptr().cast_mut(),
self.data.len(),
self.capacity,
)
}
}

/// Copy the data to a `Vec`.
pub(crate) fn to_vec(&self) -> Vec<u8> {
self.data.to_vec()
}
}

#[cfg(feature = "encoder")]
pub(crate) struct ArrowEncodingContext {
uncompressed: Vec<u8>,
compressed: Vec<u8>,
}

#[cfg(feature = "encoder")]
impl ArrowEncodingContext {
pub fn new() -> Self {
Self {
uncompressed: Vec::new(),
compressed: Vec::new(),
}
}
}

// NOTE: Externally, `Payload`'s borrow of `'a` is treated as if it was `&'a mut`.
#[cfg(feature = "encoder")]
pub(crate) fn encode_arrow_with_ctx<'a>(
arrow_ctx: &'a mut ArrowEncodingContext,
batch: &ArrowRecordBatch,
compression: crate::Compression,
) -> Result<Payload, crate::encoder::EncodeError> {
let mut uncompressed = Vec::new();
write_arrow_to_bytes(&mut uncompressed, batch)?;
) -> Result<Payload<'a>, crate::encoder::EncodeError> {
let ArrowEncodingContext {
uncompressed,
compressed,
} = arrow_ctx;

uncompressed.clear();

write_arrow_to_bytes(uncompressed, batch)?;
let uncompressed_size = uncompressed.len();

let data = match compression {
crate::Compression::Off => uncompressed,
crate::Compression::LZ4 => lz4_flex::block::compress(&uncompressed),
let (capacity, data) = match compression {
crate::Compression::Off => (uncompressed.capacity(), &uncompressed[..]),
crate::Compression::LZ4 => {
let max_len = lz4_flex::block::get_maximum_output_size(uncompressed.len());
compressed.resize(max_len, 0);
let written_bytes = lz4_flex::block::compress_into(uncompressed, compressed)?;
(compressed.capacity(), &compressed[..written_bytes])
}
};

Ok(Payload {
uncompressed_size,
data,
capacity,
})
}

Expand Down
71 changes: 54 additions & 17 deletions crates/store/re_log_encoding/src/codec/file/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::{MessageHeader, MessageKind};
use crate::codec::arrow::encode_arrow;
use crate::codec::arrow::ArrowEncodingContext;
use crate::encoder::EncodeError;
use crate::Compression;
use re_log_types::LogMsg;

pub(crate) fn encode(
buf: &mut Vec<u8>,
arrow_ctx: &mut ArrowEncodingContext,
message: &LogMsg,
compression: Compression,
) -> Result<(), EncodeError> {
Expand All @@ -25,23 +26,59 @@ pub(crate) fn encode(
set_store_info.encode(buf)?;
}
LogMsg::ArrowMsg(store_id, arrow_msg) => {
let payload = encode_arrow(&arrow_msg.batch, compression)?;
let arrow_msg = ArrowMsg {
store_id: Some(store_id.clone().into()),
compression: match compression {
Compression::Off => proto::Compression::None as i32,
Compression::LZ4 => proto::Compression::Lz4 as i32,
},
uncompressed_size: payload.uncompressed_size as i32,
encoding: Encoding::ArrowIpc as i32,
payload: payload.data,
};
let header = MessageHeader {
kind: MessageKind::ArrowMsg,
len: arrow_msg.encoded_len() as u64,
let payload = crate::codec::arrow::encode_arrow_with_ctx(
arrow_ctx,
&arrow_msg.batch,
compression,
)?;

// Optimization: `ArrowMsg` requires an owned `Vec` for its `payload` field, but
// with how we're using it here, it shouldn't need to be. Ideally, it would be a
// `Cow` so that we could pass it borrowed data, but that's not something that `prost` supports.
//
// This optimization removes a copy of the payload, which may be very significant.
// For a program that does nothing but log ~300 million points to `/dev/null`,
// running on a Ryzen 9 7950x + Linux 6.13.2 (Fedora 41):
// * The system time is reduced by ~15% (due to fewer page faults, roughly 1/10th as many),
// * The total runtime is reduced by ~5%.
// * Most importantly, the maximum resident set size (physical memory usage) goes from ~7 GiB to ~1 GiB.
//
// The entire block is marked `unsafe`, because it contains unsafe code that depends on technically
// safe code for its soundness. We control both the unsafe and safe parts, so we can still be sure
// we're correct here. Unsafe operations are further marked with their own `unsafe` blocks.
//
// SAFETY: See safety comment on `payload` below.
#[allow(unsafe_code, unused_unsafe)]
unsafe {
let arrow_msg = ArrowMsg {
store_id: Some(store_id.clone().into()),
compression: match compression {
Compression::Off => proto::Compression::None as i32,
Compression::LZ4 => proto::Compression::Lz4 as i32,
},
uncompressed_size: payload.uncompressed_size as i32,
encoding: Encoding::ArrowIpc as i32,

// SAFETY: For this to be safe, we have to ensure the resulting `vec` is not resized, as that could trigger
// an allocation and cause the original `Vec` held in `arrow_ctx` to then contain a dangling pointer.
//
// * The variable binding for `ArrowMsg` is immutable, so only a shared (`&self`) reference may be
// produced from it and to the `Vec` being constructed here, so the payload can never be resized.
// * We `mem::forget` the payload once we've encoded the `ArrowMsg` to avoid its `Drop` call.
// * Without this, the `Vec` would free its backing storage, which we need to avoid.
payload: unsafe { payload.to_fake_temp_vec() },
};
let header = MessageHeader {
kind: MessageKind::ArrowMsg,
len: arrow_msg.encoded_len() as u64,
};
header.encode(buf)?;
arrow_msg.encode(buf)?;

// See `SAFETY` comment on `payload` above.
#[allow(clippy::mem_forget)]
std::mem::forget(arrow_msg.payload);
};
header.encode(buf)?;
arrow_msg.encode(buf)?;
}
LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
let blueprint_activation_command: BlueprintActivationCommand =
Expand Down
10 changes: 9 additions & 1 deletion crates/store/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.

use crate::codec;
use crate::codec::arrow::ArrowEncodingContext;
use crate::codec::file::{self, encoder};
use crate::FileHeader;
use crate::Serializer;
Expand Down Expand Up @@ -120,6 +121,7 @@ pub struct Encoder<W: std::io::Write> {
compression: Compression,
write: W,
scratch: Vec<u8>,
arrow_ctx: ArrowEncodingContext,
}

impl<W: std::io::Write> Encoder<W> {
Expand All @@ -140,6 +142,7 @@ impl<W: std::io::Write> Encoder<W> {
compression: options.compression,
write,
scratch: Vec::new(),
arrow_ctx: ArrowEncodingContext::new(),
})
}

Expand All @@ -150,7 +153,12 @@ impl<W: std::io::Write> Encoder<W> {
self.scratch.clear();
match self.serializer {
Serializer::Protobuf => {
encoder::encode(&mut self.scratch, message, self.compression)?;
encoder::encode(
&mut self.scratch,
&mut self.arrow_ctx,
message,
self.compression,
)?;

self.write
.write_all(&self.scratch)
Expand Down
7 changes: 4 additions & 3 deletions crates/store/re_log_encoding/src/protobuf_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ pub fn log_msg_to_proto(
message: re_log_types::LogMsg,
compression: crate::Compression,
) -> Result<re_protos::log_msg::v1alpha1::LogMsg, crate::encoder::EncodeError> {
use crate::codec::arrow::encode_arrow;
use crate::codec::arrow::{encode_arrow_with_ctx, ArrowEncodingContext};
use re_protos::log_msg::v1alpha1::{
ArrowMsg, BlueprintActivationCommand, LogMsg as ProtoLogMsg, SetStoreInfo,
};

let mut arrow_ctx = ArrowEncodingContext::new();
let proto_msg = match message {
re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
let set_store_info: SetStoreInfo = set_store_info.into();
Expand All @@ -89,7 +90,7 @@ pub fn log_msg_to_proto(
}
}
re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
let payload = encode_arrow(&arrow_msg.batch, compression)?;
let payload = encode_arrow_with_ctx(&mut arrow_ctx, &arrow_msg.batch, compression)?;
let arrow_msg = ArrowMsg {
store_id: Some(store_id.into()),
compression: match compression {
Expand All @@ -102,7 +103,7 @@ pub fn log_msg_to_proto(
},
uncompressed_size: payload.uncompressed_size as i32,
encoding: re_protos::log_msg::v1alpha1::Encoding::ArrowIpc as i32,
payload: payload.data,
payload: payload.to_vec(),
};
ProtoLogMsg {
msg: Some(re_protos::log_msg::v1alpha1::log_msg::Msg::ArrowMsg(
Expand Down
Loading