// File: 09_time_travel.rs
//! Time Travel: list versions and create snapshots at specific timestamps
//!
//! This example demonstrates the time travel API:
//! - `db.list_versions(limit)` - enumerate committed versions
//! - `db.snapshot_at(timestamp)` - create a snapshot at a specific timestamp
//!
//! Tonbo supports two levels of time travel:
//! 1. **MVCC timestamps** - every commit gets a logical timestamp for visibility control
//! 2. **Manifest versions** - when data is flushed to SST files, a version snapshot is recorded in
//!    the manifest, enabling queries against historical file sets
//!
//! Run: cargo run --example 09_time_travel
use tonbo::prelude::*;
/// Row type stored by this example: one product with an identifier, a display name and a price.
///
/// `#[derive(Record)]` generates the helpers used in `main` (`Product::schema()`,
/// `Product::new_builders(..)`) and lets scanned batches be read back as `Product` views.
#[derive(Record)]
struct Product {
/// Product identifier. The `tonbo.key = "true"` metadata presumably marks this column as the
/// table's key, so writing a row with an existing `id` replaces it - TODO confirm against the
/// Tonbo docs.
#[metadata(k = "tonbo.key", v = "true")]
id: i64,
/// Human-readable product name.
name: String,
/// Price, printed with a `$` prefix by the example (whole units, no fractional part).
price: i64,
}
/// Walks through the time travel API end to end.
///
/// Steps, in order:
/// 1. Open an on-disk database for the `Product` schema.
/// 2. Write three separate commits (bulk ingest, a price update, a new product).
/// 3. List up to 10 persisted versions from the manifest.
/// 4. Print the current state, then the state as of the last and first entries returned by
///    `list_versions` (when any exist), then the state seen by a fresh snapshot.
///
/// Any error from the database or from decoding rows is propagated to the caller via `?`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build the database handle step by step: schema -> storage location -> open.
    let builder = DbBuilder::from_schema(Product::schema())?;
    let builder = builder.on_disk("/tmp/tonbo_time_travel")?;
    let db = builder.open().await?;

    // === Insert data in multiple transactions ===
    // Each commit below is a separate write, so each one gets its own MVCC timestamp.

    // Commit 1: seed the table with the initial products via a bulk ingest.
    let seed_rows = vec![
        Product {
            id: 1,
            name: "Laptop".into(),
            price: 999,
        },
        Product {
            id: 2,
            name: "Mouse".into(),
            price: 29,
        },
    ];
    let mut seed_builders = Product::new_builders(seed_rows.len());
    seed_builders.append_rows(seed_rows);
    let seed_batch = seed_builders.finish().into_record_batch();
    db.ingest(seed_batch).await?;
    println!("Tx1: Inserted Laptop ($999), Mouse ($29)");

    // Commit 2: rewrite the Laptop row (same id) with a lower price.
    let mut price_tx = db.begin_transaction().await?;
    let price_rows = vec![Product {
        id: 1,
        name: "Laptop".into(),
        price: 899,
    }];
    let mut price_builders = Product::new_builders(price_rows.len());
    price_builders.append_rows(price_rows);
    let price_batch = price_builders.finish().into_record_batch();
    price_tx.upsert_batch(&price_batch)?;
    price_tx.commit().await?;
    println!("Tx2: Laptop price reduced to $899");

    // Commit 3: add a brand-new product.
    let mut keyboard_tx = db.begin_transaction().await?;
    let keyboard_rows = vec![Product {
        id: 3,
        name: "Keyboard".into(),
        price: 79,
    }];
    let mut keyboard_builders = Product::new_builders(keyboard_rows.len());
    keyboard_builders.append_rows(keyboard_rows);
    let keyboard_batch = keyboard_builders.finish().into_record_batch();
    keyboard_tx.upsert_batch(&keyboard_batch)?;
    keyboard_tx.commit().await?;
    println!("Tx3: Added Keyboard ($79)");

    // === List persisted versions ===
    // Versions only exist once data has been flushed to SST files, so the list may be empty.
    println!("\n=== Persisted Versions (from manifest) ===");
    let versions = db.list_versions(10).await?;
    if versions.is_empty() {
        println!(" (no SST versions yet - data is in memory)");
    }
    // Label entries counting down from `versions.len()` to 1 in the order they were returned.
    // The loop body never runs for an empty list, so no `else` branch is needed.
    for (ordinal, version) in (1..=versions.len()).rev().zip(versions.iter()) {
        println!(
            " Version {}: timestamp={}, ssts={}, levels={}",
            ordinal,
            version.timestamp.get(),
            version.sst_count,
            version.level_count
        );
    }

    // === Query current state ===
    println!("\n=== Current State ===");
    let current_batches = db.scan().collect().await?;
    for batch in &current_batches {
        let rows = batch.iter_views::<Product>()?.try_flatten()?;
        for row in rows {
            println!(" {} - {} (${})", row.id, row.name, row.price);
        }
    }

    // === Snapshot at specific MVCC timestamp ===
    // Timestamps are taken from `list_versions` rather than invented, so they refer to
    // versions that exist. The last list entry is treated as the first (oldest) version.
    if let Some(first_ts) = versions.last().map(|version| version.timestamp) {
        println!(
            "\n=== Snapshot at timestamp={} (first version) ===",
            first_ts.get()
        );
        let first_snapshot = db.snapshot_at(first_ts).await?;
        let first_batches = first_snapshot.scan(&db).collect().await?;
        for batch in &first_batches {
            let rows = batch.iter_views::<Product>()?.try_flatten()?;
            for row in rows {
                println!(" {} - {} (${})", row.id, row.name, row.price);
            }
        }
    }

    // The first list entry is treated as the latest version.
    if let Some(latest_ts) = versions.first().map(|version| version.timestamp) {
        println!(
            "\n=== Snapshot at timestamp={} (latest version) ===",
            latest_ts.get()
        );
        let latest_snapshot = db.snapshot_at(latest_ts).await?;
        let latest_batches = latest_snapshot.scan(&db).collect().await?;
        for batch in &latest_batches {
            let rows = batch.iter_views::<Product>()?.try_flatten()?;
            for row in rows {
                println!(" {} - {} (${})", row.id, row.name, row.price);
            }
        }
    }

    // Current snapshot (includes in-memory data that has not been flushed yet).
    println!("\n=== Current State (begin_snapshot) ===");
    let live_snapshot = db.begin_snapshot().await?;
    let live_batches = live_snapshot.scan(&db).collect().await?;
    for batch in &live_batches {
        let rows = batch.iter_views::<Product>()?.try_flatten()?;
        for row in rows {
            println!(" {} - {} (${})", row.id, row.name, row.price);
        }
    }

    Ok(())
}