Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use shared::block_result::BlockResult;
use shared::checksums::Checksums;
use shared::crawler::crawl;
use shared::crawler_state::ChainCrawlerState;
use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
use shared::error::{
AsDbError, AsRpcError, AsTaskJoinError, ContextDbInteractError, MainError,
};
use shared::futures::AwaitContainer;
use shared::id::Id;
use shared::token::Token;
Expand Down Expand Up @@ -216,6 +218,22 @@ async fn crawling_fn(
let (block, tm_block_response, epoch) =
get_block(block_height, &client, checksums).await?;

let rate_limits = first_block_in_epoch.eq(&block_height).then(|| {
let client = Arc::clone(&client);

// start this series of queries in parallel, which take
// quite a while
tokio::spawn(async move {
let tokens = query_tokens(&client)
.await?
.into_iter()
.map(|token| token.to_string());

namada_service::get_rate_limits_for_tokens(&client, tokens, epoch)
.await
})
});

tracing::debug!(
block = block_height,
txs = block.transactions.len(),
Expand Down Expand Up @@ -411,6 +429,17 @@ async fn crawling_fn(
timestamp: timestamp_in_sec,
};

let rate_limits =
rate_limits
.future()
.await
.map_or(Ok(vec![]), |maybe_rate_limits| {
maybe_rate_limits
.context("Failed to await on rate limits query")
.into_task_join_error()?
.into_rpc_error()
})?;

tracing::info!(
txs = block.transactions.len(),
ibc_tokens = ibc_tokens.len(),
Expand Down Expand Up @@ -444,6 +473,11 @@ async fn crawling_fn(
token_supplies.into_iter().flatten(),
)?;

repository::balance::insert_ibc_rate_limits(
transaction_conn,
rate_limits,
)?;

repository::block::upsert_block(
transaction_conn,
block,
Expand Down
31 changes: 29 additions & 2 deletions chain/src/repository/balance.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use anyhow::Context;
use diesel::{PgConnection, RunQueryDsl};
use orm::balances::BalanceChangesInsertDb;
use orm::ibc::IbcRateLimitsInsertDb;
use orm::schema::{
balance_changes, ibc_token, token, token_supplies_per_epoch,
balance_changes, ibc_rate_limits, ibc_token, token,
token_supplies_per_epoch,
};
use orm::token::{IbcTokenInsertDb, TokenInsertDb};
use orm::token_supplies_per_epoch::TokenSuppliesInsertDb;
use shared::balance::{Balances, TokenSupply};
use shared::token::Token;
use shared::token::{IbcRateLimit, Token};
use shared::tuple_len::TupleLen;

use super::utils::MAX_PARAM_SIZE;
Expand Down Expand Up @@ -100,6 +102,31 @@ where
anyhow::Ok(())
}

pub fn insert_ibc_rate_limits<S>(
transaction_conn: &mut PgConnection,
supplies: S,
) -> anyhow::Result<()>
where
S: IntoIterator<Item = IbcRateLimit>,
{
let limits: Vec<_> = supplies
.into_iter()
.map(IbcRateLimitsInsertDb::from)
.collect();

if limits.is_empty() {
return anyhow::Ok(());
}

diesel::insert_into(ibc_rate_limits::table)
.values(limits)
.on_conflict_do_nothing()
.execute(transaction_conn)
.context("Failed to update rate limits in db")?;

anyhow::Ok(())
}

#[cfg(test)]
mod tests {

Expand Down
45 changes: 44 additions & 1 deletion chain/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use shared::block::{BlockHeight, Epoch};
use shared::bond::{Bond, BondAddresses, Bonds};
use shared::id::Id;
use shared::proposal::{GovernanceProposal, TallyType};
use shared::token::{IbcToken, Token};
use shared::token::{IbcRateLimit, IbcToken, Token};
use shared::unbond::{Unbond, UnbondAddresses, Unbonds};
use shared::utils::BalanceChange;
use shared::validator::{Validator, ValidatorSet, ValidatorState};
Expand Down Expand Up @@ -864,3 +864,46 @@ pub async fn get_token_supply(
effective: None,
})
}

pub async fn get_throughput_rate_limit(
client: &HttpClient,
token: String,
epoch: u32,
) -> anyhow::Result<IbcRateLimit> {
let address: NamadaSdkAddress =
token.parse().context("Failed to parse token address")?;

let rate_limit = rpc::query_ibc_rate_limits(client, &address)
.await
.with_context(|| {
format!("Failed to query throughput rate limit of token {token}")
})?;

Ok(IbcRateLimit {
epoch,
address: token,
throughput_limit: Amount::from(rate_limit.throughput_per_epoch_limit)
.into(),
})
}

pub async fn get_rate_limits_for_tokens<I>(
client: &HttpClient,
tokens: I,
epoch: u32,
) -> anyhow::Result<Vec<IbcRateLimit>>
where
I: IntoIterator<Item = String>,
{
let mut buffer = vec![];

let mut stream = futures::stream::iter(tokens)
.map(|address| get_throughput_rate_limit(client, address, epoch))
.buffer_unordered(32);

while let Some(maybe_address) = stream.next().await {
buffer.push(maybe_address?);
}

Ok(buffer)
}
6 changes: 6 additions & 0 deletions orm/migrations/2025-02-13-094639_ibc_rate_limits/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file should undo anything in `up.sql`

ALTER TABLE ibc_rate_limits
DROP CONSTRAINT fk_ibc_rate_limits_address;

DROP TABLE ibc_rate_limits;
13 changes: 13 additions & 0 deletions orm/migrations/2025-02-13-094639_ibc_rate_limits/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Your SQL goes here

CREATE TABLE ibc_rate_limits (
id SERIAL PRIMARY KEY,
address VARCHAR(45) NOT NULL,
epoch INT NOT NULL,
throughput_limit NUMERIC(78, 0) NOT NULL,
-- reference the `address` column in the `token` table
CONSTRAINT fk_ibc_rate_limits_address
FOREIGN KEY(address) REFERENCES token(address) ON DELETE CASCADE
);

ALTER TABLE ibc_rate_limits ADD UNIQUE (address, epoch);
33 changes: 32 additions & 1 deletion orm/src/ibc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use bigdecimal::BigDecimal;
use diesel::prelude::Queryable;
use diesel::{AsChangeset, Insertable, Selectable};
use serde::{Deserialize, Serialize};
use shared::token::IbcRateLimit;
use shared::transaction::{IbcAckStatus, IbcSequence};

use crate::schema::ibc_ack;
use crate::schema::{ibc_ack, ibc_rate_limits};

#[derive(Debug, Clone, Serialize, Deserialize, diesel_derive_enum::DbEnum)]
#[ExistingTypePath = "crate::schema::sql_types::IbcStatus"]
Expand Down Expand Up @@ -54,3 +56,32 @@ impl From<IbcSequence> for IbcAckInsertDb {
pub struct IbcSequencekStatusUpdateDb {
pub status: IbcAckStatusDb,
}

#[derive(Debug, Clone, Queryable, Selectable)]
#[diesel(table_name = ibc_rate_limits)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct IbcRateLimitsDb {
pub id: i32,
pub address: String,
pub epoch: i32,
pub throughput_limit: BigDecimal,
}

#[derive(Debug, Clone, Queryable, Selectable, Insertable)]
#[diesel(table_name = ibc_rate_limits)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct IbcRateLimitsInsertDb {
pub address: String,
pub epoch: i32,
pub throughput_limit: BigDecimal,
}

impl From<IbcRateLimit> for IbcRateLimitsInsertDb {
fn from(rate_limit: IbcRateLimit) -> Self {
Self {
address: rate_limit.address,
epoch: rate_limit.epoch as _,
throughput_limit: rate_limit.throughput_limit,
}
}
}
12 changes: 12 additions & 0 deletions orm/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ diesel::table! {
}
}

