Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 56 additions & 190 deletions keepers/stakenet-keeper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is se
use clap::Parser;
use dotenvy::dotenv;
use log::*;
use rand::Rng;
use rusqlite::Connection;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_metrics::set_host_id;
Expand All @@ -15,12 +14,12 @@ use stakenet_keeper::{
operations::{
self,
block_metadata::db::create_sqlite_tables,
keeper_operations::{set_flag, KeeperCreates, KeeperOperations},
keeper_operations::{set_flag, KeeperOperations},
},
state::{
keeper_config::{Args, KeeperConfig},
keeper_state::{KeeperFlag, KeeperState},
update_state::{create_missing_accounts, post_create_update, pre_create_update},
operation::{OperationQueue, OperationState},
},
};
use std::{process::Command, sync::Arc, time::Duration};
Expand All @@ -30,6 +29,10 @@ use tokio::time::sleep;
fn set_run_flags(args: &Args) -> u32 {
let mut run_flags = 0;

run_flags = set_flag(run_flags, KeeperOperations::PreCreateUpdate);
run_flags = set_flag(run_flags, KeeperOperations::CreateMissingAccounts);
run_flags = set_flag(run_flags, KeeperOperations::PostCreateUpdate);

if args.run_cluster_history {
run_flags = set_flag(run_flags, KeeperOperations::ClusterHistory);
}
Expand Down Expand Up @@ -60,9 +63,6 @@ fn set_run_flags(args: &Args) -> u32 {
if args.run_priority_fee_commission {
run_flags = set_flag(run_flags, KeeperOperations::PriorityFeeCommission);
}
if args.run_directed_staking {
run_flags = set_flag(run_flags, KeeperOperations::DirectedStaking);
}

run_flags
}
Expand All @@ -72,18 +72,6 @@ fn should_clear_startup_flag(tick: u64, intervals: &[u64]) -> bool {
tick % (max_interval + 1) == 0
}

fn should_emit(tick: u64, intervals: &[u64]) -> bool {
intervals.iter().any(|interval| tick % (interval + 1) == 0)
}

fn should_update(tick: u64, intervals: &[u64]) -> bool {
intervals.iter().any(|interval| tick % interval == 0)
}

fn should_fire(tick: u64, interval: u64) -> bool {
tick % interval == 0
}

fn advance_tick(tick: &mut u64) {
*tick += 1;
}
Expand All @@ -93,15 +81,6 @@ async fn sleep_and_tick(tick: &mut u64) {
advance_tick(tick);
}

/// To reduce transaction collisions, we sleep a random amount after any emit
async fn random_cooldown(range: u8) {
let mut rng = rand::thread_rng();
let sleep_duration = rng.gen_range(0..=60 * (range as u64 + 1));

info!("\n\n⏰ Cooldown for {} seconds\n", sleep_duration);
sleep(Duration::from_secs(sleep_duration)).await;
}

async fn run_keeper(keeper_config: KeeperConfig) {
// Intervals
let metrics_interval = keeper_config.metrics_interval;
Expand All @@ -120,6 +99,24 @@ async fn run_keeper(keeper_config: KeeperConfig) {
let mut keeper_state = KeeperState::default();
keeper_state.set_cluster_name(&keeper_config.cluster_name);

let mut operation_queue = OperationQueue::new(
keeper_config.validator_history_interval,
keeper_config.steward_interval,
keeper_config.block_metadata_interval,
keeper_config.metrics_interval,
keeper_config.run_flags,
);

info!(
"Operations: {}",
operation_queue
.tasks
.iter()
.map(|o| o.operation.to_string())
.collect::<Vec<String>>()
.join(",")
);

let smallest_interval = intervals.iter().min().unwrap();
let mut tick: u64 = *smallest_interval; // 1 second ticks - start at metrics interval

Expand All @@ -128,187 +125,56 @@ async fn run_keeper(keeper_config: KeeperConfig) {
}

loop {
// ---------------------- FETCH -----------------------------------
// The fetch ( update ) functions fetch everything we need for the operations from the blockchain
// Additionally, this function will update the keeper state. If update fails - it will skip the fire functions.
if should_update(tick, &intervals) {
info!("Pre-fetching data for update...({})", tick);
match pre_create_update(&keeper_config, &mut keeper_state).await {
Ok(_) => {
keeper_state.increment_update_run_for_epoch(KeeperOperations::PreCreateUpdate);
}
Err(e) => {
error!("Failed to pre create update: {:?}", e);

keeper_state
.increment_update_error_for_epoch(KeeperOperations::PreCreateUpdate);

advance_tick(&mut tick);
continue;
}
}

if keeper_config.pay_for_new_accounts {
info!("Creating missing accounts...({})", tick);
match create_missing_accounts(&keeper_config, &keeper_state).await {
Ok(new_accounts_created) => {
keeper_state.increment_update_run_for_epoch(
KeeperOperations::CreateMissingAccounts,
);

let total_txs: usize =
new_accounts_created.iter().map(|(_, txs)| txs).sum();
keeper_state.increment_update_txs_for_epoch(
KeeperOperations::CreateMissingAccounts,
total_txs as u64,
);

new_accounts_created
.iter()
.for_each(|(operation, created_accounts)| {
keeper_state.increment_creations_for_epoch((
operation.clone(),
*created_accounts as u64,
));
});
}
Err(e) => {
error!("Failed to create missing accounts: {:?}", e);

keeper_state.increment_update_error_for_epoch(
KeeperOperations::CreateMissingAccounts,
);

advance_tick(&mut tick);
continue;
}
}
}

info!("Post-fetching data for update...({})", tick);
match post_create_update(&keeper_config, &mut keeper_state).await {
Ok(_) => {
keeper_state.increment_update_run_for_epoch(KeeperOperations::PostCreateUpdate);
}
Err(e) => {
error!("Failed to post create update: {:?}", e);

keeper_state
.increment_update_error_for_epoch(KeeperOperations::PostCreateUpdate);

advance_tick(&mut tick);
continue;
}
}
}

// ---------------------- FIRE ------------------------------------

// VALIDATOR HISTORY
if should_fire(tick, validator_history_interval) {
info!("Firing operations...");

info!("Updating cluster history...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::cluster_history::fire(&keeper_config, &keeper_state).await,
);
if keeper_state
.check_last_update_epoch(keeper_config.client.clone())
.await
{
info!("Epoch transition Steward cranking");

info!("Updating copy vote accounts...");
keeper_state.set_runs_errors_txs_and_flags_for_epoch(
operations::vote_account::fire(&keeper_config, &keeper_state).await,
operations::steward::fire(&keeper_config, &keeper_state).await,
);

info!("Updating mev commission...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::mev_commission::fire(&keeper_config, &keeper_state).await,
);
info!("Epoch transition Steward crank completed");
}

info!("Updating mev earned...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::mev_earned::fire(&keeper_config, &keeper_state).await,
);
operation_queue.mark_should_fire(tick);

if keeper_config.oracle_authority_keypair.is_some() {
info!("Updating stake accounts...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::stake_upload::fire(&keeper_config, &keeper_state).await,
);
}
while let Some(task) = operation_queue.get_next_pending() {
let operation = task.operation;

if keeper_config.oracle_authority_keypair.is_some()
&& keeper_config.gossip_entrypoints.is_some()
if let Err(e) = operation
.execute(&keeper_config, &mut keeper_state, &mut operation_queue)
.await
{
info!("Updating gossip accounts...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::gossip_upload::fire(&keeper_config, &keeper_state).await,
);
error!("Operation {operation:?} failed, stopping execution: {e:?}",);
break;
}

info!("Updating priority fee commission...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::priority_fee_commission::fire(&keeper_config, &keeper_state).await,
);

if !keeper_state.keeper_flags.check_flag(KeeperFlag::Startup) {
random_cooldown(keeper_config.cool_down_range).await;
}
}
// After sending many tx, check `last_update_epoch` in stake pool
if operation.is_heavy_operation()
&& keeper_state
.check_last_update_epoch(keeper_config.client.clone())
.await
{
info!("Epoch transition Steward cranking");

// STEWARD
if should_fire(tick, steward_interval) {
info!("Cranking Steward...");
keeper_state.set_runs_errors_txs_and_flags_for_epoch(
operations::steward::fire(&keeper_config, &keeper_state).await,
);
keeper_state.set_runs_errors_txs_and_flags_for_epoch(
operations::steward::fire(&keeper_config, &keeper_state).await,
);

if !keeper_state.keeper_flags.check_flag(KeeperFlag::Startup) {
random_cooldown(keeper_config.cool_down_range).await;
info!("Epoch transition Steward crank completed");
}
}

// PRIORITY FEE BLOCK METADATA
if should_fire(tick, block_metadata_interval)
&& keeper_config
.priority_fee_oracle_authority_keypair
.is_some()
{
info!("Updating priority fee block metadata...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::block_metadata::operations::fire(&keeper_config, &keeper_state).await,
);
}

// ---------------------- EMIT ---------------------------------

if should_fire(tick, metrics_interval) {
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::metrics_emit::fire(
&keeper_config,
&keeper_state,
keeper_config.cluster_name.as_str(),
)
.await,
);
for task in operation_queue.tasks.iter() {
if matches!(task.state, OperationState::Failed) {
error!("Operation failed: {}", task.operation);
}
}

if should_emit(tick, &intervals) {
info!("Emitting metrics...");
keeper_state.emit();

KeeperOperations::emit(
&keeper_state.runs_for_epoch,
&keeper_state.errors_for_epoch,
&keeper_state.txs_for_epoch,
keeper_config.cluster_name.as_str(),
);

KeeperCreates::emit(
&keeper_state.created_accounts_for_epoch,
&keeper_state.cluster_name,
);
}
operation_queue.reset_for_next_cycle();

// ---------- CLEAR STARTUP ----------
if should_clear_startup_flag(tick, &intervals) {
keeper_state.keeper_flags.unset_flag(KeeperFlag::Startup);
}
Expand Down
Loading
Loading