Skip to content

Commit 7830867

Browse files
Luke OsborneLuke Osborne
authored andcommitted
adapter: Extract class for showing readyset status
This commit extracts the logic for obtaining the information for the reuslt of a `show readyset status` call to a struct ReadySetStatusReporter, and separates the formatting logic to a new ReadySetStatus struct. This better encapsulates this logic and will allow for easier re-use for a separate http/json formatting request. Change-Id: I27caba35eda3ed8ee4a37a309cd5780957b10ffd Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6399 Tested-by: Buildkite CI Reviewed-by: Jason Brown <[email protected]>
1 parent 7657877 commit 7830867

File tree

6 files changed

+128
-73
lines changed

6 files changed

+128
-73
lines changed

readyset-adapter/src/backend.rs

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ use crate::metrics_handle::{MetricsHandle, MetricsSummary};
112112
use crate::query_handler::SetBehavior;
113113
use crate::query_status_cache::QueryStatusCache;
114114
use crate::rewrite::ProcessedQueryParams;
115+
use crate::status_reporter::ReadySetStatusReporter;
115116
pub use crate::upstream_database::UpstreamPrepare;
116117
use crate::utils::create_dummy_column;
117118
use crate::{create_dummy_schema, rewrite, QueryHandler, UpstreamDatabase, UpstreamDestination};
@@ -2157,27 +2158,13 @@ where
21572158
self.show_caches(query_id).await
21582159
}
21592160
SqlQuery::Show(ShowStatement::ReadySetStatus) => {
2160-
// Add upstream connectivity status
2161-
let mut additional_meta = if let Some(upstream) = &mut self.upstream {
2162-
let connection_status = upstream
2163-
.is_connected()
2164-
.await
2165-
.is_ok()
2166-
.then(|| "Connected".to_string())
2167-
.unwrap_or_else(|| "Unreachable".to_string());
2168-
vec![("Database Connection".to_string(), connection_status)]
2169-
} else {
2170-
vec![]
2171-
};
2172-
let conn_count = match &self.connections {
2173-
Some(s) => s.len(),
2174-
None => 0,
2161+
let reporter = ReadySetStatusReporter {
2162+
upstream: &mut self.upstream,
2163+
connector: &mut self.noria,
2164+
connections: &self.connections,
2165+
authority: &self.authority,
21752166
};
2176-
additional_meta.push(("Connection Count".to_string(), conn_count.to_string()));
2177-
2178-
self.noria
2179-
.readyset_status(&self.authority, additional_meta)
2180-
.await
2167+
Ok(reporter.report_status().await.into_query_result())
21812168
}
21822169
SqlQuery::Show(ShowStatement::ReadySetStatusAdapter) => self.readyset_adapter_status(),
21832170
SqlQuery::Show(ShowStatement::ReadySetMigrationStatus(id)) => {

readyset-adapter/src/backend/noria_connector.rs

Lines changed: 4 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ use nom_sql::{
99
self, ColumnConstraint, DeleteStatement, DialectDisplay, Expr, InsertStatement, Relation,
1010
SqlIdentifier, SqlQuery, UnaryOperator, UpdateStatement,
1111
};
12-
use readyset_client::consensus::{Authority, AuthorityControl};
1312
use readyset_client::consistency::Timestamp;
1413
use readyset_client::internal::LocalNodeIndex;
1514
use readyset_client::recipe::changelist::{Change, ChangeList, IntoChanges};
1615
use readyset_client::results::{ResultIterator, Results};
16+
use readyset_client::status::ReadySetControllerStatus;
1717
use readyset_client::{
1818
ColumnSchema, GraphvizOptions, ReadQuery, ReaderAddress, ReaderHandle, ReadySetHandle,
1919
SchemaType, Table, TableOperation, View, ViewCreateRequest, ViewQuery,
2020
};
21-
use readyset_data::{DfType, DfValue, Dialect, TimestampTz};
21+
use readyset_data::{DfType, DfValue, Dialect};
2222
use readyset_errors::{
2323
internal_err, invariant_eq, table_err, unsupported, unsupported_err, ReadySetError,
2424
ReadySetResult,
@@ -854,55 +854,8 @@ impl NoriaConnector {
854854

855855
/// Returns status provided by the Controller and persisted in the Authority. Also appends
856856
/// additional_meta provided by the caller to the status.
857-
pub(crate) async fn readyset_status(
858-
&mut self,
859-
authority: &Authority,
860-
mut additional_meta: Vec<(String, String)>,
861-
) -> ReadySetResult<QueryResult<'static>> {
862-
let mut status =
863-
match noria_await!(self.inner.get_mut()?, self.inner.get_mut()?.noria.status()) {
864-
Ok(s) => <Vec<(String, String)>>::from(s),
865-
Err(_) => vec![(
866-
"ReadySet Controller Status".to_string(),
867-
"Unavailable".to_string(),
868-
)],
869-
};
870-
871-
// Helper function for formatting
872-
fn time_or_null(time_ms: Option<u64>) -> String {
873-
if let Some(t) = time_ms {
874-
TimestampTz::from_unix_ms(t).to_string()
875-
} else {
876-
"NULL".to_string()
877-
}
878-
}
879-
880-
if let Ok(Some(stats)) = authority.persistent_stats().await {
881-
status.push((
882-
"Last started Controller".to_string(),
883-
time_or_null(stats.last_controller_startup),
884-
));
885-
status.push((
886-
"Last completed snapshot".to_string(),
887-
time_or_null(stats.last_completed_snapshot),
888-
));
889-
status.push((
890-
"Last started replication".to_string(),
891-
time_or_null(stats.last_started_replication),
892-
));
893-
if let Some(err) = stats.last_replicator_error {
894-
status.push(("Last replicator error".to_string(), err))
895-
}
896-
}
897-
898-
additional_meta.append(&mut status);
899-
900-
Ok(QueryResult::MetaVariables(
901-
additional_meta
902-
.into_iter()
903-
.map(MetaVariable::from)
904-
.collect(),
905-
))
857+
pub(crate) async fn readyset_status(&mut self) -> ReadySetResult<ReadySetControllerStatus> {
858+
noria_await!(self.inner.get_mut()?, self.inner.get_mut()?.noria.status())
906859
}
907860

908861
/// Query the status of a pending migration identified by the given `migration_id`. Once the

readyset-adapter/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod proxied_queries_reporter;
1414
mod query_handler;
1515
pub mod query_status_cache;
1616
pub mod rewrite;
17+
mod status_reporter;
1718
pub mod upstream_database;
1819
mod utils;
1920
pub mod views_synchronizer;
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
use std::net::SocketAddr;
2+
use std::sync::Arc;
3+
4+
use crossbeam_skiplist::SkipSet;
5+
use readyset_client::consensus::{Authority, AuthorityControl};
6+
use readyset_client::debug::stats::PersistentStats;
7+
use readyset_client::status::ReadySetControllerStatus;
8+
use readyset_data::TimestampTz;
9+
10+
use crate::backend::noria_connector::{MetaVariable, QueryResult};
11+
use crate::backend::NoriaConnector;
12+
use crate::UpstreamDatabase;
13+
14+
#[derive(Debug)]
15+
pub(crate) struct ReadySetStatus {
16+
pub(crate) controller_status: Option<ReadySetControllerStatus>,
17+
pub(crate) connected: bool,
18+
pub(crate) connection_count: usize,
19+
pub(crate) persistent_stats: Option<PersistentStats>,
20+
}
21+
22+
impl ReadySetStatus {
23+
pub(crate) fn into_query_result(self) -> QueryResult<'static> {
24+
let mut status = Vec::new();
25+
status.push((
26+
"Database Connection".to_string(),
27+
if self.connected {
28+
"Connected".to_string()
29+
} else {
30+
"Unreachable".to_string()
31+
},
32+
));
33+
34+
status.push((
35+
"Connection Count".to_string(),
36+
self.connection_count.to_string(),
37+
));
38+
39+
if let Some(controller_status) = self.controller_status {
40+
status.append(&mut Vec::<(String, String)>::from(controller_status));
41+
} else {
42+
status.push((
43+
"ReadySet Controller Status".to_string(),
44+
"Unavailable".to_string(),
45+
));
46+
}
47+
48+
if let Some(stats) = self.persistent_stats {
49+
status.push((
50+
"Last started Controller".to_string(),
51+
time_or_null(stats.last_controller_startup),
52+
));
53+
status.push((
54+
"Last completed snapshot".to_string(),
55+
time_or_null(stats.last_completed_snapshot),
56+
));
57+
status.push((
58+
"Last started replication".to_string(),
59+
time_or_null(stats.last_started_replication),
60+
));
61+
if let Some(err) = stats.last_replicator_error {
62+
status.push(("Last replicator error".to_string(), err))
63+
}
64+
}
65+
66+
QueryResult::MetaVariables(status.into_iter().map(MetaVariable::from).collect())
67+
}
68+
}
69+
70+
pub(crate) struct ReadySetStatusReporter<'a, DB>
71+
where
72+
DB: UpstreamDatabase,
73+
{
74+
pub(crate) upstream: &'a mut Option<DB>,
75+
pub(crate) connector: &'a mut NoriaConnector,
76+
pub(crate) connections: &'a Option<Arc<SkipSet<SocketAddr>>>,
77+
pub(crate) authority: &'a Arc<Authority>,
78+
}
79+
80+
impl<DB: UpstreamDatabase> ReadySetStatusReporter<'_, DB> {
81+
pub(crate) async fn report_status(self) -> ReadySetStatus {
82+
let controller_status = self.connector.readyset_status().await.ok();
83+
let connected = if let Some(upstream) = self.upstream.as_mut() {
84+
upstream.is_connected().await.unwrap_or_default()
85+
} else {
86+
false
87+
};
88+
let connection_count = self
89+
.connections
90+
.as_ref()
91+
.map(|c| c.len())
92+
.unwrap_or_default();
93+
let persistent_stats = self.authority.persistent_stats().await.unwrap_or_default();
94+
95+
ReadySetStatus {
96+
controller_status,
97+
connected,
98+
connection_count,
99+
persistent_stats,
100+
}
101+
}
102+
}
103+
104+
// Helper function for formatting
105+
fn time_or_null(time_ms: Option<u64>) -> String {
106+
if let Some(t) = time_ms {
107+
TimestampTz::from_unix_ms(t).to_string()
108+
} else {
109+
"NULL".to_string()
110+
}
111+
}

readyset-client/src/debug/stats.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub struct NodeStats {
4545
}
4646

4747
/// Status that we persist in the Authority to make it available across restarts.
48-
#[derive(Clone, Default, Serialize, Deserialize)]
48+
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
4949
pub struct PersistentStats {
5050
/// Time in millis when the controller last started up.
5151
pub last_controller_startup: Option<u64>,

readyset-mysql/tests/integration.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1834,7 +1834,10 @@ async fn show_readyset_status() {
18341834

18351835
// NOTE: If this readyset extension has changed, verify the new behavior is correct then update
18361836
// the expected values below
1837-
assert_eq!(ret.len(), 5);
1837+
assert_eq!(ret.len(), 6);
1838+
let row = ret.remove(0);
1839+
assert_eq!(row.get::<String, _>(0).unwrap(), "Database Connection");
1840+
assert_eq!(row.get::<String, _>(1).unwrap(), "Unreachable");
18381841
let row = ret.remove(0);
18391842
assert_eq!(row.get::<String, _>(0).unwrap(), "Connection Count");
18401843
assert_eq!(row.get::<String, _>(1).unwrap(), "0");

0 commit comments

Comments
 (0)