Skip to content

Commit 2f61dc6

Browse files
committed
Add KDB+ write support
- Add NanoTime::to_kdb_timestamp() for converting to KDB timestamps - Add KdbSerialize trait for serializing Rust types to KDB rows - Add kdb_write() function to write stream data to KDB+ tables - Add KdbWriteOperators trait for fluent API (.kdb_write()) - Add filter_map operator to StreamOperators
1 parent cc2ccde commit 2f61dc6

File tree

3 files changed

+251
-2
lines changed

3 files changed

+251
-2
lines changed

wingfoil/src/adapters/kdb.rs

Lines changed: 233 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! KDB+ database adapter for reading data from q/kdb+ instances.
1+
//! KDB+ database adapter for reading and writing data to q/kdb+ instances.
22
//!
33
//! This module provides async connectivity to KDB+ databases using the `kdbplus` crate,
44
//! allowing query results to be streamed into wingfoil graphs.
@@ -36,11 +36,13 @@
3636
//! .unwrap();
3737
//! ```
3838
39-
use crate::nodes::produce_async;
39+
use crate::nodes::{FutStream, StreamOperators, produce_async};
4040
use crate::types::*;
41+
use futures::StreamExt;
4142
use kdbplus::ipc::{ConnectionMethod, K, QStream};
4243
use kdbplus::qtype;
4344
use std::fmt;
45+
use std::pin::Pin;
4446
use std::rc::Rc;
4547
use tinyvec::TinyVec;
4648

@@ -210,6 +212,151 @@ where
210212
})
211213
}
212214

