@@ -10,6 +10,7 @@ import (
10
10
"bytes"
11
11
"context"
12
12
"encoding/base64"
13
+ "fmt"
13
14
"strconv"
14
15
"strings"
15
16
"sync"
@@ -27,6 +28,8 @@ import (
27
28
"go.uber.org/zap/zapcore"
28
29
)
29
30
31
+ type NewChaincodeDiscoverFunc = func (chaincode * Chaincode ) driver.ChaincodeDiscover
32
+
30
33
type Invoke struct {
31
34
Chaincode * Chaincode
32
35
TxID driver.TxID
@@ -46,16 +49,20 @@ type Invoke struct {
46
49
NumRetries int
47
50
RetrySleep time.Duration
48
51
Context context.Context
52
+ QueryPolicy driver.QueryPolicy
53
+ NewChaincodeDiscover NewChaincodeDiscoverFunc
49
54
}
50
55
51
- func NewInvoke (chaincode * Chaincode , function string , args ... interface {}) * Invoke {
56
+ func NewInvoke (chaincode * Chaincode , newChaincodeDiscover NewChaincodeDiscoverFunc , function string , args ... interface {}) * Invoke {
52
57
return & Invoke {
53
- Chaincode : chaincode ,
54
- ChaincodeName : chaincode .name ,
55
- Function : function ,
56
- Args : args ,
57
- NumRetries : int (chaincode .NumRetries ),
58
- RetrySleep : chaincode .RetrySleep ,
58
+ Chaincode : chaincode ,
59
+
60
+ ChaincodeName : chaincode .name ,
61
+ Function : function ,
62
+ Args : args ,
63
+ NumRetries : int (chaincode .NumRetries ),
64
+ RetrySleep : chaincode .RetrySleep ,
65
+ NewChaincodeDiscover : newChaincodeDiscover ,
59
66
}
60
67
}
61
68
@@ -257,6 +264,11 @@ func (i *Invoke) WithRetrySleep(duration time.Duration) driver.ChaincodeInvocati
257
264
return i
258
265
}
259
266
267
+ func (i * Invoke ) WithQueryPolicy (policy driver.QueryPolicy ) driver.ChaincodeInvocation {
268
+ i .QueryPolicy = policy
269
+ return i
270
+ }
271
+
260
272
func (i * Invoke ) prepare (query bool ) (string , * pb.Proposal , []* pb.ProposalResponse , driver.SigningIdentity , error ) {
261
273
var peerClients []services.PeerClient
262
274
defer func () {
@@ -297,7 +309,7 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
297
309
298
310
// discover
299
311
var err error
300
- discovery := NewDiscovery (
312
+ discovery := i . NewChaincodeDiscover (
301
313
i .Chaincode ,
302
314
)
303
315
discovery .WithFilterByMSPIDs (
@@ -327,7 +339,9 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
327
339
}
328
340
}
329
341
330
- // get a peer client for all discovered peers
342
+ n := len (discoveredPeers )
343
+ // get a peer client for all discovered peers and collect the errors
344
+ var errs []error
331
345
for _ , peer := range discoveredPeers {
332
346
peerClient , err := i .Chaincode .Services .NewPeerClient (grpc.ConnectionConfig {
333
347
Address : peer .Endpoint ,
@@ -336,19 +350,28 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
336
350
TLSRootCertBytes : peer .TLSRootCerts ,
337
351
})
338
352
if err != nil {
339
- return "" , nil , nil , nil , errors .WithMessagef (err , "error getting endorser client for %s" , peer .Endpoint )
353
+ errs = append (errs , errors .WithMessagef (err , "error getting endorser client for %s" , peer .Endpoint ))
354
+ continue
340
355
}
341
356
peerClients = append (peerClients , peerClient )
342
357
}
358
+ if err := i .checkQueryPolicy (errs , len (peerClients ), n ); err != nil {
359
+ return "" , nil , nil , nil , errors .WithMessagef (err , "cannot match query policy with the given discovered peers" )
360
+ }
343
361
344
362
// get endorser clients
363
+ errs = nil
345
364
for _ , client := range peerClients {
346
365
endorserClient , err := client .EndorserClient ()
347
366
if err != nil {
348
- return "" , nil , nil , nil , errors .WithMessagef (err , "error getting endorser client for %s" , client .Address ())
367
+ errs = append (errs , errors .WithMessagef (err , "error getting endorser client for %s" , client .Address ()))
368
+ continue
349
369
}
350
370
endorserClients = append (endorserClients , endorserClient )
351
371
}
372
+ if err := i .checkQueryPolicy (errs , len (endorserClients ), n ); err != nil {
373
+ return "" , nil , nil , nil , errors .WithMessagef (err , "cannot match query policy with the given peer clients" )
374
+ }
352
375
if len (endorserClients ) == 0 {
353
376
return "" , nil , nil , nil , errors .New ("no endorser clients retrieved with the current filters" )
354
377
}
@@ -366,15 +389,17 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
366
389
}
367
390
368
391
// collect responses
369
- responses , err := i .collectResponses (endorserClients , signedProp )
392
+ responses , errs := i .collectResponses (endorserClients , signedProp )
370
393
if err != nil {
371
394
return "" , nil , nil , nil , errors .Wrapf (err , "failed collecting proposal responses" )
372
395
}
373
-
374
396
if len (responses ) == 0 {
375
397
// this should only happen if some new code has introduced a bug
376
398
return "" , nil , nil , nil , errors .New ("no proposal responses received - this might indicate a bug" )
377
399
}
400
+ if err := i .checkQueryPolicy (errs , len (responses ), n ); err != nil {
401
+ return "" , nil , nil , nil , errors .WithMessagef (err , "cannot match query policy with the given peer clients" )
402
+ }
378
403
379
404
return txID , prop , responses , signer , nil
380
405
}
@@ -440,12 +465,12 @@ func (i *Invoke) createChaincodeProposalWithTxIDAndTransient(typ common.HeaderTy
440
465
}
441
466
442
467
// collectResponses sends a signed proposal to a set of peers, and gathers all the responses.
443
- func (i * Invoke ) collectResponses (endorserClients []pb.EndorserClient , signedProposal * pb.SignedProposal ) ([]* pb.ProposalResponse , error ) {
468
+ func (i * Invoke ) collectResponses (endorserClients []pb.EndorserClient , signedProposal * pb.SignedProposal ) ([]* pb.ProposalResponse , [] error ) {
444
469
responsesCh := make (chan * pb.ProposalResponse , len (endorserClients ))
445
470
errorCh := make (chan error , len (endorserClients ))
446
471
wg := sync.WaitGroup {}
472
+ wg .Add (len (endorserClients ))
447
473
for _ , endorser := range endorserClients {
448
- wg .Add (1 )
449
474
go func (endorser pb.EndorserClient ) {
450
475
defer wg .Done ()
451
476
proposalResp , err := endorser .ProcessProposal (context .Background (), signedProposal )
@@ -459,14 +484,15 @@ func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedPro
459
484
wg .Wait ()
460
485
close (responsesCh )
461
486
close (errorCh )
487
+ var errs []error
462
488
for err := range errorCh {
463
- return nil , err
489
+ errs = append ( errs , err )
464
490
}
465
491
var responses []* pb.ProposalResponse
466
492
for response := range responsesCh {
467
493
responses = append (responses , response )
468
494
}
469
- return responses , nil
495
+ return responses , errs
470
496
}
471
497
472
498
// getChaincodeSpec get chaincode spec
@@ -529,3 +555,23 @@ func (i *Invoke) broadcast(txID string, env *common.Envelope) error {
529
555
}
530
556
return i .Chaincode .Finality .IsFinal (context .Background (), txID )
531
557
}
558
+
559
+ func (i * Invoke ) checkQueryPolicy (errs []error , successes int , n int ) error {
560
+ switch i .QueryPolicy {
561
+ case driver .QueryAll :
562
+ if len (errs ) != 0 {
563
+ return errors .Errorf ("query all policy, no errors expected [%v]" , errs )
564
+ }
565
+ case driver .QueryOne :
566
+ if successes == 0 {
567
+ return errors .Errorf ("query one policy, errors occurred [%v]" , errs )
568
+ }
569
+ case driver .QueryMajority :
570
+ if successes <= n / 2 {
571
+ return errors .Errorf ("query majority policy, no majority reached [%v]" , errs )
572
+ }
573
+ default :
574
+ panic (fmt .Sprintf ("programming error, policy [%d] is not valid" , i .QueryPolicy ))
575
+ }
576
+ return nil
577
+ }
0 commit comments