Skip to content

Commit d3c526e

Browse files
authored
fix: prevent batch-wide finality event loss in queryByID (#1426)
Signed-off-by: atharva01 <atharvaborade568@gmail.com>
1 parent e2cf17d commit d3c526e

2 files changed

Lines changed: 222 additions & 8 deletions

File tree

token/services/network/fabric/finality/deliveryqs.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,17 @@ const (
2424
FirstBlock = 1
2525
)
2626

27+
type txLedger interface {
28+
GetTransactionByID(txID string) (*fabric.ProcessedTransaction, error)
29+
}
30+
31+
type blockScanner interface {
32+
ScanFromBlock(ctx context.Context, block uint64, callback fabric.DeliveryCallback) error
33+
}
34+
2735
// DeliveryScanQueryByID groups the collaborators used to answer by-ID queries.
// In this diff the Delivery and Ledger fields switch from the concrete
// *fabric.Delivery / *fabric.Ledger types (removed lines) to the package-local
// blockScanner / txLedger interfaces (added lines). Mapper is the component
// whose MapProcessedTx turns a processed transaction into TxInfo values.
type DeliveryScanQueryByID struct {
28-
Delivery *fabric.Delivery
29-
Ledger *fabric.Ledger
36+
Delivery blockScanner
37+
Ledger txLedger
3038
Mapper events2.EventInfoMapper[TxInfo]
3139
}
3240

@@ -53,9 +61,10 @@ func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, keys []driver.TxI
5361
logger.DebugfContext(ctx, "transaction [%s] found on ledger", txID)
5462
infos, err := q.Mapper.MapProcessedTx(pt)
5563
if err != nil {
56-
logger.Errorf("failed to map tx [%s]: [%s]", txID, err)
64+
logger.Errorf("failed to map tx [%s]: [%s], skipping", txID, err)
65+
keySet.Remove(txID)
5766

58-
return
67+
continue
5968
}
6069
keySet.Remove(txID)
6170
ch <- infos
@@ -75,10 +84,9 @@ func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, keys []driver.TxI
7584
continue
7685
}
7786

78-
// error not recoverable, fail
79-
logger.DebugfContext(ctx, "scan for tx [%s] failed with err [%s]", txID, err)
80-
81-
return
87+
// transient ledger error; fall back to block scan for this txID
88+
logger.Errorf("scan for tx [%s] failed with err [%s], falling back to block scan", txID, err)
89+
startDelivery = true
8290
}
8391