215+
/// Trait for serializing Rust types to KDB row data.
216+
///
217+
/// Implementors should create a K object representing a single row
218+
/// that can be inserted into a KDB table.
219+
pub trait KdbSerialize: Sized {
220+
/// Serialize self into a K object representing a row.
221+
///
222+
/// The returned K object should be a compound list containing
223+
/// the column values in the same order as the target table schema.
224+
fn to_kdb_row(&self) -> K;
225+
}
226+
227+
/// Write stream data to a KDB+ table.
228+
///
229+
/// This function connects to a KDB+ instance and inserts records from the
230+
/// upstream stream into the specified table. Each batch of records is
231+
/// inserted using the `` `tablename insert row`` syntax.
232+
///
233+
/// # Type Parameters
234+
/// * `T` - The record type, must implement `Element`, `Send`, and `KdbSerialize`
235+
///
236+
/// # Arguments
237+
/// * `connection` - KDB connection configuration
238+
/// * `table_name` - Name of the target table
239+
/// * `upstream` - Stream of record batches to insert
240+
///
241+
/// # Returns
242+
/// A Node that drives the write operation.
243+
///
244+
/// # Example
245+
///
246+
/// ```ignore
247+
/// use wingfoil::adapters::kdb::*;
248+
/// use wingfoil::*;
249+
///
250+
/// #[derive(Debug, Clone, Default)]
251+
/// struct Trade {
252+
/// sym: String,
253+
/// price: f64,
254+
/// size: i64,
255+
/// }
256+
///
257+
/// impl KdbSerialize for Trade {
258+
/// fn to_kdb_row(&self) -> K {
259+
/// K::new_compound_list(vec![
260+
/// K::new_symbol(self.sym.clone()),
261+
/// K::new_float(self.price),
262+
/// K::new_long(self.size),
263+
/// ])
264+
/// }
265+
/// }
266+
///
267+
/// let conn = KdbConnection::new("localhost", 5000);
268+
/// let trades: Rc<dyn Stream<TinyVec<[Trade; 1]>>> = /* ... */;
269+
/// kdb_write(conn, "trades", &trades)
270+
/// .run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(10)))
271+
/// .unwrap();
272+
/// ```
273+
pub fn kdb_write<T>(
274+
connection: KdbConnection,
275+
table_name: impl Into<String>,
276+
upstream: &Rc<dyn Stream<TinyVec<[T; 1]>>>,
277+
) -> Rc<dyn Node>
278+
where
279+
T: Element + Send + KdbSerialize + 'static,
280+
{
281+
let table_name = table_name.into();
282+
283+
let consumer = Box::new(move |source: Pin<Box<dyn FutStream<TinyVec<[T; 1]>>>>| {
284+
kdb_write_consumer(connection, table_name, source)
285+
});
286+
287+
upstream.consume_async(consumer)
288+
}
289+
290+
async fn kdb_write_consumer<T>(
291+
connection: KdbConnection,
292+
table_name: String,
293+
mut source: Pin<Box<dyn FutStream<TinyVec<[T; 1]>>>>,
294+
) where
295+
T: Element + Send + KdbSerialize + 'static,
296+
{
297+
let creds = connection.credentials_string();
298+
299+
// Connect to KDB
300+
let mut socket = match QStream::connect(
301+
ConnectionMethod::TCP,
302+
&connection.host,
303+
connection.port,
304+
&creds,
305+
)
306+
.await
307+
{
308+
Ok(s) => s,
309+
Err(e) => {
310+
log::error!("KDB connection failed: {}", e);
311+
return;
312+
}
313+
};
314+
315+
// Process incoming records
316+
while let Some((_time, batch)) = source.next().await {
317+
for record in batch {
318+
let row = record.to_kdb_row();
319+
320+
// Build functional query: (insert;`tablename;row)
321+
// This is a compound list with: function symbol, table symbol, and row data
322+
let query = K::new_compound_list(vec![
323+
K::new_symbol("insert".to_string()),
324+
K::new_symbol(table_name.clone()),
325+
row,
326+
]);
327+
328+
// Send async message (fire-and-forget insert)
329+
if let Err(e) = socket.send_async_message(&query).await {
330+
log::error!("KDB insert failed: {}", e);
331+
}
332+
}
333+
}
334+
}
335+
336+
/// Extension trait for writing streams to KDB+ tables.
337+
///
338+
/// This trait provides a fluent API for writing `TinyVec<[T; 1]>` streams
339+
/// (output of `combine`, `kdb_read`, etc.) to KDB+ tables.
340+
pub trait KdbWriteOperators<T: Element> {
341+
/// Write this stream to a KDB+ table.
342+
///
343+
/// # Arguments
344+
/// * `conn` - KDB connection configuration
345+
/// * `table` - Name of the target table
346+
///
347+
/// # Returns
348+
/// A Node that drives the write operation.
349+
fn kdb_write(self: &Rc<Self>, conn: KdbConnection, table: &str) -> Rc<dyn Node>;
350+
}
351+
352+
impl<T: Element + Send + KdbSerialize + 'static> KdbWriteOperators<T>
353+
for dyn Stream<TinyVec<[T; 1]>>
354+
{
355+
fn kdb_write(self: &Rc<Self>, conn: KdbConnection, table: &str) -> Rc<dyn Node> {
356+
kdb_write(conn, table, self)
357+
}
358+
}
359+
213360
/// Extract column names from a KDB table result.
214361
///
215362
/// For tables, the result is a flipped dictionary where the keys are column names.
@@ -376,4 +523,88 @@ mod tests {
376523
let nano = NanoTime::from_kdb_timestamp(kdb_time);
377524
assert_eq!(u64::from(nano), 946_684_801_000_000_000);
378525
}
526+
527+
#[test]
528+
fn test_nanotime_kdb_timestamp_round_trip() {
529+
// Test round-trip conversion: NanoTime -> KDB timestamp -> NanoTime
530+
let original = NanoTime::new(1_000_000_000_000_000_000); // Some Unix timestamp
531+
let kdb_ts = original.to_kdb_timestamp();
532+
let restored = NanoTime::from_kdb_timestamp(kdb_ts);
533+
assert_eq!(original, restored);
534+
535+
// Test with KDB epoch (2000-01-01)
536+
let kdb_epoch = NanoTime::new(946_684_800_000_000_000);
537+
assert_eq!(kdb_epoch.to_kdb_timestamp(), 0);
538+
539+
// Test with values after KDB epoch
540+
let after_epoch = NanoTime::new(946_684_801_000_000_000); // 1 second after
541+
assert_eq!(after_epoch.to_kdb_timestamp(), 1_000_000_000);
542+
}
543+
544+
#[test]
545+
fn test_kdb_serialize_trait() {
546+
// Test that KdbSerialize trait can be implemented
547+
#[derive(Debug, Clone, Default)]
548+
struct TestRecord {
549+
sym: String,
550+
price: f64,
551+
size: i64,
552+
}
553+
554+
impl KdbSerialize for TestRecord {
555+
fn to_kdb_row(&self) -> K {
556+
K::new_compound_list(vec![
557+
K::new_symbol(self.sym.clone()),
558+
K::new_float(self.price),
559+
K::new_long(self.size),
560+
])
561+
}
562+
}
563+
564+
let record = TestRecord {
565+
sym: "AAPL".to_string(),
566+
price: 185.50,
567+
size: 100,
568+
};
569+
570+
let row = record.to_kdb_row();
571+
assert_eq!(row.get_type(), qtype::COMPOUND_LIST);
572+
}
573+
574+
#[test]
575+
fn test_kdb_write_node_creation() {
576+
use crate::nodes::constant;
577+
578+
// Test that kdb_write creates a valid node
579+
#[derive(Debug, Clone, Default)]
580+
struct TestTrade {
581+
sym: String,
582+
price: f64,
583+
}
584+
585+
impl KdbSerialize for TestTrade {
586+
fn to_kdb_row(&self) -> K {
587+
K::new_compound_list(vec![
588+
K::new_symbol(self.sym.clone()),
589+
K::new_float(self.price),
590+
])
591+
}
592+
}
593+
594+
let conn = KdbConnection::new("localhost", 5000);
595+
596+
// Create a constant stream that produces TinyVec batches
597+
let trade = TestTrade {
598+
sym: "TEST".to_string(),
599+
price: 100.0,
600+
};
601+
let mut batch: TinyVec<[TestTrade; 1]> = TinyVec::new();
602+
batch.push(trade);
603+
604+
let stream = constant(batch);
605+
606+
// Verify we can create the kdb_write node (doesn't require actual KDB connection)
607+
let _node = kdb_write(conn, "test_table", &stream);
608+
// Node creation succeeds - actual connection would be attempted during run()
609+
}
379610
}

