Skip to content

feat: add total reward for epoch for each user #204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a3040ff
feat: add total reward for epoch
neocybereth Dec 13, 2024
523a368
fix: add migration to add epoch field to rewwrds table
neocybereth Dec 13, 2024
5030f97
fix: restore sample_env
neocybereth Dec 13, 2024
3540112
fix: add rewards migrations
neocybereth Dec 16, 2024
90fecd6
fix: add pos_rewards table
neocybereth Dec 16, 2024
2146524
fix: update migrations
neocybereth Dec 17, 2024
59726d0
fix: update migration
neocybereth Dec 17, 2024
149ed55
fix: update accoridng to review
neocybereth Dec 17, 2024
92e0aa7
fix: formatting on schema
neocybereth Dec 30, 2024
c61a2e6
fix: update rewards epoch func
neocybereth Dec 30, 2024
0957a4a
fix: orm pos_rewards fix for bigdecimal
neocybereth Dec 30, 2024
1244bab
fix: update sql statements
neocybereth Dec 30, 2024
fae5e1f
fix: add webserver endpoint
neocybereth Dec 30, 2024
f2aade5
fix: update rewards deletion
neocybereth Jan 11, 2025
abd16ec
fix: add update to tables
neocybereth Jan 11, 2025
b129655
fix: add epoch to rewawrd finder
neocybereth Jan 11, 2025
64a9fa6
fix: add get reward by latest epoch
neocybereth Jan 11, 2025
de515f0
fix: remove unnecessary parens
neocybereth Jan 11, 2025
d36ec9e
fix: update according to joels commit 02ada1263c8e07f7e14fd323189307a…
neocybereth Jan 14, 2025
bae0188
fix: remove unnecessary type
neocybereth Jan 14, 2025
7320744
fix: add in 68d63cf0aae052e7ee0a93bc7d4b65a4b2681bf1
neocybereth Feb 4, 2025
c393c22
fix: merge
neocybereth Feb 21, 2025
cc051da
fix: bring in new formatter
neocybereth Mar 11, 2025
a77e62b
fix: unused import
neocybereth Mar 11, 2025
f4f37fd
fix: update
neocybereth Mar 11, 2025
b11aa17
fix: fmt
neocybereth Mar 11, 2025
0548085
trade tracing error for debug
neocybereth Mar 11, 2025
9a9cb88
clone validator_address
neocybereth Mar 11, 2025
a479b2e
fix: propagate DB errors
neocybereth Mar 11, 2025
ee6bc67
fix: find validator by addy
neocybereth Mar 11, 2025
f41e94f
fix: fmt
neocybereth Mar 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions chain/src/repository/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,8 @@ pub fn delete_claimed_rewards(
return Ok(());
}

let mut query = diesel::delete(pos_rewards::table).into_boxed();

