diff --git a/chain/src/main.rs b/chain/src/main.rs index e412f1fd4..d8525fa75 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -28,7 +28,9 @@ use shared::block_result::BlockResult; use shared::checksums::Checksums; use shared::crawler::crawl; use shared::crawler_state::ChainCrawlerState; -use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::error::{ + AsDbError, AsRpcError, AsTaskJoinError, ContextDbInteractError, MainError, +}; use shared::futures::AwaitContainer; use shared::id::Id; use shared::token::Token; @@ -216,6 +218,22 @@ async fn crawling_fn( let (block, tm_block_response, epoch) = get_block(block_height, &client, checksums).await?; + let rate_limits = first_block_in_epoch.eq(&block_height).then(|| { + let client = Arc::clone(&client); + + // start this series of queries in parallel, which take + // quite a while + tokio::spawn(async move { + let tokens = query_tokens(&client) + .await? + .into_iter() + .map(|token| token.to_string()); + + namada_service::get_rate_limits_for_tokens(&client, tokens, epoch) + .await + }) + }); + tracing::debug!( block = block_height, txs = block.transactions.len(), @@ -411,6 +429,17 @@ async fn crawling_fn( timestamp: timestamp_in_sec, }; + let rate_limits = + rate_limits + .future() + .await + .map_or(Ok(vec![]), |maybe_rate_limits| { + maybe_rate_limits + .context("Failed to await on rate limits query") + .into_task_join_error()? + .into_rpc_error() + })?; + tracing::info!( txs = block.transactions.len(), ibc_tokens = ibc_tokens.len(), @@ -444,6 +473,11 @@ async fn crawling_fn( token_supplies.into_iter().flatten(), )?; + repository::balance::insert_ibc_rate_limits( + transaction_conn, + rate_limits, + )?; + repository::block::upsert_block( transaction_conn, block, diff --git a/chain/src/repository/balance.rs b/chain/src/repository/balance.rs index cf66d3008..d64c8be7d 100644 --- a/chain/src/repository/balance.rs +++ b/chain/src/repository/balance.rs @@ -1,13 +1,15 @@ use anyhow::Context; use diesel::{PgConnection, RunQueryDsl}; use orm::balances::BalanceChangesInsertDb; +use orm::ibc::IbcRateLimitsInsertDb; use orm::schema::{ - balance_changes, ibc_token, token, token_supplies_per_epoch, + balance_changes, ibc_rate_limits, ibc_token, token, + token_supplies_per_epoch, }; use orm::token::{IbcTokenInsertDb, TokenInsertDb}; use orm::token_supplies_per_epoch::TokenSuppliesInsertDb; use shared::balance::{Balances, TokenSupply}; -use shared::token::Token; +use shared::token::{IbcRateLimit, Token}; use shared::tuple_len::TupleLen; use super::utils::MAX_PARAM_SIZE; @@ -100,6 +102,31 @@ where anyhow::Ok(()) } +pub fn insert_ibc_rate_limits( + transaction_conn: &mut PgConnection, + supplies: S, +) -> anyhow::Result<()> +where + S: IntoIterator, +{ + let limits: Vec<_> = supplies + .into_iter() + .map(IbcRateLimitsInsertDb::from) + .collect(); + + if limits.is_empty() { + return anyhow::Ok(()); + } + + diesel::insert_into(ibc_rate_limits::table) + .values(limits) + .on_conflict_do_nothing() + .execute(transaction_conn) + .context("Failed to update rate limits in db")?; + + anyhow::Ok(()) +} + #[cfg(test)] mod tests { diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index 13dd35723..d149199ce 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -23,7 +23,7 @@ use shared::block::{BlockHeight, Epoch}; use shared::bond::{Bond, BondAddresses, Bonds}; use shared::id::Id; use shared::proposal::{GovernanceProposal, TallyType}; -use shared::token::{IbcToken, Token}; +use shared::token::{IbcRateLimit, IbcToken, Token}; use shared::unbond::{Unbond, UnbondAddresses, Unbonds}; use shared::utils::BalanceChange; use shared::validator::{Validator, ValidatorSet, ValidatorState}; @@ -864,3 +864,46 @@ pub async fn get_token_supply( effective: None, }) } + +pub async fn get_throughput_rate_limit( + client: &HttpClient, + token: String, + epoch: u32, +) -> anyhow::Result { + let address: NamadaSdkAddress = + token.parse().context("Failed to parse token address")?; + + let rate_limit = rpc::query_ibc_rate_limits(client, &address) + .await + .with_context(|| { + format!("Failed to query throughput rate limit of token {token}") + })?; + + Ok(IbcRateLimit { + epoch, + address: token, + throughput_limit: Amount::from(rate_limit.throughput_per_epoch_limit) + .into(), + }) +} + +pub async fn get_rate_limits_for_tokens( + client: &HttpClient, + tokens: I, + epoch: u32, +) -> anyhow::Result> +where + I: IntoIterator, +{ + let mut buffer = vec![]; + + let mut stream = futures::stream::iter(tokens) + .map(|address| get_throughput_rate_limit(client, address, epoch)) + .buffer_unordered(32); + + while let Some(maybe_address) = stream.next().await { + buffer.push(maybe_address?); + } + + Ok(buffer) +} diff --git a/orm/migrations/2025-02-13-094639_ibc_rate_limits/down.sql b/orm/migrations/2025-02-13-094639_ibc_rate_limits/down.sql new file mode 100644 index 000000000..b577a18ce --- /dev/null +++ b/orm/migrations/2025-02-13-094639_ibc_rate_limits/down.sql @@ -0,0 +1,6 @@ +-- This file should undo anything in `up.sql` + +ALTER TABLE ibc_rate_limits + DROP CONSTRAINT fk_ibc_rate_limits_address; + +DROP TABLE ibc_rate_limits; diff --git a/orm/migrations/2025-02-13-094639_ibc_rate_limits/up.sql b/orm/migrations/2025-02-13-094639_ibc_rate_limits/up.sql new file mode 100644 index 000000000..7f565002b --- /dev/null +++ b/orm/migrations/2025-02-13-094639_ibc_rate_limits/up.sql @@ -0,0 +1,13 @@ +-- Your SQL goes here + +CREATE TABLE ibc_rate_limits ( + id SERIAL PRIMARY KEY, + address VARCHAR(45) NOT NULL, + epoch INT NOT NULL, + throughput_limit NUMERIC(78, 0) NOT NULL, + -- reference the `address` column in the `token` table + CONSTRAINT fk_ibc_rate_limits_address + FOREIGN KEY(address) REFERENCES token(address) ON DELETE CASCADE +); + +ALTER TABLE ibc_rate_limits ADD UNIQUE (address, epoch); diff --git a/orm/src/ibc.rs b/orm/src/ibc.rs index 0583f47e5..0a52d91ae 100644 --- a/orm/src/ibc.rs +++ b/orm/src/ibc.rs @@ -1,9 +1,11 @@ +use bigdecimal::BigDecimal; use diesel::prelude::Queryable; use diesel::{AsChangeset, Insertable, Selectable}; use serde::{Deserialize, Serialize}; +use shared::token::IbcRateLimit; use shared::transaction::{IbcAckStatus, IbcSequence}; -use crate::schema::ibc_ack; +use crate::schema::{ibc_ack, ibc_rate_limits}; #[derive(Debug, Clone, Serialize, Deserialize, diesel_derive_enum::DbEnum)] #[ExistingTypePath = "crate::schema::sql_types::IbcStatus"] @@ -54,3 +56,32 @@ impl From for IbcAckInsertDb { pub struct IbcSequencekStatusUpdateDb { pub status: IbcAckStatusDb, } + +#[derive(Debug, Clone, Queryable, Selectable)] +#[diesel(table_name = ibc_rate_limits)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct IbcRateLimitsDb { + pub id: i32, + pub address: String, + pub epoch: i32, + pub throughput_limit: BigDecimal, +} + +#[derive(Debug, Clone, Queryable, Selectable, Insertable)] +#[diesel(table_name = ibc_rate_limits)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct IbcRateLimitsInsertDb { + pub address: String, + pub epoch: i32, + pub throughput_limit: BigDecimal, +} + +impl From for IbcRateLimitsInsertDb { + fn from(rate_limit: IbcRateLimit) -> Self { + Self { + address: rate_limit.address, + epoch: rate_limit.epoch as _, + throughput_limit: rate_limit.throughput_limit, + } + } +} diff --git a/orm/src/schema.rs b/orm/src/schema.rs index 98920148b..f2ab514da 100644 --- a/orm/src/schema.rs +++ b/orm/src/schema.rs @@ -259,6 +259,16 @@ diesel::table! { } } +diesel::table! { + ibc_rate_limits (id) { + id -> Int4, + #[max_length = 45] + address -> Varchar, + epoch -> Int4, + throughput_limit -> Numeric, + } +} + diesel::table! { ibc_token (address) { #[max_length = 45] @@ -404,6 +414,7 @@ diesel::joinable!(balance_changes -> token (token)); diesel::joinable!(bonds -> validators (validator_id)); diesel::joinable!(gas_estimations -> wrapper_transactions (wrapper_id)); diesel::joinable!(governance_votes -> governance_proposals (proposal_id)); +diesel::joinable!(ibc_rate_limits -> token (address)); diesel::joinable!(ibc_token -> token (address)); diesel::joinable!(inner_transactions -> wrapper_transactions (wrapper_id)); diesel::joinable!(pos_rewards -> validators (validator_id)); @@ -425,6 +436,7 @@ diesel::allow_tables_to_appear_in_same_query!( governance_proposals, governance_votes, ibc_ack, + ibc_rate_limits, ibc_token, inner_transactions, pos_rewards, diff --git a/shared/src/error.rs b/shared/src/error.rs index 72183c25d..4337a793e 100644 --- a/shared/src/error.rs +++ b/shared/src/error.rs @@ -8,6 +8,8 @@ pub enum MainError { RpcError, #[error("Can't commit block to database")] Database, + #[error("Failed to join async task")] + TaskJoinError, } pub trait AsRpcError { @@ -38,6 +40,20 @@ impl AsDbError for anyhow::Result { } } +pub trait AsTaskJoinError { + fn into_task_join_error(self) -> Result; +} + +impl AsTaskJoinError for anyhow::Result { + #[inline] + fn into_task_join_error(self) -> Result { + self.map_err(|reason| { + tracing::error!(?reason, "{}", MainError::TaskJoinError); + MainError::TaskJoinError + }) + } +} + pub trait ContextDbInteractError { fn context_db_interact_error(self) -> anyhow::Result; } diff --git a/shared/src/token.rs b/shared/src/token.rs index 1c7fc2b13..c7a8cf928 100644 --- a/shared/src/token.rs +++ b/shared/src/token.rs @@ -1,5 +1,7 @@ use std::fmt::Display; +use bigdecimal::BigDecimal; + use crate::id::Id; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -22,3 +24,13 @@ impl Display for Token { } } } + +#[derive(Debug)] +pub struct IbcRateLimit { + /// Address of the token in Namada + pub address: String, + /// Epoch of the indexed rate limit + pub epoch: u32, + /// Throughput limit of token `address` at epoch `epoch` + pub throughput_limit: BigDecimal, +} diff --git a/swagger.yml b/swagger.yml index 353835089..5452b174b 100644 --- a/swagger.yml +++ b/swagger.yml @@ -25,6 +25,31 @@ paths: type: string version: type: string + /api/v1/ibc/rate-limits: + get: + summary: Get the rate limits of IBC tokens + parameters: + - in: query + name: tokenAddress + schema: + type: integer + minimum: 0 + description: Optional address of the token to query + - in: query + name: throughputLimit + schema: + type: integer + minimum: 0 + description: Optional throughput limit to filter with + responses: + "200": + description: List of IBC tokens and their rate limits + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/IbcRateLimit" /api/v1/pos/validator: get: summary: Get all validators, paginated @@ -1405,3 +1430,11 @@ components: enum: [received, sent] block_height: type: string + IbcRateLimit: + type: object + required: [tokenAddress, throughputLimit] + properties: + tokenAddress: + type: string + throughputLimit: + type: string diff --git a/webserver/src/app.rs b/webserver/src/app.rs index e11ffcf44..e4bbed003 100644 --- a/webserver/src/app.rs +++ b/webserver/src/app.rs @@ -144,6 +144,10 @@ impl ApplicationServer { get(chain_handlers::get_last_processed_epoch), ) .route("/ibc/:tx_id/status", get(ibc_handler::get_ibc_status)) + .route( + "/ibc/rate-limits", + get(ibc_handler::get_ibc_rate_limits), + ) .route( "/pgf/payments", get(pgf_service::get_pgf_continuous_payments), diff --git a/webserver/src/dto/ibc.rs b/webserver/src/dto/ibc.rs new file mode 100644 index 000000000..992318341 --- /dev/null +++ b/webserver/src/dto/ibc.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct IbcRateLimit { + pub token_address: Option, + pub throughput_limit: Option, +} diff --git a/webserver/src/dto/mod.rs b/webserver/src/dto/mod.rs index d967031db..6d320af75 100644 --- a/webserver/src/dto/mod.rs +++ b/webserver/src/dto/mod.rs @@ -2,6 +2,7 @@ pub mod chain; pub mod crawler_state; pub mod gas; pub mod governance; +pub mod ibc; pub mod pgf; pub mod pos; pub mod transaction; diff --git a/webserver/src/handler/ibc.rs b/webserver/src/handler/ibc.rs index 1d40cb4e0..c7b1c4460 100644 --- a/webserver/src/handler/ibc.rs +++ b/webserver/src/handler/ibc.rs @@ -1,10 +1,13 @@ use axum::Json; use axum::extract::{Path, State}; use axum::http::HeaderMap; +use axum_extra::extract::Query; use axum_macros::debug_handler; +use bigdecimal::BigDecimal; +use crate::dto::ibc::IbcRateLimit as IbcRateLimitDto; use crate::error::api::ApiError; -use crate::response::ibc::IbcAck; +use crate::response::ibc::{IbcAck, IbcRateLimit}; use crate::state::common::CommonState; #[debug_handler] @@ -17,3 +20,19 @@ pub async fn get_ibc_status( Ok(Json(ibc_ack_status)) } + +#[debug_handler] +pub async fn get_ibc_rate_limits( + Query(query): Query, + State(state): State, +) -> Result>, ApiError> { + let rate_limits = state + .ibc_service + .get_throughput_limits( + query.token_address, + query.throughput_limit.map(BigDecimal::from), + ) + .await?; + + Ok(Json(rate_limits)) +} diff --git a/webserver/src/repository/ibc.rs b/webserver/src/repository/ibc.rs index e55ef4a60..d11a443cb 100644 --- a/webserver/src/repository/ibc.rs +++ b/webserver/src/repository/ibc.rs @@ -1,7 +1,11 @@ use axum::async_trait; -use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; +use bigdecimal::BigDecimal; +use diesel::{ + ExpressionMethods, NullableExpressionMethods, QueryDsl, RunQueryDsl, + SelectableHelper, +}; use orm::ibc::IbcAckDb; -use orm::schema::ibc_ack; +use orm::schema::{ibc_ack, ibc_rate_limits}; use crate::appstate::AppState; @@ -18,6 +22,12 @@ pub trait IbcRepositoryTrait { &self, id: String, ) -> Result, String>; + + async fn get_throughput_limits( + &self, + token_address: Option, + matching_rate_limit: Option, + ) -> Result, String>; } #[async_trait] @@ -42,4 +52,61 @@ impl IbcRepositoryTrait for IbcRepository { .await .map_err(|e| e.to_string()) } + + async fn get_throughput_limits( + &self, + matching_token_address: Option, + matching_rate_limit: Option, + ) -> Result, String> { + let conn = self.app_state.get_db_connection().await; + + conn.interact(move |conn| { + use diesel::Column; + + diesel::alias!(ibc_rate_limits as ibc_rate_limits_alias: IbcRateLimitsAlias); + + // NB: We're using a raw select because `CAST` is not available in the diesel dsl. :( + let select_statement = diesel::dsl::sql::<(diesel::sql_types::Text, diesel::sql_types::Text)>( + &format!( + "{}, CAST({} AS TEXT)", + ibc_rate_limits::dsl::address::NAME, + ibc_rate_limits::dsl::throughput_limit::NAME, + ), + ); + + let max_epoch_where_clause = + ibc_rate_limits::dsl::epoch.nullable().eq(ibc_rate_limits_alias + .select(diesel::dsl::max(ibc_rate_limits_alias.field(ibc_rate_limits::dsl::epoch))) + .single_value()); + + match (matching_token_address, matching_rate_limit) { + (None, None) => ibc_rate_limits::table + .filter(max_epoch_where_clause) + .select(select_statement) + .load(conn), + (Some(token), None) => ibc_rate_limits::table + .filter(ibc_rate_limits::dsl::address.eq(&token)) + .filter(max_epoch_where_clause) + .select(select_statement) + .load(conn), + (None, Some(limit)) => ibc_rate_limits::table + .filter(ibc_rate_limits::dsl::throughput_limit.eq(&limit)) + .filter(max_epoch_where_clause) + .select(select_statement) + .load(conn), + (Some(token), Some(limit)) => ibc_rate_limits::table + .filter( + ibc_rate_limits::dsl::throughput_limit + .eq(&limit) + ) + .filter(ibc_rate_limits::dsl::address.eq(&token)) + .filter(max_epoch_where_clause) + .select(select_statement) + .load(conn), + } + .map_err(|e| e.to_string()) + }) + .await + .map_err(|e| e.to_string())? + } } diff --git a/webserver/src/response/ibc.rs b/webserver/src/response/ibc.rs index c215b789e..9daaa5f04 100644 --- a/webserver/src/response/ibc.rs +++ b/webserver/src/response/ibc.rs @@ -15,3 +15,10 @@ pub struct IbcAck { pub status: IbcAckStatus, pub timeout: Option, } + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct IbcRateLimit { + pub token_address: String, + pub throughput_limit: String, +} diff --git a/webserver/src/service/ibc.rs b/webserver/src/service/ibc.rs index 4bb3021d3..3c457617b 100644 --- a/webserver/src/service/ibc.rs +++ b/webserver/src/service/ibc.rs @@ -1,9 +1,10 @@ +use bigdecimal::BigDecimal; use orm::ibc::IbcAckStatusDb; use crate::appstate::AppState; use crate::error::ibc::IbcError; use crate::repository::ibc::{IbcRepository, IbcRepositoryTrait}; -use crate::response::ibc::{IbcAck, IbcAckStatus}; +use crate::response::ibc::{IbcAck, IbcAckStatus, IbcRateLimit}; #[derive(Clone)] pub struct IbcService { @@ -41,4 +42,40 @@ impl IbcService { }, }) } + + pub async fn get_throughput_limits( + &self, + matching_token_address: Option, + matching_rate_limit: Option, + ) -> Result, IbcError> { + const _: () = { + // NB: Statically assert that the cast we will + // perform below is safe. + if std::mem::size_of::<(String, String)>() + != std::mem::size_of::() + { + panic!("IbcRateLimit size is invalid"); + } + + if std::mem::align_of::<(String, String)>() + != std::mem::align_of::() + { + panic!("IbcRateLimit alignment is invalid"); + } + }; + + self.ibc_repo + .get_throughput_limits(matching_token_address, matching_rate_limit) + .await + .map_err(IbcError::Database) + .map(|limits| unsafe { + // NB: Transmute this value. It's faster than destructing + // the vec and creating a new one, just to convert between + // types. + + // SAFETY: We have asserted above that `IbcRateLimit` is + // compatible with the type `(String, BigDecimal)`. + std::mem::transmute(limits) + }) + } }