Skip to content

Commit 539d81a

Browse files
committed
Fix heartbeat metrics read from CH
1 parent 4590f5e commit 539d81a

4 files changed

Lines changed: 47 additions & 39 deletions

File tree

src/bin/api/core/clickhouse/mod.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,20 @@
11
use clickhouse::Client as ChClient;
2-
use std::sync::Arc;
3-
use tokio::sync::Mutex;
42

53
pub mod query;
64

75
#[derive(Clone)]
86
pub struct ChContext {
9-
inner: Arc<Mutex<ChClient>>,
7+
url: String,
108
}
119

1210
impl ChContext {
1311
pub fn new(url: &str) -> Self {
14-
let client = ChClient::default().with_url(url);
15-
1612
Self {
17-
inner: Arc::new(Mutex::new(client)),
13+
url: url.to_string(),
1814
}
1915
}
2016

21-
pub fn client(&self) -> Arc<Mutex<ChClient>> {
22-
self.inner.clone()
17+
pub fn client(&self) -> ChClient {
18+
ChClient::default().with_url(&self.url)
2319
}
2420
}

src/bin/api/core/clickhouse/query.rs

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,30 @@ impl Queries for ChContext {
4040
T: DeserializeOwned + Debug + Send + Clone + Sync + 'static,
4141
{
4242
let query = format!(
43-
"SELECT
44-
toInt64(toUnixTimestamp(toDateTime(argMax(Timestamp, Timestamp)))) AS timestamp,
45-
Path AS metric,
46-
toFloat64(argMax(Value, Timestamp)) AS value
47-
FROM default.graphite_data
48-
WHERE Path LIKE '{}.{}.{}.heartbeat'
49-
GROUP BY Path",
43+
r#"SELECT Path AS metric,
44+
toFloat64(argMax(Value, Timestamp)) AS value,
45+
toInt64(toUnixTimestamp(toDateTime(argMax(Timestamp, Timestamp)))) AS timestamp
46+
FROM default.graphite_data
47+
WHERE Path = '{}.{}.{}.heartbeat'
48+
GROUP BY Path"#,
5049
env, hostname, uuid
5150
);
5251

53-
log::debug!("CH Query {}", query);
52+
println!("CH Query\n{}", query);
5453

5554
let client = self.client();
56-
let client = client.lock().await;
5755

5856
let result = client.query(&query).fetch_all::<Metric<T>>().await;
57+
58+
match result {
59+
Ok(ref rows) => {
60+
log::debug!("Fetched {} rows from CH", rows.len());
61+
}
62+
Err(ref e) => {
63+
log::error!("Failed to fetch heartbeat from CH: {:?}", e);
64+
}
65+
}
66+
5967
result.ok().and_then(|mut rows| rows.pop())
6068
}
6169

@@ -69,30 +77,28 @@ impl Queries for ChContext {
6977
{
7078
let start_str = start.format("%Y-%m-%d %H:%M:%S").to_string();
7179
let query = format!(
72-
"SELECT
73-
anyLast(timestamp) as timestamp,
74-
metric,
75-
toInt64(sum(metric_value)) AS value
76-
FROM (
77-
SELECT
78-
toInt64(toUnixTimestamp(toDateTime(anyLast(Timestamp)))) AS timestamp,
79-
extract(Path, '[^.]+$') AS metric,
80-
anyLast(Value) AS metric_value
81-
FROM default.graphite_data
82-
WHERE Path LIKE '%.%.{conn_id}.conn_stat.%'
83-
AND Timestamp >= toDateTime('{start}')
84-
AND Timestamp < toDateTime('{start}') + INTERVAL 1 DAY
85-
GROUP BY Path
86-
)
87-
GROUP BY metric",
80+
r#"SELECT metric,
81+
toInt64(sum(metric_value)) AS value,
82+
anyLast(timestamp) AS timestamp
83+
FROM (
84+
SELECT
85+
toInt64(toUnixTimestamp(toDateTime(anyLast(Timestamp)))) AS timestamp,
86+
extract(Path, '[^.]+$') AS metric,
87+
anyLast(Value) AS metric_value
88+
FROM default.graphite_data
89+
WHERE Path LIKE '%.%.{conn_id}.conn_stat.%'
90+
AND Timestamp >= toDateTime('{start}')
91+
AND Timestamp < toDateTime('{start}') + INTERVAL 1 DAY
92+
GROUP BY Path
93+
)
94+
GROUP BY metric"#,
8895
conn_id = conn_id,
8996
start = start_str,
9097
);
9198

92-
log::debug!("CH Query {}", query);
99+
println!("CH Query\n{}", query);
93100

94101
let client = self.client();
95-
let client = client.lock().await;
96102

97103
let result = client.query(&query).fetch_all::<Metric<T>>().await;
98104

src/bin/api/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ async fn main() -> Result<()> {
3838
.expect("required config path as an argument");
3939
println!("Config file {:?}", config_path);
4040

41-
let mut settings = ApiSettings::new(&config_path);
41+
let settings = ApiSettings::new(&config_path);
4242

4343
settings.validate().expect("Wrong settings file");
4444
println!(">>> Settings: {:?}", settings.clone());
@@ -61,7 +61,13 @@ async fn main() -> Result<()> {
6161

6262
let debug = settings.debug.enabled;
6363

64-
let db = PgContext::init(&settings.pg).await.expect("DB error");
64+
let db = match PgContext::init(&settings.pg).await {
65+
Ok(db) => db,
66+
Err(err) => {
67+
log::error!("Failed to init DB: {}", err);
68+
return Err(err.into());
69+
}
70+
};
6571

6672
let ch = ChContext::new(&settings.clickhouse.address);
6773
let publisher = ZmqPublisher::new(&settings.zmq.endpoint).await;

src/metrics/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ impl<T: fmt::Display> fmt::Display for Metric<T> {
5050

5151
impl<T: std::fmt::Display + std::fmt::Debug> Metric<T> {
5252
pub async fn send(&self, server: &str) -> Result<(), io::Error> {
53-
let metric_string = format!("{}\n", self.to_string());
53+
let metric_string = format!("{self}\n");
5454

55-
log::debug!("Sending carbon metric string: {}", metric_string);
55+
log::debug!("Sending carbon metric string: {:?}", metric_string);
5656

5757
match TcpStream::connect(server).await {
5858
Ok(mut stream) => {

0 commit comments

Comments
 (0)