Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion gcp_uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ description = "A tool to continuously monitor and upload epoch-related files to

[dependencies]
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
clap = { workspace = true, features = ["derive", "env"] }
cloud-storage = "0.11"
env_logger = { workspace = true }
futures-util = "0.3.31"
hostname = "0.3"
log = { workspace = true }
regex = "1.10"
serde_json = { workspace = true }
solana-client = { workspace = true }
solana-metrics = { workspace = true }
tokio = { workspace = true, features = ["full"] }
2 changes: 2 additions & 0 deletions gcp_uploader/monitor-merkle-uploads.service
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ ExecStart=/home/core/jito-tip-router/target/release/gcp_uploader \
Restart=always
RestartSec=10
Environment="RUST_LOG=info"
Environment="SOLANA_METRICS_CONFIG="
Environment="RPC_URL="

[Install]
WantedBy=multi-user.target
71 changes: 70 additions & 1 deletion gcp_uploader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use env_logger::Env;
use hostname::get as get_hostname_raw;
use log::{error, info};
use regex::Regex;
use solana_client::rpc_client::RpcClient;
use solana_metrics::{datapoint_info, set_host_id};
use std::collections::HashSet;
use std::path::Path;
use std::process::Command;
Expand Down Expand Up @@ -34,6 +36,14 @@ struct Args {
/// Directory to scan for snapshot files
#[arg(short, long)]
snapshot_directory: String,

/// Solana JSON RPC URL to fetch current epoch for metrics
#[arg(
long,
env = "RPC_URL",
default_value = "https://api.mainnet-beta.solana.com"
)]
rpc_url: String,
}

#[tokio::main]
Expand Down Expand Up @@ -64,6 +74,8 @@ async fn main() -> Result<()> {
// Get hostname
let hostname = get_hostname()?;

set_host_id(hostname.clone());

// Determine bucket name
let bucket_name = args
.bucket
Expand Down Expand Up @@ -125,6 +137,18 @@ async fn main() -> Result<()> {
}
}

// Emit metric about whether snapshot for current epoch is present in GCS
if let Err(e) = emit_current_epoch_snapshot_metric(
&args.rpc_url,
&bucket_name,
&hostname,
&args.cluster,
)
.await
{
error!("Error emitting snapshot metric: {}", e);
}

// Wait for the next polling interval
sleep(Duration::from_secs(args.interval)).await;
}
Expand Down Expand Up @@ -188,7 +212,7 @@ async fn scan_and_upload_files(
}

/// Scans directory for snapshots & uploads after deriving the associated epoch
#[allow(clippy::arithmetic_side_effects)]
#[allow(clippy::arithmetic_side_effects, clippy::integer_division)]
async fn scan_and_upload_snapshot_files(
dir_path: &Path,
bucket_name: &str,
Expand Down Expand Up @@ -311,6 +335,51 @@ async fn upload_file(
Ok(())
}

async fn emit_current_epoch_snapshot_metric(
rpc_url: &str,
bucket_name: &str,
hostname: &str,
cluster: &str,
) -> Result<()> {
// Fetch current epoch via Solana RPC
let client = RpcClient::new(rpc_url.to_string());
let epoch_info = client
.get_epoch_info()
.with_context(|| format!("Failed to fetch epoch info from {}", rpc_url))?;
let epoch = epoch_info.epoch;

// Build GCS prefix path used by upload_file: {epoch}/{hostname}/snapshot-*.tar.zst
// First, list objects under epoch/hostname and look for any snapshot-*.tar.zst
let list_output = Command::new("/opt/gcloud/google-cloud-sdk/bin/gcloud")
.args([
"storage",
"ls",
&format!("gs://{}/{}/{}/", bucket_name, epoch, hostname),
])
.output()
.with_context(|| "Failed to execute gcloud ls for snapshot metric")?;

let uploaded = if list_output.status.success() {
let stdout = String::from_utf8_lossy(&list_output.stdout);
stdout
.lines()
.any(|line| line.contains("snapshot-") && line.ends_with(".tar.zst"))
} else {
false
};

datapoint_info!(
"tip_router_gcp_uploader.snapshot_present",
("epoch", epoch as i64, i64),
("present", uploaded, bool),
"cluster" => cluster,
"hostname" => hostname,
"bucket" => bucket_name,
);

Ok(())
}

fn get_hostname() -> Result<String> {
let hostname = get_hostname_raw()
.context("Failed to get hostname")?
Expand Down
Loading