Skip to content

Commit 8cc13f9

Browse files
Fetch ahead blocks from delivery service
Signed-off-by: Alexandros Filios <alexandros.filios@ibm.com>
1 parent 4ab0bd7 commit 8cc13f9

File tree

4 files changed

+68
-18
lines changed

4 files changed

+68
-18
lines changed

platform/fabric/core/generic/config/ds.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type Finality struct {
106106
}
107107

108108
type Delivery struct {
109+
BufferSize int `yaml:"BufferSize,omitempty"`
109110
WaitForEventTimeout time.Duration `yaml:"WaitForEventTimeout,omitempty"`
110111
SleepAfterFailure time.Duration `yaml:"SleepAfterFailure,omitempty"`
111112
}
@@ -205,6 +206,13 @@ func (c *Channel) CommitterWaitForEventTimeout() time.Duration {
205206
return c.Committer.WaitForEventTimeout
206207
}
207208

209+
func (c *Channel) DeliveryBufferSize() int {
210+
if c.Delivery.BufferSize <= 0 {
211+
return 1
212+
}
213+
return c.Delivery.BufferSize
214+
}
215+
208216
func (c *Channel) DiscoveryTimeout() time.Duration {
209217
if c.Discovery.Timeout == 0 {
210218
return 20 * time.Second

platform/fabric/core/generic/delivery/delivery.go

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ package delivery
88

99
import (
1010
"context"
11+
"fmt"
1112
"strings"
13+
"sync/atomic"
1214
"time"
1315

1416
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
@@ -38,6 +40,11 @@ var (
3840
}
3941
)
4042

43+
type blockResponse struct {
44+
ctx context.Context
45+
block *common.Block
46+
}
47+
4148
type messageType = string
4249

4350
const (
@@ -73,7 +80,8 @@ type Delivery struct {
7380
client services.PeerClient
7481
tracer trace.Tracer
7582
lastBlockReceived uint64
76-
stop chan bool
83+
bufferSize int
84+
stop chan struct{}
7785
}
7886

7987
func New(
@@ -87,6 +95,7 @@ func New(
8795
callback driver.BlockCallback,
8896
vault Vault,
8997
waitForEventTimeout time.Duration,
98+
bufferSize int,
9099
tracerProvider trace.TracerProvider,
91100
_ metrics.Provider,
92101
) (*Delivery, error) {
@@ -108,9 +117,10 @@ func New(
108117
Namespace: "fabricsdk",
109118
LabelNames: []tracing.LabelName{messageTypeLabel},
110119
})),
111-
callback: callback,
112-
vault: vault,
113-
stop: make(chan bool),
120+
callback: callback,
121+
vault: vault,
122+
bufferSize: max(bufferSize, 1),
123+
stop: make(chan struct{}),
114124
}
115125
return d, nil
116126
}
@@ -123,22 +133,56 @@ func (d *Delivery) Start(ctx context.Context) {
123133
}
124134

125135
func (d *Delivery) Stop() {
126-
d.stop <- true
136+
close(d.stop)
127137
}
128138

139+
var ctr = atomic.Uint32{}
140+
129141
func (d *Delivery) Run(ctx context.Context) error {
142+
logger.Infof("Running delivery service [%d]", ctr.Add(1))
130143
if ctx == nil {
131144
ctx = context.Background()
132145
}
146+
ch := make(chan blockResponse, d.bufferSize)
147+
go d.readBlocks(ch)
148+
return d.runReceiver(ctx, ch)
149+
}
150+
151+
func (d *Delivery) readBlocks(ch <-chan blockResponse) {
152+
for {
153+
select {
154+
case b := <-ch:
155+
logger.Debugf("Invoking callback for block [%d]", b.block.Header)
156+
stop, err := d.callback(b.ctx, b.block)
157+
if err != nil {
158+
logger.Errorf("callback errored for block %d: %v", b.block.Header.Number, err)
159+
}
160+
if stop {
161+
logger.Infof("Stopping delivery at block [%d]", b.block.Header.Number)
162+
close(d.stop)
163+
return
164+
}
165+
case <-d.stop:
166+
logger.Infof("Stopping delivery service")
167+
return
168+
}
169+
}
170+
}
171+
172+
func (d *Delivery) runReceiver(ctx context.Context, ch chan<- blockResponse) error {
173+
if ctx == nil || ch == nil {
174+
return errors.New("ctx and channel must be provided")
175+
}
133176
var df DeliverStream
134177
var err error
135178
waitTime := d.channelConfig.DeliverySleepAfterFailure()
136179
for {
137180
select {
138181
case <-d.stop:
139-
// Time to stop
182+
logger.Infof("Stopped receiver")
140183
return nil
141184
case <-ctx.Done():
185+
logger.Infof("Context done")
142186
// Time to cancel
143187
return errors.New("context done")
144188
default:
@@ -192,19 +236,14 @@ func (d *Delivery) Run(ctx context.Context) error {
192236
}
193237
d.lastBlockReceived = r.Block.Header.Number
194238

195-
span.AddEvent("invoke_callback")
196-
stop, err := d.callback(deliveryCtx, r.Block)
197-
span.AddEvent("invoked_callback")
198-
if err != nil {
199-
span.RecordError(err)
200-
logger.Errorf("error occurred when processing filtered block [%s], retry...", err)
201-
time.Sleep(waitTime)
202-
df = nil
203-
}
204-
if stop {
205-
span.End()
206-
return nil
239+
span.AddEvent(fmt.Sprintf("push_%d_to_channel", r.Block.Header.Number))
240+
logger.Debugf("Pushing block [%d] to channel with current length %d", r.Block.Header.Number, len(ch))
241+
ch <- blockResponse{
242+
ctx: deliveryCtx,
243+
block: r.Block,
207244
}
245+
logger.Debugf("Pushed block [%d] to channel", r.Block.Header.Number)
246+
span.AddEvent("pushed_to_channel")
208247
case *pb.DeliverResponse_Status:
209248
span.SetAttributes(tracing.String(messageTypeLabel, responseStatus))
210249
if r.Status == common.Status_NOT_FOUND {

platform/fabric/core/generic/delivery/service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func NewService(
6868
callback,
6969
txIDStore,
7070
channelConfig.CommitterWaitForEventTimeout(),
71+
channelConfig.DeliveryBufferSize(),
7172
tracerProvider,
7273
metricsProvider,
7374
)
@@ -114,6 +115,7 @@ func (c *Service) scanBlock(ctx context.Context, vault Vault, callback driver.Bl
114115
callback,
115116
vault,
116117
c.channelConfig.CommitterWaitForEventTimeout(),
118+
c.channelConfig.DeliveryBufferSize(),
117119
c.tracerProvider,
118120
c.metricsProvider,
119121
)

platform/fabric/driver/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type ChannelConfig interface {
5151
CommitterFinalityNumRetries() int
5252
CommitterFinalityUnknownTXTimeout() time.Duration
5353
CommitterWaitForEventTimeout() time.Duration
54+
DeliveryBufferSize() int
5455
DeliverySleepAfterFailure() time.Duration
5556
CommitParallelism() int
5657
ChaincodeConfigs() []ChaincodeConfig

0 commit comments

Comments
 (0)