Skip to content

Commit 8be7462

Browse files
committed
test next_sstable_id generation
1 parent f42bb33 commit 8be7462

File tree

1 file changed

+79
-0
lines changed

1 file changed

+79
-0
lines changed

tests/next_sstable_id.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use std::sync::Arc;
2+
3+
use arrow_array::{Int64Array, RecordBatch, StringArray};
4+
use arrow_schema::{DataType, Field, Schema};
5+
use fusio::executor::NoopExecutor;
6+
use tonbo::db::DbBuilder;
7+
8+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
9+
async fn test_sstable_id_persistence_across_restarts() {
10+
let schema = Arc::new(Schema::new(vec![
11+
Field::new("id", DataType::Utf8, false),
12+
Field::new("value", DataType::Int64, false),
13+
]));
14+
15+
let db = DbBuilder::from_schema_key_name(schema.clone(), "id")
16+
.unwrap()
17+
.in_memory("test_db")
18+
.unwrap()
19+
.open_with_executor(Arc::new(NoopExecutor))
20+
.await
21+
.unwrap();
22+
23+
let batch1 = RecordBatch::try_new(
24+
schema.clone(),
25+
vec![
26+
Arc::new(StringArray::from(vec!["key1", "key2"])),
27+
Arc::new(Int64Array::from(vec![100, 200])),
28+
],
29+
)
30+
.unwrap();
31+
db.ingest(batch1).await.unwrap();
32+
33+
let batch2 = RecordBatch::try_new(
34+
schema.clone(),
35+
vec![
36+
Arc::new(StringArray::from(vec!["key3", "key4"])),
37+
Arc::new(Int64Array::from(vec![300, 400])),
38+
],
39+
)
40+
.unwrap();
41+
db.ingest(batch2).await.unwrap();
42+
43+
// Give db time to compact
44+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
45+
46+
let batches = db.scan().collect().await.unwrap();
47+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
48+
assert_eq!(total_rows, 4, "Should have all 4 rows in the database");
49+
}
50+
51+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
52+
async fn test_db_with_no_latest_version() {
53+
let schema = Arc::new(Schema::new(vec![
54+
Field::new("id", DataType::Utf8, false),
55+
Field::new("value", DataType::Int64, false),
56+
]));
57+
58+
let db = DbBuilder::from_schema_key_name(schema.clone(), "id")
59+
.unwrap()
60+
.in_memory("empty_db")
61+
.unwrap()
62+
.open_with_executor(Arc::new(NoopExecutor))
63+
.await
64+
.unwrap();
65+
66+
let batch = RecordBatch::try_new(
67+
schema,
68+
vec![
69+
Arc::new(StringArray::from(vec!["first_key"])),
70+
Arc::new(Int64Array::from(vec![1])),
71+
],
72+
)
73+
.unwrap();
74+
db.ingest(batch).await.unwrap();
75+
76+
let batches = db.scan().collect().await.unwrap();
77+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
78+
assert_eq!(total_rows, 1);
79+
}

0 commit comments

Comments
 (0)