Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repository = "https://github.com/tonbo-io/tonbo"
readme = "README.md"

[workspace]
members = [".", "predicate"]
members = ["."]


[features]
Expand Down Expand Up @@ -38,28 +38,30 @@ s3-smoke = []

[dependencies]
anyhow = "1"
arrow-array = "56.2.0"
arrow-buffer = "56.2.0"
arrow-ipc = "56.1.0"
arrow-schema = { version = "56.2.0", features = ["serde"] }
arrow-select = "56.2.0"
aisle = { git = "https://github.com/tonbo-io/aisle", branch = "main", default-features = false, features = ["row_filter"] }
arrow-array = "57.1.0"
arrow-buffer = "57.1.0"
arrow-ipc = "57.1.0"
arrow-schema = { version = "57.1.0", features = ["serde"] }
arrow-select = "57.1.0"
crc32c = "0.6"
crossbeam-skiplist = "0.1"
fusio = { version = "0.5.0", default-features = false, features = [
datafusion-common = "51.0.0"
fusio = { version = "0.6.0", default-features = false, features = [
"aws",
"dyn",
"executor",
"fs",
] }
fusio-manifest = { version = "0.5.0", package = "fusio-manifest", default-features = false, features = [
fusio-manifest = { version = "0.6.0", package = "fusio-manifest", default-features = false, features = [
"std",
] }
fusio-parquet = { version = "0.5.0", package = "fusio-parquet" }
fusio-parquet = { version = "0.6.0", package = "fusio-parquet" }
futures = "0.3"
lockable = "0.2"
once_cell = "1"
parking_lot = "0.12"
parquet = { version = "56.2.0", default-features = false, features = [
parquet = { version = "57.1.0", default-features = false, features = [
"async",
"zstd",
] }
Expand All @@ -76,9 +78,8 @@ tokio = { version = "1", default-features = false, features = [
"sync",
"time",
], optional = true }
tonbo-predicate = { version = "0.1.0", path = "predicate" }
typed-arrow = { version = "0.5.1", features = ["ext-hooks"], optional = true }
typed-arrow-dyn = { version = "0.0.6", features = ["serde"] }
typed-arrow = { version = "0.6.0", default-features = false, features = ["arrow-57", "ext-hooks", "views"], optional = true }
typed-arrow-dyn = { version = "0.0.7", default-features = false, features = ["arrow-57", "serde"] }
ulid = { version = "1", features = ["serde"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand All @@ -91,7 +92,7 @@ js-sys = "0.3"
clap = { version = "4.5.4", features = ["derive"] }
futures = "0.3"
tempfile = "3"
typed-arrow = { version = "0.5.1", features = ["ext-hooks"] }
typed-arrow = { version = "0.6.0", default-features = false, features = ["arrow-57", "ext-hooks", "views"] }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tokio = { version = "1", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ builders.append_rows(users);
db.ingest(builders.finish().into_record_batch()).await?;

// Query
let filter = Predicate::gt(ColumnRef::new("score"), ScalarValue::from(80_i64));
let filter = Expr::gt("score", ScalarValue::from(80_i64));
let results = db.scan().filter(filter).collect().await?;
```

Expand Down
2 changes: 1 addition & 1 deletion examples/01_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
db.ingest(builders.finish().into_record_batch()).await?;

// 3. Query: score > 80
let filter = Predicate::gt(ColumnRef::new("score"), ScalarValue::from(80_i64));
let filter = Expr::gt("score", ScalarValue::from(80_i64));
let batches = db.scan().filter(filter).collect().await?;

println!("Users with score > 80:");
Expand Down
4 changes: 2 additions & 2 deletions examples/02_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tx.delete("u2")?;

// Read-your-writes: see uncommitted changes within the transaction
let filter = Predicate::is_not_null(ColumnRef::new("id"));
let filter = Expr::is_not_null("id");
let preview = tx.scan().filter(filter).collect().await?;

println!("Before commit (read-your-writes):");
Expand All @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tx.commit().await?;

// Verify after commit
let filter = Predicate::is_not_null(ColumnRef::new("id"));
let filter = Expr::is_not_null("id");
let committed = db.scan().filter(filter).collect().await?;

println!("\nAfter commit:");
Expand Down
2 changes: 1 addition & 1 deletion examples/02b_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
db.ingest(builders.finish().into_record_batch()).await?;

// Snapshot sees only data at snapshot time
let filter = Predicate::is_not_null(ColumnRef::new("id"));
let filter = Expr::is_not_null("id");
let snapshot_data = snapshot.scan(&db).filter(filter.clone()).collect().await?;

println!("Snapshot (frozen in time):");
Expand Down
44 changes: 22 additions & 22 deletions examples/03_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// 1. Equality: price == 29
println!("1. price == 29:");
let filter = Predicate::eq(ColumnRef::new("price"), ScalarValue::from(29_i64));
let filter = Expr::eq("price", ScalarValue::from(29_i64));
print_products(&db, filter).await?;

// 2. Comparison: price > 100
println!("\n2. price > 100:");
let filter = Predicate::gt(ColumnRef::new("price"), ScalarValue::from(100_i64));
let filter = Expr::gt("price", ScalarValue::from(100_i64));
print_products(&db, filter).await?;

// 3. Range: 50 <= price <= 300
println!("\n3. 50 <= price <= 300:");
let filter = Predicate::and(vec![
Predicate::gte(ColumnRef::new("price"), ScalarValue::from(50_i64)),
Predicate::lte(ColumnRef::new("price"), ScalarValue::from(300_i64)),
let filter = Expr::and(vec![
Expr::gt_eq("price", ScalarValue::from(50_i64)),
Expr::lt_eq("price", ScalarValue::from(300_i64)),
]);
print_products(&db, filter).await?;

// 4. IN list: category in ["Electronics", "Office"]
println!("\n4. category IN ['Electronics', 'Office']:");
let filter = Predicate::in_list(
ColumnRef::new("category"),
let filter = Expr::in_list(
"category",
vec![
ScalarValue::from("Electronics"),
ScalarValue::from("Office"),
Expand All @@ -95,43 +95,43 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// 5. IS NULL: category is null
println!("\n5. category IS NULL:");
let filter = Predicate::is_null(ColumnRef::new("category"));
let filter = Expr::is_null("category");
print_products(&db, filter).await?;

// 6. IS NOT NULL: category is not null
println!("\n6. category IS NOT NULL:");
let filter = Predicate::is_not_null(ColumnRef::new("category"));
let filter = Expr::is_not_null("category");
print_products(&db, filter).await?;

// 7. AND: Electronics AND price < 100
println!("\n7. category == 'Electronics' AND price < 100:");
let filter = Predicate::and(vec![
Predicate::eq(ColumnRef::new("category"), ScalarValue::from("Electronics")),
Predicate::lt(ColumnRef::new("price"), ScalarValue::from(100_i64)),
let filter = Expr::and(vec![
Expr::eq("category", ScalarValue::from("Electronics")),
Expr::lt("price", ScalarValue::from(100_i64)),
]);
print_products(&db, filter).await?;

// 8. OR: Furniture OR price < 10
println!("\n8. category == 'Furniture' OR price < 10:");
let filter = Predicate::or(vec![
Predicate::eq(ColumnRef::new("category"), ScalarValue::from("Furniture")),
Predicate::lt(ColumnRef::new("price"), ScalarValue::from(10_i64)),
let filter = Expr::or(vec![
Expr::eq("category", ScalarValue::from("Furniture")),
Expr::lt("price", ScalarValue::from(10_i64)),
]);
print_products(&db, filter).await?;

// 9. NOT: NOT category == 'Electronics'
println!("\n9. NOT category == 'Electronics':");
let filter = Predicate::eq(ColumnRef::new("category"), ScalarValue::from("Electronics")).not();
let filter = Expr::not(Expr::eq("category", ScalarValue::from("Electronics")));
print_products(&db, filter).await?;

// 10. Complex: (Electronics OR Furniture) AND price > 100
println!("\n10. (Electronics OR Furniture) AND price > 100:");
let filter = Predicate::and(vec![
Predicate::or(vec![
Predicate::eq(ColumnRef::new("category"), ScalarValue::from("Electronics")),
Predicate::eq(ColumnRef::new("category"), ScalarValue::from("Furniture")),
let filter = Expr::and(vec![
Expr::or(vec![
Expr::eq("category", ScalarValue::from("Electronics")),
Expr::eq("category", ScalarValue::from("Furniture")),
]),
Predicate::gt(ColumnRef::new("price"), ScalarValue::from(100_i64)),
Expr::gt("price", ScalarValue::from(100_i64)),
]);
print_products(&db, filter).await?;

Expand All @@ -140,7 +140,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

async fn print_products(
db: &DB<LocalFs, TokioExecutor>,
filter: Predicate,
filter: Expr,
) -> Result<(), Box<dyn std::error::Error>> {
let batches = db.scan().filter(filter).collect().await?;
let mut found = false;
Expand Down
5 changes: 1 addition & 4 deletions examples/04_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Inserted 3 events to S3");

// Query from S3
let filter = Predicate::eq(
ColumnRef::new("event_type"),
ScalarValue::from("user.created"),
);
let filter = Expr::eq("event_type", ScalarValue::from("user.created"));
let batches = db.scan().filter(filter).collect().await?;

println!("\nEvents where event_type = 'user.created':");
Expand Down
6 changes: 3 additions & 3 deletions examples/05_scan_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// 3. Filter + Limit: high price orders, max 2
println!("\n3. WHERE price > 100 LIMIT 2:");
let filter = Predicate::gt(ColumnRef::new("price"), ScalarValue::from(100_i64));
let filter = Expr::gt("price", ScalarValue::from(100_i64));
let batches = db.scan().filter(filter).limit(2).collect().await?;
for batch in &batches {
for o in batch.iter_views::<Order>()?.try_flatten()? {
Expand All @@ -118,7 +118,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// 4. Filter + Projection: high-value orders, show only id and price
println!("\n4. SELECT id, price WHERE price > 100:");
let filter = Predicate::gt(ColumnRef::new("price"), ScalarValue::from(100_i64));
let filter = Expr::gt("price", ScalarValue::from(100_i64));
let projected_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("price", DataType::Int64, false),
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// 5. All combined: Filter + Projection + Limit
println!("\n5. SELECT id, product WHERE quantity = 1 LIMIT 3:");
let filter = Predicate::eq(ColumnRef::new("quantity"), ScalarValue::from(1_i64));
let filter = Expr::eq("quantity", ScalarValue::from(1_i64));
let projected_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("product", DataType::Utf8, false),
Expand Down
10 changes: 5 additions & 5 deletions examples/06_composite_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Filter by first key component: device_id = 'sensor-1'
println!("\nReadings for sensor-1:");
let filter = Predicate::eq(ColumnRef::new("device_id"), ScalarValue::from("sensor-1"));
let filter = Expr::eq("device_id", ScalarValue::from("sensor-1"));
let batches = db.scan().filter(filter).collect().await?;
print_readings(&batches)?;

// Filter by second key component: timestamp > 2000
println!("\nReadings after timestamp 2000:");
let filter = Predicate::gt(ColumnRef::new("timestamp"), ScalarValue::from(2000_i64));
let filter = Expr::gt("timestamp", ScalarValue::from(2000_i64));
let batches = db.scan().filter(filter).collect().await?;
print_readings(&batches)?;

// Combined filter on both key components
println!("\nSensor-1 readings after timestamp 1500:");
let filter = Predicate::and(vec![
Predicate::eq(ColumnRef::new("device_id"), ScalarValue::from("sensor-1")),
Predicate::gt(ColumnRef::new("timestamp"), ScalarValue::from(1500_i64)),
let filter = Expr::and(vec![
Expr::eq("device_id", ScalarValue::from("sensor-1")),
Expr::gt("timestamp", ScalarValue::from(1500_i64)),
]);
let batches = db.scan().filter(filter).collect().await?;
print_readings(&batches)?;
Expand Down
6 changes: 3 additions & 3 deletions examples/07_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Method 1: collect() - loads all matching rows into memory
// Good for small result sets
println!("=== Method 1: collect() ===");
let filter = Predicate::eq(ColumnRef::new("level"), ScalarValue::from("ERROR"));
let filter = Expr::eq("level", ScalarValue::from("ERROR"));
let batches = db.scan().filter(filter).collect().await?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
println!(
Expand All @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Method 2: stream() - process batches one at a time
// Good for large result sets or when you want to stop early
println!("=== Method 2: stream() ===");
let filter = Predicate::eq(ColumnRef::new("level"), ScalarValue::from("WARN"));
let filter = Expr::eq("level", ScalarValue::from("WARN"));
let mut stream = db.scan().filter(filter).stream().await?;

let mut batch_count = 0;
Expand All @@ -73,7 +73,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Method 3: stream() with early termination
// Process until you find what you need
println!("=== Method 3: stream() with early exit ===");
let filter = Predicate::eq(ColumnRef::new("level"), ScalarValue::from("INFO"));
let filter = Expr::eq("level", ScalarValue::from("INFO"));
let mut stream = db.scan().filter(filter).stream().await?;

let mut found_count = 0;
Expand Down
13 changes: 5 additions & 8 deletions examples/10_dynamic/10a_dynamic_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,25 @@ async fn main() {
.expect("schema ok");
db.ingest(batch).await.expect("insert dynamic batch");

let key_col = ColumnRef::new("id");
let key_col = "id";

// Scan for a specific key (id == "carol") using predicate
let carol_pred = Predicate::eq(key_col.clone(), ScalarValue::from("carol"));
let carol_pred = Expr::eq(key_col, ScalarValue::from("carol"));
let out = scan_pairs(&db, carol_pred).await;
println!("dynamic scan rows (carol): {:?}", out);

// Query expression: id == "dave"
let expr = Predicate::eq(key_col.clone(), ScalarValue::from("dave"));
let expr = Expr::eq(key_col, ScalarValue::from("dave"));
let out_q = scan_pairs(&db, expr).await;
println!("dynamic query rows (id == dave): {:?}", out_q);

// Scan all dynamic rows (id is not null)
let all_pred = Predicate::is_not_null(key_col.clone());
let all_pred = Expr::is_not_null(key_col);
let all_rows = scan_pairs(&db, all_pred).await;
println!("dynamic rows (all): {:?}", all_rows);
}

async fn scan_pairs(
db: &DB<InMemoryFs, TokioExecutor>,
predicate: Predicate,
) -> Vec<(String, i32)> {
async fn scan_pairs(db: &DB<InMemoryFs, TokioExecutor>, predicate: Expr) -> Vec<(String, i32)> {
let batches = db.scan().filter(predicate).collect().await.expect("scan");
batches
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion examples/10_dynamic/10b_dynamic_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn main() {
db.ingest(batch).await.expect("insert");

// Scan all rows using a trivial predicate
let pred = Predicate::is_not_null(ColumnRef::new("id"));
let pred = Expr::is_not_null("id");
let rows: Vec<(String, i32)> = db
.scan()
.filter(pred)
Expand Down
10 changes: 5 additions & 5 deletions examples/10_dynamic/10c_dynamic_composite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ async fn main() {
db.ingest(batch).await.expect("insert");

// Predicate over composite key: id = 'a' AND ts BETWEEN 5 AND 10
let pred = Predicate::and(vec![
Predicate::eq(ColumnRef::new("id"), ScalarValue::from("a")),
Predicate::and(vec![
Predicate::gte(ColumnRef::new("ts"), ScalarValue::from(5i64)),
Predicate::lte(ColumnRef::new("ts"), ScalarValue::from(10i64)),
let pred = Expr::and(vec![
Expr::eq("id", ScalarValue::from("a")),
Expr::and(vec![
Expr::gt_eq("ts", ScalarValue::from(5i64)),
Expr::lt_eq("ts", ScalarValue::from(10i64)),
]),
]);

Expand Down
6 changes: 3 additions & 3 deletions examples/10_dynamic/10d_dynamic_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn main() {
// tx.delete("ghost").expect("stage delete");

// Read-your-writes inside the transaction.
let pred = Predicate::eq(ColumnRef::new("id"), ScalarValue::from("user-1"));
let pred = Expr::eq("id", ScalarValue::from("user-1"));
let preview_batches = tx.scan().filter(pred).collect().await.expect("preview");
let mut preview_rows = Vec::new();
for batch in &preview_batches {
Expand All @@ -78,12 +78,12 @@ async fn main() {
tx.commit().await.expect("commit");

// Post-commit read via the public scan path.
let all_pred = Predicate::is_not_null(ColumnRef::new("id"));
let all_pred = Expr::is_not_null("id");
let committed = scan_pairs(&db, &all_pred).await;
println!("committed rows: {:?}", committed);
}

async fn scan_pairs(db: &DB<LocalFs, TokioExecutor>, predicate: &Predicate) -> Vec<(String, i32)> {
async fn scan_pairs(db: &DB<LocalFs, TokioExecutor>, predicate: &Expr) -> Vec<(String, i32)> {
let mut stream = db
.scan()
.filter(predicate.clone())
Expand Down
Loading
Loading