Skip to content

Commit b9d0f08

Browse files
committed
fix nodes panic during synchronization
Signed-off-by: Fedor Partanskiy <fedor.partanskiy@atme.com>
1 parent 0501b0d commit b9d0f08

File tree

5 files changed

+65
-17
lines changed

5 files changed

+65
-17
lines changed

orderer/consensus/smartbft/chain.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package smartbft
88

99
import (
10+
"context"
1011
"encoding/base64"
1112
"fmt"
1213
"reflect"
@@ -72,6 +73,7 @@ type BFTChain struct {
7273
Logger *flogging.FabricLogger
7374
WALDir string
7475
consensus *smartbft.Consensus
76+
syncCancel context.CancelFunc
7577
support consensus.ConsenterSupport
7678
ClusterService *cluster.ClusterService
7779
verifier *Verifier
@@ -158,7 +160,9 @@ func NewChain(
158160
c.RuntimeConfig.Store(rtc)
159161

160162
c.verifier = buildVerifier(cv, c.RuntimeConfig, support, requestInspector, policyManager)
161-
c.consensus = bftSmartConsensusBuild(c, requestInspector, egressCommFactory, synchronizerFactory)
163+
ctx, cancel := context.WithCancel(context.Background())
164+
c.syncCancel = cancel
165+
c.consensus = bftSmartConsensusBuild(c, requestInspector, egressCommFactory, synchronizerFactory, ctx)
162166

163167
// Setup communication with list of remotes notes for the new channel
164168
c.Comm.Configure(c.support.ChannelID(), rtc.RemoteNodes)
@@ -177,6 +181,7 @@ func bftSmartConsensusBuild(
177181
requestInspector *RequestInspector,
178182
egressCommFactory EgressCommFactory,
179183
synchronizerFactory SynchronizerFactory,
184+
syncCtx context.Context,
180185
) *smartbft.Consensus {
181186
var err error
182187

@@ -213,6 +218,7 @@ func bftSmartConsensusBuild(
213218
c.support,
214219
c.bccsp,
215220
c.clusterDialer,
221+
syncCtx,
216222
)
217223

218224
channelDecorator := zap.String("channel", c.support.ChannelID())
@@ -462,6 +468,7 @@ func (c *BFTChain) Start() {
462468
// Halt frees the resources which were allocated for this Chain.
463469
func (c *BFTChain) Halt() {
464470
c.Logger.Infof("Shutting down chain")
471+
c.syncCancel()
465472
c.consensus.Stop()
466473
}
467474

orderer/consensus/smartbft/mocks/synchronizer_factory.go

Lines changed: 14 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

orderer/consensus/smartbft/synchronizer.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ SPDX-License-Identifier: Apache-2.0
77
package smartbft
88

99
import (
10+
"context"
1011
"sort"
12+
"time"
1113

1214
"github.com/hyperledger-labs/SmartBFT/pkg/types"
1315
"github.com/hyperledger-labs/SmartBFT/smartbftprotos"
@@ -34,6 +36,7 @@ type Synchronizer struct {
3436
LocalConfigCluster localconfig.Cluster
3537
BlockPullerFactory BlockPullerFactory
3638
Logger *flogging.FabricLogger
39+
ctx context.Context
3740
}
3841

3942
// Sync synchronizes blocks and returns the response
@@ -141,9 +144,22 @@ func (s *Synchronizer) synchronize() (*types.Decision, error) {
141144
return nil, errors.Errorf("failed pulling block %d", seq)
142145
}
143146

144-
startSeq := startHeight
147+
// wait for the right height
148+
ticker := time.NewTicker(50 * time.Millisecond)
149+
loop:
150+
for {
151+
select {
152+
case <-s.ctx.Done():
153+
break loop
154+
case <-ticker.C:
155+
if s.Support.Height() > lastPulledBlock.GetHeader().GetNumber() {
156+
break loop
157+
}
158+
}
159+
}
160+
145161
s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]",
146-
blocksFetched, startSeq, lastPulledBlock.Header.Number)
162+
blocksFetched, startHeight, lastPulledBlock.Header.Number)
147163

148164
viewMetadata, lastConfigSqn := s.getViewMetadataLastConfigSqnFromBlock(lastPulledBlock)
149165

orderer/consensus/smartbft/synchronizer_bft.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package smartbft
88

99
import (
10+
"context"
1011
"sort"
1112
"sync"
1213
"time"
@@ -42,6 +43,7 @@ type BFTSynchronizer struct {
4243

4344
mutex sync.Mutex
4445
syncBuff *SyncBuffer
46+
ctx context.Context
4547
}
4648

4749
func (s *BFTSynchronizer) Sync() types.SyncResponse {
@@ -277,9 +279,22 @@ func (s *BFTSynchronizer) getBlocksFromSyncBuffer(startHeight, targetHeight uint
277279
return nil, errors.Errorf("failed pulling block %d", seq)
278280
}
279281

280-
startSeq := startHeight
282+
// wait for the right height
283+
ticker := time.NewTicker(50 * time.Millisecond)
284+
loop:
285+
for {
286+
select {
287+
case <-s.ctx.Done():
288+
break loop
289+
case <-ticker.C:
290+
if s.Support.Height() > lastPulledBlock.GetHeader().GetNumber() {
291+
break loop
292+
}
293+
}
294+
}
295+
281296
s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]",
282-
blocksFetched, startSeq, lastPulledBlock.Header.Number)
297+
blocksFetched, startHeight, lastPulledBlock.Header.Number)
283298

284299
return lastPulledBlock, nil
285300
}

orderer/consensus/smartbft/synchronizer_factory.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
77
package smartbft
88

99
import (
10+
"context"
11+
1012
"github.com/hyperledger-labs/SmartBFT/pkg/api"
1113
"github.com/hyperledger-labs/SmartBFT/pkg/types"
1214
"github.com/hyperledger/fabric-lib-go/bccsp"
@@ -35,6 +37,7 @@ type SynchronizerFactory interface {
3537
support consensus.ConsenterSupport,
3638
bccsp bccsp.BCCSP,
3739
clusterDialer *cluster.PredicateDialer,
40+
syncCtx context.Context,
3841
) api.Synchronizer
3942
}
4043

@@ -50,8 +53,9 @@ func (*synchronizerCreator) CreateSynchronizer(
5053
support consensus.ConsenterSupport,
5154
bccsp bccsp.BCCSP,
5255
clusterDialer *cluster.PredicateDialer,
56+
syncCtx context.Context,
5357
) api.Synchronizer {
54-
return newSynchronizer(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, bccsp, clusterDialer)
58+
return newSynchronizer(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, bccsp, clusterDialer, syncCtx)
5559
}
5660

5761
// newSynchronizer creates a new synchronizer
@@ -65,6 +69,7 @@ func newSynchronizer(
6569
support consensus.ConsenterSupport,
6670
bccsp bccsp.BCCSP,
6771
clusterDialer *cluster.PredicateDialer,
72+
syncCtx context.Context,
6873
) api.Synchronizer {
6974
switch localConfigCluster.ReplicationPolicy {
7075
case "consensus":
@@ -87,6 +92,7 @@ func newSynchronizer(
8792
VerifierFactory: &verifierCreator{},
8893
BFTDelivererFactory: &bftDelivererCreator{},
8994
Logger: logger,
95+
ctx: syncCtx,
9096
}
9197
case "simple":
9298
logger.Debug("Creating simple Synchronizer")
@@ -106,6 +112,7 @@ func newSynchronizer(
106112
LatestConfig: func() (types.Configuration, []uint64) {
107113
return rtc.BFTConfig, rtc.Nodes
108114
},
115+
ctx: syncCtx,
109116
}
110117
default:
111118
logger.Panicf("Unsupported Cluster.ReplicationPolicy: %s", localConfigCluster.ReplicationPolicy)

0 commit comments

Comments
 (0)