Skip to content

Commit 92d2a23

Browse files
committed
1. remove block_hash
2. rewrite indexer to save data into specific data_range by block_height
1 parent 00e9bae commit 92d2a23

File tree

7 files changed

+40
-40
lines changed

7 files changed

+40
-40
lines changed

database/src/base/state_indexer.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,31 +59,27 @@ pub trait StateIndexerDbManager {
5959
shard_id: near_primitives::types::ShardId,
6060
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
6161
block_height: u64,
62-
block_hash: near_primitives::hash::CryptoHash,
6362
) -> anyhow::Result<()>;
6463

6564
async fn save_state_changes_access_key(
6665
&self,
6766
shard_id: near_primitives::types::ShardId,
6867
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
6968
block_height: u64,
70-
block_hash: near_primitives::hash::CryptoHash,
7169
) -> anyhow::Result<()>;
7270

7371
async fn save_state_changes_contract(
7472
&self,
7573
shard_id: near_primitives::types::ShardId,
7674
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
7775
block_height: u64,
78-
block_hash: near_primitives::hash::CryptoHash,
7976
) -> anyhow::Result<()>;
8077

8178
async fn save_state_changes_account(
8279
&self,
8380
shard_id: near_primitives::types::ShardId,
8481
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
8582
block_height: u64,
86-
block_hash: near_primitives::hash::CryptoHash,
8783
) -> anyhow::Result<()>;
8884

8985
async fn create_new_range_tables(&self, range_id: u64) -> anyhow::Result<()>;

