Skip to content

Commit 091ecad

Browse files
adecaroalexandrosfilios
authored andcommitted
comments update
Signed-off-by: Angelo De Caro <adc@zurich.ibm.com>
1 parent 4e7f46d commit 091ecad

File tree

2 files changed

+114
-99
lines changed

2 files changed

+114
-99
lines changed

token/services/network/fabric/finality/deliveryqs.go

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ import (
1818
"go.uber.org/zap"
1919
)
2020

21+
const (
22+
NumberPastBlocks = 10
23+
FirstBlock = 1
24+
)
25+
2126
type DeliveryScanQueryByID struct {
2227
Delivery *fabric.Delivery
2328
Ledger *fabric.Ledger
@@ -26,19 +31,20 @@ type DeliveryScanQueryByID struct {
2631

2732
func (q *DeliveryScanQueryByID) QueryByID(ctx context.Context, lastBlock driver.BlockNum, evicted map[driver.TxID][]finality.ListenerEntry[TxInfo]) (<-chan []TxInfo, error) {
2833
keys := collections.Keys(evicted)
29-
results := collections.NewSet(keys...)
3034
ch := make(chan []TxInfo, len(keys))
31-
go q.queryByID(ctx, results, ch, lastBlock)
35+
go q.queryByID(ctx, keys, ch, lastBlock)
3236
return ch, nil
3337
}
3438

35-
func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, results collections.Set[string], ch chan []TxInfo, lastBlock uint64) {
39+
func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, keys []driver.TxID, ch chan []TxInfo, lastBlock uint64) {
3640
defer close(ch)
3741

42+
keySet := collections.NewSet(keys...)
43+
3844
// for each txID, fetch the corresponding transaction.
3945
// if the transaction is not found, start a delivery for it
4046
startDelivery := false
41-
for _, txID := range results.ToSlice() {
47+
for _, txID := range keySet.ToSlice() {
4248
logger.Debugf("loading transaction [%s] from ledger...", txID)
4349
pt, err := q.Ledger.GetTransactionByID(txID)
4450
if err == nil {
@@ -48,7 +54,7 @@ func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, results collectio
4854
logger.Errorf("failed to map tx [%s]: [%s]", txID, err)
4955
return
5056
}
51-
results.Remove(txID)
57+
keySet.Remove(txID)
5258
ch <- infos
5359
continue
5460
}
@@ -68,37 +74,39 @@ func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, results collectio
6874
return
6975
}
7076

71-
if startDelivery {
72-
startingBlock := finality.MaxUint64(1, lastBlock-10)
73-
// startingBlock := uint64(0)
74-
if logger.IsEnabledFor(zap.DebugLevel) {
75-
logger.Debugf("start scanning blocks starting from [%d], looking for remaining keys [%v]", startingBlock, results.ToSlice())
76-
}
77+
if !startDelivery {
78+
return
79+
}
7780

78-
// start delivery for the future
79-
err := q.Delivery.ScanFromBlock(
80-
ctx,
81-
startingBlock,
82-
func(tx *fabric.ProcessedTransaction) (bool, error) {
83-
if !results.Contains(tx.TxID()) {
84-
return false, nil
85-
}
86-
logger.Debugf("received result for tx [%s, %v, %d]...", tx.TxID(), tx.ValidationCode(), len(tx.Results()))
87-
infos, err := q.Mapper.MapProcessedTx(tx)
88-
if err != nil {
89-
logger.Errorf("failed mapping tx [%s]: %v", tx.TxID(), err)
90-
return true, err
91-
}
92-
ch <- infos
93-
results.Remove(tx.TxID())
94-
logger.Debugf("removing [%s] from searching list, remaining keys [%d]", tx.TxID(), results.Length())
95-
return results.Length() == 0, nil
96-
},
97-
)
98-
if err != nil {
99-
logger.Errorf("failed scanning blocks [%s], started from [%d]", err, startingBlock)
100-
return
101-
}
102-
logger.Debugf("finished scanning blocks starting from [%d]", startingBlock)
81+
startingBlock := finality.MaxUint64(FirstBlock, lastBlock-NumberPastBlocks)
82+
// startingBlock := uint64(0)
83+
if logger.IsEnabledFor(zap.DebugLevel) {
84+
logger.Debugf("start scanning blocks starting from [%d], looking for remaining keys [%v]", startingBlock, keySet.ToSlice())
85+
}
86+
87+
// start delivery for the future
88+
err := q.Delivery.ScanFromBlock(
89+
ctx,
90+
startingBlock,
91+
func(tx *fabric.ProcessedTransaction) (bool, error) {
92+
if !keySet.Contains(tx.TxID()) {
93+
return false, nil
94+
}
95+
logger.Debugf("received result for tx [%s, %v, %d]...", tx.TxID(), tx.ValidationCode(), len(tx.Results()))
96+
infos, err := q.Mapper.MapProcessedTx(tx)
97+
if err != nil {
98+
logger.Errorf("failed mapping tx [%s]: %v", tx.TxID(), err)
99+
return true, err
100+
}
101+
ch <- infos
102+
keySet.Remove(tx.TxID())
103+
logger.Debugf("removing [%s] from searching list, remaining keys [%d]", tx.TxID(), keySet.Length())
104+
return keySet.Length() == 0, nil
105+
},
106+
)
107+
if err != nil {
108+
logger.Errorf("failed scanning blocks [%s], started from [%d]", err, startingBlock)
109+
return
103110
}
111+
logger.Debugf("finished scanning blocks starting from [%d]", startingBlock)
104112
}

token/services/network/fabric/lookup/deliveryqs.go

Lines changed: 70 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,39 @@ import (
1616
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
1717
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/finality"
1818
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/fabric/tcc"
19+
slices2 "github.com/hyperledger-labs/fabric-token-sdk/token/services/utils/slices"
1920
"go.uber.org/zap"
2021
)
2122

2223
const (
23-
QueryStates = tcc.QueryStates
24+
QueryStates = tcc.QueryStates
25+
NumberPastBlocks = 10
26+
FirstBlock = 1
2427
)
2528

2629
type DeliveryScanQueryByID struct {
2730
Delivery *fabric.Delivery
2831
Channel *fabric.Channel
2932
}
3033

31-
func (q *DeliveryScanQueryByID) QueryByID(ctx context.Context, lastBlock driver2.BlockNum, evicted map[driver2.TxID][]finality.ListenerEntry[TxInfo]) (<-chan []TxInfo, error) {
34+
func (q *DeliveryScanQueryByID) QueryByID(ctx context.Context, startingBlock driver2.BlockNum, evicted map[driver2.TxID][]finality.ListenerEntry[TxInfo]) (<-chan []TxInfo, error) {
3235
// we are abusing TxID to carry the name of the keys we are looking for.
3336
// Keys are supposed to be unique
3437
keys := collections.Keys(evicted) // These are the state keys we are looking for
35-
results := collections.NewSet(keys...)
3638
ch := make(chan []TxInfo, len(keys))
37-
go q.queryByID(ctx, results, ch, lastBlock, evicted)
39+
go q.queryByID(ctx, keys, ch, startingBlock, evicted)
3840
return ch, nil
3941
}
4042

41-
func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, results collections.Set[string], ch chan []TxInfo, lastBlock uint64, evicted map[driver2.TxID][]finality.ListenerEntry[TxInfo]) {
43+
func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, keys []driver2.TxID, ch chan []TxInfo, lastBlock uint64, evicted map[driver2.TxID][]finality.ListenerEntry[TxInfo]) {
4244
defer close(ch)
4345

46+
keySet := collections.NewSet(keys...)
47+
4448
// group keys by namespace
45-
keysByNS := map[driver2.Namespace][]string{}
49+
keysByNS := map[driver2.Namespace][]driver2.PKey{}
4650
for k, v := range evicted {
47-
ns := v[0].Namespace()
51+
ns := slices2.GetAny(v).Namespace()
4852
_, ok := keysByNS[ns]
4953
if !ok {
5054
keysByNS[ns] = []string{}
@@ -74,83 +78,86 @@ func (q *DeliveryScanQueryByID) queryByID(ctx context.Context, results collectio
7478
logger.Errorf("failed unmarshalling results for query by ids [%v]: [%s]", keys, err)
7579
return
7680
}
77-
infos := make([]TxInfo, 0, len(values))
78-
var remainingKeys []string
81+
found := make([]TxInfo, 0, len(values))
82+
var notFound []string
7983
for i, value := range values {
8084
if len(value) == 0 {
8185
startDelivery = true
82-
remainingKeys = append(remainingKeys, keys[i])
86+
notFound = append(notFound, keys[i])
8387
continue
8488
}
85-
infos = append(infos, TxInfo{
89+
found = append(found, TxInfo{
8690
Namespace: ns,
8791
Key: keys[i],
8892
Value: value,
8993
})
90-
results.Remove(keys[i])
94+
keySet.Remove(keys[i])
9195
}
92-
if len(remainingKeys) == 0 {
96+
if len(notFound) == 0 {
9397
delete(keysByNS, ns)
9498
} else {
95-
keysByNS[ns] = remainingKeys
99+
keysByNS[ns] = notFound
96100
}
97-
ch <- infos
101+
ch <- found
98102
}
99103

100-
if startDelivery {
101-
startingBlock := finality.MaxUint64(1, lastBlock-10)
102-
// startingBlock := uint64(0)
103-
if logger.IsEnabledFor(zap.DebugLevel) {
104-
logger.Debugf("start scanning blocks starting from [%d], looking for remaining keys [%v]", startingBlock, results.ToSlice())
105-
}
104+
if !startDelivery {
105+
return
106+
}
107+
108+
startingBlock := finality.MaxUint64(FirstBlock, lastBlock-NumberPastBlocks)
109+
// startingBlock := uint64(0)
110+
if logger.IsEnabledFor(zap.DebugLevel) {
111+
logger.Debugf("start scanning blocks starting from [%d], looking for remaining keys [%v]", startingBlock, keySet.ToSlice())
112+
}
113+
114+
// start delivery for the future
115+
v := q.Channel.Vault()
116+
err := q.Delivery.ScanFromBlock(
117+
ctx,
118+
startingBlock,
119+
func(tx *fabric.ProcessedTransaction) (bool, error) {
120+
rws, err := v.InspectRWSet(ctx, tx.Results())
121+
if err != nil {
122+
return false, err
123+
}
106124

107-
// start delivery for the future
108-
v := q.Channel.Vault()
109-
err := q.Delivery.ScanFromBlock(
110-
ctx,
111-
startingBlock,
112-
func(tx *fabric.ProcessedTransaction) (bool, error) {
113-
rws, err := v.InspectRWSet(ctx, tx.Results())
114-
if err != nil {
115-
return false, err
125+
var txInfos []TxInfo
126+
for namespace, keys := range keysByNS {
127+
if !slices.Contains(rws.Namespaces(), namespace) {
128+
logger.Debugf("scanning [%s] does not contain namespace [%s]", tx.TxID(), namespace)
129+
continue
116130
}
117131

118-
var txInfos []TxInfo
119-
for namespace, keys := range keysByNS {
120-
if !slices.Contains(rws.Namespaces(), namespace) {
121-
logger.Debugf("scanning [%s] does not contain namespace [%s]", tx.TxID(), namespace)
122-
continue
132+
for i := 0; i < rws.NumWrites(namespace); i++ {
133+
k, v, err := rws.GetWriteAt(namespace, i)
134+
if err != nil {
135+
logger.Debugf("scanning [%s]: failed to get key [%s]", tx.TxID(), err)
136+
return false, err
123137
}
124-
125-
for i := 0; i < rws.NumWrites(namespace); i++ {
126-
k, v, err := rws.GetWriteAt(namespace, i)
127-
if err != nil {
128-
logger.Debugf("scanning [%s]: failed to get key [%s]", tx.TxID(), err)
129-
return false, err
130-
}
131-
if slices.Contains(keys, k) {
132-
logger.Debugf("scanning [%s]: found key [%s]", tx.TxID(), k)
133-
txInfos = append(txInfos, TxInfo{
134-
Namespace: namespace,
135-
Key: k,
136-
Value: v,
137-
})
138-
logger.Debugf("removing [%s] from searching list, remaining keys [%d]", k, results.Length())
139-
results.Remove(k)
140-
}
138+
if slices.Contains(keys, k) {
139+
logger.Debugf("scanning [%s]: found key [%s]", tx.TxID(), k)
140+
txInfos = append(txInfos, TxInfo{
141+
Namespace: namespace,
142+
Key: k,
143+
Value: v,
144+
})
145+
logger.Debugf("removing [%s] from searching list, remaining keys [%d]", k, keySet.Length())
146+
keySet.Remove(k)
141147
}
142148
}
143-
if len(txInfos) != 0 {
144-
ch <- txInfos
145-
}
149+
}
150+
if len(txInfos) != 0 {
151+
ch <- txInfos
152+
}
146153

147-
return results.Length() == 0, nil
148-
},
149-
)
150-
if err != nil {
151-
logger.Errorf("failed scanning blocks [%s], started from [%d]", err, startingBlock)
152-
return
153-
}
154-
logger.Debugf("finished scanning blocks starting from [%d]", startingBlock)
154+
return keySet.Length() == 0, nil
155+
},
156+
)
157+
if err != nil {
158+
logger.Errorf("failed scanning blocks [%s], started from [%d]", err, startingBlock)
159+
return
155160
}
161+
logger.Debugf("finished scanning blocks starting from [%d]", startingBlock)
162+
156163
}

0 commit comments

Comments
 (0)