Skip to content

Commit 498f243

Browse files
feat(aggregation-mode): add datasources and improve grafana dashboard (#2227)
1 parent 3bb9c48 commit 498f243

File tree

7 files changed

+2734
-503
lines changed

7 files changed

+2734
-503
lines changed

aggregation_mode/docker-compose.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
volumes:
22
postgres_data:
33

4+
networks:
5+
aligned-network:
6+
name: aligned-network
7+
driver: bridge
8+
49
name: aggregation-mode
510
services:
611
postgres:
@@ -14,10 +19,15 @@ services:
1419
- postgres_data:/var/lib/postgresql/data
1520
ports:
1621
- 5435:5432
22+
networks:
23+
- aligned-network
24+
1725
adminer:
1826
image: adminer
1927
restart: always
2028
depends_on:
2129
- postgres
2230
ports:
2331
- 8090:8080
32+
networks:
33+
- aligned-network

aggregation_mode/payments_poller/src/db.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,23 @@ impl Db {
8080
})
8181
.await
8282
}
83+
84+
pub async fn count_total_active_subscriptions(
85+
&self,
86+
epoch: BigDecimal,
87+
) -> Result<i64, sqlx::Error> {
88+
self.orchestrator
89+
.query(async |pool| {
90+
sqlx::query_scalar::<_, i64>(
91+
"
92+
SELECT COUNT(*)
93+
FROM payment_events
94+
WHERE started_at < $1 AND $1 < valid_until",
95+
)
96+
.bind(&epoch)
97+
.fetch_one(&pool)
98+
.await
99+
})
100+
.await
101+
}
83102
}

aggregation_mode/payments_poller/src/metrics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
pub struct PaymentsPollerMetrics {
77
pub registry: Registry,
88
pub last_processed_block: Gauge,
9+
pub active_subscriptions: Gauge,
910
}
1011

1112
impl PaymentsPollerMetrics {
@@ -17,14 +18,21 @@ impl PaymentsPollerMetrics {
1718
"Last processed block by poller"
1819
))?;
1920

21+
let active_subscriptions = Gauge::with_opts(opts!(
22+
"active_subscriptions",
23+
"Active payment subscriptions by poller"
24+
))?;
25+
2026
registry.register(Box::new(last_processed_block.clone()))?;
27+
registry.register(Box::new(active_subscriptions.clone()))?;
2128

2229
// Arc is used because metrics are a shared resource accessed by both the background and metrics HTTP
2330
// server and the application code, across multiple Actix worker threads. The server outlives start(),
2431
// so the data must be static and safely shared between threads.
2532
let metrics = Arc::new(Self {
2633
registry,
2734
last_processed_block,
35+
active_subscriptions,
2836
});
2937

3038
let server_metrics = metrics.clone();
@@ -63,4 +71,8 @@ impl PaymentsPollerMetrics {
6371
pub fn register_last_processed_block(&self, value: u64) {
6472
self.last_processed_block.set(value as f64);
6573
}
74+
75+
pub fn register_active_subscriptions(&self, value: i64) {
76+
self.active_subscriptions.set(value as f64);
77+
}
6678
}

aggregation_mode/payments_poller/src/payments.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{str::FromStr, sync::Arc};
1+
use std::{
2+
str::FromStr,
3+
sync::Arc,
4+
time::{SystemTime, UNIX_EPOCH},
5+
};
26

37
use crate::{
48
config::Config,
@@ -131,7 +135,28 @@ impl PaymentsPoller {
131135
continue;
132136
};
133137

138+
let now_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
139+
Ok(duration) => duration.as_secs(),
140+
Err(_) => {
141+
continue;
142+
}
143+
};
144+
145+
// Note: This implies a call to the database, and may be optimized to reduce the amount of calls
146+
let Ok(active_subscriptions_amount) = self
147+
.db
148+
.count_total_active_subscriptions(
149+
BigDecimal::from_str(&now_epoch.to_string()).unwrap(),
150+
)
151+
.await
152+
else {
153+
tracing::error!("Failed to get the active subscriptions amount");
154+
continue;
155+
};
156+
134157
self.metrics.register_last_processed_block(current_block);
158+
self.metrics
159+
.register_active_subscriptions(active_subscriptions_amount);
135160

136161
tokio::time::sleep(std::time::Duration::from_secs(
137162
seconds_to_wait_between_polls,

0 commit comments

Comments
 (0)