Skip to content

Commit 2d1d826

Browse files
author
Bo Maryniuk
committed
Re-batch OTLP payloads on the export path
1 parent 875c179 commit 2d1d826

3 files changed

Lines changed: 122 additions & 4 deletions

File tree

logjetd/src/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ pub struct CollectorConfig {
7878
pub cert_file: Option<PathBuf>,
7979
pub key_file: Option<PathBuf>,
8080
pub server_name: Option<String>,
81+
/// Merge up to this many stored OTLP requests into one POST. 1 = no re-batching.
82+
pub batch_size: usize,
83+
/// Flush a partial batch after this many ms. 0 = flush only on batch_size.
84+
pub batch_timeout_ms: u64,
8185
}
8286

8387
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -216,6 +220,10 @@ struct RawConfig {
216220
collector_key_file: Option<PathBuf>,
217221
#[serde(rename = "collector.server-name")]
218222
collector_server_name: Option<String>,
223+
#[serde(rename = "collector.batch-size")]
224+
collector_batch_size: Option<usize>,
225+
#[serde(rename = "collector.batch-timeout-ms")]
226+
collector_batch_timeout_ms: Option<u64>,
219227
#[serde(rename = "backpressure.enabled")]
220228
backpressure_enabled: Option<bool>,
221229
#[serde(rename = "backpressure.mode")]
@@ -284,6 +292,8 @@ impl Config {
284292
collector_cert_file: None,
285293
collector_key_file: None,
286294
collector_server_name: None,
295+
collector_batch_size: None,
296+
collector_batch_timeout_ms: None,
287297
backpressure_enabled: None,
288298
backpressure_mode: None,
289299
backpressure_max_buffered_records: None,
@@ -347,6 +357,8 @@ impl Config {
347357
cert_file: raw.collector_cert_file,
348358
key_file: raw.collector_key_file,
349359
server_name: raw.collector_server_name,
360+
batch_size: raw.collector_batch_size.unwrap_or(50).max(1),
361+
batch_timeout_ms: raw.collector_batch_timeout_ms.unwrap_or(200),
350362
};
351363
let backpressure = BackpressureConfig {
352364
enabled: raw.backpressure_enabled.unwrap_or(false),

logjetd/src/replay.rs

Lines changed: 108 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ use std::net::{TcpStream, ToSocketAddrs};
44
use std::path::{Path, PathBuf};
55
use std::sync::{Arc, mpsc};
66
use std::thread;
7-
use std::time::Duration;
7+
use std::time::{Duration, Instant};
88

99
use logjet::{LogjetReader, ReaderConfig, RecordType};
10+
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
11+
use prost::Message;
1012
use rustls::{ClientConfig, ClientConnection, StreamOwned};
1113

1214
use crate::config::{BackpressureConfig, BackpressureMode, CollectorConfig, TlsConfig, UpstreamConfig, UpstreamMode};
@@ -19,6 +21,7 @@ pub fn replay_path_to_otlp_http(path: &Path, name: &str, collector: &CollectorCo
1921
let endpoint = CollectorEndpoint::parse(&collector.url)?;
2022
let tls_client = if endpoint.tls { Some(load_collector_client_config(collector)?) } else { None };
2123
let mut conn = CollectorConnection::connect(&endpoint, Duration::from_millis(collector.timeout_ms), tls_client.as_ref(), collector)?;
24+
let mut batcher = OtlpBatcher::new(collector.batch_size, collector.batch_timeout_ms);
2225

2326
for segment in list_named_segments(path, name)? {
2427
let file = File::open(&segment.path)?;
@@ -29,11 +32,12 @@ pub fn replay_path_to_otlp_http(path: &Path, name: &str, collector: &CollectorCo
2932
continue;
3033
}
3134

32-
conn.post(&record.payload)?;
35+
batcher.add(&record.payload, &mut conn)?;
3336
sent = sent.saturating_add(1);
3437
}
3538
}
3639

40+
batcher.flush(&mut conn)?;
3741
Ok(sent)
3842
}
3943

@@ -193,8 +197,19 @@ fn export_worker(
193197
collector_transport: CollectorTransport, task_rx: mpsc::Receiver<ExportTask>, result_tx: mpsc::Sender<ExportResult>,
194198
) -> io::Result<()> {
195199
let mut conn = collector_transport.open_connection()?;
196-
while let Ok(task) = task_rx.recv() {
197-
let outcome = conn.post(&task.payload).map(|()| ExportOutcome::Delivered);
200+
let mut batcher = OtlpBatcher::new(collector_transport.collector.batch_size, collector_transport.collector.batch_timeout_ms);
201+
let recv_timeout = Duration::from_millis(collector_transport.collector.batch_timeout_ms.max(50));
202+
loop {
203+
let task = match task_rx.recv_timeout(recv_timeout) {
204+
Ok(task) => task,
205+
Err(mpsc::RecvTimeoutError::Timeout) => {
206+
batcher.flush_if_expired(&mut conn)?;
207+
continue;
208+
}
209+
Err(mpsc::RecvTimeoutError::Disconnected) => break,
210+
};
211+
let outcome = batcher.add(&task.payload, &mut conn).map(|()| ExportOutcome::Delivered);
212+
batcher.flush_if_expired(&mut conn)?;
198213
let failed = outcome.is_err();
199214
if result_tx.send(ExportResult { seq: task.seq, outcome }).is_err() {
200215
break;
@@ -203,6 +218,7 @@ fn export_worker(
203218
break;
204219
}
205220
}
221+
let _ = batcher.flush(&mut conn);
206222
Ok(())
207223
}
208224

@@ -549,6 +565,94 @@ fn parse_content_length(headers: &str) -> usize {
549565
0
550566
}
551567

568+
/// Accumulates OTLP payloads and merges them into combined ExportLogsServiceRequests
569+
/// grouped by Resource+Scope before posting to the collector.
570+
struct OtlpBatcher {
571+
batch_size: usize,
572+
batch_timeout: Duration,
573+
pending: ExportLogsServiceRequest,
574+
pending_count: usize,
575+
first_added: Option<Instant>,
576+
}
577+
578+
impl OtlpBatcher {
579+
fn new(batch_size: usize, batch_timeout_ms: u64) -> Self {
580+
Self {
581+
batch_size,
582+
batch_timeout: Duration::from_millis(batch_timeout_ms),
583+
pending: ExportLogsServiceRequest { resource_logs: Vec::new() },
584+
pending_count: 0,
585+
first_added: None,
586+
}
587+
}
588+
589+
/// Add a raw OTLP payload to the batch. Flushes to conn if batch is full.
590+
fn add(&mut self, payload: &[u8], conn: &mut CollectorConnection) -> io::Result<()> {
591+
if self.batch_size <= 1 {
592+
return conn.post(payload);
593+
}
594+
match ExportLogsServiceRequest::decode(payload) {
595+
Ok(req) => self.merge(req),
596+
Err(_) => return conn.post(payload),
597+
}
598+
if self.pending_count >= self.batch_size {
599+
self.flush(conn)?;
600+
}
601+
Ok(())
602+
}
603+
604+
/// Flush any pending records if the batch timeout has expired.
605+
fn flush_if_expired(&mut self, conn: &mut CollectorConnection) -> io::Result<()> {
606+
if self.pending_count > 0 && self.batch_timeout.as_millis() > 0 && self.first_added.is_some_and(|t| t.elapsed() >= self.batch_timeout) {
607+
self.flush(conn)?;
608+
}
609+
Ok(())
610+
}
611+
612+
/// Flush all pending records to the collector.
613+
fn flush(&mut self, conn: &mut CollectorConnection) -> io::Result<()> {
614+
if self.pending_count == 0 {
615+
return Ok(());
616+
}
617+
let merged = std::mem::replace(&mut self.pending, ExportLogsServiceRequest { resource_logs: Vec::new() });
618+
self.pending_count = 0;
619+
self.first_added = None;
620+
conn.post(&merged.encode_to_vec())
621+
}
622+
623+
fn merge(&mut self, req: ExportLogsServiceRequest) {
624+
if self.first_added.is_none() {
625+
self.first_added = Some(Instant::now());
626+
}
627+
for incoming_rl in req.resource_logs {
628+
let existing = self.pending.resource_logs.iter_mut().find(|rl| rl.resource == incoming_rl.resource);
629+
match existing {
630+
Some(rl) => {
631+
for incoming_sl in incoming_rl.scope_logs {
632+
let existing_sl = rl.scope_logs.iter_mut().find(|sl| sl.scope == incoming_sl.scope);
633+
match existing_sl {
634+
Some(sl) => {
635+
self.pending_count += incoming_sl.log_records.len();
636+
sl.log_records.extend(incoming_sl.log_records);
637+
}
638+
None => {
639+
self.pending_count += incoming_sl.log_records.len();
640+
rl.scope_logs.push(incoming_sl);
641+
}
642+
}
643+
}
644+
}
645+
None => {
646+
for sl in &incoming_rl.scope_logs {
647+
self.pending_count += sl.log_records.len();
648+
}
649+
self.pending.resource_logs.push(incoming_rl);
650+
}
651+
}
652+
}
653+
}
654+
}
655+
552656
#[derive(Debug)]
553657
struct ExportTask {
554658
seq: u64,

logjetd/src/replay_utst.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ fn test_collector_transport(mode: BackpressureMode, max_buffered_records: usize)
116116
cert_file: None,
117117
key_file: None,
118118
server_name: None,
119+
batch_size: 1,
120+
batch_timeout_ms: 0,
119121
},
120122
upstream_mode: UpstreamMode::Keep,
121123
}

0 commit comments

Comments
 (0)