Skip to content

Commit 4f29036

Browse files
committed
add script to snapshot state data
1 parent 9d66715 commit 4f29036

File tree

13 files changed

+455
-10
lines changed

13 files changed

+455
-10
lines changed

configuration/src/configs/database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::configs::{deserialize_data_or_env, deserialize_optional_data_or_env};
55

66
// Database connection URL
77
// Example: "postgres://user:password@localhost:5432/dbname"
8-
type DatabaseConnectUrl = String;
8+
pub type DatabaseConnectUrl = String;
99

1010
#[derive(Validate, serde_derive::Deserialize, Debug, Clone, Default)]
1111
pub struct ShardDatabaseConfig {

configuration/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mod configs;
99
mod default_env_configs;
1010
pub mod utils;
1111

12-
pub use crate::configs::database::DatabaseConfig;
12+
pub use crate::configs::database::{DatabaseConfig, DatabaseConnectUrl};
1313
pub use crate::configs::general::ChainId;
1414
pub use crate::configs::general::StorageProvider;
1515
pub use crate::configs::{

database/src/postgres/docker-compose.yml

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: '3.7'
2-
31
services:
42
metadata:
53
image: postgres:15.5
@@ -10,6 +8,11 @@ services:
108
POSTGRES_PASSWORD: password
119
ports:
1210
- "5422:5432"
11+
command: ["postgres", "-c", "max_connections=4000"]
12+
ulimits:
13+
nofile:
14+
soft: 1048576
15+
hard: 1048576
1316

1417
shard_0:
1518
image: postgres:15.5
@@ -20,6 +23,11 @@ services:
2023
POSTGRES_PASSWORD: password
2124
ports:
2225
- "5430:5432"
26+
command: ["postgres", "-c", "max_connections=4000"]
27+
ulimits:
28+
nofile:
29+
soft: 1048576
30+
hard: 1048576
2331

2432
shard_1:
2533
image: postgres:15.5
@@ -30,6 +38,11 @@ services:
3038
POSTGRES_PASSWORD: password
3139
ports:
3240
- "5431:5432"
41+
command: ["postgres", "-c", "max_connections=4000"]
42+
ulimits:
43+
nofile:
44+
soft: 1048576
45+
hard: 1048576
3346

3447
shard_2:
3548
image: postgres:15.5
@@ -40,6 +53,7 @@ services:
4053
POSTGRES_PASSWORD: password
4154
ports:
4255
- "5432:5432"
56+
command: ["postgres", "-c", "max_connections=4000"]
4357

4458
shard_3:
4559
image: postgres:15.5
@@ -50,6 +64,11 @@ services:
5064
POSTGRES_PASSWORD: password
5165
ports:
5266
- "5433:5432"
67+
command: ["postgres", "-c", "max_connections=4000"]
68+
ulimits:
69+
nofile:
70+
soft: 1048576
71+
hard: 1048576
5372

5473
shard_4:
5574
image: postgres:15.5
@@ -60,6 +79,11 @@ services:
6079
POSTGRES_PASSWORD: password
6180
ports:
6281
- "5434:5432"
82+
command: ["postgres", "-c", "max_connections=4000"]
83+
ulimits:
84+
nofile:
85+
soft: 1048576
86+
hard: 1048576
6387

6488
shard_5:
6589
image: postgres:15.5
@@ -70,3 +94,8 @@ services:
7094
POSTGRES_PASSWORD: password
7195
ports:
7296
- "5435:5432"
97+
command: ["postgres", "-c", "max_connections=4000"]
98+
ulimits:
99+
nofile:
100+
soft: 1048576
101+
hard: 1048576

database/src/postgres/migrations/shard_db/20250430152143_procedure_create_new_range_tables.up.sql

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,77 @@ BEGIN
7070
CREATE TABLE IF NOT EXISTS state_changes_account_%s_%s PARTITION OF state_changes_account_%s FOR VALUES WITH (MODULUS 100, REMAINDER %s);
7171
', range_id, i, range_id, i);
7272
END LOOP;
73+
74+
75+
EXECUTE format('
76+
CREATE TABLE IF NOT EXISTS state_changes_data_%s_compact (
77+
account_id text NOT NULL,
78+
block_height_from numeric(20,0) NOT NULL,
79+
block_height_to numeric(20,0) NULL,
80+
data_key text NOT NULL,
81+
data_value bytea NOT NULL,
82+
PRIMARY KEY (account_id, data_key, block_height_from)
83+
) PARTITION BY HASH (account_id);
84+
', range_id);
85+
86+
-- Create 100 partitions
87+
FOR i IN 0..99 LOOP
88+
EXECUTE format('
89+
CREATE TABLE IF NOT EXISTS state_changes_data_%s_compact_%s PARTITION OF state_changes_data_%s_compact FOR VALUES WITH (MODULUS 100, REMAINDER %s);
90+
', range_id, i, range_id, i);
91+
END LOOP;
92+
93+
94+
EXECUTE format('
95+
CREATE TABLE IF NOT EXISTS state_changes_access_key_%s_compact (
96+
account_id text NOT NULL,
97+
block_height_from numeric(20,0) NOT NULL,
98+
block_height_to numeric(20,0) NULL,
99+
data_key text NOT NULL,
100+
data_value bytea NOT NULL,
101+
PRIMARY KEY (account_id, data_key, block_height_from)
102+
) PARTITION BY HASH (account_id);
103+
', range_id);
104+
105+
-- Create 100 partitions
106+
FOR i IN 0..99 LOOP
107+
EXECUTE format('
108+
CREATE TABLE IF NOT EXISTS state_changes_access_key_%s_compact_%s PARTITION OF state_changes_access_key_%s_compact FOR VALUES WITH (MODULUS 100, REMAINDER %s);
109+
', range_id, i, range_id, i);
110+
END LOOP;
111+
112+
EXECUTE format('
113+
CREATE TABLE IF NOT EXISTS state_changes_contract_%s_compact (
114+
account_id text NOT NULL,
115+
block_height_from numeric(20,0) NOT NULL,
116+
block_height_to numeric(20,0) NULL,
117+
data_value bytea,
118+
PRIMARY KEY (account_id, block_height_from)
119+
) PARTITION BY HASH (account_id);
120+
', range_id);
121+
122+
-- Create 100 partitions
123+
FOR i IN 0..99 LOOP
124+
EXECUTE format('
125+
CREATE TABLE IF NOT EXISTS state_changes_contract_%s_compact_%s PARTITION OF state_changes_contract_%s_compact FOR VALUES WITH (MODULUS 100, REMAINDER %s);
126+
', range_id, i, range_id, i);
127+
END LOOP;
128+
129+
EXECUTE format('
130+
CREATE TABLE IF NOT EXISTS state_changes_account_%s_compact (
131+
account_id text NOT NULL,
132+
block_height_from numeric(20,0) NOT NULL,
133+
block_height_to numeric(20,0) NULL,
134+
data_value bytea,
135+
PRIMARY KEY (account_id, block_height_from)
136+
) PARTITION BY HASH (account_id);
137+
', range_id);
138+
139+
-- Create 100 partitions
140+
FOR i IN 0..99 LOOP
141+
EXECUTE format('
142+
CREATE TABLE IF NOT EXISTS state_changes_account_%s_compact_%s PARTITION OF state_changes_account_%s_compact FOR VALUES WITH (MODULUS 100, REMAINDER %s);
143+
', range_id, i, range_id, i);
144+
END LOOP;
73145
END;
74146
$$;

database/src/postgres/migrations/shard_db/20250521171609_drop_block_hash_from_state_changes.up.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,9 @@ ALTER TABLE state_changes_data DROP COLUMN IF EXISTS block_hash;
33
ALTER TABLE state_changes_access_key DROP COLUMN IF EXISTS block_hash;
44
ALTER TABLE state_changes_contract DROP COLUMN IF EXISTS block_hash;
55
ALTER TABLE state_changes_account DROP COLUMN IF EXISTS block_hash;
6+
7+
-- TODO: REMOVE ALL DEPRECATED TABLES.
8+
DROP TABLE IF EXISTS state_changes_data;
9+
DROP TABLE IF EXISTS state_changes_access_key;
10+
DROP TABLE IF EXISTS state_changes_contract;
11+
DROP TABLE IF EXISTS state_changes_account;

logic-state-indexer/src/configs.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub enum StartOptions {
2020
height: Option<u64>,
2121
},
2222
FromLatest,
23+
FromGenesis,
2324
}
2425

2526
pub async fn get_start_block_height(
@@ -28,7 +29,9 @@ pub async fn get_start_block_height(
2829
start_options: &StartOptions,
2930
indexer_id: &str,
3031
) -> anyhow::Result<u64> {
32+
let genesis_config = configuration::read_genesis_config_from_root()?;
3133
let start_block_height = match start_options {
34+
StartOptions::FromGenesis => return Ok(genesis_config.genesis_height),
3235
StartOptions::FromBlock { height } => *height,
3336
StartOptions::FromInterruption { height } => {
3437
if let Ok(block_height) = db_manager.get_last_processed_block_height(indexer_id).await {
@@ -41,7 +44,11 @@ pub async fn get_start_block_height(
4144
}
4245
StartOptions::FromLatest => final_block_height(near_client).await?,
4346
};
44-
Ok(start_block_height - 100) // Start just a bit earlier to overlap indexed blocks to ensure we don't miss anything in-between
47+
if start_block_height - 100 < genesis_config.genesis_height {
48+
Ok(genesis_config.genesis_height)
49+
} else {
50+
Ok(start_block_height - 100) // Start just a bit earlier to overlap indexed blocks to ensure we don't miss anything in-between
51+
}
4552
}
4653

4754
pub(crate) async fn final_block_height(

logic-state-indexer/src/lib.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,32 @@ impl StateChangesToStore {
160160
}
161161
}
162162

163+
async fn task_migrate_and_compact_db(
164+
shard_db_config: HashMap<ShardId, configuration::DatabaseConnectUrl>,
165+
previous_range_id: u64,
166+
current_range_id: u64,
167+
current_range_start_block_height: u64,
168+
) {
169+
for (_, shard_database_url) in shard_db_config {
170+
let output = std::process::Command::new("migrate_compact_with_state/shard_migration.sh")
171+
.arg(shard_database_url)
172+
.arg(previous_range_id.to_string())
173+
.arg(current_range_id.to_string())
174+
.arg(current_range_start_block_height.to_string())
175+
.output();
176+
177+
println!("Task_migrate_and_compact_db exited with status: {:?}", output);
178+
}
179+
}
180+
163181
#[cfg_attr(
164182
feature = "tracing-instrumentation",
165183
tracing::instrument(skip(streamer_message, db_manager))
166184
)]
167185
pub async fn handle_streamer_message(
168186
streamer_message: near_indexer_primitives::StreamerMessage,
169187
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
188+
shard_db_config: HashMap<ShardId, configuration::DatabaseConnectUrl>,
170189
near_client: &(impl NearClient + std::fmt::Debug + Sync),
171190
indexer_config: impl configuration::RightsizingConfig
172191
+ configuration::IndexerConfig
@@ -179,14 +198,33 @@ pub async fn handle_streamer_message(
179198
let block_hash = streamer_message.block.header.hash;
180199

181200
let range_id = configuration::utils::get_data_range_id(&block_height).await?;
182-
if stats.read().await.current_range_id < range_id {
183-
db_manager.create_new_range_tables(range_id).await?;
201+
let current_range_id = stats.read().await.current_range_id;
202+
if current_range_id < range_id {
203+
match db_manager.create_new_range_tables(range_id).await {
204+
Ok(_) => {
205+
tracing::info!(target: INDEXER, "Created new range tables for range {}", range_id);
206+
}
207+
Err(e) => {
208+
tracing::error!(target: INDEXER, "Failed to create new range tables for range {}: {}", range_id, e);
209+
return Err(e);
210+
}
211+
};
212+
if current_range_id > 0 {
213+
// We spawn a task to do this in the background
214+
tracing::info!(target: INDEXER, "Migrating and compacting DB for range {}", current_range_id);
215+
tokio::spawn(async move { task_migrate_and_compact_db(
216+
shard_db_config,
217+
current_range_id,
218+
range_id,
219+
block_height,
220+
).await });
221+
}
184222
stats.write().await.current_range_id = range_id;
185223
}
186224

187225
let current_epoch_id = streamer_message.block.header.epoch_id;
188226
let next_epoch_id = streamer_message.block.header.next_epoch_id;
189-
227+
println!("current_epoch_id {}, next_epoch_id {}", current_epoch_id, next_epoch_id);
190228
tracing::debug!(target: INDEXER, "Block height {}", block_height,);
191229

192230
stats
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#!/bin/bash
#
# Compacts the access-key state changes of a finished range and carries the
# still-valid entries over into the tables of the current range.
#
# Usage:
#   shard_migration.sh DATABASE_URL PREVIOUS_RANGE_ID CURRENT_RANGE_ID CURRENT_RANGE_BLOCK_HEIGHT_START
#
# Environment:
#   MAX_PARALLEL_JOBS  Maximum number of partitions migrated concurrently.
#                      Defaults to 1 when unset or not a positive integer.
#
# Exit status: 0 when every partition migrated, 1 when at least one psql
# invocation failed, 2 on invalid arguments.

set -uo pipefail

if (( $# < 4 )); then
    echo "Usage: $0 DATABASE_URL PREVIOUS_RANGE_ID CURRENT_RANGE_ID CURRENT_RANGE_BLOCK_HEIGHT_START" >&2
    exit 2
fi

DATABASE_URL=$1
PREVIOUS_RANGE_ID=$2
CURRENT_RANGE_ID=$3
CURRENT_RANGE_BLOCK_HEIGHT_START=$4
MAX_PARALLEL=${MAX_PARALLEL_JOBS:-1}

# The numeric arguments are spliced into SQL identifiers and predicates below,
# so anything other than plain digits is rejected up front.
for numeric_arg in "$PREVIOUS_RANGE_ID" "$CURRENT_RANGE_ID" "$CURRENT_RANGE_BLOCK_HEIGHT_START"; do
    if ! [[ $numeric_arg =~ ^[0-9]+$ ]]; then
        echo "Expected a non-negative integer, got: '$numeric_arg'" >&2
        exit 2
    fi
done

# An empty, zero or malformed job limit would make the throttle below wait
# after every single job; fall back to an explicit serial run instead.
if ! [[ $MAX_PARALLEL =~ ^[1-9][0-9]*$ ]]; then
    MAX_PARALLEL=1
fi

# Function to migrate a single partition.
# $1 - partition number (0..99)
migrate_partition() {
    local partition=$1

    # -X ignores any ~/.psqlrc; ON_ERROR_STOP makes SQL errors surface as a
    # non-zero exit status.
    psql -X -v ON_ERROR_STOP=1 "$DATABASE_URL" -c "
        WITH ordered_data AS (
            SELECT
                account_id,
                data_key,
                data_value,
                block_height AS block_height_from,
                LAG(block_height) OVER (
                    PARTITION BY account_id, data_key
                    ORDER BY block_height DESC
                ) AS block_height_to
            FROM state_changes_access_key_${PREVIOUS_RANGE_ID}_$partition
        ),
        insert_compact AS (
            INSERT INTO state_changes_access_key_${PREVIOUS_RANGE_ID}_compact_$partition (
                account_id, data_key, data_value, block_height_from, block_height_to
            )
            SELECT
                account_id, data_key, data_value, block_height_from, block_height_to
            FROM ordered_data
            WHERE data_value IS NOT NULL
            ON CONFLICT (account_id, data_key, block_height_from) DO NOTHING
        )
        INSERT INTO state_changes_access_key_${CURRENT_RANGE_ID}_$partition (
            account_id, block_height, data_key, data_value
        )
        SELECT
            account_id,
            block_height_from AS block_height,
            data_key,
            data_value
        FROM ordered_data
        WHERE data_value IS NOT NULL
            AND block_height_from <= ${CURRENT_RANGE_BLOCK_HEIGHT_START}
            AND (block_height_to IS NULL OR block_height_to > ${CURRENT_RANGE_BLOCK_HEIGHT_START})
        ON CONFLICT (account_id, data_key, block_height) DO NOTHING;
    "
}

# Run migrations for partitions 0 to 99, keeping at most MAX_PARALLEL jobs alive.
failed=0
running=0
for i in $(seq 0 99); do
    migrate_partition "$i" &

    running=$((running + 1))
    if ((running >= MAX_PARALLEL)); then
        # Wait for at least one job to finish and remember if it failed.
        wait -n || failed=1
        running=$((running - 1))
    fi
done

# Drain the remaining background jobs one by one so that each exit status is
# observed (a bare 'wait' would always report success).
while ((running > 0)); do
    wait -n || failed=1
    running=$((running - 1))
done

exit "$failed"

0 commit comments

Comments
 (0)