Skip to content

Commit 69ce21f

Browse files
committed
[coordinator] restrict batch size
This commit restricts the batch size sent to the verifier and the vcservice when it receives a resource-exhausted status code on gRPC. Signed-off-by: Senthil Nathan N <cendhu@gmail.com> Signed-off-by: senthil <cendhu@gmail.com>
1 parent cc092fc commit 69ce21f

7 files changed

Lines changed: 207 additions & 21 deletions

File tree

service/coordinator/signature_verifier_manager.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ func (sv *signatureVerifier) sendTransactionsToSVService(
202202
inputTxBatch channel.Reader[dependencygraph.TxNodeBatch],
203203
) error {
204204
var policyVersion uint64
205+
firstBatch := true
205206
for {
206207
txBatch, ctxAlive := inputTxBatch.Read()
207208
if !ctxAlive {
@@ -231,13 +232,62 @@ func (sv *signatureVerifier) sendTransactionsToSVService(
231232
}
232233
}
233234

235+
if firstBatch {
236+
if err := splitAndSendToVerifier(stream, request); err != nil {
237+
return errors.Wrap(err, "send to stream ended with error")
238+
}
239+
firstBatch = false
240+
continue
241+
}
242+
234243
if err := stream.Send(request); err != nil {
235244
return errors.Wrap(err, "send to stream ended with error")
236245
}
237246
logger.Debugf("Batch contains %d TXs, and was stored in the accumulator and sent to a sv", batchSize)
238247
}
239248
}
240249

