Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions crates/partition-store/src/idempotency_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{PartitionStoreTransaction, StorageAccess};
use bytes::Bytes;
use bytestring::ByteString;
use futures::Stream;
use futures_util::future::Either;
use futures_util::stream;
use restate_storage_api::idempotency_table::{
IdempotencyMetadata, IdempotencyTable, ReadOnlyIdempotencyTable,
Expand Down Expand Up @@ -115,6 +116,10 @@ impl ReadOnlyIdempotencyTable for PartitionStore {
&mut self,
idempotency_id: &IdempotencyId,
) -> Result<Option<IdempotencyMetadata>> {
if self.is_idempotency_table_disabled() {
return Ok(None);
}

self.assert_partition_key(idempotency_id)?;
get_idempotency_metadata(self, idempotency_id)
}
Expand All @@ -123,7 +128,11 @@ impl ReadOnlyIdempotencyTable for PartitionStore {
&self,
range: RangeInclusive<PartitionKey>,
) -> Result<impl Stream<Item = Result<(IdempotencyId, IdempotencyMetadata)>> + Send> {
all_idempotency_metadata(self, range)
if self.is_idempotency_table_disabled() {
Ok(Either::Left(stream::empty()))
} else {
Ok(Either::Right(all_idempotency_metadata(self, range)?))
}
}
}

Expand All @@ -132,6 +141,10 @@ impl ReadOnlyIdempotencyTable for PartitionStoreTransaction<'_> {
&mut self,
idempotency_id: &IdempotencyId,
) -> Result<Option<IdempotencyMetadata>> {
if self.is_idempotency_table_disabled() {
return Ok(None);
}

self.assert_partition_key(idempotency_id)?;
get_idempotency_metadata(self, idempotency_id)
}
Expand All @@ -140,7 +153,11 @@ impl ReadOnlyIdempotencyTable for PartitionStoreTransaction<'_> {
&self,
range: RangeInclusive<PartitionKey>,
) -> Result<impl Stream<Item = Result<(IdempotencyId, IdempotencyMetadata)>> + Send> {
all_idempotency_metadata(self, range)
if self.is_idempotency_table_disabled() {
Ok(Either::Left(stream::empty()))
} else {
Ok(Either::Right(all_idempotency_metadata(self, range)?))
}
}
}

Expand All @@ -150,11 +167,19 @@ impl IdempotencyTable for PartitionStoreTransaction<'_> {
idempotency_id: &IdempotencyId,
metadata: &IdempotencyMetadata,
) -> Result<()> {
if self.is_idempotency_table_disabled() {
return Ok(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that write operations should fail if the idempotency table is disabled. This establishes the invariant that it is truly disabled. Even better if we made the write path only available for cfg(test), then no production code will ever depend on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When IdempotencyTable is empty when initializing PartitionStore, IdempotencyTable will be disabled. So there is no way to enable IdempotencyTable when creating PartitionStore. Do you have any good suggestions? Or should I just keep the test case where IdempotencyTable is disabled?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can move the non-readonly traits to cfg(test). That way it's only present for tests where they might be needed to populate the table for running the test case. Would this work?

}

self.assert_partition_key(idempotency_id)?;
put_idempotency_metadata(self, idempotency_id, metadata)
}

/// Removes the idempotency metadata stored for `idempotency_id`.
///
/// While the idempotency table is disabled for this transaction the call is a
/// successful no-op: nothing is validated and nothing is deleted. Otherwise
/// the id is first checked with `assert_partition_key` (its error is
/// propagated) and the removal is delegated to the free function
/// `delete_idempotency_metadata`.
async fn delete_idempotency_metadata(&mut self, idempotency_id: &IdempotencyId) -> Result<()> {
    if !self.is_idempotency_table_disabled() {
        // Table is active: validate the key, then perform the delete.
        self.assert_partition_key(idempotency_id)?;
        return delete_idempotency_metadata(self, idempotency_id);
    }

    // Table is disabled: skip the delete and report success.
    Ok(())
}
Expand Down
96 changes: 94 additions & 2 deletions crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use rocksdb::ReadOptions;
use rocksdb::{BoundColumnFamily, SliceTransform};
use static_assertions::const_assert_eq;
use tracing::info;
use tracing::trace;

use restate_core::ShutdownError;
Expand Down Expand Up @@ -203,6 +204,7 @@
key_range: RangeInclusive<PartitionKey>,
key_buffer: BytesMut,
value_buffer: BytesMut,
idempotency_table_disabled: bool,
}

