Skip to content

wip: cometbft service #325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]
resolver = "2"

members = ["chain", "shared", "rewards", "orm", "pos", "governance", "webserver", "seeder", "parameters", "transactions", "test_helpers"]
members = ["chain", "shared", "rewards", "orm", "pos", "governance", "webserver", "seeder", "parameters", "transactions", "test_helpers", "cometbft"]

[workspace.package]
authors = ["Heliax <[email protected]>"]
Expand Down
74 changes: 57 additions & 17 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use shared::balance::TokenSupply;
use shared::block::Block;
use shared::block_result::BlockResult;
use shared::checksums::Checksums;
use shared::cometbft::CometbftBlock;
use shared::crawler::crawl;
use shared::crawler_state::ChainCrawlerState;
use shared::error::{
Expand Down Expand Up @@ -217,6 +218,11 @@ async fn crawling_fn(

let start = Instant::now();

let cometbft_block =
get_cometbft_block_with_fallback(&conn, &client, block_height)
.await
.into_db_error()?;

tracing::debug!(block = block_height, "Query first block in epoch...");
let first_block_in_epoch =
namada_service::get_first_block_in_epoch(&client)
Expand All @@ -230,7 +236,7 @@ async fn crawling_fn(
native_token.clone().into();

let (block, tm_block_response, epoch) =
get_block(block_height, &client, checksums, &native_token_address)
get_block(cometbft_block, &client, checksums, &native_token_address)
.await?;

let rate_limits = first_block_in_epoch.eq(&block_height).then(|| {
Expand Down Expand Up @@ -419,13 +425,11 @@ async fn crawling_fn(

let reward_claimers = block.pos_rewards();

let timestamp_in_sec = DateTimeUtc::now().0.timestamp();

let crawler_state = ChainCrawlerState {
last_processed_block: block_height,
last_processed_epoch: epoch,
first_block_in_epoch,
timestamp: timestamp_in_sec,
timestamp: chrono::Utc::now().timestamp(),
};

let rate_limits =
Expand Down Expand Up @@ -605,8 +609,14 @@ async fn try_initial_query(
.await
.into_rpc_error()?
.into();

let cometbft_block =
get_cometbft_block_with_fallback(conn, client, block_height)
.await
.into_db_error()?;

let (block, tm_block_response, epoch) =
get_block(block_height, client, checksums.clone(), &native_token)
get_block(cometbft_block, client, checksums.clone(), &native_token)
.await?;

let tokens = query_tokens(client).await.into_rpc_error()?;
Expand Down Expand Up @@ -796,30 +806,23 @@ async fn update_crawler_timestamp(
}

async fn get_block(
block_height: u32,
block: CometbftBlock,
client: &HttpClient,
checksums: Checksums,
native_token: &namada_sdk::address::Address,
) -> Result<(Block, TendermintBlockResponse, u32), MainError> {
let block_height = block.block_height;

tracing::debug!(block = block_height, "Query block...");
let tm_block_response =
tendermint_service::query_raw_block_at_height(client, block_height)
.await
.into_rpc_error()?;
let tm_block_response = block.block;
tracing::debug!(
block = block_height,
"Raw block contains {} txs...",
tm_block_response.block.data.len()
);

tracing::debug!(block = block_height, "Query block results...");
let tm_block_results_response =
tendermint_service::query_raw_block_results_at_height(
client,
block_height,
)
.await
.into_rpc_error()?;
let tm_block_results_response = block.events;
let block_results = BlockResult::from(tm_block_results_response);

tracing::debug!(block = block_height, "Query epoch...");
Expand Down Expand Up @@ -900,3 +903,40 @@ async fn query_token_supplies(

Ok(supplies)
}

pub async fn get_cometbft_block_with_fallback(
conn: &Object,
client: &HttpClient,
block_height: u32,
) -> anyhow::Result<CometbftBlock> {
let block = repository::cometbft::get_block(conn, block_height)
.await
.context("Failed to get block")?;

let block = match block {
Some(block) => block,
None => {
let block = tendermint_service::query_raw_block_at_height(
client,
block_height,
)
.await
.context("Failed to query block")?;

let events = tendermint_service::query_raw_block_results_at_height(
client,
block_height,
)
.await
.context("Failed to query block results")?;

CometbftBlock {
block_height,
block,
events,
}
}
};

Ok(block)
}
21 changes: 21 additions & 0 deletions chain/src/repository/cometbft.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use deadpool_diesel::postgres::Object;
use diesel::{QueryDsl, RunQueryDsl, SelectableHelper};
use orm::cometbft::CometbftBlock;
use orm::schema::cometbft_block;
use shared::cometbft::CometbftBlock as CometBlock;

pub async fn get_block(
conn: &Object,
block_height: u32,
) -> anyhow::Result<Option<CometBlock>> {
conn.interact(move |conn| {
cometbft_block::table
.find(block_height as i32)
.select(CometbftBlock::as_select())
.first(conn)
.ok()
})
.await
.map(|block| block.map(CometBlock::from))
.map_err(|e| anyhow::anyhow!(e.to_string()))
}
1 change: 1 addition & 0 deletions chain/src/repository/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod balance;
pub mod block;
pub mod cometbft;
pub mod crawler_state;
pub mod gov;
pub mod pgf;
Expand Down
35 changes: 35 additions & 0 deletions cometbft/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "cometbft"
description = "Cometbft transactions fetcher."
resolver = "2"
authors.workspace = true
edition.workspace = true
license.workspace = true
readme.workspace = true
version.workspace = true

[[bin]]
name = "transactions"
path = "src/main.rs"

[dependencies]
bigdecimal.workspace = true
tokio.workspace = true
tracing.workspace = true
chrono.workspace = true
clap.workspace = true
anyhow.workspace = true
namada_sdk.workspace = true
namada_core.workspace = true
tendermint-rpc.workspace = true
shared.workspace = true
futures.workspace = true
deadpool-diesel.workspace = true
diesel.workspace = true
diesel_migrations.workspace = true
orm.workspace = true
clap-verbosity-flag.workspace = true
serde_json.workspace = true

[build-dependencies]
vergen = { workspace = true, features = ["build", "git", "gitcl"] }
8 changes: 8 additions & 0 deletions cometbft/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use std::error::Error;

use vergen::EmitBuilder;

fn main() -> Result<(), Box<dyn Error>> {
EmitBuilder::builder().all_git().emit()?;
Ok(())
}
4 changes: 4 additions & 0 deletions cometbft/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
. ../.env
export TENDERMINT_URL
export DATABASE_URL
cargo run --release
35 changes: 35 additions & 0 deletions cometbft/src/app_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::env;

use anyhow::Context;
use deadpool_diesel::postgres::{Object, Pool as DbPool};

#[derive(Clone)]
pub struct AppState {
db: DbPool,
}

impl AppState {
pub fn new(db_url: String) -> anyhow::Result<Self> {
let max_pool_size = env::var("DATABASE_POOL_SIZE")
.unwrap_or_else(|_| 1.to_string())
.parse::<usize>()
.unwrap_or(1_usize);
let pool_manager = deadpool_diesel::Manager::new(
db_url,
deadpool_diesel::Runtime::Tokio1,
);
let pool = DbPool::builder(pool_manager)
.max_size(max_pool_size)
.build()
.context("Failed to build Postgres db pool")?;

Ok(Self { db: pool })
}

pub async fn get_db_connection(&self) -> anyhow::Result<Object> {
self.db
.get()
.await
.context("Failed to get db connection handle from deadpool")
}
}
28 changes: 28 additions & 0 deletions cometbft/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use core::fmt;
use std::fmt::Display;

use shared::log_config::LogConfig;

#[derive(clap::ValueEnum, Clone, Debug, Copy)]
pub enum CargoEnv {
Development,
Production,
}

impl Display for CargoEnv {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self)
}
}

#[derive(clap::Parser)]
pub struct AppConfig {
#[clap(long, env)]
pub tendermint_url: String,

#[clap(long, env)]
pub database_url: String,

#[clap(flatten)]
pub log: LogConfig,
}
4 changes: 4 additions & 0 deletions cometbft/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod app_state;
pub mod config;
pub mod repository;
pub mod services;
Loading
Loading