Skip to content

Add information about server application version to metadata #1307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,14 @@ pub(crate) struct MetadataReader {
pub(crate) struct Metadata {
pub(crate) peers: Vec<Peer>,
pub(crate) keyspaces: HashMap<String, Result<Keyspace, SingleKeyspaceMetadataError>>,
/// The release_version obtained from `system.local` on control connection.
pub(crate) cluster_version: Option<String>,
}

#[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
pub struct Peer {
pub host_id: Uuid,
pub server_version: Option<String>,
pub address: NodeAddr,
pub tokens: Vec<Token>,
pub datacenter: Option<String>,
Expand Down Expand Up @@ -178,7 +181,9 @@ impl Peer {
}
}

pub(crate) fn into_peer_endpoint_and_tokens(self) -> (PeerEndpoint, Vec<Token>) {
pub(crate) fn into_peer_endpoint_tokens_and_server_version(
self,
) -> (PeerEndpoint, Vec<Token>, Option<String>) {
(
PeerEndpoint {
host_id: self.host_id,
Expand All @@ -187,6 +192,7 @@ impl Peer {
rack: self.rack,
},
self.tokens,
self.server_version,
)
}
}
Expand Down Expand Up @@ -408,13 +414,15 @@ impl Metadata {
datacenter: None,
rack: None,
host_id: Uuid::new_v4(),
server_version: None,
}
})
.collect();

Metadata {
peers,
keyspaces: HashMap::new(),
cluster_version: None,
}
}
}
Expand Down Expand Up @@ -723,10 +731,10 @@ impl ControlConnection {
keyspace_to_fetch: &[String],
fetch_schema: bool,
) -> Result<Metadata, MetadataError> {
let peers_query = self.query_peers(connect_port);
let peers_query = self.query_peers_and_cluster_version(connect_port);
let keyspaces_query = self.query_keyspaces(keyspace_to_fetch, fetch_schema);

let (peers, keyspaces) = tokio::try_join!(peers_query, keyspaces_query)?;
let ((peers, cluster_version), keyspaces) = tokio::try_join!(peers_query, keyspaces_query)?;

// There must be at least one peer
if peers.is_empty() {
Expand All @@ -738,14 +746,20 @@ impl ControlConnection {
return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists));
}

Ok(Metadata { peers, keyspaces })
Ok(Metadata {
peers,
keyspaces,
cluster_version,
})
}
}

