Skip to content

Commit 5f52723

Browse files
Merge pull request #6204 from oasisprotocol/martin/feature/parallelize-checkpoint-creation-v2
go/storage/mkvs: Parallelize checkpoint creation
2 parents cfd46b7 + d6e01f6 commit 5f52723

25 files changed

Lines changed: 1131 additions & 143 deletions

File tree

.changelog/6204.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go/storage/mkvs: Parallelize checkpoint creation

go/consensus/cometbft/abci/mux.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type ApplicationConfig struct { // nolint: maligned
6363

6464
DisableCheckpointer bool
6565
CheckpointerCheckInterval time.Duration
66+
ChunkerThreads uint16
6667

6768
// Identity is the local node identity.
6869
Identity *identity.Identity

go/consensus/cometbft/abci/snapshots.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,11 @@ func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) type
212212
"err", err,
213213
)
214214

215-
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT}
215+
// Given that snapshot was invalid, the sender must have been malicious.
216+
return types.ResponseApplySnapshotChunk{
217+
Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT,
218+
RejectSenders: []string{req.Sender},
219+
}
216220
default:
217221
// Unspecified error during restoration.
218222
mux.logger.Error("error during chunk restoration, aborting state sync",

go/consensus/cometbft/abci/state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App
775775
NumKept: params.StateCheckpointNumKept,
776776
ChunkSize: params.StateCheckpointChunkSize,
777777
InitialVersion: cfg.InitialHeight,
778+
ChunkerThreads: cfg.ChunkerThreads,
778779
}, nil
779780
},
780781
}

go/consensus/cometbft/config/config.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ type CheckpointerConfig struct {
115115
Disabled bool `yaml:"disabled"`
116116
// ABCI state checkpointer check interval.
117117
CheckInterval time.Duration `yaml:"check_interval"`
118+
// ParallelChunker specifies if the new parallel chunking algorithm is used.
119+
ParallelChunker bool `yaml:"parallel_chunker"`
118120
}
119121

