Commit 1475a9a
Add RunParams to produce_async for runtime context access
- produce_async now receives RunParams (run_mode, run_for, start_time) instead of no arguments, allowing async producers to adapt behavior
- Add RunParams::end_time() and end_time_kdb() methods (error on Cycles)
- kdb_read now derives start/end times from RunMode/RunFor automatically
- Fix overflow in limit() by initializing time to time0 instead of ZERO
- Initialize Graph start_time from run_mode.start_time() during construction
1 parent c8a3bfa commit 1475a9a
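
To illustrate the new closure shape: the examples in the diffs below all ignore the context (`_ctx`), so here is a minimal sketch of a producer that actually uses it. The field and method names (`run_mode`, `run_for`, `start_time`, `end_time()`) come from the commit message above; their exact types and error behaviour are assumptions, not shown in these diffs.

    let period = Duration::from_millis(10);
    let producer = move |ctx: RunParams| async move {
        // end_time() is described above as erroring for RunFor::Cycles, so it is
        // treated as fallible here (an assumption about its exact signature).
        let _end = ctx.end_time().ok();
        Ok(stream! {
            for i in 0.. {
                tokio::time::sleep(period).await; // simulate waiting IO
                // A real producer could stop once `_end` is reached, or branch on
                // ctx.run_mode to replay history vs. poll a live source.
                yield i;
            }
        })
    };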

7 files changed: +214, -150 lines changed


wingfoil/examples/async/README.md

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ use wingfoil::*;
     let period = Duration::from_millis(10);
     let run_for = RunFor::Duration(period * 5);
     let run_mode = RunMode::RealTime;
-    let producer = async move || {
+    let producer = move |_ctx: RunParams| async move {
         Ok(stream! {
             for i in 0.. {
                 tokio::time::sleep(period).await; // simulate waiting IO

wingfoil/examples/async/main.rs

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ fn main() {
     let run_for = RunFor::Duration(period * 5);
     let run_mode = RunMode::RealTime;

-    let producer = async move || {
+    let producer = move |_ctx: RunParams| async move {
         Ok(stream! {
             for i in 0.. {
                 tokio::time::sleep(period).await; // simulate waiting IO

wingfoil/examples/rfq/market_data.rs

Lines changed: 2 additions & 2 deletions
@@ -59,7 +59,7 @@ impl HistoricalMarketDataProvider {
 impl MarketDataProvider for HistoricalMarketDataProvider {
     fn notifications(&self) -> Rc<dyn Stream<TinyVec<[Params; 1]>>> {
         let msgs = self.msgs.clone();
-        produce_async(async move || {
+        produce_async(move |_ctx| async move {
             Ok(stream! {
                 for msg in msgs {
                     yield msg;
@@ -77,7 +77,7 @@ pub struct RealTimeMarketDataProvider {
 impl MarketDataProvider for RealTimeMarketDataProvider {
     fn notifications(&self) -> Rc<dyn Stream<TinyVec<[Params; 1]>>> {
         let env = self.env.clone();
-        produce_async(async move || Ok(Self::notifications(env).await))
+        produce_async(move |_ctx| async move { Ok(Self::notifications(env).await) })
     }
 }

wingfoil/src/adapters/kdb/integration_tests.rs

Lines changed: 52 additions & 42 deletions
@@ -116,13 +116,16 @@ fn test_kdb_round_trip() {

     // Write trades using kdb_write
     let trades_to_write = trades.clone();
-    let write_stream = produce_async(async move || {
-        Ok(async_stream::stream! {
-            for trade in trades_to_write {
-                let time = NanoTime::from_kdb_timestamp(trade.time);
-                yield (time, trade);
-            }
-        })
+    let write_stream = produce_async(move |_ctx| {
+        let trades = trades_to_write.clone();
+        async move {
+            Ok(async_stream::stream! {
+                for trade in trades {
+                    let time = NanoTime::from_kdb_timestamp(trade.time);
+                    yield (time, trade);
+                }
+            })
+        }
     });

     kdb_write(conn.clone(), table_name, &write_stream)
@@ -137,20 +140,22 @@ fn test_kdb_round_trip() {
     eprintln!("DEBUG: table count = {:?}", count);

     // Read trades back using kdb_read with chunking
+    // Start/end times derived from RunMode/RunFor
     let query = format!("select from {}", table_name);
     let read_stream: Rc<dyn Stream<TinyVec<[TestTrade; 1]>>> = kdb_read(
         conn.clone(),
         query,
-        "time",   // time column name
-        0,        // start_time: KDB epoch
-        i64::MAX, // end_time: far future
-        1000,     // rows_per_chunk: small for testing
+        "time", // time column name
+        1000,   // rows_per_chunk: small for testing
         |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
     );

     let collected = read_stream.accumulate();
     collected
-        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
+        .run(
+            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
+            RunFor::Forever,
+        )
         .expect("Read failed");

     // Flatten and compare
@@ -209,15 +214,16 @@ fn test_kdb_read_chunking_basic() {
         conn.clone(),
         query,
         "time",
-        0,
-        i64::MAX,
         5, // 5 rows per chunk
         |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
     );

     let collected = read_stream.accumulate();
     collected
-        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
+        .run(
+            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
+            RunFor::Forever,
+        )
         .expect("Read failed");

     let read_trades: Vec<TestTrade> = collected.peek_value().into_iter().flatten().collect();
@@ -283,15 +289,16 @@ fn test_kdb_read_with_where_clause() {
         conn.clone(),
         query,
         "time",
-        0,
-        i64::MAX,
         10, // 10 rows per chunk
         |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
     );

     let collected = read_stream.accumulate();
     collected
-        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
+        .run(
+            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
+            RunFor::Forever,
+        )
         .expect("Read failed");

     let read_trades: Vec<TestTrade> = collected.peek_value().into_iter().flatten().collect();
@@ -339,20 +346,22 @@ fn test_kdb_read_time_range() {
     });

     // Read only middle 30 rows (time range 30B to 60B)
+    // Use RunMode/RunFor to specify the time range
     let query = format!("select from {}", table_name);
     let read_stream: Rc<dyn Stream<TinyVec<[TestTrade; 1]>>> = kdb_read(
         conn.clone(),
         query,
         "time",
-        30_000_000_000, // start at row 30
-        60_000_000_000, // end at row 60
-        10,             // 10 rows per chunk
+        10, // 10 rows per chunk
         |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
     );

     let collected = read_stream.accumulate();
     collected
-        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
+        .run(
+            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(30_000_000_000)),
+            RunFor::Duration(std::time::Duration::from_nanos(30_000_000_000)), // 30B duration
+        )
         .expect("Read failed");

     let read_trades: Vec<TestTrade> = collected.peek_value().into_iter().flatten().collect();
@@ -407,15 +416,16 @@ fn test_kdb_read_edge_cases() {
         conn.clone(),
         query.clone(),
         "time",
-        0,
-        i64::MAX,
         100, // Much larger than actual row count
         |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
     );

     let collected = read_stream.accumulate();
     collected
-        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
+        .run(
+            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
+            RunFor::Forever,
+        )
         .expect("Read failed");

     let read_trades: Vec<TestTrade> = collected.peek_value().into_iter().flatten().collect();
@@ -426,15 +436,16 @@ fn test_kdb_read_edge_cases() {
         conn.clone(),
         query.clone(),
         "time",
-        0,
-        i64::MAX,
         1, // 1 row per chunk
         |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
     );

     let collected2 = read_stream2.accumulate();
     collected2
-        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
+        .run(
+            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
+            RunFor::Forever,
+        )
         .expect("Read failed");

     let read_trades2: Vec<TestTrade> = collected2.peek_value().into_iter().flatten().collect();
@@ -444,20 +455,18 @@ fn test_kdb_read_edge_cases() {
         "Should read all 7 rows with tiny chunks"
     );

-    // Test 3: Empty time range (no data)
-    let read_stream3: Rc<dyn Stream<TinyVec<[TestTrade; 1]>>> = kdb_read(
-        conn.clone(),
-        query,
-        "time",
-        100_000_000_000, // Way after all data
-        200_000_000_000,
-        10,
-        |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
-    );
+    // Test 3: Empty time range (no data) - start way after all data
+    let read_stream3: Rc<dyn Stream<TinyVec<[TestTrade; 1]>>> =
+        kdb_read(conn.clone(), query, "time", 10, |t: &TestTrade| {
+            NanoTime::from_kdb_timestamp(t.time)
+        });

     let collected3 = read_stream3.accumulate();
     collected3
-        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
+        .run(
+            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(100_000_000_000)),
+            RunFor::Duration(std::time::Duration::from_nanos(100_000_000_000)),
+        )
         .expect("Read failed");

     let read_trades3: Vec<TestTrade> = collected3.peek_value().into_iter().flatten().collect();
@@ -519,15 +528,16 @@ fn test_kdb_read_uneven_distribution() {
         conn.clone(),
         query,
         "time",
-        0,
-        i64::MAX,
         5, // 5 rows per chunk - will adapt to both dense and sparse regions
         |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
     );

     let collected = read_stream.accumulate();
     collected
-        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
+        .run(
+            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
+            RunFor::Forever,
+        )
         .expect("Read failed");

     let read_trades: Vec<TestTrade> = collected.peek_value().into_iter().flatten().collect();
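
Taken together, these test changes show the new kdb_read call shape: the explicit start/end timestamp arguments are gone, and the read window now comes from the RunMode/RunFor passed to run() (start at the HistoricalFrom timestamp, end after the RunFor duration, or unbounded with RunFor::Forever). A condensed sketch of the pattern, reusing the conn handle and TestTrade row type set up elsewhere in these tests (their definitions are not part of this diff):

    // New argument order per the tests above: connection, query, time column,
    // rows per chunk, time extractor.
    let read_stream: Rc<dyn Stream<TinyVec<[TestTrade; 1]>>> = kdb_read(
        conn.clone(),
        format!("select from {}", table_name),
        "time",
        1000,
        |t: &TestTrade| NanoTime::from_kdb_timestamp(t.time),
    );

    // The time window is now part of the run: start at kdb timestamp 30B and
    // read for 30B nanoseconds, i.e. the same 30B to 60B range that the old
    // explicit start/end arguments expressed.
    let collected = read_stream.accumulate();
    collected
        .run(
            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(30_000_000_000)),
            RunFor::Duration(std::time::Duration::from_nanos(30_000_000_000)),
        )
        .expect("Read failed");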
