Skip to content

Commit 9a8bbde

Browse files
authored
[verifier] Fix timeout bug (#596)
#### Type of change - Bug fix - Improvement (improvement to code, performance, etc) #### Description - Fix timeout behaviour to start from the last batch submitted (instead of the last received item) - Batch buffer capacity (`outputCh`) increased to 1 to prevent blocking - Flattened `Config` structure by removing nested `ExecutorConfig` - Updated all references across tests, configuration files, and templates #### Related issues - resolves #554 Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent 34934f7 commit 9a8bbde

10 files changed

Lines changed: 62 additions & 68 deletions

File tree

cmd/config/app_config_test.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -247,12 +247,10 @@ func TestReadConfigVerifier(t *testing.T) {
247247
configFilePath: emptyConfig(t),
248248
expectedServerConfig: newServeConfig(verifier.DefaultServerPort, verifier.DefaultMonitoringPort),
249249
expectedServiceConfig: &verifier.Config{
250-
ParallelExecutor: verifier.ExecutorConfig{
251-
Parallelism: verifier.DefaultParallelism,
252-
BatchSizeCutoff: verifier.DefaultBatchSizeCutoff,
253-
BatchTimeCutoff: verifier.DefaultBatchTimeCutoff,
254-
ChannelBufferSize: verifier.DefaultChannelBufferSize,
255-
},
250+
Parallelism: verifier.DefaultParallelism,
251+
BatchSizeCutoff: verifier.DefaultBatchSizeCutoff,
252+
BatchTimeCutoff: verifier.DefaultBatchTimeCutoff,
253+
ChannelBufferSize: verifier.DefaultChannelBufferSize,
256254
},
257255
}, {
258256
name: "sample",
@@ -261,12 +259,10 @@ func TestReadConfigVerifier(t *testing.T) {
261259
"verifier", verifier.DefaultServerPort, verifier.DefaultMonitoringPort,
262260
),
263261
expectedServiceConfig: &verifier.Config{
264-
ParallelExecutor: verifier.ExecutorConfig{
265-
BatchSizeCutoff: verifier.DefaultBatchSizeCutoff,
266-
BatchTimeCutoff: 10 * time.Millisecond,
267-
ChannelBufferSize: verifier.DefaultChannelBufferSize,
268-
Parallelism: 40,
269-
},
262+
BatchSizeCutoff: verifier.DefaultBatchSizeCutoff,
263+
BatchTimeCutoff: 10 * time.Millisecond,
264+
ChannelBufferSize: verifier.DefaultChannelBufferSize,
265+
Parallelism: 40,
270266
},
271267
}}
272268

cmd/config/samples/verifier.yaml

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,27 @@ monitoring:
3939
# - Must be less than or equal to requests-per-second
4040
burst: 0
4141

42-
parallel-executor:
43-
# Minimum number of verification responses to collect before emitting a batch result.
44-
# Larger batches improve verification efficiency through parallel processing
45-
# across multiple goroutines. Batches are processed when this size is reached
46-
# or batch-time-cutoff expires, whichever comes first.
47-
# Default: 50
48-
batch-size-cutoff: 50
49-
# Maximum time to wait for a batch to reach batch-size-cutoff before processing
50-
# it anyway. Ensures low latency even when signature arrival rate is low.
51-
# Balances latency vs. batching efficiency.
52-
# Default: 500ms
53-
batch-time-cutoff: 10ms
54-
# Buffer size for internal channels used in the signature verification pipeline.
55-
# Larger buffers reduce channel contention and improve throughput but increase
56-
# memory usage.
57-
# Default: 50
58-
channel-buffer-size: 50
59-
# Number of goroutines used to verify signatures in parallel within each batch.
60-
# Should be tuned based on CPU core count and signature verification workload.
61-
# Higher values improve throughput on multi-core systems but increase CPU usage.
62-
# Default: 4
63-
parallelism: 40
42+
# Minimum number of verification responses to collect before emitting a batch result.
43+
# Larger batches improve verification efficiency through parallel processing
44+
# across multiple goroutines. Batches are processed when this size is reached
45+
# or batch-time-cutoff expires, whichever comes first.
46+
# Default: 50
47+
batch-size-cutoff: 50
48+
# Maximum time to wait for a batch to reach batch-size-cutoff before processing
49+
# it anyway. Ensures low latency even when signature arrival rate is low.
50+
# Balances latency vs. batching efficiency.
51+
# Default: 500ms
52+
batch-time-cutoff: 10ms
53+
# Buffer size for internal channels used in the signature verification pipeline.
54+
# Larger buffers reduce channel contention and improve throughput but increase
55+
# memory usage.
56+
# Default: 50
57+
channel-buffer-size: 50
58+
# Number of goroutines used to verify signatures in parallel within each batch.
59+
# Should be tuned based on CPU core count and signature verification workload.
60+
# Higher values improve throughput on multi-core systems but increase CPU usage.
61+
# Default: 4
62+
parallelism: 40
6463