250+
func splitAndSendToVerifier(
251+
stream protosigverifierservice.Verifier_StartStreamClient,
252+
r *protosigverifierservice.RequestBatch,
253+
) error {
254+
// We group transactions by block to ensure our batch sizes do not exceed the gRPC message limit.
255+
// This strategy prevents RESOURCE_EXHAUSTED errors because the orderer's maximum block size
256+
// will be configured to be safely smaller than the gRPC send/receive limit.
257+
// For added safety, we can split each block's transactions into more batches, but this is deferred for now
258+
// until the orderer implements all sanity checks on the configuration provided in the config block.
259+
// For example, if the orderer can enforce that the maximum block size should be at most half of the
260+
// maximum message size in gRPC, one batch would be adequate.
261+
blkToBatch := make(map[uint64]*protosigverifierservice.RequestBatch)
262+
for _, req := range r.Requests {
263+
rBatch, ok := blkToBatch[req.BlockNum]
264+
if !ok {
265+
rBatch = &protosigverifierservice.RequestBatch{
266+
Requests: make([]*protosigverifierservice.Request, 0, len(r.Requests)),
267+
}
268+
blkToBatch[req.BlockNum] = rBatch
269+
}
270+
271+
rBatch.Requests = append(rBatch.Requests, req)
272+
}
273+
274+
updateSent := false
275+
for _, rBatch := range blkToBatch {
276+
if !updateSent {
277+
rBatch.Update = r.Update
278+
updateSent = false
279+
}
280+
281+
if err := stream.Send(rBatch); err != nil {
282+
// ResourceExhausted should not occur here, as we have split a block's transactions
283+
// into two batches, assuming the block size is less than the maximum gRPC message size.
284+
return err
285+
}
286+
}
287+
288+
return nil
289+
}
290+
241291
func (sv *signatureVerifier) receiveStatusAndForwardToOutput(
242292
stream protosigverifierservice.Verifier_StartStreamClient,
243293
outputValidatedTxs channel.Writer[dependencygraph.TxNodeBatch],

service/coordinator/signature_verifier_manager_test.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package coordinator
88

99
import (
1010
"context"
11+
"crypto/rand"
1112
"sync/atomic"
1213
"testing"
1314
"time"
@@ -87,7 +88,7 @@ func newSvMgrTestEnv(t *testing.T, numSvService int, expectedEndErrorMsg ...byte
8788
func (e *svMgrTestEnv) submitTxBatch(t *testing.T, numTxs int) dependencygraph.TxNodeBatch {
8889
t.Helper()
8990
blkNum := e.curBlockNum.Add(1) - 1
90-
txBatch, expectedValidatedTxs := createTxNodeBatchForTest(t, blkNum, numTxs)
91+
txBatch, expectedValidatedTxs := createTxNodeBatchForTest(t, blkNum, numTxs, 0)
9192
channel.NewWriter(t.Context(), e.inputTxBatch).Write(txBatch)
9293
return expectedValidatedTxs
9394
}
@@ -146,6 +147,36 @@ func TestSignatureVerifierManagerWithSingleVerifier(t *testing.T) {
146147
)
147148
}
148149

150+
func TestSignatureVerifierManagerWithLargeSize(t *testing.T) {
151+
t.Parallel()
152+
env := newSvMgrTestEnv(t, 1)
153+
154+
expectedValidatedTxs := env.submitTxBatch(t, 1)
155+
env.requireTxBatch(t, expectedValidatedTxs)
156+
157+
totalBlocks := 3
158+
txPerBlock := 50
159+
txBatches := make([]dependencygraph.TxNodeBatch, totalBlocks)
160+
expectedValidatedTxsBatches := make([]dependencygraph.TxNodeBatch, totalBlocks)
161+
for i := range 3 {
162+
//nolint:gosec // int -> uint64
163+
txBatches[i], expectedValidatedTxsBatches[i] = createTxNodeBatchForTest(t, uint64(i+1), txPerBlock, 1024*1024)
164+
}
165+
166+
txsBatch := make(dependencygraph.TxNodeBatch, 0, totalBlocks*txPerBlock)
167+
for _, b := range txBatches {
168+
txsBatch = append(txsBatch, b...)
169+
}
170+
171+
channel.NewWriter(t.Context(), env.inputTxBatch).Write(txsBatch)
172+
// env.requireTxBatch(t, expectedValidatedTxs)
173+
174+
test.EventuallyIntMetric(
175+
t, totalBlocks*txPerBlock+1, env.signVerifierManager.config.metrics.sigverifierTransactionProcessedTotal,
176+
30*time.Second, 10*time.Millisecond,
177+
)
178+
}
179+
149180
func TestSignatureVerifierManagerWithMultipleVerifiers(t *testing.T) {
150181
t.Parallel()
151182
env := newSvMgrTestEnv(t, 2)
@@ -206,19 +237,37 @@ func TestSignatureVerifierWithAllInvalidTxs(t *testing.T) {
206237
}
207238

208239
env := newSvMgrTestEnv(t, 1)
240+
env.requireTxBatch(t, env.submitTxBatch(t, 1))
241+
209242
channel.NewWriter(t.Context(), env.inputTxBatch).Write(txBatch)
210243
env.requireTxBatch(t, expectedValidatedTxs)
211244
}
212245

213246
func createTxNodeBatchForTest(
214-
_ *testing.T,
215-
blkNum uint64, numTxs int,
247+
t *testing.T,
248+
blkNum uint64, numTxs, valueSize int,
216249
) (inputTxBatch, expectedValidatedTxs dependencygraph.TxNodeBatch) {
250+
t.Helper()
251+
252+
b := make([]byte, valueSize)
253+
_, err := rand.Read(b)
254+
require.NoError(t, err)
255+
256+
ns := []*protoblocktx.TxNamespace{
257+
{
258+
BlindWrites: []*protoblocktx.Write{
259+
{
260+
Value: b,
261+
},
262+
},
263+
},
264+
}
217265
for i := range numTxs {
218266
txNode := &dependencygraph.TransactionNode{
219267
Tx: &protovcservice.Transaction{
220268
BlockNumber: blkNum,
221269
TxNum: uint32(i), //nolint:gosec
270+
Namespaces: ns,
222271
},
223272
}
224273

@@ -234,6 +283,7 @@ func createTxNodeBatchForTest(
234283
Tx: &protovcservice.Transaction{
235284
BlockNumber: txNode.Tx.BlockNumber,
236285
TxNum: txNode.Tx.TxNum,
286+
Namespaces: ns,
237287
PrelimInvalidTxStatus: sigInvalidTxStatus,
238288
},
239289
}

service/coordinator/validator_committer_manager.go

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ func (vc *validatorCommitter) sendTransactionsToVCService(
247247
stream protovcservice.ValidationAndCommitService_StartValidateAndCommitStreamClient,
248248
inputTxsNode channel.Reader[dependencygraph.TxNodeBatch],
249249
) error {
250+
firstBatch := true
250251
for {
251252
txsNode, ok := inputTxsNode.Read()
252253
if !ok {
@@ -260,17 +261,49 @@ func (vc *validatorCommitter) sendTransactionsToVCService(
260261
txBatch[i] = txNode.Tx
261262
}
262263

263-
err := stream.Send(&protovcservice.TransactionBatch{
264+
if firstBatch {
265+
if err := splitAndSendToVC(stream, txBatch); err != nil {
266+
return errors.Wrap(err, "send to stream ended with error")
267+
}
268+
firstBatch = false
269+
continue
270+
}
271+
272+
if err := stream.Send(&protovcservice.TransactionBatch{
264273
Transactions: txBatch,
265-
})
266-
if err != nil {
267-
// The stream ended or the VCM was closed.
268-
return errors.Wrap(err, "receive from stream ended with error")
274+
}); err != nil {
275+
return errors.Wrap(err, "send to stream ended with error")
269276
}
270277
logger.Debugf("TX node contains %d TXs, and was sent to a vcservice", len(txBatch))
271278
}
272279
}
273280

281+
func splitAndSendToVC(
282+
stream protovcservice.ValidationAndCommitService_StartValidateAndCommitStreamClient,
283+
txBatch []*protovcservice.Transaction,
284+
) error {
285+
blkToBatch := make(map[uint64]*protovcservice.TransactionBatch)
286+
for _, tx := range txBatch {
287+
rBatch, ok := blkToBatch[tx.BlockNumber]
288+
if !ok {
289+
rBatch = &protovcservice.TransactionBatch{
290+
Transactions: make([]*protovcservice.Transaction, 0, len(txBatch)),
291+
}
292+
blkToBatch[tx.BlockNumber] = rBatch
293+
}
294+
295+
rBatch.Transactions = append(rBatch.Transactions, tx)
296+
}
297+
298+
for _, rBatch := range blkToBatch {
299+
if err := stream.Send(rBatch); err != nil {
300+
return err
301+
}
302+
}
303+
304+
return nil
305+
}
306+
274307
func (vc *validatorCommitter) receiveStatusAndForwardToOutput(
275308
stream protovcservice.ValidationAndCommitService_StartValidateAndCommitStreamClient,
276309
outputTxsNode channel.Writer[dependencygraph.TxNodeBatch],

service/coordinator/validator_committer_manager_test.go

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ package coordinator
88

99
import (
1010
"context"
11+
"crypto/rand"
1112
"maps"
12-
"strconv"
1313
"testing"
1414
"time"
1515

16+
"github.com/google/uuid"
1617
"github.com/stretchr/testify/assert"
1718
"github.com/stretchr/testify/require"
1819
"google.golang.org/protobuf/proto"
@@ -99,7 +100,7 @@ func (e *vcMgrTestEnv) requireRetriedTxsTotal(t *testing.T, expectedRetriedTxsTo
99100
)
100101
}
101102

102-
func TestValidatorCommitterManager(t *testing.T) {
103+
func TestValidatorCommitterManagerX(t *testing.T) {
103104
t.Parallel()
104105

105106
ensureZeroWaitingTxs := func(env *vcMgrTestEnv) {
@@ -108,10 +109,10 @@ func TestValidatorCommitterManager(t *testing.T) {
108109
}
109110
}
110111

111-
t.Run("Send tx batch to use any vcservice", func(t *testing.T) {
112+
t.Run("Send tx batch to use any vcservice and send a batch with larger size", func(t *testing.T) {
112113
t.Parallel()
113114
env := newVcMgrTestEnv(t, 2)
114-
txBatch, expectedTxsStatus := createInputTxsNodeForTest(5, 1, 1)
115+
txBatch, expectedTxsStatus := createInputTxsNodeForTest(t, 5, 0, 1)
115116
env.inputTxs <- txBatch
116117

117118
outTxs := <-env.outputTxs
@@ -125,14 +126,49 @@ func TestValidatorCommitterManager(t *testing.T) {
125126
t, 5, env.validatorCommitterManager.config.metrics.vcserviceTransactionProcessedTotal,
126127
2*time.Second, 100*time.Millisecond,
127128
)
129+
130+
totalBlocks := 3
131+
txPerBlock := 50
132+
txBatches := make(dependencygraph.TxNodeBatch, 0, totalBlocks*txPerBlock)
133+
expectedTxsStatus = &protoblocktx.TransactionsStatus{Status: make(map[string]*protoblocktx.StatusWithHeight)}
134+
135+
for i := range 3 {
136+
//nolint:gosec // int -> int64
137+
txBatch, txStatus := createInputTxsNodeForTest(t, txPerBlock, 1024*1024, uint64(i+2))
138+
txBatches = append(txBatches, txBatch...)
139+
maps.Copy(expectedTxsStatus.Status, txStatus.Status)
140+
}
141+
142+
env.inputTxs <- txBatches
143+
144+
// txBatch would be split into three parts, one per block.
145+
outTxs = <-env.outputTxs
146+
outTxs = append(outTxs, <-env.outputTxs...)
147+
outTxs = append(outTxs, <-env.outputTxs...)
148+
require.ElementsMatch(t, txBatches, outTxs)
149+
150+
outTxsStatus = <-env.outputTxsStatus
151+
status := <-env.outputTxsStatus
152+
maps.Copy(outTxsStatus.Status, status.Status)
153+
status = <-env.outputTxsStatus
154+
maps.Copy(outTxsStatus.Status, status.Status)
155+
require.Equal(t, expectedTxsStatus.Status, outTxsStatus.Status)
156+
157+
test.EventuallyIntMetric(
158+
t, 5+totalBlocks*txPerBlock,
159+
env.validatorCommitterManager.config.metrics.vcserviceTransactionProcessedTotal,
160+
2*time.Second, 100*time.Millisecond,
161+
)
162+
128163
ensureZeroWaitingTxs(env)
129164
})
130165

131166
t.Run("send batches to ensure all vcservices are used", func(t *testing.T) {
132167
t.Parallel()
133168
env := newVcMgrTestEnv(t, 2)
134-
txBatch1, expectedTxsStatus1 := createInputTxsNodeForTest(5, 1, 2)
135-
txBatch2, expectedTxsStatus2 := createInputTxsNodeForTest(5, 6, 3)
169+
170+
txBatch1, expectedTxsStatus1 := createInputTxsNodeForTest(t, 5, 0, 2)
171+
txBatch2, expectedTxsStatus2 := createInputTxsNodeForTest(t, 5, 0, 3)
136172

137173
require.Eventually(t, func() bool {
138174
env.inputTxs <- txBatch1
@@ -281,7 +317,7 @@ func TestValidatorCommitterManagerRecovery(t *testing.T) {
281317
env.requireRetriedTxsTotal(t, 0)
282318

283319
numTxs := 10
284-
txBatch, expectedTxsStatus := createInputTxsNodeForTest(numTxs, 0, 0)
320+
txBatch, expectedTxsStatus := createInputTxsNodeForTest(t, numTxs, 0, 0)
285321
env.inputTxs <- txBatch
286322

287323
require.Eventually(t, func() bool {
@@ -323,21 +359,36 @@ func TestValidatorCommitterManagerRecovery(t *testing.T) {
323359
}, 2*time.Second, 1*time.Second)
324360
}
325361

326-
func createInputTxsNodeForTest(numTxs, startIndex int, blkNum uint64) (
362+
func createInputTxsNodeForTest(t *testing.T, numTxs, valueSize int, blkNum uint64) (
327363
[]*dependencygraph.TransactionNode, *protoblocktx.TransactionsStatus,
328364
) {
365+
t.Helper()
366+
329367
txsNode := make([]*dependencygraph.TransactionNode, numTxs)
330368
expectedTxsStatus := &protoblocktx.TransactionsStatus{
331369
Status: make(map[string]*protoblocktx.StatusWithHeight),
332370
}
333371

372+
b := make([]byte, valueSize)
373+
_, err := rand.Read(b)
374+
require.NoError(t, err)
375+
334376
for i := range numTxs {
335-
id := "tx" + strconv.Itoa(startIndex+i)
377+
id := uuid.NewString()
336378
txsNode[i] = &dependencygraph.TransactionNode{
337379
Tx: &protovcservice.Transaction{
338380
ID: id,
339381
BlockNumber: blkNum,
340382
TxNum: uint32(i), //nolint:gosec
383+
Namespaces: []*protoblocktx.TxNamespace{
384+
{
385+
BlindWrites: []*protoblocktx.Write{
386+
{
387+
Value: b,
388+
},
389+
},
390+
},
391+
},
341392
},
342393
}
343394
expectedTxsStatus.Status[id] = types.CreateStatusWithHeight(protoblocktx.Status_COMMITTED, blkNum, i)

utils/connection/client_util.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ const (
3434

3535
// defaultGrpcMaxAttempts is set to a high number to allow the timeout to dictate the retry end condition.
3636
defaultGrpcMaxAttempts = 1024
37-
maxMsgSize = 100 * 1024 * 1024
38-
scResolverSchema = "sc.connection"
37+
// TODO: All services including the orderer must use the same default maximum message size.
38+
// Hence, we need to move this constant to fabric-x-common.
39+
maxMsgSize = 100 * 1024 * 1024
40+
scResolverSchema = "sc.connection"
3941
)
4042

4143
type (

0 commit comments

Comments
 (0)