wingfoil/src/nodes/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,12 @@ pub trait StreamOperators<T: Element> {
323323
/// Map's it's source into a new Stream using the supplied closure.
324324
fn map<OUT: Element>(self: &Rc<Self>, func: impl Fn(T) -> OUT + 'static)
325325
-> Rc<dyn Stream<OUT>>;
326+
/// Combines map and filter: transforms and optionally filters values.
327+
/// The closure returns (OUT, bool) where bool indicates whether to emit.
328+
fn filter_map<OUT: Element>(
329+
self: &Rc<Self>,
330+
func: impl Fn(T) -> (OUT, bool) + 'static,
331+
) -> Rc<dyn Stream<OUT>>;
326332
/// Map's source into a new Stream using a fallible closure.
327333
/// Errors propagate to graph execution.
328334
fn try_map<OUT: Element>(
@@ -532,6 +538,13 @@ where
532538
MapStream::new(self.clone(), Box::new(func)).into_stream()
533539
}
534540

541+
fn filter_map<OUT: Element>(
542+
self: &Rc<Self>,
543+
func: impl Fn(T) -> (OUT, bool) + 'static,
544+
) -> Rc<dyn Stream<OUT>> {
545+
MapFilterStream::new(self.clone(), Box::new(func)).into_stream()
546+
}
547+
535548
fn try_map<OUT: Element>(
536549
self: &Rc<Self>,
537550
func: impl Fn(T) -> anyhow::Result<OUT> + 'static,

wingfoil/src/time.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ impl NanoTime {
5454
pub fn from_kdb_timestamp(kdb_nanos: i64) -> Self {
5555
Self::new((kdb_nanos as u64).wrapping_add(Self::KDB_EPOCH_OFFSET_NANOS))
5656
}
57+
58+
/// Convert to KDB timestamp (nanoseconds from 2000-01-01).
59+
pub fn to_kdb_timestamp(self) -> i64 {
60+
(self.0.wrapping_sub(Self::KDB_EPOCH_OFFSET_NANOS)) as i64
61+
}
5762
}
5863

5964
impl From<u128> for NanoTime {

0 commit comments

Comments
 (0)