Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use std::time::SystemTime;

use arrow::array::{BinaryArray, RecordBatch};
use arrow::array::{ArrayData, BinaryArray, RecordBatch};
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, Field, Schema};
use quiver::record_bundle::{
BundleDescriptor, PayloadRef, RecordBundle, SchemaFingerprint, SlotDescriptor, SlotId,
Expand Down Expand Up @@ -348,8 +349,38 @@ impl OtlpBytesAdapter {
OtlpProtoBytes::ExportTracesRequest(_) => SignalType::Traces,
};

// Create a record batch with a single binary column containing the OTLP bytes
let binary_array = BinaryArray::from_vec(vec![bytes.as_bytes()]);
// Create a record batch with a single binary column containing the OTLP bytes.
// Use zero-copy wrapping: clone_bytes() is just an Arc refcount bump,
// and Buffer::from(Bytes) wraps without copying the payload data.
let data_bytes = bytes.clone_bytes();
let len = i32::try_from(data_bytes.len()).map_err(|_| {
(
BundleConversionError::RecordBatchCreationError(format!(
"OTLP payload too large for BinaryArray: {} bytes exceeds i32::MAX",
data_bytes.len()
)),
bytes.clone(),
)
})?;
let data_buffer = Buffer::from(data_bytes);
let offsets = Buffer::from_slice_ref([0i32, len]);

let array_data = match ArrayData::builder(DataType::Binary)
.len(1)
.add_buffer(offsets)
.add_buffer(data_buffer)
.build()
{
Ok(data) => data,
Err(e) => {
return Err((
BundleConversionError::RecordBatchCreationError(e.to_string()),
bytes,
));
}
};

let binary_array = BinaryArray::from(array_data);
Comment thread
gscalderon marked this conversation as resolved.
Outdated
let batch = match RecordBatch::try_new(otlp_binary_schema(), vec![Arc::new(binary_array)]) {
Ok(batch) => batch,
Err(e) => {
Expand Down Expand Up @@ -736,6 +767,41 @@ mod tests {
assert!(adapter.payload(wrong_slot).is_none());
}

#[test]
fn test_otlp_bytes_adapter_zero_copy() {
    // Build an adapter around a known payload.
    let payload_bytes = b"zero-copy verification payload".to_vec();
    let proto = OtlpProtoBytes::new_from_bytes(SignalType::Logs, payload_bytes);
    let adapter = OtlpBytesAdapter::new(proto).map_err(|(err, _)| err).unwrap();

    // Pull the binary column back out of the adapter's record batch.
    let slot_id = to_otlp_slot_id(SignalType::Logs);
    let bundle_payload = adapter.payload(slot_id).unwrap();
    let binary_array = bundle_payload
        .batch
        .column(0)
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();

    // Zero-copy holds iff the Arrow value aliases the original allocation,
    // so compare raw pointers rather than byte contents.
    assert_eq!(
        binary_array.value(0).as_ptr(),
        adapter.bytes.as_bytes().as_ptr(),
        "BinaryArray value should point to the same memory as OtlpProtoBytes (zero-copy)"
    );
}

#[test]
fn test_otlp_bytes_adapter_empty_payload() {
    // An empty OTLP payload must still round-trip through the adapter as a
    // single row whose binary value is the empty byte string.
    let otlp = OtlpProtoBytes::new_from_bytes(SignalType::Metrics, vec![]);

    let adapter = OtlpBytesAdapter::new(otlp).map_err(|(e, _)| e).unwrap();

    let slot = to_otlp_slot_id(SignalType::Metrics);
    let payload = adapter.payload(slot).unwrap();
    let column = payload.batch.column(0);
    let binary_array = column.as_any().downcast_ref::<BinaryArray>().unwrap();
    // Check the row count before indexing: if the adapter ever produced a
    // zero-row batch, `value(0)` would panic with an index error instead of
    // failing with a readable assertion message.
    assert_eq!(binary_array.len(), 1);
    assert_eq!(binary_array.value(0), b"");
}

#[test]
fn test_extract_otlp_bytes() {
let original_bytes = b"original OTLP data".to_vec();
Expand Down
Loading