Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/tiered/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ pub(crate) fn range_get(
) -> io::Result<Option<Vec<u8>>> {
let result = block_on(runtime, backend.range_get(key, start, len))
.map_err(|e| io::Error::other(format!("storage range_get {key}: {e}")))?;
validate_range_get_result(key, start, len, result)
}

fn validate_range_get_result(
key: &str,
start: u64,
len: u32,
result: Option<Vec<u8>>,
) -> io::Result<Option<Vec<u8>>> {
if let Some(bytes) = &result {
if bytes.len() != len as usize {
return Err(io::Error::new(
Expand Down Expand Up @@ -216,10 +225,12 @@ mod tests {
use async_trait::async_trait;
use std::sync::Arc;

struct RangeIgnoringBackend;
struct WrongLengthRangeBackend {
bytes: Vec<u8>,
}

#[async_trait]
impl StorageBackend for RangeIgnoringBackend {
impl StorageBackend for WrongLengthRangeBackend {
async fn get(&self, _key: &str) -> Result<Option<Vec<u8>>> {
Ok(Some(vec![1, 2, 3, 4, 5, 6]))
}
Expand Down Expand Up @@ -256,16 +267,18 @@ mod tests {
}

async fn range_get(&self, _key: &str, _start: u64, _len: u32) -> Result<Option<Vec<u8>>> {
Ok(Some(vec![1, 2, 3, 4, 5, 6]))
Ok(Some(self.bytes.clone()))
}
}

#[test]
fn range_get_rejects_overlong_backend_response() {
fn range_get_rejects_wrong_length_backend_response() {
let rt = tokio::runtime::Runtime::new().unwrap();
let backend: Arc<dyn StorageBackend> = Arc::new(RangeIgnoringBackend);
let err = range_get(backend.as_ref(), rt.handle(), "g0", 1, 2).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(err.to_string().contains("exact range"));
for bytes in [vec![1], vec![1, 2, 3, 4, 5, 6]] {
let backend: Arc<dyn StorageBackend> = Arc::new(WrongLengthRangeBackend { bytes });
let err = range_get(backend.as_ref(), rt.handle(), "g0", 1, 2).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(err.to_string().contains("exact range"));
}
}
}
231 changes: 231 additions & 0 deletions src/tiered/test_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2210,3 +2210,234 @@ fn seekable_miss_warms_current_group_via_optional_prefetch() {
assert_eq!(backend.range_gets.load(Ordering::Relaxed), 1);
assert_eq!(backend.full_gets.load(Ordering::Relaxed), 1);
}

struct TimedOutTakeoverBackend {
stale_full_blob: Vec<u8>,
fresh_range_blob: Vec<u8>,
release_full_get: AtomicBool,
full_gets: AtomicU64,
range_gets: AtomicU64,
}

#[async_trait]
impl StorageBackend for TimedOutTakeoverBackend {
async fn get(&self, _key: &str) -> Result<Option<Vec<u8>>> {
self.full_gets.fetch_add(1, Ordering::AcqRel);
while !self.release_full_get.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_millis(1));
}
Ok(Some(self.stale_full_blob.clone()))
}

async fn put(&self, _key: &str, _data: &[u8]) -> Result<()> {
Ok(())
}

async fn delete(&self, _key: &str) -> Result<()> {
Ok(())
}

async fn list(&self, _prefix: &str, _after: Option<&str>) -> Result<Vec<String>> {
Ok(Vec::new())
}

async fn put_if_absent(&self, _key: &str, _data: &[u8]) -> Result<hadb_storage::CasResult> {
Ok(hadb_storage::CasResult {
success: true,
etag: None,
})
}

async fn put_if_match(
&self,
_key: &str,
_data: &[u8],
_etag: &str,
) -> Result<hadb_storage::CasResult> {
Ok(hadb_storage::CasResult {
success: true,
etag: None,
})
}

async fn range_get(&self, _key: &str, start: u64, len: u32) -> Result<Option<Vec<u8>>> {
self.range_gets.fetch_add(1, Ordering::AcqRel);
let start = start as usize;
let end = start + len as usize;
Ok(Some(self.fresh_range_blob[start..end].to_vec()))
}
}

#[test]
fn seekable_timeout_takeover_keeps_foreground_frame_when_stale_prefetch_finishes() {
let dir = TempDir::new().unwrap();
let page_size = 64u32;
let pages_per_group = 4u32;
let fresh_pages: Vec<Option<Vec<u8>>> = (0..pages_per_group)
.map(|i| Some(vec![i as u8 + 51; page_size as usize]))
.collect();
let stale_pages: Vec<Option<Vec<u8>>> = (0..pages_per_group)
.map(|i| Some(vec![i as u8 + 101; page_size as usize]))
.collect();
let (fresh_blob, frame_table) = encode_page_group_seekable(
&fresh_pages,
page_size,
2,
17,
#[cfg(feature = "zstd")]
None,
&[],
None,
)
.unwrap();
let (stale_blob, stale_frame_table) = encode_page_group_seekable(
&stale_pages,
page_size,
2,
17,
#[cfg(feature = "zstd")]
None,
&[],
None,
)
.unwrap();
assert_eq!(
frame_table
.iter()
.map(|entry| (entry.offset, entry.len))
.collect::<Vec<_>>(),
stale_frame_table
.iter()
.map(|entry| (entry.offset, entry.len))
.collect::<Vec<_>>(),
"fresh and stale fixtures must share frame ranges for this race test"
);

let backend = Arc::new(TimedOutTakeoverBackend {
stale_full_blob: stale_blob,
fresh_range_blob: fresh_blob,
release_full_get: AtomicBool::new(false),
full_gets: AtomicU64::new(0),
range_gets: AtomicU64::new(0),
});
let cache = Arc::new(
DiskCache::new(
dir.path(),
3600,
pages_per_group,
2,
page_size,
8,
None,
Vec::new(),
)
.unwrap(),
);
let manifest = Manifest {
page_count: 8,
page_size,
pages_per_group,
sub_pages_per_frame: 2,
page_group_keys: vec![String::new(), "g1".to_string()],
group_pages: vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7]],
frame_tables: vec![Vec::new(), frame_table.clone()],
..Manifest::empty()
};
let manifest_ref = Arc::new(ArcSwap::from_pointee(manifest));
let replay_gate = Arc::new(parking_lot::RwLock::new(()));
let replay_epoch = Arc::new(AtomicU64::new(0));
let rt = tokio::runtime::Runtime::new().unwrap();
let pool = Arc::new(PrefetchPool::new(
1,
backend.clone(),
rt.handle().clone(),
Arc::clone(&cache),
pages_per_group,
Arc::new(AtomicU64::new(8)),
#[cfg(feature = "zstd")]
None,
None,
Arc::clone(&manifest_ref),
Arc::clone(&replay_gate),
Arc::clone(&replay_epoch),
8,
RemoteIoBudget::new(2, 0),
));
let mut handle = TurboliteHandle::new_tiered(
Some(backend.clone()),
Some(rt.handle().clone()),
Arc::clone(&cache),
manifest_ref,
Arc::new(Mutex::new(HashSet::new())),
Arc::new(Mutex::new(Vec::new())),
Arc::new(AtomicU64::new(0)),
dir.path().join("db.lock"),
pages_per_group,
0,
true,
vec![0.3, 0.3, 0.4],
vec![0.0, 0.0, 0.0],
Some(pool.clone()),
false,
#[cfg(feature = "zstd")]
None,
None,
false,
false,
4,
32 * 1024 * 1024,
None,
false,
0,
0,
false,
false,
Arc::clone(&replay_gate),
Arc::clone(&replay_epoch),
)
.unwrap();

assert_eq!(
pool.submit_optional(
Some("posts".to_string()),
1,
"g1".to_string(),
frame_table,
page_size,
2,
vec![4, 5, 6, 7],
HashMap::new(),
0,
cache.as_ref(),
),
PrefetchSubmitOutcome::Accepted
);
while backend.full_gets.load(Ordering::Acquire) == 0 || !pool.is_optional_fetching(1) {
std::thread::sleep(Duration::from_millis(1));
}

replay_epoch.fetch_add(1, Ordering::AcqRel);

let mut buf = vec![0u8; page_size as usize];
handle
.read_exact_at(&mut buf, pages_per_group as u64 * page_size as u64)
.unwrap();
assert_eq!(buf, vec![51u8; page_size as usize]);
assert_eq!(backend.range_gets.load(Ordering::Relaxed), 1);

backend.release_full_get.store(true, Ordering::Release);
pool.wait_idle();

let mut cached = vec![0u8; page_size as usize];
cache
.read_page(pages_per_group as u64, &mut cached)
.unwrap();
assert_eq!(
cached,
vec![51u8; page_size as usize],
"late optional full-group prefetch must not overwrite the foreground frame"
);
assert_eq!(cache.group_state(1), GroupState::None);
assert_eq!(pool.stats_json()["cancelled"], 1);
}

Loading