
Commit 2bff070

Merge branch '202511-undo-reorgs' into new-index

2 parents e60ca89 + ae45516

File tree: 13 files changed (+1021, −263 lines)

doc/schema.md

Lines changed: 14 additions & 8 deletions
```diff
@@ -25,34 +25,40 @@ Each block results in the following new rows:
 
 * `"M{blockhash}" → "{metadata}"` (block weight, size and number of txs)
 
-* `"D{blockhash}" → ""` (signifies the block is done processing)
+* `"D{blockhash}" → ""` (signifies the block was added)
 
-Each transaction results in the following new rows:
+Each transaction results in the following new row:
 
 * `"T{txid}" → "{serialized-transaction}"`
 
-* `"C{txid}{confirmed-blockhash}" → ""` (a list of blockhashes where `txid` was seen to be confirmed)
-
-Each output results in the following new row:
+Each output results in the following new rows:
 
 * `"O{txid}{vout}" → "{scriptpubkey}{value}"`
+* `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled)
 
 When the indexer is synced up to the tip of the chain, the hash of the tip is saved as following:
 
 * `"t" → "{blockhash}"`
 
 ### `history`
 
-Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new rows (`H` is for history, `F` is for funding):
+Each transaction results in the following new row:
+
+* `"C{txid}" → "{confirmed-height}"`
+
+Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new row (`H` is for history, `F` is for funding):
 
 * `"H{funding-scripthash}{funding-height}F{funding-txid:vout}{value}" → ""`
-* `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled)
 
 Each spending input (except the coinbase) results in the following new rows (`S` is for spending):
 
 * `"H{funding-scripthash}{spending-height}S{spending-txid:vin}{funding-txid:vout}{value}" → ""`
 
-* `"S{funding-txid:vout}{spending-txid:vin}" → ""`
+* `"S{funding-txid:vout}" → "{spending-txid:vin}{spending-height}"`
+
+Each block results in the following new row:
+
+* `"D{blockhash}" → ""` (signifies the block was indexed)
 
 #### Elements only
 
```
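The net effect of the schema change: transaction confirmations move from a per-blockhash set in `txstore` to a single best-chain height row `C{txid}` in `history`, and the spending edge collapses from one row per observed spender into a single row whose value carries both the spending input and its height. A minimal sketch of the new edge row's byte layout follows; the helper names and little-endian integer encoding are assumptions for illustration, not code from this commit:

```rust
// Hedged sketch of the V2 `S` row described above. Helper names and the
// little-endian integer encoding are assumptions, not this commit's code.
fn txedge_key(funding_txid: &[u8; 32], funding_vout: u16) -> Vec<u8> {
    let mut key = Vec::with_capacity(1 + 32 + 2);
    key.push(b'S'); // spending-edge prefix
    key.extend_from_slice(funding_txid);
    key.extend_from_slice(&funding_vout.to_le_bytes());
    key
}

fn txedge_value(spending_txid: &[u8; 32], spending_vin: u16, spending_height: u32) -> Vec<u8> {
    let mut value = Vec::with_capacity(32 + 2 + 4);
    value.extend_from_slice(spending_txid); // who spent it
    value.extend_from_slice(&spending_vin.to_le_bytes());
    value.extend_from_slice(&spending_height.to_le_bytes()); // and at what height
    value
}

fn main() {
    let key = txedge_key(&[0xab; 32], 0);
    let value = txedge_value(&[0xcd; 32], 1, 820_000);
    println!("key {} bytes, value {} bytes", key.len(), value.len());
}
```

Packing the height into the value means one point lookup on `S{funding-txid:vout}` answers both whether an output is spent and where, instead of a prefix scan over every spender ever observed.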

src/bin/db-migrate-v1-to-v2.rs

Lines changed: 287 additions & 0 deletions
New file:

```rust
use std::collections::BTreeSet;
use std::convert::TryInto;
use std::str;

use itertools::Itertools;
use log::{debug, info, trace};
use rocksdb::WriteBatch;

use bitcoin::hashes::Hash;

use electrs::chain::{BlockHash, Txid};
use electrs::new_index::db::DBFlush;
use electrs::new_index::schema::{
    lookup_confirmations, FullHash, Store, TxConfRow as V2TxConfRow, TxEdgeRow as V2TxEdgeRow,
    TxHistoryKey,
};
use electrs::util::bincode::{deserialize_big, deserialize_little, serialize_little};
use electrs::{config::Config, metrics::Metrics};

const FROM_DB_VERSION: u32 = 1;
const TO_DB_VERSION: u32 = 2;

const BATCH_SIZE: usize = 15000;
const PROGRESS_EVERY: usize = BATCH_SIZE * 50;

// For Elements-based chains the 'I' asset history index is migrated too
#[cfg(not(feature = "liquid"))]
const HISTORY_PREFIXES: [u8; 1] = [b'H'];
#[cfg(feature = "liquid")]
const HISTORY_PREFIXES: [u8; 2] = [b'H', b'I'];

fn main() {
    let config = Config::from_args();
    let metrics = Metrics::new(config.monitoring_addr);
    let store = Store::open(&config, &metrics, false);

    let txstore_db = store.txstore_db();
    let history_db = store.history_db();
    let cache_db = store.cache_db();
    let headers = store.headers();
    let tip_height = headers.best_height() as u32;

    // Check the DB version under `V` matches the expected version
    for db in [txstore_db, history_db, cache_db] {
        let ver_bytes = db.get(b"V").expect("missing DB version");
        let ver: u32 = deserialize_little(&ver_bytes[0..4]).unwrap();
        assert_eq!(ver, FROM_DB_VERSION, "unexpected DB version {}", ver);
    }

    // Utility to log progress once every PROGRESS_EVERY ticks
    let mut tick = 0usize;
    macro_rules! progress {
        ($($arg:tt)+) => {{
            tick = tick.wrapping_add(1);
            if tick % PROGRESS_EVERY == 0 {
                debug!($($arg)+);
            }
        }};
    }

    // 1. Migrate the address prefix search index
    // Moved as-is from the history db to the txstore db
    info!("[1/4] migrating address prefix search index...");
    let address_iter = history_db.iter_scan(b"a");
    for chunk in &address_iter.chunks(BATCH_SIZE) {
        let mut batch = WriteBatch::default();
        for row in chunk {
            progress!("[1/4] at {}", str::from_utf8(&row.key[1..]).unwrap());
            batch.put(row.key, row.value);
        }
        // Write batches without flushing (sync and WAL disabled)
        trace!("[1/4] writing batch of {} ops", batch.len());
        txstore_db.write_batch(batch, DBFlush::Disable);
    }
    // Flush the txstore db, only then delete the original rows from the history db
    info!("[1/4] flushing V2 address index to txstore db");
    txstore_db.flush();
    info!("[1/4] deleting V1 address index from history db");
    history_db.delete_range(b"a", b"b", DBFlush::Enable);

    // 2. Migrate the TxConf transaction confirmation index
    // - Moved from the txstore db to the history db
    // - Changed from a set of blocks seen to include the tx to a single block (that is part of the best chain)
    // - Changed from the block hash to the block height
    // - Entries originating from stale blocks are removed
    // Steps 3/4 depend on this index getting migrated first
    info!("[2/4] migrating TxConf index...");
    let txconf_iter = txstore_db.iter_scan(b"C");
    for chunk in &txconf_iter.chunks(BATCH_SIZE) {
        let mut batch = WriteBatch::default();
        for v1_row in chunk {
            let v1_txconf: V1TxConfKey =
                deserialize_little(&v1_row.key).expect("invalid TxConfKey");
            let blockhash = BlockHash::from_byte_array(v1_txconf.blockhash);
            if let Some(header) = headers.header_by_blockhash(&blockhash) {
                // The blockhash is still part of the best chain, use its height to construct the V2 row
                let v2_row = V2TxConfRow::new(v1_txconf.txid, header.height() as u32).into_row();
                batch.put(v2_row.key, v2_row.value);
            } else {
                // The transaction was reorged, don't write the V2 entry
                // trace!("[2/4] skipping reorged TxConf for {}", Txid::from_byte_array(txconf.txid));
            }
            progress!(
                "[2/4] migrating TxConf index ~{:.2}%",
                est_hash_progress(&v1_txconf.txid)
            );
        }
        // Write batches without flushing (sync and WAL disabled)
        trace!("[2/4] writing batch of {} ops", batch.len());
        history_db.write_batch(batch, DBFlush::Disable);
    }
    // Flush the history db, only then delete the original rows from the txstore db
    info!("[2/4] flushing V2 TxConf to history db");
    history_db.flush();
    info!("[2/4] deleting V1 TxConf from txstore db");
    txstore_db.delete_range(b"C", b"D", DBFlush::Enable);

    // 3. Migrate the TxEdge spending index
    // - Changed from a set of inputs seen to spend the outpoint to a single spending input (that is part of the best chain)
    // - Keep the height of the spending tx
    // - Entries originating from stale blocks are removed
    info!("[3/4] migrating TxEdge index...");
    let txedge_iter = history_db.iter_scan(b"S");
    for chunk in &txedge_iter.chunks(BATCH_SIZE) {
        let mut v1_edges = Vec::with_capacity(BATCH_SIZE);
        let mut spending_txids = BTreeSet::new();
        for v1_row in chunk {
            if let Ok(v1_edge) = deserialize_little::<V1TxEdgeKey>(&v1_row.key) {
                spending_txids.insert(Txid::from_byte_array(v1_edge.spending_txid));
                v1_edges.push((v1_edge, v1_row.key));
            }
            // Rows with keys that cannot be deserialized into V1TxEdgeKey are assumed to already be upgraded, and skipped
            // This is necessary to properly recover if the migration stops halfway through.
        }

        // Lookup the confirmation status for the entire chunk using a MultiGet operation
        let confirmations = lookup_confirmations(history_db, tip_height, spending_txids);

        let mut batch = WriteBatch::default();
        for (v1_edge, v1_db_key) in v1_edges {
            let spending_txid = Txid::from_byte_array(v1_edge.spending_txid);

            // Remove the old V1 entry. V2 entries use a different key.
            batch.delete(v1_db_key);

            if let Some(spending_height) = confirmations.get(&spending_txid) {
                // Re-add the V2 entry if it is still part of the best chain
                let v2_row = V2TxEdgeRow::new(
                    v1_edge.funding_txid,
                    v1_edge.funding_vout,
                    v1_edge.spending_txid,
                    v1_edge.spending_vin,
                    *spending_height, // now with the height included
                )
                .into_row();
                batch.put(v2_row.key, v2_row.value);
            } else {
                // The spending transaction was reorged, don't write the V2 entry
                //trace!("[3/4] skipping reorged TxEdge for {}", spending_txid);
            }

            progress!(
                "[3/4] migrating TxEdge index ~{:.2}%",
                est_hash_progress(&v1_edge.funding_txid)
            );
        }
        // Write batches without flushing (sync and WAL disabled)
        trace!("[3/4] writing batch of {} ops", batch.len());
        history_db.write_batch(batch, DBFlush::Disable);
    }
    info!("[3/4] flushing V2 TxEdge index to history db");
    history_db.flush();

    // 4. Migrate the TxHistory index
    // Entries originating from stale blocks are removed, with no other changes
    info!("[4/4] migrating TxHistory index...");
    for prefix in HISTORY_PREFIXES {
        let txhistory_iter = history_db.iter_scan(&[prefix]);
        info!("[4/4] migrating TxHistory index {}", prefix as char);
        for chunk in &txhistory_iter.chunks(BATCH_SIZE) {
            let mut history_entries = Vec::with_capacity(BATCH_SIZE);
            let mut history_txids = BTreeSet::new();
            for row in chunk {
                let hist: TxHistoryKey = deserialize_big(&row.key).expect("invalid TxHistoryKey");
                history_txids.insert(hist.txinfo.get_txid());
                history_entries.push((hist, row.key));
            }

            // Lookup the confirmation status for the entire chunk using a MultiGet operation
            let confirmations = lookup_confirmations(history_db, tip_height, history_txids);

            let mut batch = WriteBatch::default();
            for (hist, db_key) in history_entries {
                let hist_txid = hist.txinfo.get_txid();
                if confirmations.get(&hist_txid) != Some(&hist.confirmed_height) {
                    // The history entry originated from a stale block, remove it
                    batch.delete(db_key);
                    // trace!("[4/4] removing reorged TxHistory for {}", hist.txinfo.get_txid());
                }
                progress!(
                    "[4/4] migrating TxHistory index {} ~{:.2}%",
                    prefix as char,
                    est_hash_progress(&hist.hash)
                );
            }
            // Write batches without flushing (sync and WAL disabled)
            trace!("[4/4] writing batch of {} deletions", batch.len());
            if !batch.is_empty() {
                history_db.write_batch(batch, DBFlush::Disable);
            }
        }
    }
    info!("[4/4] flushing TxHistory deletions to history db");
    history_db.flush();

    // Update the DB version under `V`
    let ver_bytes = serialize_little(&(TO_DB_VERSION, config.light_mode)).unwrap();
    for db in [txstore_db, history_db, cache_db] {
        db.put_sync(b"V", &ver_bytes);
    }

    // Compact everything once at the end
    txstore_db.full_compaction();
    history_db.full_compaction();
}

// Estimates progress using the first 4 bytes, relying on RocksDB's lexicographic key ordering and uniform hash distribution
fn est_hash_progress(hash: &FullHash) -> f32 {
    u32::from_be_bytes(hash[0..4].try_into().unwrap()) as f32 / u32::MAX as f32 * 100f32
}

#[derive(Debug, serde::Deserialize)]
struct V1TxConfKey {
    #[allow(dead_code)]
    code: u8,
    txid: FullHash,
    blockhash: FullHash,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct V1TxEdgeKey {
    code: u8,
    funding_txid: FullHash,
    funding_vout: u16,
    spending_txid: FullHash,
    spending_vin: u16,
}

/*
use bitcoin::hex::DisplayHex;

fn dump_db(db: &DB, label: &str, prefix: &[u8]) {
    debug!("dumping {}", label);
    for item in db.iter_scan(prefix) {
        trace!(
            "[{}] {} => {}",
            label,
            fmt_key(&item.key),
            &item.value.to_lower_hex_string()
        );
    }
}

fn debug_batch(batch: &WriteBatch, label: &'static str) {
    debug!("batch {} with {} ops", label, batch.len());
    batch.iterate(&mut WriteBatchLogIterator(label));
}

struct WriteBatchLogIterator(&'static str);
impl rocksdb::WriteBatchIterator for WriteBatchLogIterator {
    fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>) {
        trace!(
            "[batch {}] PUT {} => {}",
            self.0,
            fmt_key(&key),
            value.to_lower_hex_string()
        );
    }
    fn delete(&mut self, key: Box<[u8]>) {
        trace!("[batch {}] DELETE {}", self.0, fmt_key(&key));
    }
}

fn fmt_key(key: &[u8]) -> String {
    format!("{}-{}", key[0] as char, &key[1..].to_lower_hex_string())
}
*/
```
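The `est_hash_progress` helper near the bottom deserves a note: because RocksDB iterates keys in lexicographic order and txids/scripthashes are uniformly distributed, the first four bytes of the key currently being scanned approximate the fraction of the keyspace already covered. A standalone illustration with a plain byte array in place of the crate's `FullHash` alias:

```rust
// Standalone illustration of the progress heuristic: when uniformly
// distributed 32-byte hashes are visited in lexicographic order, the leading
// bytes of the current hash reveal roughly how far along the scan is.
fn est_hash_progress(hash: &[u8; 32]) -> f32 {
    let prefix = u32::from_be_bytes([hash[0], hash[1], hash[2], hash[3]]);
    prefix as f32 / u32::MAX as f32 * 100.0
}

fn main() {
    // A key starting with 0x80 sits about halfway through the keyspace.
    let mut hash = [0u8; 32];
    hash[0] = 0x80;
    assert!((est_hash_progress(&hash) - 50.0).abs() < 0.01);
    println!("~{:.2}% scanned", est_hash_progress(&hash));
}
```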

src/bin/electrs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<(
6868
signal.clone(),
6969
&metrics,
7070
)?);
71-
let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
71+
let store = Arc::new(Store::open(&config, &metrics, true));
7272
let mut indexer = Indexer::open(
7373
Arc::clone(&store),
7474
fetch_from(&config, &store),

src/bin/popular-scripts.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -8,7 +8,7 @@ use electrs::{
 fn main() {
     let config = Config::from_args();
     let metrics = Metrics::new(config.monitoring_addr);
-    let store = Store::open(&config.db_path.join("newindex"), &config, &metrics);
+    let store = Store::open(&config, &metrics, true);
 
     let mut iter = store.history_db().raw_iterator();
     iter.seek(b"H");
```

src/bin/tx-fingerprint-stats.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -24,7 +24,7 @@ fn main() {
     let signal = Waiter::start(crossbeam_channel::never());
     let config = Config::from_args();
     let metrics = Metrics::new(config.monitoring_addr);
-    let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
+    let store = Arc::new(Store::open(&config, &metrics, true));
 
     let metrics = Metrics::new(config.monitoring_addr);
     metrics.start();
```
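All three binaries (and `run_server` above) now pass the `Config` directly plus a boolean, while the migration tool calls `Store::open(&config, &metrics, false)`. `Store::open` itself is not shown in this diff, so the following is only a minimal mock of the new call shape, with metrics elided; the parameter name and the boolean's meaning (assumed here to gate a DB-version check that the migration tool must bypass) are guesses inferred from the call sites:

```rust
// Mock only: illustrates the call-shape change. The "newindex" path join that
// callers used to do is assumed to have moved inside Store::open, and the
// boolean's meaning is an assumption, not confirmed by this diff.
use std::path::PathBuf;

struct Config {
    db_path: PathBuf,
}

struct Store {
    path: PathBuf,
}

impl Store {
    fn open(config: &Config, enforce_db_version: bool) -> Store {
        let path = config.db_path.join("newindex"); // join moved internally
        if enforce_db_version {
            // hypothetical: refuse to serve a V1 database and point the
            // operator at db-migrate-v1-to-v2 (the migrator passes `false`)
        }
        Store { path }
    }
}

fn main() {
    let config = Config { db_path: PathBuf::from("/var/lib/electrs") };
    let store = Store::open(&config, true);
    println!("opening {}", store.path.display());
}
```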

src/elements/asset.rs

Lines changed: 6 additions & 5 deletions
```diff
@@ -13,7 +13,7 @@ use crate::elements::registry::{AssetMeta, AssetRegistry};
 use crate::errors::*;
 use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow};
 use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query};
-use crate::util::{bincode, full_hash, Bytes, FullHash, TransactionStatus, TxInput};
+use crate::util::{bincode, full_hash, BlockId, Bytes, FullHash, TransactionStatus, TxInput};
 
 lazy_static! {
     pub static ref NATIVE_ASSET_ID: AssetId =
@@ -509,7 +509,7 @@ where
 
     // save updated stats to cache
     if let Some(lastblock) = lastblock {
-        chain.store().cache_db().write(
+        chain.store().cache_db().write_rows(
             vec![asset_cache_row(asset_id, &newstats, &lastblock)],
             DBFlush::Enable,
         );
@@ -526,13 +526,14 @@ fn chain_asset_stats_delta<T>(
     start_height: usize,
     apply_fn: AssetStatApplyFn<T>,
 ) -> (T, Option<BlockHash>) {
+    let headers = chain.store().headers();
     let history_iter = chain
         .history_iter_scan(b'I', &asset_id.into_inner()[..], start_height)
         .map(TxHistoryRow::from_row)
         .filter_map(|history| {
-            chain
-                .tx_confirming_block(&history.get_txid())
-                .map(|blockid| (history, blockid))
+            // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed)
+            let header = headers.header_by_height(history.key.confirmed_height as usize)?;
+            Some((history, BlockId::from(header)))
         });
 
     let mut stats = init_stats;
```
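The reworked `filter_map` quietly drops history entries whose recorded height does not (or no longer does) resolve to a header, which matters while new or reorged blocks are mid-processing. A self-contained illustration of the `?`-in-closure pattern, with stand-in types in place of the crate's headers and history rows:

```rust
// Stand-in types only: shows how entries pointing at unknown heights are
// skipped instead of aborting the scan, mirroring the iterator above.
fn main() {
    let headers = vec!["genesis", "block-1", "block-2"]; // heights 0..=2
    let history = vec![(1usize, "tx-a"), (5, "tx-b"), (2, "tx-c")];

    let confirmed: Vec<(&str, &str)> = history
        .into_iter()
        .filter_map(|(height, txid)| {
            // `?` returns None from the closure when the height is unknown
            let header = headers.get(height)?;
            Some((txid, *header))
        })
        .collect();

    assert_eq!(confirmed, [("tx-a", "block-1"), ("tx-c", "block-2")]);
    println!("{:?}", confirmed); // tx-b (height 5) was dropped
}
```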
