Skip to content

Commit 1ac00b5

Browse files
Merge pull request #123 from hyperledger/block-listener
FFTM new listener type for block notifications and receipt decoding functions
2 parents 32d8a0c + 29b880d commit 1ac00b5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2379
-273
lines changed

.golangci.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ linters-settings:
1515
values:
1616
regexp:
1717
COMPANY: .*
18+
YEAR_FUZZY: '\d\d\d\d(,\d\d\d\d)?'
1819
template: |-
19-
Copyright © {{ YEAR }} {{ COMPANY }}
20+
Copyright © {{ YEAR_FUZZY }} {{ COMPANY }}
2021
2122
SPDX-License-Identifier: Apache-2.0
2223
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
DROP INDEX IF EXISTS transactions_status;
1+
BEGIN;
2+
DROP INDEX IF EXISTS transactions_status;
3+
COMMIT;
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
CREATE INDEX transactions_status ON transactions(status);
1+
BEGIN;
2+
CREATE INDEX transactions_status ON transactions(status);
3+
COMMIT;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
BEGIN;
2+
3+
ALTER TABLE listeners DROP COLUMN "type";
4+
5+
COMMIT;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
BEGIN;
2+
3+
ALTER TABLE listeners ADD COLUMN "type" VARCHAR(64);
4+
UPDATE listeners SET "type" = 'events';
5+
ALTER TABLE listeners ALTER COLUMN "type" SET NOT NULL;
6+
7+
COMMIT;

internal/confirmations/confirmations.go

+38-3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type Manager interface {
4545
Stop()
4646
NewBlockHashes() chan<- *ffcapi.BlockHashEvent
4747
CheckInFlight(listenerID *fftypes.UUID) bool
48+
StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error
49+
StopConfirmedBlockListener(ctx context.Context, id *fftypes.UUID) error
4850
}
4951

5052
type NotificationType string
@@ -99,6 +101,8 @@ type blockConfirmationManager struct {
99101
pendingMux sync.Mutex
100102
receiptChecker *receiptChecker
101103
retry *retry.Retry
104+
cblLock sync.Mutex
105+
cbls map[fftypes.UUID]*confirmedBlockListener
102106
fetchReceiptUponEntry bool
103107
done chan struct{}
104108
}
@@ -108,6 +112,7 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A
108112
bcm := &blockConfirmationManager{
109113
baseContext: baseContext,
110114
connector: connector,
115+
cbls: make(map[fftypes.UUID]*confirmedBlockListener),
111116
blockListenerStale: true,
112117
requiredConfirmations: config.GetInt(tmconfig.ConfirmationsRequired),
113118
staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout),
@@ -233,6 +238,9 @@ func (bcm *blockConfirmationManager) Stop() {
233238
// Reset context ready for restart
234239
bcm.ctx, bcm.cancelFunc = context.WithCancel(bcm.baseContext)
235240
}
241+
for _, cbl := range bcm.copyCBLsList() {
242+
_ = bcm.StopConfirmedBlockListener(bcm.ctx, cbl.id)
243+
}
236244
}
237245

238246
func (bcm *blockConfirmationManager) NewBlockHashes() chan<- *ffcapi.BlockHashEvent {
@@ -301,9 +309,10 @@ func (bcm *blockConfirmationManager) getBlockByHash(blockHash string) (*apitypes
301309
return blockInfo, nil
302310
}
303311

304-
func (bcm *blockConfirmationManager) getBlockByNumber(blockNumber uint64, expectedParentHash string) (*apitypes.BlockInfo, error) {
312+
func (bcm *blockConfirmationManager) getBlockByNumber(blockNumber uint64, allowCache bool, expectedParentHash string) (*apitypes.BlockInfo, error) {
305313
res, reason, err := bcm.connector.BlockInfoByNumber(bcm.ctx, &ffcapi.BlockInfoByNumberRequest{
306314
BlockNumber: fftypes.NewFFBigInt(int64(blockNumber)),
315+
AllowCache: allowCache,
307316
ExpectedParentHash: expectedParentHash,
308317
})
309318
if err != nil {
@@ -326,6 +335,27 @@ func transformBlockInfo(res *ffcapi.BlockInfo) *apitypes.BlockInfo {
326335
}
327336
}
328337

338+
func (bcm *blockConfirmationManager) copyCBLsList() []*confirmedBlockListener {
339+
bcm.cblLock.Lock()
340+
defer bcm.cblLock.Unlock()
341+
cbls := make([]*confirmedBlockListener, 0, len(bcm.cbls))
342+
for _, cbl := range bcm.cbls {
343+
cbls = append(cbls, cbl)
344+
}
345+
return cbls
346+
}
347+
348+
func (bcm *blockConfirmationManager) propagateBlockHashToCBLs(bhe *ffcapi.BlockHashEvent) {
349+
bcm.cblLock.Lock()
350+
defer bcm.cblLock.Unlock()
351+
for _, cbl := range bcm.cbls {
352+
select {
353+
case cbl.newBlockHashes <- bhe:
354+
case <-cbl.processorDone:
355+
}
356+
}
357+
}
358+
329359
func (bcm *blockConfirmationManager) confirmationsListener() {
330360
defer close(bcm.done)
331361
notifications := make([]*Notification, 0)
@@ -340,6 +370,11 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
340370
}
341371
blockHashes = append(blockHashes, bhe.BlockHashes...)
342372

373+
// Need to also pass this event to any confirmed block listeners
374+
// (they promise to always be efficient in handling these, having a go-routine
375+
// dedicated to spinning fast just processing those separate to dispatching them)
376+
bcm.propagateBlockHashToCBLs(bhe)
377+
343378
if bhe.Created != nil {
344379
for i := 0; i < len(bhe.BlockHashes); i++ {
345380
bcm.metricsEmitter.RecordBlockHashQueueingMetrics(bcm.ctx, time.Since(*bhe.Created.Time()).Seconds())
@@ -371,7 +406,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
371406

372407
if bcm.blockListenerStale {
373408
if err := bcm.walkChain(blocks); err != nil {
374-
log.L(bcm.ctx).Errorf("Failed to create walk chain after restoring blockListener: %s", err)
409+
log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err)
375410
continue
376411
}
377412
bcm.blockListenerStale = false
@@ -704,7 +739,7 @@ func (bs *blockState) getByNumber(blockNumber uint64, expectedParentHash string)
704739
if block != nil {
705740
return block, nil
706741
}
707-
block, err := bs.bcm.getBlockByNumber(blockNumber, expectedParentHash)
742+
block, err := bs.bcm.getBlockByNumber(blockNumber, true, expectedParentHash)
708743
if err != nil {
709744
return nil, err
710745
}

0 commit comments

Comments
 (0)