|
| 1 | +// Copyright 2025 The go-ethereum Authors |
| 2 | +// This file is part of the go-ethereum library. |
| 3 | +// |
| 4 | +// The go-ethereum library is free software: you can redistribute it and/or modify |
| 5 | +// it under the terms of the GNU Lesser General Public License as published by |
| 6 | +// the Free Software Foundation, either version 3 of the License, or |
| 7 | +// (at your option) any later version. |
| 8 | +// |
| 9 | +// The go-ethereum library is distributed in the hope that it will be useful, |
| 10 | +// but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | +// GNU Lesser General Public License for more details. |
| 13 | +// |
| 14 | +// You should have received a copy of the GNU Lesser General Public License |
| 15 | +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. |
| 16 | + |
| 17 | +package pathdb |
| 18 | + |
| 19 | +import ( |
| 20 | + "encoding/binary" |
| 21 | + "sync" |
| 22 | + "sync/atomic" |
| 23 | + "time" |
| 24 | + |
| 25 | + "github.com/ethereum/go-ethereum/common" |
| 26 | + "github.com/ethereum/go-ethereum/core/rawdb" |
| 27 | + "github.com/ethereum/go-ethereum/ethdb" |
| 28 | + "github.com/ethereum/go-ethereum/log" |
| 29 | +) |
| 30 | + |
const (
	// indexPruningThreshold defines the number of pruned histories that must
	// accumulate before triggering index pruning. This helps avoid scheduling
	// index pruning too frequently. The threshold is checked in the pruner's
	// run loop against the distance between the current tail and the tail of
	// the last completed run.
	indexPruningThreshold = 90000

	// iteratorReopenInterval is how long the iterator is kept open before
	// being released and re-opened. Long-lived iterators hold a read snapshot
	// that blocks LSM compaction; periodically re-opening avoids stalling the
	// compactor during a large scan.
	iteratorReopenInterval = 30 * time.Second
)
| 43 | + |
// indexPruner is responsible for pruning stale index data from the tail side
// when old history objects are removed. It runs as a background goroutine and
// processes pruning signals whenever the history tail advances.
//
// The pruning operates at the block level: for each state element's index
// metadata, leading index blocks whose maximum history ID falls below the
// new tail are removed entirely. This avoids the need to decode individual
// block contents and is efficient because index blocks store monotonically
// increasing history IDs.
type indexPruner struct {
	disk    ethdb.KeyValueStore // Backing key-value store holding the index data
	typ     historyType         // History flavor this pruner is responsible for
	tail    atomic.Uint64       // Tail below which index entries can be pruned
	lastRun uint64              // The tail in the last pruning run; only accessed by the run goroutine
	trigger chan struct{}       // Non-blocking signal that tail has advanced (capacity 1)
	closed  chan struct{}       // Closed on shutdown to terminate the background goroutine
	wg      sync.WaitGroup      // Tracks the background goroutine so close can wait for it
	log     log.Logger          // Contextual logger tagged with the history type

	pauseReq chan chan struct{} // Pause request; caller sends ack channel, pruner closes it when paused
	resumeCh chan struct{}      // Resume signal sent by caller after indexSingle/unindexSingle completes
}
| 66 | + |
| 67 | +// newIndexPruner creates and starts a new index pruner for the given history type. |
| 68 | +func newIndexPruner(disk ethdb.KeyValueStore, typ historyType) *indexPruner { |
| 69 | + p := &indexPruner{ |
| 70 | + disk: disk, |
| 71 | + typ: typ, |
| 72 | + trigger: make(chan struct{}, 1), |
| 73 | + closed: make(chan struct{}), |
| 74 | + log: log.New("type", typ.String()), |
| 75 | + pauseReq: make(chan chan struct{}), |
| 76 | + resumeCh: make(chan struct{}), |
| 77 | + } |
| 78 | + p.wg.Add(1) |
| 79 | + go p.run() |
| 80 | + return p |
| 81 | +} |
| 82 | + |
| 83 | +// prune signals the pruner that the history tail has advanced to the given ID. |
| 84 | +// All index entries referencing history IDs below newTail can be removed. |
| 85 | +func (p *indexPruner) prune(newTail uint64) { |
| 86 | + // Only update if the tail is actually advancing |
| 87 | + for { |
| 88 | + old := p.tail.Load() |
| 89 | + if newTail <= old { |
| 90 | + return |
| 91 | + } |
| 92 | + if p.tail.CompareAndSwap(old, newTail) { |
| 93 | + break |
| 94 | + } |
| 95 | + } |
| 96 | + // Non-blocking signal |
| 97 | + select { |
| 98 | + case p.trigger <- struct{}{}: |
| 99 | + default: |
| 100 | + } |
| 101 | +} |
| 102 | + |
| 103 | +// pause requests the pruner to flush all pending writes and pause. It blocks |
| 104 | +// until the pruner has acknowledged the pause. This must be paired with a |
| 105 | +// subsequent call to resume. |
| 106 | +func (p *indexPruner) pause() { |
| 107 | + ack := make(chan struct{}) |
| 108 | + select { |
| 109 | + case p.pauseReq <- ack: |
| 110 | + <-ack // wait for the pruner to flush and acknowledge |
| 111 | + case <-p.closed: |
| 112 | + } |
| 113 | +} |
| 114 | + |
// resume unblocks a previously paused pruner, allowing it to continue
// processing. It must be paired with a preceding pause(); the send here
// matches a receive on resumeCh in run or prunePrefix. If the pruner has
// been closed in the meantime, the call returns without blocking.
func (p *indexPruner) resume() {
	select {
	case p.resumeCh <- struct{}{}:
	case <-p.closed:
	}
}
| 123 | + |
// close shuts down the pruner and waits for the background goroutine to
// finish. Repeated sequential calls are no-ops.
//
// NOTE(review): the check-then-close pattern below is not safe for
// concurrent invocations of close (two goroutines could both take the
// default branch and double-close the channel) — presumably callers are
// serialized; confirm, or guard with a sync.Once.
func (p *indexPruner) close() {
	select {
	case <-p.closed:
		return
	default:
		close(p.closed)
		p.wg.Wait()
	}
}
| 134 | + |
| 135 | +// run is the main loop of the pruner. It waits for trigger signals and |
| 136 | +// processes a small batch of entries on each trigger, advancing the cursor. |
| 137 | +func (p *indexPruner) run() { |
| 138 | + defer p.wg.Done() |
| 139 | + |
| 140 | + for { |
| 141 | + select { |
| 142 | + case <-p.trigger: |
| 143 | + tail := p.tail.Load() |
| 144 | + if tail < p.lastRun || tail-p.lastRun < indexPruningThreshold { |
| 145 | + continue |
| 146 | + } |
| 147 | + if err := p.process(tail); err != nil { |
| 148 | + p.log.Error("Failed to prune index", "tail", tail, "err", err) |
| 149 | + } else { |
| 150 | + p.lastRun = tail |
| 151 | + } |
| 152 | + |
| 153 | + case ack := <-p.pauseReq: |
| 154 | + // Pruner is idle, acknowledge immediately and wait for resume. |
| 155 | + close(ack) |
| 156 | + select { |
| 157 | + case <-p.resumeCh: |
| 158 | + case <-p.closed: |
| 159 | + return |
| 160 | + } |
| 161 | + |
| 162 | + case <-p.closed: |
| 163 | + return |
| 164 | + } |
| 165 | + } |
| 166 | +} |
| 167 | + |
| 168 | +// process iterates all index metadata entries for the history type and prunes |
| 169 | +// leading blocks whose max history ID is below the given tail. |
| 170 | +func (p *indexPruner) process(tail uint64) error { |
| 171 | + var ( |
| 172 | + err error |
| 173 | + pruned int |
| 174 | + start = time.Now() |
| 175 | + ) |
| 176 | + switch p.typ { |
| 177 | + case typeStateHistory: |
| 178 | + n, err := p.prunePrefix(rawdb.StateHistoryAccountMetadataPrefix, typeAccount, tail) |
| 179 | + if err != nil { |
| 180 | + return err |
| 181 | + } |
| 182 | + pruned += n |
| 183 | + |
| 184 | + n, err = p.prunePrefix(rawdb.StateHistoryStorageMetadataPrefix, typeStorage, tail) |
| 185 | + if err != nil { |
| 186 | + return err |
| 187 | + } |
| 188 | + pruned += n |
| 189 | + statePruneHistoryIndexTimer.UpdateSince(start) |
| 190 | + |
| 191 | + case typeTrienodeHistory: |
| 192 | + pruned, err = p.prunePrefix(rawdb.TrienodeHistoryMetadataPrefix, typeTrienode, tail) |
| 193 | + if err != nil { |
| 194 | + return err |
| 195 | + } |
| 196 | + trienodePruneHistoryIndexTimer.UpdateSince(start) |
| 197 | + |
| 198 | + default: |
| 199 | + panic("unknown history type") |
| 200 | + } |
| 201 | + if pruned > 0 { |
| 202 | + p.log.Info("Pruned stale index blocks", "pruned", pruned, "tail", tail, "elapsed", common.PrettyDuration(time.Since(start))) |
| 203 | + } |
| 204 | + return nil |
| 205 | +} |
| 206 | + |
| 207 | +// prunePrefix scans all metadata entries under the given prefix and prunes |
| 208 | +// leading index blocks below the tail. The iterator is periodically released |
| 209 | +// and re-opened to avoid holding a read snapshot that blocks LSM compaction. |
| 210 | +func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, error) { |
| 211 | + var ( |
| 212 | + pruned int |
| 213 | + opened = time.Now() |
| 214 | + it = p.disk.NewIterator(prefix, nil) |
| 215 | + batch = p.disk.NewBatchWithSize(ethdb.IdealBatchSize) |
| 216 | + ) |
| 217 | + for { |
| 218 | + // Terminate if iterator is exhausted |
| 219 | + if !it.Next() { |
| 220 | + it.Release() |
| 221 | + break |
| 222 | + } |
| 223 | + // Check termination or pause request |
| 224 | + select { |
| 225 | + case <-p.closed: |
| 226 | + // Terminate the process if indexer is closed |
| 227 | + it.Release() |
| 228 | + if batch.ValueSize() > 0 { |
| 229 | + return pruned, batch.Write() |
| 230 | + } |
| 231 | + return pruned, nil |
| 232 | + |
| 233 | + case ack := <-p.pauseReq: |
| 234 | + // Save the current position so that after resume the |
| 235 | + // iterator can be re-opened from where it left off. |
| 236 | + start := common.CopyBytes(it.Key()[len(prefix):]) |
| 237 | + it.Release() |
| 238 | + |
| 239 | + // Flush all pending writes before acknowledging the pause. |
| 240 | + var flushErr error |
| 241 | + if batch.ValueSize() > 0 { |
| 242 | + if err := batch.Write(); err != nil { |
| 243 | + flushErr = err |
| 244 | + } |
| 245 | + batch.Reset() |
| 246 | + } |
| 247 | + close(ack) |
| 248 | + |
| 249 | + // Block until resumed or closed. Always wait here even if |
| 250 | + // the flush failed — returning early would cause resume() |
| 251 | + // to deadlock since nobody would receive on resumeCh. |
| 252 | + select { |
| 253 | + case <-p.resumeCh: |
| 254 | + if flushErr != nil { |
| 255 | + return 0, flushErr |
| 256 | + } |
| 257 | + // Re-open the iterator from the saved position so the |
| 258 | + // pruner sees the current database state (including any |
| 259 | + // writes made by indexer during the pause). |
| 260 | + it = p.disk.NewIterator(prefix, start) |
| 261 | + opened = time.Now() |
| 262 | + continue |
| 263 | + case <-p.closed: |
| 264 | + return pruned, flushErr |
| 265 | + } |
| 266 | + |
| 267 | + default: |
| 268 | + // Keep processing |
| 269 | + } |
| 270 | + |
| 271 | + // Prune the index data block |
| 272 | + key, value := it.Key(), it.Value() |
| 273 | + ident, bsize := p.identFromKey(key, prefix, elemType) |
| 274 | + n, err := p.pruneEntry(batch, ident, value, bsize, tail) |
| 275 | + if err != nil { |
| 276 | + p.log.Warn("Failed to prune index entry", "ident", ident, "err", err) |
| 277 | + continue |
| 278 | + } |
| 279 | + pruned += n |
| 280 | + |
| 281 | + // Flush the batch if there are too many accumulated |
| 282 | + if batch.ValueSize() >= ethdb.IdealBatchSize { |
| 283 | + if err := batch.Write(); err != nil { |
| 284 | + it.Release() |
| 285 | + return 0, err |
| 286 | + } |
| 287 | + batch.Reset() |
| 288 | + } |
| 289 | + |
| 290 | + // Periodically release the iterator so the LSM compactor |
| 291 | + // is not blocked by the read snapshot we hold. |
| 292 | + if time.Since(opened) >= iteratorReopenInterval { |
| 293 | + opened = time.Now() |
| 294 | + |
| 295 | + start := common.CopyBytes(it.Key()[len(prefix):]) |
| 296 | + it.Release() |
| 297 | + it = p.disk.NewIterator(prefix, start) |
| 298 | + } |
| 299 | + } |
| 300 | + if batch.ValueSize() > 0 { |
| 301 | + if err := batch.Write(); err != nil { |
| 302 | + return 0, err |
| 303 | + } |
| 304 | + } |
| 305 | + return pruned, nil |
| 306 | +} |
| 307 | + |
| 308 | +// identFromKey reconstructs the stateIdent and bitmapSize from a metadata key. |
| 309 | +func (p *indexPruner) identFromKey(key []byte, prefix []byte, elemType elementType) (stateIdent, int) { |
| 310 | + rest := key[len(prefix):] |
| 311 | + |
| 312 | + switch elemType { |
| 313 | + case typeAccount: |
| 314 | + // key = prefix + addressHash(32) |
| 315 | + var addrHash common.Hash |
| 316 | + copy(addrHash[:], rest[:32]) |
| 317 | + return newAccountIdent(addrHash), 0 |
| 318 | + |
| 319 | + case typeStorage: |
| 320 | + // key = prefix + addressHash(32) + storageHash(32) |
| 321 | + var addrHash, storHash common.Hash |
| 322 | + copy(addrHash[:], rest[:32]) |
| 323 | + copy(storHash[:], rest[32:64]) |
| 324 | + return newStorageIdent(addrHash, storHash), 0 |
| 325 | + |
| 326 | + case typeTrienode: |
| 327 | + // key = prefix + addressHash(32) + path(variable) |
| 328 | + var addrHash common.Hash |
| 329 | + copy(addrHash[:], rest[:32]) |
| 330 | + path := string(rest[32:]) |
| 331 | + ident := newTrienodeIdent(addrHash, path) |
| 332 | + return ident, ident.bloomSize() |
| 333 | + |
| 334 | + default: |
| 335 | + panic("unknown element type") |
| 336 | + } |
| 337 | +} |
| 338 | + |
| 339 | +// pruneEntry checks a single metadata entry and removes leading index blocks |
| 340 | +// whose max < tail. Returns the number of blocks pruned. |
| 341 | +func (p *indexPruner) pruneEntry(batch ethdb.Batch, ident stateIdent, blob []byte, bsize int, tail uint64) (int, error) { |
| 342 | + // Fast path: the first 8 bytes of the metadata encode the max history ID |
| 343 | + // of the first index block (big-endian uint64). If it is >= tail, no |
| 344 | + // blocks can be pruned and we skip the full parse entirely. |
| 345 | + if len(blob) >= 8 && binary.BigEndian.Uint64(blob[:8]) >= tail { |
| 346 | + return 0, nil |
| 347 | + } |
| 348 | + descList, err := parseIndex(blob, bsize) |
| 349 | + if err != nil { |
| 350 | + return 0, err |
| 351 | + } |
| 352 | + // Find the number of leading blocks that can be entirely pruned. |
| 353 | + // A block can be pruned if its max history ID is strictly below |
| 354 | + // the tail. |
| 355 | + var count int |
| 356 | + for _, desc := range descList { |
| 357 | + if desc.max < tail { |
| 358 | + count++ |
| 359 | + } else { |
| 360 | + break // blocks are ordered, no more to prune |
| 361 | + } |
| 362 | + } |
| 363 | + if count == 0 { |
| 364 | + return 0, nil |
| 365 | + } |
| 366 | + // Delete the pruned index blocks |
| 367 | + for i := 0; i < count; i++ { |
| 368 | + deleteStateIndexBlock(ident, batch, descList[i].id) |
| 369 | + } |
| 370 | + // Update or delete the metadata |
| 371 | + remaining := descList[count:] |
| 372 | + if len(remaining) == 0 { |
| 373 | + // All blocks pruned, remove the metadata entry entirely |
| 374 | + deleteStateIndex(ident, batch) |
| 375 | + } else { |
| 376 | + // Rewrite the metadata with the remaining blocks |
| 377 | + size := indexBlockDescSize + bsize |
| 378 | + buf := make([]byte, 0, size*len(remaining)) |
| 379 | + for _, desc := range remaining { |
| 380 | + buf = append(buf, desc.encode()...) |
| 381 | + } |
| 382 | + writeStateIndex(ident, batch, buf) |
| 383 | + } |
| 384 | + return count, nil |
| 385 | +} |
0 commit comments