120122
// StateSyncConfig is the consensus state sync configuration structure.
@@ -247,8 +249,9 @@ func DefaultConfig() Config {
247249
NumLightBlocksKept: 10000,
248250
},
249251
Checkpointer: CheckpointerConfig{
250-
Disabled: false,
251-
CheckInterval: 1 * time.Minute,
252+
Disabled: false,
253+
CheckInterval: 1 * time.Minute,
254+
ParallelChunker: false,
252255
},
253256
StateSync: StateSyncConfig{
254257
Enabled: false,

go/consensus/cometbft/full/full.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ const (
7070
tmSubscriberID = "oasis-core"
7171

7272
exportsSubDir = "exports"
73+
74+
// chunkerThreads is target number of subtrees during parallel checkpoint creation.
75+
// It is intentionally non-configurable since we want operators to produce
76+
// same checkpoint hashes. The current value was chosen based on the benchmarks
77+
// done on the modern developer machine.
78+
chunkerThreads = 12
7379
)
7480

7581
var (
@@ -544,6 +550,11 @@ func (t *fullService) lazyInit() error { // nolint: gocyclo
544550
pruneCfg.NumKept = config.GlobalConfig.Consensus.Prune.NumKept
545551
pruneCfg.PruneInterval = max(config.GlobalConfig.Consensus.Prune.Interval, time.Second)
546552

553+
var threads uint16
554+
if config.GlobalConfig.Storage.Checkpointer.ParallelChunker {
555+
threads = chunkerThreads
556+
}
557+
547558
appConfig := &abci.ApplicationConfig{
548559
DataDir: filepath.Join(t.dataDir, tmcommon.StateDir),
549560
StorageBackend: config.GlobalConfig.Storage.Backend,
@@ -554,6 +565,7 @@ func (t *fullService) lazyInit() error { // nolint: gocyclo
554565
Identity: t.identity,
555566
DisableCheckpointer: config.GlobalConfig.Consensus.Checkpointer.Disabled,
556567
CheckpointerCheckInterval: config.GlobalConfig.Consensus.Checkpointer.CheckInterval,
568+
ChunkerThreads: threads,
557569
InitialHeight: uint64(t.genesisHeight),
558570
ChainContext: t.chainContext,
559571
}

go/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ require (
5858
golang.org/x/crypto v0.39.0
5959
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c
6060
golang.org/x/net v0.41.0
61+
golang.org/x/sync v0.15.0
6162
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1
6263
google.golang.org/grpc v1.68.0
6364
google.golang.org/grpc/security/advancedtls v0.0.0-20221004221323-12db695f1648
@@ -210,7 +211,6 @@ require (
210211
go.uber.org/mock v0.5.0 // indirect
211212
go.uber.org/multierr v1.11.0 // indirect
212213
golang.org/x/mod v0.25.0 // indirect
213-
golang.org/x/sync v0.15.0 // indirect
214214
golang.org/x/sys v0.33.0 // indirect
215215
golang.org/x/text v0.26.0 // indirect
216216
golang.org/x/tools v0.33.0 // indirect

go/oasis-test-runner/oasis/compute.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@ type Compute struct { // nolint: maligned
3737

3838
sentryIndices []int
3939

40-
storageBackend string
41-
disablePublicRPC bool
42-
checkpointSyncDisabled bool
43-
checkpointCheckInterval time.Duration
40+
storageBackend string
41+
disablePublicRPC bool
42+
checkpointSyncDisabled bool
43+
checkpointCheckInterval time.Duration
44+
checkpointParallelChunker bool
4445

4546
sentryPubKey signature.PublicKey
4647
consensusPort uint16
@@ -62,10 +63,11 @@ type ComputeCfg struct {
6263

6364
SentryIndices []int
6465

65-
StorageBackend string
66-
DisablePublicRPC bool
67-
CheckpointSyncDisabled bool
68-
CheckpointCheckInterval time.Duration
66+
StorageBackend string
67+
DisablePublicRPC bool
68+
CheckpointSyncDisabled bool
69+
CheckpointCheckInterval time.Duration
70+
CheckpointParallelChunker bool
6971
}
7072

7173
// UpdateRuntimes updates the worker node runtimes.
@@ -172,6 +174,7 @@ func (worker *Compute) ModifyConfig() error {
172174
worker.Config.Storage.CheckpointSyncDisabled = worker.checkpointSyncDisabled
173175
worker.Config.Storage.Checkpointer.Enabled = true
174176
worker.Config.Storage.Checkpointer.CheckInterval = worker.checkpointCheckInterval
177+
worker.Config.Storage.Checkpointer.ParallelChunker = worker.checkpointParallelChunker
175178

176179
// Sentry configuration.
177180
sentries, err := resolveSentries(worker.net, worker.sentryIndices)
@@ -230,18 +233,19 @@ func (net *Network) NewCompute(cfg *ComputeCfg) (*Compute, error) {
230233
}
231234

232235
worker := &Compute{
233-
Node: host,
234-
storageBackend: cfg.StorageBackend,
235-
sentryIndices: cfg.SentryIndices,
236-
disablePublicRPC: cfg.DisablePublicRPC,
237-
checkpointSyncDisabled: cfg.CheckpointSyncDisabled,
238-
checkpointCheckInterval: cfg.CheckpointCheckInterval,
239-
sentryPubKey: sentryPubKey,
240-
runtimeProvisioner: cfg.RuntimeProvisioner,
241-
consensusPort: host.getProvisionedPort(nodePortConsensus),
242-
p2pPort: host.getProvisionedPort(nodePortP2P),
243-
runtimes: cfg.Runtimes,
244-
runtimeConfig: cfg.RuntimeConfig,
236+
Node: host,
237+
storageBackend: cfg.StorageBackend,
238+
sentryIndices: cfg.SentryIndices,
239+
disablePublicRPC: cfg.DisablePublicRPC,
240+
checkpointSyncDisabled: cfg.CheckpointSyncDisabled,
241+
checkpointCheckInterval: cfg.CheckpointCheckInterval,
242+
checkpointParallelChunker: cfg.CheckpointParallelChunker,
243+
sentryPubKey: sentryPubKey,
244+
runtimeProvisioner: cfg.RuntimeProvisioner,
245+
consensusPort: host.getProvisionedPort(nodePortConsensus),
246+
p2pPort: host.getProvisionedPort(nodePortP2P),
247+
runtimes: cfg.Runtimes,
248+
runtimeConfig: cfg.RuntimeConfig,
245249
}
246250

247251
// Remove any exploded bundles on cleanup.

go/oasis-test-runner/oasis/fixture.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -401,8 +401,9 @@ type ComputeWorkerFixture struct {
401401

402402
Sentries []int `json:"sentries,omitempty"`
403403

404-
CheckpointCheckInterval time.Duration `json:"checkpoint_check_interval,omitempty"`
405-
CheckpointSyncEnabled bool `json:"checkpoint_sync_enabled,omitempty"`
404+
CheckpointCheckInterval time.Duration `json:"checkpoint_check_interval,omitempty"`
405+
CheckpointSyncEnabled bool `json:"checkpoint_sync_enabled,omitempty"`
406+
CheckpointParallelChunker bool `json:"checkpoint_parallel_chunker,omitempty"`
406407

407408
// Runtimes contains the indexes of the runtimes to enable.
408409
Runtimes []int `json:"runtimes,omitempty"`
@@ -434,10 +435,11 @@ func (f *ComputeWorkerFixture) Create(net *Network) (*Compute, error) {
434435
Entity: entity,
435436
ExtraArgs: f.ExtraArgs,
436437
},
437-
RuntimeProvisioner: f.RuntimeProvisioner,
438-
StorageBackend: f.StorageBackend,
439-
SentryIndices: f.Sentries,
440-
CheckpointCheckInterval: f.CheckpointCheckInterval,
438+
RuntimeProvisioner: f.RuntimeProvisioner,
439+
StorageBackend: f.StorageBackend,
440+
SentryIndices: f.Sentries,
441+
CheckpointCheckInterval: f.CheckpointCheckInterval,
442+
CheckpointParallelChunker: f.CheckpointParallelChunker,
441443
// The checkpoint syncing flag is intentionally flipped here.
442444
// Syncing should normally be enabled, but normally disabled in tests.
443445
CheckpointSyncDisabled: !f.CheckpointSyncEnabled,

go/oasis-test-runner/scenario/e2e/runtime/scenario.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,13 @@ func (sc *Scenario) Fixture() (*oasis.NetworkFixture, error) {
264264
},
265265
},
266266
},
267-
{RuntimeProvisioner: runtimeProvisioner, Entity: 1, Runtimes: []int{1}},
267+
{
268+
RuntimeProvisioner: runtimeProvisioner,
269+
Entity: 1,
270+
Runtimes: []int{1},
271+
// Use a parallel chunking algorithm for the second compute node to show interoperability.
272+
CheckpointParallelChunker: true,
273+
},
268274
},
269275
Sentries: []oasis.SentryFixture{},
270276
Seeds: []oasis.SeedFixture{{}},

0 commit comments

Comments
 (0)