Skip to content

Commit 2af26fd

Browse files
[runtime] Remove Hand-Rolled Metrics Server + [deployer] Use 24.04 (#855)
1 parent 4843b7c commit 2af26fd

6 files changed

Lines changed: 149 additions & 164 deletions

File tree

deployer/src/ec2/aws.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), E
5050
Ok(())
5151
}
5252

53-
/// Finds the latest Ubuntu 22.04 ARM64 AMI in the region
53+
/// Finds the latest Ubuntu 24.04 ARM64 AMI in the region
5454
pub async fn find_latest_ami(client: &Ec2Client) -> Result<String, Ec2Error> {
5555
let resp = client
5656
.describe_images()
5757
.filters(
5858
Filter::builder()
5959
.name("name")
60-
.values("ubuntu/images/hvm-ssd/ubuntu-jammy-22.04-arm64-server-*")
60+
.values("ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-arm64-server-*")
6161
.build(),
6262
)
6363
.filters(

examples/flood/Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
FROM arm64v8/ubuntu
1+
# Use same Ubuntu version as `commonware-deployer`
2+
FROM arm64v8/ubuntu:24.04
23

34
# Install necessary tools and libraries
45
RUN apt-get update && apt-get install -y \

runtime/src/lib.rs

Lines changed: 105 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,7 @@ mod tests {
342342
use std::panic::{catch_unwind, AssertUnwindSafe};
343343
use std::str::FromStr;
344344
use std::sync::Mutex;
345-
use telemetry::metrics;
346-
use tracing::error;
345+
use tracing::{error, Level};
347346
use utils::reschedule;
348347

349348
fn test_error_future<R: Runner>(runner: R) {
@@ -980,110 +979,6 @@ mod tests {
980979
})
981980
}
982981

983-
fn test_metrics_serve<R, L, Si, St>(runner: R)
984-
where
985-
R: Runner,
986-
R::Context: Spawner + Metrics + Network<L, Si, St> + Clock,
987-
L: Listener<Si, St>,
988-
Si: Sink,
989-
St: Stream,
990-
{
991-
runner.start(|context| async move {
992-
// Register a test metric
993-
let counter: Counter<u64> = Counter::default();
994-
context.register("test_counter", "Test counter", counter.clone());
995-
counter.inc();
996-
997-
// Define the server address
998-
let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
999-
1000-
// Start the metrics server (serves one connection and exits)
1001-
context
1002-
.with_label("server")
1003-
.spawn(move |context| async move {
1004-
metrics::server::serve(context, address).await;
1005-
});
1006-
1007-
// Helper functions to parse HTTP response
1008-
async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1009-
let mut line = Vec::new();
1010-
loop {
1011-
let mut byte = [0; 1];
1012-
stream.recv(&mut byte).await?;
1013-
if byte[0] == b'\n' {
1014-
if line.last() == Some(&b'\r') {
1015-
line.pop(); // Remove trailing \r
1016-
}
1017-
break;
1018-
}
1019-
line.push(byte[0]);
1020-
}
1021-
String::from_utf8(line).map_err(|_| Error::ReadFailed)
1022-
}
1023-
1024-
async fn read_headers<St: Stream>(
1025-
stream: &mut St,
1026-
) -> Result<HashMap<String, String>, Error> {
1027-
let mut headers = HashMap::new();
1028-
loop {
1029-
let line = read_line(stream).await?;
1030-
if line.is_empty() {
1031-
break;
1032-
}
1033-
let parts: Vec<&str> = line.splitn(2, ": ").collect();
1034-
if parts.len() == 2 {
1035-
headers.insert(parts[0].to_string(), parts[1].to_string());
1036-
}
1037-
}
1038-
Ok(headers)
1039-
}
1040-
1041-
async fn read_body<St: Stream>(
1042-
stream: &mut St,
1043-
content_length: usize,
1044-
) -> Result<String, Error> {
1045-
let mut body = vec![0; content_length];
1046-
stream.recv(&mut body).await?;
1047-
String::from_utf8(body).map_err(|_| Error::ReadFailed)
1048-
}
1049-
1050-
// Simulate a client connecting to the server
1051-
let client_handle = context
1052-
.with_label("client")
1053-
.spawn(move |context| async move {
1054-
let (_, mut stream) = loop {
1055-
match context.dial(address).await {
1056-
Ok((sink, stream)) => break (sink, stream),
1057-
Err(e) => {
1058-
// The client may be polled before the server is ready, that's alright!
1059-
error!(err =?e, "failed to connect");
1060-
context.sleep(Duration::from_millis(10)).await;
1061-
}
1062-
}
1063-
};
1064-
1065-
// Read and verify the HTTP status line
1066-
let status_line = read_line(&mut stream).await.unwrap();
1067-
assert_eq!(status_line, "HTTP/1.1 200 OK");
1068-
1069-
// Read and parse headers
1070-
let headers = read_headers(&mut stream).await.unwrap();
1071-
let content_length = headers
1072-
.get("Content-Length")
1073-
.unwrap()
1074-
.parse::<usize>()
1075-
.unwrap();
1076-
1077-
// Read and verify the body
1078-
let body = read_body(&mut stream, content_length).await.unwrap();
1079-
assert!(body.contains("test_counter_total 1"));
1080-
});
1081-
1082-
// Wait for the client task to complete
1083-
client_handle.await.unwrap();
1084-
});
1085-
}
1086-
1087982
#[test]
1088983
fn test_deterministic_future() {
1089984
let runner = deterministic::Runner::default();
@@ -1233,12 +1128,6 @@ mod tests {
12331128
test_metrics_label(executor);
12341129
}
12351130

1236-
#[test]
1237-
fn test_deterministic_metrics_serve() {
1238-
let executor = deterministic::Runner::default();
1239-
test_metrics_serve(executor);
1240-
}
1241-
12421131
#[test]
12431132
fn test_tokio_error_future() {
12441133
let runner = tokio::Runner::default();
@@ -1388,8 +1277,110 @@ mod tests {
13881277
}
13891278

13901279
#[test]
1391-
fn test_tokio_metrics_serve() {
1280+
fn test_tokio_telemetry() {
13921281
let executor = tokio::Runner::default();
1393-
test_metrics_serve(executor);
1282+
executor.start(|context| async move {
1283+
// Define the server address
1284+
let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1285+
1286+
// Configure telemetry
1287+
tokio::telemetry::init(
1288+
context.with_label("metrics"),
1289+
Level::INFO,
1290+
Some(address),
1291+
None,
1292+
);
1293+
1294+
// Register a test metric
1295+
let counter: Counter<u64> = Counter::default();
1296+
context.register("test_counter", "Test counter", counter.clone());
1297+
counter.inc();
1298+
1299+
// Helper functions to parse HTTP response
1300+
async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1301+
let mut line = Vec::new();
1302+
loop {
1303+
let mut byte = [0; 1];
1304+
stream.recv(&mut byte).await?;
1305+
if byte[0] == b'\n' {
1306+
if line.last() == Some(&b'\r') {
1307+
line.pop(); // Remove trailing \r
1308+
}
1309+
break;
1310+
}
1311+
line.push(byte[0]);
1312+
}
1313+
String::from_utf8(line).map_err(|_| Error::ReadFailed)
1314+
}
1315+
1316+
async fn read_headers<St: Stream>(
1317+
stream: &mut St,
1318+
) -> Result<HashMap<String, String>, Error> {
1319+
let mut headers = HashMap::new();
1320+
loop {
1321+
let line = read_line(stream).await?;
1322+
if line.is_empty() {
1323+
break;
1324+
}
1325+
let parts: Vec<&str> = line.splitn(2, ": ").collect();
1326+
if parts.len() == 2 {
1327+
headers.insert(parts[0].to_string(), parts[1].to_string());
1328+
}
1329+
}
1330+
Ok(headers)
1331+
}
1332+
1333+
async fn read_body<St: Stream>(
1334+
stream: &mut St,
1335+
content_length: usize,
1336+
) -> Result<String, Error> {
1337+
let mut body = vec![0; content_length];
1338+
stream.recv(&mut body).await?;
1339+
String::from_utf8(body).map_err(|_| Error::ReadFailed)
1340+
}
1341+
1342+
// Simulate a client connecting to the server
1343+
let client_handle = context
1344+
.with_label("client")
1345+
.spawn(move |context| async move {
1346+
let (mut sink, mut stream) = loop {
1347+
match context.dial(address).await {
1348+
Ok((sink, stream)) => break (sink, stream),
1349+
Err(e) => {
1350+
// The client may be polled before the server is ready, that's alright!
1351+
error!(err =?e, "failed to connect");
1352+
context.sleep(Duration::from_millis(10)).await;
1353+
}
1354+
}
1355+
};
1356+
1357+
// Send a GET request to the server
1358+
let request = format!(
1359+
"GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
1360+
address
1361+
);
1362+
sink.send(request.as_bytes()).await.unwrap();
1363+
1364+
// Read and verify the HTTP status line
1365+
let status_line = read_line(&mut stream).await.unwrap();
1366+
assert_eq!(status_line, "HTTP/1.1 200 OK");
1367+
1368+
// Read and parse headers
1369+
let headers = read_headers(&mut stream).await.unwrap();
1370+
println!("Headers: {:?}", headers);
1371+
let content_length = headers
1372+
.get("content-length")
1373+
.unwrap()
1374+
.parse::<usize>()
1375+
.unwrap();
1376+
1377+
// Read and verify the body
1378+
let body = read_body(&mut stream, content_length).await.unwrap();
1379+
assert!(body.contains("test_counter_total 1"));
1380+
});
1381+
1382+
// Wait for the client task to complete
1383+
client_handle.await.unwrap();
1384+
});
13941385
}
13951386
}
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
//! Utility functions for metrics
22
33
pub mod histogram;
4-
pub mod server;
54
pub mod status;

runtime/src/telemetry/metrics/server.rs

Lines changed: 0 additions & 44 deletions
This file was deleted.

runtime/src/tokio/telemetry.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,15 @@ use super::{
44
tracing::{export, Config},
55
Context,
66
};
7-
use crate::{telemetry::metrics, Metrics, Spawner};
7+
use crate::{Metrics, Spawner};
8+
use axum::{
9+
body::Body,
10+
http::{header, Response, StatusCode},
11+
routing::get,
12+
serve, Extension, Router,
13+
};
814
use std::net::SocketAddr;
15+
use tokio::net::TcpListener;
916
use tracing::Level;
1017
use tracing_subscriber::{layer::SubscriberExt, Registry};
1118

@@ -26,7 +33,38 @@ pub fn init(context: Context, level: Level, metrics: Option<SocketAddr>, traces:
2633
if let Some(cfg) = metrics {
2734
context
2835
.with_label("metrics")
29-
.spawn(move |context| async move { metrics::server::serve(context, cfg).await });
36+
.spawn(move |context| async move {
37+
// Create a tokio listener for the metrics server.
38+
//
39+
// We explicitly avoid using a runtime `Listener` because
40+
// it will track bandwidth used for metrics and apply a policy
41+
// for read/write timeouts fit for a p2p network.
42+
let listener = TcpListener::bind(cfg)
43+
.await
44+
.expect("Failed to bind metrics server");
45+
46+
// Create a router for the metrics server
47+
let app = Router::new()
48+
.route(
49+
"/metrics",
50+
get(|Extension(ctx): Extension<Context>| async move {
51+
Response::builder()
52+
.status(StatusCode::OK)
53+
.header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
54+
.body(Body::from(ctx.encode()))
55+
.expect("Failed to create response")
56+
}),
57+
)
58+
.layer(Extension(context));
59+
60+
// Serve the metrics over HTTP.
61+
//
62+
// `serve` will spawn its own tasks using `tokio::spawn` (and there is no way to specify
63+
// it to do otherwise). These tasks will not be tracked like metrics spawned using `Spawner`.
64+
serve(listener, app.into_make_service())
65+
.await
66+
.expect("Could not serve metrics");
67+
});
3068
}
3169

3270
// Combine layers into a single subscriber

0 commit comments

Comments
 (0)