impl std::fmt::Debug for PartitionStore {
Expand All @@ -225,6 +227,7 @@
key_range: self.key_range.clone(),
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
idempotency_table_disabled: self.idempotency_table_disabled,
}
}
}
Expand Down Expand Up @@ -288,9 +291,77 @@
key_range,
key_buffer: BytesMut::new(),
value_buffer: BytesMut::new(),
idempotency_table_disabled: false,
}
}

/// Creates a new PartitionStore and checks if the idempotency table is empty,
/// disabling it when empty to avoid unnecessary operations.
pub(crate) async fn new_with_idempotency_check(
rocksdb: Arc<RocksDb>,
data_cf_name: CfName,
partition_id: PartitionId,
key_range: RangeInclusive<PartitionKey>,
) -> Result<Self> {
let mut store = Self {
rocksdb,
partition_id,
data_cf_name,
key_range,
key_buffer: BytesMut::new(),
value_buffer: BytesMut::new(),
idempotency_table_disabled: false,
};

// Check if the idempotency table is empty
let is_empty = store.is_idempotency_table_empty()?;
if is_empty {
info!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be a debug log statement as it's an internal optimization.

"Idempotency table for partition {} is empty, disabling all idempotency table code paths for better performance",
partition_id
);
store.idempotency_table_disabled = true;
}

Ok(store)
}

/// Checks if the idempotency table is empty for this partition
fn is_idempotency_table_empty(&self) -> Result<bool> {
let table = self.table_handle(TableKind::Idempotency)?;
let mut opts = ReadOptions::default();

// Get the bytes representation of the KeyKind::Idempotency
let key_kind_bytes = KeyKind::Idempotency.as_bytes();

// Configure the iterator options
opts.set_prefix_same_as_start(true);
opts.set_async_io(true);
opts.set_total_order_seek(false);
Comment on lines +338 to +340
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% sure that this would work. I think a problem is that the configured prefix extractor is the fixed_prefix with a prefix length of 10 bytes (key kind + partition id). We would have loved to use the NewCappedPrefixTransform, but it's not yet exposed via the Rust bindings. Only seeking to the key kind might give wrong results w/o set_total_order_seek(true). Worth testing it out.


let mut it = self
.rocksdb
.inner()
.as_raw_db()
.raw_iterator_cf_opt(&table, opts);

// Seek to the first key with the idempotency prefix
it.seek(key_kind_bytes);

// If the iterator is valid and the key has our prefix, the table is not empty
let is_empty = !it.valid()
|| !it

Check failure on line 353 in crates/partition-store/src/partition_store.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

this `map_or` can be simplified
.key()
.map_or(false, |k| TableKind::Idempotency.has_key_kind(k));

Ok(is_empty)
}

/// Returns this store's `idempotency_table_disabled` flag.
///
/// The flag is initialised to `false` by the plain constructor and is switched
/// to `true` by `new_with_idempotency_check` when the idempotency table is
/// found to be empty. The idempotency table implementations consult it to
/// short-circuit their reads and writes.
#[inline]
pub fn is_idempotency_table_disabled(&self) -> bool {
self.idempotency_table_disabled
}

#[inline]
pub fn partition_id(&self) -> PartitionId {
self.partition_id
Expand Down Expand Up @@ -415,14 +486,29 @@
)
});

let partition_id = self.partition_id;
let partition_key_range = &self.key_range;
let idempotency_disabled = self.is_idempotency_table_disabled();

let partition_store = PartitionStore {
rocksdb: self.rocksdb.clone(),
partition_id: self.partition_id,
data_cf_name: self.data_cf_name.clone(),
key_range: self.key_range.clone(),
key_buffer: BytesMut::new(),
value_buffer: BytesMut::new(),
idempotency_table_disabled: idempotency_disabled,
};
Comment on lines +493 to +501
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a new copy of self, we can give self.idempotency_disabled to the PartitionStoreTransaction.


PartitionStoreTransaction {
write_batch_with_index: rocksdb::WriteBatchWithIndex::new(0, true),
data_cf_handle,
rocksdb,
key_buffer: &mut self.key_buffer,
value_buffer: &mut self.value_buffer,
partition_id: self.partition_id,
partition_key_range: &self.key_range,
partition_id,
partition_key_range,
partition_store,
}
}

