2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion cli/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "rindexer_cli"
version = "0.28.0"
version = "0.28.1"
edition = "2021"
description = "A no-code or framework to build blazing fast EVM indexers - built in rust."
license = "MIT"
4 changes: 2 additions & 2 deletions core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "rindexer"
version = "0.28.0"
version = "0.28.1"
edition = "2021"
description = "A no-code or framework to build blazing fast EVM indexers - built in rust."
license = "MIT"
@@ -27,7 +27,7 @@ bb8-redis = "0.24.0"
bytes = "1.10.1"
chrono = { version = "0.4", features = ["serde"] }
colored = "3.0.0"
clickhouse = { version = "0.13.3" }
clickhouse = { version = "0.13.3", features = ["rustls-tls"] }
csv = "1.3.1"
deadpool = { version = "0.12", features = ["rt_tokio_1"] }
deadpool-lapin = "0.13"
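Note on the clickhouse feature change: without a TLS feature, the clickhouse crate can only reach plain http:// endpoints, so enabling rustls-tls is what allows HTTPS deployments (for example ClickHouse Cloud) without linking OpenSSL. A minimal connection sketch, assuming an illustrative HTTPS URL and placeholder credentials rather than anything from this PR:

// Sketch only: the rustls-tls feature is what makes the https:// scheme work here.
use clickhouse::Client;

async fn connect_over_tls() -> Result<(), clickhouse::error::Error> {
    let client = Client::default()
        .with_url("https://my-instance.clickhouse.cloud:8443") // placeholder URL
        .with_user("default")       // placeholder credentials
        .with_password("secret")
        .with_database("rindexer");
    // Same connectivity probe ClickhouseClient::new performs below.
    client.query("SELECT 1").execute().await?;
    Ok(())
}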
16 changes: 11 additions & 5 deletions core/src/database/clickhouse/client.rs
@@ -6,10 +6,10 @@ use std::env;
use tracing::info;

pub struct ClickhouseConnection {
url: String,
user: String,
password: String,
db: String,
pub url: String,
pub user: String,
pub password: String,
pub db: String,
}

pub fn clickhouse_connection() -> Result<ClickhouseConnection, env::VarError> {
@@ -42,11 +42,13 @@ pub enum ClickhouseError {

pub struct ClickhouseClient {
pub(crate) conn: Client,
pub(crate) database_name: String,
}

impl ClickhouseClient {
pub async fn new() -> Result<Self, ClickhouseConnectionError> {
let connection = clickhouse_connection()?;
let database_name = connection.db.clone();

let client = Client::default()
.with_url(connection.url)
@@ -57,7 +59,11 @@ impl ClickhouseClient {
client.query("select 1").execute().await?;
info!("Clickhouse client connected successfully!");

Ok(ClickhouseClient { conn: client })
Ok(ClickhouseClient { conn: client, database_name })
}

pub fn get_database_name(&self) -> &str {
&self.database_name
}

pub async fn query_one<T>(&self, sql: &str) -> Result<T, ClickhouseError>
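For context, a minimal sketch of how the new accessor is expected to be used by callers; the table name below just illustrates the prefixing scheme introduced later in this PR:

// Sketch only: ClickhouseClient is the type defined in this file, and the
// connection settings it reads from the environment must be set as before.
async fn database_name_example() {
    let client = ClickhouseClient::new().await.expect("clickhouse connection");
    let db = client.get_database_name();
    // Internal tables are now qualified with the configured database, e.g.:
    let latest_block_table = format!("{db}.rindexer_internal_latest_block");
    println!("{latest_block_table}");
}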
56 changes: 35 additions & 21 deletions core/src/database/clickhouse/generate.rs
@@ -20,9 +20,10 @@ use crate::manifest::contract::FactoryDetailsYaml;
pub fn generate_tables_for_indexer_clickhouse(
project_path: &Path,
indexer: &Indexer,
database_name: &str,
disable_event_tables: bool,
) -> Result<Code, GenerateTablesForIndexerSqlError> {
let mut sql = "CREATE DATABASE IF NOT EXISTS rindexer_internal;".to_string();
let mut sql = String::new();

for contract in &indexer.contracts {
let contract_name = contract.before_modify_name_if_filter_readonly();
@@ -33,29 +34,29 @@ pub fn generate_tables_for_indexer_clickhouse(
let factories = contract.details.iter().flat_map(|d| d.factory.clone()).collect::<Vec<_>>();

if !disable_event_tables {
sql.push_str(format!("CREATE DATABASE IF NOT EXISTS {};", schema_name).as_str());
info!("Creating database if not exists: {}", schema_name);
info!("Creating event tables in database: {}", database_name);

sql.push_str(&generate_event_table_clickhouse(&event_names, &schema_name));
sql.push_str(&generate_event_table_clickhouse(&event_names, database_name, &schema_name));
}

sql.push_str(&generate_internal_event_table_clickhouse(
&event_names,
database_name,
&schema_name,
networks,
));

sql.push_str(&generate_internal_factory_event_table_sql(&indexer.name, &factories));
sql.push_str(&generate_internal_factory_event_table_sql(database_name, &indexer.name, &factories));
}

Ok(Code::new(sql))
}

fn generate_event_table_clickhouse(abi_inputs: &[EventInfo], schema_name: &str) -> String {
fn generate_event_table_clickhouse(abi_inputs: &[EventInfo], database_name: &str, schema_name: &str) -> String {
abi_inputs
.iter()
.map(|event_info| {
let table_name = format!("{}.{}", schema_name, camel_to_snake(&event_info.name));
let table_name = format!("{}.{}_{}", database_name, schema_name, camel_to_snake(&event_info.name));
info!("Creating table if not exists: {}", table_name);
let event_columns = if event_info.inputs.is_empty() {
"".to_string()
@@ -93,12 +94,14 @@ fn generate_event_table_clickhouse(abi_inputs: &[EventInfo], schema_name: &str)

fn generate_internal_event_table_clickhouse(
abi_inputs: &[EventInfo],
database_name: &str,
schema_name: &str,
networks: Vec<&str>,
) -> String {
abi_inputs.iter().map(|event_info| {
let table_name = format!(
"rindexer_internal.{}_{}",
"{}.rindexer_internal_{}_{}",
database_name,
schema_name,
camel_to_snake(&event_info.name)
);
@@ -122,18 +125,21 @@ fn generate_internal_event_table_clickhouse(
)
}).collect::<Vec<_>>().join("\n");

let create_latest_block_query = r#"
CREATE TABLE IF NOT EXISTS rindexer_internal.latest_block (
let latest_block_table_name = format!("{}.rindexer_internal_latest_block", database_name);
let create_latest_block_query = format!(
r#"
CREATE TABLE IF NOT EXISTS {} (
"network" String,
"block" UInt64
)
ENGINE = ReplacingMergeTree(block)
ORDER BY network;
"#.to_string();
"#, latest_block_table_name);

let latest_block_insert_queries = networks.iter().map(|network| {
format!(
r#"INSERT INTO rindexer_internal.latest_block ("network", "block") VALUES ('{network}', 0);"#
r#"INSERT INTO {} ("network", "block") VALUES ('{network}', 0);"#,
latest_block_table_name
)
}).collect::<Vec<_>>().join("\n");

@@ -143,6 +149,7 @@ fn generate_internal_event_table_clickhouse(
}

fn generate_internal_factory_event_table_sql(
database_name: &str,
indexer_name: &str,
factories: &[FactoryDetailsYaml],
) -> String {
@@ -159,14 +166,15 @@ fn generate_internal_factory_event_table_sql(

let create_table_query = format!(
r#"
CREATE TABLE IF NOT EXISTS rindexer_internal.{table_name} (
CREATE TABLE IF NOT EXISTS {}.rindexer_internal_{} (
"factory_address" FixedString(42),
"factory_deployed_address" FixedString(42),
"network" String
)
ENGINE = ReplacingMergeTree()
ORDER BY ("network", "factory_address", "factory_deployed_address");
"#
"#,
database_name, table_name
);

create_table_query
@@ -186,24 +194,30 @@ pub fn generate_columns_with_data_types(inputs: &[ABIInput]) -> Vec<String> {
generate_columns(inputs, &GenerateAbiPropertiesType::ClickhouseWithDataTypes)
}

pub fn drop_tables_for_indexer_clickhouse(project_path: &Path, indexer: &Indexer) -> Code {
pub fn drop_tables_for_indexer_clickhouse(project_path: &Path, indexer: &Indexer, database_name: &str) -> Code {
let mut sql = String::new();

sql.push_str("DROP TABLE IF EXISTS rindexer_internal.latest_block;");
sql.push_str(&format!("DROP TABLE IF EXISTS {}.rindexer_internal_latest_block;", database_name));

for contract in &indexer.contracts {
let contract_name = contract.before_modify_name_if_filter_readonly();
let schema_name = generate_indexer_contract_schema_name(&indexer.name, &contract_name);
sql.push_str(format!("DROP DATABASE IF EXISTS {schema_name};").as_str());

// drop last synced blocks for contracts
// drop event tables
let abi_items = ABIItem::read_abi_items(project_path, contract);
if let Ok(abi_items) = abi_items {
for abi_item in abi_items.iter() {
let table_name =
// Drop event table
let event_table_name = format!("{}_{}", schema_name, camel_to_snake(&abi_item.name));
sql.push_str(
&format!("DROP TABLE IF EXISTS {}.{};", database_name, event_table_name),
);

// Drop internal tracking table
let internal_table_name =
generate_internal_event_table_name_no_shorten(&schema_name, &abi_item.name);
sql.push_str(
format!("DROP TABLE IF EXISTS rindexer_internal.{table_name};").as_str(),
&format!("DROP TABLE IF EXISTS {}.rindexer_internal_{};", database_name, internal_table_name),
);
}
} else {
@@ -222,7 +236,7 @@ pub fn drop_tables_for_indexer_clickhouse(project_path: &Path, indexer: &Indexer
input_names: factory.input_names(),
};
let table_name = generate_internal_factory_event_table_name(&params);
sql.push_str(format!("DROP TABLE IF EXISTS rindexer_internal.{table_name};").as_str())
sql.push_str(&format!("DROP TABLE IF EXISTS {}.rindexer_internal_{};", database_name, table_name))
}
}

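The net effect of this file's changes is a naming-scheme switch: instead of one ClickHouse database per contract schema plus a separate rindexer_internal database, every table now lives in the single configured database, and the old database names become table-name prefixes. A sketch of the mapping with illustrative indexer/contract/event names:

// Illustrative mapping (names are examples, not values taken from this PR):
//   before: my_indexer_my_contract.transfer
//           rindexer_internal.my_indexer_my_contract_transfer
//           rindexer_internal.latest_block
//   after:  <configured_db>.my_indexer_my_contract_transfer
//           <configured_db>.rindexer_internal_my_indexer_my_contract_transfer
//           <configured_db>.rindexer_internal_latest_block
fn qualified_event_table(database_name: &str, schema_name: &str, event_snake_case: &str) -> String {
    // Mirrors the format! calls above: "<db>.<schema>_<event>".
    format!("{database_name}.{schema_name}_{event_snake_case}")
}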
12 changes: 7 additions & 5 deletions core/src/database/clickhouse/setup.rs
@@ -28,37 +28,39 @@ pub async fn setup_clickhouse(

let client =
ClickhouseClient::new().await.map_err(SetupClickhouseError::ClickhouseConnectionError)?;
let database_name = client.get_database_name();
let disable_event_tables = manifest.storage.clickhouse_disable_create_tables();

if manifest.storage.clickhouse_drop_each_run() {
info!(
"`drop_each_run` enabled so dropping all data for {} before starting",
&manifest.name
);
let sql = drop_tables_for_indexer_clickhouse(project_path, &manifest.to_indexer());
let sql = drop_tables_for_indexer_clickhouse(project_path, &manifest.to_indexer(), database_name);
client.execute_batch(sql.as_str()).await?;
info!("Dropped all data for {}", manifest.name);
}

if disable_event_tables {
info!("Creating internal rindexer tables for {}", manifest.name);
info!("Creating internal rindexer tables for {} in database: {}", manifest.name, database_name);
} else {
info!("Creating tables for {}", manifest.name);
info!("Creating tables for {} in database: {}", manifest.name, database_name);
}

let sql = generate_tables_for_indexer_clickhouse(
project_path,
&manifest.to_indexer(),
database_name,
disable_event_tables,
)
.map_err(SetupClickhouseError::ClickhouseTableGenerationError)?;

client.execute_batch(sql.as_str()).await?;

if disable_event_tables {
info!("Created tables for {}", manifest.name);
} else {
info!("Created internal rindexer tables for {}", manifest.name);
} else {
info!("Created tables for {}", manifest.name);
}

Ok(client)
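Taken together, setup now resolves the configured database once and threads it through both the optional drop and the table generation. A condensed sketch of the new flow, with error handling replaced by expect and using only the functions shown in this diff:

// Condensed sketch, not a drop-in replacement for setup_clickhouse.
async fn setup_sketch(project_path: &std::path::Path, manifest: &Manifest) {
    let client = ClickhouseClient::new().await.expect("clickhouse connection");
    let database_name = client.get_database_name();

    if manifest.storage.clickhouse_drop_each_run() {
        let sql = drop_tables_for_indexer_clickhouse(project_path, &manifest.to_indexer(), database_name);
        client.execute_batch(sql.as_str()).await.expect("drop tables");
    }

    let sql = generate_tables_for_indexer_clickhouse(
        project_path,
        &manifest.to_indexer(),
        database_name,
        manifest.storage.clickhouse_disable_create_tables(),
    )
    .expect("generate tables");
    client.execute_batch(sql.as_str()).await.expect("create tables");
}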
10 changes: 10 additions & 0 deletions core/src/database/generate.rs
@@ -270,6 +270,16 @@ pub fn generate_event_table_full_name(
format!("{}.{}", schema_name, camel_to_snake(event_name))
}

pub fn generate_event_table_full_name_clickhouse(
database_name: &str,
indexer_name: &str,
contract_name: &str,
event_name: &str,
) -> String {
let schema_name = generate_indexer_contract_schema_name(indexer_name, contract_name);
format!("{}.{}_{}", database_name, schema_name, camel_to_snake(event_name))
}

pub fn generate_event_table_columns_names_sql(column_names: &[String]) -> String {
column_names.iter().map(|name| format!("\"{name}\"")).collect::<Vec<String>>().join(", ")
}
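Usage sketch for the new helper; its output has the shape "<db>.<schema>_<snake(event)>", where the exact <schema> value comes from generate_indexer_contract_schema_name (not shown in this diff):

// Values are illustrative; only the function signature comes from this PR.
fn clickhouse_naming_example() {
    let full = generate_event_table_full_name_clickhouse(
        "rindexer_db", // illustrative configured database
        "MyIndexer",
        "MyContract",
        "Transfer",
    );
    println!("{full}"); // e.g. "rindexer_db.<schema>_transfer"
}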
8 changes: 5 additions & 3 deletions core/src/indexer/last_synced.rs
@@ -183,11 +183,12 @@ pub async fn get_last_synced_block_number(config: SyncConfig<'_>) -> Option<U64>
last_synced_block: u64,
}

let database_name = clickhouse.get_database_name();
let schema =
generate_indexer_contract_schema_name(config.indexer_name, config.contract_name);
let table_name = generate_internal_event_table_name_no_shorten(&schema, config.event_name);
let query = format!(
"SELECT last_synced_block FROM rindexer_internal.{table_name} FINAL WHERE network = '{}'",
"SELECT last_synced_block FROM {database_name}.rindexer_internal_{table_name} FINAL WHERE network = '{}'",
config.network
);

@@ -306,15 +307,16 @@ pub async fn update_progress_and_last_synced_task(
error!("Error updating db last synced block: {:?}", e);
}
} else if let Some(clickhouse) = &config.clickhouse() {
let database_name = clickhouse.get_database_name();
let schema =
generate_indexer_contract_schema_name(&config.indexer_name(), &config.contract_name());
let table_name =
generate_internal_event_table_name_no_shorten(&schema, &config.event_name());
let network = &config.network_contract().network;
let query = format!(
r#"
INSERT INTO rindexer_internal.{table_name} (network, last_synced_block) VALUES ('{network}', {to_block});
INSERT INTO rindexer_internal.latest_block (network, block) VALUES ('{network}', {latest});
INSERT INTO {database_name}.rindexer_internal_{table_name} (network, last_synced_block) VALUES ('{network}', {to_block});
INSERT INTO {database_name}.rindexer_internal_latest_block (network, block) VALUES ('{network}', {latest});
"#
);

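For reference, with illustrative values the queries built here now come out roughly as below; the SELECT keeps FINAL so that, with the ReplacingMergeTree engine created in generate.rs, only the most recent row per network is read. Exact table names depend on generate_internal_event_table_name_no_shorten, so treat this as a sketch:

// Illustrative output (placeholder database, table, and network values):
//   SELECT last_synced_block FROM rindexer_db.rindexer_internal_my_indexer_my_contract_transfer FINAL WHERE network = 'ethereum'
//   INSERT INTO rindexer_db.rindexer_internal_my_indexer_my_contract_transfer (network, last_synced_block) VALUES ('ethereum', 21000000);
//   INSERT INTO rindexer_db.rindexer_internal_latest_block (network, block) VALUES ('ethereum', 21000123);
fn last_synced_sql_sketch(db: &str, table: &str, network: &str, to_block: u64, latest: u64) -> (String, String) {
    let select = format!(
        "SELECT last_synced_block FROM {db}.rindexer_internal_{table} FINAL WHERE network = '{network}'"
    );
    let inserts = format!(
        "INSERT INTO {db}.rindexer_internal_{table} (network, last_synced_block) VALUES ('{network}', {to_block});\n\
         INSERT INTO {db}.rindexer_internal_latest_block (network, block) VALUES ('{network}', {latest});"
    );
    (select, inserts)
}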