Skip to content

Commit 2c0d851

Browse files
Created ScanBlock method to allow for custom and more efficient uses of the delivery service
Signed-off-by: Alexandros Filios <alexandros.filios@ibm.com>
1 parent 69d39fe commit 2c0d851

File tree

5 files changed

+57
-31
lines changed

5 files changed

+57
-31
lines changed

integration/fabric/atsa/chaincode/views/seller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (a *TransferView) Call(ctx view.Context) (interface{}, error) {
8989

9090
c, cancel := context.WithTimeout(context.Background(), 5*time.Second)
9191
defer cancel()
92-
assert.Error(ch.Delivery().Scan(
92+
assert.Error(ch.Delivery().ScanTx(
9393
c,
9494
envelope.TxID(),
9595
func(tx *fabric.ProcessedTransaction) (bool, error) {
@@ -101,7 +101,7 @@ func (a *TransferView) Call(ctx view.Context) (interface{}, error) {
101101

102102
c, cancel = context.WithTimeout(context.Background(), 1*time.Minute)
103103
defer cancel()
104-
assert.NoError(ch.Delivery().Scan(
104+
assert.NoError(ch.Delivery().ScanTx(
105105
c,
106106
envelope.TxID(),
107107
func(tx *fabric.ProcessedTransaction) (bool, error) {

platform/fabric/core/generic/delivery/delivery.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,6 @@ const (
4747
other messageType = "other"
4848
)
4949

50-
// Callback is the callback function prototype to alert the rest of the stack about the availability of a new block.
51-
// The function returns two argument a boolean to signal if delivery should be stopped, and an error
52-
// to signal an issue during the processing of the block.
53-
// In case of an error, the same block is re-processed after a delay.
54-
type Callback func(context.Context, *common.Block) (bool, error)
55-
5650
// Vault models a key-value store that can be updated by committing rwsets
5751
type Vault interface {
5852
// GetLastTxID returns the last transaction id committed
@@ -73,7 +67,7 @@ type Delivery struct {
7367
Services Services
7468
Ledger driver.Ledger
7569
waitForEventTimeout time.Duration
76-
callback Callback
70+
callback driver.BlockCallback
7771
vault Vault
7872
client services.PeerClient
7973
tracer trace.Tracer
@@ -89,7 +83,7 @@ func New(
8983
ConfigService driver.ConfigService,
9084
PeerManager Services,
9185
Ledger driver.Ledger,
92-
callback Callback,
86+
callback driver.BlockCallback,
9387
vault Vault,
9488
waitForEventTimeout time.Duration,
9589
tracerProvider trace.TracerProvider,

platform/fabric/core/generic/delivery/service.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func NewService(
5050
waitForEventTimeout time.Duration,
5151
txIDStore driver.TXIDStore,
5252
transactionManager driver.TransactionManager,
53-
callback Callback,
53+
callback driver.BlockCallback,
5454
tracerProvider trace.TracerProvider,
5555
metricsProvider metrics.Provider,
5656
) (*Service, error) {
@@ -96,8 +96,7 @@ func (c *Service) Stop() {
9696
c.deliveryService.Stop()
9797
}
9898

99-
func (c *Service) Scan(ctx context.Context, txID string, callback driver.DeliveryCallback) error {
100-
vault := &fakeVault{txID: txID}
99+
func (c *Service) scanBlock(ctx context.Context, vault Vault, callback driver.BlockCallback) error {
101100
deliveryService, err := New(
102101
c.NetworkName,
103102
c.channelConfig,
@@ -106,6 +105,26 @@ func (c *Service) Scan(ctx context.Context, txID string, callback driver.Deliver
106105
c.ConfigService,
107106
c.PeerManager,
108107
c.Ledger,
108+
callback,
109+
vault,
110+
c.channelConfig.CommitterWaitForEventTimeout(),
111+
&noop.TracerProvider{},
112+
&disabled.Provider{},
113+
)
114+
if err != nil {
115+
return err
116+
}
117+
118+
return deliveryService.Run(ctx)
119+
}
120+
121+
func (c *Service) ScanBlock(ctx context.Context, callback driver.BlockCallback) error {
122+
return c.scanBlock(ctx, &fakeVault{}, callback)
123+
}
124+
125+
func (c *Service) ScanTx(ctx context.Context, txID string, callback driver.DeliveryCallback) error {
126+
vault := &fakeVault{txID: txID}
127+
return c.scanBlock(ctx, vault,
109128
func(_ context.Context, block *common.Block) (bool, error) {
110129
for i, tx := range block.Data.Data {
111130
validationCode := ValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[i]
@@ -140,17 +159,7 @@ func (c *Service) Scan(ctx context.Context, txID string, callback driver.Deliver
140159
logger.Debugf("commit transaction [%s] in block [%d]", channelHeader.TxId, block.Header.Number)
141160
}
142161
return false, nil
143-
},
144-
vault,
145-
c.channelConfig.CommitterWaitForEventTimeout(),
146-
&noop.TracerProvider{},
147-
&disabled.Provider{},
148-
)
149-
if err != nil {
150-
return err
151-
}
152-
153-
return deliveryService.Run(ctx)
162+
})
154163
}
155164

156165
type fakeVault struct {

platform/fabric/delivery.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,29 @@ import (
1010
"context"
1111

1212
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
13+
"github.com/hyperledger/fabric-protos-go/common"
1314
)
1415

1516
type DeliveryCallback func(tx *ProcessedTransaction) (bool, error)
1617

18+
type BlockCallback func(context.Context, *common.Block) (bool, error)
19+
1720
// Delivery models the Fabric's delivery service
1821
type Delivery struct {
1922
delivery driver.Delivery
2023
}
2124

22-
// Scan iterates over all transactions in block starting from the block containing the passed transaction id.
25+
func (d *Delivery) ScanBlock(ctx context.Context, callback BlockCallback) error {
26+
return d.delivery.ScanBlock(ctx, func(ctx context.Context, block *common.Block) (bool, error) {
27+
return callback(ctx, block)
28+
})
29+
}
30+
31+
// ScanTx iterates over all transactions in block starting from the block containing the passed transaction id.
2332
// If txID is empty, the iterations starts from the first block.
2433
// On each transaction, the callback function is invoked.
25-
func (d *Delivery) Scan(ctx context.Context, txID string, callback DeliveryCallback) error {
26-
return d.delivery.Scan(ctx, txID, func(tx driver.ProcessedTransaction) (bool, error) {
34+
func (d *Delivery) ScanTx(ctx context.Context, txID string, callback DeliveryCallback) error {
35+
return d.delivery.ScanTx(ctx, txID, func(tx driver.ProcessedTransaction) (bool, error) {
2736
return callback(&ProcessedTransaction{
2837
pt: tx,
2938
})

platform/fabric/driver/delivery.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,33 @@ SPDX-License-Identifier: Apache-2.0
66

77
package driver
88

9-
import "context"
9+
import (
10+
"context"
11+
12+
"github.com/hyperledger/fabric-protos-go/common"
13+
)
1014

1115
// DeliveryCallback is a callback function used to process a transaction.
1216
// Return true, if the scan should finish.
1317
type DeliveryCallback func(tx ProcessedTransaction) (bool, error)
1418

19+
// BlockCallback is the callback function prototype to alert the rest of the stack about the availability of a new block.
20+
// The function returns two argument a boolean to signal if delivery should be stopped, and an error
21+
// to signal an issue during the processing of the block.
22+
// In case of an error, the same block is re-processed after a delay.
23+
type BlockCallback func(context.Context, *common.Block) (bool, error)
24+
1525
// Delivery gives access to Fabric channel delivery
1626
type Delivery interface {
17-
// StartDelivery starts the delivery process
27+
// Start starts the delivery process
1828
Start(ctx context.Context) error
1929

20-
// Scan iterates over all transactions in block starting from the block containing the passed transaction id.
30+
// ScanBlock iterates over all blocks.
31+
// On each block, the callback function is invoked.
32+
ScanBlock(ctx context.Context, callback BlockCallback) error
33+
34+
// ScanTx iterates over all transactions in block starting from the block containing the passed transaction id.
2135
// If txID is empty, the iterations starts from the first block.
2236
// On each transaction, the callback function is invoked.
23-
Scan(ctx context.Context, txID string, callback DeliveryCallback) error
37+
ScanTx(ctx context.Context, txID string, callback DeliveryCallback) error
2438
}

0 commit comments

Comments
 (0)