diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index 4ce459690c..c6d0fe16b1 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -118,11 +118,14 @@ pub(crate) struct MetadataReader { pub(crate) struct Metadata { pub(crate) peers: Vec, pub(crate) keyspaces: HashMap>, + /// The release_version obtained from `system.local` on control connection. + pub(crate) cluster_version: Option, } #[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way pub struct Peer { pub host_id: Uuid, + pub server_version: Option, pub address: NodeAddr, pub tokens: Vec, pub datacenter: Option, @@ -178,7 +181,9 @@ impl Peer { } } - pub(crate) fn into_peer_endpoint_and_tokens(self) -> (PeerEndpoint, Vec) { + pub(crate) fn into_peer_endpoint_tokens_and_server_version( + self, + ) -> (PeerEndpoint, Vec, Option) { ( PeerEndpoint { host_id: self.host_id, @@ -187,6 +192,7 @@ impl Peer { rack: self.rack, }, self.tokens, + self.server_version, ) } } @@ -408,6 +414,7 @@ impl Metadata { datacenter: None, rack: None, host_id: Uuid::new_v4(), + server_version: None, } }) .collect(); @@ -415,6 +422,7 @@ impl Metadata { Metadata { peers, keyspaces: HashMap::new(), + cluster_version: None, } } } @@ -723,10 +731,10 @@ impl ControlConnection { keyspace_to_fetch: &[String], fetch_schema: bool, ) -> Result { - let peers_query = self.query_peers(connect_port); + let peers_query = self.query_peers_and_cluster_version(connect_port); let keyspaces_query = self.query_keyspaces(keyspace_to_fetch, fetch_schema); - let (peers, keyspaces) = tokio::try_join!(peers_query, keyspaces_query)?; + let ((peers, cluster_version), keyspaces) = tokio::try_join!(peers_query, keyspaces_query)?; // There must be at least one peer if peers.is_empty() { @@ -738,7 +746,11 @@ impl ControlConnection { return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists)); } - Ok(Metadata { peers, keyspaces }) + Ok(Metadata { + peers, + keyspaces, + cluster_version, + }) } } @@ -746,6 +758,8 @@ impl ControlConnection { #[scylla(crate = "scylla_cql")] struct NodeInfoRow { host_id: Option, + #[scylla(rename = "release_version")] + server_version: Option, #[scylla(rename = "rpc_address")] untranslated_ip_addr: IpAddr, #[scylla(rename = "data_center")] @@ -772,9 +786,15 @@ impl NodeInfoSource { const METADATA_QUERY_PAGE_SIZE: i32 = 1024; impl ControlConnection { - async fn query_peers(&self, connect_port: u16) -> Result, MetadataError> { + /// Returns the vector of peers and the cluster version. + /// Cluster version is the `release_version` column from `system.local` query + /// executed on control connection. + async fn query_peers_and_cluster_version( + &self, + connect_port: u16, + ) -> Result<(Vec, Option), MetadataError> { let mut peers_query = Statement::new( - "select host_id, rpc_address, data_center, rack, tokens from system.peers", + "select host_id, release_version, rpc_address, data_center, rack, tokens from system.peers", ); peers_query.set_page_size(METADATA_QUERY_PAGE_SIZE); let peers_query_stream = self @@ -795,7 +815,7 @@ impl ControlConnection { .and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result))); let mut local_query = - Statement::new("select host_id, rpc_address, data_center, rack, tokens from system.local WHERE key='local'"); + Statement::new("select host_id, release_version, rpc_address, data_center, rack, tokens from system.local WHERE key='local'"); local_query.set_page_size(METADATA_QUERY_PAGE_SIZE); let local_query_stream = self .query_iter(local_query) @@ -821,7 +841,9 @@ impl ControlConnection { let translated_peers_futures = untranslated_rows.map(|row_result| async { match row_result { - Ok((source, row)) => Self::create_peer_from_row(source, row, local_address).await, + Ok((source, row)) => Self::create_peer_from_row(source, row, local_address) + .await + .map(|peer| (source, peer)), Err(err) => { warn!( "system.peers or system.local has an invalid row, skipping it: {}", @@ -832,12 +854,24 @@ impl ControlConnection { } }); - let peers = translated_peers_futures + let vec_capacity = translated_peers_futures.size_hint().0; + let (peers, cluster_version) = translated_peers_futures .buffer_unordered(256) .filter_map(std::future::ready) - .collect::>() + .fold( + (Vec::with_capacity(vec_capacity), None), + |(mut peers, cluster_version), (source, peer)| async move { + let new_cluster_version = match (&cluster_version, source) { + (None, NodeInfoSource::Local) => peer.server_version.clone(), + _ => cluster_version, + }; + + peers.push(peer); + (peers, new_cluster_version) + }, + ) .await; - Ok(peers) + Ok((peers, cluster_version)) } async fn create_peer_from_row( @@ -847,6 +881,7 @@ impl ControlConnection { ) -> Option { let NodeInfoRow { host_id, + server_version, untranslated_ip_addr, datacenter, rack, @@ -899,6 +934,7 @@ impl ControlConnection { Some(Peer { host_id, + server_version, address: node_addr, tokens, datacenter, diff --git a/scylla/src/cluster/node.rs b/scylla/src/cluster/node.rs index e4e9332ae3..e56cbb1616 100644 --- a/scylla/src/cluster/node.rs +++ b/scylla/src/cluster/node.rs @@ -75,6 +75,7 @@ impl Display for NodeAddr { #[derive(Debug)] pub struct Node { pub host_id: Uuid, + pub server_version: Option, pub address: NodeAddr, pub datacenter: Option, pub rack: Option, @@ -92,6 +93,7 @@ impl Node { /// Creates a new node which starts connecting in the background. pub(crate) fn new( peer: PeerEndpoint, + server_version: Option, pool_config: &PoolConfig, keyspace_name: Option, enabled: bool, @@ -117,6 +119,7 @@ impl Node { Node { host_id, + server_version, address, datacenter, rack, @@ -133,13 +136,18 @@ impl Node { /// /// - `node` - previous definition of that node /// - `address` - new address to connect to - pub(crate) fn inherit_with_ip_changed(node: &Node, endpoint: PeerEndpoint) -> Self { + pub(crate) fn inherit_with_ip_changed( + node: &Node, + endpoint: PeerEndpoint, + server_version: Option, + ) -> Self { let address = endpoint.address; if let Some(ref pool) = node.pool { pool.update_endpoint(endpoint); } Self { address, + server_version, down_marker: false.into(), datacenter: node.datacenter.clone(), rack: node.rack.clone(), @@ -367,6 +375,7 @@ mod tests { ) -> Self { Self { host_id: id.unwrap_or(Uuid::new_v4()), + server_version: None, address: address.unwrap_or(NodeAddr::Translatable(SocketAddr::from(( [255, 255, 255, 255], 0, diff --git a/scylla/src/cluster/state.rs b/scylla/src/cluster/state.rs index 76d8852370..1755343bf7 100644 --- a/scylla/src/cluster/state.rs +++ b/scylla/src/cluster/state.rs @@ -24,6 +24,7 @@ pub struct ClusterState { pub(crate) known_peers: HashMap>, // Invariant: nonempty after Cluster::new() pub(crate) keyspaces: HashMap, pub(crate) locator: ReplicaLocator, + pub(crate) cluster_version: Option, } /// Enables printing [ClusterState] struct in a neat way, skipping the clutter involved by @@ -84,21 +85,28 @@ impl ClusterState { let node: Arc = match known_peers.get(&peer_host_id) { Some(node) if node.datacenter == peer.datacenter && node.rack == peer.rack => { - let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens(); + let (peer_endpoint, tokens, server_version) = + peer.into_peer_endpoint_tokens_and_server_version(); peer_tokens = tokens; if node.address == peer_address { node.clone() } else { // If IP changes, the Node struct is recreated, but the underlying pool is preserved and notified about the IP change. - Arc::new(Node::inherit_with_ip_changed(node, peer_endpoint)) + Arc::new(Node::inherit_with_ip_changed( + node, + peer_endpoint, + server_version, + )) } } _ => { let is_enabled = host_filter.map_or(true, |f| f.accept(&peer)); - let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens(); + let (peer_endpoint, tokens, server_version) = + peer.into_peer_endpoint_tokens_and_server_version(); peer_tokens = tokens; Arc::new(Node::new( peer_endpoint, + server_version, pool_config, used_keyspace.clone(), is_enabled, @@ -194,6 +202,7 @@ impl ClusterState { known_peers: new_known_peers, keyspaces, locator, + cluster_version: metadata.cluster_version, } } @@ -212,6 +221,16 @@ impl ClusterState { self.locator.unique_nodes_in_global_ring() } + /// Returns the cluster version. + /// + /// Cluster version is the server application version used by + /// the current control connection node (if such information is provided by the server). + /// + /// To check version of each node, we suggest using [`ClusterState::get_nodes_info()`]. + pub fn cluster_version(&self) -> Option<&str> { + self.cluster_version.as_deref() + } + /// Compute token of a table partition key /// /// `partition_key` argument contains the values of all partition key diff --git a/scylla/src/policies/load_balancing/default.rs b/scylla/src/policies/load_balancing/default.rs index 57e021650f..7a85c24c1b 100644 --- a/scylla/src/policies/load_balancing/default.rs +++ b/scylla/src/policies/load_balancing/default.rs @@ -1437,12 +1437,14 @@ mod tests { address: id_to_invalid_addr(*id), tokens: vec![Token::new(*id as i64 * 100)], host_id: Uuid::new_v4(), + server_version: None, }) .collect::>(); let info = Metadata { peers, keyspaces: HashMap::new(), + cluster_version: None, }; ClusterState::new( diff --git a/scylla/src/policies/load_balancing/plan.rs b/scylla/src/policies/load_balancing/plan.rs index 910301bcb4..10efc51fbd 100644 --- a/scylla/src/policies/load_balancing/plan.rs +++ b/scylla/src/policies/load_balancing/plan.rs @@ -229,6 +229,7 @@ mod tests { known_peers: Default::default(), keyspaces: Default::default(), locator, + cluster_version: None, }; let routing_info = RoutingInfo::default(); let plan = Plan::new(&policy, &routing_info, &cluster_state); diff --git a/scylla/src/routing/locator/test.rs b/scylla/src/routing/locator/test.rs index 478918caa6..222b65285a 100644 --- a/scylla/src/routing/locator/test.rs +++ b/scylla/src/routing/locator/test.rs @@ -64,6 +64,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { address: id_to_invalid_addr(1), tokens: vec![Token::new(50), Token::new(250), Token::new(400)], host_id: Uuid::new_v4(), + server_version: None, }, Peer { // B @@ -72,6 +73,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { address: id_to_invalid_addr(2), tokens: vec![Token::new(100), Token::new(600), Token::new(900)], host_id: Uuid::new_v4(), + server_version: None, }, Peer { // C @@ -80,6 +82,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { address: id_to_invalid_addr(3), tokens: vec![Token::new(300), Token::new(650), Token::new(700)], host_id: Uuid::new_v4(), + server_version: None, }, Peer { // D @@ -88,6 +91,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { address: id_to_invalid_addr(4), tokens: vec![Token::new(350), Token::new(550)], host_id: Uuid::new_v4(), + server_version: None, }, Peer { // E @@ -96,6 +100,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { address: id_to_invalid_addr(5), tokens: vec![Token::new(150), Token::new(750)], host_id: Uuid::new_v4(), + server_version: None, }, Peer { // F @@ -104,6 +109,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { address: id_to_invalid_addr(6), tokens: vec![Token::new(200), Token::new(450)], host_id: Uuid::new_v4(), + server_version: None, }, Peer { // G @@ -112,6 +118,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { address: id_to_invalid_addr(7), tokens: vec![Token::new(500), Token::new(800)], host_id: Uuid::new_v4(), + server_version: None, }, ]; @@ -161,6 +168,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { Metadata { peers: Vec::from(peers), keyspaces, + cluster_version: None, } } @@ -184,6 +192,7 @@ pub(crate) fn create_ring(metadata: &Metadata) -> impl Iterator() + .unwrap() + .0; + assert_eq!(&release_version, hardcoded_scylla_version); + + let cluster_state = session.get_cluster_state(); + + let cluster_version = cluster_state.cluster_version().unwrap(); + assert_eq!(cluster_version, hardcoded_scylla_version); + + for node in cluster_state.get_nodes_info() { + assert_eq!( + node.server_version.as_deref(), + Some(hardcoded_scylla_version) + ); + } +} + #[tokio::test] async fn test_fetch_system_keyspace() { setup_tracing();