Skip to content

Commit b40a723

Browse files
committed
[coordinator] restrict batch size
This commit restricts the batch size sent to the verifier and the vcservice when it receives a resource exhausted status code on gRPC. Signed-off-by: Senthil Nathan N <cendhu@gmail.com>
1 parent a4e7b77 commit b40a723

6 files changed

Lines changed: 246 additions & 19 deletions

File tree

service/coordinator/signature_verifier_manager.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ func (sv *signatureVerifier) sendTransactionsToSVService(
202202
inputTxBatch channel.Reader[dependencygraph.TxNodeBatch],
203203
) error {
204204
var policyVersion uint64
205+
firstBatch := true
205206
for {
206207
txBatch, ctxAlive := inputTxBatch.Read()
207208
if !ctxAlive {
@@ -231,13 +232,72 @@ func (sv *signatureVerifier) sendTransactionsToSVService(
231232
}
232233
}
233234

235+
// If a batch exceeds the gRPC message size limit, stream.Send() will fail with a
236+
// RESOURCE_EXHAUSTED error. This terminates the stream, and our connection lifecycle
237+
// management will trigger a full retry of the operation. On the subsequent attempt (the retry),
238+
// we assume the initial failure was due to an oversized first batch. To mitigate this,
239+
// we split only the first batch of that retry into smaller chunks. All subsequent batches
240+
// in the same attempt are sent without splitting.
241+
// Hitting this limit is not expected during normal operation. It should only occur
242+
// in rare scenarios, like the failure of many services or when numerous transactions
243+
// depend on a single transaction. In those cases, we rely on the connection retry mechanism.
244+
245+
if firstBatch {
246+
if err := splitAndSendToVerifier(stream, request); err != nil {
247+
return errors.Wrap(err, "send to stream ended with error")
248+
}
249+
firstBatch = false
250+
continue
251+
}
252+
234253
if err := stream.Send(request); err != nil {
235254
return errors.Wrap(err, "send to stream ended with error")
236255
}
237256
logger.Debugf("Batch contains %d TXs, and was stored in the accumulator and sent to a sv", batchSize)
238257
}
239258
}
240259

