diff --git a/crates/db/src/client.rs b/crates/db/src/client.rs index 5c2dc0c..836fd99 100644 --- a/crates/db/src/client.rs +++ b/crates/db/src/client.rs @@ -23,7 +23,7 @@ pub enum BlockFilter { impl Client { /// Returns the slot number of blocks which we have indexed - pub fn indexed_blocks( + pub fn indexed_slots( self, conn: &mut PgConnection, block_table_choice: BlockTableChoice, @@ -47,6 +47,31 @@ impl Client { Ok(numbers) } + /// Returns the slot number of blocks which we have indexed + pub fn indexed_blocks( + self, + conn: &mut PgConnection, + block_table_choice: BlockTableChoice, + ) -> anyhow::Result> { + let numbers: Vec = match block_table_choice { + BlockTableChoice::Blocks => { + use super::schema::blocks::dsl::{self, blocks}; + blocks + .select(dsl::number) + .get_results(conn) + .with_context(|| "failed to select block numbers")? + } + BlockTableChoice::Blocks2 => { + use super::schema::blocks_2::dsl::{self, blocks_2}; + blocks_2 + .select(dsl::number) + .get_results(conn) + .with_context(|| "failed to select block numbers")? 
+ } + }; + + Ok(numbers) + } /// Returns up to `limit` blocks which do not have the slot column set pub fn slot_is_null( self, diff --git a/crates/db/src/tests.rs b/crates/db/src/tests.rs index 2026c29..b4f405a 100644 --- a/crates/db/src/tests.rs +++ b/crates/db/src/tests.rs @@ -89,22 +89,22 @@ fn test_blocks() { assert_eq!(res.len(), 1); } // check that setting slot by default worked - let indexed_blocks: HashSet = client - .indexed_blocks(&mut db_conn, BlockTableChoice::Blocks) + let indexed_slots: HashSet = client + .indexed_slots(&mut db_conn, BlockTableChoice::Blocks) .unwrap() .into_iter() .filter_map(|block| Some(block?)) .collect(); let mut expected_set = (2..101).into_iter().collect::>(); - assert_eq!(expected_set, indexed_blocks); + assert_eq!(expected_set, indexed_slots); // check that setting slot by default worked - let indexed_blocks: HashSet = client - .indexed_blocks(&mut db_conn, BlockTableChoice::Blocks2) + let indexed_slots: HashSet = client + .indexed_slots(&mut db_conn, BlockTableChoice::Blocks2) .unwrap() .into_iter() .filter_map(|block| Some(block?)) .collect(); - assert_eq!(expected_set, indexed_blocks); + assert_eq!(expected_set, indexed_slots); // update slot number for blocks which are missing it for i in 200..300 { @@ -148,21 +148,21 @@ fn test_blocks() { assert_ne!(block2[0].slot, block[0].slot); assert_eq!(block2[0].data, block[0].data); } - let indexed_blocks: HashSet = client - .indexed_blocks(&mut db_conn, BlockTableChoice::Blocks) + let indexed_slots: HashSet = client + .indexed_slots(&mut db_conn, BlockTableChoice::Blocks) .unwrap() .into_iter() .filter_map(|block| Some(block?)) .collect(); expected_set.extend((201..301).into_iter().collect::>()); - assert_eq!(expected_set, indexed_blocks); - let indexed_blocks: HashSet = client - .indexed_blocks(&mut db_conn, BlockTableChoice::Blocks2) + assert_eq!(expected_set, indexed_slots); + let indexed_slots: HashSet = client + .indexed_slots(&mut db_conn, BlockTableChoice::Blocks2) 
.unwrap() .into_iter() .filter_map(|block| Some(block?)) .collect(); - assert_eq!(expected_set, indexed_blocks); + assert_eq!(expected_set, indexed_slots); // now test the edge case where block_number == slot number and slot number is not null for i in 1000..1500 { diff --git a/crates/sb_dl/src/commands/db.rs b/crates/sb_dl/src/commands/db.rs index 8463353..c0abd69 100644 --- a/crates/sb_dl/src/commands/db.rs +++ b/crates/sb_dl/src/commands/db.rs @@ -1,12 +1,328 @@ -use std::{collections::HashSet, sync::Arc}; +use std::{collections::{HashMap, HashSet}, sync::{atomic::AtomicU64, Arc}}; use anyhow::{anyhow, Context}; use clap::ArgMatches; -use db::{client::{BlockFilter, Client}, migrations::run_migrations, models::BlockTableChoice}; +use db::{client::{BlockFilter, Client}, migrations::run_migrations, models::{BlockTableChoice, Blocks}}; +use futures::stream::{self, StreamExt}; use sb_dl::config::Config; use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcTransactionConfig}; use solana_transaction_status::{EncodedTransaction, UiConfirmedBlock, UiTransactionEncoding}; -use tokio::task::JoinSet; +use tokio::{fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, task::JoinSet}; + +/// given a range, find blocks that are missing +pub async fn find_missing_blocks( + matches: &ArgMatches, + config_path: &str +) -> anyhow::Result<()> { + let start = *matches.get_one::("start").unwrap() as u64; + let end = *matches.get_one::("end").unwrap() as u64; + let output = matches.get_one::("output").unwrap(); + let cfg = Config::load(config_path).await?; + // load all currently indexed block number to avoid re-downloading already indexed block data + let mut indexed: HashSet = { + let mut conn = db::new_connection(&cfg.db_url)?; + + // perform db migrations + run_migrations(&mut conn); + + let client = db::client::Client {}; + //let mut blocks_1_indexed = client + // .indexed_slots(&mut conn, BlockTableChoice::Blocks) + // .unwrap_or_default() + // 
.into_iter() + // .filter_map(|block| Some(block? as u64)) + // .collect::>(); + //let mut blocks_2_indexed = client + // .indexed_slots(&mut conn, BlockTableChoice::Blocks2) + // .unwrap_or_default() + // .into_iter() + // .filter_map(|block| Some(block? as u64)) + // .collect::>(); + let mut blocks_1_indexed = client.indexed_blocks(&mut conn, BlockTableChoice::Blocks)?; + let mut blocks_2_indexed = client.indexed_blocks(&mut conn, BlockTableChoice::Blocks2)?; + blocks_1_indexed.append(&mut blocks_2_indexed); + blocks_1_indexed.into_iter().map(|block| block as u64).collect() + }; + { + let mut conn = db::new_connection(&cfg.remotedb_url)?; + // merge indexed blocks from remotedb + + let client = db::client::Client {}; + //let mut blocks_1_indexed = client + // .indexed_slots(&mut conn, BlockTableChoice::Blocks) + // .unwrap_or_default() + // .into_iter() + // .filter_map(|block| Some(block? as u64)) + // .collect::>(); + //let mut blocks_2_indexed = client + // .indexed_slots(&mut conn, BlockTableChoice::Blocks2) + // .unwrap_or_default() + // .into_iter() + // .filter_map(|block| Some(block? 
as u64)) + // .collect::>(); + //blocks_1_indexed.append(&mut blocks_2_indexed); + + let mut blocks_1_indexed = client.indexed_blocks(&mut conn, BlockTableChoice::Blocks)?; + let mut blocks_2_indexed = client.indexed_blocks(&mut conn, BlockTableChoice::Blocks2)?; + blocks_1_indexed.append(&mut blocks_2_indexed); + for block in blocks_1_indexed.into_iter() { + indexed.insert(block as u64); + } + } + let mut missing_blocks = vec![]; + + for block in start..=end { + if !indexed.contains(&block) { + missing_blocks.push(block); + } + } + + log::info!("found {} missing blocks in range(start={start}, end={end})", missing_blocks.len()); + + let mut fh = BufWriter::new(File::create(output).await?); + for missing_block in missing_blocks { + fh.write_all(format!("{missing_block}\n").as_bytes()).await?; + } + fh.flush().await.with_context(|| "failed to flush file") +} + +pub async fn get_missing_slots_in_range( + matches: &ArgMatches, + config_path: &str +) -> anyhow::Result<()> { + let start = matches.get_one::("start").unwrap(); + let end = matches.get_one::("end").unwrap(); + let cfg = Config::load(config_path).await?; + + let mut missing_slots = HashSet::new(); + + + // load all currently indexed block number to avoid re-downloading already indexed block data + let mut indexed_slots: HashSet = { + let mut conn = db::new_connection(&cfg.db_url)?; + + // perform db migrations + run_migrations(&mut conn); + + let client = db::client::Client {}; + let mut blocks_1_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + let mut blocks_2_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks2) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? 
as u64)) + .collect::>(); + blocks_1_indexed.append(&mut blocks_2_indexed); + blocks_1_indexed.into_iter().map(|block| block as u64).collect::>() + }; + { + let mut conn = db::new_connection(&cfg.remotedb_url)?; + // merge indexed blocks from remotedb + + let client = db::client::Client {}; + let mut blocks_1_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + let mut blocks_2_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks2) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + blocks_1_indexed.append(&mut blocks_2_indexed); + + for block in blocks_1_indexed.into_iter() { + indexed_slots.insert(block as u64); + } + } + for slot in *start..=*end { + if !indexed_slots.contains(&slot) { + missing_slots.insert(slot); + } + } + + let mut fh = File::create("slots_to_fetch.txt").await?; + for slot in missing_slots { + fh.write_all(format!("{slot}\n").as_bytes()).await?; + } + fh.flush().await.with_context(|| "failed to flush file") +} + +pub async fn guess_slot_numbers( + matches: &ArgMatches, + config_path: &str +) -> anyhow::Result<()> { + let input = matches.get_one::("input").unwrap(); + let limit = matches.get_one::("limit").unwrap(); + let threads = matches.get_one::("threads").unwrap(); + + let cfg = Config::load(config_path).await?; + let pool = db::new_connection_pool(&cfg.db_url, *threads as u32)?; + let remote_pool = db::new_connection_pool(&cfg.remotedb_url, *threads as u32)?; + + let client = db::client::Client {}; + + + let mut db_block_1_indexed: HashSet = HashSet::default(); + let mut db_block_2_indexed: HashSet = HashSet::default(); + + { + let mut conn = db::new_connection(&cfg.db_url)?; + + client.indexed_blocks(&mut conn, BlockTableChoice::Blocks)?.into_iter().for_each(|block| { + db_block_1_indexed.insert(block as u64); + }); + client.indexed_blocks(&mut conn, 
BlockTableChoice::Blocks2)?.into_iter().for_each(|block| { + db_block_2_indexed.insert(block as u64); + }); + } + let mut remote_db_block_1_indexed: HashSet = HashSet::default(); + let mut remote_db_block_2_indexed: HashSet = HashSet::default(); + { + let mut remote_conn = db::new_connection(&cfg.remotedb_url)?; + + client.indexed_blocks(&mut remote_conn, BlockTableChoice::Blocks)?.into_iter().for_each(|block| { + remote_db_block_1_indexed.insert(block as u64); + }); + client.indexed_blocks(&mut remote_conn, BlockTableChoice::Blocks2)?.into_iter().for_each(|block| { + remote_db_block_2_indexed.insert(block as u64); + }); + } + let blocks_to_fetch = { + let mut slots_to_fetch = vec![]; + { + let file = File::open(input).await?; + let reader = BufReader::new(file); + let mut lines = reader.lines(); + + // Read the file line by line + while let Some(line) = lines.next_line().await? { + // Parse each line as a u64 + match line.trim().parse::() { + Ok(number) => { + if db_block_1_indexed.contains(&number) || db_block_2_indexed.contains(&number) || remote_db_block_1_indexed.contains(&number) || remote_db_block_2_indexed.contains(&number) { + continue; + } else { + slots_to_fetch.push(number); + } + } , + Err(_) => log::warn!("Warning: Skipping invalid line: {}", line), + } + } + } + slots_to_fetch + }; + let db_block_1_indexed = Arc::new(db_block_1_indexed); + let db_block_2_indexed = Arc::new(db_block_2_indexed); + let remote_db_block_1_indexed = Arc::new(remote_db_block_1_indexed); + let remote_db_block_2_indexed = Arc::new(remote_db_block_2_indexed); + + let total_blocks = blocks_to_fetch.len().min(*limit); + + let block_to_slot = stream::iter(blocks_to_fetch).enumerate().take(*limit).map(|(idx, block_to_fetch)| { + if idx as u64 % 500 == 0 { + log::info!("fetching block {idx}/{total_blocks}"); + } + let db_block_1_indexed = db_block_1_indexed.clone(); + let db_block_2_indexed = db_block_2_indexed.clone(); + let remote_db_block_1_indexed = 
remote_db_block_1_indexed.clone(); + let remote_db_block_2_indexed = remote_db_block_2_indexed.clone(); + let pool = pool.clone(); + let remote_pool = remote_pool.clone(); + async move { + let mut next_block = if db_block_1_indexed.contains(&(block_to_fetch + 1)) { + match pool.get() { + Ok(mut conn) => { + client.select_block( + &mut conn, + BlockFilter::Number((block_to_fetch + 1) as i64), + BlockTableChoice::Blocks + ).unwrap_or_default() + } + Err(err) => { + log::error!("failed to get connection"); + return (0, 0) + } + } + + } else if db_block_2_indexed.contains(&(block_to_fetch + 1)) { + match pool.get() { + Ok(mut conn) => { + client.select_block( + &mut conn, + BlockFilter::Number((block_to_fetch + 1) as i64), + BlockTableChoice::Blocks2 + ).unwrap_or_default() + } + Err(err) => { + log::error!("failed to get connection"); + return (0, 0) + } + } + + } else if remote_db_block_1_indexed.contains(&(block_to_fetch + 1)) { + match remote_pool.get() { + Ok(mut conn) => { + client.select_block( + &mut conn, + BlockFilter::Number((block_to_fetch + 1) as i64), + BlockTableChoice::Blocks + ).unwrap_or_default() + } + Err(err) => { + log::error!("failed to get connection"); + return (0, 0) + } + } + + } else if remote_db_block_2_indexed.contains(&(block_to_fetch + 1)) { + match remote_pool.get() { + Ok(mut conn) => { + client.select_block( + &mut conn, + BlockFilter::Number((block_to_fetch + 1) as i64), + BlockTableChoice::Blocks2 + ).unwrap_or_default() + } + Err(err) => { + log::error!("failed to get connection"); + return (0, 0) + } + } + + } else { + log::debug!("failed to find block {block_to_fetch}"); + return (0, 0) + }; + if next_block.is_empty() { + log::debug!("failed to find block {block_to_fetch}"); + return (0, 0) + } + let next_block = std::mem::take(&mut next_block[0]); + let block: UiConfirmedBlock = serde_json::from_value(next_block.data).unwrap(); + (block_to_fetch as u64, block.parent_slot as u64) + } + }) + .buffer_unordered(*threads - 1) + 
.collect::>() + .await; + let mut fh = BufWriter::new(File::create("slots_to_fetch.txt").await?); + for (_, slot) in block_to_slot { + if slot != 0 { + fh.write_all(format!("{slot}\n").as_bytes()).await?; + } + + } + fh.flush().await.with_context(|| "failed to flush file") +} pub async fn fill_missing_slots_no_tx( matches: &ArgMatches, diff --git a/crates/sb_dl/src/commands/mod.rs b/crates/sb_dl/src/commands/mod.rs index d1b659c..a74f470 100644 --- a/crates/sb_dl/src/commands/mod.rs +++ b/crates/sb_dl/src/commands/mod.rs @@ -1,5 +1,41 @@ +use tokio::signal::unix::Signal; + pub mod config; pub mod db; pub mod services; pub mod transfer_graph; pub mod utils; + + +pub async fn handle_exit( + mut sig_quit: Signal, + mut sig_int: Signal, + mut sig_term: Signal, + finished_rx: tokio::sync::oneshot::Receiver>, +) -> anyhow::Result<()> { + // handle exit routines + tokio::select! { + _ = sig_quit.recv() => { + log::warn!("goodbye.."); + return Ok(()); + } + _ = sig_int.recv() => { + log::warn!("goodbye.."); + return Ok(()); + } + _ = sig_term.recv() => { + log::warn!("goodbye.."); + return Ok(()); + } + msg = finished_rx => { + match msg { + // service encountered error + Ok(Some(msg)) => return Err(anyhow::anyhow!(msg)), + // service finished without error + Ok(None) => return Ok(()), + // underlying channel had an error + Err(err) => return Err(anyhow::anyhow!(err)) + } + } + } +} \ No newline at end of file diff --git a/crates/sb_dl/src/commands/services/bigtable.rs b/crates/sb_dl/src/commands/services/bigtable.rs new file mode 100644 index 0000000..2a5bf5a --- /dev/null +++ b/crates/sb_dl/src/commands/services/bigtable.rs @@ -0,0 +1,270 @@ +use { + super::{super::utils::get_failed_blocks, downloaders::block_persistence_loop}, + crate::commands::handle_exit, + clap::ArgMatches, + db::{migrations::run_migrations, models::BlockTableChoice}, + sb_dl::{config::Config, services::bigtable::Downloader, types::BlockInfo}, + std::{collections::HashSet, sync::Arc}, + tokio::{ + 
fs::File, + io::{AsyncBufReadExt, BufReader}, + signal::unix::{signal, SignalKind}, + }, +}; +/// Starts the big table historical block downloader +pub async fn bigtable_downloader(matches: &ArgMatches, config_path: &str) -> anyhow::Result<()> { + let blocks_table = + BlockTableChoice::try_from(*matches.get_one::("block-table-choice").unwrap()).unwrap(); + + let cfg = Config::load(config_path).await?; + let start = matches.get_one::("start").cloned(); + let limit = matches.get_one::("limit").cloned(); + let no_minimization = matches.get_flag("no-minimization"); + let failed_blocks_dir = matches.get_one::("failed-blocks").unwrap().clone(); + let threads = *matches.get_one::("threads").unwrap(); + + // create failed blocks directory, ignoring error (its already created) + let _ = tokio::fs::create_dir(&failed_blocks_dir).await; + + // read all failed blocks to append to the already_indexed hash set + // + // we do this so we can avoid re-downloading the blocks which are stored locally + let failed_blocks = get_failed_blocks(&failed_blocks_dir).await.unwrap(); + + // load all currently indexed block number to avoid re-downloading already indexed block data + let mut already_indexed: HashSet = { + let mut conn = db::new_connection(&cfg.db_url)?; + + // perform db migrations + run_migrations(&mut conn); + + let client = db::client::Client {}; + let mut blocks_1_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + let mut blocks_2_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks2) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? 
as u64)) + .collect::>(); + blocks_1_indexed.append(&mut blocks_2_indexed); + blocks_1_indexed.into_iter().collect() + }; + { + let mut conn = db::new_connection(&cfg.remotedb_url)?; + + let client = db::client::Client {}; + let mut blocks_1_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + let mut blocks_2_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks2) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + for block in blocks_1_indexed { + already_indexed.insert(block); + } + for block in blocks_2_indexed { + already_indexed.insert(block); + } + } + + // mark failed blocks as already indexed to avoid redownloading + already_indexed.extend(failed_blocks.iter()); + + let downloader = Arc::new(Downloader::new(cfg.bigtable).await?); + + // receives downloaded blocks, which allows us to persist downloaded data while we download and parse other data + let (blocks_tx, blocks_rx) = tokio::sync::mpsc::channel::(10_000 as usize); + + let sig_quit = signal(SignalKind::quit())?; + let sig_int = signal(SignalKind::interrupt())?; + let sig_term = signal(SignalKind::terminate())?; + + let pool = db::new_connection_pool(&cfg.db_url, threads as u32 * 2)?; + + // start the background persistence task + tokio::task::spawn(async move { + block_persistence_loop(pool, failed_blocks_dir, blocks_rx, threads, blocks_table).await + }); + + let (finished_tx, finished_rx) = tokio::sync::oneshot::channel(); + let (stop_downloader_tx, stop_downloader_rx) = tokio::sync::oneshot::channel(); + tokio::task::spawn(async move { + log::info!("starting block_indexing. 
disable_minimization={no_minimization}"); + + if let Err(err) = downloader + .start( + blocks_tx, + already_indexed, + start, + limit, + no_minimization, + threads, + stop_downloader_rx, + ) + .await + { + let _ = finished_tx.send(Some(format!("downloader failed {err:#?}"))); + } else { + log::info!("finished downloading blocks"); + let _ = finished_tx.send(None); + } + }); + + let err = handle_exit(sig_quit, sig_int, sig_term, finished_rx).await; + // stop the downloader task + let _ = stop_downloader_tx.send(()); + return err; +} + +pub async fn manual_bigtable_downloader( + matches: &ArgMatches, + config_path: &str, +) -> anyhow::Result<()> { + let blocks_table = + BlockTableChoice::try_from(*matches.get_one::("block-table-choice").unwrap()).unwrap(); + + let cfg = Config::load(config_path).await?; + let no_minimization = matches.get_flag("no-minimization"); + let failed_blocks_dir = matches.get_one::("failed-blocks").unwrap().clone(); + let threads = *matches.get_one::("threads").unwrap(); + let input = matches.get_one::("input").unwrap(); + let full_range = matches.get_flag("full-range"); + let use_remote = matches.get_flag("use-remote"); + // load all currently indexed block number to avoid re-downloading already indexed block data + let mut already_indexed: HashSet = { + let mut conn = db::new_connection(&cfg.db_url)?; + + // perform db migrations + run_migrations(&mut conn); + + let client = db::client::Client {}; + let mut blocks_1_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + let mut blocks_2_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks2) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? 
as u64)) + .collect::>(); + blocks_1_indexed.append(&mut blocks_2_indexed); + blocks_1_indexed.into_iter().collect() + }; + + { + let mut conn = db::new_connection(&cfg.remotedb_url)?; + + let client = db::client::Client {}; + let mut blocks_1_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + let mut blocks_2_indexed = client + .indexed_slots(&mut conn, BlockTableChoice::Blocks2) + .unwrap_or_default() + .into_iter() + .filter_map(|block| Some(block? as u64)) + .collect::>(); + for block in blocks_1_indexed { + already_indexed.insert(block); + } + for block in blocks_2_indexed { + already_indexed.insert(block); + } + } + + let slots_to_fetch = { + let mut slots_to_fetch = vec![]; + { + let file = File::open(input).await?; + let reader = BufReader::new(file); + let mut lines = reader.lines(); + + // Read the file line by line + while let Some(line) = lines.next_line().await? 
{ + // Parse each line as a u64 + match line.trim().parse::() { + Ok(number) => { + if !already_indexed.contains(&number) { + slots_to_fetch.push(number); + } + }, + Err(_) => log::warn!("Warning: Skipping invalid line: {}", line), + } + } + } + if full_range { + let start = slots_to_fetch[0]; + let end = slots_to_fetch[slots_to_fetch.len()-1]; + (start..=end).collect::>() + } else { + slots_to_fetch + } + + }; + let db_url = if use_remote { + log::info!("using remote db"); + &cfg.remotedb_url + } else { + &cfg.db_url + }; + log::info!("found {} slots to fetch", slots_to_fetch.len()); + // receives downloaded blocks, which allows us to persist downloaded data while we download and parse other data + let (blocks_tx, blocks_rx) = tokio::sync::mpsc::channel::(10_000 as usize); + + let sig_quit = signal(SignalKind::quit())?; + let sig_int = signal(SignalKind::interrupt())?; + let sig_term = signal(SignalKind::terminate())?; + + let downloader = Arc::new(Downloader::new(cfg.bigtable).await?); + let pool = db::new_connection_pool(db_url, threads as u32 * 2)?; + + // start the background persistence task + tokio::task::spawn(async move { + block_persistence_loop(pool, failed_blocks_dir, blocks_rx, threads, blocks_table).await + }); + let (stop_downloader_tx, stop_downloader_rx) = tokio::sync::oneshot::channel(); + let (finished_tx, finished_rx) = tokio::sync::oneshot::channel(); + + tokio::task::spawn(async move { + log::info!("fetching blocks"); + if let Err(err) = downloader + .fetch_blocks( + blocks_tx, + already_indexed, + no_minimization, + threads, + slots_to_fetch, + stop_downloader_rx, + ) + .await + { + log::error!("failed to fetch blocks {err:#?}"); + let _ = finished_tx.send(Some(format!("downloader failed {err:#?}"))); + } else { + log::info!("downloader finished"); + let _ = finished_tx.send(None); + } + }); + + let err = handle_exit(sig_quit, sig_int, sig_term, finished_rx).await; + // stop the downloader task + let _ = stop_downloader_tx.send(()); + return 
err; +} diff --git a/crates/sb_dl/src/commands/services/downloaders.rs b/crates/sb_dl/src/commands/services/downloaders.rs index b569813..e69088d 100644 --- a/crates/sb_dl/src/commands/services/downloaders.rs +++ b/crates/sb_dl/src/commands/services/downloaders.rs @@ -1,16 +1,11 @@ use { super::super::utils::{ get_failed_blocks, load_failed_blocks, sanitize_for_postgres, sanitize_value, - }, - anyhow::{anyhow, Context}, - clap::ArgMatches, - db::{migrations::run_migrations, models::{BlockTableChoice, NewBlock, NewBlock2}}, - diesel::{ + }, crate::commands::handle_exit, anyhow::{anyhow, Context}, clap::ArgMatches, db::{migrations::run_migrations, models::{BlockTableChoice, NewBlock, NewBlock2}}, diesel::{ prelude::*, r2d2::{ConnectionManager, Pool, PooledConnection}, PgConnection, - }, - sb_dl::{ + }, sb_dl::{ config::Config, services::{ backfill::Backfiller, @@ -18,107 +13,29 @@ use { geyser::{new_geyser_client, subscribe_blocks}, }, types::BlockInfo, - }, - solana_transaction_status::UiConfirmedBlock, - std::{collections::HashSet, sync::Arc}, - tokio::{ + }, solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcTransactionConfig}, solana_transaction_status::{UiConfirmedBlock, UiTransactionEncoding}, std::{collections::HashSet, sync::Arc}, tokio::{ signal::unix::{signal, Signal, SignalKind}, sync::Semaphore, - }, + } }; - -/// Starts the big table historical block downloader -pub async fn bigtable_downloader(matches: &ArgMatches, config_path: &str) -> anyhow::Result<()> { - let blocks_table = BlockTableChoice::try_from(*matches.get_one::("block-table-choice").unwrap()).unwrap(); - +pub async fn get_transaction( + matches: &clap::ArgMatches, + config_path: &str +) -> anyhow::Result<()> { + let tx_hash = matches.get_one::("tx-hash").unwrap(); let cfg = Config::load(config_path).await?; - let start = matches.get_one::("start").cloned(); - let limit = matches.get_one::("limit").cloned(); - let no_minimization = matches.get_flag("no-minimization"); - let 
failed_blocks_dir = matches.get_one::("failed-blocks").unwrap().clone(); - let threads = *matches.get_one::("threads").unwrap(); - - // create failed blocks directory, ignoring error (its already created) - let _ = tokio::fs::create_dir(&failed_blocks_dir).await; - - // read all failed blocks to append to the already_indexed hash set - // - // we do this so we can avoid re-downloading the blocks which are stored locally - let failed_blocks = get_failed_blocks(&failed_blocks_dir).await.unwrap(); - - // load all currently indexed block number to avoid re-downloading already indexed block data - let mut already_indexed: HashSet = { - let mut conn = db::new_connection(&cfg.db_url)?; - - // perform db migrations - run_migrations(&mut conn); - - let client = db::client::Client {}; - let mut blocks_1_indexed = client - .indexed_blocks(&mut conn, BlockTableChoice::Blocks) - .unwrap_or_default() - .into_iter() - .filter_map(|block| Some(block? as u64)) - .collect::>(); - let mut blocks_2_indexed = client - .indexed_blocks(&mut conn, BlockTableChoice::Blocks2) - .unwrap_or_default() - .into_iter() - .filter_map(|block| Some(block? 
as u64)) - .collect::>(); - blocks_1_indexed.append(&mut blocks_2_indexed); - blocks_1_indexed.into_iter().collect() - }; - - // mark failed blocks as already indexed to avoid redownloading - already_indexed.extend(failed_blocks.iter()); - - let downloader = Arc::new(Downloader::new(cfg.bigtable).await?); - - // receives downloaded blocks, which allows us to persist downloaded data while we download and parse other data - let (blocks_tx, blocks_rx) = tokio::sync::mpsc::channel::(10_000 as usize); - - let sig_quit = signal(SignalKind::quit())?; - let sig_int = signal(SignalKind::interrupt())?; - let sig_term = signal(SignalKind::terminate())?; - - let pool = db::new_connection_pool(&cfg.db_url, threads as u32 *2)?; - - // start the background persistence task - tokio::task::spawn( - async move { block_persistence_loop(pool, failed_blocks_dir, blocks_rx, threads, blocks_table).await }, - ); - - let (finished_tx, finished_rx) = tokio::sync::oneshot::channel(); - let (stop_downloader_tx, stop_downloader_rx) = tokio::sync::oneshot::channel(); - tokio::task::spawn(async move { - log::info!("starting block_indexing. 
disable_minimization={no_minimization}"); - - if let Err(err) = downloader - .start( - blocks_tx, - already_indexed, - start, - limit, - no_minimization, - threads, - stop_downloader_rx - ) - .await - { - let _ = finished_tx.send(Some(format!("downloader failed {err:#?}"))); - } else { - log::info!("finished downloading blocks"); - let _ = finished_tx.send(None); + let rpc = RpcClient::new(cfg.rpc_url.clone()); + let tx = rpc.get_transaction_with_config( + &tx_hash.parse().unwrap(), + RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::JsonParsed), + max_supported_transaction_version: Some(1), + ..Default::default() } - }); - - let err = handle_exit(sig_quit, sig_int, sig_term, finished_rx).await; - // stop the downloader task - let _ = stop_downloader_tx.send(()); - return err + ).await?; + log::info!("tx(hash={}, slot={})", tx_hash, tx.slot); + Ok(()) } - /// Starts the geyser stream block downloader pub async fn geyser_stream(matches: &ArgMatches, config_path: &str) -> anyhow::Result<()> { let blocks_table = BlockTableChoice::try_from(*matches.get_one::("block-table-choice").unwrap()).unwrap(); @@ -351,39 +268,6 @@ pub async fn block_persistence_loop( } } -async fn handle_exit( - mut sig_quit: Signal, - mut sig_int: Signal, - mut sig_term: Signal, - finished_rx: tokio::sync::oneshot::Receiver>, -) -> anyhow::Result<()> { - // handle exit routines - tokio::select! 
{ - _ = sig_quit.recv() => { - log::warn!("goodbye.."); - return Ok(()); - } - _ = sig_int.recv() => { - log::warn!("goodbye.."); - return Ok(()); - } - _ = sig_term.recv() => { - log::warn!("goodbye.."); - return Ok(()); - } - msg = finished_rx => { - match msg { - // service encountered error - Ok(Some(msg)) => return Err(anyhow!(msg)), - // service finished without error - Ok(None) => return Ok(()), - // underlying channel had an error - Err(err) => return Err(anyhow!(err)) - } - } - } -} - async fn process_block(block_info: BlockInfo, conn: &mut PgConnection, failed_blocks_dir: String, client: db::client::Client, block_table_choice: BlockTableChoice) { // we cant rely on parentSlot + 1, as some slots may be skipped let slot = if let Some(slot) = block_info.slot { diff --git a/crates/sb_dl/src/commands/services/mod.rs b/crates/sb_dl/src/commands/services/mod.rs index c493451..211b8fe 100644 --- a/crates/sb_dl/src/commands/services/mod.rs +++ b/crates/sb_dl/src/commands/services/mod.rs @@ -2,4 +2,5 @@ pub mod downloaders; pub mod idl_indexer; pub mod program_indexer; pub mod transfer_api; -pub mod backfill; \ No newline at end of file +pub mod backfill; +pub mod bigtable; \ No newline at end of file diff --git a/crates/sb_dl/src/main.rs b/crates/sb_dl/src/main.rs index 098fab9..9bafefc 100644 --- a/crates/sb_dl/src/main.rs +++ b/crates/sb_dl/src/main.rs @@ -81,6 +81,45 @@ async fn main() -> Result<()> { .arg(failed_blocks_flag()) .arg(threads_flag()) .arg(block_table_choice_flag()), + Command::new("manual-bigtable-downloader") + .about("download specific block data using bigtable") + .arg( + Arg::new("start") + .long("start") + .value_parser(value_parser!(u64)) + .required(false), + ) + .arg( + Arg::new("limit") + .long("limit") + .help("max number of slots to index") + .value_parser(value_parser!(u64)) + .required(false), + ) + .arg(no_minimization_flag()) + .arg(failed_blocks_flag()) + .arg(threads_flag()) + .arg(block_table_choice_flag()) + .arg( + 
Arg::new("input") + .long("input") + ) + .arg( + Arg::new("full-range") + .long("full-range") + .help("use full range instead of exact numbers") + .action(clap::ArgAction::SetTrue) + .default_value("false") + .required(false) + ) + .arg( + Arg::new("use-remote") + .long("use-remote") + .help("if present use remotedb") + .action(clap::ArgAction::SetTrue) + .default_value("false") + .required(false) + ), ]), Command::new("import-failed-blocks").arg(failed_blocks_flag()) .arg(block_table_choice_flag()), @@ -148,7 +187,56 @@ async fn main() -> Result<()> { .help("starting number to assume a gap for") .value_parser(clap::value_parser!(i64)) ) - .arg(block_table_choice_flag()) + .arg(block_table_choice_flag()), + Command::new("find-missing-blocks") + .about("find missing blocks in a given range") + .arg( + Arg::new("start") + .long("start") + .value_parser(clap::value_parser!(i64)) + ) + .arg( + Arg::new("end") + .long("end") + .value_parser(clap::value_parser!(i64)) + ) + .arg( + Arg::new("output") + .long("output") + ), + Command::new("guess-slot-numbers") + .arg( + Arg::new("input") + .long("input") + ) + .arg( + Arg::new("limit") + .long("limit") + .value_parser(clap::value_parser!(usize)) + .default_value("0") + ) + .arg( + Arg::new("threads") + .long("threads") + .value_parser(clap::value_parser!(usize)) + .default_value("2") + ), + Command::new("get-tx") + .arg( + Arg::new("tx-hash") + .long("tx-hash") + ), + Command::new("get-missing-slots-in-range") + .arg( + Arg::new("start") + .long("start") + .value_parser(clap::value_parser!(u64)) + ) + .arg( + Arg::new("end") + .long("end") + .value_parser(clap::value_parser!(u64)) + ) ]) .get_matches(); @@ -192,7 +280,7 @@ async fn process_matches(matches: &ArgMatches, config_path: &str) -> anyhow::Res Some(("fill-missing-slots-no-tx", fmsnt)) => commands::db::fill_missing_slots_no_tx(fmsnt, config_path).await, Some(("services", s)) => match s.subcommand() { Some(("bigtable-downloader", bd)) => { - 
commands::services::downloaders::bigtable_downloader(bd, config_path).await + commands::services::bigtable::bigtable_downloader(bd, config_path).await } Some(("geyser-stream", gs)) => { commands::services::downloaders::geyser_stream(gs, config_path).await @@ -210,8 +298,13 @@ async fn process_matches(matches: &ArgMatches, config_path: &str) -> anyhow::Res commands::services::transfer_api::transfer_flow_api(tfa, config_path).await } Some(("repair-gaps", rg)) => commands::services::backfill::backfill(rg, config_path).await, + Some(("manual-bigtable-downloader", mbd)) => commands::services::bigtable::manual_bigtable_downloader(mbd, config_path).await, _ => Err(anyhow!("invalid subcommand")), }, + Some(("find-missing-blocks", fmb)) => commands::db::find_missing_blocks(fmb, config_path).await, + Some(("guess-slot-numbers", gsn)) => commands::db::guess_slot_numbers(gsn, config_path).await, + Some(("get-tx", gt)) => commands::services::downloaders::get_transaction(gt, config_path).await, + Some(("get-missing-slots-in-range", gmsir)) => commands::db::get_missing_slots_in_range(gmsir, config_path).await, _ => Err(anyhow!("invalid subcommand")), } } diff --git a/crates/sb_dl/src/services/bigtable.rs b/crates/sb_dl/src/services/bigtable.rs index 6c155a2..7d922bb 100644 --- a/crates/sb_dl/src/services/bigtable.rs +++ b/crates/sb_dl/src/services/bigtable.rs @@ -92,41 +92,13 @@ impl Downloader { let exit = exit.clone(); async move { if !exit.load(Ordering::SeqCst) { - match Self::get_confirmed_block(client, max_decoding_size, slot).await { - Ok(block) => { - if let Some(block) = block { - let block_height = if let Some(block_height) = block.block_height { - block_height - } else { - log::warn!("block({slot}) height is none"); - return; - }; - // post process the block to handle encoding and space minimization - match process_block(block, no_minimization) { - Ok(block) => { - if let Err(err) = blocks_tx - .send(BlockInfo { - block_height, - slot: Some(slot), - block, - }) - 
.await - { - log::error!("failed to send block({slot}) {err:#?}"); - } else { - log::debug!("processed block({slot})"); - } - } - Err(err) => { - log::error!("failed to minimize and encode block({slot}) {err:#?}"); - } - } - } - } - Err(err) => { - log::error!("failed to fetch block({slot}) {err:#?}"); - } - } + Self::get_and_process_block( + client, + max_decoding_size, + slot, + no_minimization, + blocks_tx + ).await; } } @@ -136,6 +108,96 @@ impl Downloader { .await; Ok(()) } + /// Similar to `start` but uses provided input for slots to fetch + pub async fn fetch_blocks( + self: &Arc<Self>, + blocks_tx: tokio::sync::mpsc::Sender<BlockInfo>, + already_indexed: HashSet<u64>, + no_minimization: bool, + threads: usize, + slots_to_fetch: Vec<u64>, + exit_ch: tokio::sync::oneshot::Receiver<()> + ) -> anyhow::Result<()> { + let exit = Arc::new(AtomicBool::new(false)); + { + let exit = exit.clone(); + tokio::task::spawn(async move { + let _ = exit_ch.await; + exit.store(true, Ordering::SeqCst); + }); + } + // instantiate the client which will be cloned between threads + let client = self.conn.client(); + stream::iter(slots_to_fetch).map(|slot| { + let blocks_tx = blocks_tx.clone(); + let client = client.clone(); + let max_decoding_size = self.max_decoding_size; + let exit = exit.clone(); + async move { + if !exit.load(Ordering::SeqCst) { + Self::get_and_process_block( + client, + max_decoding_size, + slot, + no_minimization, + blocks_tx + ).await; + } + + } + }) + .buffer_unordered(threads) + .collect::<Vec<_>>() + .await; + Ok(()) + } + async fn get_and_process_block( + client: BigTable, + max_decoding_size: usize, + slot: Slot, + no_minimization: bool, + blocks_tx: tokio::sync::mpsc::Sender<BlockInfo>, + + ) { + log::info!("retrieving block {slot}"); + match Self::get_confirmed_block(client, max_decoding_size, slot).await { + Ok(block) => { + if let Some(block) = block { + let block_height = if let Some(block_height) = block.block_height { + block_height + } else { + log::debug!("block({slot}) height is none"); 
+ return; + }; + // post process the block to handle encoding and space minimization + match process_block(block, no_minimization) { + Ok(block) => { + if let Err(err) = blocks_tx + .send(BlockInfo { + block_height, + slot: Some(slot), + block, + }) + .await + { + log::error!("failed to send block({slot}) {err:#?}"); + } else { + log::info!("processed block({slot})"); + } + } + Err(err) => { + log::error!("failed to minimize and encode block({slot}) {err:#?}"); + } + } + } else { + log::warn!("block is None"); + } + } + Err(err) => { + log::error!("failed to fetch block({slot}) {err:#?}"); + } + } + } /// Downloads multiple blocks at once, returning a vector of vec![(block_slot, block_data)] pub async fn get_confirmed_blocks( &self, @@ -275,6 +337,7 @@ impl Downloader { // ensure we got a single response, as we requested 1 slot if response.len() != 1 { + log::warn!("no matching block, response_count {}", response.len()); // block does not exist and cant be found return Ok(None); }