Skip to content

Commit 92f911c

Browse files
committed
Refactor KDB adapter to use q expression strings for writes
- Add to_kdb_row_expr() method to KdbSerialize trait - Switch from K object construction to q string expressions for inserts - Use send_sync_message instead of send_async_message for reliability - Improve dictionary handling in read.rs with cleaner let-chain syntax - Update integration tests to use new API
1 parent c566de8 commit 92f911c

File tree

3 files changed

+46
-33
lines changed

3 files changed

+46
-33
lines changed

wingfoil/src/adapters/kdb/read.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,13 @@ where
102102
fn extract_column_names(result: &K) -> Vec<String> {
103103
// Tables in KDB (qtype 98) are flipped dictionaries
104104
// The underlying dictionary has symbol list keys (column names) and list of lists values
105-
if result.get_type() == qtype::TABLE {
106-
// Get the underlying dictionary from the table
107-
if let Ok(dict) = result.get_dictionary() {
108-
// Dictionary keys are a K object - extract symbols
109-
if let Ok(symbols) = dict.as_vec::<String>() {
110-
return symbols.clone();
111-
}
112-
}
105+
if result.get_type() == qtype::TABLE
106+
&& let Ok(dict) = result.get_dictionary()
107+
&& let Ok(dict_parts) = dict.as_vec::<K>()
108+
&& let Some(keys) = dict_parts.first()
109+
&& let Ok(symbols) = keys.as_vec::<String>()
110+
{
111+
return symbols.clone();
113112
}
114113
Vec::new()
115114
}
@@ -121,11 +120,11 @@ fn iterate_rows(result: &K) -> Vec<K> {
121120
// Tables in KDB (qtype 98) are column-oriented
122121
if result.get_type() == qtype::TABLE
123122
&& let Ok(dict) = result.get_dictionary()
123+
&& let Ok(dict_parts) = dict.as_vec::<K>()
124+
&& dict_parts.len() >= 2
124125
{
125-
// The dictionary values are a list of column vectors
126-
// We need to access the values side of the dictionary
127-
if let Ok(col_list) = dict.as_vec::<K>() {
128-
// col_list contains the column vectors
126+
let values = &dict_parts[1];
127+
if let Ok(col_list) = values.as_vec::<K>() {
129128
// Get the number of rows from the first column
130129
if let Some(first_col) = col_list.first() {
131130
let n_rows = first_col.len();

wingfoil/src/adapters/kdb/write.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ pub trait KdbSerialize: Sized {
1919
/// The returned K object should be a compound list containing
2020
/// the column values in the same order as the target table schema.
2121
fn to_kdb_row(&self) -> K;
22+
23+
/// Serialize self into a q expression string representing a row.
24+
///
25+
/// The returned string should be valid q syntax for a list of values,
26+
/// e.g., "(`AAPL; 185.5; 100)" for a trade record.
27+
fn to_kdb_row_expr(&self) -> String;
2228
}
2329

2430
/// Write stream data to a KDB+ table.
@@ -112,18 +118,12 @@ async fn kdb_write_consumer<T>(
112118
// Process incoming records
113119
while let Some((_time, batch)) = source.next().await {
114120
for record in batch {
115-
let row = record.to_kdb_row();
116-
117-
// Build functional query: (insert;`tablename;row)
118-
// This is a compound list with: function symbol, table symbol, and row data
119-
let query = K::new_compound_list(vec![
120-
K::new_symbol("insert".to_string()),
121-
K::new_symbol(table_name.clone()),
122-
row,
123-
]);
124-
125-
// Send async message (fire-and-forget insert)
126-
if let Err(e) = socket.send_async_message(&query).await {
121+
// Build insert query as a string: `tablename insert (values)
122+
let row_expr = record.to_kdb_row_expr();
123+
let query = format!("`{} insert {}", table_name, row_expr);
124+
125+
// Send sync message to ensure insert completes before continuing
126+
if let Err(e) = socket.send_sync_message(&query.as_str()).await {
127127
log::error!("KDB insert failed: {}", e);
128128
}
129129
}
@@ -177,6 +177,10 @@ mod tests {
177177
K::new_long(self.size),
178178
])
179179
}
180+
181+
fn to_kdb_row_expr(&self) -> String {
182+
format!("(`{}; {}; {})", self.sym, self.price, self.size)
183+
}
180184
}
181185

182186
let record = TestRecord {
@@ -207,6 +211,10 @@ mod tests {
207211
K::new_float(self.price),
208212
])
209213
}
214+
215+
fn to_kdb_row_expr(&self) -> String {
216+
format!("(`{}; {})", self.sym, self.price)
217+
}
210218
}
211219

212220
let conn = KdbConnection::new("localhost", 5000);

wingfoil/tests/kdb_integration.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ impl KdbSerialize for TestTrade {
5252
K::new_long(self.time),
5353
])
5454
}
55+
56+
fn to_kdb_row_expr(&self) -> String {
57+
// Use {:e} to ensure float format (e.g., 380.0 not 380)
58+
format!(
59+
"(`{}; {:e}; {}; {})",
60+
self.sym, self.price, self.size, self.time
61+
)
62+
}
5563
}
5664

5765
impl KdbDeserialize for TestTrade {
@@ -152,15 +160,16 @@ fn test_kdb_round_trip() {
152160
});
153161

154162
kdb_write(conn.clone(), table_name, &write_stream)
155-
.run(
156-
RunMode::HistoricalFrom(NanoTime::ZERO),
157-
RunFor::Duration(std::time::Duration::from_secs(1)),
158-
)
163+
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
159164
.expect("Write failed");
160165

161-
// Small delay to ensure writes are flushed
166+
// Small delay to ensure async writes are flushed
162167
std::thread::sleep(std::time::Duration::from_millis(100));
163168

169+
// Debug: check table contents directly
170+
let count: K = rt.block_on(async { run_q(&conn, &format!("count {}", table_name)).await });
171+
eprintln!("DEBUG: table count = {:?}", count);
172+
164173
// Read trades back using kdb_read
165174
let query = format!("select from {}", table_name);
166175
let read_stream: Rc<dyn Stream<TinyVec<[TestTrade; 1]>>> =
@@ -170,10 +179,7 @@ fn test_kdb_round_trip() {
170179

171180
let collected = read_stream.accumulate();
172181
collected
173-
.run(
174-
RunMode::HistoricalFrom(NanoTime::ZERO),
175-
RunFor::Duration(std::time::Duration::from_secs(1)),
176-
)
182+
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
177183
.expect("Read failed");
178184

179185
// Flatten and compare

0 commit comments

Comments
 (0)