Commit 056abdf

Bump DB version, add DB migration script
1 parent e435eff commit 056abdf

File tree

7 files changed: +327 -35 lines changed

src/bin/db-migrate-v1-to-v2.rs

Lines changed: 286 additions & 0 deletions

use std::collections::BTreeSet;
use std::convert::TryInto;
use std::str;

use itertools::Itertools;
use log::{debug, info, trace};
use rocksdb::WriteBatch;

use bitcoin::hashes::Hash;

use electrs::chain::{BlockHash, Txid};
use electrs::new_index::db::DBFlush;
use electrs::new_index::schema::{
    lookup_confirmations, FullHash, Store, TxConfRow as V2TxConfRow, TxEdgeRow as V2TxEdgeRow,
    TxHistoryKey,
};
use electrs::util::bincode::{deserialize_big, deserialize_little, serialize_little};
use electrs::{config::Config, metrics::Metrics};

const FROM_DB_VERSION: u32 = 1;
const TO_DB_VERSION: u32 = 2;

const BATCH_SIZE: usize = 15000;
const PROGRESS_EVERY: usize = BATCH_SIZE * 50;

// For Elements-based chains the 'I' asset history index is migrated too
#[cfg(not(feature = "liquid"))]
const HISTORY_PREFIXES: [u8; 1] = [b'H'];
#[cfg(feature = "liquid")]
const HISTORY_PREFIXES: [u8; 2] = [b'H', b'I'];

fn main() {
    let config = Config::from_args();
    let metrics = Metrics::new(config.monitoring_addr);
    let store = Store::open(&config, &metrics, false);

    let txstore_db = store.txstore_db();
    let history_db = store.history_db();
    let cache_db = store.cache_db();
    let headers = store.headers();

    // Check the DB version under `V` matches the expected version
    for db in [txstore_db, history_db, cache_db] {
        let ver_bytes = db.get(b"V").expect("missing DB version");
        let ver: u32 = deserialize_little(&ver_bytes[0..4]).unwrap();
        assert_eq!(ver, FROM_DB_VERSION, "unexpected DB version {}", ver);
    }

    // Utility to log progress once every PROGRESS_EVERY ticks
    let mut tick = 0usize;
    macro_rules! progress {
        ($($arg:tt)+) => {{
            tick += 1;
            if tick % PROGRESS_EVERY == 0 {
                debug!($($arg)+);
            }
        }};
    }

    // 1. Migrate the address prefix search index
    // Moved as-is from the history db to the txstore db
    info!("[1/4] migrating address prefix search index...");
    let address_iter = history_db.iter_scan(b"a");
    for chunk in &address_iter.chunks(BATCH_SIZE) {
        let mut batch = WriteBatch::default();
        for row in chunk {
            progress!("[1/4] at {}", str::from_utf8(&row.key[1..]).unwrap());
            batch.put(row.key, row.value);
        }
        // Write batches without flushing (sync and WAL disabled)
        trace!("[1/4] writing batch of {} ops", batch.len());
        txstore_db.write_batch(batch, DBFlush::Disable);
    }
    // Flush the txstore db, only then delete the original rows from the history db
    info!("[1/4] flushing V2 address index to txstore db");
    txstore_db.flush();
    info!("[1/4] deleting V1 address index from history db");
    history_db.delete_range(b"a", b"b", DBFlush::Enable);

    // 2. Migrate the TxConf transaction confirmation index
    // - Moved from the txstore db to the history db
    // - Changed from a set of blocks seen to include the tx to a single block (that is part of the best chain)
    // - Changed from the block hash to the block height
    // - Entries originating from stale blocks are removed
    // Steps 3/4 depend on this index getting migrated first
    info!("[2/4] migrating TxConf index...");
    let txconf_iter = txstore_db.iter_scan(b"C");
    for chunk in &txconf_iter.chunks(BATCH_SIZE) {
        let mut batch = WriteBatch::default();
        for v1_row in chunk {
            let v1_txconf: V1TxConfKey =
                deserialize_little(&v1_row.key).expect("invalid TxConfKey");
            let blockhash = BlockHash::from_byte_array(v1_txconf.blockhash);
            if let Some(header) = headers.header_by_blockhash(&blockhash) {
                // The blockhash is still part of the best chain, use its height to construct the V2 row
                let v2_row = V2TxConfRow::new(v1_txconf.txid, header.height() as u32).into_row();
                batch.put(v2_row.key, v2_row.value);
            } else {
                // The transaction was reorged, don't write the V2 entry
                // trace!("[2/4] skipping reorged TxConf for {}", Txid::from_byte_array(txconf.txid));
            }
            progress!(
                "[2/4] migrating TxConf index ~{:.2}%",
                est_hash_progress(&v1_txconf.txid)
            );
        }
        // Write batches without flushing (sync and WAL disabled)
        trace!("[2/4] writing batch of {} ops", batch.len());
        history_db.write_batch(batch, DBFlush::Disable);
    }
    // Flush the history db, only then delete the original rows from the txstore db
    info!("[2/4] flushing V2 TxConf to history db");
    history_db.flush();
    info!("[2/4] deleting V1 TxConf from txstore db");
    txstore_db.delete_range(b"C", b"D", DBFlush::Enable);

    // 3. Migrate the TxEdge spending index
    // - Changed from a set of inputs seen to spend the outpoint to a single spending input (that is part of the best chain)
    // - Keep the height of the spending tx
    // - Entries originating from stale blocks are removed
    info!("[3/4] migrating TxEdge index...");
    let txedge_iter = history_db.iter_scan(b"S");
    for chunk in &txedge_iter.chunks(BATCH_SIZE) {
        let mut v1_edges = Vec::with_capacity(BATCH_SIZE);
        let mut spending_txids = BTreeSet::new();
        for v1_row in chunk {
            if let Ok(v1_edge) = deserialize_little::<V1TxEdgeKey>(&v1_row.key) {
                spending_txids.insert(Txid::from_byte_array(v1_edge.spending_txid));
                v1_edges.push((v1_edge, v1_row.key));
            }
            // Rows with keys that cannot be deserialized into V1TxEdgeKey are assumed to already be upgraded, and skipped
            // This is necessary to properly recover if the migration stops halfway through.
        }

        // Lookup the confirmation status for the entire chunk using a MultiGet operation
        let confirmations = lookup_confirmations(history_db, spending_txids);

        let mut batch = WriteBatch::default();
        for (v1_edge, v1_db_key) in v1_edges {
            let spending_txid = Txid::from_byte_array(v1_edge.spending_txid);

            // Remove the old V1 entry. V2 entries use a different key.
            batch.delete(v1_db_key);

            if let Some(spending_height) = confirmations.get(&spending_txid) {
                // Re-add the V2 entry if it is still part of the best chain
                let v2_row = V2TxEdgeRow::new(
                    v1_edge.funding_txid,
                    v1_edge.funding_vout,
                    v1_edge.spending_txid,
                    v1_edge.spending_vin,
                    *spending_height, // now with the height included
                )
                .into_row();
                batch.put(v2_row.key, v2_row.value);
            } else {
                // The spending transaction was reorged, don't write the V2 entry
                //trace!("[3/4] skipping reorged TxEdge for {}", spending_txid);
            }

            progress!(
                "[3/4] migrating TxEdge index ~{:.2}%",
                est_hash_progress(&v1_edge.funding_txid)
            );
        }
        // Write batches without flushing (sync and WAL disabled)
        trace!("[3/4] writing batch of {} ops", batch.len());
        history_db.write_batch(batch, DBFlush::Disable);
    }
    info!("[3/4] flushing V2 TxEdge index to history db");
    history_db.flush();

    // 4. Migrate the TxHistory index
    // Entries originating from stale blocks are removed, with no other changes
    info!("[4/4] migrating TxHistory index...");
    for prefix in HISTORY_PREFIXES {
        let txhistory_iter = history_db.iter_scan(&[prefix]);
        info!("[4/4] migrating TxHistory index {}", prefix as char);
        for chunk in &txhistory_iter.chunks(BATCH_SIZE) {
            let mut history_entries = Vec::with_capacity(BATCH_SIZE);
            let mut history_txids = BTreeSet::new();
            for row in chunk {
                let hist: TxHistoryKey = deserialize_big(&row.key).expect("invalid TxHistoryKey");
                history_txids.insert(hist.txinfo.get_txid());
                history_entries.push((hist, row.key));
            }

            // Lookup the confirmation status for the entire chunk using a MultiGet operation
            let confirmations = lookup_confirmations(history_db, history_txids);

            let mut batch = WriteBatch::default();
            for (hist, db_key) in history_entries {
                let hist_txid = hist.txinfo.get_txid();
                if confirmations.get(&hist_txid) != Some(&hist.confirmed_height) {
                    // The history entry originated from a stale block, remove it
                    batch.delete(db_key);
                    // trace!("[4/4] removing reorged TxHistory for {}", hist.txinfo.get_txid());
                }
                progress!(
                    "[4/4] migrating TxHistory index {} ~{:.2}%",
                    prefix as char,
                    est_hash_progress(&hist.hash)
                );
            }
            // Write batches without flushing (sync and WAL disabled)
            trace!("[4/4] writing batch of {} deletions", batch.len());
            if !batch.is_empty() {
                history_db.write_batch(batch, DBFlush::Disable);
            }
        }
    }
    info!("[4/4] flushing TxHistory deletions to history db");
    history_db.flush();

    // Update the DB version under `V`
    let ver_bytes = serialize_little(&(TO_DB_VERSION, config.light_mode)).unwrap();
    for db in [txstore_db, history_db, cache_db] {
        db.put_sync(b"V", &ver_bytes);
    }

    // Compact everything once at the end
    txstore_db.full_compaction();
    history_db.full_compaction();
}

// Estimates progress using the first 4 bytes, relying on RocksDB's lexicographic key ordering and uniform hash distribution
fn est_hash_progress(hash: &FullHash) -> f32 {
    u32::from_be_bytes(hash[0..4].try_into().unwrap()) as f32 / u32::MAX as f32 * 100f32
}

#[derive(Debug, serde::Deserialize)]
struct V1TxConfKey {
    #[allow(dead_code)]
    code: u8,
    txid: FullHash,
    blockhash: FullHash,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct V1TxEdgeKey {
    code: u8,
    funding_txid: FullHash,
    funding_vout: u16,
    spending_txid: FullHash,
    spending_vin: u16,
}

/*
use bitcoin::hex::DisplayHex;

fn dump_db(db: &DB, label: &str, prefix: &[u8]) {
    debug!("dumping {}", label);
    for item in db.iter_scan(prefix) {
        trace!(
            "[{}] {} => {}",
            label,
            fmt_key(&item.key),
            &item.value.to_lower_hex_string()
        );
    }
}

fn debug_batch(batch: &WriteBatch, label: &'static str) {
    debug!("batch {} with {} ops", label, batch.len());
    batch.iterate(&mut WriteBatchLogIterator(label));
}

struct WriteBatchLogIterator(&'static str);
impl rocksdb::WriteBatchIterator for WriteBatchLogIterator {
    fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>) {
        trace!(
            "[batch {}] PUT {} => {}",
            self.0,
            fmt_key(&key),
            value.to_lower_hex_string()
        );
    }
    fn delete(&mut self, key: Box<[u8]>) {
        trace!("[batch {}] DELETE {}", self.0, fmt_key(&key));
    }
}

fn fmt_key(key: &[u8]) -> String {
    format!("{}-{}", key[0] as char, &key[1..].to_lower_hex_string())
}
*/
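
A quick sanity check of the progress estimate used throughout the script. This is a standalone sketch (not part of the commit) with a plain [u8; 32] standing in for FullHash: a key whose hash begins with 0x80 sits roughly halfway through a uniformly distributed, lexicographically ordered keyspace, so the estimate lands near 50%.

use std::convert::TryInto;

fn est_hash_progress(hash: &[u8; 32]) -> f32 {
    u32::from_be_bytes(hash[0..4].try_into().unwrap()) as f32 / u32::MAX as f32 * 100f32
}

fn main() {
    let mut hash = [0u8; 32];
    hash[0] = 0x80; // first byte halfway through the 0x00..=0xff range
    let pct = est_hash_progress(&hash);
    assert!((pct - 50.0).abs() < 0.01);
    println!("~{:.2}% of the keyspace scanned", pct);
}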

src/bin/electrs.rs

Lines changed: 1 addition & 1 deletion

@@ -68,7 +68,7 @@ fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<(
         signal.clone(),
         &metrics,
     )?);
-    let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
+    let store = Arc::new(Store::open(&config, &metrics, true));
     let mut indexer = Indexer::open(
         Arc::clone(&store),
         fetch_from(&config, &store),
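
The new third argument to Store::open (also passed in the two binaries below, and threaded into DB::open in src/new_index/db.rs) controls whether the `V` compatibility check runs at startup. A minimal, self-contained sketch of the pattern follows; this is illustrative toy code, not electrs code: server binaries keep the check enabled, while the migration tool opens a V1 database with the check disabled.

const DB_VERSION: u32 = 2;

struct Db {
    stored_version: u32,
}

impl Db {
    fn open(stored_version: u32, verify_compat: bool) -> Db {
        let db = Db { stored_version };
        if verify_compat {
            db.verify_compatibility();
        }
        db
    }

    fn verify_compatibility(&self) {
        if self.stored_version != DB_VERSION {
            panic!("Incompatible database found. Please reindex or migrate.");
        }
    }
}

fn main() {
    let _server_db = Db::open(2, true); // normal startup: check enforced
    let _migration_db = Db::open(1, false); // migration tool: opens a V1 DB without panicking
}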

src/bin/popular-scripts.rs

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@ use electrs::{
 fn main() {
     let config = Config::from_args();
     let metrics = Metrics::new(config.monitoring_addr);
-    let store = Store::open(&config.db_path.join("newindex"), &config, &metrics);
+    let store = Store::open(&config, &metrics, true);
 
     let mut iter = store.history_db().raw_iterator();
     iter.seek(b"H");

src/bin/tx-fingerprint-stats.rs

Lines changed: 1 addition & 1 deletion

@@ -24,7 +24,7 @@ fn main() {
     let signal = Waiter::start(crossbeam_channel::never());
     let config = Config::from_args();
     let metrics = Metrics::new(config.monitoring_addr);
-    let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
+    let store = Arc::new(Store::open(&config, &metrics, true));
 
     let metrics = Metrics::new(config.monitoring_addr);
     metrics.start();

src/new_index/db.rs

Lines changed: 16 additions & 15 deletions

@@ -11,7 +11,7 @@ use crate::config::Config;
 use crate::new_index::db_metrics::RocksDbMetrics;
 use crate::util::{bincode, spawn_thread, Bytes};
 
-static DB_VERSION: u32 = 1;
+static DB_VERSION: u32 = 2;
 
 #[derive(Debug, Eq, PartialEq)]
 pub struct DBRow {
@@ -87,7 +87,7 @@ pub enum DBFlush {
 }
 
 impl DB {
-    pub fn open(path: &Path, config: &Config) -> DB {
+    pub fn open(path: &Path, config: &Config, verify_compat: bool) -> DB {
         debug!("opening DB at {:?}", path);
         let mut db_opts = rocksdb::Options::default();
         db_opts.create_if_missing(true);
@@ -119,7 +119,9 @@ impl DB {
         let db = DB {
             db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB"))
         };
-        db.verify_compatibility(config);
+        if verify_compat {
+            db.verify_compatibility(config);
+        }
         db
     }
 
@@ -195,7 +197,7 @@ impl DB {
         self.write_batch(batch, flush)
     }
 
-    fn write_batch(&self, batch: rocksdb::WriteBatch, flush: DBFlush) {
+    pub fn write_batch(&self, batch: rocksdb::WriteBatch, flush: DBFlush) {
         let do_flush = match flush {
             DBFlush::Enable => true,
             DBFlush::Disable => false,
@@ -232,21 +234,20 @@ impl DB {
         self.db.multi_get(keys)
     }
 
+    /// Remove database entries in the range [from, to)
+    pub fn delete_range<K: AsRef<[u8]>>(&self, from: K, to: K, flush: DBFlush) {
+        let mut batch = rocksdb::WriteBatch::default();
+        batch.delete_range(from, to);
+        self.write_batch(batch, flush);
+    }
+
     fn verify_compatibility(&self, config: &Config) {
-        let mut compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap();
-
-        if config.light_mode {
-            // append a byte to indicate light_mode is enabled.
-            // we're not letting bincode serialize this so that the compatiblity bytes won't change
-            // (and require a reindex) when light_mode is disabled. this should be chagned the next
-            // time we bump DB_VERSION and require a re-index anyway.
-            compatibility_bytes.push(1);
-        }
+        let compatibility_bytes = bincode::serialize_little(&(DB_VERSION, config.light_mode)).unwrap();
 
         match self.get(b"V") {
             None => self.put(b"V", &compatibility_bytes),
-            Some(ref x) if x != &compatibility_bytes => {
-                panic!("Incompatible database found. Please reindex.")
+            Some(x) if x != compatibility_bytes => {
+                panic!("Incompatible database found. Please reindex or migrate.")
             }
             Some(_) => (),
         }
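
For reference, a standalone sketch (not part of the commit) of how the compatibility bytes change. It uses bincode 1.x directly and assumes serialize_little behaves like a fixed-width little-endian configuration: the V1 scheme appended a bare marker byte when light_mode was enabled, while V2 serializes the version and the light_mode flag together as one tuple.

use bincode::Options;

fn main() {
    let opts = bincode::DefaultOptions::new()
        .with_little_endian()
        .with_fixint_encoding();

    // V1 scheme: a 4-byte version, plus a manually appended marker byte for light_mode
    let mut v1 = opts.serialize(&1u32).unwrap();
    v1.push(1);
    assert_eq!(v1, vec![1, 0, 0, 0, 1]);

    // V2 scheme: the version and the light_mode flag serialized together
    let v2 = opts.serialize(&(2u32, true)).unwrap();
    assert_eq!(v2, vec![2, 0, 0, 0, 1]);

    println!("v1 = {:?}, v2 = {:?}", v1, v2);
}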
