Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion diskann-garnet/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "diskann-garnet"
version = "2.0.2"
version = "2.0.3"
edition = "2024"
authors.workspace = true
license.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion diskann-garnet/diskann-garnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package>
<metadata>
<id>diskann-garnet</id>
<version>2.0.2</version>
<version>2.0.3</version>
<readme>docs/README.md</readme>
<authors>Microsoft</authors>
<projectUrl>https://github.com/microsoft/DiskANN</projectUrl>
Expand Down
142 changes: 82 additions & 60 deletions diskann-garnet/src/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

use crossbeam::queue::ArrayQueue;
use std::sync::{
RwLock, RwLockReadGuard,
atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use thiserror::Error;

Expand Down Expand Up @@ -66,6 +66,12 @@ struct Barrier {
quantization_enabled: bool,
}

struct IdMinter {
next_id: u32,
max_block: u32,
buffer: Vec<u8>,
}

/// The free space map manages an ID pool in Garnet to track which IDs are being
/// used. This is necessary to reuse deleted vectors IDs, which is needed due to
/// the small ID space of u32.
Expand All @@ -88,16 +94,16 @@ pub(crate) struct FreeSpaceMap {
has_free_ids: AtomicBool,
/// A queue of previously deleted IDs to prevent excessive reads to the FSM
fast_free_list: ArrayQueue<u32>,
/// The maximum block ID stored in the FSM
max_block: RwLock<u32>,
/// The next ID that will be minted if reusing previously deleted IDs is unavailable
next_id: AtomicU32,
/// Controls minting new IDs and expanding the FSM blocks
id_minter: RwLock<IdMinter>,
/// The total number of IDs marked used in the FSM
total_used: AtomicUsize,
/// Controls when ID reuse for deleted IDs is enabled.
reuse_enabled: AtomicBool,
/// Quantization backfill related parameters that must be synchronized.
barrier: RwLock<Barrier>,
/// Refill lock, to prevent multiple fast free list refills happening concurrently.
refill_lock: Mutex<()>,
}

impl FreeSpaceMap {
Expand All @@ -109,24 +115,28 @@ impl FreeSpaceMap {
) -> Result<Self, FsmError> {
let has_free_ids = AtomicBool::new(false);
let fast_free_list = ArrayQueue::new(FAST_SIZE);
let max_block = RwLock::new(u32::MAX);
let next_id = AtomicU32::new(0);
let total_used = AtomicUsize::new(0);
let reuse_enabled = AtomicBool::new(reuse_enabled);
let barrier = RwLock::new(Barrier {
max_id_for_backfill: u32::MAX,
quantization_enabled,
});
let id_minter = RwLock::new(IdMinter {
next_id: 0,
max_block: u32::MAX,
buffer: vec![0u8; BLOCK_SIZE_BYTES],
});
let refill_lock = Mutex::new(());

let mut this = Self {
callbacks,
has_free_ids,
fast_free_list,
max_block,
next_id,
id_minter,
total_used,
reuse_enabled,
barrier,
refill_lock,
};

// Attempt to load state from Garnet.
Expand All @@ -139,7 +149,8 @@ impl FreeSpaceMap {
} else {
// Allocate first block.
let (block_id, _, _) = this.indexes_for_id(0);
this.expand_to(ctx, block_id)?;
let mut id_minter = this.id_minter.write().unwrap();
this.expand_to(&mut id_minter, ctx, block_id)?;
}

Ok(this)
Expand Down Expand Up @@ -186,11 +197,10 @@ impl FreeSpaceMap {
}
}

let mut max_block = self.max_block.write().unwrap();
*max_block = max_block_id - 1;
let mut id_minter = self.id_minter.write().unwrap();
id_minter.max_block = max_block_id - 1;

self.next_id
.store((last_used_id + 1) as u32, Ordering::Release);
id_minter.next_id = (last_used_id + 1) as u32;

self.total_used.store(total_used, Ordering::Release);

Expand All @@ -212,20 +222,14 @@ impl FreeSpaceMap {
self.mark_id(ctx, id, true)
}

/// Mark an ID according to value (true = used, false = free).
/// Mark an ID according to value (true = used, false = free), but don't check that its in range.
///
/// Side effects only happen if the value actually changed. The return value reflects whether
/// data changed or not.
fn mark_id(&self, ctx: &Context, id: u32, used: bool) -> Result<bool, FsmError> {
if id >= self.next_id.load(Ordering::Acquire) {
return Err(FsmError::IdOutOfRange(id));
}

///
/// This version does not acquire a guard on `id_minter` and is safe to call while that lock is held.
fn mark_id_unchecked(&self, ctx: &Context, id: u32, used: bool) -> Result<bool, FsmError> {
let (block_id, byte_idx, bit_idx) = self.indexes_for_id(id);
let max_block = *self.max_block.read().unwrap();
if block_id > max_block || max_block == u32::MAX {
return Err(FsmError::Garnet(GarnetError::Read));
}

let block_key = Self::block_key(block_id);
let mut changed = false;

Expand Down Expand Up @@ -256,14 +260,29 @@ impl FreeSpaceMap {
Ok(changed)
}

/// Mark an ID according to value (true = used, false = free).
/// Side effects only happen if the value actually changed. The return value reflects whether
/// data changed or not.
fn mark_id(&self, ctx: &Context, id: u32, used: bool) -> Result<bool, FsmError> {
{
let id_minter = self.id_minter.read().unwrap();
if id >= id_minter.next_id {
return Err(FsmError::IdOutOfRange(id));
}
}

self.mark_id_unchecked(ctx, id, used)
}

/// Return whether a given ID is free.
pub(crate) fn is_free(&self, ctx: &Context, id: u32) -> Result<bool, FsmError> {
if id >= self.next_id.load(Ordering::Acquire) {
let id_minter = self.id_minter.read().unwrap();
if id >= id_minter.next_id {
return Err(FsmError::IdOutOfRange(id));
}

Comment on lines +279 to 283
let (block_id, byte_idx, bit_idx) = self.indexes_for_id(id);
let max_block = *self.max_block.read().unwrap();
let max_block = id_minter.max_block;
if block_id > max_block || max_block == u32::MAX {
return Err(FsmError::Garnet(GarnetError::Read));
}
Expand Down Expand Up @@ -327,17 +346,12 @@ impl FreeSpaceMap {
}
}

let id = self.next_id.fetch_add(1, Ordering::AcqRel);
{
// Make sure we have enough FSM blocks for this id.
let (block_id, _, _) = self.indexes_for_id(id);
let max_block = { *self.max_block.read().unwrap() };
if block_id > max_block {
self.expand_to(ctx, id)?;
}
}

self.mark_used(ctx, id)?;
// Mint a new ID and mark it used.
let mut id_minter = self.id_minter.write().unwrap();
let id = id_minter.next_id;
id_minter.next_id += 1;
self.expand_to(&mut id_minter, ctx, id)?;
self.mark_id_unchecked(ctx, id, true)?;

Ok(ReuseGuard::new(id, barrier))
}
Expand All @@ -346,7 +360,8 @@ impl FreeSpaceMap {
///
/// This ID may be free if that ID has been deleted since the ID was created.
pub(crate) fn max_id(&self) -> u32 {
self.next_id.load(Ordering::Acquire).saturating_sub(1)
let id_minter = self.id_minter.read().unwrap();
id_minter.next_id.saturating_sub(1)
}

/// Return the number of IDs currently marked used in the FSM.
Expand Down Expand Up @@ -374,20 +389,24 @@ impl FreeSpaceMap {

/// Scan the FSM blocks to fill up fast_free_list.
fn refill_fast_free_list(&self, ctx: &Context) -> Result<bool, FsmError> {
// NOTE: We take a write lock to prevent multiple refills happening simultaneously.
#[allow(clippy::readonly_write_lock)]
let max_block = self.max_block.write().unwrap();
// NOTE: We take a lock to prevent multiple refills happening simultaneously.
let _guard = self.refill_lock.lock().unwrap();

// If we had to wait to acquire the lock, it's possible some else refilled the list first, so check it again.
if !self.fast_free_list.is_empty() {
return Ok(true);
}

let (max_block, next_id) = {
let id_minter = self.id_minter.read().unwrap();
(id_minter.max_block, id_minter.next_id)
};

let mut has_free_ids = false;
let mut id = 0u32;
let mut block = vec![0u8; BLOCK_SIZE_BYTES];
'scan: for block_id in 0..=*max_block {
if id >= self.next_id.load(Ordering::Acquire) {
'scan: for block_id in 0..=max_block {
if id >= next_id {
// Don't look at IDs outside the current range.
break;
}
Expand All @@ -401,7 +420,7 @@ impl FreeSpaceMap {
}

for &byte in &block {
if id >= self.next_id.load(Ordering::Acquire) {
if id >= next_id {
// Don't look at IDs outside the current range.
break 'scan;
}
Expand All @@ -412,7 +431,7 @@ impl FreeSpaceMap {
}

for bidx in 0..8 {
if id >= self.next_id.load(Ordering::Acquire) {
if id >= next_id {
// Don't look at IDs outside the current range.
break 'scan;
}
Expand All @@ -437,26 +456,29 @@ impl FreeSpaceMap {
}

/// Ensure enough blocks exist in the FSM to hold `id`.
fn expand_to(&self, ctx: &Context, id: u32) -> Result<(), FsmError> {
fn expand_to(
&self,
id_minter: &mut RwLockWriteGuard<IdMinter>,
ctx: &Context,
id: u32,
) -> Result<(), FsmError> {
let (block_id, _, _) = self.indexes_for_id(id);
let mut max_block = self.max_block.write().unwrap();
if *max_block == u32::MAX || block_id == *max_block + 1 {
if id_minter.max_block == u32::MAX || block_id == id_minter.max_block + 1 {
let block_key = Self::block_key(block_id);
let block_bytes = vec![0u8; BLOCK_SIZE_BYTES];

if !self
.callbacks
.write_wid(&ctx.term(Term::Metadata), block_key, &block_bytes)
.write_wid(&ctx.term(Term::Metadata), block_key, &id_minter.buffer)
{
return Err(FsmError::Garnet(GarnetError::Write));
}

if *max_block == u32::MAX {
*max_block = 0;
if id_minter.max_block == u32::MAX {
id_minter.max_block = 0;
} else {
*max_block += 1;
id_minter.max_block += 1;
}
} else if block_id > *max_block + 1 {
} else if block_id > id_minter.max_block + 1 {
return Err(FsmError::IdOutOfRange(id));
}

Expand All @@ -468,7 +490,7 @@ impl FreeSpaceMap {
where
F: FnMut(u32) -> bool,
{
let max_block = { *self.max_block.read().unwrap() };
let max_block = { self.id_minter.read().unwrap().max_block };
let mut block = vec![0u8; BLOCK_SIZE_BYTES];
let mut id = 0u32;

Expand Down Expand Up @@ -509,7 +531,7 @@ impl FreeSpaceMap {
// 2. All vector data for any pending unquantized inserts has been written. (because
// `insert()` updates data under the same read barrier via `ReuseGuard`)
let mut guard = self.barrier.write().unwrap();
guard.max_id_for_backfill = self.next_id.load(Ordering::Acquire).saturating_sub(1);
guard.max_id_for_backfill = self.max_id();
guard.quantization_enabled = true;
}

Expand Down Expand Up @@ -672,8 +694,8 @@ mod tests {

// Loading FSM from store should recover all the state.
let fsm = FreeSpaceMap::new(&ctx, store.callbacks(), false, true).unwrap();
assert_eq!(*fsm.max_block.read().unwrap(), 0);
assert_eq!(fsm.next_id.load(Ordering::Acquire), 64);
assert_eq!(fsm.id_minter.read().unwrap().max_block, 0);
assert_eq!(fsm.max_id() + 1, 64);
assert!(fsm.has_free_ids.load(Ordering::Acquire));
assert_eq!(fsm.fast_free_list.len(), 1);
assert_eq!(fsm.next_id(&ctx).unwrap().id(), 37);
Expand Down
Loading
Loading