diesel::table! {
ibc_rate_limits (id) {
id -> Int4,
#[max_length = 45]
address -> Varchar,
epoch -> Int4,
throughput_limit -> Numeric,
}
}

diesel::table! {
ibc_token (address) {
#[max_length = 45]
Expand Down Expand Up @@ -404,6 +414,7 @@ diesel::joinable!(balance_changes -> token (token));
diesel::joinable!(bonds -> validators (validator_id));
diesel::joinable!(gas_estimations -> wrapper_transactions (wrapper_id));
diesel::joinable!(governance_votes -> governance_proposals (proposal_id));
diesel::joinable!(ibc_rate_limits -> token (address));
diesel::joinable!(ibc_token -> token (address));
diesel::joinable!(inner_transactions -> wrapper_transactions (wrapper_id));
diesel::joinable!(pos_rewards -> validators (validator_id));
Expand All @@ -425,6 +436,7 @@ diesel::allow_tables_to_appear_in_same_query!(
governance_proposals,
governance_votes,
ibc_ack,
ibc_rate_limits,
ibc_token,
inner_transactions,
pos_rewards,
Expand Down
16 changes: 16 additions & 0 deletions shared/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub enum MainError {
RpcError,
#[error("Can't commit block to database")]
Database,
#[error("Failed to join async task")]
TaskJoinError,
}

pub trait AsRpcError<T> {
Expand Down Expand Up @@ -38,6 +40,20 @@ impl<T> AsDbError<T> for anyhow::Result<T> {
}
}

pub trait AsTaskJoinError<T> {
fn into_task_join_error(self) -> Result<T, MainError>;
}

impl<T> AsTaskJoinError<T> for anyhow::Result<T> {
#[inline]
fn into_task_join_error(self) -> Result<T, MainError> {
self.map_err(|reason| {
tracing::error!(?reason, "{}", MainError::TaskJoinError);
MainError::TaskJoinError
})
}
}

pub trait ContextDbInteractError<T> {
fn context_db_interact_error(self) -> anyhow::Result<T>;
}
Expand Down
12 changes: 12 additions & 0 deletions shared/src/token.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::fmt::Display;

use bigdecimal::BigDecimal;

use crate::id::Id;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand All @@ -22,3 +24,13 @@ impl Display for Token {
}
}
}

#[derive(Debug)]
pub struct IbcRateLimit {
/// Address of the token in Namada
pub address: String,
/// Epoch of the indexed rate limit
pub epoch: u32,
/// Throughput limit of token `address` at epoch `epoch`
pub throughput_limit: BigDecimal,
}
33 changes: 33 additions & 0 deletions swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,31 @@ paths:
type: string
version:
type: string
/api/v1/ibc/rate-limits:
get:
summary: Get the rate limits of IBC tokens
parameters:
- in: query
name: tokenAddress
schema:
type: integer
minimum: 0
description: Optional address of the token to query
- in: query
name: throughputLimit
schema:
type: integer
minimum: 0
description: Optional throughput limit to filter with
responses:
"200":
description: List of IBC tokens and their rate limits
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/IbcRateLimit"
/api/v1/pos/validator:
get:
summary: Get all validators, paginated
Expand Down Expand Up @@ -1405,3 +1430,11 @@ components:
enum: [received, sent]
block_height:
type: string
IbcRateLimit:
type: object
required: [tokenAddress, throughputLimit]
properties:
tokenAddress:
type: string
throughputLimit:
type: string
4 changes: 4 additions & 0 deletions webserver/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ impl ApplicationServer {
get(chain_handlers::get_last_processed_epoch),
)
.route("/ibc/:tx_id/status", get(ibc_handler::get_ibc_status))
.route(
"/ibc/rate-limits",
get(ibc_handler::get_ibc_rate_limits),
)
.route(
"/pgf/payments",
get(pgf_service::get_pgf_continuous_payments),
Expand Down
8 changes: 8 additions & 0 deletions webserver/src/dto/ibc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IbcRateLimit {
pub token_address: Option<String>,
pub throughput_limit: Option<u64>,
}
1 change: 1 addition & 0 deletions webserver/src/dto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod chain;
pub mod crawler_state;
pub mod gas;
pub mod governance;
pub mod ibc;
pub mod pgf;
pub mod pos;
pub mod transaction;
Loading
Loading