8492
if !startDelivery {
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package finality_test
8+
9+
import (
10+
"context"
11+
"errors"
12+
"fmt"
13+
"testing"
14+
15+
cdriver "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
16+
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
17+
events2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/events"
18+
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/fabric/finality"
19+
"github.com/hyperledger/fabric-protos-go-apiv2/common"
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
// --- minimal fakes ---
25+
26+
type fakeLedger struct {
27+
results map[string]fakeLedgerResult
28+
}
29+
30+
type fakeLedgerResult struct {
31+
pt *fabric.ProcessedTransaction
32+
err error
33+
}
34+
35+
func (f *fakeLedger) GetTransactionByID(txID string) (*fabric.ProcessedTransaction, error) {
36+
r, ok := f.results[txID]
37+
if !ok {
38+
return nil, fmt.Errorf("TXID [%s] not available", txID)
39+
}
40+
41+
return r.pt, r.err
42+
}
43+
44+
// fakeScanner is a block-scanner stand-in that records whether, and from
// which block, a scan was requested.
type fakeScanner struct {
	// called is set once ScanFromBlock has been invoked.
	called bool
	// startBlock is the block number passed to the latest ScanFromBlock call.
	startBlock uint64
}
48+
49+
func (f *fakeScanner) ScanFromBlock(_ context.Context, block uint64, _ fabric.DeliveryCallback) error {
50+
f.called = true
51+
f.startBlock = block
52+
53+
return nil
54+
}
55+
56+
type fakeMapper struct {
57+
results map[*fabric.ProcessedTransaction]fakeMapperResult
58+
}
59+
60+
type fakeMapperResult struct {
61+
infos []finality.TxInfo
62+
err error
63+
}
64+
65+
func (f *fakeMapper) MapProcessedTx(tx *fabric.ProcessedTransaction) ([]finality.TxInfo, error) {
66+
r, ok := f.results[tx]
67+
if !ok {
68+
return nil, errors.New("unexpected tx in mapper")
69+
}
70+
71+
return r.infos, r.err
72+
}
73+
74+
func (f *fakeMapper) MapTxData(_ context.Context, _ []byte, _ *common.BlockMetadata, _ cdriver.BlockNum, _ cdriver.TxNum) (map[cdriver.Namespace]finality.TxInfo, error) {
75+
return nil, nil
76+
}
77+
78+
// evicted builds a minimal evicted map for the given txIDs with nil listener slices.
79+
func evicted(txIDs ...string) map[cdriver.TxID][]events2.ListenerEntry[finality.TxInfo] {
80+
m := make(map[cdriver.TxID][]events2.ListenerEntry[finality.TxInfo], len(txIDs))
81+
for _, id := range txIDs {
82+
m[id] = nil
83+
}
84+
85+
return m
86+
}
87+
88+
func drain(ch <-chan []finality.TxInfo) []finality.TxInfo {
89+
var all []finality.TxInfo
90+
for batch := range ch {
91+
all = append(all, batch...)
92+
}
93+
94+
return all
95+
}
96+
97+
// --- tests ---
98+
99+
// TestQueryByID_MappingFailure_ContinuesToNextTx verifies that when MapProcessedTx
100+
// fails for one txID, the goroutine continues processing the remaining txIDs instead
101+
// of returning early (the bug this PR fixed).
102+
func TestQueryByID_MappingFailure_ContinuesToNextTx(t *testing.T) {
103+
ctx := context.Background()
104+
// Use zero-value ProcessedTransactions as stand-ins; the mapper is also mocked.
105+
pt1 := new(fabric.ProcessedTransaction)
106+
pt2 := new(fabric.ProcessedTransaction)
107+
108+
wantInfo := finality.TxInfo{TxId: "tx2"}
109+
110+
scanner := &fakeScanner{}
111+
q := &finality.DeliveryScanQueryByID{
112+
Delivery: scanner,
113+
Ledger: &fakeLedger{results: map[string]fakeLedgerResult{
114+
"tx1": {pt: pt1, err: nil},
115+
"tx2": {pt: pt2, err: nil},
116+
}},
117+
Mapper: &fakeMapper{results: map[*fabric.ProcessedTransaction]fakeMapperResult{
118+
pt1: {err: errors.New("mapping failed")},
119+
pt2: {infos: []finality.TxInfo{wantInfo}},
120+
}},
121+
}
122+
123+
ch, err := q.QueryByID(ctx, 20, evicted("tx1", "tx2"))
124+
require.NoError(t, err)
125+
126+
received := drain(ch)
127+
assert.Contains(t, received, wantInfo, "tx2 info must be delivered even though tx1 mapping failed")
128+
assert.False(t, scanner.called, "no delivery scan should be triggered when all txs were found on ledger")
129+
}
130+
131+
// TestQueryByID_MappingFailureOnly_NoDelivery verifies that a mapping failure alone
132+
// (with no TxNotFound / transient errors) does NOT trigger a block delivery scan.
133+
func TestQueryByID_MappingFailureOnly_NoDelivery(t *testing.T) {
134+
ctx := context.Background()
135+
pt1 := new(fabric.ProcessedTransaction)
136+
137+
scanner := &fakeScanner{}
138+
q := &finality.DeliveryScanQueryByID{
139+
Delivery: scanner,
140+
Ledger: &fakeLedger{results: map[string]fakeLedgerResult{
141+
"tx1": {pt: pt1, err: nil},
142+
}},
143+
Mapper: &fakeMapper{results: map[*fabric.ProcessedTransaction]fakeMapperResult{
144+
pt1: {err: errors.New("mapping failed")},
145+
}},
146+
}
147+
148+
ch, err := q.QueryByID(ctx, 20, evicted("tx1"))
149+
require.NoError(t, err)
150+
151+
received := drain(ch)
152+
assert.Empty(t, received)
153+
assert.False(t, scanner.called, "mapping failure must not trigger delivery scan")
154+
}
155+
156+
// TestQueryByID_TxNotFound_TriggersDelivery verifies that a TxNotFound ledger error
157+
// causes the goroutine to fall back to a block scan (startDelivery = true).
158+
func TestQueryByID_TxNotFound_TriggersDelivery(t *testing.T) {
159+
ctx := context.Background()
160+
161+
scanner := &fakeScanner{}
162+
// fakeLedger returns "TXID [tx1] not available" for unknown keys by default.
163+
q := &finality.DeliveryScanQueryByID{
164+
Delivery: scanner,
165+
Ledger: &fakeLedger{results: map[string]fakeLedgerResult{}},
166+
Mapper: &fakeMapper{results: map[*fabric.ProcessedTransaction]fakeMapperResult{}},
167+
}
168+
169+
ch, err := q.QueryByID(ctx, 20, evicted("tx1"))
170+
require.NoError(t, err)
171+
drain(ch)
172+
173+
assert.True(t, scanner.called, "TxNotFound must trigger delivery scan")
174+
// startingBlock = max(1, 20-10) = 10
175+
assert.Equal(t, uint64(10), scanner.startBlock)
176+
}
177+
178+
// TestQueryByID_TransientError_ContinuesToNextTx verifies that a transient ledger
179+
// error for one txID triggers delivery and does NOT prevent other txIDs in the same
180+
// batch from being resolved via the ledger (the second fix in this PR).
181+
func TestQueryByID_TransientError_ContinuesToNextTx(t *testing.T) {
182+
ctx := context.Background()
183+
pt2 := new(fabric.ProcessedTransaction)
184+
wantInfo := finality.TxInfo{TxId: "tx2"}
185+
186+
scanner := &fakeScanner{}
187+
q := &finality.DeliveryScanQueryByID{
188+
Delivery: scanner,
189+
Ledger: &fakeLedger{results: map[string]fakeLedgerResult{
190+
// tx1 returns a transient (non-TxNotFound) error
191+
"tx1": {err: errors.New("peer connection reset")},
192+
// tx2 is found successfully
193+
"tx2": {pt: pt2},
194+
}},
195+
Mapper: &fakeMapper{results: map[*fabric.ProcessedTransaction]fakeMapperResult{
196+
pt2: {infos: []finality.TxInfo{wantInfo}},
197+
}},
198+
}
199+
200+
ch, err := q.QueryByID(ctx, 20, evicted("tx1", "tx2"))
201+
require.NoError(t, err)
202+
203+
received := drain(ch)
204+
assert.Contains(t, received, wantInfo, "tx2 info must be delivered despite tx1 transient error")
205+
assert.True(t, scanner.called, "transient error must trigger delivery scan for tx1")
206+
}

0 commit comments

Comments
 (0)