-
Notifications
You must be signed in to change notification settings - Fork 82
Adding hosted clickhouse support + changing naming convention for cli… #331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,6 +1,6 @@ | ||||||
| [package] | ||||||
| name = "rindexer" | ||||||
| version = "0.28.0" | ||||||
| version = "0.28.1" | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't bump here
Suggested change
|
||||||
| edition = "2021" | ||||||
| description = "A no-code or framework to build blazing fast EVM indexers - built in rust." | ||||||
| license = "MIT" | ||||||
|
|
@@ -27,7 +27,7 @@ bb8-redis = "0.24.0" | |||||
| bytes = "1.10.1" | ||||||
| chrono = { version = "0.4", features = ["serde"] } | ||||||
| colored = "3.0.0" | ||||||
| clickhouse = { version = "0.13.3" } | ||||||
| clickhouse = { version = "0.13.3", features = ["rustls-tls"] } | ||||||
| csv = "1.3.1" | ||||||
| deadpool = { version = "0.12", features = ["rt_tokio_1"] } | ||||||
| deadpool-lapin = "0.13" | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,9 +20,10 @@ use crate::manifest::contract::FactoryDetailsYaml; | |
| pub fn generate_tables_for_indexer_clickhouse( | ||
| project_path: &Path, | ||
| indexer: &Indexer, | ||
| database_name: &str, | ||
| disable_event_tables: bool, | ||
| ) -> Result<Code, GenerateTablesForIndexerSqlError> { | ||
| let mut sql = "CREATE DATABASE IF NOT EXISTS rindexer_internal;".to_string(); | ||
| let mut sql = String::new(); | ||
|
|
||
| for contract in &indexer.contracts { | ||
| let contract_name = contract.before_modify_name_if_filter_readonly(); | ||
|
|
@@ -33,29 +34,29 @@ pub fn generate_tables_for_indexer_clickhouse( | |
| let factories = contract.details.iter().flat_map(|d| d.factory.clone()).collect::<Vec<_>>(); | ||
|
|
||
| if !disable_event_tables { | ||
| sql.push_str(format!("CREATE DATABASE IF NOT EXISTS {};", schema_name).as_str()); | ||
| info!("Creating database if not exists: {}", schema_name); | ||
| info!("Creating event tables in database: {}", database_name); | ||
|
|
||
| sql.push_str(&generate_event_table_clickhouse(&event_names, &schema_name)); | ||
| sql.push_str(&generate_event_table_clickhouse(&event_names, database_name, &schema_name)); | ||
| } | ||
|
|
||
| sql.push_str(&generate_internal_event_table_clickhouse( | ||
| &event_names, | ||
| database_name, | ||
| &schema_name, | ||
| networks, | ||
| )); | ||
|
|
||
| sql.push_str(&generate_internal_factory_event_table_sql(&indexer.name, &factories)); | ||
| sql.push_str(&generate_internal_factory_event_table_sql(database_name, &indexer.name, &factories)); | ||
| } | ||
|
|
||
| Ok(Code::new(sql)) | ||
| } | ||
|
|
||
| fn generate_event_table_clickhouse(abi_inputs: &[EventInfo], schema_name: &str) -> String { | ||
| fn generate_event_table_clickhouse(abi_inputs: &[EventInfo], database_name: &str, schema_name: &str) -> String { | ||
| abi_inputs | ||
| .iter() | ||
| .map(|event_info| { | ||
| let table_name = format!("{}.{}", schema_name, camel_to_snake(&event_info.name)); | ||
| let table_name = format!("{}.{}_{}", database_name, schema_name, camel_to_snake(&event_info.name)); | ||
| info!("Creating table if not exists: {}", table_name); | ||
| let event_columns = if event_info.inputs.is_empty() { | ||
| "".to_string() | ||
|
|
@@ -93,12 +94,14 @@ fn generate_event_table_clickhouse(abi_inputs: &[EventInfo], schema_name: &str) | |
|
|
||
| fn generate_internal_event_table_clickhouse( | ||
| abi_inputs: &[EventInfo], | ||
| database_name: &str, | ||
| schema_name: &str, | ||
| networks: Vec<&str>, | ||
| ) -> String { | ||
| abi_inputs.iter().map(|event_info| { | ||
| let table_name = format!( | ||
| "rindexer_internal.{}_{}", | ||
| "{}.rindexer_internal_{}_{}", | ||
| database_name, | ||
| schema_name, | ||
| camel_to_snake(&event_info.name) | ||
| ); | ||
|
|
@@ -122,18 +125,21 @@ fn generate_internal_event_table_clickhouse( | |
| ) | ||
| }).collect::<Vec<_>>().join("\n"); | ||
|
|
||
| let create_latest_block_query = r#" | ||
| CREATE TABLE IF NOT EXISTS rindexer_internal.latest_block ( | ||
| let latest_block_table_name = format!("{}.rindexer_internal_latest_block", database_name); | ||
| let create_latest_block_query = format!( | ||
| r#" | ||
| CREATE TABLE IF NOT EXISTS {} ( | ||
| "network" String, | ||
| "block" UInt64 | ||
| ) | ||
| ENGINE = ReplacingMergeTree(block) | ||
| ORDER BY network; | ||
| "#.to_string(); | ||
| "#, latest_block_table_name); | ||
|
|
||
| let latest_block_insert_queries = networks.iter().map(|network| { | ||
| format!( | ||
| r#"INSERT INTO rindexer_internal.latest_block ("network", "block") VALUES ('{network}', 0);"# | ||
| r#"INSERT INTO {} ("network", "block") VALUES ('{network}', 0);"#, | ||
| latest_block_table_name | ||
| ) | ||
| }).collect::<Vec<_>>().join("\n"); | ||
|
|
||
|
|
@@ -143,6 +149,7 @@ fn generate_internal_event_table_clickhouse( | |
| } | ||
|
|
||
| fn generate_internal_factory_event_table_sql( | ||
| database_name: &str, | ||
| indexer_name: &str, | ||
| factories: &[FactoryDetailsYaml], | ||
| ) -> String { | ||
|
|
@@ -159,14 +166,15 @@ fn generate_internal_factory_event_table_sql( | |
|
|
||
| let create_table_query = format!( | ||
| r#" | ||
| CREATE TABLE IF NOT EXISTS rindexer_internal.{table_name} ( | ||
| CREATE TABLE IF NOT EXISTS {}.rindexer_internal_{} ( | ||
| "factory_address" FixedString(42), | ||
| "factory_deployed_address" FixedString(42), | ||
| "network" String | ||
| ) | ||
| ENGINE = ReplacingMergeTree() | ||
| ORDER BY ("network", "factory_address", "factory_deployed_address"); | ||
| "# | ||
| "#, | ||
| database_name, table_name | ||
| ); | ||
|
|
||
| create_table_query | ||
|
|
@@ -186,24 +194,30 @@ pub fn generate_columns_with_data_types(inputs: &[ABIInput]) -> Vec<String> { | |
| generate_columns(inputs, &GenerateAbiPropertiesType::ClickhouseWithDataTypes) | ||
| } | ||
|
|
||
| pub fn drop_tables_for_indexer_clickhouse(project_path: &Path, indexer: &Indexer) -> Code { | ||
| pub fn drop_tables_for_indexer_clickhouse(project_path: &Path, indexer: &Indexer, database_name: &str) -> Code { | ||
| let mut sql = String::new(); | ||
|
|
||
| sql.push_str("DROP TABLE IF EXISTS rindexer_internal.latest_block;"); | ||
| sql.push_str(&format!("DROP TABLE IF EXISTS {}.rindexer_internal_latest_block;", database_name)); | ||
|
||
|
|
||
| for contract in &indexer.contracts { | ||
| let contract_name = contract.before_modify_name_if_filter_readonly(); | ||
| let schema_name = generate_indexer_contract_schema_name(&indexer.name, &contract_name); | ||
| sql.push_str(format!("DROP DATABASE IF EXISTS {schema_name};").as_str()); | ||
|
|
||
| // drop last synced blocks for contracts | ||
| // drop event tables | ||
| let abi_items = ABIItem::read_abi_items(project_path, contract); | ||
| if let Ok(abi_items) = abi_items { | ||
| for abi_item in abi_items.iter() { | ||
| let table_name = | ||
| // Drop event table | ||
| let event_table_name = format!("{}_{}", schema_name, camel_to_snake(&abi_item.name)); | ||
| sql.push_str( | ||
| &format!("DROP TABLE IF EXISTS {}.{};", database_name, event_table_name), | ||
| ); | ||
|
|
||
| // Drop internal tracking table | ||
| let internal_table_name = | ||
| generate_internal_event_table_name_no_shorten(&schema_name, &abi_item.name); | ||
| sql.push_str( | ||
| format!("DROP TABLE IF EXISTS rindexer_internal.{table_name};").as_str(), | ||
| &format!("DROP TABLE IF EXISTS {}.rindexer_internal_{};", database_name, internal_table_name), | ||
| ); | ||
| } | ||
| } else { | ||
|
|
@@ -222,7 +236,7 @@ pub fn drop_tables_for_indexer_clickhouse(project_path: &Path, indexer: &Indexer | |
| input_names: factory.input_names(), | ||
| }; | ||
| let table_name = generate_internal_factory_event_table_name(¶ms); | ||
| sql.push_str(format!("DROP TABLE IF EXISTS rindexer_internal.{table_name};").as_str()) | ||
| sql.push_str(&format!("DROP TABLE IF EXISTS {}.rindexer_internal_{};", database_name, table_name)) | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -183,11 +183,12 @@ pub async fn get_last_synced_block_number(config: SyncConfig<'_>) -> Option<U64> | |
| last_synced_block: u64, | ||
| } | ||
|
|
||
| let database_name = clickhouse.get_database_name(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just call |
||
| let schema = | ||
| generate_indexer_contract_schema_name(config.indexer_name, config.contract_name); | ||
| let table_name = generate_internal_event_table_name_no_shorten(&schema, config.event_name); | ||
| let query = format!( | ||
| "SELECT last_synced_block FROM rindexer_internal.{table_name} FINAL WHERE network = '{}'", | ||
| "SELECT last_synced_block FROM {database_name}.rindexer_internal_{table_name} FINAL WHERE network = '{}'", | ||
| config.network | ||
| ); | ||
|
|
||
|
|
@@ -306,15 +307,16 @@ pub async fn update_progress_and_last_synced_task( | |
| error!("Error updating db last synced block: {:?}", e); | ||
| } | ||
| } else if let Some(clickhouse) = &config.clickhouse() { | ||
| let database_name = clickhouse.get_database_name(); | ||
| let schema = | ||
| generate_indexer_contract_schema_name(&config.indexer_name(), &config.contract_name()); | ||
| let table_name = | ||
| generate_internal_event_table_name_no_shorten(&schema, &config.event_name()); | ||
| let network = &config.network_contract().network; | ||
| let query = format!( | ||
| r#" | ||
| INSERT INTO rindexer_internal.{table_name} (network, last_synced_block) VALUES ('{network}', {to_block}); | ||
| INSERT INTO rindexer_internal.latest_block (network, block) VALUES ('{network}', {latest}); | ||
| INSERT INTO {database_name}.rindexer_internal_{table_name} (network, last_synced_block) VALUES ('{network}', {to_block}); | ||
| INSERT INTO {database_name}.rindexer_internal_latest_block (network, block) VALUES ('{network}', {latest}); | ||
| "# | ||
| ); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure we should bump this.