for (owner, validator_id) in reward_claimers {
query = query.or_filter(
let target = pos_rewards::table.filter(
pos_rewards::owner.eq(owner.to_string()).and(
pos_rewards::validator_id.eq_any(
validators::table.select(validators::columns::id).filter(
Expand All @@ -220,11 +218,12 @@ pub fn delete_claimed_rewards(
),
),
);
}

query
.execute(transaction_conn)
.context("Failed to remove pos rewards from db")?;
diesel::update(target)
.set(pos_rewards::claimed.eq(true))
.execute(transaction_conn)
.context("Failed to update pos rewards in db")?;
}

anyhow::Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER table pos_rewards ADD CONSTRAINT pos_rewards_owner_validator_id_key UNIQUE (owner, validator_id);
ALTER table pos_rewards DROP CONSTRAINT pos_rewards_owner_validator_id_epoch_key;
ALTER TABLE pos_rewards DROP COLUMN epoch;
ALTER TABLE pos_rewards DROP COLUMN claimed;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Populate existing records with claimed = false
ALTER TABLE pos_rewards ADD COLUMN claimed BOOLEAN DEFAULT FALSE;
-- Populate existing records with epoch = 0
ALTER TABLE pos_rewards ADD COLUMN epoch INTEGER NOT NULL DEFAULT 0;
-- Now we can safely drop the default
ALTER TABLE pos_rewards ALTER COLUMN epoch DROP DEFAULT;
-- Also update the UNIQUE constraint to include the epoch column
ALTER table pos_rewards ADD CONSTRAINT pos_rewards_owner_validator_id_epoch_key unique (owner, validator_id, epoch);
ALTER table pos_rewards DROP CONSTRAINT pos_rewards_owner_validator_id_key;
15 changes: 9 additions & 6 deletions orm/src/pos_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@ use shared::rewards::Reward;

use crate::schema::pos_rewards;

#[derive(Insertable, Clone, Queryable, Selectable)]
#[derive(Insertable, Queryable, Selectable, Clone)]
#[diesel(table_name = pos_rewards)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct PosRewardInsertDb {
pub owner: String,
pub validator_id: i32,
pub raw_amount: BigDecimal,
pub epoch: i32,
pub claimed: bool,
}

pub type PoSRewardDb = PosRewardInsertDb;

impl PosRewardInsertDb {
pub fn from_reward(reward: Reward, validator_id: i32) -> Self {
Self {
pub fn from_reward(reward: Reward, validator_id: i32, epoch: i32) -> Self {
PosRewardInsertDb {
owner: reward.delegation_pair.delegator_address.to_string(),
raw_amount: BigDecimal::from_str(&reward.amount.to_string())
.expect("Invalid amount"),
validator_id,
raw_amount: BigDecimal::from_str(&reward.amount.to_string())
.unwrap(),
epoch,
claimed: false,
}
}
}
2 changes: 2 additions & 0 deletions orm/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ diesel::table! {
owner -> Varchar,
validator_id -> Int4,
raw_amount -> Numeric,
epoch -> Int4,
claimed -> Bool,
}
}

Expand Down
9 changes: 7 additions & 2 deletions rewards/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,25 @@ async fn crawling_fn(
"Queried rewards successfully",
);

let current_epoch = namada_service::get_current_epoch(&client)
.await
.into_rpc_error()?;

conn.interact(move |conn| {
conn.build_transaction().read_write().run(
|transaction_conn: &mut diesel::prelude::PgConnection| {
|transaction_conn: &mut diesel::pg::PgConnection| {
repository::pos_rewards::upsert_rewards(
transaction_conn,
non_zero_rewards,
current_epoch as i32,
)?;

repository::crawler_state::upsert_crawler_state(
transaction_conn,
(CrawlerName::Rewards, crawler_state).into(),
)?;

anyhow::Ok(())
Ok(())
},
)
})
Expand Down
7 changes: 5 additions & 2 deletions rewards/src/repository/pos_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::utils::MAX_PARAM_SIZE;
pub fn upsert_rewards(
transaction_conn: &mut PgConnection,
rewards: Vec<Reward>,
epoch: i32,
) -> anyhow::Result<()> {
let rewards_col_count = pos_rewards::all_columns.len() as i64;

Expand All @@ -19,7 +20,7 @@ pub fn upsert_rewards(
.collect::<Vec<_>>()
.chunks((MAX_PARAM_SIZE as i64 / rewards_col_count) as usize)
{
upsert_rewards_chunk(transaction_conn, chunk.to_vec())?;
upsert_rewards_chunk(transaction_conn, chunk.to_vec(), epoch)?;
}

anyhow::Ok(())
Expand All @@ -28,6 +29,7 @@ pub fn upsert_rewards(
fn upsert_rewards_chunk(
transaction_conn: &mut PgConnection,
rewards: Vec<Reward>,
epoch: i32,
) -> anyhow::Result<()> {
diesel::insert_into(pos_rewards::table)
.values::<Vec<PosRewardInsertDb>>(
Expand All @@ -45,13 +47,14 @@ fn upsert_rewards_chunk(
.first(transaction_conn)
.expect("Failed to get validator");

PosRewardInsertDb::from_reward(reward, validator_id)
PosRewardInsertDb::from_reward(reward, validator_id, epoch)
})
.collect::<Vec<_>>(),
)
.on_conflict((
pos_rewards::columns::owner,
pos_rewards::columns::validator_id,
pos_rewards::columns::epoch, // Add epoch to conflict target
))
.do_update()
.set(
Expand Down
3 changes: 3 additions & 0 deletions rewards/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ async fn process_batch(
client: &HttpClient,
batch: Vec<DelegationPair>,
) -> anyhow::Result<Vec<Reward>> {
let epoch = get_current_epoch(client).await?;

Ok(futures::stream::iter(batch)
.filter_map(|delegation| async move {
tracing::debug!(
Expand Down Expand Up @@ -154,6 +156,7 @@ async fn process_batch(
Some(Reward {
delegation_pair: delegation.clone(),
amount: Amount::from(reward),
epoch: epoch as i32,
})
})
.map(futures::future::ready)
Expand Down
3 changes: 2 additions & 1 deletion seeder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ async fn main() -> anyhow::Result<(), MainError> {
.into_iter()
.map(|reward| {
let validator_id = reward.delegation_pair.validator_address.to_string().parse::<i32>().unwrap();
PosRewardInsertDb::from_reward(reward, validator_id)
let epoch = reward.epoch;
PosRewardInsertDb::from_reward(reward, validator_id, epoch)
})
.collect::<Vec<_>>(),
)
Expand Down
2 changes: 2 additions & 0 deletions shared/src/rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::utils::DelegationPair;
pub struct Reward {
pub delegation_pair: DelegationPair,
pub amount: Amount,
pub epoch: i32,
}

impl Reward {
Expand All @@ -19,6 +20,7 @@ impl Reward {
delegator_address: Id::Account(delegator_address.to_string()),
},
amount: Amount::fake(),
epoch: 0,
}
}
}
4 changes: 4 additions & 0 deletions webserver/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ impl ApplicationServer {
get(pos_handlers::get_withdraws),
)
.route("/pos/reward/:address", get(pos_handlers::get_rewards))
.route(
"/pos/reward/:delegator/:validator/:epoch",
get(pos_handlers::get_rewards_by_delegator_and_validator_and_epoch),
)
.route(
"/pos/voting-power",
get(pos_handlers::get_total_voting_power),
Expand Down
6 changes: 6 additions & 0 deletions webserver/src/dto/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@ pub struct WithdrawsDto {
#[validate(range(min = 1, max = 10000))]
pub epoch: Option<u64>,
}

#[derive(Clone, Serialize, Deserialize, Validate)]
pub struct RewardsDto {
#[validate(range(min = 1, max = 10000))]
pub epoch: Option<u64>,
}
25 changes: 22 additions & 3 deletions webserver/src/handler/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use axum_extra::extract::Query;
use axum_macros::debug_handler;

use crate::dto::pos::{
AllValidatorsQueryParams, BondsDto, UnbondsDto, ValidatorQueryParams,
ValidatorStateDto, WithdrawsDto,
AllValidatorsQueryParams, BondsDto, RewardsDto, UnbondsDto,
ValidatorQueryParams, ValidatorStateDto, WithdrawsDto,
};
use crate::error::api::ApiError;
use crate::response::pos::{
Expand Down Expand Up @@ -149,10 +149,29 @@ pub async fn get_withdraws(
#[debug_handler]
pub async fn get_rewards(
_headers: HeaderMap,
query: Query<RewardsDto>,
Path(address): Path<String>,
State(state): State<CommonState>,
) -> Result<Json<Vec<Reward>>, ApiError> {
let rewards = state.pos_service.get_rewards_by_address(address).await?;
let rewards = state
.pos_service
.get_rewards_by_address(address, query.epoch)
.await?;
Ok(Json(rewards))
}

#[debug_handler]
pub async fn get_rewards_by_delegator_and_validator_and_epoch(
_headers: HeaderMap,
Path((delegator, validator, epoch)): Path<(String, String, u64)>,
State(state): State<CommonState>,
) -> Result<Json<Vec<Reward>>, ApiError> {
let rewards = state
.pos_service
.get_rewards_by_delegator_and_validator_and_epoch(
delegator, validator, epoch,
)
.await?;
Ok(Json(rewards))
}

Expand Down
70 changes: 69 additions & 1 deletion webserver/src/repository/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ pub trait PosRepositoryTrait {
id: i32,
) -> Result<Option<ValidatorDb>, String>;

async fn find_validator_by_address(
&self,
address: String,
) -> Result<Option<ValidatorDb>, String>;

async fn find_merged_bonds_by_address(
&self,
address: String,
Expand Down Expand Up @@ -90,6 +95,14 @@ pub trait PosRepositoryTrait {
async fn find_rewards_by_address(
&self,
address: String,
epoch: Option<u64>,
) -> Result<Vec<PoSRewardDb>, String>;

async fn find_rewards_by_delegator_and_validator_and_epoch(
&self,
delegator: String,
validator_id: i32,
epoch: u64,
) -> Result<Vec<PoSRewardDb>, String>;

async fn get_total_voting_power(&self) -> Result<Option<i64>, String>;
Expand Down Expand Up @@ -181,6 +194,23 @@ impl PosRepositoryTrait for PosRepository {
.map_err(|e| e.to_string())
}

async fn find_validator_by_address(
&self,
address: String,
) -> Result<Option<ValidatorDb>, String> {
let conn = self.app_state.get_db_connection().await;

conn.interact(move |conn| {
validators::table
.filter(validators::dsl::namada_address.eq(address))
.select(ValidatorDb::as_select())
.first(conn)
.ok()
})
.await
.map_err(|e| e.to_string())
}

async fn find_bonds_by_address(
&self,
address: String,
Expand Down Expand Up @@ -325,12 +355,50 @@ impl PosRepositoryTrait for PosRepository {
async fn find_rewards_by_address(
&self,
address: String,
epoch: Option<u64>,
) -> Result<Vec<PoSRewardDb>, String> {
let conn = self.app_state.get_db_connection().await;

conn.interact(
move |conn| -> Result<Vec<PoSRewardDb>, diesel::result::Error> {
let epoch = match epoch {
Some(e) => e as i32,
None => {
// Properly propagate database errors with ?
let max_epoch = pos_rewards::table
.select(diesel::dsl::max(pos_rewards::epoch))
.first::<Option<i32>>(conn)?;

max_epoch.unwrap_or(0) // This is fine - just unwrapping the Option, not a Result
}
};

// Propagate errors from this query too
pos_rewards::table
.filter(pos_rewards::epoch.eq(&epoch))
.filter(pos_rewards::dsl::owner.eq(&address))
.select(PoSRewardDb::as_select())
.get_results(conn)
},
)
.await
.map_err(|e| e.to_string())?
.map_err(|e| e.to_string())
}

async fn find_rewards_by_delegator_and_validator_and_epoch(
&self,
delegator: String,
validator_id: i32,
epoch: u64,
) -> Result<Vec<PoSRewardDb>, String> {
let conn = self.app_state.get_db_connection().await;

conn.interact(move |conn| {
pos_rewards::table
.filter(pos_rewards::dsl::owner.eq(address))
.filter(pos_rewards::dsl::owner.eq(delegator))
.filter(pos_rewards::dsl::validator_id.eq(validator_id))
.filter(pos_rewards::dsl::epoch.eq(epoch as i32))
.select(PoSRewardDb::as_select())
.get_results(conn)
})
Expand Down
Loading