Expand Down Expand Up @@ -600,6 +686,7 @@
data_cf_handle: Arc<BoundColumnFamily<'a>>,
key_buffer: &'a mut BytesMut,
value_buffer: &'a mut BytesMut,
partition_store: PartitionStore,
}

impl PartitionStoreTransaction<'_> {
Expand Down Expand Up @@ -668,6 +755,11 @@
/// Validates `partition_key` against this transaction's `partition_key_range`
/// by delegating to `assert_partition_key_or_err`.
///
/// NOTE(review): presumably returns an error when the key lies outside the
/// range; confirm against `assert_partition_key_or_err`.
pub(crate) fn assert_partition_key(&self, partition_key: &impl WithPartitionKey) -> Result<()> {
assert_partition_key_or_err(self.partition_key_range, partition_key)
}

/// Reports whether the idempotency table is disabled for this transaction.
///
/// The answer is read from the `PartitionStore` value held in the
/// transaction's `partition_store` field.
#[inline]
pub fn is_idempotency_table_disabled(&self) -> bool {
self.partition_store.is_idempotency_table_disabled()
}
}

fn assert_partition_key_or_err(
Expand Down
50 changes: 42 additions & 8 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,28 @@ impl PartitionStoreManager {
}
}

let partition_store = PartitionStore::new(
let partition_store = match PartitionStore::new_with_idempotency_check(
self.rocksdb.clone(),
cf_name,
cf_name.clone(),
partition_id,
partition_key_range,
);
partition_key_range.clone(),
)
.await
{
Ok(store) => store,
Err(e) => {
warn!(
"Failed to check idempotency table: {e}, continuing with standard initialization"
);
PartitionStore::new(
self.rocksdb.clone(),
cf_name,
partition_id,
partition_key_range,
)
}
};
Comment on lines +131 to +144
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If new_with_idempotency_check fails, then we can escalate the error because it points to something being wrong with RocksDB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned by PartitionStore::new_with_idempotency_check should be a StorageError because the corresponding table does not exist. However, this error should not occur because all places that call open_partition_store ensure that the table must exist. So would it be more appropriate to panic directly here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition, I don’t seem to see a way to convert StorageError to RocksError. Do you have any suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could introduce a PartitionStoreManagerError which has a variant RocksDb(#[from] RocksError) and IdempotencyTableCheck(StorageError).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your advice.


guard.live.insert(partition_id, partition_store.clone());

Ok(partition_store)
Expand Down Expand Up @@ -190,12 +206,30 @@ impl PartitionStoreManager {
.await?;

assert!(self.rocksdb.inner().cf_handle(&cf_name).is_some());
let partition_store = PartitionStore::new(

// Use the new method that checks if idempotency table is empty
let partition_store = match PartitionStore::new_with_idempotency_check(
self.rocksdb.clone(),
cf_name,
cf_name.clone(),
partition_id,
partition_key_range,
);
partition_key_range.clone(),
)
.await
{
Ok(store) => store,
Err(e) => {
warn!(
"Failed to check idempotency table: {e}, continuing with standard initialization"
);
PartitionStore::new(
self.rocksdb.clone(),
cf_name,
partition_id,
partition_key_range,
)
}
};
Comment on lines +211 to +231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. We can escalate the error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense


guard.live.insert(partition_id, partition_store.clone());

Ok(partition_store)
Expand Down
12 changes: 3 additions & 9 deletions crates/partition-store/src/tests/idempotency_table_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,21 @@ async fn test_idempotency_key() {
.get_idempotency_metadata(&IDEMPOTENCY_ID_1)
.await
.unwrap(),
Some(IdempotencyMetadata {
invocation_id: InvocationId::from_parts(10, FIXTURE_INVOCATION_1),
})
None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to have a test where we have a non-empty idempotency table and an empty one so that we test both code paths.

);
assert_eq!(
rocksdb
.get_idempotency_metadata(&IDEMPOTENCY_ID_2)
.await
.unwrap(),
Some(IdempotencyMetadata {
invocation_id: InvocationId::from_parts(10, FIXTURE_INVOCATION_2),
})
None
);
assert_eq!(
rocksdb
.get_idempotency_metadata(&IDEMPOTENCY_ID_3)
.await
.unwrap(),
Some(IdempotencyMetadata {
invocation_id: InvocationId::from_parts(10, FIXTURE_INVOCATION_3),
})
None
);
assert_eq!(
rocksdb
Expand Down
Loading