Skip to content

Commit a8e87b1

Browse files
authored
Merge pull request #204 from neocybereth/feat/add-total-reward-for-epoch
feat: add total reward for epoch for each user
2 parents 2f5f8de + f41e94f commit a8e87b1

File tree

15 files changed

+198
-23
lines changed

15 files changed

+198
-23
lines changed

chain/src/repository/pos.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,8 @@ pub fn delete_claimed_rewards(
248248
return Ok(());
249249
}
250250

251-
let mut query = diesel::delete(pos_rewards::table).into_boxed();
252-
253251
for (owner, validator_id) in reward_claimers {
254-
query = query.or_filter(
252+
let target = pos_rewards::table.filter(
255253
pos_rewards::owner.eq(owner.to_string()).and(
256254
pos_rewards::validator_id.eq_any(
257255
validators::table.select(validators::columns::id).filter(
@@ -261,11 +259,12 @@ pub fn delete_claimed_rewards(
261259
),
262260
),
263261
);
264-
}
265262

266-
query
267-
.execute(transaction_conn)
268-
.context("Failed to remove pos rewards from db")?;
263+
diesel::update(target)
264+
.set(pos_rewards::claimed.eq(true))
265+
.execute(transaction_conn)
266+
.context("Failed to update pos rewards in db")?;
267+
}
269268

270269
anyhow::Ok(())
271270
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
ALTER table pos_rewards ADD CONSTRAINT pos_rewards_owner_validator_id_key UNIQUE (owner, validator_id);
2+
ALTER table pos_rewards DROP CONSTRAINT pos_rewards_owner_validator_id_epoch_key;
3+
ALTER TABLE pos_rewards DROP COLUMN epoch;
4+
ALTER TABLE pos_rewards DROP COLUMN claimed;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- Populate existing records with claimed = false
2+
ALTER TABLE pos_rewards ADD COLUMN claimed BOOLEAN DEFAULT FALSE;
3+
-- Populate existing records with epoch = 0
4+
ALTER TABLE pos_rewards ADD COLUMN epoch INTEGER NOT NULL DEFAULT 0;
5+
-- Now we can safely drop the default
6+
ALTER TABLE pos_rewards ALTER COLUMN epoch DROP DEFAULT;
7+
-- Also update the UNIQUE constraint to include the epoch column
8+
ALTER table pos_rewards ADD CONSTRAINT pos_rewards_owner_validator_id_epoch_key unique (owner, validator_id, epoch);
9+
ALTER table pos_rewards DROP CONSTRAINT pos_rewards_owner_validator_id_key;

orm/src/pos_rewards.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,27 @@ use shared::rewards::Reward;
66

77
use crate::schema::pos_rewards;
88

9-
#[derive(Insertable, Clone, Queryable, Selectable)]
9+
#[derive(Insertable, Queryable, Selectable, Clone)]
1010
#[diesel(table_name = pos_rewards)]
11-
#[diesel(check_for_backend(diesel::pg::Pg))]
1211
pub struct PosRewardInsertDb {
1312
pub owner: String,
1413
pub validator_id: i32,
1514
pub raw_amount: BigDecimal,
15+
pub epoch: i32,
16+
pub claimed: bool,
1617
}
1718

1819
pub type PoSRewardDb = PosRewardInsertDb;
1920

2021
impl PosRewardInsertDb {
21-
pub fn from_reward(reward: Reward, validator_id: i32) -> Self {
22-
Self {
22+
pub fn from_reward(reward: Reward, validator_id: i32, epoch: i32) -> Self {
23+
PosRewardInsertDb {
2324
owner: reward.delegation_pair.delegator_address.to_string(),
24-
raw_amount: BigDecimal::from_str(&reward.amount.to_string())
25-
.expect("Invalid amount"),
2625
validator_id,
26+
raw_amount: BigDecimal::from_str(&reward.amount.to_string())
27+
.unwrap(),
28+
epoch,
29+
claimed: false,
2730
}
2831
}
2932
}

orm/src/schema.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,8 @@ diesel::table! {
359359
owner -> Varchar,
360360
validator_id -> Int4,
361361
raw_amount -> Numeric,
362+
epoch -> Int4,
363+
claimed -> Bool,
362364
}
363365
}
364366

rewards/src/main.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,20 +117,25 @@ async fn crawling_fn(
117117
"Queried rewards successfully",
118118
);
119119

120+
let current_epoch = namada_service::get_current_epoch(&client)
121+
.await
122+
.into_rpc_error()?;
123+
120124
conn.interact(move |conn| {
121125
conn.build_transaction().read_write().run(
122-
|transaction_conn: &mut diesel::prelude::PgConnection| {
126+
|transaction_conn: &mut diesel::pg::PgConnection| {
123127
repository::pos_rewards::upsert_rewards(
124128
transaction_conn,
125129
non_zero_rewards,
130+
current_epoch as i32,
126131
)?;
127132

128133
repository::crawler_state::upsert_crawler_state(
129134
transaction_conn,
130135
(CrawlerName::Rewards, crawler_state).into(),
131136
)?;
132137

133-
anyhow::Ok(())
138+
Ok(())
134139
},
135140
)
136141
})

rewards/src/repository/pos_rewards.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use super::utils::MAX_PARAM_SIZE;
1111
pub fn upsert_rewards(
1212
transaction_conn: &mut PgConnection,
1313
rewards: Vec<Reward>,
14+
epoch: i32,
1415
) -> anyhow::Result<()> {
1516
let rewards_col_count = pos_rewards::all_columns.len() as i64;
1617

@@ -19,7 +20,7 @@ pub fn upsert_rewards(
1920
.collect::<Vec<_>>()
2021
.chunks((MAX_PARAM_SIZE as i64 / rewards_col_count) as usize)
2122
{
22-
upsert_rewards_chunk(transaction_conn, chunk.to_vec())?;
23+
upsert_rewards_chunk(transaction_conn, chunk.to_vec(), epoch)?;
2324
}
2425

2526
anyhow::Ok(())
@@ -28,6 +29,7 @@ pub fn upsert_rewards(
2829
fn upsert_rewards_chunk(
2930
transaction_conn: &mut PgConnection,
3031
rewards: Vec<Reward>,
32+
epoch: i32,
3133
) -> anyhow::Result<()> {
3234
diesel::insert_into(pos_rewards::table)
3335
.values::<Vec<PosRewardInsertDb>>(
@@ -45,13 +47,14 @@ fn upsert_rewards_chunk(
4547
.first(transaction_conn)
4648
.expect("Failed to get validator");
4749

48-
PosRewardInsertDb::from_reward(reward, validator_id)
50+
PosRewardInsertDb::from_reward(reward, validator_id, epoch)
4951
})
5052
.collect::<Vec<_>>(),
5153
)
5254
.on_conflict((
5355
pos_rewards::columns::owner,
5456
pos_rewards::columns::validator_id,
57+
pos_rewards::columns::epoch, // Add epoch to conflict target
5558
))
5659
.do_update()
5760
.set(

rewards/src/services/namada.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ async fn process_batch(
125125
client: &HttpClient,
126126
batch: Vec<DelegationPair>,
127127
) -> anyhow::Result<Vec<Reward>> {
128+
let epoch = get_current_epoch(client).await?;
129+
128130
Ok(futures::stream::iter(batch)
129131
.filter_map(|delegation| async move {
130132
tracing::debug!(
@@ -154,6 +156,7 @@ async fn process_batch(
154156
Some(Reward {
155157
delegation_pair: delegation.clone(),
156158
amount: Amount::from(reward),
159+
epoch: epoch as i32,
157160
})
158161
})
159162
.map(futures::future::ready)

seeder/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ async fn main() -> anyhow::Result<(), MainError> {
177177
.into_iter()
178178
.map(|reward| {
179179
let validator_id = reward.delegation_pair.validator_address.to_string().parse::<i32>().unwrap();
180-
PosRewardInsertDb::from_reward(reward, validator_id)
180+
let epoch = reward.epoch;
181+
PosRewardInsertDb::from_reward(reward, validator_id, epoch)
181182
})
182183
.collect::<Vec<_>>(),
183184
)

shared/src/rewards.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::utils::DelegationPair;
66
pub struct Reward {
77
pub delegation_pair: DelegationPair,
88
pub amount: Amount,
9+
pub epoch: i32,
910
}
1011

1112
impl Reward {
@@ -19,6 +20,7 @@ impl Reward {
1920
delegator_address: Id::Account(delegator_address.to_string()),
2021
},
2122
amount: Amount::fake(),
23+
epoch: 0,
2224
}
2325
}
2426
}

webserver/src/app.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ impl ApplicationServer {
7575
get(pos_handlers::get_withdraws),
7676
)
7777
.route("/pos/reward/:address", get(pos_handlers::get_rewards))
78+
.route(
79+
"/pos/reward/:delegator/:validator/:epoch",
80+
get(pos_handlers::get_rewards_by_delegator_and_validator_and_epoch),
81+
)
7882
.route(
7983
"/pos/voting-power",
8084
get(pos_handlers::get_total_voting_power),

webserver/src/dto/pos.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,9 @@ pub struct WithdrawsDto {
8888
#[validate(range(min = 1, max = 10000))]
8989
pub epoch: Option<u64>,
9090
}
91+
92+
#[derive(Clone, Serialize, Deserialize, Validate)]
93+
pub struct RewardsDto {
94+
#[validate(range(min = 1, max = 10000))]
95+
pub epoch: Option<u64>,
96+
}

webserver/src/handler/pos.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use axum_extra::extract::Query;
55
use axum_macros::debug_handler;
66

77
use crate::dto::pos::{
8-
AllValidatorsQueryParams, BondsDto, UnbondsDto, ValidatorQueryParams,
9-
ValidatorStateDto, WithdrawsDto,
8+
AllValidatorsQueryParams, BondsDto, RewardsDto, UnbondsDto,
9+
ValidatorQueryParams, ValidatorStateDto, WithdrawsDto,
1010
};
1111
use crate::error::api::ApiError;
1212
use crate::response::pos::{
@@ -149,10 +149,29 @@ pub async fn get_withdraws(
149149
#[debug_handler]
150150
pub async fn get_rewards(
151151
_headers: HeaderMap,
152+
query: Query<RewardsDto>,
152153
Path(address): Path<String>,
153154
State(state): State<CommonState>,
154155
) -> Result<Json<Vec<Reward>>, ApiError> {
155-
let rewards = state.pos_service.get_rewards_by_address(address).await?;
156+
let rewards = state
157+
.pos_service
158+
.get_rewards_by_address(address, query.epoch)
159+
.await?;
160+
Ok(Json(rewards))
161+
}
162+
163+
#[debug_handler]
164+
pub async fn get_rewards_by_delegator_and_validator_and_epoch(
165+
_headers: HeaderMap,
166+
Path((delegator, validator, epoch)): Path<(String, String, u64)>,
167+
State(state): State<CommonState>,
168+
) -> Result<Json<Vec<Reward>>, ApiError> {
169+
let rewards = state
170+
.pos_service
171+
.get_rewards_by_delegator_and_validator_and_epoch(
172+
delegator, validator, epoch,
173+
)
174+
.await?;
156175
Ok(Json(rewards))
157176
}
158177

webserver/src/repository/pos.rs

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ pub trait PosRepositoryTrait {
4949
id: i32,
5050
) -> Result<Option<ValidatorDb>, String>;
5151

52+
async fn find_validator_by_address(
53+
&self,
54+
address: String,
55+
) -> Result<Option<ValidatorDb>, String>;
56+
5257
async fn find_merged_bonds_by_address(
5358
&self,
5459
address: String,
@@ -97,6 +102,14 @@ pub trait PosRepositoryTrait {
97102
async fn find_rewards_by_address(
98103
&self,
99104
address: String,
105+
epoch: Option<u64>,
106+
) -> Result<Vec<PoSRewardDb>, String>;
107+
108+
async fn find_rewards_by_delegator_and_validator_and_epoch(
109+
&self,
110+
delegator: String,
111+
validator_id: i32,
112+
epoch: u64,
100113
) -> Result<Vec<PoSRewardDb>, String>;
101114

102115
async fn get_total_voting_power(&self) -> Result<Option<i64>, String>;
@@ -188,6 +201,23 @@ impl PosRepositoryTrait for PosRepository {
188201
.map_err(|e| e.to_string())
189202
}
190203

204+
async fn find_validator_by_address(
205+
&self,
206+
address: String,
207+
) -> Result<Option<ValidatorDb>, String> {
208+
let conn = self.app_state.get_db_connection().await;
209+
210+
conn.interact(move |conn| {
211+
validators::table
212+
.filter(validators::dsl::namada_address.eq(address))
213+
.select(ValidatorDb::as_select())
214+
.first(conn)
215+
.ok()
216+
})
217+
.await
218+
.map_err(|e| e.to_string())
219+
}
220+
191221
async fn find_bonds_by_address(
192222
&self,
193223
address: String,
@@ -344,12 +374,50 @@ impl PosRepositoryTrait for PosRepository {
344374
async fn find_rewards_by_address(
345375
&self,
346376
address: String,
377+
epoch: Option<u64>,
378+
) -> Result<Vec<PoSRewardDb>, String> {
379+
let conn = self.app_state.get_db_connection().await;
380+
381+
conn.interact(
382+
move |conn| -> Result<Vec<PoSRewardDb>, diesel::result::Error> {
383+
let epoch = match epoch {
384+
Some(e) => e as i32,
385+
None => {
386+
// Properly propagate database errors with ?
387+
let max_epoch = pos_rewards::table
388+
.select(diesel::dsl::max(pos_rewards::epoch))
389+
.first::<Option<i32>>(conn)?;
390+
391+
max_epoch.unwrap_or(0) // This is fine - just unwrapping the Option, not a Result
392+
}
393+
};
394+
395+
// Propagate errors from this query too
396+
pos_rewards::table
397+
.filter(pos_rewards::epoch.eq(&epoch))
398+
.filter(pos_rewards::dsl::owner.eq(&address))
399+
.select(PoSRewardDb::as_select())
400+
.get_results(conn)
401+
},
402+
)
403+
.await
404+
.map_err(|e| e.to_string())?
405+
.map_err(|e| e.to_string())
406+
}
407+
408+
async fn find_rewards_by_delegator_and_validator_and_epoch(
409+
&self,
410+
delegator: String,
411+
validator_id: i32,
412+
epoch: u64,
347413
) -> Result<Vec<PoSRewardDb>, String> {
348414
let conn = self.app_state.get_db_connection().await;
349415

350416
conn.interact(move |conn| {
351417
pos_rewards::table
352-
.filter(pos_rewards::dsl::owner.eq(address))
418+
.filter(pos_rewards::dsl::owner.eq(delegator))
419+
.filter(pos_rewards::dsl::validator_id.eq(validator_id))
420+
.filter(pos_rewards::dsl::epoch.eq(epoch as i32))
353421
.select(PoSRewardDb::as_select())
354422
.get_results(conn)
355423
})

0 commit comments

Comments
 (0)