Skip to content

Commit 4c5d792

Browse files
committed
wip: improve gc cells
1 parent 4e6e925 commit 4c5d792

File tree

2 files changed

+200
-3
lines changed

2 files changed

+200
-3
lines changed

storage/src/store/shard_state/cell_storage.rs

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ impl CellStorage {
5050
StoreContext::new(&self.db, &self.raw_cells_cache, capacity)
5151
}
5252

53+
pub fn create_remove_ctx(&self) -> RemoveContext {
54+
RemoveContext::new(&self.db, &self.raw_cells_cache)
55+
}
56+
5357
pub fn apply_temp_cell(&self, root: &HashBytes) -> Result<()> {
5458
const MAX_NEW_CELLS_BATCH_SIZE: usize = 10000;
5559

@@ -554,6 +558,148 @@ impl StoreContext {
554558
}
555559
}
556560

561+
#[derive(Clone)]
562+
struct RemovedCell {
563+
old_rc: i64,
564+
removes: u32,
565+
refs: Vec<HashBytes>,
566+
}
567+
568+
impl<'a> RemovedCell {
569+
fn remove(&'a mut self) -> Result<Option<&'a [HashBytes]>, CellStorageError> {
570+
self.removes += 1;
571+
if self.removes as i64 <= self.old_rc {
572+
Ok(self.next_refs())
573+
} else {
574+
Err(CellStorageError::CounterMismatch)
575+
}
576+
}
577+
578+
fn next_refs(&'a self) -> Option<&'a [HashBytes]> {
579+
if self.old_rc > self.removes as i64 {
580+
None
581+
} else {
582+
Some(&self.refs)
583+
}
584+
}
585+
}
586+
587+
pub struct RemoveContext {
588+
db: CellsDb,
589+
raw_cache: Arc<RawCellsCache>,
590+
transaction: FastDashMap<HashBytes, RemovedCell>,
591+
}
592+
593+
impl RemoveContext {
594+
fn new(db: &CellsDb, raw_cache: &Arc<RawCellsCache>) -> Self {
595+
Self {
596+
db: db.clone(),
597+
raw_cache: raw_cache.clone(),
598+
transaction: FastDashMap::with_capacity_and_hasher_and_shard_amount(
599+
128,
600+
Default::default(),
601+
512,
602+
),
603+
}
604+
}
605+
606+
pub fn len(&self) -> usize {
607+
self.transaction.len()
608+
}
609+
610+
pub fn remove_cell(&self, hash: &HashBytes) -> Result<(), CellStorageError> {
611+
let mut stack = Vec::with_capacity(16);
612+
stack.push(std::slice::from_ref(hash).to_vec());
613+
614+
let mut buffer = Vec::with_capacity(4);
615+
616+
// While some cells left
617+
'outer: loop {
618+
let Some(iter) = stack.last_mut() else {
619+
break;
620+
};
621+
622+
for cell_id in iter.iter() {
623+
// Process the current cell.
624+
let refs = match self.transaction.entry(*cell_id) {
625+
Entry::Occupied(mut v) => v.get_mut().remove()?.map(|v| v.to_vec()),
626+
Entry::Vacant(v) => {
627+
let old_rc =
628+
self.raw_cache
629+
.get_rc_for_delete(&self.db, cell_id, &mut buffer)?;
630+
debug_assert!(old_rc > 0);
631+
632+
v.insert(RemovedCell {
633+
old_rc,
634+
removes: 1,
635+
refs: buffer.clone(),
636+
})
637+
.next_refs()
638+
.map(|v| v.to_vec())
639+
}
640+
};
641+
642+
if let Some(refs) = refs {
643+
// And proceed to its refs if any.
644+
stack.push(refs);
645+
continue 'outer;
646+
}
647+
}
648+
649+
// Drop the current cell when all of its children were processed.
650+
stack.pop();
651+
}
652+
653+
// Clear big chunks of data before finalization
654+
drop(stack);
655+
656+
Ok(())
657+
}
658+
659+
pub fn finalize(self, batch: &mut WriteBatch) -> usize {
660+
std::thread::scope(|s| {
661+
let number_shards = self.transaction._shard_count();
662+
// safety: we hold only read locks
663+
let shards = unsafe { (0..number_shards).map(|i| self.transaction._get_read_shard(i)) };
664+
let cache = &self.raw_cache;
665+
666+
// todo: clamp to number of cpus x2
667+
for shard in shards {
668+
// spawned threads will be joined at the end of the scope, so we don't need to store them
669+
s.spawn(move || {
670+
for (key, item) in shard {
671+
let value = item.get();
672+
673+
let new_rc = value.old_rc - value.removes as i64;
674+
cache.on_remove_cell(key, new_rc);
675+
}
676+
});
677+
}
678+
679+
let batch_update = s.spawn(|| {
680+
let cells_cf = &self.db.cells.cf();
681+
682+
let total = self.transaction.len();
683+
684+
for kv in self.transaction.iter() {
685+
let key = kv.key();
686+
let value = kv.value();
687+
688+
batch.merge_cf(
689+
cells_cf,
690+
key.as_slice(),
691+
refcount::encode_negative_refcount(value.removes),
692+
);
693+
}
694+
695+
total
696+
});
697+
698+
batch_update.join().expect("thread panicked")
699+
})
700+
}
701+
}
702+
557703
#[derive(thiserror::Error, Debug)]
558704
pub enum CellStorageError {
559705
#[error("Cell not found in cell db")]

storage/src/store/shard_state/mod.rs

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use tycho_block_util::state::*;
1414
use tycho_util::metrics::HistogramGuard;
1515
use tycho_util::FastHashMap;
1616
use weedb::rocksdb;
17+
use weedb::rocksdb::WriteBatch;
1718

1819
use self::cell_storage::*;
1920
use self::store_state_raw::StoreStateContext;
@@ -293,13 +294,22 @@ impl ShardStateStorage {
293294
let key = key.to_vec();
294295

295296
let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
297+
let start = std::time::Instant::now();
296298
let (stats, mut batch) = cell_storage.remove_cell(&alloc, &root_hash)?;
297299

300+
let finished = start.elapsed().as_millis();
301+
298302
batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
299303
db.raw()
300304
.rocksdb()
301305
.write_opt(batch, db.cells.write_config())?;
302306

307+
tracing::info!(
308+
ms = finished,
309+
full_ms = start.elapsed().as_millis(),
310+
"remove_cell_time"
311+
);
312+
303313
Ok::<_, anyhow::Error>((stats, alloc))
304314
})
305315
.await??;
@@ -359,6 +369,10 @@ impl ShardStateStorage {
359369
// Iterate all states and remove outdated
360370
let mut removed_states = 0usize;
361371
let mut removed_cells = 0usize;
372+
373+
let mut cur_block_id = BlockId::default();
374+
let mut data_roots = Vec::with_capacity(16);
375+
362376
loop {
363377
let _hist = HistogramGuard::begin("tycho_storage_state_data_gc_time");
364378
let (key, value) = match iter.item() {
@@ -380,24 +394,61 @@ impl ShardStateStorage {
380394
continue;
381395
}
382396

397+
if cur_block_id == block_id {
398+
data_roots.push(root_hash);
399+
400+
iter.next();
401+
continue;
402+
}
403+
404+
cur_block_id = block_id;
405+
data_roots.clear();
406+
383407
alloc.reset();
384408

385409
{
386410
let _guard = self.gc_lock.lock().await;
387411

388412
let db = self.cells_db.clone();
389413
let cell_storage = self.cell_storage.clone();
414+
390415
let key = key.to_vec();
416+
let data_roots = data_roots.clone();
391417

392418
let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
393-
let (stats, mut batch) = cell_storage.remove_cell(&alloc, &root_hash)?;
419+
let start = std::time::Instant::now();
420+
421+
let ctx = cell_storage.create_remove_ctx();
422+
rayon::scope(|s| {
423+
for data_root in data_roots.iter() {
424+
s.spawn(|_| {
425+
ctx.remove_cell(data_root).unwrap();
426+
});
427+
}
428+
});
429+
430+
let mut batch = WriteBatch::with_capacity_bytes(ctx.len() * (32 + 8 + 8));
431+
432+
let total = ctx.finalize(&mut batch);
433+
434+
let finished = start.elapsed().as_millis();
435+
436+
batch.delete_cf(
437+
&db.shard_state_data.get_unbounded_cf().bound(),
438+
key.as_slice(),
439+
);
394440

395-
batch.delete_cf(&db.shard_state_data.get_unbounded_cf().bound(), key);
396441
db.raw()
397442
.rocksdb()
398443
.write_opt(batch, db.cells.write_config())?;
399444

400-
Ok::<_, anyhow::Error>((stats, alloc))
445+
tracing::info!(
446+
ms = finished,
447+
full_ms = start.elapsed().as_millis(),
448+
"accounts remove_cell_time"
449+
);
450+
451+
Ok::<_, anyhow::Error>((total, alloc))
401452
})
402453
.await??;
403454

0 commit comments

Comments
 (0)