Separate db instance for cells #746

Open · wants to merge 3 commits into master
234 changes: 34 additions & 200 deletions storage/src/db/kv_db/mod.rs
@@ -103,17 +103,13 @@ impl BaseDbExt for BaseDb {

impl WithMigrations for BaseDb {
const NAME: &'static str = "base";
- const VERSION: Semver = [0, 0, 3];
+ const VERSION: Semver = [0, 1, 0];

fn register_migrations(
- migrations: &mut Migrations<Self>,
- cancelled: CancellationFlag,
+ _migrations: &mut Migrations<Self>,
+ _cancelled: CancellationFlag,
) -> Result<(), MigrationError> {
- migrations.register([0, 0, 1], [0, 0, 2], move |db| {
-     base_migrations::v0_0_1_to_0_0_2(db, cancelled.clone())
- })?;
- migrations.register([0, 0, 2], [0, 0, 3], base_migrations::v_0_0_2_to_v_0_0_3)?;

+ // TODO: register migrations here
Ok(())
}
}
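
The migration hook is now an empty placeholder. When the next schema change lands, a migration would be registered through the same `Migrations` API that the removed `register` calls above used. A minimal sketch of what that would look like; the version numbers are placeholders and `my_migration` is hypothetical, not part of this PR:

    fn register_migrations(
        migrations: &mut Migrations<Self>,
        cancelled: CancellationFlag,
    ) -> Result<(), MigrationError> {
        // Hypothetical future migration from 0.1.0 to 0.1.1, following the
        // pattern of the removed calls: a closure that receives the db and
        // forwards the cancellation flag.
        migrations.register([0, 1, 0], [0, 1, 1], move |db| {
            my_migration(db, cancelled.clone())
        })?;
        Ok(())
    }
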
@@ -128,95 +124,7 @@ weedb::tables! {
pub full_block_ids: tables::FullBlockIds,
pub package_entries: tables::PackageEntries,
pub block_data_entries: tables::BlockDataEntries,
- pub shard_states: tables::ShardStates,
- pub cells: tables::Cells,
- pub temp_cells: tables::TempCells,
pub block_connections: tables::BlockConnections,

// tables are empty, but they cannot be deleted because they are in a storage config
_shard_internal_messages: tables::ShardInternalMessagesOld,
_int_msg_stats_uncommited: tables::InternalMessageStatsUncommitedOld,
_shard_int_msgs_uncommited: tables::ShardInternalMessagesUncommitedOld,
_internal_message_stats: tables::InternalMessageStatsOld,
}
}

- mod base_migrations {
-     use std::time::Instant;
- 
-     use everscale_types::boc::Boc;
-     use tycho_block_util::archive::ArchiveEntryType;
-     use weedb::rocksdb::CompactOptions;
- 
-     use super::*;
-     use crate::util::StoredValue;
- 
-     pub fn v0_0_1_to_0_0_2(db: &BaseDb, cancelled: CancellationFlag) -> Result<(), MigrationError> {
-         let mut block_data_iter = db.package_entries.raw_iterator();
-         block_data_iter.seek_to_first();
- 
-         tracing::info!("started migrating package entries");
- 
-         let started_at = Instant::now();
-         let mut total_processed = 0usize;
-         let mut block_ids_created = 0usize;
- 
-         let full_block_ids_cf = &db.full_block_ids.cf();
-         let mut batch = weedb::rocksdb::WriteBatch::default();
-         let mut cancelled = cancelled.debounce(10);
-         loop {
-             let (key, value) = match block_data_iter.item() {
-                 Some(item) if !cancelled.check() => item,
-                 Some(_) => return Err(MigrationError::Custom(anyhow::anyhow!("cancelled").into())),
-                 None => {
-                     block_data_iter.status()?;
-                     break;
-                 }
-             };
- 
-             'item: {
-                 let key = crate::PackageEntryKey::from_slice(key);
-                 if key.ty != ArchiveEntryType::Block {
-                     break 'item;
-                 }
- 
-                 let file_hash = Boc::file_hash_blake(value);
-                 batch.put_cf(full_block_ids_cf, key.block_id.to_vec(), file_hash);
-                 block_ids_created += 1;
-             }
- 
-             block_data_iter.next();
-             total_processed += 1;
-         }
- 
-         db.rocksdb()
-             .write_opt(batch, db.full_block_ids.write_config())?;
- 
-         tracing::info!(
-             elapsed = %humantime::format_duration(started_at.elapsed()),
-             total_processed,
-             block_ids_created,
-             "finished migrating package entries"
-         );
-         Ok(())
-     }
- 
-     pub fn v_0_0_2_to_v_0_0_3(db: &BaseDb) -> Result<(), MigrationError> {
-         let mut opts = CompactOptions::default();
-         opts.set_exclusive_manual_compaction(true);
-         let null = Option::<&[u8]>::None;
- 
-         let started_at = Instant::now();
-         tracing::info!("started cells compaction");
-         db.cells
-             .db()
-             .compact_range_cf_opt(&db.cells.cf(), null, null, &opts);
-         tracing::info!(
-             elapsed = %humantime::format_duration(started_at.elapsed()),
-             "finished cells compaction"
-         );
- 
-         Ok(())
-     }
- }

@@ -226,16 +134,13 @@ pub type RpcDb = WeeDb<RpcTables>;

impl WithMigrations for RpcDb {
const NAME: &'static str = "rpc";
- const VERSION: Semver = [0, 0, 2];
+ const VERSION: Semver = [0, 1, 0];

fn register_migrations(
- migrations: &mut Migrations<Self>,
- cancelled: CancellationFlag,
+ _migrations: &mut Migrations<Self>,
+ _cancelled: CancellationFlag,
) -> Result<(), MigrationError> {
- migrations.register([0, 0, 1], [0, 0, 2], move |db| {
-     rpc_migrations::v0_0_1_to_0_0_2(db, cancelled.clone())
- })?;

+ // TODO: register migrations here
Ok(())
}
}
@@ -251,102 +156,6 @@ weedb::tables! {
}
}

- mod rpc_migrations {
-     use std::time::Instant;
- 
-     use everscale_types::boc::{Boc, BocTag};
-     use everscale_types::models::Transaction;
-     use tycho_util::sync::CancellationFlag;
-     use weedb::MigrationError;
- 
-     use crate::{RpcDb, TransactionMask};
- 
-     pub fn v0_0_1_to_0_0_2(db: &RpcDb, cancelled: CancellationFlag) -> Result<(), MigrationError> {
-         const BATCH_SIZE: usize = 100_000;
- 
-         let mut transactions_iter = db.transactions.raw_iterator();
-         transactions_iter.seek_to_first();
- 
-         tracing::info!("started migrating transactions");
- 
-         let started_at = Instant::now();
-         let mut total_processed = 0usize;
- 
-         let transactions_cf = &db.transactions.cf();
-         let mut batch = weedb::rocksdb::WriteBatch::default();
-         let mut cancelled = cancelled.debounce(10);
-         let mut buffer = Vec::new();
-         loop {
-             let (key, value) = match transactions_iter.item() {
-                 Some(item) if !cancelled.check() => item,
-                 Some(_) => return Err(MigrationError::Custom(anyhow::anyhow!("cancelled").into())),
-                 None => {
-                     transactions_iter.status()?;
-                     break;
-                 }
-             };
- 
-             if BocTag::from_bytes(value[0..4].try_into().unwrap()).is_none() {
-                 // Skip already updated values.
-                 transactions_iter.next();
-                 continue;
-             }
- 
-             let tx_cell = Boc::decode(value).map_err(|_e| {
-                 MigrationError::Custom(anyhow::anyhow!("invalid transaction in db").into())
-             })?;
- 
-             let tx_hash = tx_cell.repr_hash();
- 
-             let tx = tx_cell.parse::<Transaction>().map_err(|_e| {
-                 MigrationError::Custom(anyhow::anyhow!("invalid transaction in db").into())
-             })?;
- 
-             let mut mask = TransactionMask::empty();
-             if tx.in_msg.is_some() {
-                 mask.set(TransactionMask::HAS_MSG_HASH, true);
-             }
- 
-             let boc_start = if mask.has_msg_hash() { 65 } else { 33 }; // 1 + 32 + (32)
- 
-             buffer.clear();
-             buffer.reserve(boc_start + value.len());
- 
-             buffer.push(mask.bits()); // Mask
-             buffer.extend_from_slice(tx_hash.as_slice()); // Tx hash
- 
-             if let Some(in_msg) = tx.in_msg {
-                 buffer.extend_from_slice(in_msg.repr_hash().as_slice()); // InMsg hash
-             }
- 
-             buffer.extend_from_slice(value); // Tx data
- 
-             batch.put_cf(transactions_cf, key, &buffer);
- 
-             transactions_iter.next();
-             total_processed += 1;
- 
-             if total_processed % BATCH_SIZE == 0 {
-                 db.rocksdb()
-                     .write_opt(std::mem::take(&mut batch), db.transactions.write_config())?;
-             }
-         }
- 
-         if !batch.is_empty() {
-             db.rocksdb()
-                 .write_opt(batch, db.transactions.write_config())?;
-         }
- 
-         tracing::info!(
-             elapsed = %humantime::format_duration(started_at.elapsed()),
-             total_processed,
-             "finished migrating transactions"
-         );
- 
-         Ok(())
-     }
- }

// === Mempool DB ===

pub type MempoolDb = WeeDb<MempoolTables>;
@@ -442,7 +251,7 @@ impl VersionProvider for StateVersionProvider {

pub type InternalQueueDB = WeeDb<InternalQueueTables>;

- impl WithMigrations for crate::InternalQueueDB {
+ impl WithMigrations for InternalQueueDB {
const NAME: &'static str = "int_queue";
const VERSION: Semver = [0, 0, 1];

@@ -465,3 +274,28 @@ weedb::tables! {
pub shard_internal_messages: tables::ShardInternalMessages,
}
}

+ // === Cells Db ===
+ 
+ pub type CellsDb = WeeDb<CellsTables>;
+ 
+ impl WithMigrations for CellsDb {
+     const NAME: &'static str = "cells";
+     const VERSION: Semver = [0, 0, 1];
+ 
+     fn register_migrations(
+         _migrations: &mut Migrations<Self>,
+         _cancelled: CancellationFlag,
+     ) -> Result<(), MigrationError> {
+         // TODO: register migrations here
+         Ok(())
+     }
+ }
+ 
+ weedb::tables! {
+     pub struct CellsTables<Caches> {
+         pub cells: tables::Cells,
+         pub temp_cells: tables::TempCells,
+         pub shard_states: tables::ShardStates,
+     }
+ }
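
With cells, temp cells, and shard states now living in their own RocksDB instance, call sites that used to reach these tables through `BaseDb` go through `CellsDb` instead. A minimal sketch of the access pattern, using only the accessors already visible in this diff (`.cf()`, `.rocksdb()`, `.write_config()`); the function and variable names are illustrative, not from this PR, and real cell keys are hashes rather than literal byte strings:

    // `cells_db: &CellsDb` would be threaded through storage wherever
    // `db: &BaseDb` was previously used for these column families.
    fn touch_cells(cells_db: &CellsDb) {
        let cells_cf = cells_db.cells.cf(); // column-family handle, as before
        let mut batch = weedb::rocksdb::WriteBatch::default();
        batch.put_cf(&cells_cf, b"key", b"value"); // illustrative key/value
        cells_db
            .rocksdb() // now a separate RocksDB instance from BaseDb
            .write_opt(batch, cells_db.cells.write_config())
            .expect("write to cells db");
    }
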
54 changes: 46 additions & 8 deletions storage/src/db/kv_db/tables.rs
@@ -10,6 +10,7 @@ use super::refcount;
// took from
// https://github.com/tikv/tikv/blob/d60c7fb6f3657dc5f3c83b0e3fc6ac75636e1a48/src/config/mod.rs#L170
// todo: need to benchmark and update if it's not optimal
+ const DEFAULT_BLOB_FILE_SIZE: u64 = bytesize::GIB;
const DEFAULT_MIN_BLOB_SIZE: u64 = bytesize::KIB * 32;

/// Stores generic node parameters
Expand Down Expand Up @@ -49,7 +50,12 @@ impl ColumnFamilyOptions<Caches> for ArchiveBlockIds {
opts.set_merge_operator_associative("archive_data_merge", archive_data_merge);
// data is hardly compressible and dataset is small
opts.set_compression_type(DBCompressionType::None);
- with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
+ with_blob_db(
+     opts,
+     DEFAULT_BLOB_FILE_SIZE,
+     DEFAULT_MIN_BLOB_SIZE,
+     DBCompressionType::None,
+ );
}
}

@@ -73,7 +79,12 @@ impl ColumnFamilyOptions<Caches> for Archives {

// data is already compressed
opts.set_compression_type(DBCompressionType::None);
- with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
+ with_blob_db(
+     opts,
+     DEFAULT_BLOB_FILE_SIZE,
+     DEFAULT_MIN_BLOB_SIZE,
+     DBCompressionType::None,
+ );

opts.set_max_write_buffer_number(8); // 8 * 512MB = 4GB;
opts.set_write_buffer_size(512 * 1024 * 1024); // 512 per memtable
Expand Down Expand Up @@ -165,7 +176,12 @@ impl ColumnFamilyOptions<Caches> for PackageEntries {
opts.set_write_buffer_size(512 * 1024 * 1024); // 512 per memtable
opts.set_min_write_buffer_number_to_merge(2); // allow early flush

- with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::Zstd);
+ with_blob_db(
+     opts,
+     DEFAULT_BLOB_FILE_SIZE,
+     DEFAULT_MIN_BLOB_SIZE,
+     DBCompressionType::Zstd,
+ );

// This flag specifies that the implementation should optimize the filters
// mainly for cases where keys are found rather than also optimize for keys
Expand Down Expand Up @@ -204,7 +220,12 @@ impl ColumnFamilyOptions<Caches> for BlockDataEntries {

// data is already compressed
opts.set_compression_type(DBCompressionType::None);
- with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
+ with_blob_db(
+     opts,
+     DEFAULT_BLOB_FILE_SIZE,
+     DEFAULT_MIN_BLOB_SIZE,
+     DBCompressionType::None,
+ );
}
}

@@ -405,7 +426,12 @@ impl ColumnFamily for ShardInternalMessages {
impl ColumnFamilyOptions<Caches> for ShardInternalMessages {
fn options(opts: &mut Options, caches: &mut Caches) {
internal_queue_options(opts, caches);
- with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
+ with_blob_db(
+     opts,
+     DEFAULT_BLOB_FILE_SIZE,
+     DEFAULT_MIN_BLOB_SIZE,
+     DBCompressionType::None,
+ );
}
}

@@ -665,7 +691,12 @@ impl ColumnFamilyOptions<Caches> for Transactions {
fn options(opts: &mut Options, caches: &mut Caches) {
zstd_block_based_table_factory(opts, caches);
opts.set_compression_type(DBCompressionType::Zstd);
- with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::Zstd);
+ with_blob_db(
+     opts,
+     DEFAULT_BLOB_FILE_SIZE,
+     DEFAULT_MIN_BLOB_SIZE,
+     DBCompressionType::Zstd,
+ );
}
}

@@ -807,10 +838,17 @@ fn zstd_block_based_table_factory(opts: &mut Options, caches: &Caches) {
opts.set_compression_type(DBCompressionType::Zstd);
}

- fn with_blob_db(opts: &mut Options, min_value_size: u64, compression_type: DBCompressionType) {
+ fn with_blob_db(
+     opts: &mut Options,
+     file_size: u64,
+     min_value_size: u64,
+     compression_type: DBCompressionType,
+ ) {
opts.set_enable_blob_files(true);
+ opts.set_blob_file_size(file_size);
+ opts.set_min_blob_size(min_value_size);

opts.set_enable_blob_gc(true);

- opts.set_min_blob_size(min_value_size);
opts.set_blob_compression_type(compression_type);
}
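
The extended helper now sets the blob file size explicitly (1 GiB per `DEFAULT_BLOB_FILE_SIZE`, with values of 32 KiB and up separated into blob files per `DEFAULT_MIN_BLOB_SIZE`). A new large-value column family would opt in the same way the six call sites above do; `BigValues` below is a hypothetical table used only to illustrate the pattern (a real one would also need its own `ColumnFamily` impl, as the existing tables have):

    // Sketch: opting a hypothetical large-value column family into BlobDB.
    impl ColumnFamilyOptions<Caches> for BigValues {
        fn options(opts: &mut Options, caches: &mut Caches) {
            zstd_block_based_table_factory(opts, caches);
            with_blob_db(
                opts,
                DEFAULT_BLOB_FILE_SIZE, // 1 GiB target size per blob file
                DEFAULT_MIN_BLOB_SIZE,  // values >= 32 KiB go to blob files
                DBCompressionType::Zstd,
            );
        }
    }
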