4 changes: 2 additions & 2 deletions Cargo.lock


19 changes: 15 additions & 4 deletions core/src/database/postgres/client.rs
@@ -288,7 +288,10 @@ impl PostgresClient {
conn.copy_in(statement).await.map_err(PostgresError::PgError)
}

pub async fn bulk_insert_via_copy(
// Internal method used by insert_bulk for large datasets (>100 rows).
// Uses PostgreSQL COPY command for optimal performance with large data.
// Made pub(crate) to allow crate-internal access while keeping insert_bulk as the primary API.
pub(crate) async fn bulk_insert_via_copy(
&self,
table_name: &str,
column_names: &[String],
@@ -332,7 +335,10 @@ impl PostgresClient {
Ok(())
}

pub async fn bulk_insert(
// Internal method used by insert_bulk for small datasets (≤100 rows).
// Uses standard INSERT queries which are more efficient for smaller data volumes.
// Made pub(crate) to allow crate-internal access while keeping insert_bulk as the primary API.
pub(crate) async fn bulk_insert_via_query(
&self,
table_name: &str,
column_names: &[String],
@@ -399,15 +405,20 @@ impl PostgresClient {
return Ok(());
}

if postgres_bulk_data.len() > 100 {
let total_params = postgres_bulk_data.len() * columns.len();

// PostgreSQL has a maximum of 65535 parameters in a single query
// (see https://www.postgresql.org/docs/current/limits.html#LIMITS-TABLE)
// If we exceed this limit, force use of COPY method
if postgres_bulk_data.len() > 100 || total_params > 65535 {
let column_types: Vec<PgType> =
postgres_bulk_data[0].iter().map(|param| param.to_type()).collect();

self.bulk_insert_via_copy(table_name, columns, &column_types, postgres_bulk_data)
.await
.map_err(|e| e.to_string())
} else {
self.bulk_insert(table_name, columns, postgres_bulk_data)
self.bulk_insert_via_query(table_name, columns, postgres_bulk_data)
.await
.map(|_| ())
.map_err(|e| e.to_string())
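
The dispatch comments above compress into a simple rule. The following is a minimal, self-contained sketch of that rule (the helper name and batch shapes are illustrative, not part of the crate):

```rust
// Sketch of the routing rule introduced above: COPY is used when a batch has
// more than 100 rows OR would need more bind parameters than PostgreSQL allows
// in a single query (65535).
const MAX_ROWS_FOR_QUERY: usize = 100;
const PG_MAX_PARAMS: usize = 65_535;

fn use_copy(rows: usize, columns: usize) -> bool {
    rows > MAX_ROWS_FOR_QUERY || rows * columns > PG_MAX_PARAMS
}

fn main() {
    // 80 rows x 8 columns = 640 parameters: small enough for a plain INSERT.
    assert!(!use_copy(80, 8));
    // More than 100 rows always goes through COPY, regardless of width.
    assert!(use_copy(5_000, 8));
    // Even a batch of <=100 rows is forced onto COPY if the table is wide enough
    // to exceed the parameter limit (100 * 700 = 70_000 > 65_535).
    assert!(use_copy(100, 700));
}
```
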
2 changes: 1 addition & 1 deletion core/src/database/postgres/generate.rs
@@ -83,7 +83,7 @@ fn generate_event_table_sql_with_comments(
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS {table_name} (\
rindexer_id SERIAL PRIMARY KEY NOT NULL, \
contract_address CHAR(66) NOT NULL, \
contract_address CHAR(42) NOT NULL, \
{event_columns} \
tx_hash CHAR(66) NOT NULL, \
block_number NUMERIC NOT NULL, \
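
For context on the new width (not part of the diff): an EVM contract address is 20 bytes, which renders as a 0x-prefixed 40-character hex string, i.e. 42 characters, whereas transaction and block hashes are 32 bytes (66 characters), which is why `tx_hash` and `block_hash` stay `CHAR(66)`. A self-contained check:

```rust
// Address vs. hash widths: 20-byte address -> "0x" + 40 hex chars = 42,
// 32-byte hash -> "0x" + 64 hex chars = 66.
fn hex_encode(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

fn main() {
    let address = [0u8; 20]; // e.g. a contract address
    let hash = [0u8; 32];    // e.g. a transaction or block hash

    assert_eq!(format!("0x{}", hex_encode(&address)).len(), 42);
    assert_eq!(format!("0x{}", hex_encode(&hash)).len(), 66);
}
```
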
46 changes: 12 additions & 34 deletions core/src/generator/events_bindings.rs
@@ -958,40 +958,18 @@ pub fn generate_event_handlers(

let rows = [{columns_names}];

if postgres_bulk_data.len() > 100 {{
let result = context
.database
.bulk_insert_via_copy(
"{table_name}",
&rows,
&postgres_bulk_data
.first()
.ok_or("No first element in bulk data, impossible")?
.iter()
.map(|param| param.to_type())
.collect::<Vec<PgType>>(),
&postgres_bulk_data,
)
.await;

if let Err(e) = result {{
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data via COPY: {{:?}}", e);
return Err(e.to_string());
}}
}} else {{
let result = context
.database
.bulk_insert(
"{table_name}",
&rows,
&postgres_bulk_data,
)
.await;

if let Err(e) = result {{
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data via INSERT: {{:?}}", e);
return Err(e.to_string());
}}
let result = context
.database
.insert_bulk(
"{table_name}",
&rows,
&postgres_bulk_data,
)
.await;

if let Err(e) = result {{
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data: {{:?}}", e);
return Err(e.to_string());
}}
"#,
table_name =
60 changes: 21 additions & 39 deletions core/src/generator/trace_bindings.rs
@@ -925,15 +925,19 @@ pub fn generate_trace_handlers(
}
}

data.push_str("EthereumSqlTypeWrapper::B256(result.tx_information.transaction_hash),");
data.push_str("EthereumSqlTypeWrapper::U64(result.tx_information.block_number),");
data.push_str(
"\nEthereumSqlTypeWrapper::B256(result.tx_information.transaction_hash),",
);
data.push_str("\nEthereumSqlTypeWrapper::U64(result.tx_information.block_number),");
data.push_str("\nEthereumSqlTypeWrapper::DateTimeNullable(result.tx_information.block_timestamp_to_datetime()),");
data.push_str("EthereumSqlTypeWrapper::B256(result.tx_information.block_hash),");
data.push_str("\nEthereumSqlTypeWrapper::B256(result.tx_information.block_hash),");
data.push_str(
"EthereumSqlTypeWrapper::String(result.tx_information.network.to_string()),",
);
data.push_str("EthereumSqlTypeWrapper::U64(result.tx_information.transaction_index),");
data.push_str("EthereumSqlTypeWrapper::U256(result.tx_information.log_index)");
data.push_str(
"\nEthereumSqlTypeWrapper::U64(result.tx_information.transaction_index),",
);
data.push_str("\nEthereumSqlTypeWrapper::U256(result.tx_information.log_index)");
data.push(']');

postgres_write = format!(
@@ -952,40 +956,18 @@
return Ok(());
}}

if postgres_bulk_data.len() > 100 {{
let result = context
.database
.bulk_insert_via_copy(
"{table_name}",
&[{columns_names}],
&postgres_bulk_data
.first()
.ok_or("No first element in bulk data, impossible")?
.iter()
.map(|param| param.to_type())
.collect::<Vec<PgType>>(),
&postgres_bulk_data,
)
.await;

if let Err(e) = result {{
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data via COPY: {{:?}}", e);
return Err(e.to_string());
}}
}} else {{
let result = context
.database
.bulk_insert(
"{table_name}",
&[{columns_names}],
&postgres_bulk_data,
)
.await;

if let Err(e) = result {{
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data via INSERT: {{:?}}", e);
return Err(e.to_string());
}}
let result = context
.database
.insert_bulk(
"{table_name}",
&[{columns_names}],
&postgres_bulk_data,
)
.await;

if let Err(e) = result {{
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data: {{:?}}", e);
return Err(e.to_string());
}}
"#,
table_name =
21 changes: 2 additions & 19 deletions core/src/indexer/no_code.rs
@@ -441,25 +441,8 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbacks {
if let Some(postgres) = &params.postgres {
let bulk_data_length = postgres_bulk_data.len();
if bulk_data_length > 0 {
// anything over 100 events is considered bulk and goes the COPY route
if bulk_data_length > 100 {
if let Err(e) = postgres
.bulk_insert_via_copy(
&params.postgres_event_table_name,
&params.postgres_column_names,
&postgres_bulk_column_types,
&postgres_bulk_data,
)
.await
{
error!(
"{}::{} - Error performing bulk insert: {}",
params.contract_name, params.event_info.name, e
);
return Err(e.to_string());
}
} else if let Err(e) = postgres
.bulk_insert(
if let Err(e) = postgres
.insert_bulk(
&params.postgres_event_table_name,
&params.postgres_column_names,
&postgres_bulk_data,
2 changes: 1 addition & 1 deletion core/src/manifest/contract.rs
@@ -379,7 +379,7 @@ impl Contract {
}
}

pub fn before_modify_name_if_filter_readonly(&self) -> Cow<'_, str> {
pub fn before_modify_name_if_filter_readonly(&'_ self) -> Cow<'_, str> {
if self.is_filter() {
Cow::Owned(self.contract_name_to_filter_name())
} else {
2 changes: 1 addition & 1 deletion core/src/streams/redis.rs
@@ -28,7 +28,7 @@ pub struct Redis {
}

async fn get_pooled_connection(
pool: &Arc<Pool<RedisConnectionManager>>,
pool: &'_ Arc<Pool<RedisConnectionManager>>,
) -> Result<PooledConnection<'_, RedisConnectionManager>, RedisError> {
match pool.get().await {
Ok(c) => Ok(c),
5 changes: 5 additions & 0 deletions documentation/docs/pages/docs/changelog.mdx
@@ -8,9 +8,14 @@

### Bug fixes
-------------------------------------------------
- fix: `contract_address` Postgres column changed from `char(66)` to `char(42)`
- fix: `PostgresClient` now only exposes `insert_bulk`, which decides internally whether to insert rows via INSERT or COPY
- fix: regenerated example projects to support latest changes

### Breaking changes
-------------------------------------------------
- `contract_address` Postgres column changed from `char(66)` to `char(42)`
- `PostgresClient` now only exposes `insert_bulk`, which decides internally whether to insert rows via INSERT or COPY

## Releases
-------------------------------------------------
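
For projects that called the removed methods directly, the call-site change is mechanical. A hedged sketch follows; the argument order mirrors the calls shown in this diff, while the import paths and exact parameter types are assumptions and should be taken from your generated bindings:

```rust
use rindexer::{EthereumSqlTypeWrapper, PostgresClient}; // import paths assumed

// Old shape (no longer compiles: bulk_insert / bulk_insert_via_copy are pub(crate) now):
//
//     if rows.len() > 100 {
//         client.bulk_insert_via_copy(table, columns, &column_types, &rows).await?;
//     } else {
//         client.bulk_insert(table, columns, &rows).await?;
//     }
//
// New shape: a single call; the client picks INSERT vs COPY internally.
async fn write_rows(
    client: &PostgresClient,
    table: &str,
    columns: &[String],
    rows: &[Vec<EthereumSqlTypeWrapper>],
) -> Result<(), String> {
    client.insert_bulk(table, columns, rows).await.map_err(|e| e.to_string())
}
```
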
@@ -101,66 +101,31 @@ async fn transfer_handler(
return Ok(());
}

if bulk_data.len() > 100 {
let result = context
.database
.bulk_insert_via_copy(
"rust_rocket_pool_eth.transfer",
&[
"contract_address".to_string(),
"from".to_string(),
"to".to_string(),
"value".to_string(),
"tx_hash".to_string(),
"block_number".to_string(),
"block_hash".to_string(),
"network".to_string(),
],
&bulk_data
.first()
.ok_or("No first element in bulk data, impossible")?
.iter()
.map(|param| param.to_type())
.collect::<Vec<PgType>>(),
&bulk_data,
)
.await;

if let Err(e) = result {
rindexer_error!(
"RocketPoolETHEventType::Transfer inserting bulk data: {:?}",
e
);

return Err(e.to_string());
}
} else {
let result = context
.database
.bulk_insert(
"rust_rocket_pool_eth.transfer",
&[
"contract_address".to_string(),
"from".to_string(),
"to".to_string(),
"value".to_string(),
"tx_hash".to_string(),
"block_number".to_string(),
"block_hash".to_string(),
"network".to_string(),
],
&bulk_data,
)
.await;

if let Err(e) = result {
rindexer_error!(
"RocketPoolETHEventType::Transfer inserting bulk data: {:?}",
e
);

return Err(e.to_string());
}
let result = context
.database
.insert_bulk(
"rust_rocket_pool_eth.transfer",
&[
"contract_address".to_string(),
"from".to_string(),
"to".to_string(),
"value".to_string(),
"tx_hash".to_string(),
"block_number".to_string(),
"block_hash".to_string(),
"network".to_string(),
],
&bulk_data,
)
.await;

if let Err(e) = result {
rindexer_error!(
"RocketPoolETHEventType::Transfer inserting bulk data: {:?}",
e
);

return Err(e.to_string());
}

rindexer_info!(
@@ -822,7 +787,7 @@ You can query data from the database and write data to the database, here are th
- `context.database.query` - This is for querying data from the database.
- `context.database.query_one` - This is for querying a single row from the database.
- `context.database.query_one_or_none` - This is for querying a single row from the database or returning None if no rows are found.
- `context.database.batch_insert` - This is for inserting multiple rows into the database.
- `context.database.insert_bulk` - This is for inserting multiple rows into the database efficiently.
- `context.database.copy_in` - This is for inserting multiple rows into the database using the COPY command.

### EthereumSqlTypeWrapper