Skip to content

Commit 572b862

Browse files
committed
Reuse existing ProxySQL connection
Also organize some more queries into functions, and keep the borrow checker happy by maintaining a collection of indices instead of mutable references.
1 parent 05c287b commit 572b862

File tree

3 files changed

+62
-41
lines changed

3 files changed

+62
-41
lines changed

src/main.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ mod readyset;
77
use clap::Parser;
88
use config::{read_config_file, OperationMode};
99
use file_guard::Lock;
10-
use mysql::{Conn, OptsBuilder};
1110
use proxysql::ProxySQL;
1211
use std::fs::OpenOptions;
1312

@@ -65,17 +64,8 @@ fn main() {
6564
if config.operation_mode == OperationMode::QueryDiscovery
6665
|| config.operation_mode == OperationMode::All
6766
{
68-
let mut conn = Conn::new(
69-
OptsBuilder::new()
70-
.ip_or_hostname(Some(config.proxysql_host.as_str()))
71-
.tcp_port(config.proxysql_port)
72-
.user(Some(config.proxysql_user.as_str()))
73-
.pass(Some(config.proxysql_password.clone().as_str()))
74-
.prefer_socket(false),
75-
)
76-
.expect("Failed to create ProxySQL connection");
7767
let mut query_discovery = queries::QueryDiscovery::new(&config);
78-
query_discovery.run(&mut proxysql, &mut conn);
68+
query_discovery.run(&mut proxysql);
7969
}
8070

8171
messages::print_info("Finished readyset_scheduler");

src/proxysql.rs

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,33 @@ impl ProxySQL {
104104
.expect("Failed to save query rules");
105105
}
106106

107+
pub fn update_servers(
108+
&mut self,
109+
hostgroup: u16,
110+
hostname: &str,
111+
port: u16,
112+
new_status: ProxySQLStatus,
113+
) {
114+
self.conn
115+
.query_drop(format!(
116+
"UPDATE mysql_servers SET status = '{new_status}'
117+
WHERE hostgroup_id = {hostgroup} AND hostname = '{hostname}' AND port = {port}"
118+
))
119+
.expect("Failed to update servers");
120+
}
121+
122+
pub fn load_servers(&mut self) {
123+
self.conn
124+
.query_drop("LOAD MYSQL SERVERS TO RUNTIME")
125+
.expect("Failed to load servers");
126+
}
127+
128+
pub fn save_servers(&mut self) {
129+
self.conn
130+
.query_drop("SAVE MYSQL SERVERS TO DISK")
131+
.expect("Failed to save servers");
132+
}
133+
107134
/// This function is used to check the current list of queries routed to Readyset.
108135
///
109136
/// # Returns
@@ -162,37 +189,32 @@ impl ProxySQL {
162189
pub fn health_check(&mut self) {
163190
let mut status_changes = Vec::new();
164191

165-
for readyset in self.readysets.iter_mut() {
192+
for (readyset_idx, readyset) in self.readysets.iter_mut().enumerate() {
166193
match readyset.check_readyset_is_ready() {
167194
Ok(ready) => match ready {
168195
ProxySQLStatus::Online => {
169-
status_changes.push((readyset, ProxySQLStatus::Online));
196+
status_changes.push((readyset_idx, ProxySQLStatus::Online));
170197
}
171198
ProxySQLStatus::Shunned => {
172-
status_changes.push((readyset, ProxySQLStatus::Shunned));
199+
status_changes.push((readyset_idx, ProxySQLStatus::Shunned));
173200
}
174201
ProxySQLStatus::OfflineSoft => {
175-
status_changes.push((readyset, ProxySQLStatus::OfflineSoft));
202+
status_changes.push((readyset_idx, ProxySQLStatus::OfflineSoft));
176203
}
177204
ProxySQLStatus::OfflineHard => {
178-
status_changes.push((readyset, ProxySQLStatus::OfflineHard));
205+
status_changes.push((readyset_idx, ProxySQLStatus::OfflineHard));
179206
}
180207
},
181208
Err(e) => {
182209
messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str());
183-
status_changes.push((readyset, ProxySQLStatus::Shunned));
210+
status_changes.push((readyset_idx, ProxySQLStatus::Shunned));
184211
}
185212
};
186213
}
187214

188-
for (readyset, status) in status_changes {
215+
for (readyset_idx, status) in status_changes {
216+
let readyset = self.readysets.get(readyset_idx).unwrap();
189217
if readyset.get_proxysql_status() != status {
190-
let where_clause = format!(
191-
"WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}",
192-
self.readyset_hostgroup,
193-
readyset.get_hostname(),
194-
readyset.get_port()
195-
);
196218
messages::print_note(
197219
format!(
198220
"Server HG: {}, Host: {}, Port: {} is currently {} on ProxySQL and {} on Readyset. Changing to {}",
@@ -205,19 +227,22 @@ impl ProxySQL {
205227
)
206228
.as_str(),
207229
);
208-
readyset.change_proxysql_status(status);
209-
if self.dry_run {
210-
messages::print_info("Dry run, skipping changes to ProxySQL");
211-
continue;
212-
}
213-
let _ = self.conn.query_drop(format!(
214-
"UPDATE mysql_servers SET status = '{}' {}",
215-
readyset.get_proxysql_status(),
216-
where_clause
217-
));
218-
let _ = self.conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME");
219-
let _ = self.conn.query_drop("SAVE MYSQL SERVERS TO DISK");
220230
}
231+
let readyset = self.readysets.get_mut(readyset_idx).unwrap();
232+
readyset.change_proxysql_status(status);
233+
if self.dry_run {
234+
messages::print_info("Dry run, skipping changes to ProxySQL");
235+
continue;
236+
}
237+
let readyset = self.readysets.get(readyset_idx).unwrap();
238+
self.update_servers(
239+
self.readyset_hostgroup,
240+
readyset.get_hostname().clone().as_str(),
241+
readyset.get_port(),
242+
readyset.get_proxysql_status(),
243+
);
244+
self.load_servers();
245+
self.save_servers();
221246
}
222247
}
223248

@@ -259,4 +284,9 @@ impl ProxySQL {
259284
.filter(|readyset| readyset.is_proxysql_online())
260285
.collect()
261286
}
287+
288+
/// Returns a reference to the current connection to ProxySQL.
289+
pub fn get_connection(&mut self) -> &mut Conn {
290+
&mut self.conn
291+
}
262292
}

src/queries.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
messages,
44
proxysql::ProxySQL,
55
};
6-
use mysql::{prelude::Queryable, Conn};
6+
use mysql::prelude::Queryable;
77

88
pub struct Query {
99
digest_text: String,
@@ -153,7 +153,7 @@ impl QueryDiscovery {
153153
)
154154
}
155155

156-
pub fn run(&mut self, proxysql: &mut ProxySQL, conn: &mut Conn) {
156+
pub fn run(&mut self, proxysql: &mut ProxySQL) {
157157
if proxysql.number_of_online_readyset_instances() == 0 {
158158
return;
159159
}
@@ -164,7 +164,7 @@ impl QueryDiscovery {
164164

165165
let mut more_queries = true;
166166
while more_queries && current_queries_digest.len() < self.number_of_queries as usize {
167-
let queries_to_cache = self.find_queries_to_cache(conn);
167+
let queries_to_cache = self.find_queries_to_cache(proxysql);
168168
more_queries = !queries_to_cache.is_empty();
169169
for query in queries_to_cache[0..queries_to_cache.len()].iter() {
170170
if current_queries_digest.len() > self.number_of_queries as usize {
@@ -227,17 +227,18 @@ impl QueryDiscovery {
227227
///
228228
/// # Arguments
229229
///
230-
/// * `conn` - A reference to a connection to ProxySQL.
230+
/// * `proxysql` - A reference to ProxySQL.
231231
///
232232
/// # Returns
233233
///
234234
/// A vector of queries that are not cached in Readyset and are not in the mysql_query_rules table.
235-
fn find_queries_to_cache(&self, conn: &mut Conn) -> Vec<Query> {
235+
fn find_queries_to_cache(&self, proxysql: &mut ProxySQL) -> Vec<Query> {
236236
match self.query_discovery_mode {
237237
QueryDiscoveryMode::External => {
238238
todo!("External mode is not implemented yet");
239239
}
240240
_ => {
241+
let conn = proxysql.get_connection();
241242
let query = self.query_builder();
242243
let rows: Vec<(String, String, String)> =
243244
conn.query(query).expect("Failed to find queries to cache");

0 commit comments

Comments
 (0)