Skip to content

Commit 1e8a007

Browse files
authored
this commit removes as many panic as possible (#441)
Signed-off-by: Angelo De Caro <adc@zurich.ibm.com>
1 parent 2080c0c commit 1e8a007

File tree

9 files changed

+97
-97
lines changed

9 files changed

+97
-97
lines changed

platform/fabric/core/generic/channel.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,9 @@ func newChannel(network *network, name string, quiet bool) (*channel, error) {
125125

126126
// Delivery
127127
deliveryService, err := delivery2.New(name, sp, network, func(block *common.Block) (bool, error) {
128-
if err := committerInst.Commit(block); err != nil {
129-
return true, err
130-
}
131-
return false, nil
128+
// commit the block, if an error occurs then retry
129+
err := committerInst.Commit(block)
130+
return false, err
132131
}, txIDStore, waitForEventTimeout)
133132
if err != nil {
134133
return nil, err

platform/fabric/core/generic/committer/committer.go

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package committer
88

99
import (
1010
"context"
11-
"fmt"
1211
"runtime/debug"
1312
"sync"
1413
"time"
@@ -27,7 +26,7 @@ const (
2726
ConfigTXPrefix = "configtx_"
2827
)
2928

30-
var logger = flogging.MustGetLogger("fabric-sdk.committer")
29+
var logger = flogging.MustGetLogger("fabric-sdk.Committer")
3130

3231
type Metrics interface {
3332
EmitKey(val float32, event ...string)
@@ -43,7 +42,7 @@ type Network interface {
4342
Ledger(channel string) (driver.Ledger, error)
4443
}
4544

46-
type committer struct {
45+
type Committer struct {
4746
channel string
4847
network Network
4948
finality Finality
@@ -58,12 +57,12 @@ type committer struct {
5857
publisher events.Publisher
5958
}
6059

61-
func New(channel string, network Network, finality Finality, waitForEventTimeout time.Duration, quiet bool, metrics Metrics, publisher events.Publisher) (*committer, error) {
60+
func New(channel string, network Network, finality Finality, waitForEventTimeout time.Duration, quiet bool, metrics Metrics, publisher events.Publisher) (*Committer, error) {
6261
if len(channel) == 0 {
63-
panic("expected a channel, got empty string")
62+
return nil, errors.Errorf("expected a channel, got empty string")
6463
}
6564

66-
d := &committer{
65+
d := &Committer{
6766
channel: channel,
6867
network: network,
6968
waitForEventTimeout: waitForEventTimeout,
@@ -79,7 +78,7 @@ func New(channel string, network Network, finality Finality, waitForEventTimeout
7978
}
8079

8180
// Commit commits the transactions in the block passed as argument
82-
func (c *committer) Commit(block *common.Block) error {
81+
func (c *Committer) Commit(block *common.Block) error {
8382
for i, tx := range block.Data.Data {
8483

8584
env, err := protoutil.UnmarshalEnvelope(tx)
@@ -100,27 +99,31 @@ func (c *committer) Commit(block *common.Block) error {
10099

101100
var event TxEvent
102101

103-
c.metrics.EmitKey(0, "committer", "start", "Commit", chdr.TxId)
102+
c.metrics.EmitKey(0, "Committer", "start", "Commit", chdr.TxId)
104103
switch common.HeaderType(chdr.Type) {
105104
case common.HeaderType_CONFIG:
106105
if logger.IsEnabledFor(zapcore.DebugLevel) {
107106
logger.Debugf("[%s] Config transaction received: %s", c.channel, chdr.TxId)
108107
}
109-
c.handleConfig(block, i, env)
108+
if err := c.handleConfig(block, i, env); err != nil {
109+
return err
110+
}
110111
case common.HeaderType_ENDORSER_TRANSACTION:
111112
if logger.IsEnabledFor(zapcore.DebugLevel) {
112113
logger.Debugf("[%s] Endorser transaction received: %s", c.channel, chdr.TxId)
113114
}
114115
if len(block.Metadata.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
115116
return errors.Errorf("block metadata lacks transaction filter")
116117
}
117-
c.handleEndorserTransaction(block, i, &event, env, chdr)
118+
if err := c.handleEndorserTransaction(block, i, &event, env, chdr); err != nil {
119+
return err
120+
}
118121
default:
119122
if logger.IsEnabledFor(zapcore.DebugLevel) {
120123
logger.Debugf("[%s] Received unhandled transaction type: %s", c.channel, chdr.Type)
121124
}
122125
}
123-
c.metrics.EmitKey(0, "committer", "end", "Commit", chdr.TxId)
126+
c.metrics.EmitKey(0, "Committer", "end", "Commit", chdr.TxId)
124127

125128
c.notify(event)
126129

@@ -135,9 +138,9 @@ func (c *committer) Commit(block *common.Block) error {
135138
// IsFinal takes in input a transaction id and waits for its confirmation
136139
// with the respect to the passed context that can be used to set a deadline
137140
// for the waiting time.
138-
func (c *committer) IsFinal(ctx context.Context, txID string) error {
139-
c.metrics.EmitKey(0, "committer", "start", "IsFinal", txID)
140-
defer c.metrics.EmitKey(0, "committer", "end", "IsFinal", txID)
141+
func (c *Committer) IsFinal(ctx context.Context, txID string) error {
142+
c.metrics.EmitKey(0, "Committer", "start", "IsFinal", txID)
143+
defer c.metrics.EmitKey(0, "Committer", "end", "IsFinal", txID)
141144

142145
if logger.IsEnabledFor(zapcore.DebugLevel) {
143146
logger.Debugf("Is [%s] final?", txID)
@@ -217,7 +220,7 @@ func (c *committer) IsFinal(ctx context.Context, txID string) error {
217220
}
218221
time.Sleep(100 * time.Millisecond)
219222
default:
220-
panic(fmt.Sprintf("invalid status code, got %c", vd))
223+
return errors.Errorf("invalid status code, got [%c]", vd)
221224
}
222225
} else {
223226
logger.Errorf("Is [%s] final? Failed getting transaction status from vault", txID)
@@ -229,7 +232,7 @@ func (c *committer) IsFinal(ctx context.Context, txID string) error {
229232
return c.listenTo(ctx, txID, c.waitForEventTimeout)
230233
}
231234

232-
func (c *committer) addListener(txid string, ch chan TxEvent) {
235+
func (c *Committer) addListener(txid string, ch chan TxEvent) {
233236
c.mutex.Lock()
234237
defer c.mutex.Unlock()
235238

@@ -242,7 +245,7 @@ func (c *committer) addListener(txid string, ch chan TxEvent) {
242245
c.listeners[txid] = ls
243246
}
244247

245-
func (c *committer) deleteListener(txid string, ch chan TxEvent) {
248+
func (c *Committer) deleteListener(txid string, ch chan TxEvent) {
246249
c.mutex.Lock()
247250
defer c.mutex.Unlock()
248251

@@ -259,7 +262,7 @@ func (c *committer) deleteListener(txid string, ch chan TxEvent) {
259262
}
260263
}
261264

262-
func (c *committer) notify(event TxEvent) {
265+
func (c *Committer) notify(event TxEvent) {
263266
c.mutex.Lock()
264267
defer c.mutex.Unlock()
265268

@@ -287,14 +290,13 @@ func (c *committer) notify(event TxEvent) {
287290
}
288291

289292
// notifyChaincodeListeners notifies the chaincode event to the registered chaincode listeners.
290-
func (c *committer) notifyChaincodeListeners(event *ChaincodeEvent) error {
293+
func (c *Committer) notifyChaincodeListeners(event *ChaincodeEvent) {
291294
c.publisher.Publish(event)
292-
return nil
293295
}
294296

295-
func (c *committer) listenTo(ctx context.Context, txid string, timeout time.Duration) error {
296-
c.metrics.EmitKey(0, "committer", "start", "listenTo", txid)
297-
defer c.metrics.EmitKey(0, "committer", "end", "listenTo", txid)
297+
func (c *Committer) listenTo(ctx context.Context, txid string, timeout time.Duration) error {
298+
c.metrics.EmitKey(0, "Committer", "start", "listenTo", txid)
299+
defer c.metrics.EmitKey(0, "Committer", "end", "listenTo", txid)
298300

299301
if logger.IsEnabledFor(zapcore.DebugLevel) {
300302
logger.Debugf("Listen to finality of [%s]", txid)

platform/fabric/core/generic/committer/config.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ package committer
88

99
import (
1010
"github.com/hyperledger/fabric-protos-go/common"
11+
"github.com/pkg/errors"
1112
)
1213

13-
func (c *committer) handleConfig(block *common.Block, i int, env *common.Envelope) {
14+
func (c *Committer) handleConfig(block *common.Block, i int, env *common.Envelope) error {
1415
committer, err := c.network.Committer(c.channel)
1516
if err != nil {
16-
logger.Panicf("Cannot get Committer [%s]", err)
17+
return errors.Wrapf(err, "cannot get Committer for channel [%s]", c.channel)
1718
}
18-
1919
if err := committer.CommitConfig(block.Header.Number, block.Data.Data[i], env); err != nil {
20-
logger.Panicf("Cannot commit config envelope [%s]", err)
20+
return errors.Wrapf(err, "cannot commit config envelope for channel [%s]", c.channel)
2121
}
22+
return nil
2223
}

platform/fabric/core/generic/committer/endorsertx.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,46 +17,47 @@ import (
1717

1818
type ValidationFlags []uint8
1919

20-
func (c *committer) handleEndorserTransaction(block *common.Block, i int, event *TxEvent, env *common.Envelope, chHdr *common.ChannelHeader) {
20+
func (c *Committer) handleEndorserTransaction(block *common.Block, i int, event *TxEvent, env *common.Envelope, chHdr *common.ChannelHeader) error {
2121
txID := chHdr.TxId
2222
event.Txid = txID
2323

2424
validationCode := pb.TxValidationCode(ValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[i])
2525
switch validationCode {
2626
case pb.TxValidationCode_VALID:
2727
if err := c.CommitEndorserTransaction(txID, block, i, env, event); err != nil {
28-
logger.Panicf("failed committing transaction [%s] with err [%s]", txID, err)
28+
return errors.Wrapf(err, "failed committing transaction [%s]", txID)
29+
}
30+
if err := c.getChaincodeEvents(env, block); err != nil {
31+
return errors.Wrapf(err, "failed to publish chaincode events [%s]", txID)
2932
}
30-
c.getChaincodeEvents(env, block)
3133
default:
3234
if err := c.DiscardEndorserTransaction(txID, block, event, validationCode); err != nil {
33-
logger.Panicf("failed discarding transaction [%s] with err [%s]", txID, err)
35+
return errors.Wrapf(err, "failed discarding transaction [%s]", txID)
3436
}
3537
}
38+
return nil
3639
}
3740

3841
// getChaincodeEvents reads the chaincode events and notifies the listeners registered to the specific chaincode.
39-
func (c *committer) getChaincodeEvents(env *common.Envelope, block *common.Block) {
42+
func (c *Committer) getChaincodeEvents(env *common.Envelope, block *common.Block) error {
4043
chaincodeEvent, err := readChaincodeEvent(env, block.Header.Number)
4144
if err != nil {
42-
logger.Panicf("error reading chaincode event", err)
45+
return errors.Wrapf(err, "error reading chaincode event")
4346
}
4447
if chaincodeEvent != nil {
4548
if logger.IsEnabledFor(zapcore.DebugLevel) {
4649
logger.Debugf("Chaincode Event Received: ", chaincodeEvent)
4750
}
48-
err := c.notifyChaincodeListeners(chaincodeEvent)
49-
if err != nil {
50-
logger.Panicf("error sending chaincode events to listeners")
51-
}
51+
c.notifyChaincodeListeners(chaincodeEvent)
5252
}
53+
return nil
5354
}
5455

5556
// CommitEndorserTransaction commits the transaction to the vault
56-
func (c *committer) CommitEndorserTransaction(txID string, block *common.Block, indexInBlock int, env *common.Envelope, event *TxEvent) error {
57+
func (c *Committer) CommitEndorserTransaction(txID string, block *common.Block, indexInBlock int, env *common.Envelope, event *TxEvent) error {
5758
committer, err := c.network.Committer(c.channel)
5859
if err != nil {
59-
logger.Panicf("Cannot get Committer [%s]", err)
60+
return errors.Wrapf(err, "cannot get Committer for channel [%s]", c.channel)
6061
}
6162

6263
blockNum := block.Header.Number
@@ -70,7 +71,7 @@ func (c *committer) CommitEndorserTransaction(txID string, block *common.Block,
7071

7172
vc, deps, err := committer.Status(txID)
7273
if err != nil {
73-
logger.Panicf("failed getting tx's status [%s], with err [%s]", txID, err)
74+
return errors.Wrapf(err, "failed getting tx's status [%s]", txID)
7475
}
7576
event.DependantTxIDs = append(event.DependantTxIDs, deps...)
7677

@@ -88,23 +89,23 @@ func (c *committer) CommitEndorserTransaction(txID string, block *common.Block,
8889
default:
8990
if block != nil {
9091
if err := committer.CommitTX(event.Txid, event.Block, event.IndexInBlock, env); err != nil {
91-
logger.Panicf("failed committing transaction [%s] with deps [%v] with err [%s]", txID, deps, err)
92+
return errors.Wrapf(err, "failed committing transaction [%s] with deps [%v]", txID, deps)
9293
}
9394
return nil
9495
}
9596

9697
if err := committer.CommitTX(event.Txid, event.Block, event.IndexInBlock, nil); err != nil {
97-
logger.Panicf("failed committing transaction [%s] with deps [%v] with err [%s]", txID, deps, err)
98+
return errors.Wrapf(err, "failed committing transaction [%s] with deps [%v]", txID, deps)
9899
}
99100
}
100101
return nil
101102
}
102103

103104
// DiscardEndorserTransaction discards the transaction from the vault
104-
func (c *committer) DiscardEndorserTransaction(txID string, block *common.Block, event *TxEvent, validationCode pb.TxValidationCode) error {
105+
func (c *Committer) DiscardEndorserTransaction(txID string, block *common.Block, event *TxEvent, validationCode pb.TxValidationCode) error {
105106
committer, err := c.network.Committer(c.channel)
106107
if err != nil {
107-
logger.Panicf("Cannot get Committer [%s]", err)
108+
return errors.Wrapf(err, "cannot get Committer for channel [%s]", c.channel)
108109
}
109110

110111
blockNum := block.Header.Number
@@ -114,7 +115,7 @@ func (c *committer) DiscardEndorserTransaction(txID string, block *common.Block,
114115

115116
vc, deps, err := committer.Status(txID)
116117
if err != nil {
117-
logger.Panicf("failed getting tx's status [%s], with err [%s]", txID, err)
118+
return errors.Wrapf(err, "failed getting tx's status [%s]", txID)
118119
}
119120
event.DependantTxIDs = append(event.DependantTxIDs, deps...)
120121
switch vc {
@@ -133,5 +134,6 @@ func (c *committer) DiscardEndorserTransaction(txID string, block *common.Block,
133134
logger.Errorf("failed discarding tx in state db with err [%s]", err)
134135
}
135136
}
137+
136138
return nil
137139
}

platform/fabric/core/generic/committer/external.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,21 @@ func (c *ExternalCommitter) Status(txid string) (fdriver.ValidationCode, []strin
6464

6565
func (c *ExternalCommitter) Validate(txid string) (fdriver.ValidationCode, error) {
6666
if c.c == nil {
67-
panic("no external committer defined, programming error")
67+
panic("no external Committer defined, programming error")
6868
}
6969
return c.c.Validate(txid)
7070
}
7171

7272
func (c *ExternalCommitter) CommitTX(txid string, block uint64, indexInBloc int) error {
7373
if c.c == nil {
74-
panic("no external committer defined, programming error")
74+
panic("no external Committer defined, programming error")
7575
}
7676
return c.c.CommitTX(txid, block, indexInBloc)
7777
}
7878

7979
func (c *ExternalCommitter) DiscardTX(txid string) error {
8080
if c.c == nil {
81-
panic("no external committer defined, programming error")
81+
panic("no external Committer defined, programming error")
8282
}
8383
return c.c.DiscardTX(txid)
8484
}
@@ -91,7 +91,7 @@ func GetExternalCommitter(name string, sp view2.ServiceProvider, vault driver.Va
9191
}
9292
c, err := drivers[dNames[0]].Open(name, sp, vault)
9393
if err != nil {
94-
return nil, errors.Wrapf(err, "failed opening external committer [%s]", dNames[0])
94+
return nil, errors.Wrapf(err, "failed opening external Committer [%s]", dNames[0])
9595
}
9696
return &ExternalCommitter{c: c}, nil
9797
}

platform/fabric/core/generic/delivery/delivery.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ var (
3636
}
3737
)
3838

39+
// Callback is the callback function prototype to alert the rest of the stack about the availability of a new block.
40+
// The function returns two argument a boolean to signal if delivery should be stopped, and an error
41+
// to signal an issue during the processing of the block.
42+
// In case of an error, the same block is re-processed after a delay.
3943
type Callback func(block *common.Block) (bool, error)
4044

4145
// Vault models a key-value store that can be updated by committing rwsets
@@ -64,7 +68,7 @@ type Delivery struct {
6468

6569
func New(channel string, sp view2.ServiceProvider, network Network, callback Callback, vault Vault, waitForEventTimeout time.Duration) (*Delivery, error) {
6670
if len(channel) == 0 {
67-
panic("expected a channel, got empty string")
71+
return nil, errors.Errorf("expected a channel, got empty string")
6872
}
6973
d := &Delivery{
7074
channel: channel,
@@ -143,17 +147,9 @@ func (d *Delivery) Run(ctx context.Context) error {
143147

144148
stop, err := d.callback(r.Block)
145149
if err != nil {
146-
switch errors.Cause(err) {
147-
case ErrComm:
148-
logger.Errorf("error occurred when processing filtered block [%s], retry", err)
149-
// retry
150-
time.Sleep(10 * time.Second)
151-
df = nil
152-
default:
153-
// Stop here
154-
logger.Errorf("error occurred when processing filtered block [%s]", err)
155-
return err
156-
}
150+
logger.Errorf("error occurred when processing filtered block [%s], retry...", err)
151+
time.Sleep(10 * time.Second)
152+
df = nil
157153
}
158154
if stop {
159155
return nil

platform/fabric/core/generic/finality/fabric.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type fabricFinality struct {
4141

4242
func NewFabricFinality(channel string, network Network, hasher Hasher, waitForEventTimeout time.Duration) (*fabricFinality, error) {
4343
if len(channel) == 0 {
44-
panic("expected a channel, got empty string")
44+
return nil, errors.Errorf("expected a channel, got empty string")
4545
}
4646

4747
d := &fabricFinality{

0 commit comments

Comments
 (0)