Skip to content

feat: backfill epoch rewards #235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ fake = { version = "2.10.0", features = ["derive"] }
rand = "0.8.5"
bigdecimal = "0.4.5"
strum = "0.26.3"
strum_macros = "0.26.3"
sha256 = "1.5.0"
rlimit = "0.10.2"
axum-prometheus = "0.7.0"
Expand Down
6 changes: 6 additions & 0 deletions rewards/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ pub struct AppConfig {
#[clap(long, env, default_value_t = 60)]
pub sleep_for: u64,

#[clap(
long,
help = "Crawl from given epoch and do not update crawler_state"
)]
pub backfill_from: Option<u32>,

#[clap(long, env)]
pub database_url: String,

Expand Down
48 changes: 28 additions & 20 deletions rewards/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,28 @@ async fn main() -> Result<(), MainError> {

tracing::debug!("Querying epoch...");

let mut epoch;
loop {
epoch = namada_service::get_current_epoch(&client)
.await
.into_rpc_error()?;

if epoch < 2 {
tracing::info!("Waiting for first epoch to happen...");
sleep(Duration::from_secs(config.sleep_for)).await;
} else {
break;
let mut epoch = config.backfill_from;

if epoch.is_none() {
loop {
epoch = Some(
namada_service::get_current_epoch(&client)
.await
.into_rpc_error()?,
);

if epoch.unwrap_or(0) < 2 {
tracing::info!("Waiting for first epoch to happen...");
sleep(Duration::from_secs(config.sleep_for)).await;
} else {
break;
}
}
}

crawler::crawl(
move |epoch| crawling_fn(conn.clone(), client.clone(), epoch),
epoch,
epoch.unwrap_or(0),
None,
)
.await
Expand All @@ -97,6 +102,8 @@ async fn crawling_fn(
return Err(MainError::NoAction);
}

tracing::info!("Starting to update proposals...");

// TODO: change this by querying all the pairs in the database
let delegations_pairs = namada_service::query_delegation_pairs(&client)
.await
Expand All @@ -108,9 +115,14 @@ async fn crawling_fn(
"Querying rewards..."
);

let rewards = namada_service::query_rewards(&client, &delegations_pairs)
.await
.into_rpc_error()?;
let rewards = namada_service::query_rewards(
&client,
&delegations_pairs,
epoch_to_process,
)
.await
.into_rpc_error()?;

let non_zero_rewards = rewards
.iter()
.filter(|reward| !reward.amount.is_zero())
Expand All @@ -128,17 +140,13 @@ async fn crawling_fn(
"Queried rewards successfully",
);

let current_epoch = namada_service::get_current_epoch(&client)
.await
.into_rpc_error()?;

conn.interact(move |conn| {
conn.build_transaction().read_write().run(
|transaction_conn: &mut diesel::pg::PgConnection| {
repository::pos_rewards::upsert_rewards(
transaction_conn,
non_zero_rewards,
current_epoch as i32,
epoch_to_process as i32,
)?;

repository::crawler_state::upsert_crawler_state(
Expand Down
11 changes: 6 additions & 5 deletions rewards/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub async fn query_delegation_pairs(
pub async fn query_rewards(
client: &HttpClient,
delegation_pairs: &HashSet<DelegationPair>,
epoch: Epoch,
) -> anyhow::Result<Vec<Reward>> {
let mut all_rewards: Vec<Reward> = Vec::new();

Expand All @@ -60,7 +61,7 @@ pub async fn query_rewards(
);

let results = futures::stream::iter(batches)
.map(|batch| process_batch_with_retries(client, batch))
.map(|batch| process_batch_with_retries(client, batch, epoch))
.buffer_unordered(3)
.collect::<Vec<_>>()
.await;
Expand Down Expand Up @@ -88,12 +89,13 @@ pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result<Epoch> {
async fn process_batch_with_retries(
client: &HttpClient,
batch: (usize, Vec<DelegationPair>),
epoch: Epoch,
) -> anyhow::Result<Vec<Reward>> {
let mut retries = 0;

tracing::info!("Processing batch {}", batch.0);
loop {
let result = process_batch(client, batch.1.clone()).await;
let result = process_batch(client, batch.1.clone(), epoch).await;

match result {
Ok(rewards) => {
Expand Down Expand Up @@ -124,9 +126,8 @@ async fn process_batch_with_retries(
async fn process_batch(
client: &HttpClient,
batch: Vec<DelegationPair>,
epoch: Epoch,
) -> anyhow::Result<Vec<Reward>> {
let epoch = get_current_epoch(client).await?;

Ok(futures::stream::iter(batch)
.filter_map(|delegation| async move {
tracing::debug!(
Expand All @@ -142,7 +143,7 @@ async fn process_batch(
client,
&delegation.validator_address.clone().into(),
&Some(delegation.delegator_address.clone().into()),
&None,
&Some((epoch as u64).into()),
)
.await
.ok()?;
Expand Down
Loading