Skip to content

Commit a73cdcf

Browse files
committed
Add Message::HistoricalBatch variant for efficient bulk data transfer in Historical mode
1 parent b9f360f commit a73cdcf

File tree

4 files changed

+57
-1
lines changed

4 files changed

+57
-1
lines changed

wingfoil/src/channel/kanal_chan.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ impl<T: Element + Send> ChannelSender<T> {
110110
self.send_message(message)
111111
}
112112

113+
#[allow(dead_code)]
114+
pub fn send_historical_batch(&self, batch: Vec<ValueAt<T>>) -> SendResult {
115+
let message = Message::HistoricalBatch(batch.into_boxed_slice());
116+
self.send_message(message)
117+
}
118+
113119
pub fn close(&mut self) -> SendResult {
114120
// check if already closed
115121
if self.kanal_sender.is_none() {
@@ -177,6 +183,12 @@ impl<T: Element + Send> AsyncChannelSender<T> {
177183
self.send_message(message).await;
178184
}
179185

186+
#[allow(dead_code)]
187+
pub async fn send_historical_batch(&self, batch: Vec<ValueAt<T>>) {
188+
let message = Message::HistoricalBatch(batch.into_boxed_slice());
189+
self.send_message(message).await;
190+
}
191+
180192
pub async fn close(&mut self) {
181193
let message = Message::EndOfStream;
182194
self.send_message(message).await;

wingfoil/src/channel/message.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::types::Element;
99
/// Message that can be sent between threads.
1010
#[derive(new, Debug, Clone, PartialEq, Eq)]
1111
pub(crate) enum Message<T: Element + Send> {
12-
/// In [RunMode::HistoricalFrom], this message
12+
/// In [RunMode::HistoricalFrom], this message
1313
/// allows receiving graph to progress, even when
1414
/// the channel is ticking less frequently than
1515
/// other inputs.
@@ -22,6 +22,9 @@ pub(crate) enum Message<T: Element + Send> {
2222
HistoricalValue(ValueAt<T>),
2323
/// Sent in [RunMode::RealTime]. Just a value.
2424
RealtimeValue(T),
25+
/// Used in [RunMode::HistoricalFrom] for bulk data transfer.
26+
/// Contains multiple values with their timestamps.
27+
HistoricalBatch(Box<[ValueAt<T>]>),
2528
}
2629

2730
pub trait ReceiverMessageSource<T: Element + Send> {

wingfoil/src/nodes/async_io.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,12 @@ where
331331
Message::CheckPoint(t) => {
332332
time = *t;
333333
},
334+
Message::HistoricalBatch(batch) => {
335+
// Update time to earliest timestamp in batch
336+
if let Some(value_at) = batch.first() {
337+
time = value_at.time;
338+
}
339+
}
334340
Message::EndOfStream => {
335341
finished = true;
336342
},
@@ -356,6 +362,13 @@ where
356362
Message::HistoricalValue(value_at) => {
357363
yield (value_at.time, value_at.value)
358364
},
365+
Message::HistoricalBatch(batch) => {
366+
// Convert to Vec to own the data and avoid holding reference across await
367+
let batch_vec = batch.into_vec();
368+
for value_at in batch_vec {
369+
yield (value_at.time, value_at.value)
370+
}
371+
}
359372
Message::CheckPoint(_) => {},
360373
Message::EndOfStream => {},
361374
}

wingfoil/src/nodes/channel.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ impl<T: Element + Send> MutableNode for ReceiverStream<T> {
109109
Message::HistoricalValue(value_at) => {
110110
values.push(value_at.value);
111111
}
112+
Message::HistoricalBatch(_) => {
113+
return Err(anyhow!(
114+
"received HistoricalBatch but RunMode is RealTime"
115+
));
116+
}
112117
Message::EndOfStream => self.finished = true,
113118
Message::CheckPoint(_) => {}
114119
},
@@ -157,6 +162,29 @@ impl<T: Element + Send> MutableNode for ReceiverStream<T> {
157162
self.message_time = Some(value_at.time);
158163
self.queue.push_back(value_at);
159164
}
165+
Message::HistoricalBatch(batch) => {
166+
if batch.is_empty() {
167+
continue;
168+
}
169+
170+
// Validate: all timestamps must be >= current graph time
171+
let min_time = batch.iter().map(|va| va.time).min().unwrap();
172+
if min_time < state.time() {
173+
return Err(anyhow!(
174+
"received HistoricalBatch with timestamp less than graph time, {} < {}",
175+
min_time,
176+
state.time()
177+
));
178+
}
179+
180+
// Set message_time to earliest timestamp
181+
self.message_time = Some(min_time);
182+
183+
// Unpack all values into queue
184+
for value_at in batch.iter() {
185+
self.queue.push_back(value_at.clone());
186+
}
187+
}
160188
Message::EndOfStream => self.finished = true,
161189
Message::CheckPoint(check_point) => {
162190
self.message_time = Some(check_point);

0 commit comments

Comments
 (0)