database/src/postgres/migrations/shard_db/20250430152143_procedure_create_new_range_tables.up.sql

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ BEGIN
88
CREATE TABLE IF NOT EXISTS state_changes_data_%s (
99
account_id text NOT NULL,
1010
block_height numeric(20,0) NOT NULL,
11-
block_hash text NOT NULL,
1211
data_key text NOT NULL,
1312
data_value bytea NULL,
1413
PRIMARY KEY (account_id, data_key, block_height)
@@ -27,7 +26,6 @@ BEGIN
2726
CREATE TABLE IF NOT EXISTS state_changes_access_key_%s (
2827
account_id text NOT NULL,
2928
block_height numeric(20,0) NOT NULL,
30-
block_hash text NOT NULL,
3129
data_key text NOT NULL,
3230
data_value bytea NULL,
3331
PRIMARY KEY (account_id, data_key, block_height)
@@ -45,7 +43,6 @@ BEGIN
4543
CREATE TABLE IF NOT EXISTS state_changes_contract_%s (
4644
account_id text NOT NULL,
4745
block_height numeric(20,0) NOT NULL,
48-
block_hash text NOT NULL,
4946
data_value bytea NULL,
5047
PRIMARY KEY (account_id, block_height)
5148
) PARTITION BY HASH (account_id);
@@ -62,7 +59,6 @@ BEGIN
6259
CREATE TABLE IF NOT EXISTS state_changes_account_%s (
6360
account_id text NOT NULL,
6461
block_height numeric(20,0) NOT NULL,
65-
block_hash text NOT NULL,
6662
data_value bytea NULL,
6763
PRIMARY KEY (account_id, block_height)
6864
) PARTITION BY HASH (account_id);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Add down migration script here
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- Add up migration script here
2+
DO $$
3+
DECLARE
4+
i INT;
5+
BEGIN
6+
-- Drop column from each partition
7+
FOR i IN 0..99 LOOP
8+
EXECUTE format('ALTER TABLE state_changes_data_%s DROP COLUMN IF EXISTS block_hash', i);
9+
EXECUTE format('ALTER TABLE state_changes_access_key_%s DROP COLUMN IF EXISTS block_hash', i);
10+
EXECUTE format('ALTER TABLE state_changes_contract_%s DROP COLUMN IF EXISTS block_hash', i);
11+
EXECUTE format('ALTER TABLE state_changes_account_%s DROP COLUMN IF EXISTS block_hash', i);
12+
END LOOP;
13+
14+
-- Drop column from parent table
15+
ALTER TABLE state_changes_data DROP COLUMN IF EXISTS block_hash;
16+
ALTER TABLE state_changes_access_key DROP COLUMN IF EXISTS block_hash;
17+
ALTER TABLE state_changes_contract DROP COLUMN IF EXISTS block_hash;
18+
ALTER TABLE state_changes_account DROP COLUMN IF EXISTS block_hash;
19+
END $$;

database/src/postgres/state_indexer.rs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
216216
shard_id: near_primitives::types::ShardId,
217217
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
218218
block_height: u64,
219-
block_hash: near_primitives::hash::CryptoHash,
220219
) -> anyhow::Result<()> {
221220
crate::metrics::SHARD_DATABASE_WRITE_QUERIES
222221
.with_label_values(&[
@@ -225,8 +224,9 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
225224
"state_changes_data",
226225
])
227226
.inc();
227+
let range_id = configuration::utils::get_data_range_id(&block_height).await?;
228228
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
229-
"INSERT INTO state_changes_data (account_id, block_height, block_hash, data_key, data_value) ",
229+
format!("INSERT INTO state_changes_data_{range_id} (account_id, block_height, data_key, data_value) "),
230230
);
231231
query_builder.push_values(state_changes.iter(), |mut values, state_change| {
232232
match &state_change.value {
@@ -240,7 +240,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
240240
values
241241
.push_bind(account_id.to_string())
242242
.push_bind(bigdecimal::BigDecimal::from(block_height))
243-
.push_bind(block_hash.to_string())
244243
.push_bind(hex::encode(data_key).to_string())
245244
.push_bind(data_value);
246245
}
@@ -250,7 +249,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
250249
values
251250
.push_bind(account_id.to_string())
252251
.push_bind(bigdecimal::BigDecimal::from(block_height))
253-
.push_bind(block_hash.to_string())
254252
.push_bind(hex::encode(data_key).to_string())
255253
.push_bind(data_value);
256254
}
@@ -273,7 +271,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
273271
shard_id: near_primitives::types::ShardId,
274272
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
275273
block_height: u64,
276-
block_hash: near_primitives::hash::CryptoHash,
277274
) -> anyhow::Result<()> {
278275
crate::metrics::SHARD_DATABASE_WRITE_QUERIES
279276
.with_label_values(&[
@@ -282,8 +279,9 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
282279
"state_changes_access_key",
283280
])
284281
.inc();
282+
let range_id = configuration::utils::get_data_range_id(&block_height).await?;
285283
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
286-
"INSERT INTO state_changes_access_key (account_id, block_height, block_hash, data_key, data_value) ",
284+
format!("INSERT INTO state_changes_access_key_{range_id} (account_id, block_height, data_key, data_value) "),
287285
);
288286
query_builder.push_values(state_changes.iter(), |mut values, state_change| {
289287
match &state_change.value {
@@ -299,7 +297,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
299297
values
300298
.push_bind(account_id.to_string())
301299
.push_bind(bigdecimal::BigDecimal::from(block_height))
302-
.push_bind(block_hash.to_string())
303300
.push_bind(hex::encode(data_key).to_string())
304301
.push_bind(data_value);
305302
}
@@ -313,7 +310,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
313310
values
314311
.push_bind(account_id.to_string())
315312
.push_bind(bigdecimal::BigDecimal::from(block_height))
316-
.push_bind(block_hash.to_string())
317313
.push_bind(hex::encode(data_key).to_string())
318314
.push_bind(data_value);
319315
}
@@ -336,7 +332,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
336332
shard_id: near_primitives::types::ShardId,
337333
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
338334
block_height: u64,
339-
block_hash: near_primitives::hash::CryptoHash,
340335
) -> anyhow::Result<()> {
341336
crate::metrics::SHARD_DATABASE_WRITE_QUERIES
342337
.with_label_values(&[
@@ -345,8 +340,9 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
345340
"state_changes_contract",
346341
])
347342
.inc();
343+
let range_id = configuration::utils::get_data_range_id(&block_height).await?;
348344
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
349-
"INSERT INTO state_changes_contract (account_id, block_height, block_hash, data_value) ",
345+
format!("INSERT INTO state_changes_contract_{range_id} (account_id, block_height, data_value) "),
350346
);
351347
query_builder.push_values(state_changes.iter(), |mut values, state_change| {
352348
match &state_change.value {
@@ -358,7 +354,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
358354
values
359355
.push_bind(account_id.to_string())
360356
.push_bind(bigdecimal::BigDecimal::from(block_height))
361-
.push_bind(block_hash.to_string())
362357
.push_bind(data_value);
363358
}
364359
near_primitives::views::StateChangeValueView::ContractCodeDeletion {
@@ -368,7 +363,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
368363
values
369364
.push_bind(account_id.to_string())
370365
.push_bind(bigdecimal::BigDecimal::from(block_height))
371-
.push_bind(block_hash.to_string())
372366
.push_bind(data_value);
373367
}
374368
_ => {}
@@ -390,7 +384,6 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
390384
shard_id: near_primitives::types::ShardId,
391385
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
392386
block_height: u64,
393-
block_hash: near_primitives::hash::CryptoHash,
394387
) -> anyhow::Result<()> {
395388
crate::metrics::SHARD_DATABASE_WRITE_QUERIES
396389
.with_label_values(&[
@@ -399,8 +392,9 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
399392
"state_changes_account",
400393
])
401394
.inc();
395+
let range_id = configuration::utils::get_data_range_id(&block_height).await?;
402396
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
403-
"INSERT INTO state_changes_account (account_id, block_height, block_hash, data_value) ",
397+
format!("INSERT INTO state_changes_account_{range_id} (account_id, block_height, data_value) "),
404398
);
405399
query_builder.push_values(state_changes.iter(), |mut values, state_change| {
406400
match &state_change.value {
@@ -414,15 +408,13 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
414408
values
415409
.push_bind(account_id.to_string())
416410
.push_bind(bigdecimal::BigDecimal::from(block_height))
417-
.push_bind(block_hash.to_string())
418411
.push_bind(data_value);
419412
}
420413
near_primitives::views::StateChangeValueView::AccountDeletion { account_id } => {
421414
let data_value: Option<&[u8]> = None;
422415
values
423416
.push_bind(account_id.to_string())
424417
.push_bind(bigdecimal::BigDecimal::from(block_height))
425-
.push_bind(block_hash.to_string())
426418
.push_bind(data_value);
427419
}
428420
_ => {}

logic-state-indexer/src/lib.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,13 @@ impl StateChangesToStore {
5050
&self,
5151
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
5252
block_height: u64,
53-
block_hash: CryptoHash,
5453
) -> anyhow::Result<()> {
5554
if !self.data.is_empty() {
5655
let futures = self.data.iter().map(|(shard_id, state_changes)| {
5756
db_manager.save_state_changes_data(
5857
*shard_id,
5958
state_changes.values().cloned().collect(),
6059
block_height,
61-
block_hash,
6260
)
6361
});
6462
futures::future::join_all(futures)
@@ -75,15 +73,13 @@ impl StateChangesToStore {
7573
&self,
7674
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
7775
block_height: u64,
78-
block_hash: CryptoHash,
7976
) -> anyhow::Result<()> {
8077
if !self.access_key.is_empty() {
8178
let futures = self.access_key.iter().map(|(shard_id, state_changes)| {
8279
db_manager.save_state_changes_access_key(
8380
*shard_id,
8481
state_changes.values().cloned().collect(),
8582
block_height,
86-
block_hash,
8783
)
8884
});
8985
futures::future::join_all(futures)
@@ -100,15 +96,13 @@ impl StateChangesToStore {
10096
&self,
10197
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
10298
block_height: u64,
103-
block_hash: CryptoHash,
10499
) -> anyhow::Result<()> {
105100
if !self.contract.is_empty() {
106101
let futures = self.contract.iter().map(|(shard_id, state_changes)| {
107102
db_manager.save_state_changes_contract(
108103
*shard_id,
109104
state_changes.values().cloned().collect(),
110105
block_height,
111-
block_hash,
112106
)
113107
});
114108
futures::future::join_all(futures)
@@ -125,15 +119,13 @@ impl StateChangesToStore {
125119
&self,
126120
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
127121
block_height: u64,
128-
block_hash: CryptoHash,
129122
) -> anyhow::Result<()> {
130123
if !self.account.is_empty() {
131124
let futures = self.account.iter().map(|(shard_id, state_changes)| {
132125
db_manager.save_state_changes_account(
133126
*shard_id,
134127
state_changes.values().cloned().collect(),
135128
block_height,
136-
block_hash,
137129
)
138130
});
139131
futures::future::join_all(futures)
@@ -148,12 +140,11 @@ impl StateChangesToStore {
148140
&self,
149141
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
150142
block_height: u64,
151-
block_hash: CryptoHash,
152143
) -> anyhow::Result<()> {
153-
let save_data_future = self.save_data(db_manager, block_height, block_hash);
154-
let save_access_key_future = self.save_access_key(db_manager, block_height, block_hash);
155-
let save_contract_future = self.save_contract(db_manager, block_height, block_hash);
156-
let save_account_future = self.save_account(db_manager, block_height, block_hash);
144+
let save_data_future = self.save_data(db_manager, block_height);
145+
let save_access_key_future = self.save_access_key(db_manager, block_height);
146+
let save_contract_future = self.save_contract(db_manager, block_height);
147+
let save_account_future = self.save_account(db_manager, block_height);
157148

158149
futures::future::join_all([
159150
save_data_future.boxed(),
@@ -187,6 +178,12 @@ pub async fn handle_streamer_message(
187178
let block_height = streamer_message.block.header.height;
188179
let block_hash = streamer_message.block.header.hash;
189180

181+
let range_id = configuration::utils::get_data_range_id(&block_height).await?;
182+
if stats.read().await.current_range_id < range_id {
183+
db_manager.create_new_range_tables(range_id).await?;
184+
stats.write().await.current_range_id = range_id;
185+
}
186+
190187
let current_epoch_id = streamer_message.block.header.epoch_id;
191188
let next_epoch_id = streamer_message.block.header.next_epoch_id;
192189

@@ -244,7 +241,6 @@ pub async fn handle_streamer_message(
244241
&streamer_message,
245242
db_manager,
246243
block_height,
247-
block_hash,
248244
&indexer_config,
249245
shard_layout,
250246
)
@@ -339,7 +335,6 @@ async fn handle_state_changes(
339335
streamer_message: &near_indexer_primitives::StreamerMessage,
340336
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
341337
block_height: u64,
342-
block_hash: CryptoHash,
343338
indexer_config: &(impl configuration::RightsizingConfig + std::fmt::Debug),
344339
shard_layout: &near_primitives::shard_layout::ShardLayout,
345340
) -> anyhow::Result<()> {
@@ -432,6 +427,6 @@ async fn handle_state_changes(
432427
}
433428

434429
state_changes_to_store
435-
.save_state_changes(db_manager, block_height, block_hash)
430+
.save_state_changes(db_manager, block_height)
436431
.await
437432
}

logic-state-indexer/src/metrics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub struct Stats {
6565
pub last_processed_block_height: u64,
6666
pub current_epoch_id: Option<near_indexer_primitives::CryptoHash>,
6767
pub current_epoch_height: u64,
68+
pub current_range_id: u64,
6869
}
6970

7071
pub async fn state_logger(

0 commit comments

Comments
 (0)