-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathblock_scheduler.go
More file actions
181 lines (143 loc) · 5.28 KB
/
block_scheduler.go
File metadata and controls
181 lines (143 loc) · 5.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package simplex
import (
"errors"
"fmt"
"maps"
"slices"
"sync"
"sync/atomic"
"go.uber.org/zap"
)
// ErrTooManyPendingVerifications is the sentinel error that ScheduleTaskWithDependencies wraps and returns
// when the number of pending verification tasks has reached the configured maximum.
// Callers can detect it with errors.Is.
var ErrTooManyPendingVerifications = errors.New("too many blocks being verified to ingest another one")
// Scheduler is the executor that BlockDependencyManager hands tasks to once
// they have no unresolved dependencies left.
type Scheduler interface {
// Schedule submits task for execution.
// NOTE(review): BlockDependencyManager calls Schedule while holding its own lock, and the tasks it
// submits re-acquire that lock, so implementations presumably must not run the task inline — confirm.
Schedule(task Task)
// Size reports how many tasks the scheduler currently holds. BlockDependencyManager adds this to the
// number of tasks still waiting on dependencies when enforcing its limit.
Size() int
// Close shuts the scheduler down. It is invoked from BlockDependencyManager.Close.
Close()
}
// BlockDependencyManager manages block verification tasks with dependencies on previous blocks and empty rounds.
// It schedules tasks when their dependencies are resolved.
type BlockDependencyManager struct {
// lock guards dependencies; every method that reads or rewrites the slice holds it.
lock sync.Mutex
// logger receives debug and warning output about scheduling decisions.
logger Logger
// scheduler runs tasks once they have no unresolved dependencies.
scheduler Scheduler
// dependencies holds the tasks that are still waiting on a previous block and/or empty rounds.
dependencies []*TaskWithDependents
// maxDeps is the upper bound on waiting tasks plus tasks held by the scheduler;
// new tasks are rejected once that total reaches maxDeps.
maxDeps uint64
// closed is set by Close; once set, ScheduleTaskWithDependencies ignores new tasks.
closed atomic.Bool
}
// TaskWithDependents is a verification task together with the dependencies that
// must be resolved before it may be handed to the scheduler.
type TaskWithDependents struct {
// Task is the work to run once the task is ready.
Task Task
blockSeq uint64 // the seq of the block being verified
// prevBlock is the digest of the block this task waits on; nil means there is no
// (remaining) block dependency.
prevBlock *Digest
// emptyRounds is the set of empty rounds this task still waits on; entries are
// deleted as rounds are reported, and an empty set means no round dependency remains.
emptyRounds map[uint64]struct{}
}
// isReady reports whether the task has no outstanding dependencies: it is not
// waiting on a previous block and its set of awaited empty rounds is empty.
func (t *TaskWithDependents) isReady() bool {
	if t.prevBlock != nil {
		// Still blocked on the previous block.
		return false
	}
	return len(t.emptyRounds) == 0
}
// String renders the task for logging: its block sequence, the previous block it
// still waits on (if any), and the empty rounds it still waits on.
//
// The rounds are sorted because map iteration order is random; without sorting,
// the same task would print differently from one log line to the next.
func (t *TaskWithDependents) String() string {
	rounds := slices.Sorted(maps.Keys(t.emptyRounds))
	return fmt.Sprintf("BlockVerificationTask{blockSeq: %d, prevBlock: %v, emptyRounds: %v}", t.blockSeq, t.prevBlock, rounds)
}
// NewBlockVerificationScheduler builds a BlockDependencyManager that logs through
// logger, rejects new tasks once maxDeps verifications are pending, and hands
// ready tasks to scheduler. The manager starts with no waiting tasks.
func NewBlockVerificationScheduler(logger Logger, maxDeps uint64, scheduler Scheduler) *BlockDependencyManager {
	logger.Debug("Created BlockVerificationScheduler", zap.Uint64("maxDeps", maxDeps))

	return &BlockDependencyManager{
		scheduler: scheduler,
		maxDeps:   maxDeps,
		logger:    logger,
	}
}
// ExecuteBlockDependents removes the given digest from dependent tasks and schedules any whose dependencies are now resolved.
//
// prev is the digest of a block whose verification has completed. Every waiting
// task whose prevBlock equals prev has that dependency cleared; tasks that are
// ready afterwards are handed to the scheduler and dropped from the waiting
// list, the rest stay queued.
//
// NOTE(review): Schedule is called while bs.lock is held; this assumes the
// scheduler does not run the task inline, since scheduled tasks call back into
// this method and would block on the same lock — confirm against the Scheduler
// implementation.
func (bs *BlockDependencyManager) ExecuteBlockDependents(prev Digest) {
	bs.lock.Lock()
	defer bs.lock.Unlock()

	remainingDeps := make([]*TaskWithDependents, 0, len(bs.dependencies))
	for _, taskWithDeps := range bs.dependencies {
		if taskWithDeps.prevBlock != nil && *taskWithDeps.prevBlock == prev {
			taskWithDeps.prevBlock = nil
		}

		if taskWithDeps.isReady() {
			// Log the resolved predecessor under its real name and include the task
			// itself, matching what ExecuteEmptyRoundDependents reports.
			bs.logger.Debug("Scheduling block verification task as all dependencies are met",
				zap.Stringer("prevBlock", prev),
				zap.Stringer("task", taskWithDeps),
			)
			bs.scheduler.Schedule(taskWithDeps.Task)
			continue
		}

		bs.logger.Debug("Block verification task has unsatisfied dependencies",
			zap.Any("prevBlock", prev),
			zap.Stringer("task", taskWithDeps),
		)
		remainingDeps = append(remainingDeps, taskWithDeps)
	}

	bs.dependencies = remainingDeps
}
// ExecuteEmptyRoundDependents clears emptyRound from the awaited rounds of every
// waiting task. Tasks left without any dependency are passed to the scheduler
// and removed from the waiting list; all others remain queued.
func (bs *BlockDependencyManager) ExecuteEmptyRoundDependents(emptyRound uint64) {
	bs.lock.Lock()
	defer bs.lock.Unlock()

	stillWaiting := make([]*TaskWithDependents, 0, len(bs.dependencies))
	for _, pending := range bs.dependencies {
		// Deleting a round the task never waited on is a no-op.
		delete(pending.emptyRounds, emptyRound)

		if !pending.isReady() {
			bs.logger.Debug("Block verification task has unsatisfied dependencies",
				zap.Any("emptyRound", emptyRound),
				zap.Stringer("task", pending),
			)
			stillWaiting = append(stillWaiting, pending)
			continue
		}

		bs.logger.Debug("Scheduling block verification task as all dependencies are met", zap.Stringer("task", pending))
		bs.scheduler.Schedule(pending.Task)
	}

	bs.dependencies = stillWaiting
}
// ScheduleTaskWithDependencies registers the verification task for the block at
// blockSeq.
//
// prev is the digest of the block that must be verified first (nil for none) and
// emptyRounds lists the empty rounds that must be reported first (duplicates are
// collapsed). A task with no dependencies is handed to the scheduler right away;
// otherwise it is queued until ExecuteBlockDependents and
// ExecuteEmptyRoundDependents have cleared everything it waits on.
//
// The task is wrapped so that, after it runs, the digest it returns is fed to
// ExecuteBlockDependents, releasing tasks that were waiting on it.
//
// It returns an error wrapping ErrTooManyPendingVerifications when the number of
// queued tasks plus the scheduler's size has reached maxDeps. After Close, the
// call is a no-op that returns nil.
//
// NOTE(review): Schedule is called while bs.lock is held and the wrapped task
// re-acquires bs.lock, so this assumes the scheduler does not run tasks inline —
// confirm against the Scheduler implementation.
func (bs *BlockDependencyManager) ScheduleTaskWithDependencies(task Task, blockSeq uint64, prev *Digest, emptyRounds []uint64) error {
	bs.lock.Lock()
	defer bs.lock.Unlock()

	if bs.closed.Load() {
		return nil
	}

	// Enforce the limit before building anything for a task that may be rejected.
	totalSize := uint64(len(bs.dependencies) + bs.scheduler.Size())
	if totalSize >= bs.maxDeps {
		bs.logger.Warn("Too many blocks being verified to ingest another one", zap.Uint64("pendingBlocks", totalSize))
		return fmt.Errorf("%w: %d pending verifications (max %d)", ErrTooManyPendingVerifications, totalSize, bs.maxDeps)
	}

	wrappedTask := func() Digest {
		id := task()
		bs.ExecuteBlockDependents(id)
		return id
	}

	if prev == nil && len(emptyRounds) == 0 {
		bs.logger.Debug("Scheduling block verification task with no dependencies", zap.Uint64("blockSeq", blockSeq))
		bs.scheduler.Schedule(wrappedTask)
		return nil
	}

	bs.logger.Debug("Adding block verification task with dependencies",
		zap.Uint64("blockSeq", blockSeq),
		zap.Any("prevBlock", prev),
		zap.Uint64s("emptyRounds", emptyRounds),
	)

	// The set's final size is bounded by len(emptyRounds), so size it up front.
	emptyRoundsSet := make(map[uint64]struct{}, len(emptyRounds))
	for _, round := range emptyRounds {
		emptyRoundsSet[round] = struct{}{}
	}

	bs.dependencies = append(bs.dependencies, &TaskWithDependents{
		Task:        wrappedTask,
		prevBlock:   prev,
		emptyRounds: emptyRoundsSet,
		blockSeq:    blockSeq,
	})

	return nil
}
// RemoveOldTasks discards every waiting task whose block sequence is less than
// or equal to seq (logged as the finalized sequence). Discarded tasks are never
// handed to the scheduler; tasks for later sequences stay queued in their
// original order.
func (bs *BlockDependencyManager) RemoveOldTasks(seq uint64) {
	bs.lock.Lock()
	defer bs.lock.Unlock()

	kept := make([]*TaskWithDependents, 0, len(bs.dependencies))
	for _, pending := range bs.dependencies {
		if pending.blockSeq > seq {
			kept = append(kept, pending)
			continue
		}
		bs.logger.Debug("Removing block verification task as its block seq is less than or equal to finalized seq", zap.Uint64("blockSeq", pending.blockSeq), zap.Uint64("finalizedSeq", seq))
	}

	bs.dependencies = kept
}
// Close marks the manager as closed, after which ScheduleTaskWithDependencies
// ignores new tasks, and closes the underlying scheduler.
//
// Close is idempotent: only the first call closes the scheduler, so calling it
// again cannot double-close a scheduler whose own Close is not re-entrant.
func (bs *BlockDependencyManager) Close() {
	// CompareAndSwap lets exactly one caller win the transition to closed.
	if !bs.closed.CompareAndSwap(false, true) {
		return
	}
	bs.scheduler.Close()
}