diff --git a/tools/restatectl/src/commands/node/disable_node_checker.rs b/tools/restatectl/src/commands/node/disable_node_checker.rs
index 645281ecd7..5cd24bd16f 100644
--- a/tools/restatectl/src/commands/node/disable_node_checker.rs
+++ b/tools/restatectl/src/commands/node/disable_node_checker.rs
@@ -8,12 +8,19 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use restate_metadata_store::{MetadataStoreClient, ReadError};
 use restate_types::PlainNodeId;
+use restate_types::epoch::EpochMetadata;
+use restate_types::identifiers::PartitionId;
 use restate_types::logs::LogletId;
 use restate_types::logs::metadata::{Logs, ProviderKind};
+use restate_types::metadata_store::keys::partition_processor_epoch_key;
 use restate_types::nodes_config::{
-    MetadataServerState, NodesConfigError, NodesConfiguration, Role, StorageState,
+    MetadataServerState, NodeConfig, NodesConfigError, NodesConfiguration, Role, StorageState,
+    WorkerState,
 };
+use restate_types::partition_table::PartitionTable;
+use tokio::task::JoinSet;
 
 #[derive(Debug, thiserror::Error)]
 pub enum DisableNodeError {
@@ -35,22 +42,43 @@ pub enum DisableNodeError {
     DefaultLogletProvider(ProviderKind),
     #[error("metadata server is an active member")]
     MetadataMember,
+    #[error("worker cannot be disabled")]
+    Worker(#[from] DisableWorkerError),
 }
 
-pub struct DisableNodeChecker<'a, 'b> {
+#[derive(Debug, thiserror::Error)]
+pub enum DisableWorkerError {
+    #[error("worker is active; drain it first")]
+    Active,
+    #[error("failed reading epoch metadata from metadata store: {0}; try again later")]
+    EpochMetadata(#[from] ReadError),
+    #[error("worker is part of replica set of partition {0}; wait until it is drained")]
+    Replica(PartitionId),
+}
+
+pub struct DisableNodeChecker<'a> {
     nodes_configuration: &'a NodesConfiguration,
-    logs: &'b Logs,
+    logs: &'a Logs,
+    partition_table: &'a PartitionTable,
+    metadata_client: &'a MetadataStoreClient,
 }
 
-impl<'a, 'b> DisableNodeChecker<'a, 'b> {
-    pub fn new(nodes_configuration: &'a NodesConfiguration, logs: &'b Logs) -> Self {
+impl<'a> DisableNodeChecker<'a> {
+    pub fn new(
+        nodes_configuration: &'a NodesConfiguration,
+        logs: &'a Logs,
+        partition_table: &'a PartitionTable,
+        metadata_client: &'a MetadataStoreClient,
+    ) -> Self {
         DisableNodeChecker {
             nodes_configuration,
             logs,
+            partition_table,
+            metadata_client,
         }
     }
 
-    pub fn safe_to_disable_node(&self, node_id: PlainNodeId) -> Result<(), DisableNodeError> {
+    pub async fn safe_to_disable_node(&self, node_id: PlainNodeId) -> Result<(), DisableNodeError> {
         let node_config = match self.nodes_configuration.find_node_by_id(node_id) {
             Ok(node_config) => node_config,
             // unknown or deleted nodes can be safely disabled
@@ -64,6 +92,8 @@ impl<'a, 'b> DisableNodeChecker<'a, 'b> {
 
         self.safe_to_disable_log_server(node_id, node_config.log_server_config.storage_state)?;
 
+        self.safe_to_disable_worker(node_id, node_config).await?;
+
         // only safe to disable node if it does not run a metadata server or is not a member
         // todo atm we must consider the role because the default metadata server state is MetadataServerState::Member.
         // We need to introduce a provisioning state or make the metadata server state optional in the NodeConfig.
@@ -77,6 +107,69 @@ impl<'a, 'b> DisableNodeChecker<'a, 'b> {
         Ok(())
     }
 
+    /// Checks whether it is safe to disable the worker. A worker is safe to disable if it is not
+    /// used in any partition replica set, and it cannot be added to any future replica sets.
+    async fn safe_to_disable_worker(
+        &self,
+        node_id: PlainNodeId,
+        node_config: &NodeConfig,
+    ) -> Result<(), DisableWorkerError> {
+        match node_config.worker_config.worker_state {
+            WorkerState::Active => {
+                return Err(DisableWorkerError::Active);
+            }
+            WorkerState::Draining => {
+                // we need to check whether the worker is part of any replica sets
+
+                // Unfortunately, there is no guarantee that a draining worker won't be added to any
+                // future replica set because the corresponding NodesConfiguration might not have
+                // been sent to all nodes of the cluster :-( We might want to wait for a given
+                // duration after marking a worker as draining before we consider it safe to
+                // disable it.
+            }
+            WorkerState::Provisioning | WorkerState::Disabled => {
+                return Ok(());
+            }
+        }
+
+        let mut replica_membership = JoinSet::new();
+
+        // We assume that the partition table does contain all relevant partitions. This will break
+        // once we support dynamic partition table updates. Once this happens, we need to wait until
+        // the draining worker state has been propagated to all nodes.
+        for partition_id in self.partition_table.iter_ids().copied() {
+            replica_membership.spawn({
+                let metadata_client = self.metadata_client.clone();
+
+                async move {
+                    // todo replace with multi-get when available
+                    let epoch_metadata = metadata_client
+                        .get::<EpochMetadata>(partition_processor_epoch_key(partition_id))
+                        .await?;
+
+                    if epoch_metadata.is_some_and(|epoch_metadata| {
+                        // check whether node_id is contained in current or next replica set; if yes, then
+                        // we cannot safely disable this node yet
+                        epoch_metadata.current().replica_set().contains(node_id)
+                            || epoch_metadata
+                                .next()
+                                .is_some_and(|next| next.replica_set().contains(node_id))
+                    }) {
+                        Err(DisableWorkerError::Replica(partition_id))
+                    } else {
+                        Ok(())
+                    }
+                }
+            });
+        }
+
+        while let Some(result) = replica_membership.join_next().await {
+            result.expect("check replica membership not to panic")?;
+        }
+
+        Ok(())
+    }
+
     /// Checks whether it is safe to disable the given log server identified by the node_id. It is
     /// safe to disable the log server if it can no longer be added to new node sets (== not being
     /// a candidate) and it is no longer part of any known node sets.
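Note on the membership check above: the per-partition metadata reads are fanned out onto a `tokio::task::JoinSet`, and the first partition that still references the node fails the whole check; dropping the `JoinSet` on that early return cancels the reads still in flight. Below is a minimal, self-contained sketch of that fan-out pattern, assuming only tokio. `check_partition`, `StillReferenced`, `safe_to_disable`, and the hard-coded replica set are hypothetical stand-ins for the `EpochMetadata` lookup in the patch, not the actual Restate types.

```rust
use tokio::task::JoinSet;

#[derive(Debug)]
struct StillReferenced {
    partition_id: u16,
}

// hypothetical stand-in for reading EpochMetadata and checking whether the
// node shows up in the partition's current or next replica set
async fn check_partition(partition_id: u16, node_id: u32) -> Result<(), StillReferenced> {
    let replica_set = [1u32, 2, 3];
    if replica_set.contains(&node_id) {
        Err(StillReferenced { partition_id })
    } else {
        Ok(())
    }
}

async fn safe_to_disable(node_id: u32, partitions: &[u16]) -> Result<(), StillReferenced> {
    // one lightweight task per partition, all running concurrently
    let mut checks = JoinSet::new();
    for &partition_id in partitions {
        checks.spawn(check_partition(partition_id, node_id));
    }

    // surface the first failing check; returning early drops the JoinSet,
    // which aborts the checks that have not finished yet
    while let Some(result) = checks.join_next().await {
        result.expect("membership check not to panic")?;
    }

    Ok(())
}

#[tokio::main]
async fn main() {
    // node 4 is in no replica set, so disabling it is safe
    println!("{:?}", safe_to_disable(4, &[0, 1, 2]).await);
    // node 2 is still referenced, so the check reports an offending partition
    println!("{:?}", safe_to_disable(2, &[0, 1, 2]).await);
}
```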
diff --git a/tools/restatectl/src/commands/node/remove_nodes.rs b/tools/restatectl/src/commands/node/remove_nodes.rs
index 5f4dec7cb7..5ced64f927 100644
--- a/tools/restatectl/src/commands/node/remove_nodes.rs
+++ b/tools/restatectl/src/commands/node/remove_nodes.rs
@@ -14,10 +14,11 @@ use cling::{Collect, Run};
 use itertools::Itertools;
 
 use restate_cli_util::c_println;
+use restate_metadata_store::protobuf::metadata_proxy_svc::client::MetadataStoreProxy;
 use restate_metadata_store::protobuf::metadata_proxy_svc::{
     PutRequest, client::new_metadata_proxy_client,
 };
-use restate_metadata_store::serialize_value;
+use restate_metadata_store::{MetadataStoreClient, serialize_value};
 use restate_types::PlainNodeId;
 use restate_types::metadata::Precondition;
 use restate_types::metadata_store::keys::NODES_CONFIG_KEY;
@@ -41,13 +42,23 @@ pub async fn remove_nodes(
 ) -> anyhow::Result<()> {
     let mut nodes_configuration = connection.get_nodes_configuration().await?;
     let logs = connection.get_logs().await?;
+    let partition_table = connection.get_partition_table().await?;
 
-    let disable_node_checker = DisableNodeChecker::new(&nodes_configuration, &logs);
+    let channel = connection.open_connection().await?;
+    let metadata_client = MetadataStoreClient::new(MetadataStoreProxy::new(channel), None);
+
+    let disable_node_checker = DisableNodeChecker::new(
+        &nodes_configuration,
+        &logs,
+        &partition_table,
+        &metadata_client,
+    );
 
     for node_id in &opts.nodes {
         disable_node_checker
             .safe_to_disable_node(*node_id)
-            .context("It is not safe to disable node {node_id}")?
+            .await
+            .context(format!("It is not safe to disable node {}", node_id))?
     }
 
     let precondition = Precondition::MatchesVersion(nodes_configuration.version());
diff --git a/tools/restatectl/src/connection.rs b/tools/restatectl/src/connection.rs
index a9855081b7..7e87663d7e 100644
--- a/tools/restatectl/src/connection.rs
+++ b/tools/restatectl/src/connection.rs
@@ -416,6 +416,32 @@ impl ConnectionInfo {
            .clone(),
        )
    }
+
+    /// Creates and returns a (lazy) connection to any of the addresses specified in the `address`
+    /// field. This method fails with [`ConnectionInfoError::NoAvailableNodes`] if it cannot
+    /// establish a connection to any of the specified nodes.
+    pub(crate) async fn open_connection(&self) -> Result<Channel, ConnectionInfoError> {
+        let mut open_connections = self.open_connections.lock().await;
+
+        for address in &self.address {
+            if self.dead_nodes.read().unwrap().contains(address) {
+                continue;
+            }
+
+            if let Some(channel) = self.connect_internal(address, &mut open_connections).await {
+                // check whether we can reach the node
+                let mut node_client = new_node_ctl_client(channel.clone());
+                if node_client.get_ident(()).await.is_err() {
+                    // todo maybe retry before marking a node as dead?
+                    self.dead_nodes.write().unwrap().insert(address.clone());
+                } else {
+                    return Ok(channel);
+                }
+            }
+        }
+
+        Err(ConnectionInfoError::NoAvailableNodes(NoRoleError(None)))
+    }
 }
 
 /// Error type returned by a [`ConnectionInfo::try_each`] node_operation closure
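For context on the new `ConnectionInfo::open_connection`: it walks the configured addresses, skips nodes already marked dead, health-checks each candidate channel with a `get_ident` call, remembers failures so later calls skip them, and hands back the first channel that answers. The sketch below shows only that probing loop, under simplifying assumptions: `Connections`, `probe`, and plain `String` addresses are hypothetical stand-ins for `ConnectionInfo`, the node-ctl ident RPC, and tonic channels, and the real method additionally holds the open-connections mutex while iterating.

```rust
use std::collections::HashSet;
use std::sync::RwLock;

struct Connections {
    addresses: Vec<String>,
    dead_nodes: RwLock<HashSet<String>>,
}

impl Connections {
    // hypothetical stand-in for probing a node, e.g. an ident RPC on the channel
    async fn probe(&self, _address: &str) -> bool {
        true
    }

    // return the first address that answers the probe, remembering dead ones
    async fn open_connection(&self) -> Option<String> {
        for address in &self.addresses {
            // skip nodes that already failed a probe in an earlier call
            if self.dead_nodes.read().unwrap().contains(address) {
                continue;
            }

            if self.probe(address).await {
                return Some(address.clone());
            }

            // remember the failure so subsequent calls do not retry this node
            self.dead_nodes.write().unwrap().insert(address.clone());
        }

        None
    }
}

#[tokio::main]
async fn main() {
    let connections = Connections {
        addresses: vec!["http://127.0.0.1:5122".to_owned()],
        dead_nodes: RwLock::new(HashSet::new()),
    };

    // prints Some("http://127.0.0.1:5122") because the stub probe always succeeds
    println!("{:?}", connections.open_connection().await);
}
```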