Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 93 additions & 189 deletions keepers/stakenet-keeper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ It will emit metrics for each data feed, if env var SOLANA_METRICS_CONFIG is se
use clap::Parser;
use dotenvy::dotenv;
use log::*;
use rand::Rng;
use rusqlite::Connection;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_metrics::set_host_id;
Expand All @@ -15,12 +14,12 @@ use stakenet_keeper::{
operations::{
self,
block_metadata::db::create_sqlite_tables,
keeper_operations::{set_flag, KeeperCreates, KeeperOperations},
keeper_operations::{set_flag, KeeperOperations},
},
state::{
keeper_config::{Args, KeeperConfig},
keeper_state::{KeeperFlag, KeeperState},
update_state::{create_missing_accounts, post_create_update, pre_create_update},
operation::{OperationQueue, OperationState},
},
};
use std::{process::Command, sync::Arc, time::Duration};
Expand All @@ -30,6 +29,10 @@ use tokio::time::sleep;
fn set_run_flags(args: &Args) -> u32 {
let mut run_flags = 0;

run_flags = set_flag(run_flags, KeeperOperations::PreCreateUpdate);
run_flags = set_flag(run_flags, KeeperOperations::CreateMissingAccounts);
run_flags = set_flag(run_flags, KeeperOperations::PostCreateUpdate);

if args.run_cluster_history {
run_flags = set_flag(run_flags, KeeperOperations::ClusterHistory);
}
Expand Down Expand Up @@ -60,9 +63,6 @@ fn set_run_flags(args: &Args) -> u32 {
if args.run_priority_fee_commission {
run_flags = set_flag(run_flags, KeeperOperations::PriorityFeeCommission);
}
if args.run_directed_staking {
run_flags = set_flag(run_flags, KeeperOperations::DirectedStaking);
}

run_flags
}
Expand All @@ -72,18 +72,6 @@ fn should_clear_startup_flag(tick: u64, intervals: &[u64]) -> bool {
tick % (max_interval + 1) == 0
}

/// Returns true when `tick` lands one slot past a multiple of any interval,
/// i.e. on the dedicated "emit" tick that follows each interval boundary.
fn should_emit(tick: u64, intervals: &[u64]) -> bool {
    for interval in intervals {
        if tick % (interval + 1) == 0 {
            return true;
        }
    }
    false
}

/// Returns true when `tick` falls exactly on a multiple of at least one
/// of the configured intervals (the "fetch/update" schedule).
fn should_update(tick: u64, intervals: &[u64]) -> bool {
    intervals
        .iter()
        .position(|interval| tick % interval == 0)
        .is_some()
}

/// Returns true when `tick` is an exact multiple of `interval`,
/// meaning the associated operation is due to fire on this tick.
fn should_fire(tick: u64, interval: u64) -> bool {
    let remainder = tick % interval;
    remainder == 0
}

/// Bumps the keeper's tick counter by one second's worth of progress.
fn advance_tick(tick: &mut u64) {
    *tick = *tick + 1;
}
Expand All @@ -93,15 +81,6 @@ async fn sleep_and_tick(tick: &mut u64) {
advance_tick(tick);
}

/// To reduce transaction collisions, we sleep a random amount after any emit
async fn random_cooldown(range: u8) {
    // Upper bound scales linearly with `range`: 0..=60s for range 0,
    // 0..=120s for range 1, and so on.
    let upper_bound = 60 * (u64::from(range) + 1);
    let sleep_duration = rand::thread_rng().gen_range(0..=upper_bound);

    info!("\n\n⏰ Cooldown for {} seconds\n", sleep_duration);
    sleep(Duration::from_secs(sleep_duration)).await;
}

async fn run_keeper(keeper_config: KeeperConfig) {
// Intervals
let metrics_interval = keeper_config.metrics_interval;
Expand All @@ -120,6 +99,30 @@ async fn run_keeper(keeper_config: KeeperConfig) {
let mut keeper_state = KeeperState::default();
keeper_state.set_cluster_name(&keeper_config.cluster_name);

let mut operation_queue = OperationQueue::new(
keeper_config.validator_history_interval,
keeper_config.steward_interval,
keeper_config.block_metadata_interval,
keeper_config.metrics_interval,
keeper_config.run_flags,
);
let mut last_seen_epoch = keeper_config
.client
.get_epoch_info()
.await
.map(|epoch_info| Some(epoch_info.epoch))
.unwrap_or(None);

info!(
"Operations: {}",
operation_queue
.tasks
.iter()
.map(|o| o.operation.to_string())
.collect::<Vec<String>>()
.join(",")
);

let smallest_interval = intervals.iter().min().unwrap();
let mut tick: u64 = *smallest_interval; // 1 second ticks - start at metrics interval

Expand All @@ -128,187 +131,62 @@ async fn run_keeper(keeper_config: KeeperConfig) {
}

loop {
// ---------------------- FETCH -----------------------------------
// The fetch ( update ) functions fetch everything we need for the operations from the blockchain
// Additionally, this function will update the keeper state. If update fails - it will skip the fire functions.
if should_update(tick, &intervals) {
info!("Pre-fetching data for update...({})", tick);
match pre_create_update(&keeper_config, &mut keeper_state).await {
Ok(_) => {
keeper_state.increment_update_run_for_epoch(KeeperOperations::PreCreateUpdate);
}
Err(e) => {
error!("Failed to pre create update: {:?}", e);

keeper_state
.increment_update_error_for_epoch(KeeperOperations::PreCreateUpdate);

advance_tick(&mut tick);
continue;
}
}

if keeper_config.pay_for_new_accounts {
info!("Creating missing accounts...({})", tick);
match create_missing_accounts(&keeper_config, &keeper_state).await {
Ok(new_accounts_created) => {
keeper_state.increment_update_run_for_epoch(
KeeperOperations::CreateMissingAccounts,
);

let total_txs: usize =
new_accounts_created.iter().map(|(_, txs)| txs).sum();
keeper_state.increment_update_txs_for_epoch(
KeeperOperations::CreateMissingAccounts,
total_txs as u64,
if let Some(seen_epoch) = last_seen_epoch {
match keeper_config.client.get_epoch_info().await {
Ok(epoch_info) => {
let current_epoch = epoch_info.epoch;
keeper_state.epoch_info = epoch_info;

if current_epoch > seen_epoch {
info!(
"EPOCH TRANSITION! {seen_epoch} -> {current_epoch} - IMMEDIATE STEWARD!",
);
last_seen_epoch = Some(current_epoch);

// Fire Steward immediately
keeper_state.set_runs_errors_txs_and_flags_for_epoch(
operations::steward::fire(&keeper_config, &keeper_state).await,
);

new_accounts_created
.iter()
.for_each(|(operation, created_accounts)| {
keeper_state.increment_creations_for_epoch((
operation.clone(),
*created_accounts as u64,
));
});
info!("Epoch start Steward crank completed");
}
Err(e) => {
error!("Failed to create missing accounts: {:?}", e);

keeper_state.increment_update_error_for_epoch(
KeeperOperations::CreateMissingAccounts,
);

advance_tick(&mut tick);
continue;
}
}
}

info!("Post-fetching data for update...({})", tick);
match post_create_update(&keeper_config, &mut keeper_state).await {
Ok(_) => {
keeper_state.increment_update_run_for_epoch(KeeperOperations::PostCreateUpdate);
}
Err(e) => {
error!("Failed to post create update: {:?}", e);

keeper_state
.increment_update_error_for_epoch(KeeperOperations::PostCreateUpdate);

advance_tick(&mut tick);
continue;
}
Err(e) => error!("Failed to check epoch: {e:?}"),
}
}

// ---------------------- FIRE ------------------------------------
operation_queue.mark_should_fire(tick);

// VALIDATOR HISTORY
if should_fire(tick, validator_history_interval) {
info!("Firing operations...");
while let Some(task) = operation_queue.get_next_pending() {
let operation = task.operation;

info!("Updating cluster history...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::cluster_history::fire(&keeper_config, &keeper_state).await,
);

info!("Updating copy vote accounts...");
keeper_state.set_runs_errors_txs_and_flags_for_epoch(
operations::vote_account::fire(&keeper_config, &keeper_state).await,
);

info!("Updating mev commission...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::mev_commission::fire(&keeper_config, &keeper_state).await,
);

info!("Updating mev earned...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::mev_earned::fire(&keeper_config, &keeper_state).await,
);

if keeper_config.oracle_authority_keypair.is_some() {
info!("Updating stake accounts...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::stake_upload::fire(&keeper_config, &keeper_state).await,
);
}

if keeper_config.oracle_authority_keypair.is_some()
&& keeper_config.gossip_entrypoints.is_some()
if let Err(e) = operation
.execute(&keeper_config, &mut keeper_state, &mut operation_queue)
.await
{
info!("Updating gossip accounts...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::gossip_upload::fire(&keeper_config, &keeper_state).await,
);
error!("Operation {operation:?} failed, stopping execution: {e:?}",);
break;
}

info!("Updating priority fee commission...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::priority_fee_commission::fire(&keeper_config, &keeper_state).await,
);

if !keeper_state.keeper_flags.check_flag(KeeperFlag::Startup) {
random_cooldown(keeper_config.cool_down_range).await;
// After sending many tx, check epoch info
if operation.is_heavy_operation() {
check_and_fire_steward_on_epoch_transition(
&keeper_config,
&mut keeper_state,
&mut last_seen_epoch,
)
.await;
}
}

// STEWARD
if should_fire(tick, steward_interval) {
info!("Cranking Steward...");
keeper_state.set_runs_errors_txs_and_flags_for_epoch(
operations::steward::fire(&keeper_config, &keeper_state).await,
);

if !keeper_state.keeper_flags.check_flag(KeeperFlag::Startup) {
random_cooldown(keeper_config.cool_down_range).await;
for task in operation_queue.tasks.iter() {
if matches!(task.state, OperationState::Failed) {
error!("Operation failed: {}", task.operation);
}
}

// PRIORITY FEE BLOCK METADATA
if should_fire(tick, block_metadata_interval)
&& keeper_config
.priority_fee_oracle_authority_keypair
.is_some()
{
info!("Updating priority fee block metadata...");
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::block_metadata::operations::fire(&keeper_config, &keeper_state).await,
);
}

// ---------------------- EMIT ---------------------------------

if should_fire(tick, metrics_interval) {
keeper_state.set_runs_errors_and_txs_for_epoch(
operations::metrics_emit::fire(
&keeper_config,
&keeper_state,
keeper_config.cluster_name.as_str(),
)
.await,
);
}

if should_emit(tick, &intervals) {
info!("Emitting metrics...");
keeper_state.emit();

KeeperOperations::emit(
&keeper_state.runs_for_epoch,
&keeper_state.errors_for_epoch,
&keeper_state.txs_for_epoch,
keeper_config.cluster_name.as_str(),
);

KeeperCreates::emit(
&keeper_state.created_accounts_for_epoch,
&keeper_state.cluster_name,
);
}
operation_queue.reset_for_next_cycle();

// ---------- CLEAR STARTUP ----------
if should_clear_startup_flag(tick, &intervals) {
keeper_state.keeper_flags.unset_flag(KeeperFlag::Startup);
}
Expand All @@ -318,6 +196,32 @@ async fn run_keeper(keeper_config: KeeperConfig) {
}
}

async fn check_and_fire_steward_on_epoch_transition(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conditional here should actually be if the Stake Pool has updated, last_updated_epoch I think this is the field. Since increase/decrease commands are locked until that has completed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, changed the if condition

    /// Check `last_update_epoch` field in Stake Pool
    ///
    /// # Process
    ///
    /// 1. Check the field `last_update_epoch` in StakePool account
    /// 2. If the `last_update_epoch` has updated, trigger steward operation
    /// 3. Otherwise, continue next operation
    ///
    /// Returns `true` only when the stake pool's `last_update_epoch` has
    /// advanced past the epoch we last recorded in `last_seen_epoch`.
    pub async fn check_last_update_epoch(
        &mut self,
        client: Arc<RpcClient>,
        last_seen_epoch: &mut Option<u64>,
    ) -> bool {
        // Without loaded steward accounts we cannot locate the stake pool,
        // so fall through and report "no update".
        if let Some(all_steward_accounts) = self.all_steward_accounts.as_ref() {
            if let Ok(stake_pool) =
                get_stake_pool_account(&client, &all_steward_accounts.stake_pool_address).await
            {
                if let Some(ref mut seen_epoch) = last_seen_epoch {
                    // Compare before overwriting: an unchanged pool epoch
                    // yields `false` while still refreshing our bookmark.
                    let has_updated = stake_pool.last_update_epoch > *seen_epoch;
                    *seen_epoch = stake_pool.last_update_epoch;
                    return has_updated;
                } else {
                    // First observation: record the epoch as the baseline but
                    // do not treat it as a transition.
                    *last_seen_epoch = Some(stake_pool.last_update_epoch);
                    return false;
                }
            }
        }

        // NOTE(review): RPC failures are silently swallowed and also return
        // `false` — presumably intentional best-effort behavior; confirm.
        false
    }

keeper_config: &KeeperConfig,
keeper_state: &mut KeeperState,
last_seen_epoch: &mut Option<u64>,
) {
if let Ok(epoch_info) = keeper_config.client.get_epoch_info().await {
if let Some(ref seen_epoch) = last_seen_epoch {
if epoch_info.epoch > *seen_epoch {
info!(
"EPOCH TRANSITION DETECTED DURING OPERATION! {seen_epoch} -> {}",
epoch_info.epoch
);

*last_seen_epoch = Some(epoch_info.epoch);
keeper_state.epoch_info = epoch_info;

keeper_state.set_runs_errors_txs_and_flags_for_epoch(
operations::steward::fire(keeper_config, keeper_state).await,
);

info!("Epoch transition Steward crank completed, resuming operations");
}
}
}
}

fn main() {
info!("\n👋 Welcome to the Jito Stakenet Keeper!\n\n");

Expand Down
Loading
Loading