Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,24 +394,35 @@ pub async fn single_loglet_readstream_with_trims(
}

// When reading record 8, it's acceptable to observe the record, or the trim gap. Both are
// acceptable because replicated loglet read stream's read-head cannot be completely disabled.
// acceptable because replicated loglet read stream's read-ahead cannot be completely disabled.
// Its minimum is to immediately read the next record after consuming the last one, so we'll
// see record8 because it's already cached.
//
// read stream should send a gap from 8->10
let record = read_stream.next().await.unwrap()?;
assert_that!(record.sequence_number(), eq(Lsn::new(8)));
if record.is_trim_gap() {
assert!(record.is_trim_gap());
assert_that!(record.trim_gap_to_sequence_number(), eq(Some(Lsn::new(10))));
} else {
// data record.
assert_that!(record.decode_unchecked::<String>(), eq("record8"));
// next record should be the trim gap
// For record 9 and 10, we can either observe the records or a trim gap of arbitrary length since
// we don't guarantee that all log servers have seen all records and the same trim points. Hence,
// it could happen that we are reading from N1, whose local trim point is at 9 because it did not
// replicate record10, which delivers a TrimGap(9) record. When continuing reading from N2, we
// might read record10 because it did not receive the trim command (yet).
let mut lsn = Lsn::new(8);

while lsn <= Lsn::new(10) {
let record = read_stream.next().await.unwrap()?;
assert_that!(record.sequence_number(), eq(Lsn::new(9)));
assert!(record.is_trim_gap());
assert_that!(record.trim_gap_to_sequence_number(), eq(Some(Lsn::new(10))));
assert_that!(record.sequence_number(), eq(lsn));

if record.is_trim_gap() {
// update the expected read pointer
lsn = record
.trim_gap_to_sequence_number()
.expect("A trim gap should have a known end")
.next();
} else {
assert_that!(
record.decode_unchecked::<String>(),
eq(format!("record{}", lsn))
);
// update the expected read pointer
lsn = lsn.next();
}
}

for i in 11..=20 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ impl From<PartitionProcessorRpcError> for RpcErrorKind {
match value {
PartitionProcessorRpcError::NotLeader(_) => RpcErrorKind::NotLeader,
PartitionProcessorRpcError::LostLeadership(_) => RpcErrorKind::LostLeadership,
PartitionProcessorRpcError::Busy => RpcErrorKind::Busy,
PartitionProcessorRpcError::Internal(msg) => RpcErrorKind::Internal(msg),
PartitionProcessorRpcError::Starting => RpcErrorKind::Starting,
PartitionProcessorRpcError::Stopping => RpcErrorKind::Stopping,
Expand Down
8 changes: 3 additions & 5 deletions crates/types/src/net/partition_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,9 @@ pub enum PartitionProcessorRpcError {
NotLeader(PartitionId),
#[error("not leader anymore for partition '{0}'")]
LostLeadership(PartitionId),
// todo: remove in 1.5
#[error("rejecting rpc because too busy")]
// #[deprecated(since = "1.4.0", note = "retained for backwards compatibility with <= 1.3.2 nodes, remove in 1.5")]
Busy,
// Removed in 1.6.0. Kept here to prevent reintroduction at a later point.
//#[error("rejecting rpc because too busy")]
//Busy,
#[error("internal error: {0}")]
Internal(String),
#[error("partition processor starting")]
Expand All @@ -155,7 +154,6 @@ impl PartitionProcessorRpcError {
PartitionProcessorRpcError::NotLeader(_) => true,
PartitionProcessorRpcError::LostLeadership(_) => true,
PartitionProcessorRpcError::Stopping => true,
PartitionProcessorRpcError::Busy => false,
PartitionProcessorRpcError::Internal(_) => false,
PartitionProcessorRpcError::Starting => false,
}
Expand Down
23 changes: 17 additions & 6 deletions server/tests/common/replicated_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
// by the Apache License, Version 2.0.

#![allow(dead_code)]
use std::num::NonZeroU32;
use std::{sync::Arc, time::Duration};

use enumset::{EnumSet, enum_set};
use googletest::IntoTestResult;
use googletest::internal::test_outcome::TestAssertionFailure;
use std::num::NonZeroU32;
use std::{sync::Arc, time::Duration};
use tracing::info;

use restate_bifrost::{Bifrost, loglet::Loglet};
use restate_core::TaskCenter;
Expand All @@ -22,7 +24,7 @@ use restate_local_cluster_runner::{
cluster::{Cluster, MaybeTempDir, StartedCluster},
node::{BinarySource, Node},
};
use restate_metadata_store::MetadataStoreClient;
use restate_metadata_store::{MetadataStoreClient, retry_on_retryable_error};
use restate_rocksdb::RocksDbManager;
use restate_tracing_instrumentation::prometheus_metrics::Prometheus;
use restate_types::logs::LogletId;
Expand All @@ -31,6 +33,7 @@ use restate_types::logs::metadata::{Chain, LogletParams, SegmentIndex};
use restate_types::metadata::Precondition;
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::net::metadata::MetadataKind;
use restate_types::retries::RetryPolicy;
use restate_types::{
GenerationalNodeId, PlainNodeId, Version, Versioned,
config::Configuration,
Expand Down Expand Up @@ -136,6 +139,8 @@ where

cluster.wait_healthy(Duration::from_secs(30)).await?;

info!("Test cluster is healthy");

let loglet_params = ReplicatedLogletParams {
loglet_id: LogletId::new(LogId::MIN, SegmentIndex::OLDEST),
sequencer,
Expand All @@ -158,9 +163,13 @@ where
.await
.map_err(|err| TestAssertionFailure::create(err.to_string()))?;
let logs = logs_builder.build();
metadata_store_client
.put(BIFROST_CONFIG_KEY.clone(), &logs, Precondition::None)
.await?;
retry_on_retryable_error(
RetryPolicy::fixed_delay(Duration::from_millis(500), Some(20)),
|| metadata_store_client.put(BIFROST_CONFIG_KEY.clone(), &logs, Precondition::None),
)
.await?;

info!("Written initial logs configuration: {logs:?}");

// join a new node to the cluster solely to act as a bifrost client
// it will have node id log_server_count+2
Expand All @@ -172,6 +181,8 @@ where
)
.await?;

info!("Starting test");

// global metadata should now be set, running in scope sets it in the task center context
future(TestEnv {
bifrost,
Expand Down
4 changes: 3 additions & 1 deletion server/tests/replicated_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod tests {
loglet::{AppendError, FindTailOptions},
};
use restate_core::{Metadata, TaskCenterFutureExt};
use restate_types::config::LogFormat;
use restate_types::live::{LiveLoad, LiveLoadExt};
use restate_types::{
GenerationalNodeId, Version,
Expand Down Expand Up @@ -153,13 +154,14 @@ mod tests {
#[test(restate_core::test)]
async fn three_logserver_readstream_with_trims() -> googletest::Result<()> {
// For this test to work, we need to disable the record cache to ensure we
// observer the moving trimpoint.
// observe the moving trimpoint.
let mut config = Configuration::default();
// disable read-ahead to avoid reading records from log-servers before the trim taking
// place.
config.bifrost.replicated_loglet.readahead_records = NonZeroU16::new(1).unwrap();
config.bifrost.replicated_loglet.readahead_trigger_ratio = 1.0;
config.bifrost.record_cache_memory_size = 0_u64.into();
config.common.log_format = LogFormat::Compact;
run_in_test_env(
config,
GenerationalNodeId::new(5, 1), // local sequencer
Expand Down
Loading