Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,10 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {

// when bootstrapping, the bootstrap folder must have a checkpoint file
// we need to cover this file to the trie folder to restore the trie to restore the execution state.
//
// Note: in payloadless mode the V6 root checkpoint placed here is later
// converted to root.checkpoint.v7 by ledgerfactory.NewPayloadlessLedger
// before the bundle reads it. Bootstrap itself stays mode-agnostic.
err = copyBootstrapState(node.BootstrapDir, exeNode.exeConf.triedir)
if err != nil {
return fmt.Errorf("could not load bootstrap state from checkpoint file: %w", err)
Expand Down
60 changes: 50 additions & 10 deletions cmd/ledger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/onflow/flow-go/ledger"
ledgerfactory "github.com/onflow/flow-go/ledger/factory"
ledgerpb "github.com/onflow/flow-go/ledger/protobuf"
"github.com/onflow/flow-go/ledger/remote"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
)
Expand All @@ -35,6 +37,7 @@ var (
checkpointDist = flag.Uint("checkpoint-distance", 100, "Checkpoint distance")
checkpointsToKeep = flag.Uint("checkpoints-to-keep", 3, "Number of checkpoints to keep")
logLevel = flag.String("loglevel", "info", "Log level (panic, fatal, error, warn, info, debug)")
payloadless = flag.Bool("payloadless", false, "Run the ledger service in payloadless mode (stores leaf hashes instead of full payloads; requires a V7 checkpoint in --triedir).")
maxRequestSize = flag.Uint("max-request-size", 1<<30, "Maximum request message size in bytes (default: 1 GiB)")
maxResponseSize = flag.Uint("max-response-size", 1<<30, "Maximum response message size in bytes (default: 1 GiB)")
)
Expand Down Expand Up @@ -72,14 +75,18 @@ func main() {
Str("admin_addr", *adminAddr).
Uint("metrics_port", *metricsPort).
Int("mtrie_cache_size", *mtrieCacheSize).
Bool("payloadless", *payloadless).
Msg("starting ledger service")

// Create trigger for manual checkpointing (used by admin command)
triggerCheckpointOnNextSegmentFinish := atomic.NewBool(false)

// Create ledger using factory
// Create ledger using factory. The same config drives both modes; the
// payloadless flag selects which factory constructor (and gRPC service) is
// wired up. A ledger gRPC server registers either the full [remote.Service]
// or the [remote.PayloadlessService], never both.
metricsCollector := metrics.NewLedgerCollector("ledger", "wal")
ledgerStorage, err := ledgerfactory.NewLedger(ledgerfactory.Config{
factoryConfig := ledgerfactory.Config{
Triedir: *triedir,
MTrieCacheSize: uint32(*mtrieCacheSize),
CheckpointDistance: *checkpointDist,
Expand All @@ -88,24 +95,58 @@ func main() {
WALMetrics: metricsCollector,
LedgerMetrics: metricsCollector,
Logger: logger,
}, triggerCheckpointOnNextSegmentFinish)
if err != nil {
logger.Fatal().Err(err).Msg("failed to create ledger")
}

// ledgerStorage is the lifecycle handle used for readiness, health check,
// and shutdown regardless of mode. registerService binds the mode-specific
// gRPC service onto the server once it is created.
var ledgerStorage module.ReadyDoneAware
var registerService func(grpcServer *grpc.Server)

if *payloadless {
payloadlessLedger, err := ledgerfactory.NewPayloadlessLedger(factoryConfig, triggerCheckpointOnNextSegmentFinish)
if err != nil {
logger.Fatal().Err(err).Msg("failed to create payloadless ledger")
}
ledgerStorage = payloadlessLedger
registerService = func(grpcServer *grpc.Server) {
ledgerpb.RegisterPayloadlessLedgerServiceServer(grpcServer, remote.NewPayloadlessService(payloadlessLedger, logger))
}
} else {
fullLedger, err := ledgerfactory.NewLedger(factoryConfig, triggerCheckpointOnNextSegmentFinish)
if err != nil {
logger.Fatal().Err(err).Msg("failed to create ledger")
}
ledgerStorage = fullLedger
registerService = func(grpcServer *grpc.Server) {
ledgerpb.RegisterLedgerServiceServer(grpcServer, remote.NewService(fullLedger, logger))
}
}

// Wait for ledger to be ready (WAL replay)
logger.Info().Msg("waiting for ledger initialization...")
<-ledgerStorage.Ready()
logger.Info().Msg("ledger ready")

// Both the full and payloadless ledgers expose state inspection for the
// post-startup health check, though only the full ledger declares it on its
// public interface; assert it here so the check works in either mode.
inspector, ok := ledgerStorage.(interface {
StateCount() int
StateByIndex(index int) (ledger.State, error)
})
if !ok {
logger.Fatal().Msg("ledger does not support state inspection")
}

// Check if any trie is loaded after startup
stateCount := ledgerStorage.StateCount()
stateCount := inspector.StateCount()
if stateCount == 0 {
logger.Fatal().Msg("no trie loaded after startup - no states available")
}

// Get the last trie state for logging
lastState, err := ledgerStorage.StateByIndex(-1)
lastState, err := inspector.StateByIndex(-1)
if err != nil {
logger.Fatal().Err(err).Msg("failed to get last state for logging")
}
Expand All @@ -123,9 +164,8 @@ func main() {
grpc.MaxSendMsgSize(int(*maxResponseSize)),
)

// Create and register ledger service
ledgerService := remote.NewService(ledgerStorage, logger)
ledgerpb.RegisterLedgerServiceServer(grpcServer, ledgerService)
// Register the mode-specific ledger service
registerService(grpcServer)

// Create listeners based on provided flags
type listenerInfo struct {
Expand Down
4 changes: 3 additions & 1 deletion integration/localnet/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ EXECUTION = 2
VALID_EXECUTION := $(shell test $(EXECUTION) -ge 2; echo $$?)
LEDGER_EXECUTION = 0
VALID_LEDGER_EXECUTION := $(shell test $(LEDGER_EXECUTION) -le $(EXECUTION); echo $$?)
PAYLOADLESS = false
TEST_EXECUTION = 0
VERIFICATION = 1
ACCESS = 1
Expand Down Expand Up @@ -79,7 +80,8 @@ else
-extensive-tracing=$(EXTENSIVE_TRACING) \
-consensus-delay=$(CONSENSUS_DELAY) \
-collection-delay=$(COLLECTION_DELAY) \
-ledger-execution=$(LEDGER_EXECUTION)
-ledger-execution=$(LEDGER_EXECUTION) \
-payloadless=$(PAYLOADLESS)
endif

# Creates a light version of the localnet with just 1 instance for each node type
Expand Down
96 changes: 76 additions & 20 deletions integration/localnet/builder/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/go-yaml/yaml"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/cmd/build"
"github.com/onflow/flow-go/ledger/complete/wal"
Expand Down Expand Up @@ -81,6 +82,7 @@ var (
consensusDelay time.Duration
collectionDelay time.Duration
logLevel string
payloadless bool

ports *PortAllocator
)
Expand Down Expand Up @@ -109,6 +111,7 @@ func init() {
flag.DurationVar(&collectionDelay, "collection-delay", DefaultCollectionDelay, "delay on collection node block proposals")
flag.StringVar(&logLevel, "loglevel", DefaultLogLevel, "log level for all nodes")
flag.IntVar(&ledgerExecutionCount, "ledger-execution", 0, "number of execution nodes that use remote ledger service (0 = all use local ledger, max = execution count)")
flag.BoolVar(&payloadless, "payloadless", false, "enable payloadless trie mode (stores payload hashes instead of full payloads)")
}

func generateBootstrapData(flowNetworkConf testnet.NetworkConfig) []testnet.ContainerConfig {
Expand Down Expand Up @@ -480,6 +483,15 @@ func prepareExecutionService(container testnet.ContainerConfig, i int, n int) Se
service.Volumes = append(service.Volumes,
fmt.Sprintf("%s:/trie:z", trieDir),
)
if payloadless {
service.Command = append(service.Command, "--payloadless")
}
}

// Payloadless mode requires storehouse to store the actual payloads
// (the trie only stores payload hashes)
if payloadless {
service.Command = append(service.Command, "--enable-storehouse")
}

service.AddExposedPorts(testnet.GRPCPort)
Expand Down Expand Up @@ -834,20 +846,58 @@ func prepareLedgerService(dockerServices Services, flowNodeContainerConfigs []te
// 2. Ledger service has /trie mounted and can follow symlinks to /bootstrap (via execution node's mount)
// 3. We create symlinks using relative paths that work in both host and container contexts
bootstrapExecutionStateDir := filepath.Join(BootstrapDir, bootstrapFilenames.DirnameExecutionState)
checkpointSource := filepath.Join(bootstrapExecutionStateDir, bootstrapFilenames.FilenameWALRootCheckpoint)
if _, err := os.Stat(checkpointSource); err == nil {
// Checkpoint exists, create symlinks on host
// The symlinks will use relative paths that resolve correctly inside containers
// because both /bootstrap and /trie are mounted in the containers

// Create symlinks for V6 checkpoint
checkpointSourceV6 := filepath.Join(bootstrapExecutionStateDir, bootstrapFilenames.FilenameWALRootCheckpoint)
if _, err := os.Stat(checkpointSourceV6); err == nil {
// V6 checkpoint exists, create symlinks on host
_, err = wal.SoftlinkCheckpointFile(bootstrapFilenames.FilenameWALRootCheckpoint, bootstrapExecutionStateDir, trieDir)
if err != nil {
panic(fmt.Errorf("failed to create checkpoint symlinks: %w", err))
panic(fmt.Errorf("failed to create V6 checkpoint symlinks: %w", err))
}
fmt.Printf("created checkpoint symlinks in trie directory: %s\n", trieDir)
fmt.Printf("created V6 checkpoint symlinks in trie directory: %s\n", trieDir)
} else {
// Checkpoint doesn't exist, this is expected for fresh bootstrap
// The execution node will create it when it initializes
fmt.Printf("root checkpoint not found in %s, ledger service will start with empty state\n", checkpointSource)
fmt.Printf("V6 root checkpoint not found in %s\n", checkpointSourceV6)
}

// Create symlinks for V7 checkpoint (payloadless)
v7Filename := bootstrapFilenames.FilenameWALRootCheckpoint + wal.V7FileSuffix
checkpointSourceV7 := filepath.Join(bootstrapExecutionStateDir, v7Filename)

// In payloadless mode a spork only produces a V6 root.checkpoint, and the
// ledger service has no bootstrapper of its own to convert it. Convert the V6
// root checkpoint into a V7 root checkpoint here (once, at bootstrap time);
// the symlink block below then seeds the ledger service's trie directory from
// it. On restart the ledger factory finds an existing V7 checkpoint (this root
// or a newer numbered one written by the compactor), so no conversion is
// needed at runtime. The os.Stat guard makes a re-run of `make bootstrap`
// idempotent and avoids ConvertCheckpointV6ToV7's "output exists" rejection.
if payloadless {
if _, err := os.Stat(checkpointSourceV7); errors.Is(err, fs.ErrNotExist) {
logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
if convertErr := wal.ConvertCheckpointV6ToV7(
bootstrapExecutionStateDir,
bootstrapFilenames.FilenameWALRootCheckpoint,
bootstrapExecutionStateDir,
v7Filename,
logger,
16,
); convertErr != nil {
panic(fmt.Errorf("failed to convert V6 root checkpoint to V7 for payloadless ledger service: %w", convertErr))
}
fmt.Printf("converted V6 root checkpoint to V7 in %s\n", bootstrapExecutionStateDir)
}
}

if _, err := os.Stat(checkpointSourceV7); err == nil {
// V7 checkpoint exists, create symlinks on host
_, err = wal.SoftlinkCheckpointFile(v7Filename, bootstrapExecutionStateDir, trieDir)
if err != nil {
panic(fmt.Errorf("failed to create V7 checkpoint symlinks: %w", err))
}
fmt.Printf("created V7 checkpoint symlinks in trie directory: %s\n", trieDir)
} else {
fmt.Printf("V7 root checkpoint not found in %s\n", checkpointSourceV7)
}

// Allocate ports for ledger service
Expand All @@ -865,17 +915,22 @@ func prepareLedgerService(dockerServices Services, flowNodeContainerConfigs []te

// Create ledger service
// Use Unix domain socket; ledger and execution nodes share absSocketDir mounted at /sockets
ledgerCommand := []string{
"--triedir=/trie",
"--ledger-service-socket=/sockets/ledger.sock",
"--mtrie-cache-size=100",
"--checkpoint-distance=100",
"--checkpoints-to-keep=3",
fmt.Sprintf("--loglevel=%s", logLevel),
}
if payloadless {
ledgerCommand = append(ledgerCommand, "--payloadless")
}

service := Service{
name: ledgerServiceName,
Image: "localnet-ledger",
Command: []string{
"--triedir=/trie",
"--ledger-service-socket=/sockets/ledger.sock",
"--mtrie-cache-size=100",
"--checkpoint-distance=100",
"--checkpoints-to-keep=3",
fmt.Sprintf("--loglevel=%s", logLevel),
},
name: ledgerServiceName,
Image: "localnet-ledger",
Command: ledgerCommand,
Volumes: []string{
fmt.Sprintf("%s:/trie:z", trieDir),
fmt.Sprintf("%s:/bootstrap:z", BootstrapDir),
Expand Down Expand Up @@ -963,3 +1018,4 @@ func prepareTestExecutionService(dockerServices Services, flowNodeContainerConfi

return dockerServices
}

59 changes: 0 additions & 59 deletions ledger/complete/factory.go

This file was deleted.

Loading