Skip to content

Commit e2a6857

Browse files
zannis
and a co-author authored
breaking: update contract_address column type and insert_bulk API (#301)
Co-authored-by: zannis <[email protected]>
1 parent d9e1bea commit e2a6857

File tree

29 files changed

+768
-4380
lines changed

29 files changed

+768
-4380
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/database/postgres/client.rs

Lines changed: 15 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -288,7 +288,10 @@ impl PostgresClient {
288288
conn.copy_in(statement).await.map_err(PostgresError::PgError)
289289
}
290290

291-
pub async fn bulk_insert_via_copy(
291+
// Internal method used by insert_bulk for large datasets (>100 rows).
292+
// Uses PostgreSQL COPY command for optimal performance with large data.
293+
// Made pub(crate) to allow crate-internal access while keeping insert_bulk as the primary API.
294+
pub(crate) async fn bulk_insert_via_copy(
292295
&self,
293296
table_name: &str,
294297
column_names: &[String],
@@ -332,7 +335,10 @@ impl PostgresClient {
332335
Ok(())
333336
}
334337

335-
pub async fn bulk_insert(
338+
// Internal method used by insert_bulk for small datasets (≤100 rows).
339+
// Uses standard INSERT queries which are more efficient for smaller data volumes.
340+
// Made pub(crate) to allow crate-internal access while keeping insert_bulk as the primary API.
341+
pub(crate) async fn bulk_insert_via_query(
336342
&self,
337343
table_name: &str,
338344
column_names: &[String],
@@ -399,15 +405,20 @@ impl PostgresClient {
399405
return Ok(());
400406
}
401407

402-
if postgres_bulk_data.len() > 100 {
408+
let total_params = postgres_bulk_data.len() * columns.len();
409+
410+
// PostgreSQL has a maximum of 65535 parameters in a single query
411+
// (see https://www.postgresql.org/docs/current/limits.html#LIMITS-TABLE)
412+
// If we exceed this limit, force use of COPY method
413+
if postgres_bulk_data.len() > 100 || total_params > 65535 {
403414
let column_types: Vec<PgType> =
404415
postgres_bulk_data[0].iter().map(|param| param.to_type()).collect();
405416

406417
self.bulk_insert_via_copy(table_name, columns, &column_types, postgres_bulk_data)
407418
.await
408419
.map_err(|e| e.to_string())
409420
} else {
410-
self.bulk_insert(table_name, columns, postgres_bulk_data)
421+
self.bulk_insert_via_query(table_name, columns, postgres_bulk_data)
411422
.await
412423
.map(|_| ())
413424
.map_err(|e| e.to_string())

core/src/database/postgres/generate.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ fn generate_event_table_sql_with_comments(
8383
let create_table_sql = format!(
8484
"CREATE TABLE IF NOT EXISTS {table_name} (\
8585
rindexer_id SERIAL PRIMARY KEY NOT NULL, \
86-
contract_address CHAR(66) NOT NULL, \
86+
contract_address CHAR(42) NOT NULL, \
8787
{event_columns} \
8888
tx_hash CHAR(66) NOT NULL, \
8989
block_number NUMERIC NOT NULL, \

core/src/generator/events_bindings.rs

Lines changed: 12 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -958,40 +958,18 @@ pub fn generate_event_handlers(
958958
959959
let rows = [{columns_names}];
960960
961-
if postgres_bulk_data.len() > 100 {{
962-
let result = context
963-
.database
964-
.bulk_insert_via_copy(
965-
"{table_name}",
966-
&rows,
967-
&postgres_bulk_data
968-
.first()
969-
.ok_or("No first element in bulk data, impossible")?
970-
.iter()
971-
.map(|param| param.to_type())
972-
.collect::<Vec<PgType>>(),
973-
&postgres_bulk_data,
974-
)
975-
.await;
976-
977-
if let Err(e) = result {{
978-
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data via COPY: {{:?}}", e);
979-
return Err(e.to_string());
980-
}}
981-
}} else {{
982-
let result = context
983-
.database
984-
.bulk_insert(
985-
"{table_name}",
986-
&rows,
987-
&postgres_bulk_data,
988-
)
989-
.await;
990-
991-
if let Err(e) = result {{
992-
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data via INSERT: {{:?}}", e);
993-
return Err(e.to_string());
994-
}}
961+
let result = context
962+
.database
963+
.insert_bulk(
964+
"{table_name}",
965+
&rows,
966+
&postgres_bulk_data,
967+
)
968+
.await;
969+
970+
if let Err(e) = result {{
971+
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data: {{:?}}", e);
972+
return Err(e.to_string());
995973
}}
996974
"#,
997975
table_name =

core/src/generator/trace_bindings.rs

Lines changed: 21 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -925,15 +925,19 @@ pub fn generate_trace_handlers(
925925
}
926926
}
927927

928-
data.push_str("EthereumSqlTypeWrapper::B256(result.tx_information.transaction_hash),");
929-
data.push_str("EthereumSqlTypeWrapper::U64(result.tx_information.block_number),");
928+
data.push_str(
929+
"\nEthereumSqlTypeWrapper::B256(result.tx_information.transaction_hash),",
930+
);
931+
data.push_str("\nEthereumSqlTypeWrapper::U64(result.tx_information.block_number),");
930932
data.push_str("\nEthereumSqlTypeWrapper::DateTimeNullable(result.tx_information.block_timestamp_to_datetime()),");
931-
data.push_str("EthereumSqlTypeWrapper::B256(result.tx_information.block_hash),");
933+
data.push_str("\nEthereumSqlTypeWrapper::B256(result.tx_information.block_hash),");
932934
data.push_str(
933935
"EthereumSqlTypeWrapper::String(result.tx_information.network.to_string()),",
934936
);
935-
data.push_str("EthereumSqlTypeWrapper::U64(result.tx_information.transaction_index),");
936-
data.push_str("EthereumSqlTypeWrapper::U256(result.tx_information.log_index)");
937+
data.push_str(
938+
"\nEthereumSqlTypeWrapper::U64(result.tx_information.transaction_index),",
939+
);
940+
data.push_str("\nEthereumSqlTypeWrapper::U256(result.tx_information.log_index)");
937941
data.push(']');
938942

939943
postgres_write = format!(
@@ -952,40 +956,18 @@ pub fn generate_trace_handlers(
952956
return Ok(());
953957
}}
954958
955-
if postgres_bulk_data.len() > 100 {{
956-
let result = context
957-
.database
958-
.bulk_insert_via_copy(
959-
"{table_name}",
960-
&[{columns_names}],
961-
&postgres_bulk_data
962-
.first()
963-
.ok_or("No first element in bulk data, impossible")?
964-
.iter()
965-
.map(|param| param.to_type())
966-
.collect::<Vec<PgType>>(),
967-
&postgres_bulk_data,
968-
)
969-
.await;
970-
971-
if let Err(e) = result {{
972-
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data via COPY: {{:?}}", e);
973-
return Err(e.to_string());
974-
}}
975-
}} else {{
976-
let result = context
977-
.database
978-
.bulk_insert(
979-
"{table_name}",
980-
&[{columns_names}],
981-
&postgres_bulk_data,
982-
)
983-
.await;
984-
985-
if let Err(e) = result {{
986-
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data via INSERT: {{:?}}", e);
987-
return Err(e.to_string());
988-
}}
959+
let result = context
960+
.database
961+
.insert_bulk(
962+
"{table_name}",
963+
&[{columns_names}],
964+
&postgres_bulk_data,
965+
)
966+
.await;
967+
968+
if let Err(e) = result {{
969+
rindexer_error!("{event_type_name}::{handler_name} inserting bulk data: {{:?}}", e);
970+
return Err(e.to_string());
989971
}}
990972
"#,
991973
table_name =

core/src/indexer/no_code.rs

Lines changed: 2 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -441,25 +441,8 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbacks {
441441
if let Some(postgres) = &params.postgres {
442442
let bulk_data_length = postgres_bulk_data.len();
443443
if bulk_data_length > 0 {
444-
// anything over 100 events is considered bulk and goes the COPY route
445-
if bulk_data_length > 100 {
446-
if let Err(e) = postgres
447-
.bulk_insert_via_copy(
448-
&params.postgres_event_table_name,
449-
&params.postgres_column_names,
450-
&postgres_bulk_column_types,
451-
&postgres_bulk_data,
452-
)
453-
.await
454-
{
455-
error!(
456-
"{}::{} - Error performing bulk insert: {}",
457-
params.contract_name, params.event_info.name, e
458-
);
459-
return Err(e.to_string());
460-
}
461-
} else if let Err(e) = postgres
462-
.bulk_insert(
444+
if let Err(e) = postgres
445+
.insert_bulk(
463446
&params.postgres_event_table_name,
464447
&params.postgres_column_names,
465448
&postgres_bulk_data,

core/src/manifest/contract.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -379,7 +379,7 @@ impl Contract {
379379
}
380380
}
381381

382-
pub fn before_modify_name_if_filter_readonly(&self) -> Cow<'_, str> {
382+
pub fn before_modify_name_if_filter_readonly(&'_ self) -> Cow<'_, str> {
383383
if self.is_filter() {
384384
Cow::Owned(self.contract_name_to_filter_name())
385385
} else {

core/src/streams/redis.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ pub struct Redis {
2828
}
2929

3030
async fn get_pooled_connection(
31-
pool: &Arc<Pool<RedisConnectionManager>>,
31+
pool: &'_ Arc<Pool<RedisConnectionManager>>,
3232
) -> Result<PooledConnection<'_, RedisConnectionManager>, RedisError> {
3333
match pool.get().await {
3434
Ok(c) => Ok(c),

documentation/docs/pages/docs/changelog.mdx

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,9 +8,14 @@
88

99
### Bug fixes
1010
-------------------------------------------------
11+
- fix: `contract_address` Postgres column changed from `char(66)` to `char(42)`
12+
- fix: `PostgresClient` now only exposes `insert_bulk` which handles internally whether to insert rows via INSERT or COPY
13+
- fix: regenerated example projects to support latest changes
1114

1215
### Breaking changes
1316
-------------------------------------------------
17+
- `contract_address` Postgres column changed from `char(66)` to `char(42)`
18+
- `PostgresClient` now only exposes `insert_bulk` which handles internally whether to insert rows via INSERT or COPY
1419

1520
## Releases
1621
-------------------------------------------------

documentation/docs/pages/docs/start-building/rust-project-deep-dive/indexers.mdx

Lines changed: 26 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -101,66 +101,31 @@ async fn transfer_handler(
101101
return Ok(());
102102
}
103103
104-
if bulk_data.len() > 100 {
105-
let result = context
106-
.database
107-
.bulk_insert_via_copy(
108-
"rust_rocket_pool_eth.transfer",
109-
&[
110-
"contract_address".to_string(),
111-
"from".to_string(),
112-
"to".to_string(),
113-
"value".to_string(),
114-
"tx_hash".to_string(),
115-
"block_number".to_string(),
116-
"block_hash".to_string(),
117-
"network".to_string(),
118-
],
119-
&bulk_data
120-
.first()
121-
.ok_or("No first element in bulk data, impossible")?
122-
.iter()
123-
.map(|param| param.to_type())
124-
.collect::<Vec<PgType>>(),
125-
&bulk_data,
126-
)
127-
.await;
128-
129-
if let Err(e) = result {
130-
rindexer_error!(
131-
"RocketPoolETHEventType::Transfer inserting bulk data: {:?}",
132-
e
133-
);
134-
135-
return Err(e.to_string());
136-
}
137-
} else {
138-
let result = context
139-
.database
140-
.bulk_insert(
141-
"rust_rocket_pool_eth.transfer",
142-
&[
143-
"contract_address".to_string(),
144-
"from".to_string(),
145-
"to".to_string(),
146-
"value".to_string(),
147-
"tx_hash".to_string(),
148-
"block_number".to_string(),
149-
"block_hash".to_string(),
150-
"network".to_string(),
151-
],
152-
&bulk_data,
153-
)
154-
.await;
155-
156-
if let Err(e) = result {
157-
rindexer_error!(
158-
"RocketPoolETHEventType::Transfer inserting bulk data: {:?}",
159-
e
160-
);
161-
162-
return Err(e.to_string());
163-
}
104+
let result = context
105+
.database
106+
.insert_bulk(
107+
"rust_rocket_pool_eth.transfer",
108+
&[
109+
"contract_address".to_string(),
110+
"from".to_string(),
111+
"to".to_string(),
112+
"value".to_string(),
113+
"tx_hash".to_string(),
114+
"block_number".to_string(),
115+
"block_hash".to_string(),
116+
"network".to_string(),
117+
],
118+
&bulk_data,
119+
)
120+
.await;
121+
122+
if let Err(e) = result {
123+
rindexer_error!(
124+
"RocketPoolETHEventType::Transfer inserting bulk data: {:?}",
125+
e
126+
);
127+
128+
return Err(e.to_string());
164129
}
165130
166131
rindexer_info!(
@@ -822,7 +787,7 @@ You can query data from the database and write data to the database, here are th
822787
- `context.database.query` - This is for querying data from the database.
823788
- `context.database.query_one` - This is for querying a single row from the database.
824789
- `context.database.query_one_or_none` - This is for querying a single row from the database or returning None if no rows are found.
825-
- `context.database.batch_insert` - This is for inserting multiple rows into the database.
790+
- `context.database.insert_bulk` - This is for inserting multiple rows into the database efficiently.
826791
- `context.database.copy_in` - This is for inserting multiple rows into the database using the COPY command.
827792

828793
### EthereumSqlTypeWrapper

0 commit comments

Comments (0)