diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index 709b0003e..7ee7061bf 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -103,17 +103,13 @@ impl BaseDbExt for BaseDb { impl WithMigrations for BaseDb { const NAME: &'static str = "base"; - const VERSION: Semver = [0, 0, 3]; + const VERSION: Semver = [0, 1, 0]; fn register_migrations( - migrations: &mut Migrations, - cancelled: CancellationFlag, + _migrations: &mut Migrations, + _cancelled: CancellationFlag, ) -> Result<(), MigrationError> { - migrations.register([0, 0, 1], [0, 0, 2], move |db| { - base_migrations::v0_0_1_to_0_0_2(db, cancelled.clone()) - })?; - migrations.register([0, 0, 2], [0, 0, 3], base_migrations::v_0_0_2_to_v_0_0_3)?; - + // TODO: register migrations here Ok(()) } } @@ -128,95 +124,7 @@ weedb::tables! { pub full_block_ids: tables::FullBlockIds, pub package_entries: tables::PackageEntries, pub block_data_entries: tables::BlockDataEntries, - pub shard_states: tables::ShardStates, - pub cells: tables::Cells, - pub temp_cells: tables::TempCells, pub block_connections: tables::BlockConnections, - - // tables are empty, but they cannot be deleted because they are in a storage config - _shard_internal_messages: tables::ShardInternalMessagesOld, - _int_msg_stats_uncommited: tables::InternalMessageStatsUncommitedOld, - _shard_int_msgs_uncommited: tables::ShardInternalMessagesUncommitedOld, - _internal_message_stats: tables::InternalMessageStatsOld, - } -} - -mod base_migrations { - use std::time::Instant; - - use everscale_types::boc::Boc; - use tycho_block_util::archive::ArchiveEntryType; - use weedb::rocksdb::CompactOptions; - - use super::*; - use crate::util::StoredValue; - - pub fn v0_0_1_to_0_0_2(db: &BaseDb, cancelled: CancellationFlag) -> Result<(), MigrationError> { - let mut block_data_iter = db.package_entries.raw_iterator(); - block_data_iter.seek_to_first(); - - tracing::info!("stated migrating package entries"); - - let started_at = Instant::now(); - let mut total_processed = 0usize; - let mut block_ids_created = 0usize; - - let full_block_ids_cf = &db.full_block_ids.cf(); - let mut batch = weedb::rocksdb::WriteBatch::default(); - let mut cancelled = cancelled.debounce(10); - loop { - let (key, value) = match block_data_iter.item() { - Some(item) if !cancelled.check() => item, - Some(_) => return Err(MigrationError::Custom(anyhow::anyhow!("cancelled").into())), - None => { - block_data_iter.status()?; - break; - } - }; - - 'item: { - let key = crate::PackageEntryKey::from_slice(key); - if key.ty != ArchiveEntryType::Block { - break 'item; - } - - let file_hash = Boc::file_hash_blake(value); - batch.put_cf(full_block_ids_cf, key.block_id.to_vec(), file_hash); - block_ids_created += 1; - } - - block_data_iter.next(); - total_processed += 1; - } - - db.rocksdb() - .write_opt(batch, db.full_block_ids.write_config())?; - - tracing::info!( - elapsed = %humantime::format_duration(started_at.elapsed()), - total_processed, - block_ids_created, - "finished migrating package entries" - ); - Ok(()) - } - - pub fn v_0_0_2_to_v_0_0_3(db: &BaseDb) -> Result<(), MigrationError> { - let mut opts = CompactOptions::default(); - opts.set_exclusive_manual_compaction(true); - let null = Option::<&[u8]>::None; - - let started_at = Instant::now(); - tracing::info!("started cells compaction"); - db.cells - .db() - .compact_range_cf_opt(&db.cells.cf(), null, null, &opts); - tracing::info!( - elapsed = %humantime::format_duration(started_at.elapsed()), - "finished cells compaction" - ); - - Ok(()) } } @@ -226,16 +134,13 @@ pub type RpcDb = WeeDb; impl WithMigrations for RpcDb { const NAME: &'static str = "rpc"; - const VERSION: Semver = [0, 0, 2]; + const VERSION: Semver = [0, 1, 0]; fn register_migrations( - migrations: &mut Migrations, - cancelled: CancellationFlag, + _migrations: &mut Migrations, + _cancelled: CancellationFlag, ) -> Result<(), MigrationError> { - migrations.register([0, 0, 1], [0, 0, 2], move |db| { - rpc_migrations::v0_0_1_to_0_0_2(db, cancelled.clone()) - })?; - + // TODO: register migrations here Ok(()) } } @@ -251,102 +156,6 @@ weedb::tables! { } } -mod rpc_migrations { - use std::time::Instant; - - use everscale_types::boc::{Boc, BocTag}; - use everscale_types::models::Transaction; - use tycho_util::sync::CancellationFlag; - use weedb::MigrationError; - - use crate::{RpcDb, TransactionMask}; - - pub fn v0_0_1_to_0_0_2(db: &RpcDb, cancelled: CancellationFlag) -> Result<(), MigrationError> { - const BATCH_SIZE: usize = 100_000; - - let mut transactions_iter = db.transactions.raw_iterator(); - transactions_iter.seek_to_first(); - - tracing::info!("stated migrating transactions"); - - let started_at = Instant::now(); - let mut total_processed = 0usize; - - let transactions_cf = &db.transactions.cf(); - let mut batch = weedb::rocksdb::WriteBatch::default(); - let mut cancelled = cancelled.debounce(10); - let mut buffer = Vec::new(); - loop { - let (key, value) = match transactions_iter.item() { - Some(item) if !cancelled.check() => item, - Some(_) => return Err(MigrationError::Custom(anyhow::anyhow!("cancelled").into())), - None => { - transactions_iter.status()?; - break; - } - }; - - if BocTag::from_bytes(value[0..4].try_into().unwrap()).is_none() { - // Skip already updated values. - transactions_iter.next(); - continue; - } - - let tx_cell = Boc::decode(value).map_err(|_e| { - MigrationError::Custom(anyhow::anyhow!("invalid transaction in db").into()) - })?; - - let tx_hash = tx_cell.repr_hash(); - - let tx = tx_cell.parse::().map_err(|_e| { - MigrationError::Custom(anyhow::anyhow!("invalid transaction in db").into()) - })?; - - let mut mask = TransactionMask::empty(); - if tx.in_msg.is_some() { - mask.set(TransactionMask::HAS_MSG_HASH, true); - } - - let boc_start = if mask.has_msg_hash() { 65 } else { 33 }; // 1 + 32 + (32) - - buffer.clear(); - buffer.reserve(boc_start + value.len()); - - buffer.push(mask.bits()); // Mask - buffer.extend_from_slice(tx_hash.as_slice()); // Tx hash - - if let Some(in_msg) = tx.in_msg { - buffer.extend_from_slice(in_msg.repr_hash().as_slice()); // InMsg hash - } - - buffer.extend_from_slice(value); // Tx data - - batch.put_cf(transactions_cf, key, &buffer); - - transactions_iter.next(); - total_processed += 1; - - if total_processed % BATCH_SIZE == 0 { - db.rocksdb() - .write_opt(std::mem::take(&mut batch), db.transactions.write_config())?; - } - } - - if !batch.is_empty() { - db.rocksdb() - .write_opt(batch, db.transactions.write_config())?; - } - - tracing::info!( - elapsed = %humantime::format_duration(started_at.elapsed()), - total_processed, - "finished migrating transactions" - ); - - Ok(()) - } -} - // === Mempool DB === pub type MempoolDb = WeeDb; @@ -442,7 +251,7 @@ impl VersionProvider for StateVersionProvider { pub type InternalQueueDB = WeeDb; -impl WithMigrations for crate::InternalQueueDB { +impl WithMigrations for InternalQueueDB { const NAME: &'static str = "int_queue"; const VERSION: Semver = [0, 0, 1]; @@ -465,3 +274,28 @@ weedb::tables! { pub shard_internal_messages: tables::ShardInternalMessages, } } + +// === Cells Db === + +pub type CellsDb = WeeDb; + +impl WithMigrations for CellsDb { + const NAME: &'static str = "cells"; + const VERSION: Semver = [0, 0, 1]; + + fn register_migrations( + _migrations: &mut Migrations, + _cancelled: CancellationFlag, + ) -> Result<(), MigrationError> { + // TODO: register migrations here + Ok(()) + } +} + +weedb::tables! { + pub struct CellsTables { + pub cells: tables::Cells, + pub temp_cells: tables::TempCells, + pub shard_states: tables::ShardStates, + } +} diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs index 4d8f57d0f..63e6b12ab 100644 --- a/storage/src/db/kv_db/tables.rs +++ b/storage/src/db/kv_db/tables.rs @@ -10,6 +10,7 @@ use super::refcount; // took from // https://github.com/tikv/tikv/blob/d60c7fb6f3657dc5f3c83b0e3fc6ac75636e1a48/src/config/mod.rs#L170 // todo: need to benchmark and update if it's not optimal +const DEFAULT_BLOB_FILE_SIZE: u64 = bytesize::GIB; const DEFAULT_MIN_BLOB_SIZE: u64 = bytesize::KIB * 32; /// Stores generic node parameters @@ -49,7 +50,12 @@ impl ColumnFamilyOptions for ArchiveBlockIds { opts.set_merge_operator_associative("archive_data_merge", archive_data_merge); // data is hardly compressible and dataset is small opts.set_compression_type(DBCompressionType::None); - with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None); + with_blob_db( + opts, + DEFAULT_BLOB_FILE_SIZE, + DEFAULT_MIN_BLOB_SIZE, + DBCompressionType::None, + ); } } @@ -73,7 +79,12 @@ impl ColumnFamilyOptions for Archives { // data is already compressed opts.set_compression_type(DBCompressionType::None); - with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None); + with_blob_db( + opts, + DEFAULT_BLOB_FILE_SIZE, + DEFAULT_MIN_BLOB_SIZE, + DBCompressionType::None, + ); opts.set_max_write_buffer_number(8); // 8 * 512MB = 4GB; opts.set_write_buffer_size(512 * 1024 * 1024); // 512 per memtable @@ -165,7 +176,12 @@ impl ColumnFamilyOptions for PackageEntries { opts.set_write_buffer_size(512 * 1024 * 1024); // 512 per memtable opts.set_min_write_buffer_number_to_merge(2); // allow early flush - with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::Zstd); + with_blob_db( + opts, + DEFAULT_BLOB_FILE_SIZE, + DEFAULT_MIN_BLOB_SIZE, + DBCompressionType::Zstd, + ); // This flag specifies that the implementation should optimize the filters // mainly for cases where keys are found rather than also optimize for keys @@ -204,7 +220,12 @@ impl ColumnFamilyOptions for BlockDataEntries { // data is already compressed opts.set_compression_type(DBCompressionType::None); - with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None); + with_blob_db( + opts, + DEFAULT_BLOB_FILE_SIZE, + DEFAULT_MIN_BLOB_SIZE, + DBCompressionType::None, + ); } } @@ -405,7 +426,12 @@ impl ColumnFamily for ShardInternalMessages { impl ColumnFamilyOptions for ShardInternalMessages { fn options(opts: &mut Options, caches: &mut Caches) { internal_queue_options(opts, caches); - with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None); + with_blob_db( + opts, + DEFAULT_BLOB_FILE_SIZE, + DEFAULT_MIN_BLOB_SIZE, + DBCompressionType::None, + ); } } @@ -665,7 +691,12 @@ impl ColumnFamilyOptions for Transactions { fn options(opts: &mut Options, caches: &mut Caches) { zstd_block_based_table_factory(opts, caches); opts.set_compression_type(DBCompressionType::Zstd); - with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::Zstd); + with_blob_db( + opts, + DEFAULT_BLOB_FILE_SIZE, + DEFAULT_MIN_BLOB_SIZE, + DBCompressionType::Zstd, + ); } } @@ -807,10 +838,17 @@ fn zstd_block_based_table_factory(opts: &mut Options, caches: &Caches) { opts.set_compression_type(DBCompressionType::Zstd); } -fn with_blob_db(opts: &mut Options, min_value_size: u64, compression_type: DBCompressionType) { +fn with_blob_db( + opts: &mut Options, + file_size: u64, + min_value_size: u64, + compression_type: DBCompressionType, +) { opts.set_enable_blob_files(true); + opts.set_blob_file_size(file_size); + opts.set_min_blob_size(min_value_size); + opts.set_enable_blob_gc(true); - opts.set_min_blob_size(min_value_size); opts.set_blob_compression_type(compression_type); } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 12c16a77b..054be651b 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -30,6 +30,7 @@ const RPC_DB_SUBDIR: &str = "rpc"; const FILES_SUBDIR: &str = "files"; const MEMPOOL_SUBDIR: &str = "mempool"; const INT_QUEUE_SUBDIR: &str = "int_queue"; +const CELLS_DB_SUBDIR: &str = "cells"; pub struct StorageBuilder { config: StorageConfig, @@ -133,6 +134,12 @@ impl StorageBuilder { base_db.normalize_version()?; // TODO: Remove on testnet reset base_db.apply_migrations().await?; + let cells_db = + CellsDb::builder_prepared(self.config.root_dir.join(CELLS_DB_SUBDIR), caches.clone()) + .with_metrics_enabled(self.config.rocksdb_enable_metrics) + .with_options(|opts, _| update_options(opts, threads, fdlimit)) + .build()?; + let temp_file_storage = TempFileStorage::new(&file_db)?; let blocks_storage_config = BlockStorageConfig { @@ -150,14 +157,14 @@ impl StorageBuilder { self.config.archive_chunk_size, )); let shard_state_storage = ShardStateStorage::new( - base_db.clone(), + cells_db.clone(), block_handle_storage.clone(), block_storage.clone(), temp_file_storage.clone(), self.config.cells_cache_size, )?; let persistent_state_storage = PersistentStateStorage::new( - base_db.clone(), + cells_db.clone(), &file_db, block_handle_storage.clone(), block_storage.clone(), @@ -196,6 +203,7 @@ impl StorageBuilder { let inner = Arc::new(Inner { root, base_db, + cells_db, config: self.config, block_handle_storage, block_storage, @@ -270,6 +278,10 @@ impl Storage { &self.inner.base_db } + pub fn cells_db(&self) -> &CellsDb { + &self.inner.cells_db + } + pub fn mempool_db(&self) -> &MempoolDb { &self.inner.mempool_storage.db } @@ -326,6 +338,7 @@ impl Storage { struct Inner { root: FileDb, base_db: BaseDb, + cells_db: CellsDb, config: StorageConfig, block_handle_storage: Arc, diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs index 640818254..e3c6f7091 100644 --- a/storage/src/store/persistent_state/mod.rs +++ b/storage/src/store/persistent_state/mod.rs @@ -23,7 +23,7 @@ pub use self::queue_state::writer::QueueStateWriter; pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader}; pub use self::shard_state::writer::ShardStateWriter; use super::{KeyBlocksDirection, ShardStateStorage}; -use crate::db::{BaseDb, FileDb, MappedFile}; +use crate::db::{CellsDb, FileDb, MappedFile}; use crate::store::{BlockHandle, BlockHandleStorage, BlockStorage}; mod queue_state { @@ -83,7 +83,7 @@ pub struct PersistentStateStorage { impl PersistentStateStorage { pub fn new( - db: BaseDb, + db: CellsDb, files_dir: &FileDb, block_handle_storage: Arc, block_storage: Arc, @@ -634,7 +634,7 @@ impl PersistentStateStorage { } struct Inner { - db: BaseDb, + db: CellsDb, storage_dir: FileDb, block_handles: Arc, blocks: Arc, diff --git a/storage/src/store/persistent_state/shard_state/writer.rs b/storage/src/store/persistent_state/shard_state/writer.rs index 49018eb1b..649c3050f 100644 --- a/storage/src/store/persistent_state/shard_state/writer.rs +++ b/storage/src/store/persistent_state/shard_state/writer.rs @@ -11,10 +11,10 @@ use tycho_util::compression::ZstdCompressedFile; use tycho_util::sync::CancellationFlag; use tycho_util::FastHashMap; -use crate::db::{BaseDb, FileDb}; +use crate::db::{CellsDb, FileDb}; pub struct ShardStateWriter<'a> { - db: &'a BaseDb, + db: &'a CellsDb, states_dir: &'a FileDb, block_id: &'a BlockId, } @@ -35,7 +35,7 @@ impl<'a> ShardStateWriter<'a> { PathBuf::from(block_id.to_string()).with_extension(Self::FILE_EXTENSION_TEMP) } - pub fn new(db: &'a BaseDb, states_dir: &'a FileDb, block_id: &'a BlockId) -> Self { + pub fn new(db: &'a CellsDb, states_dir: &'a FileDb, block_id: &'a BlockId) -> Self { Self { db, states_dir, diff --git a/storage/src/store/shard_state/cell_storage.rs b/storage/src/store/shard_state/cell_storage.rs index 25a571779..151c59879 100644 --- a/storage/src/store/shard_state/cell_storage.rs +++ b/storage/src/store/shard_state/cell_storage.rs @@ -19,7 +19,7 @@ use weedb::{rocksdb, BoundedCfHandle}; use crate::db::*; pub struct CellStorage { - db: BaseDb, + db: CellsDb, cells_cache: Arc, raw_cells_cache: Arc, } @@ -27,7 +27,7 @@ pub struct CellStorage { type CellsIndex = FastDashMap>; impl CellStorage { - pub fn new(db: BaseDb, cache_size_bytes: ByteSize) -> Arc { + pub fn new(db: CellsDb, cache_size_bytes: ByteSize) -> Arc { let cells_cache = Default::default(); let raw_cells_cache = Arc::new(RawCellsCache::new(cache_size_bytes.as_u64())); @@ -89,7 +89,7 @@ impl CellStorage { struct Context<'a> { cells_cf: BoundedCfHandle<'a>, - db: &'a BaseDb, + db: &'a CellsDb, buffer: Vec, transaction: FastHashMap, new_cells_batch: rocksdb::WriteBatch, @@ -98,7 +98,7 @@ impl CellStorage { } impl<'a> Context<'a> { - fn new(db: &'a BaseDb, raw_cache: &'a RawCellsCache) -> Self { + fn new(db: &'a CellsDb, raw_cache: &'a RawCellsCache) -> Self { Self { cells_cf: db.cells.cf(), db, @@ -266,7 +266,7 @@ impl CellStorage { } struct Context<'a> { - db: &'a BaseDb, + db: &'a CellsDb, raw_cells_cache: &'a RawCellsCache, alloc: &'a Bump, transaction: FastHashMap<&'a HashBytes, AddedCell<'a>>, @@ -948,7 +948,7 @@ impl RawCellsCache { fn get_raw( &self, - db: &BaseDb, + db: &CellsDb, key: &HashBytes, ) -> Result, rocksdb::Error> { use quick_cache::sync::GuardResult; @@ -985,7 +985,7 @@ impl RawCellsCache { fn get_rc_for_insert( &self, - db: &BaseDb, + db: &CellsDb, key: &HashBytes, depth: usize, ) -> Result { @@ -1023,7 +1023,7 @@ impl RawCellsCache { fn get_rc_for_delete( &self, - db: &BaseDb, + db: &CellsDb, key: &HashBytes, refs_buffer: &mut Vec, ) -> Result { diff --git a/storage/src/store/shard_state/mod.rs b/storage/src/store/shard_state/mod.rs index 1b77690ec..cacc575d4 100644 --- a/storage/src/store/shard_state/mod.rs +++ b/storage/src/store/shard_state/mod.rs @@ -23,7 +23,7 @@ mod entries_buffer; mod store_state_raw; pub struct ShardStateStorage { - db: BaseDb, + cells_db: CellsDb, block_handle_storage: Arc, block_storage: Arc, @@ -38,16 +38,16 @@ pub struct ShardStateStorage { impl ShardStateStorage { pub fn new( - db: BaseDb, + cells_db: CellsDb, block_handle_storage: Arc, block_storage: Arc, temp_file_storage: TempFileStorage, cache_size_bytes: ByteSize, ) -> Result> { - let cell_storage = CellStorage::new(db.clone(), cache_size_bytes); + let cell_storage = CellStorage::new(cells_db.clone(), cache_size_bytes); Ok(Arc::new(Self { - db, + cells_db, block_handle_storage, block_storage, temp_file_storage, @@ -107,8 +107,8 @@ impl ShardStateStorage { let _hist = HistogramGuard::begin("tycho_storage_state_store_time"); let block_id = *handle.id(); - let raw_db = self.db.rocksdb().clone(); - let cf = self.db.shard_states.get_unbounded_cf(); + let raw_db = self.cells_db.rocksdb().clone(); + let cf = self.cells_db.shard_states.get_unbounded_cf(); let cell_storage = self.cell_storage.clone(); let block_handle_storage = self.block_handle_storage.clone(); let handle = handle.clone(); @@ -168,7 +168,7 @@ impl ShardStateStorage { pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result { let ctx = StoreStateContext { - db: self.db.clone(), + db: self.cells_db.clone(), cell_storage: self.cell_storage.clone(), temp_file_storage: self.temp_file_storage.clone(), min_ref_mc_state: self.min_ref_mc_state.clone(), @@ -201,12 +201,12 @@ impl ShardStateStorage { ); let started_at = Instant::now(); - let raw = self.db.rocksdb(); + let raw = self.cells_db.rocksdb(); // Manually get required column factory and r/w options let snapshot = raw.snapshot(); - let shard_states_cf = self.db.shard_states.get_unbounded_cf(); - let mut states_read_options = self.db.shard_states.new_read_config(); + let shard_states_cf = self.cells_db.shard_states.get_unbounded_cf(); + let mut states_read_options = self.cells_db.shard_states.new_read_config(); states_read_options.set_snapshot(&snapshot); let mut alloc = bumpalo::Bump::new(); @@ -244,7 +244,7 @@ impl ShardStateStorage { { let _guard = self.gc_lock.lock().await; - let db = self.db.clone(); + let db = self.cells_db.clone(); let cell_storage = self.cell_storage.clone(); let key = key.to_vec(); @@ -299,7 +299,7 @@ impl ShardStateStorage { } } - let snapshot = self.db.rocksdb().snapshot(); + let snapshot = self.cells_db.rocksdb().snapshot(); // 1. Find target block @@ -349,7 +349,7 @@ impl ShardStateStorage { } pub fn load_state_root(&self, block_id: &BlockId) -> Result { - let shard_states = &self.db.shard_states; + let shard_states = &self.cells_db.shard_states; let shard_state = shard_states.get(block_id.to_vec())?; match shard_state { Some(root) => Ok(HashBytes::from_slice(&root[..32])), @@ -362,7 +362,7 @@ impl ShardStateStorage { mc_seqno: u32, snapshot: &rocksdb::Snapshot<'_>, ) -> Result> { - let shard_states = &self.db.shard_states; + let shard_states = &self.cells_db.shard_states; let mut bound = BlockId { shard: ShardIdent::MASTERCHAIN, @@ -378,7 +378,7 @@ impl ShardStateStorage { readopts.set_iterate_upper_bound(bound.to_vec().as_slice()); let mut iter = self - .db + .cells_db .rocksdb() .raw_iterator_cf_opt(&shard_states.cf(), readopts); iter.seek_to_first(); diff --git a/storage/src/store/shard_state/store_state_raw.rs b/storage/src/store/shard_state/store_state_raw.rs index bb0c843ef..2e8c78f6e 100644 --- a/storage/src/store/shard_state/store_state_raw.rs +++ b/storage/src/store/shard_state/store_state_raw.rs @@ -21,7 +21,7 @@ use crate::util::StoredValue; pub const MAX_DEPTH: u16 = u16::MAX - 1; pub struct StoreStateContext { - pub db: BaseDb, + pub db: CellsDb, pub cell_storage: Arc, pub temp_file_storage: TempFileStorage, pub min_ref_mc_state: MinRefMcStateTracker, @@ -247,7 +247,7 @@ struct FinalizationContext<'a> { } impl<'a> FinalizationContext<'a> { - fn new(db: &'a BaseDb) -> Self { + fn new(db: &'a CellsDb) -> Self { Self { pruned_branches: Default::default(), cell_usages: FastHashMap::with_capacity_and_hasher(128, Default::default()), @@ -258,7 +258,7 @@ impl<'a> FinalizationContext<'a> { } } - fn clear_temp_cells(&self, db: &BaseDb) -> std::result::Result<(), rocksdb::Error> { + fn clear_temp_cells(&self, db: &CellsDb) -> std::result::Result<(), rocksdb::Error> { let from = &[0x00; 32]; let to = &[0xff; 32]; db.rocksdb().delete_range_cf(&self.temp_cells_cf, from, to) @@ -591,11 +591,11 @@ mod test { }) .build() .await?; - let base_db = storage.base_db(); + let cells_db = storage.cells_db(); let cell_storage = &storage.shard_state_storage().cell_storage; let store_ctx = StoreStateContext { - db: base_db.clone(), + db: cells_db.clone(), cell_storage: cell_storage.clone(), temp_file_storage: storage.temp_file_storage().clone(), min_ref_mc_state: MinRefMcStateTracker::new(), @@ -614,12 +614,12 @@ mod test { } tracing::info!("Finished processing all states"); tracing::info!("Starting gc"); - states_gc(cell_storage, base_db).await?; + states_gc(cell_storage, cells_db).await?; Ok(()) } - async fn states_gc(cell_storage: &Arc, db: &BaseDb) -> Result<()> { + async fn states_gc(cell_storage: &Arc, db: &CellsDb) -> Result<()> { let states_iterator = db.shard_states.iterator(IteratorMode::Start); let bump = bumpalo::Bump::new(); @@ -657,7 +657,7 @@ mod test { tycho_util::test::init_logger("rand_cells_storage", "debug"); let (storage, _tempdir) = Storage::new_temp().await?; - let base_db = storage.base_db(); + let cells_db = storage.cells_db(); let cell_storage = &storage.shard_state_storage().cell_storage; let mut rng = StdRng::seed_from_u64(1337); @@ -719,9 +719,9 @@ mod test { cell_keys.push(*cell_hash); - base_db + cells_db .rocksdb() - .write_opt(batch, base_db.cells.write_config())?; + .write_opt(batch, cells_db.cells.write_config())?; tracing::info!("Iteration {i} Finished. traversed: {traversed}",); } @@ -736,18 +736,18 @@ mod test { traverse_cell((cell as Arc).as_ref()); let (res, batch) = cell_storage.remove_cell(&bump, &key)?; - base_db + cells_db .rocksdb() - .write_opt(batch, base_db.cells.write_config())?; + .write_opt(batch, cells_db.cells.write_config())?; tracing::info!("Gc {id} of {total} done. Traversed: {res}",); bump.reset(); } // two compactions in row. First one run merge operators, second one will remove all tombstones - base_db.trigger_compaction().await; - base_db.trigger_compaction().await; + cells_db.trigger_compaction().await; + cells_db.trigger_compaction().await; - let cells_left = base_db.cells.iterator(IteratorMode::Start).count(); + let cells_left = cells_db.cells.iterator(IteratorMode::Start).count(); tracing::info!("States GC finished. Cells left: {cells_left}"); assert_eq!(cells_left, 0, "Gc is broken. Press F to pay respect"); Ok(())