Skip to content

Commit 5abb48a

Browse files
committed
[rewards] Crawl specified epoch instead of assuming it is current
1 parent 138d204 commit 5abb48a

File tree

4 files changed

+41
-25
lines changed

4 files changed

+41
-25
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ fake = { version = "2.10.0", features = ["derive"] }
7878
rand = "0.8.5"
7979
bigdecimal = "0.4.5"
8080
strum = "0.26.3"
81+
strum_macros = "0.26.3"
8182
sha256 = "1.5.0"
8283
rlimit = "0.10.2"
8384
axum-prometheus = "0.7.0"

rewards/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ pub struct AppConfig {
2323
#[clap(long, env, default_value_t = 60)]
2424
pub sleep_for: u64,
2525

26+
#[clap(
27+
long,
28+
help = "Crawl from given epoch and do not update crawler_state"
29+
)]
30+
pub backfill_from: Option<u32>,
31+
2632
#[clap(long, env)]
2733
pub database_url: String,
2834

rewards/src/main.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,28 @@ async fn main() -> Result<(), MainError> {
5656

5757
tracing::debug!("Querying epoch...");
5858

59-
let mut epoch;
60-
loop {
61-
epoch = namada_service::get_current_epoch(&client)
62-
.await
63-
.into_rpc_error()?;
64-
65-
if epoch < 2 {
66-
tracing::info!("Waiting for first epoch to happen...");
67-
sleep(Duration::from_secs(config.sleep_for)).await;
68-
} else {
69-
break;
59+
let mut epoch = config.backfill_from;
60+
61+
if epoch.is_none() {
62+
loop {
63+
epoch = Some(
64+
namada_service::get_current_epoch(&client)
65+
.await
66+
.into_rpc_error()?,
67+
);
68+
69+
if epoch.unwrap_or(0) < 2 {
70+
tracing::info!("Waiting for first epoch to happen...");
71+
sleep(Duration::from_secs(config.sleep_for)).await;
72+
} else {
73+
break;
74+
}
7075
}
7176
}
7277

7378
crawler::crawl(
7479
move |epoch| crawling_fn(conn.clone(), client.clone(), epoch),
75-
epoch,
80+
epoch.unwrap_or(0),
7681
None,
7782
)
7883
.await
@@ -97,6 +102,8 @@ async fn crawling_fn(
97102
return Err(MainError::NoAction);
98103
}
99104

105+
tracing::info!("Starting to update proposals...");
106+
100107
// TODO: change this by querying all the pairs in the database
101108
let delegations_pairs = namada_service::query_delegation_pairs(&client)
102109
.await
@@ -108,9 +115,14 @@ async fn crawling_fn(
108115
"Querying rewards..."
109116
);
110117

111-
let rewards = namada_service::query_rewards(&client, &delegations_pairs)
112-
.await
113-
.into_rpc_error()?;
118+
let rewards = namada_service::query_rewards(
119+
&client,
120+
&delegations_pairs,
121+
epoch_to_process,
122+
)
123+
.await
124+
.into_rpc_error()?;
125+
114126
let non_zero_rewards = rewards
115127
.iter()
116128
.filter(|reward| !reward.amount.is_zero())
@@ -128,17 +140,13 @@ async fn crawling_fn(
128140
"Queried rewards successfully",
129141
);
130142

131-
let current_epoch = namada_service::get_current_epoch(&client)
132-
.await
133-
.into_rpc_error()?;
134-
135143
conn.interact(move |conn| {
136144
conn.build_transaction().read_write().run(
137145
|transaction_conn: &mut diesel::pg::PgConnection| {
138146
repository::pos_rewards::upsert_rewards(
139147
transaction_conn,
140148
non_zero_rewards,
141-
current_epoch as i32,
149+
epoch_to_process as i32,
142150
)?;
143151

144152
repository::crawler_state::upsert_crawler_state(

rewards/src/services/namada.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub async fn query_delegation_pairs(
4141
pub async fn query_rewards(
4242
client: &HttpClient,
4343
delegation_pairs: &HashSet<DelegationPair>,
44+
epoch: Epoch,
4445
) -> anyhow::Result<Vec<Reward>> {
4546
let mut all_rewards: Vec<Reward> = Vec::new();
4647

@@ -60,7 +61,7 @@ pub async fn query_rewards(
6061
);
6162

6263
let results = futures::stream::iter(batches)
63-
.map(|batch| process_batch_with_retries(client, batch))
64+
.map(|batch| process_batch_with_retries(client, batch, epoch))
6465
.buffer_unordered(3)
6566
.collect::<Vec<_>>()
6667
.await;
@@ -88,12 +89,13 @@ pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result<Epoch> {
8889
async fn process_batch_with_retries(
8990
client: &HttpClient,
9091
batch: (usize, Vec<DelegationPair>),
92+
epoch: Epoch,
9193
) -> anyhow::Result<Vec<Reward>> {
9294
let mut retries = 0;
9395

9496
tracing::info!("Processing batch {}", batch.0);
9597
loop {
96-
let result = process_batch(client, batch.1.clone()).await;
98+
let result = process_batch(client, batch.1.clone(), epoch).await;
9799

98100
match result {
99101
Ok(rewards) => {
@@ -124,9 +126,8 @@ async fn process_batch_with_retries(
124126
async fn process_batch(
125127
client: &HttpClient,
126128
batch: Vec<DelegationPair>,
129+
epoch: Epoch,
127130
) -> anyhow::Result<Vec<Reward>> {
128-
let epoch = get_current_epoch(client).await?;
129-
130131
Ok(futures::stream::iter(batch)
131132
.filter_map(|delegation| async move {
132133
tracing::debug!(
@@ -142,7 +143,7 @@ async fn process_batch(
142143
client,
143144
&delegation.validator_address.clone().into(),
144145
&Some(delegation.delegator_address.clone().into()),
145-
&None,
146+
&Some((epoch as u64).into()),
146147
)
147148
.await
148149
.ok()?;

0 commit comments

Comments
 (0)