Skip to content

Commit 0b6de09

Browse files
committed
feat: revamp migrations to support recreating cfs
1 parent 301df49 commit 0b6de09

16 files changed

+2885
-1212
lines changed

server/state_store/src/lib.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use tracing::{debug, error, info, span, warn};
2828
pub mod in_memory_state;
2929
pub mod invocation_events;
3030
pub mod kv;
31+
pub mod migration_runner;
3132
pub mod migrations;
3233
pub mod requests;
3334
pub mod scanner;
@@ -108,7 +109,7 @@ impl IndexifyState {
108109
// Migrate the db before opening with all column families.
109110
// This is because the migration process may delete older column families.
110111
// If we open the db with all column families, it would fail to open.
111-
let sm_meta = migrations::migrate(&path)?;
112+
let sm_meta = migration_runner::run(&path)?;
112113

113114
let sm_column_families = IndexifyObjectsColumns::iter()
114115
.map(|cf| ColumnFamilyDescriptor::new(cf.to_string(), Options::default()));
@@ -303,7 +304,7 @@ impl IndexifyState {
303304
&txn,
304305
&request.processed_state_changes,
305306
)?;
306-
migrations::write_sm_meta(
307+
migration_runner::write_sm_meta(
307308
&self.db,
308309
&txn,
309310
&StateMachineMetadata {
+249
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
use std::path::Path;
2+
3+
use anyhow::{Context, Result};
4+
use data_model::StateMachineMetadata;
5+
use rocksdb::{Transaction, TransactionDB};
6+
use tracing::info;
7+
8+
use crate::{
9+
migrations::{
10+
contexts::{MigrationContext, PrepareContext},
11+
registry::MigrationRegistry,
12+
},
13+
serializer::{JsonEncode, JsonEncoder},
14+
state_machine::IndexifyObjectsColumns,
15+
};
16+
17+
/// Main function to run all necessary migrations on a database at the given
18+
/// path
19+
pub fn run(path: &Path) -> Result<StateMachineMetadata> {
20+
// Initialize prepare context
21+
let prepare_ctx = PrepareContext::new(path.to_path_buf());
22+
23+
// Initialize registry
24+
let registry = MigrationRegistry::new()?;
25+
let latest_version = registry.latest_version();
26+
27+
// Check if DB exists
28+
let db = match prepare_ctx.open_db() {
29+
Ok(db) => db,
30+
Err(e) if e.to_string().contains("No such file or directory") => {
31+
// New DB, return default metadata
32+
info!(
33+
"No database found. Initializing at version {}",
34+
latest_version
35+
);
36+
return Ok(StateMachineMetadata {
37+
db_version: latest_version,
38+
last_change_idx: 0,
39+
});
40+
}
41+
Err(e) => return Err(anyhow::anyhow!("Error opening database: {:?}", e)),
42+
};
43+
44+
// Read current metadata
45+
let mut sm_meta = read_sm_meta(&db)?;
46+
drop(db); // Close DB before migrations
47+
48+
// Find applicable migrations
49+
let migrations = registry.find_migrations(sm_meta.db_version);
50+
51+
// No migrations needed
52+
if migrations.is_empty() {
53+
info!(
54+
"Database already at version {}. No migrations needed.",
55+
sm_meta.db_version
56+
);
57+
return Ok(sm_meta);
58+
}
59+
60+
info!(
61+
"Starting migrations from v{} to v{}",
62+
sm_meta.db_version, latest_version
63+
);
64+
65+
// Execute each migration in sequence
66+
for migration in migrations {
67+
let from_version = sm_meta.db_version;
68+
let to_version = migration.version();
69+
70+
info!(
71+
"Running migration {}: v{} → v{}",
72+
migration.name(),
73+
from_version,
74+
to_version
75+
);
76+
77+
// Each migration prepares the DB as needed
78+
let db = migration
79+
.prepare(&prepare_ctx)
80+
.with_context(|| format!("Preparing DB for migration to v{}", to_version))?;
81+
82+
// Apply migration in a transaction
83+
let txn = db.transaction();
84+
85+
// Create migration context
86+
let migration_ctx = MigrationContext::new(&db, &txn);
87+
88+
// Apply the migration
89+
migration
90+
.apply(&migration_ctx)
91+
.with_context(|| format!("Applying migration to v{}", to_version))?;
92+
93+
// Update metadata in the same transaction
94+
sm_meta.db_version = to_version;
95+
write_sm_meta(&db, &txn, &sm_meta)?;
96+
97+
info!("Committing migration to v{}", to_version);
98+
txn.commit()
99+
.with_context(|| format!("Committing migration to v{}", to_version))?;
100+
101+
// Close DB after each migration to ensure clean state
102+
drop(db);
103+
}
104+
105+
info!(
106+
"Completed all migrations. DB now at version {}",
107+
sm_meta.db_version
108+
);
109+
Ok(sm_meta)
110+
}
111+
112+
/// Read state machine metadata from the database
113+
pub fn read_sm_meta(db: &TransactionDB) -> Result<StateMachineMetadata> {
114+
let meta = db.get_cf(
115+
&IndexifyObjectsColumns::StateMachineMetadata.cf_db(db),
116+
b"sm_meta",
117+
)?;
118+
match meta {
119+
Some(meta) => Ok(JsonEncoder::decode(&meta)?),
120+
None => Ok(StateMachineMetadata {
121+
db_version: 0,
122+
last_change_idx: 0,
123+
}),
124+
}
125+
}
126+
127+
/// Write state machine metadata to the database
128+
pub fn write_sm_meta(
129+
db: &TransactionDB,
130+
txn: &Transaction<TransactionDB>,
131+
sm_meta: &StateMachineMetadata,
132+
) -> Result<()> {
133+
let serialized_meta = JsonEncoder::encode(sm_meta)?;
134+
txn.put_cf(
135+
&IndexifyObjectsColumns::StateMachineMetadata.cf_db(db),
136+
b"sm_meta",
137+
&serialized_meta,
138+
)?;
139+
Ok(())
140+
}
141+
142+
#[cfg(test)]
143+
mod tests {
144+
use rocksdb::{ColumnFamilyDescriptor, Options, TransactionDBOptions};
145+
use tempfile::TempDir;
146+
147+
use super::*;
148+
use crate::migrations::migration_trait::Migration;
149+
150+
#[derive(Clone)]
151+
struct MockMigration {
152+
version: u64,
153+
name: &'static str,
154+
}
155+
156+
impl Migration for MockMigration {
157+
fn version(&self) -> u64 {
158+
self.version
159+
}
160+
161+
fn name(&self) -> &'static str {
162+
self.name
163+
}
164+
165+
fn prepare(&self, ctx: &PrepareContext) -> Result<TransactionDB> {
166+
// Simple mock - just open DB
167+
ctx.open_db()
168+
}
169+
170+
fn apply(&self, _ctx: &MigrationContext) -> Result<()> {
171+
// No-op for test
172+
Ok(())
173+
}
174+
175+
fn box_clone(&self) -> Box<dyn Migration> {
176+
Box::new(self.clone())
177+
}
178+
}
179+
180+
#[test]
181+
fn test_migration_new_db() -> Result<()> {
182+
let temp_dir = TempDir::new()?;
183+
let path = temp_dir.path();
184+
185+
// Create a mock migration
186+
let mock_migration = MockMigration {
187+
version: 1,
188+
name: "MockMigration",
189+
};
190+
191+
// Use the mock migration (e.g., log its name)
192+
info!("Testing with migration: {}", mock_migration.name());
193+
194+
// Run migrations on non-existent DB
195+
let sm_meta = run(path)?;
196+
197+
// Check migration resulted in latest version
198+
assert_eq!(
199+
sm_meta.db_version,
200+
MigrationRegistry::new()?.latest_version()
201+
);
202+
203+
Ok(())
204+
}
205+
206+
#[test]
207+
fn test_migration_existing_db() -> Result<()> {
208+
let temp_dir = TempDir::new()?;
209+
let path = temp_dir.path();
210+
211+
// Create DB with initial metadata
212+
let cf_desc = ColumnFamilyDescriptor::new(
213+
IndexifyObjectsColumns::StateMachineMetadata.as_ref(),
214+
Options::default(),
215+
);
216+
217+
let mut db_opts = Options::default();
218+
db_opts.create_missing_column_families(true);
219+
db_opts.create_if_missing(true);
220+
221+
let db = TransactionDB::open_cf_descriptors(
222+
&db_opts,
223+
&TransactionDBOptions::default(),
224+
path,
225+
vec![cf_desc],
226+
)?;
227+
228+
// Set initial version to 1
229+
let txn = db.transaction();
230+
let initial_meta = StateMachineMetadata {
231+
db_version: 1,
232+
last_change_idx: 0,
233+
};
234+
write_sm_meta(&db, &txn, &initial_meta)?;
235+
txn.commit()?;
236+
drop(db);
237+
238+
// Run migrations
239+
let sm_meta = run(path)?;
240+
241+
// Check migration resulted in latest version
242+
assert_eq!(
243+
sm_meta.db_version,
244+
MigrationRegistry::new()?.latest_version()
245+
);
246+
247+
Ok(())
248+
}
249+
}

0 commit comments

Comments
 (0)