diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index ab6d2c93c9f..e248003aebb 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -941,16 +941,16 @@ func (exeNode *ExecutionNode) LoadExecutionStateLedger( module.ReadyDoneAware, error, ) { + // Ledger selection is two independent choices passed to the factory: + // - --payloadless picks the payloadless vs. full ledger (this branch). + // - --ledger-service-addr (Config.LedgerServiceAddr), when set, means this + // node connects to a remote ledger service rather than running a local + // ledger; the factory then returns a gRPC client instead of a local one. + // Combined: payloadless + remote address -> remote payloadless client; + // payloadless + no address -> local payloadless ledger; likewise for full mode. if exeNode.exeConf.payloadless { // Payloadless mode. ValidateFlags enforces --enable-storehouse, // so the storehouse is the value source for reads. - // - // The factory call mirrors the full-mode call below: same Config, - // same triggerCheckpoint. Today the factory body is a placeholder - // (no WAL, no checkpoint load) — see TODOs at - // ledgerfactory.NewPayloadlessLedger. When the WAL/checkpoint - // pieces land, only the factory body changes; this call site stays - // the same. pl, err := ledgerfactory.NewPayloadlessLedger(ledgerfactory.Config{ LedgerServiceAddr: exeNode.exeConf.ledgerServiceAddr, LedgerMaxRequestSize: exeNode.exeConf.ledgerMaxRequestSize, @@ -1485,6 +1485,10 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error { // when bootstrapping, the bootstrap folder must have a checkpoint file // we need to cover this file to the trie folder to restore the trie to restore the execution state. + // + // Note: in payloadless mode the V6 root checkpoint placed here is later + // converted to root.checkpoint.v7 by ledgerfactory.NewPayloadlessLedger + // before the bundle reads it. Bootstrap itself stays mode-agnostic. err = copyBootstrapState(node.BootstrapDir, exeNode.exeConf.triedir) if err != nil { return fmt.Errorf("could not load bootstrap state from checkpoint file: %w", err) @@ -1497,11 +1501,16 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error { // HasRootCheckpointV7 guard keeps a re-entry after an interrupted // bootstrap from hitting ConvertCheckpointV6ToV7's "output exists" check. // + // Only nodes running a local payloadless ledger need this: a node using a + // remote ledger service (ledgerServiceAddr set) never reads its local trie + // dir, and the remote ledger service performs its own V7 bootstrap. Skipping + // the conversion avoids a needless full-forest load on remote-ledger nodes. + // // TODO: ConvertCheckpointV6ToV7 reads the entire V6 forest into memory // before emitting V7, a memory/time spike at first boot for mainnet-scale // root checkpoints. A future optimization is to convert subtrie-by-subtrie // without loading the whole forest. - if exeNode.exeConf.payloadless { + if exeNode.exeConf.payloadless && exeNode.exeConf.ledgerServiceAddr == "" { triedir := exeNode.exeConf.triedir hasV7Root, err := wal.HasRootCheckpointV7(triedir) if err != nil { diff --git a/cmd/ledger/main.go b/cmd/ledger/main.go index 0dd48ca6b98..0306f0d76d5 100644 --- a/cmd/ledger/main.go +++ b/cmd/ledger/main.go @@ -18,9 +18,11 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc" + "github.com/onflow/flow-go/ledger" ledgerfactory "github.com/onflow/flow-go/ledger/factory" ledgerpb "github.com/onflow/flow-go/ledger/protobuf" "github.com/onflow/flow-go/ledger/remote" + "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" ) @@ -35,6 +37,7 @@ var ( checkpointDist = flag.Uint("checkpoint-distance", 100, "Checkpoint distance") checkpointsToKeep = flag.Uint("checkpoints-to-keep", 3, "Number of checkpoints to keep") logLevel = flag.String("loglevel", "info", "Log level (panic, fatal, error, warn, info, debug)") + payloadless = flag.Bool("payloadless", false, "Run the ledger service in payloadless mode (stores leaf hashes instead of full payloads; requires a V7 checkpoint in --triedir).") maxRequestSize = flag.Uint("max-request-size", 1<<30, "Maximum request message size in bytes (default: 1 GiB)") maxResponseSize = flag.Uint("max-response-size", 1<<30, "Maximum response message size in bytes (default: 1 GiB)") ) @@ -72,14 +75,18 @@ func main() { Str("admin_addr", *adminAddr). Uint("metrics_port", *metricsPort). Int("mtrie_cache_size", *mtrieCacheSize). + Bool("payloadless", *payloadless). Msg("starting ledger service") // Create trigger for manual checkpointing (used by admin command) triggerCheckpointOnNextSegmentFinish := atomic.NewBool(false) - // Create ledger using factory + // Create ledger using factory. The same config drives both modes; the + // payloadless flag selects which factory constructor (and gRPC service) is + // wired up. A ledger gRPC server registers either the full [remote.Service] + // or the [remote.PayloadlessService], never both. metricsCollector := metrics.NewLedgerCollector("ledger", "wal") - ledgerStorage, err := ledgerfactory.NewLedger(ledgerfactory.Config{ + factoryConfig := ledgerfactory.Config{ Triedir: *triedir, MTrieCacheSize: uint32(*mtrieCacheSize), CheckpointDistance: *checkpointDist, @@ -88,9 +95,32 @@ func main() { WALMetrics: metricsCollector, LedgerMetrics: metricsCollector, Logger: logger, - }, triggerCheckpointOnNextSegmentFinish) - if err != nil { - logger.Fatal().Err(err).Msg("failed to create ledger") + } + + // ledgerStorage is the lifecycle handle used for readiness, health check, + // and shutdown regardless of mode. registerService binds the mode-specific + // gRPC service onto the server once it is created. + var ledgerStorage module.ReadyDoneAware + var registerService func(grpcServer *grpc.Server) + + if *payloadless { + payloadlessLedger, err := ledgerfactory.NewPayloadlessLedger(factoryConfig, triggerCheckpointOnNextSegmentFinish) + if err != nil { + logger.Fatal().Err(err).Msg("failed to create payloadless ledger") + } + ledgerStorage = payloadlessLedger + registerService = func(grpcServer *grpc.Server) { + ledgerpb.RegisterPayloadlessLedgerServiceServer(grpcServer, remote.NewPayloadlessService(payloadlessLedger, logger)) + } + } else { + fullLedger, err := ledgerfactory.NewLedger(factoryConfig, triggerCheckpointOnNextSegmentFinish) + if err != nil { + logger.Fatal().Err(err).Msg("failed to create ledger") + } + ledgerStorage = fullLedger + registerService = func(grpcServer *grpc.Server) { + ledgerpb.RegisterLedgerServiceServer(grpcServer, remote.NewService(fullLedger, logger)) + } } // Wait for ledger to be ready (WAL replay) @@ -98,14 +128,25 @@ func main() { <-ledgerStorage.Ready() logger.Info().Msg("ledger ready") + // Both the full and payloadless ledgers expose state inspection for the + // post-startup health check, though only the full ledger declares it on its + // public interface; assert it here so the check works in either mode. + inspector, ok := ledgerStorage.(interface { + StateCount() int + StateByIndex(index int) (ledger.State, error) + }) + if !ok { + logger.Fatal().Msg("ledger does not support state inspection") + } + // Check if any trie is loaded after startup - stateCount := ledgerStorage.StateCount() + stateCount := inspector.StateCount() if stateCount == 0 { logger.Fatal().Msg("no trie loaded after startup - no states available") } // Get the last trie state for logging - lastState, err := ledgerStorage.StateByIndex(-1) + lastState, err := inspector.StateByIndex(-1) if err != nil { logger.Fatal().Err(err).Msg("failed to get last state for logging") } @@ -123,9 +164,8 @@ func main() { grpc.MaxSendMsgSize(int(*maxResponseSize)), ) - // Create and register ledger service - ledgerService := remote.NewService(ledgerStorage, logger) - ledgerpb.RegisterLedgerServiceServer(grpcServer, ledgerService) + // Register the mode-specific ledger service + registerService(grpcServer) // Create listeners based on provided flags type listenerInfo struct { diff --git a/integration/localnet/Makefile b/integration/localnet/Makefile index 4a2bb2a4413..075f3528f95 100644 --- a/integration/localnet/Makefile +++ b/integration/localnet/Makefile @@ -5,6 +5,7 @@ EXECUTION = 2 VALID_EXECUTION := $(shell test $(EXECUTION) -ge 2; echo $$?) LEDGER_EXECUTION = 0 VALID_LEDGER_EXECUTION := $(shell test $(LEDGER_EXECUTION) -le $(EXECUTION); echo $$?) +PAYLOADLESS = false TEST_EXECUTION = 0 VERIFICATION = 1 ACCESS = 1 @@ -79,7 +80,8 @@ else -extensive-tracing=$(EXTENSIVE_TRACING) \ -consensus-delay=$(CONSENSUS_DELAY) \ -collection-delay=$(COLLECTION_DELAY) \ - -ledger-execution=$(LEDGER_EXECUTION) + -ledger-execution=$(LEDGER_EXECUTION) \ + -payloadless=$(PAYLOADLESS) endif # Creates a light version of the localnet with just 1 instance for each node type diff --git a/integration/localnet/builder/bootstrap.go b/integration/localnet/builder/bootstrap.go index 70be30ede2a..a9a5ee5d635 100644 --- a/integration/localnet/builder/bootstrap.go +++ b/integration/localnet/builder/bootstrap.go @@ -15,6 +15,7 @@ import ( "time" "github.com/go-yaml/yaml" + "github.com/rs/zerolog" "github.com/onflow/flow-go/cmd/build" "github.com/onflow/flow-go/ledger/complete/wal" @@ -81,6 +82,7 @@ var ( consensusDelay time.Duration collectionDelay time.Duration logLevel string + payloadless bool ports *PortAllocator ) @@ -109,6 +111,7 @@ func init() { flag.DurationVar(&collectionDelay, "collection-delay", DefaultCollectionDelay, "delay on collection node block proposals") flag.StringVar(&logLevel, "loglevel", DefaultLogLevel, "log level for all nodes") flag.IntVar(&ledgerExecutionCount, "ledger-execution", 0, "number of execution nodes that use remote ledger service (0 = all use local ledger, max = execution count)") + flag.BoolVar(&payloadless, "payloadless", false, "enable payloadless trie mode (stores payload hashes instead of full payloads)") } func generateBootstrapData(flowNetworkConf testnet.NetworkConfig) []testnet.ContainerConfig { @@ -482,6 +485,21 @@ func prepareExecutionService(container testnet.ContainerConfig, i int, n int) Se ) } + // In payloadless mode, both remote-ledger and local-ledger execution nodes + // must run payloadless. The flag selects the payloadless committer, state + // checker, and ledger client. A remote-ledger node missing this flag would + // build a full ledger.LedgerService client and fail against a payloadless + // ledger service with "unknown service ledger.LedgerService". + if payloadless { + service.Command = append(service.Command, "--payloadless") + } + + // Payloadless mode requires storehouse to store the actual payloads + // (the trie only stores payload hashes) + if payloadless { + service.Command = append(service.Command, "--enable-storehouse") + } + service.AddExposedPorts(testnet.GRPCPort) return service @@ -834,20 +852,58 @@ func prepareLedgerService(dockerServices Services, flowNodeContainerConfigs []te // 2. Ledger service has /trie mounted and can follow symlinks to /bootstrap (via execution node's mount) // 3. We create symlinks using relative paths that work in both host and container contexts bootstrapExecutionStateDir := filepath.Join(BootstrapDir, bootstrapFilenames.DirnameExecutionState) - checkpointSource := filepath.Join(bootstrapExecutionStateDir, bootstrapFilenames.FilenameWALRootCheckpoint) - if _, err := os.Stat(checkpointSource); err == nil { - // Checkpoint exists, create symlinks on host - // The symlinks will use relative paths that resolve correctly inside containers - // because both /bootstrap and /trie are mounted in the containers + + // Create symlinks for V6 checkpoint + checkpointSourceV6 := filepath.Join(bootstrapExecutionStateDir, bootstrapFilenames.FilenameWALRootCheckpoint) + if _, err := os.Stat(checkpointSourceV6); err == nil { + // V6 checkpoint exists, create symlinks on host _, err = wal.SoftlinkCheckpointFile(bootstrapFilenames.FilenameWALRootCheckpoint, bootstrapExecutionStateDir, trieDir) if err != nil { - panic(fmt.Errorf("failed to create checkpoint symlinks: %w", err)) + panic(fmt.Errorf("failed to create V6 checkpoint symlinks: %w", err)) + } + fmt.Printf("created V6 checkpoint symlinks in trie directory: %s\n", trieDir) + } else { + fmt.Printf("V6 root checkpoint not found in %s\n", checkpointSourceV6) + } + + // Create symlinks for V7 checkpoint (payloadless) + v7Filename := bootstrapFilenames.FilenameWALRootCheckpoint + wal.V7FileSuffix + checkpointSourceV7 := filepath.Join(bootstrapExecutionStateDir, v7Filename) + + // In payloadless mode a spork only produces a V6 root.checkpoint, and the + // ledger service has no bootstrapper of its own to convert it. Convert the V6 + // root checkpoint into a V7 root checkpoint here (once, at bootstrap time); + // the symlink block below then seeds the ledger service's trie directory from + // it. On restart the ledger factory finds an existing V7 checkpoint (this root + // or a newer numbered one written by the compactor), so no conversion is + // needed at runtime. The os.Stat guard makes a re-run of `make bootstrap` + // idempotent and avoids ConvertCheckpointV6ToV7's "output exists" rejection. + if payloadless { + if _, err := os.Stat(checkpointSourceV7); errors.Is(err, fs.ErrNotExist) { + logger := zerolog.New(os.Stderr).With().Timestamp().Logger() + if convertErr := wal.ConvertCheckpointV6ToV7( + bootstrapExecutionStateDir, + bootstrapFilenames.FilenameWALRootCheckpoint, + bootstrapExecutionStateDir, + v7Filename, + logger, + 16, + ); convertErr != nil { + panic(fmt.Errorf("failed to convert V6 root checkpoint to V7 for payloadless ledger service: %w", convertErr)) + } + fmt.Printf("converted V6 root checkpoint to V7 in %s\n", bootstrapExecutionStateDir) + } + } + + if _, err := os.Stat(checkpointSourceV7); err == nil { + // V7 checkpoint exists, create symlinks on host + _, err = wal.SoftlinkCheckpointFile(v7Filename, bootstrapExecutionStateDir, trieDir) + if err != nil { + panic(fmt.Errorf("failed to create V7 checkpoint symlinks: %w", err)) } - fmt.Printf("created checkpoint symlinks in trie directory: %s\n", trieDir) + fmt.Printf("created V7 checkpoint symlinks in trie directory: %s\n", trieDir) } else { - // Checkpoint doesn't exist, this is expected for fresh bootstrap - // The execution node will create it when it initializes - fmt.Printf("root checkpoint not found in %s, ledger service will start with empty state\n", checkpointSource) + fmt.Printf("V7 root checkpoint not found in %s\n", checkpointSourceV7) } // Allocate ports for ledger service @@ -865,17 +921,22 @@ func prepareLedgerService(dockerServices Services, flowNodeContainerConfigs []te // Create ledger service // Use Unix domain socket; ledger and execution nodes share absSocketDir mounted at /sockets + ledgerCommand := []string{ + "--triedir=/trie", + "--ledger-service-socket=/sockets/ledger.sock", + "--mtrie-cache-size=100", + "--checkpoint-distance=100", + "--checkpoints-to-keep=3", + fmt.Sprintf("--loglevel=%s", logLevel), + } + if payloadless { + ledgerCommand = append(ledgerCommand, "--payloadless") + } + service := Service{ - name: ledgerServiceName, - Image: "localnet-ledger", - Command: []string{ - "--triedir=/trie", - "--ledger-service-socket=/sockets/ledger.sock", - "--mtrie-cache-size=100", - "--checkpoint-distance=100", - "--checkpoints-to-keep=3", - fmt.Sprintf("--loglevel=%s", logLevel), - }, + name: ledgerServiceName, + Image: "localnet-ledger", + Command: ledgerCommand, Volumes: []string{ fmt.Sprintf("%s:/trie:z", trieDir), fmt.Sprintf("%s:/bootstrap:z", BootstrapDir), @@ -963,3 +1024,4 @@ func prepareTestExecutionService(dockerServices Services, flowNodeContainerConfi return dockerServices } + diff --git a/ledger/complete/factory.go b/ledger/complete/factory.go deleted file mode 100644 index 2152a1143f2..00000000000 --- a/ledger/complete/factory.go +++ /dev/null @@ -1,59 +0,0 @@ -package complete - -import ( - "github.com/rs/zerolog" - "go.uber.org/atomic" - - "github.com/onflow/flow-go/ledger" - "github.com/onflow/flow-go/ledger/complete/wal" - "github.com/onflow/flow-go/module" -) - -// LocalLedgerFactory creates in-process ledger instances with compactor. -type LocalLedgerFactory struct { - wal wal.LedgerWAL - capacity int - compactorConfig *ledger.CompactorConfig - triggerCheckpoint *atomic.Bool - metrics module.LedgerMetrics - logger zerolog.Logger - pathFinderVersion uint8 -} - -// NewLocalLedgerFactory creates a new factory for local ledger instances. -// triggerCheckpoint is a runtime control signal to trigger checkpoint on next segment finish. -func NewLocalLedgerFactory( - ledgerWAL wal.LedgerWAL, - capacity int, - compactorConfig *ledger.CompactorConfig, - triggerCheckpoint *atomic.Bool, - metrics module.LedgerMetrics, - logger zerolog.Logger, - pathFinderVersion uint8, -) ledger.Factory { - return &LocalLedgerFactory{ - wal: ledgerWAL, - capacity: capacity, - compactorConfig: compactorConfig, - triggerCheckpoint: triggerCheckpoint, - metrics: metrics, - logger: logger, - pathFinderVersion: pathFinderVersion, - } -} - -func (f *LocalLedgerFactory) NewLedger() (ledger.Ledger, error) { - ledgerWithCompactor, err := NewLedgerWithCompactor( - f.wal, - f.capacity, - f.compactorConfig, - f.triggerCheckpoint, - f.metrics, - f.logger, - f.pathFinderVersion, - ) - if err != nil { - return nil, err - } - return ledgerWithCompactor, nil -} diff --git a/ledger/complete/payloadless_ledger_with_compactor_test.go b/ledger/complete/payloadless_ledger_with_compactor_test.go index 9858831f9e2..8e069e96039 100644 --- a/ledger/complete/payloadless_ledger_with_compactor_test.go +++ b/ledger/complete/payloadless_ledger_with_compactor_test.go @@ -12,10 +12,29 @@ import ( "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/pathfinder" "github.com/onflow/flow-go/ledger/complete" + "github.com/onflow/flow-go/ledger/complete/payloadless" realWAL "github.com/onflow/flow-go/ledger/complete/wal" "github.com/onflow/flow-go/module/metrics" ) +// seedV7Root writes a minimal V7 root checkpoint (a single empty payloadless +// trie) into dir. Tests that construct NewPayloadlessLedgerWithCompactor +// directly against a fresh temp dir need this because the bundle now refuses +// to start without seedable V7 state on disk; in production the equivalent +// seeding is performed by ledger/factory.NewPayloadlessLedger when it +// converts a V6 root to V7. +func seedV7Root(t *testing.T, dir string) { + t.Helper() + err := realWAL.StoreCheckpointV7( + []*payloadless.MTrie{payloadless.NewEmptyMTrie()}, + dir, + realWAL.RootCheckpointFilenameV7(), + zerolog.Nop(), + 1, + ) + require.NoError(t, err) +} + // buildDiskWAL returns a fresh DiskWAL bound to the given directory. The // caller is responsible for Ready/Done lifecycle (typically handled by the // bundle). @@ -39,9 +58,11 @@ func buildDiskWAL(t *testing.T, dir string) *realWAL.DiskWAL { } // TestPayloadlessLedgerWithCompactor_NewEmpty constructs the bundle against a -// fresh directory and verifies the lifecycle and basic API surface. +// fresh directory seeded with an empty V7 root checkpoint and verifies the +// lifecycle and basic API surface. func TestPayloadlessLedgerWithCompactor_NewEmpty(t *testing.T) { dir := t.TempDir() + seedV7Root(t, dir) diskWAL := buildDiskWAL(t, dir) bundle, err := complete.NewPayloadlessLedgerWithCompactor( @@ -74,6 +95,7 @@ func TestPayloadlessLedgerWithCompactor_NewEmpty(t *testing.T) { // the replayed forest contains the same state. func TestPayloadlessLedgerWithCompactor_SetPersists(t *testing.T) { dir := t.TempDir() + seedV7Root(t, dir) // First run: apply updates and capture the final state. var finalState ledger.State @@ -135,6 +157,33 @@ func TestPayloadlessLedgerWithCompactor_SetPersists(t *testing.T) { "replayed forest should contain final state %s", finalState) } +// TestPayloadlessLedgerWithCompactor_RequiresV7Checkpoint verifies the +// constructor refuses to start when the directory contains no V7 checkpoint +// (neither numbered nor root). The error message should mention what's +// missing so an operator can act on it. +func TestPayloadlessLedgerWithCompactor_RequiresV7Checkpoint(t *testing.T) { + dir := t.TempDir() + // No seedV7Root: dir is entirely empty. + diskWAL := buildDiskWAL(t, dir) + + _, err := complete.NewPayloadlessLedgerWithCompactor( + diskWAL, + 100, + &ledger.CompactorConfig{ + CheckpointCapacity: 100, + CheckpointDistance: 100, + CheckpointsToKeep: 10, + Metrics: &metrics.NoopCollector{}, + }, + atomic.NewBool(false), + &metrics.NoopCollector{}, + zerolog.Nop(), + complete.DefaultPathFinderVersion, + ) + require.Error(t, err) + require.Contains(t, err.Error(), "no V7 checkpoint found") +} + // TestPayloadlessLedgerWithCompactor_RequiresWAL verifies the constructor // rejects a nil WAL — that path is intended for direct in-memory construction // via NewPayloadlessLedger(nil, ...). @@ -160,6 +209,7 @@ func TestPayloadlessLedgerWithCompactor_RequiresWAL(t *testing.T) { // flag and verifies a V7 checkpoint file is produced. func TestPayloadlessLedgerWithCompactor_TriggerCheckpoint(t *testing.T) { dir := t.TempDir() + seedV7Root(t, dir) diskWAL := buildDiskWAL(t, dir) trigger := atomic.NewBool(false) diff --git a/ledger/complete/wal/checkpointer.go b/ledger/complete/wal/checkpointer.go index d676f1958c5..ea841cad571 100644 --- a/ledger/complete/wal/checkpointer.go +++ b/ledger/complete/wal/checkpointer.go @@ -967,7 +967,7 @@ func HasRootCheckpoint(dir string) (bool, error) { // HasRootCheckpointV7 checks if a V7 (payloadless) root checkpoint exists. func HasRootCheckpointV7(dir string) (bool, error) { - if _, err := os.Stat(path.Join(dir, bootstrap.FilenameWALRootCheckpoint+V7FileSuffix)); err == nil { + if _, err := os.Stat(path.Join(dir, RootCheckpointFilenameV7())); err == nil { return true, nil } else if os.IsNotExist(err) { return false, nil @@ -976,6 +976,14 @@ func HasRootCheckpointV7(dir string) (bool, error) { } } +// RootCheckpointFilenameV7 returns the on-disk filename of the V7 +// (payloadless) root checkpoint. The V7 file lives alongside the V6 root +// (bootstrap.FilenameWALRootCheckpoint), with the V7 suffix appended, so +// the two can coexist while a node is being migrated between modes. +func RootCheckpointFilenameV7() string { + return bootstrap.FilenameWALRootCheckpoint + V7FileSuffix +} + func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error { // Try to remove both V6 and V7 versions if they exist v6Name := NumberToFilename(checkpoint) diff --git a/ledger/complete/wal/wal.go b/ledger/complete/wal/wal.go index 621e695a241..7ea8fc7c348 100644 --- a/ledger/complete/wal/wal.go +++ b/ledger/complete/wal/wal.go @@ -143,10 +143,18 @@ func (w *DiskWAL) ReplayOnForest(forest *mtrie.Forest) error { // // When no numbered V7 checkpoint is available it falls back to a V7 root // checkpoint (converted from the V6 root.checkpoint during bootstrap), mirroring -// the V6 root-checkpoint fallback in [DiskWAL.replay]. With no V7 checkpoint of -// either kind it replays all segments onto the (presumably empty) `forest`. +// the V6 root-checkpoint fallback in [DiskWAL.replay]. // -// No error returns are expected during normal operation. +// A V7 checkpoint of one kind or the other is required: a payloadless forest +// retains only leaf-hash commitments, which cannot be reconstructed by WAL +// replay alone (the WAL records full payload updates, but replaying every update +// from genesis to rebuild the commitment is not feasible at runtime). When +// neither a numbered V7 checkpoint nor a V7 root checkpoint is present, this +// refuses to seed rather than silently booting an empty, uncommitted forest. +// +// Expected error returns during normal operation: +// - error containing "no V7 checkpoint found": when the WAL directory contains +// no V7 checkpoint of either kind, so the forest cannot be seeded. func (w *DiskWAL) ReplayOnPayloadlessForest(forest *payloadless.Forest) error { checkpointer, err := w.NewCheckpointer() if err != nil { @@ -158,6 +166,16 @@ func (w *DiskWAL) ReplayOnPayloadlessForest(forest *payloadless.Forest) error { return fmt.Errorf("cannot load latest V7 checkpoint: %w", err) } + // LoadLatestCheckpointV7 returns no tries and loadedCheckpoint == -1 only when + // neither a numbered V7 checkpoint nor a V7 root checkpoint was found. In that + // case there is no seed for the leaf-hash commitment, so refuse to start. + if loadedCheckpoint < 0 && len(tries) == 0 { + return fmt.Errorf( + "no V7 checkpoint found in %s; a V7 checkpoint is required to start a payloadless ledger", + w.wal.Dir(), + ) + } + if err := forest.AddTries(tries); err != nil { return fmt.Errorf("failed to seed payloadless forest from V7 checkpoint: %w", err) } diff --git a/ledger/factory.go b/ledger/factory.go deleted file mode 100644 index 29d656a6d4e..00000000000 --- a/ledger/factory.go +++ /dev/null @@ -1,9 +0,0 @@ -package ledger - -// Factory creates ledger instances with internal compaction management. -// The compactor lifecycle is managed internally by the ledger. -type Factory interface { - // NewLedger creates a new ledger instance with internal compactor. - // The ledger's Ready() method will signal when initialization (WAL replay) is complete. - NewLedger() (Ledger, error) -} diff --git a/ledger/factory/factory.go b/ledger/factory/factory.go index 07eb961e931..f1cb5eca223 100644 --- a/ledger/factory/factory.go +++ b/ledger/factory/factory.go @@ -46,23 +46,25 @@ func NewLedger(config Config, triggerCheckpoint *atomic.Bool) (ledger.Ledger, er // newRemoteLedger creates a remote ledger client that connects to a ledger service. func newRemoteLedger(config Config) (ledger.Ledger, error) { - config.Logger.Info(). + logger := config.Logger.With().Str("subcomponent", "ledger").Logger() + logger.Info(). Str("ledger_service_addr", config.LedgerServiceAddr). Msg("using remote ledger service") - factory := remote.NewRemoteLedgerFactory( - config.LedgerServiceAddr, - config.Logger.With().Str("subcomponent", "ledger").Logger(), - config.LedgerMaxRequestSize, - config.LedgerMaxResponseSize, - ) + var opts []remote.ClientOption + if config.LedgerMaxRequestSize > 0 { + opts = append(opts, remote.WithMaxRequestSize(config.LedgerMaxRequestSize)) + } + if config.LedgerMaxResponseSize > 0 { + opts = append(opts, remote.WithMaxResponseSize(config.LedgerMaxResponseSize)) + } - ledgerStorage, err := factory.NewLedger() + client, err := remote.NewClient(config.LedgerServiceAddr, logger, opts...) if err != nil { return nil, fmt.Errorf("failed to create remote ledger: %w", err) } - return ledgerStorage, nil + return client, nil } // newLocalLedger creates a local ledger with WAL and compactor. @@ -97,8 +99,8 @@ func newLocalLedger(config Config, triggerCheckpoint *atomic.Bool) (ledger.Ledge Metrics: config.WALMetrics, } - // Use factory to create ledger with internal compactor - factory := complete.NewLocalLedgerFactory( + // Create ledger with internal compactor + ledgerStorage, err := complete.NewLedgerWithCompactor( diskWal, int(config.MTrieCacheSize), compactorConfig, @@ -107,8 +109,6 @@ func newLocalLedger(config Config, triggerCheckpoint *atomic.Bool) (ledger.Ledge config.Logger.With().Str("subcomponent", "ledger").Logger(), complete.DefaultPathFinderVersion, ) - - ledgerStorage, err := factory.NewLedger() if err != nil { return nil, fmt.Errorf("failed to create local ledger: %w", err) } @@ -116,13 +116,53 @@ func newLocalLedger(config Config, triggerCheckpoint *atomic.Bool) (ledger.Ledge return ledgerStorage, nil } -// NewPayloadlessLedger creates a payloadless ledger instance. +// NewPayloadlessLedger creates a payloadless ledger instance based on the +// configuration. If LedgerServiceAddr is set, it creates a remote payloadless +// ledger client. Otherwise, it creates a local payloadless ledger with WAL +// and compactor. +// +// This is the payloadless-mode counterpart of [NewLedger]. The signature and +// dispatch shape mirror that function so call sites in +// cmd/execution_builder.go can switch between the two without changing how +// config is plumbed. // -// This is the payloadless-mode counterpart of [NewLedger]. It mirrors that -// function's signature and contract so call sites in cmd/execution_builder.go -// can switch between the two without changing how config is plumbed — -// including the requirement that config.Triedir be non-empty (the same -// requirement [newLocalLedger] places on the V6 ledger). +// triggerCheckpoint is a runtime control signal to trigger checkpoint on +// next segment finish (ignored by the remote client; can be nil). +func NewPayloadlessLedger(config Config, triggerCheckpoint *atomic.Bool) (ledger.PayloadlessLedger, error) { + if config.LedgerServiceAddr != "" { + return newRemotePayloadlessLedger(config) + } + return newLocalPayloadlessLedger(config, triggerCheckpoint) +} + +// newRemotePayloadlessLedger creates a remote payloadless ledger client that +// connects to a payloadless ledger service over gRPC. The client's [Ready] +// method verifies the server is running in payloadless mode and crashes if +// it is not — i.e. a wrong-mode server is treated as a deployment error, not +// a retryable failure. +func newRemotePayloadlessLedger(config Config) (ledger.PayloadlessLedger, error) { + logger := config.Logger.With().Str("subcomponent", "ledger").Logger() + logger.Info(). + Str("ledger_service_addr", config.LedgerServiceAddr). + Msg("using remote payloadless ledger service") + + var opts []remote.ClientOption + if config.LedgerMaxRequestSize > 0 { + opts = append(opts, remote.WithMaxRequestSize(config.LedgerMaxRequestSize)) + } + if config.LedgerMaxResponseSize > 0 { + opts = append(opts, remote.WithMaxResponseSize(config.LedgerMaxResponseSize)) + } + + client, err := remote.NewPayloadlessClient(config.LedgerServiceAddr, logger, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create remote payloadless ledger client: %w", err) + } + return client, nil +} + +// newLocalPayloadlessLedger creates a local payloadless ledger with WAL and +// compactor, mirroring [newLocalLedger] for the full ledger. // // The factory opens a [wal.DiskWAL] over config.Triedir and returns a // [complete.PayloadlessLedgerWithCompactor], which: @@ -136,15 +176,13 @@ func newLocalLedger(config Config, triggerCheckpoint *atomic.Bool) (ledger.Ledge // Either a numbered V7 checkpoint or a V7 root checkpoint must be present in // config.Triedir. If only V6 checkpoints exist (no V7 of either kind), the // factory logs a hint pointing to the checkpoint-convert-v7 utility and refuses -// to start. -// -// TODO: remote payloadless ledger client. When config.LedgerServiceAddr is -// set, this factory should construct a remote.PayloadlessClient (Spec 004). -// For now config.LedgerServiceAddr is ignored. +// to start — the leaf-hash commitment cannot be reconstructed by WAL replay +// alone. // // Expected error returns during normal operation: // - error if config.Triedir is empty -func NewPayloadlessLedger(config Config, triggerCheckpoint *atomic.Bool) (ledger.PayloadlessLedger, error) { +// - error if no V7 (payloadless) checkpoint exists in config.Triedir +func newLocalPayloadlessLedger(config Config, triggerCheckpoint *atomic.Bool) (ledger.PayloadlessLedger, error) { logger := config.Logger.With().Str("subcomponent", "ledger").Logger() if config.Triedir == "" { diff --git a/ledger/factory/factory_test.go b/ledger/factory/factory_test.go index ef8714a0297..259e926da42 100644 --- a/ledger/factory/factory_test.go +++ b/ledger/factory/factory_test.go @@ -388,8 +388,8 @@ func startLedgerServer(t *testing.T, walDir string) (string, func()) { // Create compactor config compactorConfig := ledger.DefaultCompactorConfig(metricsCollector) - // Create ledger factory - factory := complete.NewLocalLedgerFactory( + // Create ledger instance with internal compactor + ledgerStorage, err := complete.NewLedgerWithCompactor( diskWal, 100, compactorConfig, @@ -398,9 +398,6 @@ func startLedgerServer(t *testing.T, walDir string) (string, func()) { logger, complete.DefaultPathFinderVersion, ) - - // Create ledger instance - ledgerStorage, err := factory.NewLedger() require.NoError(t, err) // Wait for ledger to be ready (WAL replay) diff --git a/ledger/mock/factory.go b/ledger/mock/factory.go deleted file mode 100644 index 4c26a640169..00000000000 --- a/ledger/mock/factory.go +++ /dev/null @@ -1,92 +0,0 @@ -// Code generated by mockery; DO NOT EDIT. -// github.com/vektra/mockery -// template: testify - -package mock - -import ( - "github.com/onflow/flow-go/ledger" - mock "github.com/stretchr/testify/mock" -) - -// NewFactory creates a new instance of Factory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewFactory(t interface { - mock.TestingT - Cleanup(func()) -}) *Factory { - mock := &Factory{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} - -// Factory is an autogenerated mock type for the Factory type -type Factory struct { - mock.Mock -} - -type Factory_Expecter struct { - mock *mock.Mock -} - -func (_m *Factory) EXPECT() *Factory_Expecter { - return &Factory_Expecter{mock: &_m.Mock} -} - -// NewLedger provides a mock function for the type Factory -func (_mock *Factory) NewLedger() (ledger.Ledger, error) { - ret := _mock.Called() - - if len(ret) == 0 { - panic("no return value specified for NewLedger") - } - - var r0 ledger.Ledger - var r1 error - if returnFunc, ok := ret.Get(0).(func() (ledger.Ledger, error)); ok { - return returnFunc() - } - if returnFunc, ok := ret.Get(0).(func() ledger.Ledger); ok { - r0 = returnFunc() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(ledger.Ledger) - } - } - if returnFunc, ok := ret.Get(1).(func() error); ok { - r1 = returnFunc() - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// Factory_NewLedger_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewLedger' -type Factory_NewLedger_Call struct { - *mock.Call -} - -// NewLedger is a helper method to define mock.On call -func (_e *Factory_Expecter) NewLedger() *Factory_NewLedger_Call { - return &Factory_NewLedger_Call{Call: _e.mock.On("NewLedger")} -} - -func (_c *Factory_NewLedger_Call) Run(run func()) *Factory_NewLedger_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Factory_NewLedger_Call) Return(ledger1 ledger.Ledger, err error) *Factory_NewLedger_Call { - _c.Call.Return(ledger1, err) - return _c -} - -func (_c *Factory_NewLedger_Call) RunAndReturn(run func() (ledger.Ledger, error)) *Factory_NewLedger_Call { - _c.Call.Return(run) - return _c -} diff --git a/ledger/remote/factory.go b/ledger/remote/factory.go deleted file mode 100644 index d7e5ad89b98..00000000000 --- a/ledger/remote/factory.go +++ /dev/null @@ -1,46 +0,0 @@ -package remote - -import ( - "github.com/rs/zerolog" - - "github.com/onflow/flow-go/ledger" -) - -// RemoteLedgerFactory creates remote ledger instances via gRPC. -type RemoteLedgerFactory struct { - grpcAddr string - logger zerolog.Logger - maxRequestSize uint - maxResponseSize uint -} - -// NewRemoteLedgerFactory creates a new factory for remote ledger instances. -// maxRequestSize and maxResponseSize specify the maximum message sizes in bytes. -// If both are 0, defaults to 1 GiB for both requests and responses. -func NewRemoteLedgerFactory( - grpcAddr string, - logger zerolog.Logger, - maxRequestSize, maxResponseSize uint, -) ledger.Factory { - return &RemoteLedgerFactory{ - grpcAddr: grpcAddr, - logger: logger, - maxRequestSize: maxRequestSize, - maxResponseSize: maxResponseSize, - } -} - -func (f *RemoteLedgerFactory) NewLedger() (ledger.Ledger, error) { - var opts []ClientOption - if f.maxRequestSize > 0 { - opts = append(opts, WithMaxRequestSize(f.maxRequestSize)) - } - if f.maxResponseSize > 0 { - opts = append(opts, WithMaxResponseSize(f.maxResponseSize)) - } - client, err := NewClient(f.grpcAddr, f.logger, opts...) - if err != nil { - return nil, err - } - return client, nil -}