Skip to content

Commit 0b5f3c9

Browse files
alexandrosfiliosadecaro
authored andcommitted
Created ScanBlock method to allow for custom and more efficient uses of the delivery service
Signed-off-by: Alexandros Filios <alexandros.filios@ibm.com>
1 parent 69d39fe commit 0b5f3c9

File tree

4 files changed

+51
-25
lines changed

4 files changed

+51
-25
lines changed

platform/fabric/core/generic/delivery/delivery.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,6 @@ const (
4747
other messageType = "other"
4848
)
4949

50-
// Callback is the callback function prototype to alert the rest of the stack about the availability of a new block.
51-
// The function returns two argument a boolean to signal if delivery should be stopped, and an error
52-
// to signal an issue during the processing of the block.
53-
// In case of an error, the same block is re-processed after a delay.
54-
type Callback func(context.Context, *common.Block) (bool, error)
55-
5650
// Vault models a key-value store that can be updated by committing rwsets
5751
type Vault interface {
5852
// GetLastTxID returns the last transaction id committed
@@ -73,7 +67,7 @@ type Delivery struct {
7367
Services Services
7468
Ledger driver.Ledger
7569
waitForEventTimeout time.Duration
76-
callback Callback
70+
callback driver.BlockCallback
7771
vault Vault
7872
client services.PeerClient
7973
tracer trace.Tracer
@@ -89,7 +83,7 @@ func New(
8983
ConfigService driver.ConfigService,
9084
PeerManager Services,
9185
Ledger driver.Ledger,
92-
callback Callback,
86+
callback driver.BlockCallback,
9387
vault Vault,
9488
waitForEventTimeout time.Duration,
9589
tracerProvider trace.TracerProvider,

platform/fabric/core/generic/delivery/service.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func NewService(
5050
waitForEventTimeout time.Duration,
5151
txIDStore driver.TXIDStore,
5252
transactionManager driver.TransactionManager,
53-
callback Callback,
53+
callback driver.BlockCallback,
5454
tracerProvider trace.TracerProvider,
5555
metricsProvider metrics.Provider,
5656
) (*Service, error) {
@@ -96,8 +96,7 @@ func (c *Service) Stop() {
9696
c.deliveryService.Stop()
9797
}
9898

99-
func (c *Service) Scan(ctx context.Context, txID string, callback driver.DeliveryCallback) error {
100-
vault := &fakeVault{txID: txID}
99+
func (c *Service) scanBlock(ctx context.Context, vault Vault, callback driver.BlockCallback) error {
101100
deliveryService, err := New(
102101
c.NetworkName,
103102
c.channelConfig,
@@ -106,6 +105,26 @@ func (c *Service) Scan(ctx context.Context, txID string, callback driver.Deliver
106105
c.ConfigService,
107106
c.PeerManager,
108107
c.Ledger,
108+
callback,
109+
vault,
110+
c.channelConfig.CommitterWaitForEventTimeout(),
111+
&noop.TracerProvider{},
112+
&disabled.Provider{},
113+
)
114+
if err != nil {
115+
return err
116+
}
117+
118+
return deliveryService.Run(ctx)
119+
}
120+
121+
func (c *Service) ScanBlock(ctx context.Context, callback driver.BlockCallback) error {
122+
return c.scanBlock(ctx, &fakeVault{}, callback)
123+
}
124+
125+
func (c *Service) Scan(ctx context.Context, txID string, callback driver.DeliveryCallback) error {
126+
vault := &fakeVault{txID: txID}
127+
return c.scanBlock(ctx, vault,
109128
func(_ context.Context, block *common.Block) (bool, error) {
110129
for i, tx := range block.Data.Data {
111130
validationCode := ValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[i]
@@ -140,17 +159,7 @@ func (c *Service) Scan(ctx context.Context, txID string, callback driver.Deliver
140159
logger.Debugf("commit transaction [%s] in block [%d]", channelHeader.TxId, block.Header.Number)
141160
}
142161
return false, nil
143-
},
144-
vault,
145-
c.channelConfig.CommitterWaitForEventTimeout(),
146-
&noop.TracerProvider{},
147-
&disabled.Provider{},
148-
)
149-
if err != nil {
150-
return err
151-
}
152-
153-
return deliveryService.Run(ctx)
162+
})
154163
}
155164

156165
type fakeVault struct {

platform/fabric/delivery.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,24 @@ import (
1010
"context"
1111

1212
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
13+
"github.com/hyperledger/fabric-protos-go/common"
1314
)
1415

1516
type DeliveryCallback func(tx *ProcessedTransaction) (bool, error)
1617

18+
type BlockCallback func(context.Context, *common.Block) (bool, error)
19+
1720
// Delivery models the Fabric's delivery service
1821
type Delivery struct {
1922
delivery driver.Delivery
2023
}
2124

25+
func (d *Delivery) ScanBlock(ctx context.Context, callback BlockCallback) error {
26+
return d.delivery.ScanBlock(ctx, func(ctx context.Context, block *common.Block) (bool, error) {
27+
return callback(ctx, block)
28+
})
29+
}
30+
2231
// Scan iterates over all transactions in block starting from the block containing the passed transaction id.
2332
// If txID is empty, the iterations starts from the first block.
2433
// On each transaction, the callback function is invoked.

platform/fabric/driver/delivery.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,32 @@ SPDX-License-Identifier: Apache-2.0
66

77
package driver
88

9-
import "context"
9+
import (
10+
"context"
11+
12+
"github.com/hyperledger/fabric-protos-go/common"
13+
)
1014

1115
// DeliveryCallback is a callback function used to process a transaction.
1216
// Return true, if the scan should finish.
1317
type DeliveryCallback func(tx ProcessedTransaction) (bool, error)
1418

19+
// BlockCallback is the callback function prototype to alert the rest of the stack about the availability of a new block.
20+
// The function returns two argument a boolean to signal if delivery should be stopped, and an error
21+
// to signal an issue during the processing of the block.
22+
// In case of an error, the same block is re-processed after a delay.
23+
type BlockCallback func(context.Context, *common.Block) (bool, error)
24+
1525
// Delivery gives access to Fabric channel delivery
1626
type Delivery interface {
17-
// StartDelivery starts the delivery process
27+
// Start starts the delivery process
1828
Start(ctx context.Context) error
1929

20-
// Scan iterates over all transactions in block starting from the block containing the passed transaction id.
30+
// ScanBlock iterates over all blocks.
31+
// On each block, the callback function is invoked.
32+
ScanBlock(ctx context.Context, callback BlockCallback) error
33+
34+
// ScanTx iterates over all transactions in block starting from the block containing the passed transaction id.
2135
// If txID is empty, the iterations starts from the first block.
2236
// On each transaction, the callback function is invoked.
2337
Scan(ctx context.Context, txID string, callback DeliveryCallback) error

0 commit comments

Comments
 (0)