260+
func splitAndSendToVerifier(
261+
stream protosigverifierservice.Verifier_StartStreamClient,
262+
r *protosigverifierservice.RequestBatch,
263+
) error {
264+
// We group transactions by block to ensure our batch sizes do not exceed the gRPC message limit.
265+
// This strategy prevents RESOURCE_EXHAUSTED errors because the orderer's maximum block size
266+
// will be configured to be safely smaller than the gRPC send/receive limit.
267+
// For added safety, we can split each block's transactions into more batches, but this is deferred for now
268+
// until the orderer implements all sanity checks on the configuration provided in the config block.
269+
// For example, if the orderer can enforce that the maximum block size should be at most half of the
270+
// maximum message size in gRPC, one batch would be adequate.
271+
blkToBatch := make(map[uint64]*protosigverifierservice.RequestBatch)
272+
for _, req := range r.Requests {
273+
rBatch, ok := blkToBatch[req.BlockNum]
274+
if !ok {
275+
rBatch = &protosigverifierservice.RequestBatch{
276+
Requests: make([]*protosigverifierservice.Request, 0, len(r.Requests)),
277+
}
278+
blkToBatch[req.BlockNum] = rBatch
279+
}
280+
281+
rBatch.Requests = append(rBatch.Requests, req)
282+
}
283+
284+
updateSent := false
285+
for _, rBatch := range blkToBatch {
286+
if !updateSent {
287+
rBatch.Update = r.Update
288+
updateSent = false
289+
}
290+
291+
if err := stream.Send(rBatch); err != nil {
292+
// ResourceExhausted should not occur here, as we have split a block's transactions
293+
// into two batches, assuming the block size is less than the maximum gRPC message size.
294+
return err
295+
}
296+
}
297+
298+
return nil
299+
}
300+
241301
func (sv *signatureVerifier) receiveStatusAndForwardToOutput(
242302
stream protosigverifierservice.Verifier_StartStreamClient,
243303
outputValidatedTxs channel.Writer[dependencygraph.TxNodeBatch],

service/coordinator/signature_verifier_manager_test.go

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package coordinator
88

99
import (
1010
"context"
11+
"crypto/rand"
1112
"sync/atomic"
1213
"testing"
1314
"time"
@@ -87,7 +88,7 @@ func newSvMgrTestEnv(t *testing.T, numSvService int, expectedEndErrorMsg ...byte
8788
func (e *svMgrTestEnv) submitTxBatch(t *testing.T, numTxs int) dependencygraph.TxNodeBatch {
8889
t.Helper()
8990
blkNum := e.curBlockNum.Add(1) - 1
90-
txBatch, expectedValidatedTxs := createTxNodeBatchForTest(t, blkNum, numTxs)
91+
txBatch, expectedValidatedTxs := createInputTxsNodeForSVM(t, blkNum, numTxs, 0)
9192
channel.NewWriter(t.Context(), e.inputTxBatch).Write(txBatch)
9293
return expectedValidatedTxs
9394
}
@@ -146,6 +147,45 @@ func TestSignatureVerifierManagerWithSingleVerifier(t *testing.T) {
146147
)
147148
}
148149

150+
func TestSignatureVerifierManagerWithLargeSize(t *testing.T) {
151+
t.Parallel()
152+
env := newSvMgrTestEnv(t, 1)
153+
154+
expectedValidatedTxs := env.submitTxBatch(t, 1)
155+
env.requireTxBatch(t, expectedValidatedTxs)
156+
157+
totalBlocks := 3
158+
txPerBlock := 50
159+
txBatches := make([]dependencygraph.TxNodeBatch, totalBlocks)
160+
expectedValidatedTxs = make(dependencygraph.TxNodeBatch, 0, totalBlocks*txPerBlock)
161+
for i := range totalBlocks {
162+
//nolint:gosec // int -> uint64
163+
txBatch, expectedValidatedTxsBatch := createInputTxsNodeForSVM(t, uint64(i+1), txPerBlock, 1024*1024)
164+
txBatches[i] = txBatch
165+
expectedValidatedTxs = append(expectedValidatedTxs, expectedValidatedTxsBatch...)
166+
}
167+
168+
txsBatch := make(dependencygraph.TxNodeBatch, 0, totalBlocks*txPerBlock)
169+
for _, b := range txBatches {
170+
txsBatch = append(txsBatch, b...)
171+
}
172+
173+
channel.NewWriter(t.Context(), env.inputTxBatch).Write(txsBatch)
174+
175+
actualValidatedTxs := make(dependencygraph.TxNodeBatch, 0, totalBlocks*txPerBlock)
176+
for range totalBlocks {
177+
txs := <-env.outputValidatedTxs
178+
actualValidatedTxs = append(actualValidatedTxs, txs...)
179+
}
180+
require.Len(t, actualValidatedTxs, totalBlocks*txPerBlock)
181+
require.ElementsMatch(t, expectedValidatedTxs, actualValidatedTxs)
182+
183+
test.EventuallyIntMetric(
184+
t, totalBlocks*txPerBlock+1+1, env.signVerifierManager.config.metrics.sigverifierTransactionProcessedTotal,
185+
30*time.Second, 10*time.Millisecond,
186+
)
187+
}
188+
149189
func TestSignatureVerifierManagerWithMultipleVerifiers(t *testing.T) {
150190
t.Parallel()
151191
env := newSvMgrTestEnv(t, 2)
@@ -206,19 +246,37 @@ func TestSignatureVerifierWithAllInvalidTxs(t *testing.T) {
206246
}
207247

208248
env := newSvMgrTestEnv(t, 1)
249+
env.requireTxBatch(t, env.submitTxBatch(t, 1))
250+
209251
channel.NewWriter(t.Context(), env.inputTxBatch).Write(txBatch)
210252
env.requireTxBatch(t, expectedValidatedTxs)
211253
}
212254

213-
func createTxNodeBatchForTest(
214-
_ *testing.T,
215-
blkNum uint64, numTxs int,
255+
func createInputTxsNodeForSVM(
256+
t *testing.T,
257+
blkNum uint64, numTxs, valueSize int,
216258
) (inputTxBatch, expectedValidatedTxs dependencygraph.TxNodeBatch) {
259+
t.Helper()
260+
261+
b := make([]byte, valueSize)
262+
_, err := rand.Read(b)
263+
require.NoError(t, err)
264+
265+
ns := []*protoblocktx.TxNamespace{
266+
{
267+
BlindWrites: []*protoblocktx.Write{
268+
{
269+
Value: b,
270+
},
271+
},
272+
},
273+
}
217274
for i := range numTxs {
218275
txNode := &dependencygraph.TransactionNode{
219276
Tx: &protovcservice.Transaction{
220277
BlockNumber: blkNum,
221278
TxNum: uint32(i), //nolint:gosec
279+
Namespaces: ns,
222280
},
223281
}
224282

@@ -234,6 +292,7 @@ func createTxNodeBatchForTest(
234292
Tx: &protovcservice.Transaction{
235293
BlockNumber: txNode.Tx.BlockNumber,
236294
TxNum: txNode.Tx.TxNum,
295+
Namespaces: txNode.Tx.Namespaces,
237296
PrelimInvalidTxStatus: sigInvalidTxStatus,
238297
},
239298
}

service/coordinator/validator_committer_manager.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ func (vc *validatorCommitter) sendTransactionsToVCService(
247247
stream protovcservice.ValidationAndCommitService_StartValidateAndCommitStreamClient,
248248
inputTxsNode channel.Reader[dependencygraph.TxNodeBatch],
249249
) error {
250+
firstBatch := true
250251
for {
251252
txsNode, ok := inputTxsNode.Read()
252253
if !ok {
@@ -260,17 +261,54 @@ func (vc *validatorCommitter) sendTransactionsToVCService(
260261
txBatch[i] = txNode.Tx
261262
}
262263

263-
err := stream.Send(&protovcservice.TransactionBatch{
264+
// NOTE: The following logic is duplicated from the signature-verifier-manager.
265+
// This could be resolved by introducing interfaces or passing functions as arguments,
266+
// but that approach seems like overkill for now. We can refactor this into a more
267+
// general solution if the need arises in other services.
268+
269+
if firstBatch {
270+
if err := splitAndSendToVC(stream, txBatch); err != nil {
271+
return errors.Wrap(err, "send to stream ended with error")
272+
}
273+
firstBatch = false
274+
continue
275+
}
276+
277+
if err := stream.Send(&protovcservice.TransactionBatch{
264278
Transactions: txBatch,
265-
})
266-
if err != nil {
267-
// The stream ended or the VCM was closed.
268-
return errors.Wrap(err, "receive from stream ended with error")
279+
}); err != nil {
280+
return errors.Wrap(err, "send to stream ended with error")
269281
}
270282
logger.Debugf("TX node contains %d TXs, and was sent to a vcservice", len(txBatch))
271283
}
272284
}
273285

286+
func splitAndSendToVC(
287+
stream protovcservice.ValidationAndCommitService_StartValidateAndCommitStreamClient,
288+
txBatch []*protovcservice.Transaction,
289+
) error {
290+
blkToBatch := make(map[uint64]*protovcservice.TransactionBatch)
291+
for _, tx := range txBatch {
292+
rBatch, ok := blkToBatch[tx.BlockNumber]
293+
if !ok {
294+
rBatch = &protovcservice.TransactionBatch{
295+
Transactions: make([]*protovcservice.Transaction, 0, len(txBatch)),
296+
}
297+
blkToBatch[tx.BlockNumber] = rBatch
298+
}
299+
300+
rBatch.Transactions = append(rBatch.Transactions, tx)
301+
}
302+
303+
for _, rBatch := range blkToBatch {
304+
if err := stream.Send(rBatch); err != nil {
305+
return err
306+
}
307+
}
308+
309+
return nil
310+
}
311+
274312
func (vc *validatorCommitter) receiveStatusAndForwardToOutput(
275313
stream protovcservice.ValidationAndCommitService_StartValidateAndCommitStreamClient,
276314
outputTxsNode channel.Writer[dependencygraph.TxNodeBatch],

service/coordinator/validator_committer_manager_test.go

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ package coordinator
88

99
import (
1010
"context"
11+
"crypto/rand"
1112
"maps"
12-
"strconv"
1313
"testing"
1414
"time"
1515

16+
"github.com/google/uuid"
1617
"github.com/stretchr/testify/assert"
1718
"github.com/stretchr/testify/require"
1819
"google.golang.org/protobuf/proto"
@@ -111,7 +112,7 @@ func TestValidatorCommitterManager(t *testing.T) {
111112
t.Run("Send tx batch to use any vcservice", func(t *testing.T) {
112113
t.Parallel()
113114
env := newVcMgrTestEnv(t, 2)
114-
txBatch, expectedTxsStatus := createInputTxsNodeForTest(5, 1, 1)
115+
txBatch, expectedTxsStatus := createInputTxsNodeForVCM(t, 1, 5, 0)
115116
env.inputTxs <- txBatch
116117

117118
outTxs := <-env.outputTxs
@@ -125,14 +126,16 @@ func TestValidatorCommitterManager(t *testing.T) {
125126
t, 5, env.validatorCommitterManager.config.metrics.vcserviceTransactionProcessedTotal,
126127
2*time.Second, 100*time.Millisecond,
127128
)
129+
128130
ensureZeroWaitingTxs(env)
129131
})
130132

131133
t.Run("send batches to ensure all vcservices are used", func(t *testing.T) {
132134
t.Parallel()
133135
env := newVcMgrTestEnv(t, 2)
134-
txBatch1, expectedTxsStatus1 := createInputTxsNodeForTest(5, 1, 2)
135-
txBatch2, expectedTxsStatus2 := createInputTxsNodeForTest(5, 6, 3)
136+
137+
txBatch1, expectedTxsStatus1 := createInputTxsNodeForVCM(t, 2, 5, 0)
138+
txBatch2, expectedTxsStatus2 := createInputTxsNodeForVCM(t, 3, 5, 0)
136139

137140
require.Eventually(t, func() bool {
138141
env.inputTxs <- txBatch1
@@ -281,7 +284,7 @@ func TestValidatorCommitterManagerRecovery(t *testing.T) {
281284
env.requireRetriedTxsTotal(t, 0)
282285

283286
numTxs := 10
284-
txBatch, expectedTxsStatus := createInputTxsNodeForTest(numTxs, 0, 0)
287+
txBatch, expectedTxsStatus := createInputTxsNodeForVCM(t, 0, numTxs, 0)
285288
env.inputTxs <- txBatch
286289

287290
require.Eventually(t, func() bool {
@@ -323,21 +326,84 @@ func TestValidatorCommitterManagerRecovery(t *testing.T) {
323326
}, 2*time.Second, 1*time.Second)
324327
}
325328

326-
func createInputTxsNodeForTest(numTxs, startIndex int, blkNum uint64) (
327-
[]*dependencygraph.TransactionNode, *protoblocktx.TransactionsStatus,
329+
func TestValidatorCommitterManagerWithLargeSize(t *testing.T) {
330+
t.Parallel()
331+
332+
env := newVcMgrTestEnv(t, 2)
333+
txBatch, expectedTxsStatus := createInputTxsNodeForVCM(t, 1, 5, 0)
334+
env.inputTxs <- txBatch
335+
336+
outTxs := <-env.outputTxs
337+
require.ElementsMatch(t, txBatch, outTxs)
338+
339+
outTxsStatus := <-env.outputTxsStatus
340+
require.Equal(t, expectedTxsStatus.Status, outTxsStatus.Status)
341+
342+
totalBlocks := 3
343+
txPerBlock := 50
344+
txBatches := make(dependencygraph.TxNodeBatch, 0, totalBlocks*txPerBlock)
345+
expectedTxsStatus = &protoblocktx.TransactionsStatus{Status: make(map[string]*protoblocktx.StatusWithHeight)}
346+
347+
for i := range totalBlocks {
348+
//nolint:gosec // int -> int64
349+
txBatch, txStatus := createInputTxsNodeForVCM(t, uint64(i+2), txPerBlock, 1024*1024)
350+
txBatches = append(txBatches, txBatch...)
351+
maps.Copy(expectedTxsStatus.Status, txStatus.Status)
352+
}
353+
354+
env.inputTxs <- txBatches
355+
356+
// txBatch would be split into three parts, one per block.
357+
outTxs = nil
358+
for range 3 {
359+
outTxs = append(outTxs, <-env.outputTxs...)
360+
}
361+
require.ElementsMatch(t, txBatches, outTxs)
362+
363+
outTxsStatus.Status = make(map[string]*protoblocktx.StatusWithHeight)
364+
for range totalBlocks {
365+
s := <-env.outputTxsStatus
366+
maps.Copy(outTxsStatus.Status, s.Status)
367+
}
368+
require.Equal(t, expectedTxsStatus.Status, outTxsStatus.Status)
369+
370+
test.EventuallyIntMetric(
371+
t, 5+totalBlocks*txPerBlock,
372+
env.validatorCommitterManager.config.metrics.vcserviceTransactionProcessedTotal,
373+
2*time.Second, 100*time.Millisecond,
374+
)
375+
}
376+
377+
func createInputTxsNodeForVCM(t *testing.T, blkNum uint64, numTxs, valueSize int) (
378+
dependencygraph.TxNodeBatch, *protoblocktx.TransactionsStatus,
328379
) {
380+
t.Helper()
381+
329382
txsNode := make([]*dependencygraph.TransactionNode, numTxs)
330383
expectedTxsStatus := &protoblocktx.TransactionsStatus{
331384
Status: make(map[string]*protoblocktx.StatusWithHeight),
332385
}
333386

387+
b := make([]byte, valueSize)
388+
_, err := rand.Read(b)
389+
require.NoError(t, err)
390+
334391
for i := range numTxs {
335-
id := "tx" + strconv.Itoa(startIndex+i)
392+
id := uuid.NewString()
336393
txsNode[i] = &dependencygraph.TransactionNode{
337394
Tx: &protovcservice.Transaction{
338395
ID: id,
339396
BlockNumber: blkNum,
340397
TxNum: uint32(i), //nolint:gosec
398+
Namespaces: []*protoblocktx.TxNamespace{
399+
{
400+
BlindWrites: []*protoblocktx.Write{
401+
{
402+
Value: b,
403+
},
404+
},
405+
},
406+
},
341407
},
342408
}
343409
expectedTxsStatus.Status[id] = types.CreateStatusWithHeight(protoblocktx.Status_COMMITTED, blkNum, i)

0 commit comments

Comments
 (0)