Skip to content
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 29 additions & 28 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,30 +795,11 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
/// Load shard and remove it. If list is empty, last shard was full or
/// there are no shards at all.
fn take_shard<T>(
&self,
cursor: &mut <TX as DbTxMut>::CursorMut<T>,
key: T::Key,
) -> ProviderResult<Vec<u64>>
where
T: Table<Value = BlockNumberList>,
{
if let Some((_, list)) = cursor.seek_exact(key)? {
// delete old shard so new one can be inserted.
cursor.delete_current()?;
let list = list.iter().collect::<Vec<_>>();
return Ok(list)
}
Ok(Vec::new())
}

/// Insert history index to the database.
///
/// For each updated partial key, this function removes the last shard from
/// the database (if any), appends the new indices to it, chunks the resulting integer list and
/// inserts the new shards back into the database.
/// For each updated partial key, this function retrieves the last shard from the database
/// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
/// the shards back into the database.
///
/// This function is used by history indexing stages.
fn append_history_index<P, T>(
Expand All @@ -830,26 +811,46 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
P: Copy,
T: Table<Value = BlockNumberList>,
{
// This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
// will append duplicate entries instead of updating existing ones, causing data corruption.
assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");

let mut cursor = self.tx.cursor_write::<T>()?;

for (partial_key, indices) in index_updates {
let mut last_shard =
self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
last_shard.extend(indices);
// Chunk indices and insert them in shards of N size.
let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
let last_key = sharded_key_factory(partial_key, u64::MAX);
let mut last_shard = cursor
.seek_exact(last_key.clone())?
.map(|(_, list)| list)
.unwrap_or_else(BlockNumberList::empty);

last_shard.append(indices).map_err(ProviderError::other)?;

// fast path: all indices fit in one shard
if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
cursor.upsert(last_key, &last_shard)?;
continue;
}

// slow path: rechunk into multiple shards
let all_indices: Vec<u64> = last_shard.iter().collect();
let mut chunks = all_indices.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();

while let Some(list) = chunks.next() {
let highest_block_number = if chunks.peek().is_some() {
*list.last().expect("`chunks` does not return empty list")
} else {
// Insert last list with `u64::MAX`.
u64::MAX
};
cursor.insert(

cursor.upsert(
sharded_key_factory(partial_key, highest_block_number),
&BlockNumberList::new_pre_sorted(list.iter().copied()),
)?;
}
}

Ok(())
}
}
Expand Down
Loading