6564
# Logging configuration
6665
logging:

cmd/config/templates/verifier.yaml.tmpl

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
#
55
{{ include "service" . | indent 0 }}
66

7-
parallel-executor:
8-
parallelism: 40
9-
batch-time-cutoff: {{ .VerifierBatchTimeCutoff | default "10ms" }}
10-
batch-size-cutoff: {{ .VerifierBatchSizeCutoff | default 50 }}
11-
channel-buffer-size: 50
7+
parallelism: 40
8+
batch-time-cutoff: {{ .VerifierBatchTimeCutoff | default "10ms" }}
9+
batch-size-cutoff: {{ .VerifierBatchSizeCutoff | default 50 }}
10+
channel-buffer-size: 50

cmd/config/viper.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ func NewViperWithSidecarDefaults() *viper.Viper {
5656
// NewViperWithVerifierDefaults returns a viper instance with the verifier default values.
5757
func NewViperWithVerifierDefaults() *viper.Viper {
5858
v := newViperWithServiceDefault("verifier", verifier.DefaultServerPort, verifier.DefaultMonitoringPort)
59-
v.SetDefault("parallel-executor.parallelism", verifier.DefaultParallelism)
60-
v.SetDefault("parallel-executor.batch-time-cutoff", verifier.DefaultBatchTimeCutoff)
61-
v.SetDefault("parallel-executor.batch-size-cutoff", verifier.DefaultBatchSizeCutoff)
62-
v.SetDefault("parallel-executor.channel-buffer-size", verifier.DefaultChannelBufferSize)
59+
v.SetDefault("parallelism", verifier.DefaultParallelism)
60+
v.SetDefault("batch-time-cutoff", verifier.DefaultBatchTimeCutoff)
61+
v.SetDefault("batch-size-cutoff", verifier.DefaultBatchSizeCutoff)
62+
v.SetDefault("channel-buffer-size", verifier.DefaultChannelBufferSize)
6363
return v
6464
}
6565

loadgen/client_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,10 @@ func startVerifiers(t *testing.T, serverTLS, clientTLS connection.TLSConfig) *co
136136
endpoints := make([]*connection.Endpoint, 2)
137137
for i := range endpoints {
138138
service := verifier.New(&verifier.Config{
139-
ParallelExecutor: verifier.ExecutorConfig{
140-
BatchSizeCutoff: 50,
141-
BatchTimeCutoff: 10 * time.Millisecond,
142-
ChannelBufferSize: 50,
143-
Parallelism: 40,
144-
},
139+
BatchSizeCutoff: 50,
140+
BatchTimeCutoff: 10 * time.Millisecond,
141+
ChannelBufferSize: 50,
142+
Parallelism: 40,
145143
})
146144

147145
serverConfig := test.NewLocalHostServiceConfig(serverTLS)

service/verifier/config.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,6 @@ import (
1313
type (
1414
// Config describes the signature verifier parameters.
1515
Config struct {
16-
ParallelExecutor ExecutorConfig `mapstructure:"parallel-executor"`
17-
}
18-
19-
// ExecutorConfig describes the execution parameters.
20-
ExecutorConfig struct {
2116
// Parallelism How many parallel go routines will be launched
2217
Parallelism int `mapstructure:"parallelism" validate:"required,gt=0"`
2318
// BatchSizeCutoff The minimum amount of responses we need to collect before emitting a response

service/verifier/parallel_executor.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type (
2323
outputSingleCh chan *verificationOutput
2424
outputCh chan []*committerpb.TxStatus
2525
verifier *verifier
26-
config *ExecutorConfig
26+
config *Config
2727
}
2828

2929
verificationOutput struct {
@@ -32,12 +32,12 @@ type (
3232
}
3333
)
3434

35-
func newParallelExecutor(config *ExecutorConfig) *parallelExecutor {
35+
func newParallelExecutor(config *Config) *parallelExecutor {
3636
channelCapacity := config.ChannelBufferSize * config.Parallelism
3737
return &parallelExecutor{
3838
config: config,
3939
inputCh: make(chan *servicepb.TxWithRef, channelCapacity),
40-
outputCh: make(chan []*committerpb.TxStatus),
40+
outputCh: make(chan []*committerpb.TxStatus, 1),
4141
outputSingleCh: make(chan *verificationOutput, channelCapacity),
4242
verifier: newVerifier(),
4343
}
@@ -61,21 +61,30 @@ func (e *parallelExecutor) handleChannelInput(ctx context.Context) {
6161
func (e *parallelExecutor) handleCutoff(ctx context.Context) {
6262
var outputBuffer []*committerpb.TxStatus
6363
chOut := channel.NewWriter(ctx, e.outputCh)
64+
65+
var cutTimeout <-chan time.Time
6466
cutBatch := func(size int) {
6567
for len(outputBuffer) >= size {
6668
batchSize := min(e.config.BatchSizeCutoff, len(outputBuffer))
6769
logger.Debugf("Cuts batch with %d/%d of the outputs.", batchSize, len(outputBuffer))
6870
chOut.Write(outputBuffer[:batchSize])
6971
outputBuffer = outputBuffer[batchSize:]
72+
// Reset the timer if we submitted a batch.
73+
cutTimeout = nil
7074
}
7175
}
7276
for {
77+
if cutTimeout == nil {
78+
cutTimeout = time.After(e.config.BatchTimeCutoff)
79+
}
7380
select {
7481
case <-ctx.Done():
7582
return
76-
case <-time.After(e.config.BatchTimeCutoff):
77-
logger.Debugf("Attempts to cut a batch (timout). (buffer size: %d)", len(outputBuffer))
83+
case <-cutTimeout:
84+
logger.Debugf("Attempts to cut a batch (timeout). (buffer size: %d)", len(outputBuffer))
7885
cutBatch(1)
86+
// Reset the timer since it ended.
87+
cutTimeout = nil
7988
case output := <-e.outputSingleCh:
8089
logger.Debugf("Attempts to emit a batch (response). (buffer size: %d)", len(outputBuffer)+1)
8190
outputBuffer = append(outputBuffer, output.status)

service/verifier/parallel_executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestRegularTransactionBatching(t *testing.T) {
6060

6161
func startExecutor(ctx context.Context, t *testing.T, batchSizeCutoff int) *parallelExecutor {
6262
t.Helper()
63-
config := &ExecutorConfig{
63+
config := &Config{
6464
BatchSizeCutoff: batchSizeCutoff,
6565
BatchTimeCutoff: 1 * time.Hour,
6666
Parallelism: 1,

service/verifier/verifier_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (s *Server) StartStream(stream servicepb.Verifier_StartStreamServer) error
7474
defer s.metrics.ActiveStreams.Dec()
7575

7676
// We create a new executor for each stream to avoid answering to the wrong stream.
77-
executor := newParallelExecutor(&s.config.ParallelExecutor)
77+
executor := newParallelExecutor(s.config)
7878
g, gCtx := errgroup.WithContext(stream.Context())
7979
g.Go(func() error {
8080
return s.handleInputs(gCtx, stream, executor)

service/verifier/verifier_server_test.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -564,19 +564,17 @@ func defaultCryptoParameters(t *testing.T) cryptoParameters {
564564

565565
func defaultConfigWithTLS(tlsConfig connection.TLSConfig) (*Config, *serve.Config) {
566566
config := &Config{
567-
ParallelExecutor: ExecutorConfig{
568-
BatchSizeCutoff: 3,
569-
BatchTimeCutoff: 1 * time.Hour,
570-
Parallelism: 3,
571-
ChannelBufferSize: 1,
572-
},
567+
BatchSizeCutoff: 3,
568+
BatchTimeCutoff: 1 * time.Hour,
569+
Parallelism: 3,
570+
ChannelBufferSize: 1,
573571
}
574572
return config, test.NewLocalHostServiceConfig(tlsConfig)
575573
}
576574

577575
func defaultConfigQuickCutoff() (*Config, *serve.Config) {
578576
config, serverConfig := defaultConfigWithTLS(test.InsecureTLSConfig)
579-
config.ParallelExecutor.BatchSizeCutoff = 1
577+
config.BatchSizeCutoff = 1
580578
return config, serverConfig
581579
}
582580

0 commit comments

Comments
 (0)