Skip to content

Commit ae7ed1f

Browse files
committed
fix create envelope seek info for bft blocksprovider
Signed-off-by: Fedor Partanskiy <fredprtnsk@gmail.com>
1 parent 5fb3292 commit ae7ed1f

File tree

2 files changed

+42
-47
lines changed

2 files changed

+42
-47
lines changed

internal/pkg/peer/bftblocksprovider/blocksprovider.go

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,7 @@ func (d *Deliverer) assignReceivers() (int, error) {
282282
}
283283

284284
if d.blockReceiver == nil {
285-
seekInfoEnv, err := d.createSeekInfo(d.nextBlockNumber, false)
286-
if err != nil {
287-
d.Logger.Error("Could not create a signed Deliver SeekInfo message, something is critically wrong", err)
288-
d.Cancel()
289-
290-
return numEP, err
291-
}
285+
seekInfoEnvFunc := d.createSeekInfo(d.nextBlockNumber, false)
292286

293287
d.blockReceiverIndex = (d.blockReceiverIndex + 1) % numEP
294288
ep := d.Endpoints[d.blockReceiverIndex]
@@ -315,7 +309,7 @@ func (d *Deliverer) assignReceivers() (int, error) {
315309
false,
316310
d.workBlockReceiver(d.chBlockReceiver),
317311
d.endBlockReceiver(d.chBlockReceiver),
318-
seekInfoEnv,
312+
seekInfoEnvFunc,
319313
)
320314

321315
go d.blockReceiver.DeliverBlocks()
@@ -340,13 +334,7 @@ func (d *Deliverer) assignReceivers() (int, error) {
340334
hRcvToCreate = append(hRcvToCreate, ep)
341335
}
342336

343-
seekInfoEnv, err := d.createSeekInfo(d.nextBlockNumber, true)
344-
if err != nil {
345-
d.Logger.Error("Could not create a signed Deliver SeekInfo message, something is critically wrong", err)
346-
d.Cancel()
347-
348-
return numEP, err
349-
}
337+
seekInfoEnvFunc := d.createSeekInfo(d.nextBlockNumber, true)
350338

351339
for _, ep := range hRcvToCreate {
352340
ch := make(chan *common.Block, 10)
@@ -366,7 +354,7 @@ func (d *Deliverer) assignReceivers() (int, error) {
366354
true,
367355
d.workHeadReceiver(ch),
368356
d.endBlockReceiver(ch),
369-
seekInfoEnv,
357+
seekInfoEnvFunc,
370358
)
371359

372360
d.headerReceivers[ep.Address] = &header{
@@ -381,39 +369,40 @@ func (d *Deliverer) assignReceivers() (int, error) {
381369
return numEP, nil
382370
}
383371

384-
func (d *Deliverer) createSeekInfo(ledgerHeight uint64, workHeader bool) (*common.Envelope, error) {
385-
ct := orderer.SeekInfo_BLOCK
386-
387-
if workHeader {
388-
ct = orderer.SeekInfo_HEADER_WITH_SIG
389-
}
372+
func (d *Deliverer) createSeekInfo(ledgerHeight uint64, workHeader bool) func() (*common.Envelope, error) {
373+
return func() (*common.Envelope, error) {
374+
ct := orderer.SeekInfo_BLOCK
375+
if workHeader {
376+
ct = orderer.SeekInfo_HEADER_WITH_SIG
377+
}
390378

391-
return protoutil.CreateSignedEnvelopeWithTLSBinding(
392-
common.HeaderType_DELIVER_SEEK_INFO,
393-
d.ChainID,
394-
d.Signer,
395-
&orderer.SeekInfo{
396-
Start: &orderer.SeekPosition{
397-
Type: &orderer.SeekPosition_Specified{
398-
Specified: &orderer.SeekSpecified{
399-
Number: ledgerHeight,
379+
return protoutil.CreateSignedEnvelopeWithTLSBinding(
380+
common.HeaderType_DELIVER_SEEK_INFO,
381+
d.ChainID,
382+
d.Signer,
383+
&orderer.SeekInfo{
384+
Start: &orderer.SeekPosition{
385+
Type: &orderer.SeekPosition_Specified{
386+
Specified: &orderer.SeekSpecified{
387+
Number: ledgerHeight,
388+
},
400389
},
401390
},
402-
},
403-
Stop: &orderer.SeekPosition{
404-
Type: &orderer.SeekPosition_Specified{
405-
Specified: &orderer.SeekSpecified{
406-
Number: math.MaxUint64,
391+
Stop: &orderer.SeekPosition{
392+
Type: &orderer.SeekPosition_Specified{
393+
Specified: &orderer.SeekSpecified{
394+
Number: math.MaxUint64,
395+
},
407396
},
408397
},
398+
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
399+
ContentType: ct,
409400
},
410-
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
411-
ContentType: ct,
412-
},
413-
int32(0),
414-
uint64(0),
415-
d.TLSCertHash,
416-
)
401+
int32(0),
402+
uint64(0),
403+
d.TLSCertHash,
404+
)
405+
}
417406
}
418407

419408
func (d *Deliverer) launchHeaderReceivers() {

internal/pkg/peer/bftblocksprovider/datareciver.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type UnitDeliver struct {
3333
workFunc func(ctx context.Context, block *common.Block)
3434
endFunc func()
3535
stop atomic.Int32
36-
seekInfoEnv *common.Envelope
36+
seekInfoEnvFunc func() (*common.Envelope, error)
3737
}
3838

3939
func NewUnitDeliver(
@@ -51,7 +51,7 @@ func NewUnitDeliver(
5151
workHeader bool,
5252
workFunc func(ctx context.Context, block *common.Block),
5353
endFunc func(),
54-
seekInfoEnv *common.Envelope,
54+
seekInfoEnvFunc func() (*common.Envelope, error),
5555
) *UnitDeliver {
5656
ctx, cancel := context.WithCancel(ctx)
5757

@@ -73,7 +73,7 @@ func NewUnitDeliver(
7373
endFunc: endFunc,
7474
logger: flogging.MustGetLogger("peer.bftblocksprovider").
7575
With("channel", channelID, "orderer-address", endpoint.Address, "work-header", workHeader),
76-
seekInfoEnv: seekInfoEnv,
76+
seekInfoEnvFunc: seekInfoEnvFunc,
7777
}
7878

7979
return u
@@ -105,7 +105,13 @@ func (u *UnitDeliver) DeliverBlocks() {
105105
u.sleep(sleepDuration)
106106
}
107107

108-
deliverClient, cancel, err := u.connect(u.seekInfoEnv)
108+
seekInfoEnv, err := u.seekInfoEnvFunc()
109+
if err != nil {
110+
u.logger.Error("Could not create a signed Deliver SeekInfo message, something is critically wrong", err)
111+
failureCounter++
112+
continue
113+
}
114+
deliverClient, cancel, err := u.connect(seekInfoEnv)
109115
if err != nil {
110116
u.logger.Warningf("Could not connect to ordering service: %v", err)
111117
failureCounter++

0 commit comments

Comments
 (0)