|
| 1 | +//! Database migration tool |
| 2 | +//! |
| 3 | +//! Copies data from one MDBX database to another, table by table. |
| 4 | +//! Allows customization of database parameters (page size, max size, growth step, etc.). |
| 5 | +
|
| 6 | +use clap::Parser; |
| 7 | +use reth_db::DatabaseEnv; |
| 8 | +use reth_db_api::{database::Database, transaction::DbTx}; |
| 9 | +use reth_libmdbx::WriteFlags; |
| 10 | +use reth_node_core::args::{parse_byte_size, ByteSize}; |
| 11 | +use std::{path::PathBuf, time::Instant}; |
| 12 | +use tracing::info; |
| 13 | + |
| 14 | +/// Format byte size to human-readable string |
| 15 | +fn format_byte_size(bytes: usize) -> String { |
| 16 | + ByteSize(bytes).to_string() |
| 17 | +} |
| 18 | + |
| 19 | +/// Arguments for the `reth db migrate` command |
| 20 | +#[derive(Parser, Debug)] |
| 21 | +#[command(next_help_heading = "Copy Options")] |
| 22 | +pub struct Command { |
| 23 | + /// Destination database directory (must not exist). |
| 24 | + #[arg(long, value_name = "DEST_PATH")] |
| 25 | + to: PathBuf, |
| 26 | + |
| 27 | + /// Specific tables to copy (comma-separated). Example: --tables Headers,Bodies |
| 28 | + #[arg(long, value_delimiter = ',')] |
| 29 | + tables: Vec<String>, |
| 30 | + |
| 31 | + /// Page size (e.g., 4KB, 8KB). Default: system default (typically 4KB). |
| 32 | + /// NOTE: Can only be set when creating a new database. |
| 33 | + #[arg(long, value_parser = parse_byte_size, verbatim_doc_comment)] |
| 34 | + page_size: Option<usize>, |
| 35 | + |
| 36 | + /// Maximum database size (e.g., 4TB, 12TB). Default: source database size. |
| 37 | + #[arg(long, value_parser = parse_byte_size)] |
| 38 | + max_size: Option<usize>, |
| 39 | + |
| 40 | + /// Database growth step (e.g., 4GB, 8GB). |
| 41 | + #[arg(long, default_value = "4GB", value_parser = parse_byte_size)] |
| 42 | + growth_step: usize, |
| 43 | + |
| 44 | + /// Commit every N records. Smaller = less memory, slower. |
| 45 | + #[arg(long, default_value = "100000")] |
| 46 | + commit_every: usize, |
| 47 | + |
| 48 | + /// Suppress progress messages. |
| 49 | + #[arg(long, short)] |
| 50 | + quiet: bool, |
| 51 | +} |
| 52 | + |
| 53 | +impl Command { |
| 54 | + /// Execute the database migration |
| 55 | + pub fn execute( |
| 56 | + &self, |
| 57 | + src_env: &DatabaseEnv, |
| 58 | + db_args: &reth_db::mdbx::DatabaseArguments, |
| 59 | + ) -> eyre::Result<()> { |
| 60 | + // Ensure destination doesn't exist |
| 61 | + if self.to.exists() { |
| 62 | + eyre::bail!("Destination {:?} already exists", self.to); |
| 63 | + } |
| 64 | + |
| 65 | + // Ensure parent directory exists |
| 66 | + if let Some(parent) = self.to.parent() { |
| 67 | + if !parent.exists() { |
| 68 | + std::fs::create_dir_all(parent)?; |
| 69 | + } |
| 70 | + } |
| 71 | + |
| 72 | + if !self.quiet { |
| 73 | + info!(target: "reth::cli", "Starting database migration..."); |
| 74 | + info!(target: "reth::cli", "Destination: {:?}", self.to); |
| 75 | + } |
| 76 | + |
| 77 | + let start = Instant::now(); |
| 78 | + self.execute_custom_copy(src_env, db_args)?; |
| 79 | + |
| 80 | + let elapsed = start.elapsed(); |
| 81 | + |
| 82 | + if !self.quiet { |
| 83 | + info!(target: "reth::cli", "Copy completed in {:.2}s", elapsed.as_secs_f64()); |
| 84 | + |
| 85 | + // Display size comparison |
| 86 | + let src_size = src_env.info()?.map_size(); |
| 87 | + if let Ok(dst_metadata) = std::fs::metadata(&self.to.join("mdbx.dat")) { |
| 88 | + let dst_size = dst_metadata.len() as usize; |
| 89 | + info!(target: "reth::cli", "Source database map size: {}", format_byte_size(src_size)); |
| 90 | + info!(target: "reth::cli", "Destination file size: {}", format_byte_size(dst_size)); |
| 91 | + |
| 92 | + if dst_size < src_size { |
| 93 | + let reduction = ((src_size - dst_size) as f64 / src_size as f64) * 100.0; |
| 94 | + info!(target: "reth::cli", |
| 95 | + "Size reduction: {} ({:.2}% smaller due to defragmentation)", |
| 96 | + format_byte_size(src_size - dst_size), |
| 97 | + reduction); |
| 98 | + info!(target: "reth::cli", |
| 99 | + " Note: This is normal. The copy process eliminates fragmentation,"); |
| 100 | + info!(target: "reth::cli", |
| 101 | + " empty pages, and compacts the data structure."); |
| 102 | + } |
| 103 | + } |
| 104 | + } |
| 105 | + |
| 106 | + Ok(()) |
| 107 | + } |
| 108 | + |
| 109 | + /// Execute database copy with parameter customization |
| 110 | + fn execute_custom_copy( |
| 111 | + &self, |
| 112 | + src_env: &DatabaseEnv, |
| 113 | + base_db_args: &reth_db::mdbx::DatabaseArguments, |
| 114 | + ) -> eyre::Result<()> { |
| 115 | + use reth_db::tables::Tables; |
| 116 | + |
| 117 | + // Get source database parameters for display |
| 118 | + let src_info = src_env.info()?; |
| 119 | + let src_stat = src_env.stat()?; |
| 120 | + let src_page_size = src_stat.page_size(); |
| 121 | + let src_map_size = src_info.map_size(); |
| 122 | + |
| 123 | + // Start with system database arguments (includes log_level, exclusive, max_readers, etc.) |
| 124 | + // then override with user-specified parameters |
| 125 | + let mut dst_args = base_db_args.clone(); |
| 126 | + |
| 127 | + // Determine target parameters |
| 128 | + // Priority: user specified > source database |
| 129 | + let max_size_bytes = self.max_size.unwrap_or(src_map_size); |
| 130 | + let growth_step_bytes = self.growth_step; |
| 131 | + |
| 132 | + dst_args = dst_args |
| 133 | + .with_geometry_max_size(Some(max_size_bytes)) |
| 134 | + .with_growth_step(Some(growth_step_bytes)); |
| 135 | + |
| 136 | + // Override page size if user specified it |
| 137 | + if let Some(page_size) = self.page_size { |
| 138 | + dst_args = dst_args.with_page_size(Some(page_size)); |
| 139 | + } |
| 140 | + |
| 141 | + if !self.quiet { |
| 142 | + info!(target: "reth::cli", "Source database parameters:"); |
| 143 | + info!(target: "reth::cli", " Page size: {}", format_byte_size(src_page_size as usize)); |
| 144 | + info!(target: "reth::cli", " Map size: {}", format_byte_size(src_map_size)); |
| 145 | + info!(target: "reth::cli", "Target database parameters:"); |
| 146 | + if let Some(page_size) = self.page_size { |
| 147 | + info!(target: "reth::cli", " Page size: {} (custom)", format_byte_size(page_size)); |
| 148 | + } else { |
| 149 | + info!(target: "reth::cli", " Page size: {} (using system default)", |
| 150 | + format_byte_size(src_page_size as usize)); |
| 151 | + } |
| 152 | + info!(target: "reth::cli", " Map size: {}", format_byte_size(max_size_bytes)); |
| 153 | + info!(target: "reth::cli", " Growth step: {}", format_byte_size(growth_step_bytes)); |
| 154 | + info!(target: "reth::cli", " (Other settings: log_level, exclusive, max_readers, etc. inherited from system config)"); |
| 155 | + } |
| 156 | + |
| 157 | + // Create destination database and initialize all tables |
| 158 | + // Using init_db() to properly create all tables and record client version |
| 159 | + let dst_env = reth_db::init_db(&self.to, dst_args)?; |
| 160 | + |
| 161 | + if !self.quiet { |
| 162 | + info!(target: "reth::cli", "Destination database initialized"); |
| 163 | + } |
| 164 | + |
| 165 | + // Determine which tables to copy |
| 166 | + let tables_to_copy: Vec<String> = if self.tables.is_empty() { |
| 167 | + Tables::ALL.iter().map(|t| t.name().to_string()).collect() |
| 168 | + } else { |
| 169 | + // Validate table names |
| 170 | + let valid_tables: std::collections::HashSet<&str> = |
| 171 | + Tables::ALL.iter().map(|t| t.name()).collect(); |
| 172 | + |
| 173 | + for table in &self.tables { |
| 174 | + if !valid_tables.contains(table.as_str()) { |
| 175 | + eyre::bail!("Unknown table: {}", table); |
| 176 | + } |
| 177 | + } |
| 178 | + |
| 179 | + self.tables.clone() |
| 180 | + }; |
| 181 | + |
| 182 | + if !self.quiet { |
| 183 | + info!(target: "reth::cli", "Copying {} tables", tables_to_copy.len()); |
| 184 | + if !self.tables.is_empty() { |
| 185 | + info!(target: "reth::cli", " Note: Only copying selected tables. Other tables will be empty."); |
| 186 | + } |
| 187 | + } |
| 188 | + |
| 189 | + // Copy each table using table-specific implementations |
| 190 | + let total_tables = tables_to_copy.len(); |
| 191 | + for (idx, table_name) in tables_to_copy.iter().enumerate() { |
| 192 | + if !self.quiet { |
| 193 | + info!(target: "reth::cli", "[{}/{}] Copying table: {}", |
| 194 | + idx + 1, total_tables, table_name); |
| 195 | + } |
| 196 | + |
| 197 | + self.copy_table_generic(src_env, &dst_env, table_name)?; |
| 198 | + } |
| 199 | + |
| 200 | + Ok(()) |
| 201 | + } |
| 202 | + |
| 203 | + /// Copy a table using generic byte-level copying |
| 204 | + /// This works for all tables but doesn't validate table-specific types |
| 205 | + fn copy_table_generic( |
| 206 | + &self, |
| 207 | + src_env: &DatabaseEnv, |
| 208 | + dst_env: &DatabaseEnv, |
| 209 | + table_name: &str, |
| 210 | + ) -> eyre::Result<usize> { |
| 211 | + let mut src_tx = src_env.tx()?; |
| 212 | + |
| 213 | + // Disable timeout for long-running read transaction during copy |
| 214 | + // This is necessary because copying large tables can take a very long time |
| 215 | + src_tx.disable_long_read_transaction_safety(); |
| 216 | + |
| 217 | + let mut dst_tx = dst_env.tx_mut()?; |
| 218 | + |
| 219 | + // Open the databases (tables) by name |
| 220 | + // Source: read-only, use open_db() - table must exist |
| 221 | + let src_db = src_tx.inner.open_db(Some(table_name))?; |
| 222 | + // Destination: tables are already created by init_db(), just open them |
| 223 | + let dst_db = dst_tx.inner.open_db(Some(table_name))?; |
| 224 | + |
| 225 | + // Clear destination table before copying |
| 226 | + // This is necessary because: |
| 227 | + // 1. init_db() may have pre-populated some tables (e.g., VersionHistory) |
| 228 | + // 2. APPEND flag requires an empty table or strictly ordered keys |
| 229 | + dst_tx.inner.clear_db(dst_db.dbi())?; |
| 230 | + |
| 231 | + // Get total number of entries for progress calculation |
| 232 | + let total_entries = src_tx.inner.db_stat(&src_db)?.entries(); |
| 233 | + |
| 234 | + if !self.quiet { |
| 235 | + info!( |
| 236 | + target: "reth::cli", |
| 237 | + " Starting copy of table '{}' ({} records)", |
| 238 | + table_name, |
| 239 | + total_entries |
| 240 | + ); |
| 241 | + } |
| 242 | + |
| 243 | + // Get cursor for source and destination |
| 244 | + let src_cursor = src_tx.inner.cursor(&src_db)?; |
| 245 | + let mut dst_cursor = dst_tx.inner.cursor(&dst_db)?; |
| 246 | + |
| 247 | + let mut copied = 0usize; |
| 248 | + let mut batch_count = 0usize; |
| 249 | + let mut last_progress = Instant::now(); |
| 250 | + let start_time = Instant::now(); |
| 251 | + |
| 252 | + // Iterate through all records as byte slices |
| 253 | + for item in src_cursor.iter_slices() { |
| 254 | + let (key, value) = item?; |
| 255 | + |
| 256 | + // Insert into destination (convert Cow to slice) |
| 257 | + // Use APPEND flag for better performance (assumes ordered insert) |
| 258 | + dst_tx.inner.put(dst_db.dbi(), &key, &value, WriteFlags::APPEND)?; |
| 259 | + copied += 1; |
| 260 | + batch_count += 1; |
| 261 | + |
| 262 | + // Periodic commit |
| 263 | + if batch_count >= self.commit_every { |
| 264 | + drop(dst_cursor); |
| 265 | + dst_tx.commit()?; |
| 266 | + |
| 267 | + // Start new transaction |
| 268 | + dst_tx = dst_env.tx_mut()?; |
| 269 | + // Re-open destination table (already created, but need handle in new transaction) |
| 270 | + let dst_db = dst_tx.inner.open_db(Some(table_name))?; |
| 271 | + dst_cursor = dst_tx.inner.cursor(&dst_db)?; |
| 272 | + batch_count = 0; |
| 273 | + |
| 274 | + // Progress logging |
| 275 | + if !self.quiet && last_progress.elapsed().as_secs() >= 5 { |
| 276 | + let percentage = if total_entries > 0 { |
| 277 | + (copied as f64 / total_entries as f64 * 100.0).min(100.0) |
| 278 | + } else { |
| 279 | + 0.0 |
| 280 | + }; |
| 281 | + info!( |
| 282 | + target: "reth::cli", |
| 283 | + " Progress: {}/{} records ({:.2}%)", |
| 284 | + copied, |
| 285 | + total_entries, |
| 286 | + percentage |
| 287 | + ); |
| 288 | + last_progress = Instant::now(); |
| 289 | + } |
| 290 | + } |
| 291 | + } |
| 292 | + |
| 293 | + // Final commit |
| 294 | + if batch_count > 0 { |
| 295 | + drop(dst_cursor); |
| 296 | + dst_tx.commit()?; |
| 297 | + } |
| 298 | + |
| 299 | + // Log completion |
| 300 | + if !self.quiet { |
| 301 | + let elapsed = start_time.elapsed(); |
| 302 | + let rate = if elapsed.as_secs() > 0 { |
| 303 | + copied as f64 / elapsed.as_secs() as f64 |
| 304 | + } else { |
| 305 | + copied as f64 |
| 306 | + }; |
| 307 | + |
| 308 | + if copied == 0 { |
| 309 | + info!( |
| 310 | + target: "reth::cli", |
| 311 | + " Completed table '{}': empty table", |
| 312 | + table_name |
| 313 | + ); |
| 314 | + } else { |
| 315 | + info!( |
| 316 | + target: "reth::cli", |
| 317 | + " Completed table '{}': {} records in {:.2}s ({:.0} records/sec)", |
| 318 | + table_name, |
| 319 | + copied, |
| 320 | + elapsed.as_secs_f64(), |
| 321 | + rate |
| 322 | + ); |
| 323 | + } |
| 324 | + } |
| 325 | + |
| 326 | + Ok(copied) |
| 327 | + } |
| 328 | +} |
| 329 | + |
0 commit comments