Skip to content

Commit c475599

Browse files
committed
Allow configuration for PostgreSQL
This patch adds a new and optional configuration option, database_type, to the set of options available in the scheduler's config. Valid values are "mysql" and "postgresql", and if the option is omitted, "mysql" as treated as the default. Note that this patch does not yet introduce PostgreSQL support, but provides the initial restructuring of the scheduler. Abstract usage of an SQL connection into a new module, sql_connection, and a new enum representing a connection, SQLConnection, which represents either a MySQL or a PostgreSQL connection. Also replace all uses of mysql::Conn with the new SQLConnection enum, which acts as a thin wrapper.
1 parent 572b862 commit c475599

File tree

9 files changed

+185
-68
lines changed

9 files changed

+185
-68
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ serde = "1.0"
1313
chrono = "0.4.35"
1414
file-guard = "0.2.0"
1515
once_cell = "1.10.0"
16+
anyhow = "1.0.97"
1617

1718

1819
[package.metadata.generate-rpm]

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ SAVE SCHEDULER TO DISK;
3939
```
4040

4141
Configure `/etc/readyset_proxysql_scheduler.cnf` as follow:
42+
* `database_type` - (Optional) - Either `"mysql"` or `"postgresql"` (Default `"mysql"`)
4243
* `proxysql_user` - (Required) - ProxySQL admin user
4344
* `proxysql_password` - (Required) - ProxySQL admin password
4445
* `proxysql_host` - (Required) - ProxySQL admin host

src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ use std::{
77

88
use crate::messages::MessageType;
99

10+
#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq)]
11+
#[serde(rename_all = "lowercase")]
12+
pub enum DatabaseType {
13+
#[default]
14+
MySQL,
15+
PostgreSQL,
16+
}
17+
1018
#[derive(Deserialize, Clone, Copy, PartialEq, PartialOrd, Default, Debug)]
1119
pub enum OperationMode {
1220
HealthCheck,
@@ -61,6 +69,8 @@ fn default_number_of_queries() -> u16 {
6169

6270
#[derive(Deserialize, Clone, Debug)]
6371
pub struct Config {
72+
#[serde(default)]
73+
pub database_type: DatabaseType,
6474
pub proxysql_user: String,
6575
pub proxysql_password: String,
6676
pub proxysql_host: String,

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod messages;
33
mod proxysql;
44
mod queries;
55
mod readyset;
6+
mod sql_connection;
67

78
use clap::Parser;
89
use config::{read_config_file, OperationMode};

src/proxysql.rs

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
use chrono::{DateTime, Local};
2-
use mysql::{prelude::Queryable, Conn, OptsBuilder};
32

43
use crate::{
5-
config::Config,
4+
config::{Config, DatabaseType},
65
messages,
76
queries::Query,
87
readyset::{ProxySQLStatus, Readyset},
8+
sql_connection::SQLConnection,
99
};
1010

1111
const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at";
1212
const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at";
1313

1414
pub struct ProxySQL {
15+
database_type: DatabaseType,
1516
readyset_hostgroup: u16,
1617
warmup_time_s: u16,
17-
conn: Conn,
18+
conn: SQLConnection,
1819
readysets: Vec<Readyset>,
1920
dry_run: bool,
2021
}
@@ -31,17 +32,18 @@ impl ProxySQL {
3132
///
3233
/// A new ProxySQL struct.
3334
pub fn new(config: &Config, dry_run: bool) -> Self {
34-
let mut conn = Conn::new(
35-
OptsBuilder::new()
36-
.ip_or_hostname(Some(config.proxysql_host.as_str()))
37-
.tcp_port(config.proxysql_port)
38-
.user(Some(config.proxysql_user.as_str()))
39-
.pass(Some(config.proxysql_password.as_str()))
40-
.prefer_socket(false),
35+
let mut conn = SQLConnection::new(
36+
config.database_type,
37+
&config.proxysql_host,
38+
config.proxysql_port,
39+
&config.proxysql_user,
40+
&config.proxysql_password,
4141
)
4242
.expect("Failed to create ProxySQL connection");
43-
44-
let query = format!(
43+
if config.database_type == DatabaseType::PostgreSQL {
44+
todo!("PostgreSQL ProxySQL server management");
45+
}
46+
let query = &format!(
4547
"SELECT hostname, port, status, comment FROM mysql_servers WHERE hostgroup_id = {} AND status IN ('ONLINE', 'SHUNNED', 'OFFLINE_SOFT')",
4648
config.readyset_hostgroup
4749
);
@@ -58,6 +60,7 @@ impl ProxySQL {
5860
.collect::<Vec<Readyset>>();
5961

6062
ProxySQL {
63+
database_type: config.database_type,
6164
conn,
6265
readyset_hostgroup: config.readyset_hostgroup,
6366
warmup_time_s: config.warmup_time_s,
@@ -83,22 +86,31 @@ impl ProxySQL {
8386
pub fn add_as_query_rule(&mut self, query: &Query) {
8487
let datetime_now: DateTime<Local> = Local::now();
8588
let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S");
89+
if self.database_type == DatabaseType::PostgreSQL {
90+
todo!("PostgreSQL ProxySQL query rule management");
91+
}
8692
if self.warmup_time_s > 0 {
87-
self.conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", query.get_user(), self.readyset_hostgroup, query.get_digest(), MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules");
93+
self.conn.query_drop(&format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", query.get_user(), self.readyset_hostgroup, query.get_digest(), MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules");
8894
messages::print_note("Inserted warm-up rule");
8995
} else {
90-
self.conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", query.get_user(), self.readyset_hostgroup, query.get_digest(), DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules");
96+
self.conn.query_drop(&format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", query.get_user(), self.readyset_hostgroup, query.get_digest(), DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules");
9197
messages::print_note("Inserted destination rule");
9298
}
9399
}
94100

95101
pub fn load_query_rules(&mut self) {
102+
if self.database_type == DatabaseType::PostgreSQL {
103+
todo!("PostgreSQL ProxySQL query rule loading");
104+
}
96105
self.conn
97106
.query_drop("LOAD MYSQL QUERY RULES TO RUNTIME")
98107
.expect("Failed to load query rules");
99108
}
100109

101110
pub fn save_query_rules(&mut self) {
111+
if self.database_type == DatabaseType::PostgreSQL {
112+
todo!("PostgreSQL ProxySQL query rule saving");
113+
}
102114
self.conn
103115
.query_drop("SAVE MYSQL QUERY RULES TO DISK")
104116
.expect("Failed to save query rules");
@@ -111,21 +123,30 @@ impl ProxySQL {
111123
port: u16,
112124
new_status: ProxySQLStatus,
113125
) {
126+
if self.database_type == DatabaseType::PostgreSQL {
127+
todo!("PostgreSQL ProxySQL server updating");
128+
}
114129
self.conn
115-
.query_drop(format!(
130+
.query_drop(&format!(
116131
"UPDATE mysql_servers SET status = '{new_status}'
117132
WHERE hostgroup_id = {hostgroup} AND hostname = '{hostname}' AND port = {port}"
118133
))
119134
.expect("Failed to update servers");
120135
}
121136

122137
pub fn load_servers(&mut self) {
138+
if self.database_type == DatabaseType::PostgreSQL {
139+
todo!("PostgreSQL ProxySQL server loading");
140+
}
123141
self.conn
124142
.query_drop("LOAD MYSQL SERVERS TO RUNTIME")
125143
.expect("Failed to load servers");
126144
}
127145

128146
pub fn save_servers(&mut self) {
147+
if self.database_type == DatabaseType::PostgreSQL {
148+
todo!("PostgreSQL ProxySQL server saving");
149+
}
129150
self.conn
130151
.query_drop("SAVE MYSQL SERVERS TO DISK")
131152
.expect("Failed to save servers");
@@ -136,9 +157,12 @@ impl ProxySQL {
136157
/// # Returns
137158
/// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to Readyset.
138159
pub fn find_queries_routed_to_readyset(&mut self) -> Vec<String> {
160+
if self.database_type == DatabaseType::PostgreSQL {
161+
todo!("PostgreSQL ProxySQL query rule detection");
162+
}
139163
let rows: Vec<String> = self
140164
.conn
141-
.query(format!(
165+
.query(&format!(
142166
"SELECT digest FROM mysql_query_rules WHERE comment LIKE '{MIRROR_QUERY_TOKEN}%' OR comment LIKE '{DESTINATION_QUERY_TOKEN}%'"
143167
))
144168
.expect("Failed to find queries routed to Readyset");
@@ -155,7 +179,10 @@ impl ProxySQL {
155179
let datetime_now: DateTime<Local> = Local::now();
156180
let tz = datetime_now.format("%z").to_string();
157181
let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S");
158-
let rows: Vec<(u16, String)> = self.conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{MIRROR_QUERY_TOKEN}: ____-__-__ __:__:__';")).expect("Failed to select mirror rules");
182+
if self.database_type == DatabaseType::PostgreSQL {
183+
todo!("PostgreSQL ProxySQL query rule updating");
184+
}
185+
let rows: Vec<(u16, String)> = self.conn.query(&format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{MIRROR_QUERY_TOKEN}: ____-__-__ __:__:__';")).expect("Failed to select mirror rules");
159186
for (rule_id, comment) in rows {
160187
let datetime_mirror_str = comment
161188
.split(&format!("{MIRROR_QUERY_TOKEN}:"))
@@ -173,7 +200,7 @@ impl ProxySQL {
173200
.num_seconds();
174201
if elapsed > self.warmup_time_s as i64 {
175202
let comment = format!("{comment}\n {DESTINATION_QUERY_TOKEN}: {date_formatted}");
176-
self.conn.query_drop(format!("UPDATE mysql_query_rules SET mirror_hostgroup = NULL, destination_hostgroup = {}, comment = '{}' WHERE rule_id = {}", self.readyset_hostgroup, comment, rule_id)).expect("Failed to update rule");
203+
self.conn.query_drop(&format!("UPDATE mysql_query_rules SET mirror_hostgroup = NULL, destination_hostgroup = {}, comment = '{}' WHERE rule_id = {}", self.readyset_hostgroup, comment, rule_id)).expect("Failed to update rule");
177204
messages::print_note(
178205
format!("Updated rule ID {} from warmup to destination", rule_id).as_str(),
179206
);
@@ -286,7 +313,7 @@ impl ProxySQL {
286313
}
287314

288315
/// Returns a reference to the current connection to ProxySQL.
289-
pub fn get_connection(&mut self) -> &mut Conn {
316+
pub fn get_connection(&mut self) -> &mut SQLConnection {
290317
&mut self.conn
291318
}
292319
}

src/queries.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use crate::{
2-
config::{Config, QueryDiscoveryMode},
2+
config::{Config, DatabaseType, QueryDiscoveryMode},
33
messages,
44
proxysql::ProxySQL,
55
};
6-
use mysql::prelude::Queryable;
76

87
pub struct Query {
98
digest_text: String,
@@ -72,6 +71,7 @@ impl Query {
7271
}
7372

7473
pub struct QueryDiscovery {
74+
database_type: DatabaseType,
7575
query_discovery_mode: QueryDiscoveryMode,
7676
query_discovery_min_execution: u64,
7777
query_discovery_min_rows_sent: u64,
@@ -81,9 +81,13 @@ pub struct QueryDiscovery {
8181
offset: u16,
8282
}
8383

84-
/// Query Discovery is a feature responsible for discovering queries that are hurting the database performance.
85-
/// The queries are discovered by analyzing the stats_mysql_query_digest table and finding queries that are not cached in Readyset and are not in the mysql_query_rules table.
86-
/// The query discover is also responsible for promoting the queries from mirror(warmup) to destination.
84+
/// Query Discovery is a feature responsible for discovering queries that are hurting the database
85+
/// performance.
86+
///
87+
/// For MySQL, the queries are discovered by analyzing the stats_mysql_query_digest table and
88+
/// finding queries that are not cached in Readyset and are not in the mysql_query_rules table.
89+
/// The query discover is also responsible for promoting the queries from mirror(warmup) to
90+
/// destination.
8791
impl QueryDiscovery {
8892
/// This function is used to create a new QueryDiscovery struct.
8993
///
@@ -96,6 +100,7 @@ impl QueryDiscovery {
96100
/// A new QueryDiscovery struct.
97101
pub fn new(config: &Config) -> Self {
98102
QueryDiscovery {
103+
database_type: config.database_type,
99104
query_discovery_mode: config.query_discovery_mode,
100105
query_discovery_min_execution: config.query_discovery_min_execution,
101106
query_discovery_min_rows_sent: config.query_discovery_min_row_sent,
@@ -106,13 +111,19 @@ impl QueryDiscovery {
106111
}
107112
}
108113

109-
/// This function is used to generate the query responsible for finding queries that are not cached in Readyset and are not in the mysql_query_rules table.
114+
/// This function is used to generate the query responsible for finding queries that are not
115+
/// cached in Readyset and are not in the ProxySQL's query rules table.
116+
///
110117
/// Queries have to return 3 fields: digest_text, digest, and schema name.
111118
///
112119
/// # Returns
113120
///
114-
/// A string containing the query responsible for finding queries that are not cached in Readyset and are not in the mysql_query_rules table.
121+
/// A string containing the query responsible for finding queries that are not cached in
122+
/// Readyset and are not in the ProxySQL query stats table.
115123
fn query_builder(&self) -> String {
124+
if self.database_type == DatabaseType::PostgreSQL {
125+
todo!("PostgreSQL query discovery");
126+
}
116127
let order_by = match self.query_discovery_mode {
117128
QueryDiscoveryMode::SumRowsSent => "s.sum_rows_sent".to_string(),
118129
QueryDiscoveryMode::SumTime => "s.sum_time".to_string(),
@@ -223,15 +234,17 @@ impl QueryDiscovery {
223234
}
224235
}
225236

226-
/// This function is used to find queries that are not cached in Readyset and are not in the mysql_query_rules table.
237+
/// This function is used to find queries that are not cached in Readyset and are not in the
238+
/// ProxySQL query rules table.
227239
///
228240
/// # Arguments
229241
///
230242
/// * `proxysql` - A reference to ProxySQL.
231243
///
232244
/// # Returns
233245
///
234-
/// A vector of queries that are not cached in Readyset and are not in the mysql_query_rules table.
246+
/// A vector of queries that are not cached in Readyset and are not in the ProxySQL query rules
247+
/// table.
235248
fn find_queries_to_cache(&self, proxysql: &mut ProxySQL) -> Vec<Query> {
236249
match self.query_discovery_mode {
237250
QueryDiscoveryMode::External => {
@@ -241,7 +254,7 @@ impl QueryDiscovery {
241254
let conn = proxysql.get_connection();
242255
let query = self.query_builder();
243256
let rows: Vec<(String, String, String)> =
244-
conn.query(query).expect("Failed to find queries to cache");
257+
conn.query(&query).expect("Failed to find queries to cache");
245258
rows.iter()
246259
.map(|(digest_text, digest, schema)| {
247260
Query::new(

0 commit comments

Comments
 (0)