Skip to content

Commit 79b95fd

Browse files
Subscribe to finality based on delivery service instead of committer pipeline
Signed-off-by: Alexandros Filios <alexandros.filios@ibm.com>
1 parent 37bc858 commit 79b95fd

File tree

6 files changed

+525
-8
lines changed

6 files changed

+525
-8
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/gin-gonic/gin v1.10.0
1010
github.com/gobuffalo/packr/v2 v2.7.1
1111
github.com/hashicorp/go-uuid v1.0.3
12-
github.com/hyperledger-labs/fabric-smart-client v0.3.1-0.20241217130304-1083089a2dff
12+
github.com/hyperledger-labs/fabric-smart-client v0.3.1-0.20241225184208-74446d73506d
1313
github.com/hyperledger-labs/orion-sdk-go v0.2.10
1414
github.com/hyperledger-labs/orion-server v0.2.10
1515
github.com/hyperledger/fabric v1.4.0-rc1.0.20230405174026-695dd57e01c2
@@ -38,6 +38,7 @@ require (
3838
go.uber.org/dig v1.18.0
3939
go.uber.org/zap v1.27.0
4040
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
41+
golang.org/x/sync v0.10.0
4142
google.golang.org/protobuf v1.35.1
4243
gopkg.in/yaml.v2 v2.4.0
4344
modernc.org/sqlite v1.33.1
@@ -274,7 +275,6 @@ require (
274275
golang.org/x/mod v0.21.0 // indirect
275276
golang.org/x/net v0.31.0 // indirect
276277
golang.org/x/oauth2 v0.23.0 // indirect
277-
golang.org/x/sync v0.10.0 // indirect
278278
golang.org/x/sys v0.28.0 // indirect
279279
golang.org/x/text v0.21.0 // indirect
280280
golang.org/x/time v0.5.0 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,8 +1070,8 @@ github.com/hidal-go/hidalgo v0.0.0-20201109092204-05749a6d73df/go.mod h1:bPkrxDl
10701070
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
10711071
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
10721072
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
1073-
github.com/hyperledger-labs/fabric-smart-client v0.3.1-0.20241217130304-1083089a2dff h1:3Erh1H3SV/p08UqnufnP3mFoyk+hbQqCVdOt9wVRuTM=
1074-
github.com/hyperledger-labs/fabric-smart-client v0.3.1-0.20241217130304-1083089a2dff/go.mod h1:Fcz6IOEmwXihzi/Hn8fxuyAbyJxhe706+TJsQwLHRME=
1073+
github.com/hyperledger-labs/fabric-smart-client v0.3.1-0.20241225184208-74446d73506d h1:8PpO3OG7v/uH7FLnfCGtAuiQ5YUE9xvgUy66Rd1aod8=
1074+
github.com/hyperledger-labs/fabric-smart-client v0.3.1-0.20241225184208-74446d73506d/go.mod h1:Fcz6IOEmwXihzi/Hn8fxuyAbyJxhe706+TJsQwLHRME=
10751075
github.com/hyperledger-labs/orion-sdk-go v0.2.10 h1:lFgWgxyvngIhWnIqymYGBmtmq9D6uC5d0uLG9cbyh5s=
10761076
github.com/hyperledger-labs/orion-sdk-go v0.2.10/go.mod h1:iN2xZB964AqwVJwL+EnwPOs8z1EkMEbbIg/qYeC7gDY=
10771077
github.com/hyperledger-labs/orion-server v0.2.10 h1:G4zbQEL5Egk0Oj+TwHCZWdTOLDBHOjaAEvYOT4G7ozw=

integration/token/fungible/tests.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -505,9 +505,7 @@ func TestAll(network *integration.Infrastructure, auditorId string, onRestart On
505505
CheckHolding(network, bob, "", "PINE", 110, auditor)
506506
CheckBalanceAndHolding(network, bob, "", "EUR", 20, auditor)
507507
CheckBalanceAndHolding(network, bob, "", "USD", 110, auditor)
508-
CheckOwnerDB(network, []string{
509-
//TODO: Errors
510-
}, bob)
508+
CheckOwnerDB(network, nil, bob)
511509
fmt.Printf("prepared transactions [%s:%s]", txID1, txID2)
512510
Restart(network, true, onRestart, bob)
513511
Restart(network, false, onRestart, auditor)
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package fabric
8+
9+
import (
10+
"context"
11+
"sync"
12+
13+
driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
14+
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
15+
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer"
16+
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/fabricutils"
17+
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/rwset"
18+
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/tracing"
19+
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/common/rws/translator"
20+
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/driver"
21+
"github.com/hyperledger/fabric-protos-go/common"
22+
"github.com/pkg/errors"
23+
"go.opentelemetry.io/otel/trace"
24+
"golang.org/x/sync/errgroup"
25+
)
26+
27+
// deliveryBasedFLMProvider assumes that a listener for a transaction is added before the transaction (i.e. the corresponding block) arrives in the delivery service listener.
28+
type deliveryBasedFLMProvider struct {
29+
fnsp *fabric.NetworkServiceProvider
30+
tracerProvider trace.TracerProvider
31+
keyTranslator translator.KeyTranslator
32+
}
33+
34+
func NewDeliveryBasedFLMProvider(fnsp *fabric.NetworkServiceProvider, tracerProvider trace.TracerProvider, keyTranslator translator.KeyTranslator) *deliveryBasedFLMProvider {
35+
return &deliveryBasedFLMProvider{
36+
fnsp: fnsp,
37+
tracerProvider: tracerProvider,
38+
keyTranslator: keyTranslator,
39+
}
40+
}
41+
42+
type listenerEntry struct {
43+
namespace driver2.Namespace
44+
listener driver.FinalityListener
45+
}
46+
47+
func (p *deliveryBasedFLMProvider) NewManager(network, channel string) (FinalityListenerManager, error) {
48+
net, err := p.fnsp.FabricNetworkService(network)
49+
if err != nil {
50+
return nil, err
51+
}
52+
ch, err := net.Channel(channel)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
flm := &deliveryBasedFLM{
58+
mapper: NewParallelResponseMapper(10, network, p.keyTranslator),
59+
tracer: p.tracerProvider.Tracer("finality_listener_manager", tracing.WithMetricsOpts(tracing.MetricsOpts{
60+
Namespace: network,
61+
})),
62+
listeners: NewMapCache[translator.TxID, []listenerEntry](),
63+
txInfos: NewMapCache[translator.TxID, txInfo](),
64+
}
65+
logger.Infof("Starting delivery service for [%s:%s]", network, channel)
66+
go func() {
67+
err := ch.Delivery().ScanBlock(context.Background(), func(ctx context.Context, block *common.Block) (bool, error) {
68+
return false, flm.onBlock(ctx, block)
69+
})
70+
logger.Errorf("failed running delivery for [%s:%s]: %v", network, channel, err)
71+
}()
72+
73+
return flm, nil
74+
}
75+
76+
type deliveryBasedFLM struct {
77+
tracer trace.Tracer
78+
mapper *parallelBlockMapper
79+
80+
mu sync.RWMutex
81+
listeners CacheMap[translator.TxID, []listenerEntry]
82+
txInfos CacheMap[translator.TxID, txInfo]
83+
}
84+
85+
func (m *deliveryBasedFLM) onBlock(ctx context.Context, block *common.Block) error {
86+
logger.Infof("New block with %d txs detected [%d]", len(block.Data.Data), block.Header.Number)
87+
88+
txs, err := m.mapper.Map(ctx, block)
89+
if err != nil {
90+
logger.Errorf("failed to process block [%d]: %v", block.Header.Number, err)
91+
return errors.Wrapf(err, "failed to process block [%d]", block.Header.Number)
92+
}
93+
94+
invokedTxIDs := make([]translator.TxID, 0)
95+
96+
m.mu.Lock()
97+
defer m.mu.Unlock()
98+
99+
invokedListeners := 0
100+
for _, txInfos := range txs {
101+
for ns, info := range txInfos {
102+
logger.Infof("Look for listeners of [%s:%s]", ns, info.txID)
103+
// We expect there to be only one namespace.
104+
// The complexity is better with a listenerEntry slice (because of the write operations)
105+
// If more namespaces are expected, it is worth switching to a map.
106+
listeners, ok := m.listeners.Get(info.txID)
107+
if ok {
108+
invokedTxIDs = append(invokedTxIDs, info.txID)
109+
}
110+
logger.Infof("Invoking %d listeners for [%s]", len(listeners), info.txID)
111+
for _, entry := range listeners {
112+
if len(entry.namespace) == 0 || len(ns) == 0 || entry.namespace == ns {
113+
invokedListeners++
114+
go entry.listener.OnStatus(ctx, info.txID, info.status, info.message, info.requestHash)
115+
}
116+
}
117+
}
118+
}
119+
//m.mu.RUnlock()
120+
121+
logger.Infof("Invoked %d listeners for %d TxIDs: [%v]. Removing listeners...", invokedListeners, len(invokedTxIDs), invokedTxIDs)
122+
123+
//m.mu.Lock()
124+
//defer m.mu.Unlock()
125+
for _, txInfos := range txs {
126+
for ns, info := range txInfos {
127+
logger.Warnf("Mapping for ns [%s]", ns)
128+
m.txInfos.Put(info.txID, info)
129+
}
130+
}
131+
logger.Infof("Current size of cache: %d", m.txInfos.Len())
132+
133+
m.listeners.Delete(invokedTxIDs...)
134+
135+
logger.Infof("Removed listeners for %d invoked TxIDs: %v", len(invokedTxIDs), invokedTxIDs)
136+
137+
return nil
138+
139+
}
140+
141+
func (m *deliveryBasedFLM) AddFinalityListener(namespace string, txID string, listener driver.FinalityListener) error {
142+
m.mu.RLock()
143+
if txInfo, ok := m.txInfos.Get(txID); ok {
144+
defer m.mu.RUnlock()
145+
logger.Infof("Found tx [%s]. Invoking listener directly", txID)
146+
go listener.OnStatus(context.TODO(), txInfo.txID, txInfo.status, txInfo.message, txInfo.requestHash)
147+
return nil
148+
}
149+
m.mu.RUnlock()
150+
m.mu.Lock()
151+
logger.Infof("Checking if value has been added meanwhile for [%s]", txID)
152+
defer m.mu.Unlock()
153+
if txInfo, ok := m.txInfos.Get(txID); ok {
154+
logger.Infof("Found tx [%s]! Invoking listener directly", txID)
155+
go listener.OnStatus(context.TODO(), txInfo.txID, txInfo.status, txInfo.message, txInfo.requestHash)
156+
return nil
157+
}
158+
m.listeners.Update(txID, func(_ bool, listeners []listenerEntry) (bool, []listenerEntry) {
159+
return true, append(listeners, listenerEntry{namespace, listener})
160+
})
161+
return nil
162+
}
163+
164+
func (m *deliveryBasedFLM) RemoveFinalityListener(txID string, listener driver.FinalityListener) error {
165+
logger.Infof("Manually invoked listener removal for [%s]", txID)
166+
m.mu.Lock()
167+
defer m.mu.Unlock()
168+
ok := m.listeners.Update(txID, func(_ bool, listeners []listenerEntry) (bool, []listenerEntry) {
169+
for i, entry := range listeners {
170+
if entry.listener == listener {
171+
listeners = append(listeners[:i], listeners[i+1:]...)
172+
}
173+
}
174+
return len(listeners) > 0, listeners
175+
})
176+
if ok {
177+
return nil
178+
}
179+
return errors.Errorf("could not find listener [%v] in txid [%s]", listener, txID)
180+
}
181+
182+
type txInfo struct {
183+
txID translator.TxID
184+
status driver.TxStatus
185+
message string
186+
requestHash []byte
187+
}
188+
189+
type parallelBlockMapper struct {
190+
keyTranslator translator.KeyTranslator
191+
network string
192+
cap int
193+
}
194+
195+
func NewParallelResponseMapper(cap int, network string, keyTranslator translator.KeyTranslator) *parallelBlockMapper {
196+
return &parallelBlockMapper{cap: cap, network: network, keyTranslator: keyTranslator}
197+
}
198+
199+
func (m *parallelBlockMapper) Map(ctx context.Context, block *common.Block) ([]map[driver2.Namespace]txInfo, error) {
200+
logger.Infof("Mapping block [%d]", block.Header.Number)
201+
eg := errgroup.Group{}
202+
eg.SetLimit(m.cap)
203+
results := make([]map[driver2.Namespace]txInfo, len(block.Data.Data))
204+
for i, tx := range block.Data.Data {
205+
eg.Go(func() error {
206+
event, err := m.mapTxInfo(ctx, tx, block.Metadata, block.Header.Number, driver2.TxNum(i))
207+
if err != nil {
208+
return err
209+
}
210+
results[i] = event
211+
logger.Infof("Put tx [%d:%d]: [%v]", block.Header.Number, i, event)
212+
return nil
213+
})
214+
}
215+
if err := eg.Wait(); err != nil {
216+
return nil, err
217+
}
218+
return results, nil
219+
}
220+
221+
func (m *parallelBlockMapper) mapTxInfo(ctx context.Context, tx []byte, block *common.BlockMetadata, blockNum driver2.BlockNum, txNum driver2.TxNum) (map[driver2.Namespace]txInfo, error) {
222+
_, payl, chdr, err := fabricutils.UnmarshalTx(tx)
223+
if err != nil {
224+
return nil, errors.Wrapf(err, "failed unmarshaling tx [%d:%d]", blockNum, txNum)
225+
}
226+
if common.HeaderType(chdr.Type) != common.HeaderType_ENDORSER_TRANSACTION {
227+
logger.Warnf("Type of TX [%d:%d] is [%d]. Skipping...", blockNum, txNum, chdr.Type)
228+
return nil, nil
229+
}
230+
rwSet, err := rwset.NewEndorserTransactionReader(m.network).Read(payl, chdr)
231+
if err != nil {
232+
return nil, errors.Wrapf(err, "failed extracting rwset")
233+
}
234+
key, err := m.keyTranslator.CreateTokenRequestKey(chdr.TxId)
235+
if err != nil {
236+
return nil, errors.Wrapf(err, "can't create for token request [%s]", chdr.TxId)
237+
}
238+
_, finalityEvent, err := committer.MapFinalityEvent(ctx, block, txNum, chdr.TxId)
239+
if err != nil {
240+
return nil, errors.Wrapf(err, "failed mapping finality event")
241+
}
242+
243+
txInfos := make(map[driver2.Namespace]txInfo, len(rwSet.WriteSet.Writes))
244+
logger.Infof("TX [%s] has %d namespaces", chdr.TxId, len(rwSet.WriteSet.Writes))
245+
for ns, write := range rwSet.WriteSet.Writes {
246+
logger.Infof("TX [%s:%s] has %d writes", chdr.TxId, ns, len(write))
247+
if requestHash, ok := write[key]; ok {
248+
txInfos[ns] = txInfo{
249+
txID: chdr.TxId,
250+
status: finalityEvent.ValidationCode,
251+
message: finalityEvent.ValidationMessage,
252+
requestHash: requestHash,
253+
}
254+
} else {
255+
logger.Warnf("TX [%s:%s] did not have key [%s]. Found: %v", chdr.TxId, ns, key, write.Keys())
256+
}
257+
}
258+
return txInfos, nil
259+
}

token/services/network/fabric/driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func NewGenericDriver(
8181
NewTokenExecutorProvider(fnsProvider),
8282
NewSpentTokenExecutorProvider(fnsProvider, keyTranslator),
8383
keyTranslator,
84-
NewCommitterBasedFLMProvider(fnsProvider, tracerProvider, keyTranslator),
84+
NewDeliveryBasedFLMProvider(fnsProvider, tracerProvider, keyTranslator),
8585
endorsement.NewServiceProvider(fnsProvider, configService, viewManager, viewRegistry, identityProvider, keyTranslator),
8686
config2.GenericDriver,
8787
)

0 commit comments

Comments
 (0)