#[derive(DeserializeRow)]
#[scylla(crate = "scylla_cql")]
struct NodeInfoRow {
host_id: Option<Uuid>,
#[scylla(rename = "release_version")]
server_version: Option<String>,
#[scylla(rename = "rpc_address")]
untranslated_ip_addr: IpAddr,
#[scylla(rename = "data_center")]
Expand All @@ -772,9 +786,15 @@ impl NodeInfoSource {
const METADATA_QUERY_PAGE_SIZE: i32 = 1024;

impl ControlConnection {
async fn query_peers(&self, connect_port: u16) -> Result<Vec<Peer>, MetadataError> {
/// Returns the vector of peers and the cluster version.
/// Cluster version is the `release_version` column from `system.local` query
/// executed on control connection.
async fn query_peers_and_cluster_version(
&self,
connect_port: u16,
) -> Result<(Vec<Peer>, Option<String>), MetadataError> {
let mut peers_query = Statement::new(
"select host_id, rpc_address, data_center, rack, tokens from system.peers",
"select host_id, release_version, rpc_address, data_center, rack, tokens from system.peers",
);
peers_query.set_page_size(METADATA_QUERY_PAGE_SIZE);
let peers_query_stream = self
Expand All @@ -795,7 +815,7 @@ impl ControlConnection {
.and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result)));

let mut local_query =
Statement::new("select host_id, rpc_address, data_center, rack, tokens from system.local WHERE key='local'");
Statement::new("select host_id, release_version, rpc_address, data_center, rack, tokens from system.local WHERE key='local'");
local_query.set_page_size(METADATA_QUERY_PAGE_SIZE);
let local_query_stream = self
.query_iter(local_query)
Expand All @@ -821,7 +841,9 @@ impl ControlConnection {

let translated_peers_futures = untranslated_rows.map(|row_result| async {
match row_result {
Ok((source, row)) => Self::create_peer_from_row(source, row, local_address).await,
Ok((source, row)) => Self::create_peer_from_row(source, row, local_address)
.await
.map(|peer| (source, peer)),
Err(err) => {
warn!(
"system.peers or system.local has an invalid row, skipping it: {}",
Expand All @@ -832,12 +854,24 @@ impl ControlConnection {
}
});

let peers = translated_peers_futures
let vec_capacity = translated_peers_futures.size_hint().0;
let (peers, cluster_version) = translated_peers_futures
.buffer_unordered(256)
.filter_map(std::future::ready)
.collect::<Vec<_>>()
.fold(
(Vec::with_capacity(vec_capacity), None),
|(mut peers, cluster_version), (source, peer)| async move {
let new_cluster_version = match (&cluster_version, source) {
(None, NodeInfoSource::Local) => peer.server_version.clone(),
_ => cluster_version,
};

peers.push(peer);
(peers, new_cluster_version)
},
)
.await;
Ok(peers)
Ok((peers, cluster_version))
}

async fn create_peer_from_row(
Expand All @@ -847,6 +881,7 @@ impl ControlConnection {
) -> Option<Peer> {
let NodeInfoRow {
host_id,
server_version,
untranslated_ip_addr,
datacenter,
rack,
Expand Down Expand Up @@ -899,6 +934,7 @@ impl ControlConnection {

Some(Peer {
host_id,
server_version,
address: node_addr,
tokens,
datacenter,
Expand Down
11 changes: 10 additions & 1 deletion scylla/src/cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ impl Display for NodeAddr {
#[derive(Debug)]
pub struct Node {
pub host_id: Uuid,
pub server_version: Option<String>,
pub address: NodeAddr,
pub datacenter: Option<String>,
pub rack: Option<String>,
Expand All @@ -92,6 +93,7 @@ impl Node {
/// Creates a new node which starts connecting in the background.
pub(crate) fn new(
peer: PeerEndpoint,
server_version: Option<String>,
pool_config: &PoolConfig,
keyspace_name: Option<VerifiedKeyspaceName>,
enabled: bool,
Expand All @@ -117,6 +119,7 @@ impl Node {

Node {
host_id,
server_version,
address,
datacenter,
rack,
Expand All @@ -133,13 +136,18 @@ impl Node {
///
/// - `node` - previous definition of that node
/// - `address` - new address to connect to
pub(crate) fn inherit_with_ip_changed(node: &Node, endpoint: PeerEndpoint) -> Self {
pub(crate) fn inherit_with_ip_changed(
node: &Node,
endpoint: PeerEndpoint,
server_version: Option<String>,
) -> Self {
let address = endpoint.address;
if let Some(ref pool) = node.pool {
pool.update_endpoint(endpoint);
}
Self {
address,
server_version,
down_marker: false.into(),
datacenter: node.datacenter.clone(),
rack: node.rack.clone(),
Expand Down Expand Up @@ -367,6 +375,7 @@ mod tests {
) -> Self {
Self {
host_id: id.unwrap_or(Uuid::new_v4()),
server_version: None,
address: address.unwrap_or(NodeAddr::Translatable(SocketAddr::from((
[255, 255, 255, 255],
0,
Expand Down
25 changes: 22 additions & 3 deletions scylla/src/cluster/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct ClusterState {
pub(crate) known_peers: HashMap<Uuid, Arc<Node>>, // Invariant: nonempty after Cluster::new()
pub(crate) keyspaces: HashMap<String, Keyspace>,
pub(crate) locator: ReplicaLocator,
pub(crate) cluster_version: Option<String>,
}

/// Enables printing [ClusterState] struct in a neat way, skipping the clutter involved by
Expand Down Expand Up @@ -84,21 +85,28 @@ impl ClusterState {

let node: Arc<Node> = match known_peers.get(&peer_host_id) {
Some(node) if node.datacenter == peer.datacenter && node.rack == peer.rack => {
let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens();
let (peer_endpoint, tokens, server_version) =
peer.into_peer_endpoint_tokens_and_server_version();
peer_tokens = tokens;
if node.address == peer_address {
node.clone()
} else {
// If IP changes, the Node struct is recreated, but the underlying pool is preserved and notified about the IP change.
Arc::new(Node::inherit_with_ip_changed(node, peer_endpoint))
Arc::new(Node::inherit_with_ip_changed(
node,
peer_endpoint,
server_version,
))
}
}
_ => {
let is_enabled = host_filter.map_or(true, |f| f.accept(&peer));
let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens();
let (peer_endpoint, tokens, server_version) =
peer.into_peer_endpoint_tokens_and_server_version();
peer_tokens = tokens;
Arc::new(Node::new(
peer_endpoint,
server_version,
pool_config,
used_keyspace.clone(),
is_enabled,
Expand Down Expand Up @@ -194,6 +202,7 @@ impl ClusterState {
known_peers: new_known_peers,
keyspaces,
locator,
cluster_version: metadata.cluster_version,
}
}

Expand All @@ -212,6 +221,16 @@ impl ClusterState {
self.locator.unique_nodes_in_global_ring()
}

/// Returns the cluster version.
///
/// Cluster version is the server application version used by
/// the current control connection node (if such information is provided by the server).
///
/// To check version of each node, we suggest using [`ClusterState::get_nodes_info()`].
pub fn cluster_version(&self) -> Option<&str> {
self.cluster_version.as_deref()
}

/// Compute token of a table partition key
///
/// `partition_key` argument contains the values of all partition key
Expand Down
2 changes: 2 additions & 0 deletions scylla/src/policies/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1437,12 +1437,14 @@ mod tests {
address: id_to_invalid_addr(*id),
tokens: vec![Token::new(*id as i64 * 100)],
host_id: Uuid::new_v4(),
server_version: None,
})
.collect::<Vec<_>>();

let info = Metadata {
peers,
keyspaces: HashMap::new(),
cluster_version: None,
};

ClusterState::new(
Expand Down
1 change: 1 addition & 0 deletions scylla/src/policies/load_balancing/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ mod tests {
known_peers: Default::default(),
keyspaces: Default::default(),
locator,
cluster_version: None,
};
let routing_info = RoutingInfo::default();
let plan = Plan::new(&policy, &routing_info, &cluster_state);
Expand Down
9 changes: 9 additions & 0 deletions scylla/src/routing/locator/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
address: id_to_invalid_addr(1),
tokens: vec![Token::new(50), Token::new(250), Token::new(400)],
host_id: Uuid::new_v4(),
server_version: None,
},
Peer {
// B
Expand All @@ -72,6 +73,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
address: id_to_invalid_addr(2),
tokens: vec![Token::new(100), Token::new(600), Token::new(900)],
host_id: Uuid::new_v4(),
server_version: None,
},
Peer {
// C
Expand All @@ -80,6 +82,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
address: id_to_invalid_addr(3),
tokens: vec![Token::new(300), Token::new(650), Token::new(700)],
host_id: Uuid::new_v4(),
server_version: None,
},
Peer {
// D
Expand All @@ -88,6 +91,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
address: id_to_invalid_addr(4),
tokens: vec![Token::new(350), Token::new(550)],
host_id: Uuid::new_v4(),
server_version: None,
},
Peer {
// E
Expand All @@ -96,6 +100,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
address: id_to_invalid_addr(5),
tokens: vec![Token::new(150), Token::new(750)],
host_id: Uuid::new_v4(),
server_version: None,
},
Peer {
// F
Expand All @@ -104,6 +109,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
address: id_to_invalid_addr(6),
tokens: vec![Token::new(200), Token::new(450)],
host_id: Uuid::new_v4(),
server_version: None,
},
Peer {
// G
Expand All @@ -112,6 +118,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
address: id_to_invalid_addr(7),
tokens: vec![Token::new(500), Token::new(800)],
host_id: Uuid::new_v4(),
server_version: None,
},
];

Expand Down Expand Up @@ -161,6 +168,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
Metadata {
peers: Vec::from(peers),
keyspaces,
cluster_version: None,
}
}

Expand All @@ -184,6 +192,7 @@ pub(crate) fn create_ring(metadata: &Metadata) -> impl Iterator<Item = (Token, A
for peer in &metadata.peers {
let node = Arc::new(Node::new(
peer.to_peer_endpoint(),
None,
&pool_config,
None,
true,
Expand Down
Loading
Loading