Skip to content

Commit 74446d7

Browse files
Extracted endorser transaction mapping functionality for re-use
Signed-off-by: Alexandros Filios <alexandros.filios@ibm.com>
1 parent 5b0424a commit 74446d7

File tree

10 files changed

+90
-59
lines changed

10 files changed

+90
-59
lines changed

docs/fabric/fabricdev/core/fabricdev/vault/vault.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv
3838
)
3939
}
4040

41-
func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
41+
func newInterceptor(logger vault.Logger, rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
4242
return vault.NewInterceptor[fdriver.ValidationCode](
4343
logger,
44+
rwSet,
4445
qe,
4546
txIDStore,
4647
txID,
@@ -53,7 +54,7 @@ func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDSt
5354
// populator is the custom populator for FabricDEV
5455
type populator struct{}
5556

56-
func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
57+
func (p *populator) Populate([]byte, ...driver.Namespace) (vault.ReadWriteSet, error) {
5758
panic("implement me")
5859
}
5960

platform/common/core/generic/vault/inspector.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,6 @@ type Inspector struct {
1515
Rws ReadWriteSet
1616
}
1717

18-
func NewInspector() *Inspector {
19-
return &Inspector{
20-
Rws: ReadWriteSet{
21-
ReadSet: ReadSet{
22-
OrderedReads: map[string][]string{},
23-
Reads: Reads{},
24-
},
25-
WriteSet: WriteSet{
26-
OrderedWrites: map[string][]string{},
27-
Writes: Writes{},
28-
},
29-
MetaWriteSet: MetaWriteSet{
30-
MetaWrites: NamespaceKeyedMetaWrites{},
31-
},
32-
},
33-
}
34-
}
35-
3618
func (i *Inspector) IsValid() error {
3719
return nil
3820
}

platform/common/core/generic/vault/interceptor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func EmptyRWSet() ReadWriteSet {
5555

5656
func NewInterceptor[V driver.ValidationCode](
5757
logger Logger,
58+
rwSet ReadWriteSet,
5859
qe VersionedQueryExecutor,
5960
txIDStore TXIDStoreReader[V],
6061
txID driver.TxID,
@@ -69,7 +70,7 @@ func NewInterceptor[V driver.ValidationCode](
6970
TxID: txID,
7071
QE: qe,
7172
TxIDStore: txIDStore,
72-
Rws: EmptyRWSet(),
73+
Rws: rwSet,
7374
vcProvider: vcProvider,
7475
Marshaller: marshaller,
7576
VersionComparator: versionComparator,

platform/common/core/generic/vault/interceptor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func TestConcurrency(t *testing.T) {
2020
qe := mocks.NewMockQE()
2121
idsr := mocks.MockTXIDStoreReader{}
2222

23-
i := newInterceptor(logging.MustGetLogger("interceptor_test"), qe, idsr, "1")
23+
i := newInterceptor(logging.MustGetLogger("interceptor_test"), EmptyRWSet(), qe, idsr, "1")
2424
s, err := i.GetState("ns", "key")
2525
assert.NoError(t, err)
2626
assert.Equal(t, qe.State.Raw, s, "with no opts, getstate should return the FromStorage value (query executor)")
@@ -64,7 +64,7 @@ func TestConcurrency(t *testing.T) {
6464
func TestAddReadAt(t *testing.T) {
6565
qe := mocks.MockQE{}
6666
idsr := mocks.MockTXIDStoreReader{}
67-
i := newInterceptor(logging.MustGetLogger("interceptor_test"), qe, idsr, "1")
67+
i := newInterceptor(logging.MustGetLogger("interceptor_test"), EmptyRWSet(), qe, idsr, "1")
6868

6969
assert.NoError(t, i.AddReadAt("ns", "key", []byte("version")))
7070
assert.Len(t, i.RWs().Reads, 1)

platform/common/core/generic/vault/vault.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,15 @@ type TxInterceptor interface {
4848
}
4949

5050
type Populator interface {
51-
Populate(rws *ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error
51+
Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (ReadWriteSet, error)
5252
}
5353

5454
type Marshaller interface {
5555
Marshal(txID string, rws *ReadWriteSet) ([]byte, error)
5656
Append(destination *ReadWriteSet, raw []byte, nss ...string) error
5757
}
5858

59-
type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, qe VersionedQueryExecutor, txidStore TXIDStoreReader[V], txid driver.TxID) TxInterceptor
59+
type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, rwSet ReadWriteSet, qe VersionedQueryExecutor, txidStore TXIDStoreReader[V], txid driver.TxID) TxInterceptor
6060

6161
type (
6262
VersionedPersistence = dbdriver.VersionedPersistence
@@ -447,7 +447,7 @@ func (db *Vault[V]) NewRWSet(txID driver.TxID) (driver.RWSet, error) {
447447

448448
func (db *Vault[V]) NewInspector(txID driver.TxID) (TxInterceptor, error) {
449449
db.logger.Debugf("NewRWSet[%s][%d]", txID, db.counter.Load())
450-
i := db.newInterceptor(db.logger, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)
450+
i := db.newInterceptor(db.logger, EmptyRWSet(), &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)
451451

452452
db.interceptorsLock.Lock()
453453
if _, in := db.Interceptors[txID]; in {
@@ -469,12 +469,13 @@ func (db *Vault[V]) NewInspector(txID driver.TxID) (TxInterceptor, error) {
469469

470470
func (db *Vault[V]) GetRWSet(txID driver.TxID, rwsetBytes []byte) (driver.RWSet, error) {
471471
db.logger.Debugf("GetRWSet[%s][%d]", txID, db.counter.Load())
472-
i := db.newInterceptor(db.logger, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)
473-
474-
if err := db.populator.Populate(i.RWs(), rwsetBytes); err != nil {
472+
rwSet, err := db.populator.Populate(rwsetBytes)
473+
if err != nil {
475474
return nil, errors.Wrapf(err, "failed populating tx [%s]", txID)
476475
}
477476

477+
i := db.newInterceptor(db.logger, rwSet, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)
478+
478479
db.interceptorsLock.Lock()
479480
if i, in := db.Interceptors[txID]; in {
480481
if !i.IsClosed() {
@@ -496,13 +497,11 @@ func (db *Vault[V]) GetRWSet(txID driver.TxID, rwsetBytes []byte) (driver.RWSet,
496497
}
497498

498499
func (db *Vault[V]) InspectRWSet(rwsetBytes []byte, namespaces ...driver.Namespace) (driver.RWSet, error) {
499-
i := NewInspector()
500-
501-
if err := db.populator.Populate(&i.Rws, rwsetBytes, namespaces...); err != nil {
500+
rwSet, err := db.populator.Populate(rwsetBytes, namespaces...)
501+
if err != nil {
502502
return nil, errors.Wrapf(err, "failed populating ephemeral txID")
503503
}
504-
505-
return i, nil
504+
return &Inspector{Rws: rwSet}, nil
506505
}
507506

508507
func (db *Vault[V]) Match(txID driver.TxID, rwsRaw []byte) error {

platform/common/core/generic/vault/vault_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ func (p *testArtifactProvider) NewMarshaller() Marshaller {
7171

7272
func newInterceptor(
7373
logger Logger,
74+
rwSet ReadWriteSet,
7475
qe VersionedQueryExecutor,
7576
txidStore TXIDStoreReader[ValidationCode],
7677
txid driver2.TxID,
7778
) TxInterceptor {
7879
return NewInterceptor[ValidationCode](
7980
logger,
81+
rwSet,
8082
qe,
8183
txidStore,
8284
txid,
@@ -90,8 +92,12 @@ type populator struct {
9092
marshaller marshaller
9193
}
9294

93-
func (p *populator) Populate(rws *ReadWriteSet, rwsetBytes []byte, namespaces ...driver2.Namespace) error {
94-
return p.marshaller.Append(rws, rwsetBytes, namespaces...)
95+
func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver2.Namespace) (ReadWriteSet, error) {
96+
rwSet := EmptyRWSet()
97+
if err := p.marshaller.Append(&rwSet, rwsetBytes, namespaces...); err != nil {
98+
return ReadWriteSet{}, err
99+
}
100+
return rwSet, nil
95101
}
96102

97103
type marshaller struct{}

platform/fabric/core/generic/committer/endorsertx.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,9 @@ func (c *Committer) HandleEndorserTransaction(ctx context.Context, block *common
2424
if logger.IsEnabledFor(zapcore.DebugLevel) {
2525
logger.Debugf("[%s] EndorserClient transaction received: %s", c.ChannelConfig.ID(), tx.TxID)
2626
}
27-
if len(block.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
28-
return nil, errors.Errorf("block metadata lacks transaction filter")
29-
}
30-
31-
fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[tx.TxNum]
32-
event := &FinalityEvent{
33-
Ctx: ctx,
34-
TxID: tx.TxID,
35-
ValidationCode: convertValidationCode(int32(fabricValidationCode)),
36-
ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)],
27+
fabricValidationCode, event, err := MapFinalityEvent(ctx, block, tx.TxNum, tx.TxID)
28+
if err != nil {
29+
return nil, err
3730
}
3831

3932
switch pb.TxValidationCode(fabricValidationCode) {
@@ -62,6 +55,21 @@ func (c *Committer) HandleEndorserTransaction(ctx context.Context, block *common
6255
return event, nil
6356
}
6457

58+
func MapFinalityEvent(ctx context.Context, block *common.BlockMetadata, txNum driver.TxNum, txID string) (uint8, *FinalityEvent, error) {
59+
if len(block.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
60+
return 0, nil, errors.Errorf("block metadata lacks transaction filter")
61+
}
62+
63+
fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[txNum]
64+
event := &FinalityEvent{
65+
Ctx: ctx,
66+
TxID: txID,
67+
ValidationCode: convertValidationCode(int32(fabricValidationCode)),
68+
ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)],
69+
}
70+
return fabricValidationCode, event, nil
71+
}
72+
6573
// GetChaincodeEvents reads the chaincode events and notifies the listeners registered to the specific chaincode.
6674
func (c *Committer) GetChaincodeEvents(env *common.Envelope, blockNum driver2.BlockNum) error {
6775
chaincodeEvent, err := readChaincodeEvent(env, blockNum)

platform/fabric/core/generic/rwset/handler.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ package rwset
99
import (
1010
"fmt"
1111

12+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault"
13+
vault2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/vault"
1214
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
1315
"github.com/hyperledger/fabric-protos-go/common"
1416
)
@@ -36,3 +38,26 @@ func (h *endorserTransactionHandler) Load(payl *common.Payload, chdr *common.Cha
3638
}
3739
return rws, upe, nil
3840
}
41+
42+
type endorserTransactionReader struct {
43+
network string
44+
populator vault.Populator
45+
}
46+
47+
func NewEndorserTransactionReader(network string) *endorserTransactionReader {
48+
return &endorserTransactionReader{
49+
network: network,
50+
populator: vault2.NewPopulator(),
51+
}
52+
}
53+
54+
func (h *endorserTransactionReader) Read(payl *common.Payload, chdr *common.ChannelHeader) (vault.ReadWriteSet, error) {
55+
upe, err := UnpackEnvelopeFromPayloadAndCHHeader(h.network, payl, chdr)
56+
if err != nil {
57+
return vault.ReadWriteSet{}, fmt.Errorf("failed unpacking envelope [%s]: %w", chdr.TxId, err)
58+
}
59+
60+
logger.Debugf("retrieve rws [%s,%s]", h.network, chdr.TxId)
61+
62+
return h.populator.Populate(upe.Results)
63+
}

platform/fabric/core/generic/vault/vault.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,17 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv
4545
txIDStore,
4646
&fdriver.ValidationCodeProvider{},
4747
newInterceptor,
48-
&populator{},
48+
NewPopulator(),
4949
metricsProvider,
5050
tracerProvider,
5151
&vault.BlockTxIndexVersionBuilder{},
5252
)
5353
}
5454

55-
func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
55+
func newInterceptor(logger vault.Logger, rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
5656
return vault.NewInterceptor[fdriver.ValidationCode](
5757
logger,
58+
rwSet,
5859
qe,
5960
txIDStore,
6061
txID,
@@ -68,18 +69,23 @@ type populator struct {
6869
versionMarshaller vault.BlockTxIndexVersionMarshaller
6970
}
7071

71-
func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
72+
func NewPopulator() *populator {
73+
return &populator{}
74+
}
75+
76+
func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (vault.ReadWriteSet, error) {
7277
txRWSet := &rwset.TxReadWriteSet{}
7378
err := proto.Unmarshal(rwsetBytes, txRWSet)
7479
if err != nil {
75-
return errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
80+
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
7681
}
7782

7883
rwsIn, err := rwsetutil.TxRwSetFromProtoMsg(txRWSet)
7984
if err != nil {
80-
return errors.Wrapf(err, "provided invalid read-write set bytes, TxRwSetFromProtoMsg failed")
85+
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, TxRwSetFromProtoMsg failed")
8186
}
8287

88+
rws := vault.EmptyRWSet()
8389
namespaceSet := collections.NewSet(namespaces...)
8490
for _, nsrws := range rwsIn.NsRwSets {
8591
ns := nsrws.NameSpace
@@ -101,7 +107,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
101107

102108
for _, write := range nsrws.KvRwSet.Writes {
103109
if err := rws.WriteSet.Add(ns, write.Key, write.Value); err != nil {
104-
return err
110+
return vault.ReadWriteSet{}, err
105111
}
106112
}
107113

@@ -112,12 +118,12 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
112118
}
113119

114120
if err := rws.MetaWriteSet.Add(ns, metaWrite.Key, metadata); err != nil {
115-
return err
121+
return vault.ReadWriteSet{}, err
116122
}
117123
}
118124
}
119125

120-
return nil
126+
return rws, nil
121127
}
122128

123129
type marshaller struct {

platform/orion/core/generic/vault/vault.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,14 @@ type Interceptor struct {
5151

5252
func newInterceptor(
5353
logger vault.Logger,
54+
rwSet vault.ReadWriteSet,
5455
qe vault.VersionedQueryExecutor,
5556
txidStore vault.TXIDStoreReader[odriver.ValidationCode],
5657
txid string,
5758
) vault.TxInterceptor {
5859
return &Interceptor{Interceptor: vault.NewInterceptor[odriver.ValidationCode](
5960
logger,
61+
rwSet,
6062
qe,
6163
txidStore,
6264
txid,
@@ -88,11 +90,12 @@ type populator struct {
8890
versionMarshaller vault.BlockTxIndexVersionMarshaller
8991
}
9092

91-
func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
93+
func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (vault.ReadWriteSet, error) {
94+
rws := vault.EmptyRWSet()
9295
txRWSet := &types.DataTx{}
9396
err := proto.Unmarshal(rwsetBytes, txRWSet)
9497
if err != nil {
95-
return errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
98+
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
9699
}
97100

98101
for _, operation := range txRWSet.DbOperations {
@@ -120,7 +123,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
120123
write.Key,
121124
write.Value,
122125
); err != nil {
123-
return errors.Wrapf(err, "failed to add write to read-write set")
126+
return vault.ReadWriteSet{}, errors.Wrapf(err, "failed to add write to read-write set")
124127
}
125128
// TODO: What about write.ACL? Shall we store it as metadata?
126129
}
@@ -131,10 +134,10 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
131134
del.Key,
132135
nil,
133136
); err != nil {
134-
return errors.Wrapf(err, "failed to add delete to read-write set")
137+
return vault.ReadWriteSet{}, errors.Wrapf(err, "failed to add delete to read-write set")
135138
}
136139
}
137140
}
138141

139-
return nil
142+
return rws, nil
140143
}

0 commit comments

Comments
 (0)