diff --git a/cmd/pruner.go b/cmd/pruner.go index c04d579..09e0e3d 100644 --- a/cmd/pruner.go +++ b/cmd/pruner.go @@ -5,7 +5,7 @@ import ( "fmt" "path/filepath" - "github.com/cometbft/cometbft/libs/log" + "github.com/cometbft/cometbft/state" "github.com/cosmos/cosmos-sdk/types" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" authzkeeper "github.com/cosmos/cosmos-sdk/x/authz/keeper" @@ -18,7 +18,6 @@ import ( ibchost "github.com/cosmos/ibc-go/v7/modules/core/exported" db "github.com/cometbft/cometbft-db" - "github.com/cometbft/cometbft/state" tmstore "github.com/cometbft/cometbft/store" storetypes "github.com/cosmos/cosmos-sdk/store/types" consensusparamtypes "github.com/cosmos/cosmos-sdk/x/consensus/types" @@ -49,9 +48,13 @@ func pruneCmd() *cobra.Command { Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { + logger.Info("Starting pruning...") + ctx := cmd.Context() errs, _ := errgroup.WithContext(ctx) var err error + + // Tendermint pruning (blockstore.db, state.db) if tendermint { errs.Go(func() error { if err = pruneTMData(args[0]); err != nil { @@ -94,7 +97,7 @@ func pruneAppState(home string) error { } //TODO: need to get all versions in the store, setting randomly is too slow - fmt.Println("pruning application state") + logger.Info("pruning application state") // only mount keys from core sdk // todo allow for other keys to be mounted @@ -135,7 +138,17 @@ func pruneAppState(home string) error { } // TODO: cleanup app state - appStore := rootmulti.NewStore(appDB, log.NewNopLogger()) + appStore := rootmulti.NewStore(appDB, logger) + + // Configure IAVL fast node + // Default (false): fast node enabled for queries + // With flag (true): fast node disabled for faster pruning + appStore.SetIAVLDisableFastNode(disableFastNode) + if disableFastNode { + logger.Info("IAVL fast node disabled (faster pruning mode)") + } else { + logger.Info("IAVL fast node enabled (default mode)") + } for _, value := range keys { appStore.MountStoreWithDB(value, storetypes.StoreTypeIAVL, nil) @@ -152,30 +165,34 @@ func pruneAppState(home string) error { return fmt.Errorf("the database has no valid heights to prune, the latest height: %v", latestHeight) } - var pruningHeights []int64 - for height := int64(1); height < latestHeight; height++ { - if height < latestHeight-int64(versions) { - pruningHeights = append(pruningHeights, height) - } - } - - //pruningHeight := []int64{latestHeight - int64(versions)} - - if len(pruningHeights) == 0 { - fmt.Println("no heights to prune") + // var pruningHeights []int64 + // for height := int64(1); height < latestHeight; height++ { + // if height < latestHeight-int64(versions) { + // pruningHeights = append(pruningHeights, height) + // } + // } + + // Prune the last X versions + // This is the most efficient way to prune the application state + // as it only needs to delete the last X versions + pruneHeight := latestHeight - int64(versions) + if pruneHeight <= 0 { + logger.Error("no heights to prune") return nil } + pruningHeights := []int64{pruneHeight} + //pruningHeight := []int64{latestHeight - int64(versions)} if err = appStore.PruneStores(false, pruningHeights); err != nil { return err } - fmt.Println("pruning application state complete") + logger.Info("pruning application state complete") - fmt.Println("compacting application state") + logger.Info("compacting application state") if err := appDB.Compact(nil, nil); err != nil { return err } - fmt.Println("compacting application state complete") + logger.Info("compacting application state complete") //create a new app store return nil @@ -197,52 +214,56 @@ func pruneTMData(home string) error { } blockStore := tmstore.NewBlockStore(blockStoreDB) - // Get StateStore - stateDB, err := db.NewGoLevelDBWithOpts("state", dbDir, &o) - if err != nil { - return err - } - - stateStore := state.NewStore(stateDB, state.StoreOptions{ - DiscardABCIResponses: true, - }) - base := blockStore.Base() pruneHeight := blockStore.Height() - int64(blocks) + // Check if there's anything to prune + if pruneHeight <= base { + logger.Error("no blocks to prune", "base", base, "target", pruneHeight) + return nil + } // Get StateStore + errs, _ := errgroup.WithContext(context.Background()) errs.Go(func() error { - fmt.Println("pruning block store") + logger.Info("pruning block store") // prune block store blocks, err = blockStore.PruneBlocks(pruneHeight) if err != nil { return err } - fmt.Println("pruning block store complete") + logger.Info("pruning block store complete") - fmt.Println("compacting block store") + logger.Info("compacting block store") if err := blockStoreDB.Compact(nil, nil); err != nil { return err } - fmt.Println("compacting block store complete") + logger.Info("compacting block store complete") return nil }) - fmt.Println("pruning state store") + logger.Info("pruning state store") + stateDB, err := db.NewGoLevelDBWithOpts("state", dbDir, &o) + if err != nil { + return err + } + + stateStore := state.NewStore(stateDB, state.StoreOptions{ + DiscardABCIResponses: true, + }) // prune state store err = stateStore.PruneStates(base, pruneHeight) if err != nil { return err } - fmt.Println("pruning state store complete") + logger.Info("pruning state store complete") - fmt.Println("compacting state store") + logger.Info("compacting state store") if err := stateDB.Compact(nil, nil); err != nil { return err } - fmt.Println("compacting state store complete") + logger.Info("compacting state store complete") return nil } diff --git a/cmd/root.go b/cmd/root.go index a322282..9cde46c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,19 +3,24 @@ package cmd import ( "os" + "github.com/cometbft/cometbft/libs/log" "github.com/spf13/cobra" "github.com/spf13/viper" ) var ( - dataDir string - backend string - app string - cosmosSdk bool - tendermint bool - blocks uint64 - versions uint64 - appName = "cosmprund" + dataDir string + backend string + app string + cosmosSdk bool + tendermint bool + blocks uint64 + versions uint64 + debug bool + disableFastNode bool + + appName = "cosmprund" + logger log.Logger ) // NewRootCmd returns the root command for relayer. @@ -31,7 +36,15 @@ func NewRootCmd() *cobra.Command { // if err := initConfig(rootCmd); err != nil { // return err // } - + logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) + // Set log level based on debug flag + if debug { + // Show all logs including Debug level + logger = log.NewFilter(logger, log.AllowDebug()) + } else { + // Only show Info level and above (hides Debug logs like loadVersion commitID) + logger = log.NewFilter(logger, log.AllowInfo()) + } return nil } @@ -71,6 +84,18 @@ func NewRootCmd() *cobra.Command { panic(err) } + // --debug flag + rootCmd.PersistentFlags().BoolVar(&debug, "debug", false, "enable debug logging (shows debug logs)") + if err := viper.BindPFlag("debug", rootCmd.PersistentFlags().Lookup("debug")); err != nil { + panic(err) + } + + // --disable-fast-node flag + rootCmd.PersistentFlags().BoolVar(&disableFastNode, "disable-fast-node", true, "disable IAVL fast node for faster pruning (default: false, fast node enabled)") + if err := viper.BindPFlag("disable-fast-node", rootCmd.PersistentFlags().Lookup("disable-fast-node")); err != nil { + panic(err) + } + rootCmd.AddCommand( pruneCmd(), ) diff --git a/internal/rootmulti/store.go b/internal/rootmulti/store.go index cef9919..d954690 100644 --- a/internal/rootmulti/store.go +++ b/internal/rootmulti/store.go @@ -209,7 +209,7 @@ func (rs *Store) LoadVersion(ver int64) error { func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { infos := make(map[string]types.StoreInfo) - fmt.Println("loadVersion", "ver", ver) + rs.logger.Info("loadVersion", "ver", ver) cInfo := &types.CommitInfo{} // load old data if we are not version 0 @@ -243,52 +243,105 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { }) } - for _, key := range storesKeys { - storeParams := rs.storesParams[key] - commitID := rs.getCommitID(infos, key.Name()) - fmt.Println("loadVersion commitID", "key", key, "ver", ver, "hash", fmt.Sprintf("%x", commitID.Hash)) - - // If it has been added, set the initial version - if upgrades.IsAdded(key.Name()) || upgrades.RenamedFrom(key.Name()) != "" { - storeParams.initialVersion = uint64(ver) + 1 - } else if commitID.Version != ver && storeParams.typ == types.StoreTypeIAVL { - return fmt.Errorf("version of store %s mismatch root store's version; expected %d got %d; new stores should be added using StoreUpgrades", key.Name(), ver, commitID.Version) + // Parallel loading only when no upgrades (upgrades need deterministic order) + if upgrades == nil || (len(upgrades.Deleted) == 0 && len(upgrades.Added) == 0 && len(upgrades.Renamed) == 0) { + // Parallel loading for better performance + type loadResult struct { + key types.StoreKey + store types.CommitKVStore + err error } - store, err := rs.loadCommitStoreFromParams(key, commitID, storeParams) - if err != nil { - return errors.Wrap(err, "failed to load store") + resultChan := make(chan loadResult, len(storesKeys)) + var wg sync.WaitGroup + + for _, key := range storesKeys { + wg.Add(1) + go func(k types.StoreKey) { + defer wg.Done() + + storeParams := rs.storesParams[k] + commitID := rs.getCommitID(infos, k.Name()) + rs.logger.Info("loadVersion commitID", "key", k, "ver", ver, "hash", fmt.Sprintf("%x", commitID.Hash)) + + // Check version mismatch + if commitID.Version != ver && storeParams.typ == types.StoreTypeIAVL { + resultChan <- loadResult{ + key: k, + err: fmt.Errorf("version of store %s mismatch root store's version; expected %d got %d; new stores should be added using StoreUpgrades", k.Name(), ver, commitID.Version), + } + return + } + + store, err := rs.loadCommitStoreFromParams(k, commitID, storeParams) + + resultChan <- loadResult{ + key: k, + store: store, + err: err, + } + }(key) } - newStores[key] = store + wg.Wait() + close(resultChan) - // If it was deleted, remove all data - if upgrades.IsDeleted(key.Name()) { - if err := deleteKVStore(types.KVStore(store)); err != nil { - return errors.Wrapf(err, "failed to delete store %s", key.Name()) + // Collect results + for result := range resultChan { + if result.err != nil { + return errors.Wrap(result.err, "failed to load store") } - rs.removalMap[key] = true - } else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" { - // handle renames specially - // make an unregistered key to satisfy loadCommitStore params - oldKey := types.NewKVStoreKey(oldName) - oldParams := newStoreParams(oldKey, storeParams.db, storeParams.typ, 0) - - // load from the old name - oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams) - if err != nil { - return errors.Wrapf(err, "failed to load old store %s", oldName) + newStores[result.key] = result.store + } + } else { + // Sequential loading when upgrades are present (order matters) + for _, key := range storesKeys { + storeParams := rs.storesParams[key] + commitID := rs.getCommitID(infos, key.Name()) + rs.logger.Info("loadVersion commitID", "key", key, "ver", ver, "hash", fmt.Sprintf("%x", commitID.Hash)) + + // If it has been added, set the initial version + if upgrades.IsAdded(key.Name()) || upgrades.RenamedFrom(key.Name()) != "" { + storeParams.initialVersion = uint64(ver) + 1 + } else if commitID.Version != ver && storeParams.typ == types.StoreTypeIAVL { + return fmt.Errorf("version of store %s mismatch root store's version; expected %d got %d; new stores should be added using StoreUpgrades", key.Name(), ver, commitID.Version) } - // move all data - if err := moveKVStoreData(types.KVStore(oldStore), types.KVStore(store)); err != nil { - return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name()) + store, err := rs.loadCommitStoreFromParams(key, commitID, storeParams) + if err != nil { + return errors.Wrap(err, "failed to load store") } - // add the old key so its deletion is committed - newStores[oldKey] = oldStore - // this will ensure it's not perpetually stored in commitInfo - rs.removalMap[oldKey] = true + newStores[key] = store + + // If it was deleted, remove all data + if upgrades.IsDeleted(key.Name()) { + if err := deleteKVStore(types.KVStore(store)); err != nil { + return errors.Wrapf(err, "failed to delete store %s", key.Name()) + } + rs.removalMap[key] = true + } else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" { + // handle renames specially + // make an unregistered key to satisfy loadCommitStore params + oldKey := types.NewKVStoreKey(oldName) + oldParams := newStoreParams(oldKey, storeParams.db, storeParams.typ, 0) + + // load from the old name + oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams) + if err != nil { + return errors.Wrapf(err, "failed to load old store %s", oldName) + } + + // move all data + if err := moveKVStoreData(types.KVStore(oldStore), types.KVStore(store)); err != nil { + return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name()) + } + + // add the old key so its deletion is committed + newStores[oldKey] = oldStore + // this will ensure it's not perpetually stored in commitInfo + rs.removalMap[oldKey] = true + } } } @@ -448,7 +501,7 @@ func (rs *Store) Commit() types.CommitID { } if rs.commitHeader.Height != version { - fmt.Println("commit header and version mismatch", "header_height", rs.commitHeader.Height, "version", version) + rs.logger.Error("commit header and version mismatch", "header_height", rs.commitHeader.Height, "version", version) } rs.lastCommitInfo = commitStores(version, rs.stores, rs.removalMap) @@ -606,8 +659,8 @@ func (rs *Store) handlePruning(version int64) error { if !rs.pruningManager.ShouldPruneAtHeight(version) { return nil } - fmt.Println("prune start", "height", version) - defer fmt.Println("prune end", "height", version) + rs.logger.Info("prune start", "height", version) + defer rs.logger.Info("prune end", "height", version) return rs.PruneStores(true, nil) } @@ -622,40 +675,73 @@ func (rs *Store) PruneStores(clearPruningManager bool, pruningHeights []int64) ( } if len(heights) == 0 { - fmt.Println("no heights to be pruned from pruning manager") + rs.logger.Error("no heights to be pruned from pruning manager") } pruningHeights = append(pruningHeights, heights...) } if len(pruningHeights) == 0 { - fmt.Println("no heights need to be pruned") + rs.logger.Error("no heights need to be pruned") return nil } pruneHeight := pruningHeights[len(pruningHeights)-1] - fmt.Println("deleting versions to", "pruneHeight", pruneHeight) + rs.logger.Info("deleting versions to", "pruneHeight", pruneHeight) - for key, store := range rs.stores { - fmt.Println("pruning store", "key", key) // Also log store.name (a private variable)? + // Collect pruning tasks for parallel processing + type pruneTask struct { + key types.StoreKey + store types.CommitKVStore + } + // Collect only IAVL stores that need pruning + tasks := make([]pruneTask, 0, len(rs.stores)) + for key, store := range rs.stores { + rs.logger.Info("pruning store", "key", key) // Also log store.name (a private variable)? // If the store is wrapped with an inter-block cache, we must first unwrap // it to get the underlying IAVL store. if store.GetStoreType() != types.StoreTypeIAVL { continue } + tasks = append(tasks, pruneTask{ + key: key, + store: rs.GetCommitKVStore(key), + }) + } - store = rs.GetCommitKVStore(key) + // Prune each store in parallel using goroutines + errChan := make(chan error, len(tasks)) + var wg sync.WaitGroup - err := store.(*iavl.Store).DeleteVersionsTo(pruneHeight) - if err == nil { - continue - } + for _, task := range tasks { + wg.Add(1) + go func(t pruneTask) { + defer wg.Done() + rs.logger.Info("pruning store", "key", t.key.Name()) - if errCause := errors.Cause(err); errCause != nil && errCause != iavltree.ErrVersionDoesNotExist { + err := t.store.(*iavl.Store).DeleteVersionsTo(pruneHeight) + if err != nil { + if errCause := errors.Cause(err); errCause != nil && errCause != iavltree.ErrVersionDoesNotExist { + errChan <- fmt.Errorf("failed to prune store %s: %w", t.key.Name(), err) + return + } + } + rs.logger.Info("pruning store complete", "key", t.key.Name()) + }(task) + } + + // Wait for all goroutines to complete + wg.Wait() + close(errChan) + + // Check for errors + for err := range errChan { + if err != nil { return err } } + return nil } @@ -804,7 +890,7 @@ func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error { // and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes // are demarcated by new SnapshotStore items. for _, store := range stores { - fmt.Println("starting snapshot", "store", store.name, "height", height) + rs.logger.Info("starting snapshot", "store", store.name, "height", height) exporter, err := store.Export(int64(height)) if err != nil { rs.logger.Error("snapshot failed; exporter error", "store", store.name, "err", err) @@ -826,7 +912,7 @@ func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error { for { node, err := exporter.Next() if err == iavltree.ErrorExportDone { - fmt.Println("snapshot Done", "store", store.name, "nodeCount", nodeCount) + rs.logger.Info("snapshot Done", "store", store.name, "nodeCount", nodeCount) nodeCount = 0 break } else if err != nil { @@ -893,7 +979,7 @@ loop: } defer importer.Close() // Importer height must reflect the node height (which usually matches the block height, but not always) - fmt.Println("restoring snapshot", "store", item.Store.Name) + rs.logger.Info("restoring snapshot", "store", item.Store.Name) case *snapshottypes.SnapshotItem_IAVL: if importer == nil { @@ -960,7 +1046,7 @@ func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID // If the whitelist is not empty, enable fast nodes for only the modules in the whitelist. disabledFastNodes = true if _, ok := rs.iavlFastNodeModuleWhitelist[key.Name()]; ok { - fmt.Println("fast node enabled for module", "module", key.Name()) + rs.logger.Info("fast node enabled for module", "module", key.Name()) disabledFastNodes = false } } @@ -1086,14 +1172,14 @@ func (rs *Store) GetCommitInfo(ver int64) (*types.CommitInfo, error) { } func (rs *Store) flushMetadata(db dbm.DB, version int64, cInfo *types.CommitInfo) { - fmt.Println("flushing metadata", "height", version) + rs.logger.Info("flushing metadata", "height", version) batch := db.NewBatch() defer batch.Close() if cInfo != nil { flushCommitInfo(batch, version, cInfo) } else { - fmt.Println("commitInfo is nil, not flushed", "height", version) + rs.logger.Error("commitInfo is nil, not flushed", "height", version) } flushLatestVersion(batch, version) @@ -1101,7 +1187,7 @@ func (rs *Store) flushMetadata(db dbm.DB, version int64, cInfo *types.CommitInfo if err := batch.WriteSync(); err != nil { panic(fmt.Errorf("error on batch write %w", err)) } - fmt.Println("flushing metadata finished", "height", version) + rs.logger.Info("flushing metadata finished", "height", version) } type storeParams struct {