Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 99 additions & 6 deletions tools/restatectl/src/commands/node/disable_node_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_metadata_store::{MetadataStoreClient, ReadError};
use restate_types::PlainNodeId;
use restate_types::epoch::EpochMetadata;
use restate_types::identifiers::PartitionId;
use restate_types::logs::LogletId;
use restate_types::logs::metadata::{Logs, ProviderKind};
use restate_types::metadata_store::keys::partition_processor_epoch_key;
use restate_types::nodes_config::{
MetadataServerState, NodesConfigError, NodesConfiguration, Role, StorageState,
MetadataServerState, NodeConfig, NodesConfigError, NodesConfiguration, Role, StorageState,
WorkerState,
};
use restate_types::partition_table::PartitionTable;
use tokio::task::JoinSet;

#[derive(Debug, thiserror::Error)]
pub enum DisableNodeError {
Expand All @@ -35,22 +42,43 @@ pub enum DisableNodeError {
DefaultLogletProvider(ProviderKind),
#[error("metadata server is an active member")]
MetadataMember,
#[error("worker cannot be disabled")]
Worker(#[from] DisableWorkerError),
}

pub struct DisableNodeChecker<'a, 'b> {
/// Reasons why the worker role of a node cannot be disabled yet.
///
/// Produced by the worker safety check and surfaced to callers wrapped in
/// `DisableNodeError::Worker`.
#[derive(Debug, thiserror::Error)]
pub enum DisableWorkerError {
/// The worker's state is still `WorkerState::Active`; it has to be drained first.
#[error("worker is active; drain it first")]
Active,
/// Reading a partition's `EpochMetadata` from the metadata store failed, so replica-set
/// membership could not be determined. `#[from]` lets `?` convert a `ReadError` directly.
#[error("failed reading epoch metadata from metadata store: {0}; try again later")]
EpochMetadata(#[from] ReadError),
/// The worker is contained in the current or next replica set of the given partition.
#[error("worker is part of replica set of partition {0}; wait until it is drained")]
Replica(PartitionId),
}

pub struct DisableNodeChecker<'a> {
nodes_configuration: &'a NodesConfiguration,
logs: &'b Logs,
logs: &'a Logs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if everything has the same lifetime, do you still need the named lifetime 'a?

partition_table: &'a PartitionTable,
metadata_client: &'a MetadataStoreClient,
}

impl<'a, 'b> DisableNodeChecker<'a, 'b> {
pub fn new(nodes_configuration: &'a NodesConfiguration, logs: &'b Logs) -> Self {
impl<'a> DisableNodeChecker<'a> {
/// Creates a checker that borrows the cluster metadata it inspects; nothing is cloned here.
///
/// * `nodes_configuration` - used to look up the configuration of the node to disable.
/// * `logs` - log metadata; presumably consulted by the log-server check (its body is not
///   visible here - TODO confirm).
/// * `partition_table` - provides the partition ids whose replica sets are checked.
/// * `metadata_client` - used to fetch each partition's `EpochMetadata` from the metadata
///   store.
pub fn new(
nodes_configuration: &'a NodesConfiguration,
logs: &'a Logs,
partition_table: &'a PartitionTable,
metadata_client: &'a MetadataStoreClient,
) -> Self {
DisableNodeChecker {
nodes_configuration,
logs,
partition_table,
metadata_client,
}
}

pub fn safe_to_disable_node(&self, node_id: PlainNodeId) -> Result<(), DisableNodeError> {
pub async fn safe_to_disable_node(&self, node_id: PlainNodeId) -> Result<(), DisableNodeError> {
let node_config = match self.nodes_configuration.find_node_by_id(node_id) {
Ok(node_config) => node_config,
// unknown or deleted nodes can be safely disabled
Expand All @@ -64,6 +92,8 @@ impl<'a, 'b> DisableNodeChecker<'a, 'b> {

self.safe_to_disable_log_server(node_id, node_config.log_server_config.storage_state)?;

self.safe_to_disable_worker(node_id, node_config).await?;

// only safe to disable node if it does not run a metadata server or is not a member
// todo atm we must consider the role because the default metadata server state is MetadataServerState::Member.
// We need to introduce a provisioning state or make the metadata server state optional in the NodeConfig.
Expand All @@ -77,6 +107,69 @@ impl<'a, 'b> DisableNodeChecker<'a, 'b> {
Ok(())
}

/// Checks whether it is safe to disable the worker. A worker is safe to disable if it is not
/// used in any partition replica set, and it cannot be added to any future replica sets.
async fn safe_to_disable_worker(
&self,
node_id: PlainNodeId,
node_config: &NodeConfig,
) -> Result<(), DisableWorkerError> {
match node_config.worker_config.worker_state {
WorkerState::Active => {
return Err(DisableWorkerError::Active);
}
Comment on lines +118 to +120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, the safety depends on the replication property or partitions located on this worker node, so this could be stricter than needed. That said, it is best to get a feel for how it behaves in practice before we make it more relaxed. So, +1 to keeping it as is for the first iteration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

WorkerState::Draining => {
// we need to check whether the worker is part of any replica sets

// Unfortunately, there is no guarantee that a draining worker won't be added to any
// future replica set because the corresponding NodesConfiguration might not have
// been sent to all nodes of the cluster :-( We might want to wait for a given
// duration after marking a worker as draining before we consider it safe to
// disable it.
}
WorkerState::Provisioning | WorkerState::Disabled => {
return Ok(());
}
}

let mut replica_membership = JoinSet::new();

// We assume that the partition table does contain all relevant partitions. This will break
// once we support dynamic partition table updates. Once this happens, we need to wait until
// the draining worker state has been propagated to all nodes.
for partition_id in self.partition_table.iter_ids().copied() {
replica_membership.spawn({
let metadata_client = self.metadata_client.clone();

async move {
// todo replace with multi-get when available
let epoch_metadata = metadata_client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little anxious about doing this on restatectl's side. I understand that epoch-metadata is the source-of-truth, but would using the view from PartitionReplicaSetStates be a possible alternative? We can expose those replica-sets via datafusion (I'm already on it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's #3356 right? I'll look into how to use the new table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit more work, but wouldn't it be even better to put the remove node operation in the ClusterCtrlSvc? Then we can use it from places like the BYOC deployment controller. The source of truth behind that can still be the DF, but we also have access to locally-cached metadata on the server.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pcholakov that should be our long-term goal, yes.

.get::<EpochMetadata>(partition_processor_epoch_key(partition_id))
.await?;

if epoch_metadata.is_some_and(|epoch_metadata| {
// check whether node_id is contained in current or next replica set; if yes, then
// we cannot safely disable this node yet
epoch_metadata.current().replica_set().contains(node_id)
|| epoch_metadata
.next()
.is_some_and(|next| next.replica_set().contains(node_id))
}) {
Err(DisableWorkerError::Replica(partition_id))
} else {
Ok(())
}
}
});
}

while let Some(result) = replica_membership.join_next().await {
result.expect("check replica membership not to panic")?;
Copy link

Copilot AI Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using 'expect' here could lead to a panic if an async task fails unexpectedly. Consider propagating the error to provide more graceful error handling.

Suggested change
result.expect("check replica membership not to panic")?;
match result {
Ok(Ok(())) => continue,
Ok(Err(err)) => return Err(err),
Err(join_error) => return Err(DisableWorkerError::JoinError(join_error)),
}

Copilot uses AI. Check for mistakes.

}

Ok(())
}

/// Checks whether it is safe to disable the given log server identified by the node_id. It is
/// safe to disable the log server if it can no longer be added to new node sets (== not being
/// a candidate) and it is no longer part of any known node sets.
Expand Down
17 changes: 14 additions & 3 deletions tools/restatectl/src/commands/node/remove_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ use cling::{Collect, Run};
use itertools::Itertools;

use restate_cli_util::c_println;
use restate_metadata_store::protobuf::metadata_proxy_svc::client::MetadataStoreProxy;
use restate_metadata_store::protobuf::metadata_proxy_svc::{
PutRequest, client::new_metadata_proxy_client,
};
use restate_metadata_store::serialize_value;
use restate_metadata_store::{MetadataStoreClient, serialize_value};
use restate_types::PlainNodeId;
use restate_types::metadata::Precondition;
use restate_types::metadata_store::keys::NODES_CONFIG_KEY;
Expand All @@ -41,13 +42,23 @@ pub async fn remove_nodes(
) -> anyhow::Result<()> {
let mut nodes_configuration = connection.get_nodes_configuration().await?;
let logs = connection.get_logs().await?;
let partition_table = connection.get_partition_table().await?;

let disable_node_checker = DisableNodeChecker::new(&nodes_configuration, &logs);
let channel = connection.open_connection().await?;
let metadata_client = MetadataStoreClient::new(MetadataStoreProxy::new(channel), None);

let disable_node_checker = DisableNodeChecker::new(
&nodes_configuration,
&logs,
&partition_table,
&metadata_client,
);

for node_id in &opts.nodes {
disable_node_checker
.safe_to_disable_node(*node_id)
.context("It is not safe to disable node {node_id}")?
.await
.context(format!("It is not safe to disable node {}", node_id))?
}

let precondition = Precondition::MatchesVersion(nodes_configuration.version());
Expand Down
26 changes: 26 additions & 0 deletions tools/restatectl/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,32 @@ impl ConnectionInfo {
.clone(),
)
}

/// Returns a (lazy) channel to the first reachable node among the configured `address`
/// entries.
///
/// Addresses already recorded in `dead_nodes` are skipped. Every other candidate is probed
/// with a `get_ident` call; a candidate that fails the probe is added to `dead_nodes` and the
/// scan moves on to the next address. The `open_connections` lock is held for the whole scan.
///
/// # Errors
///
/// Fails with [`ConnectionInfoError::NoAvailableNodes`] if no address yields a channel that
/// answers the probe.
pub(crate) async fn open_connection(&self) -> Result<Channel, ConnectionInfoError> {
    let mut connections = self.open_connections.lock().await;

    for candidate in &self.address {
        // the read guard is released at the end of this statement, before any await point
        let known_dead = self.dead_nodes.read().unwrap().contains(candidate);
        if known_dead {
            continue;
        }

        let Some(channel) = self.connect_internal(candidate, &mut connections).await else {
            continue;
        };

        // check whether we can reach the node
        let mut probe = new_node_ctl_client(channel.clone());
        if probe.get_ident(()).await.is_ok() {
            return Ok(channel);
        }

        // todo maybe retry before marking a node as dead?
        self.dead_nodes.write().unwrap().insert(candidate.clone());
    }

    Err(ConnectionInfoError::NoAvailableNodes(NoRoleError(None)))
}
}

/// Error type returned by a [`ConnectionInfo::try_each`] node_operation closure
Expand Down
Loading