Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions endorsement/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ SPDX-License-Identifier: Apache-2.0
package endorsement

import (
"crypto/rand"
"errors"
"fmt"
"net/http"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/hyperledger/fabric-protos-go-apiv2/common"
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
"github.com/hyperledger/fabric-x-common/protoutil"
sdk "github.com/hyperledger/fabric-x-sdk"
"github.com/hyperledger/fabric-x-sdk/blocks"
)

Expand Down Expand Up @@ -85,6 +87,62 @@ func Success(rws blocks.ReadWriteSet, event []byte, payload []byte) ExecutionRes
}
}

// NewInvocation creates an Invocation directly from a signer, channel, namespace and args.
func NewInvocation(signer sdk.Signer, channel, namespace string, args [][]byte) (Invocation, error) {
creator, err := signer.Serialize()
if err != nil {
return Invocation{}, err
}

nonce := make([]byte, 24)
if _, err := rand.Read(nonce); err != nil {
// rand.Read uses operating system APIs that are documented to never
// return an error on all but legacy Linux systems.
panic(err)
}

txID := protoutil.ComputeTxID(nonce, creator)
ccid := &peer.ChaincodeID{Name: namespace, Version: "1.0"}
proposal, _, err := protoutil.CreateChaincodeProposalWithTxIDNonceAndTransient(
txID,
common.HeaderType_ENDORSER_TRANSACTION,
channel,
&peer.ChaincodeInvocationSpec{
ChaincodeSpec: &peer.ChaincodeSpec{
Type: peer.ChaincodeSpec_CAR,
ChaincodeId: ccid,
Input: &peer.ChaincodeInput{Args: args},
},
},
nonce,
creator,
nil,
)
if err != nil {
return Invocation{}, err
}

hdr, err := protoutil.UnmarshalHeader(proposal.Header)
if err != nil {
return Invocation{}, err
}
propHash, err := protoutil.GetProposalHash1(hdr, proposal.Payload)
if err != nil {
return Invocation{}, err
}

return Invocation{
TxID: txID,
Nonce: nonce,
Creator: creator,
Args: args,
CCID: ccid,
Channel: channel,
Proposal: proposal,
ProposalHash: propHash,
}, nil
}

// Parse extracts the fields that are relevant for endorsement from a SignedProposal.
// expectedTime is an optional timestamp of the expected time of signing. If provided,
// validation will fail in case of a larger difference than 5 minutes.
Expand Down
25 changes: 6 additions & 19 deletions integration/cases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
sdk "github.com/hyperledger/fabric-x-sdk"
"github.com/hyperledger/fabric-x-sdk/blocks"
"github.com/hyperledger/fabric-x-sdk/endorsement"
"github.com/hyperledger/fabric-x-sdk/network"
"github.com/hyperledger/fabric-x-sdk/notification"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -329,13 +328,9 @@ func testAddLog(t *testing.T, s *testSetup) {
t.Fatalf("marshal logs: %v", err)
}

signedProp, err := network.NewSignedProposal(s.signer, s.channel, s.namespace, "1.0", [][]byte{[]byte("invoke")})
inv, err := endorsement.NewInvocation(s.signer, s.channel, s.namespace, [][]byte{[]byte("invoke")})
if err != nil {
t.Fatalf("NewSignedProposal: %v", err)
}
inv, err := endorsement.Parse(signedProp, time.Time{})
if err != nil {
t.Fatalf("endorsement.Parse: %v", err)
t.Fatalf("NewInvocation: %v", err)
}
var responses []*peer.ProposalResponse
for _, b := range s.builders {
Expand Down Expand Up @@ -396,13 +391,9 @@ func testInputArgsAndEvents(t *testing.T, s *testSetup) {
args := [][]byte{[]byte("invoke"), []byte("arg1"), []byte("arg2")}
eventPayload := []byte(`{"type":"Transfer"}`)

signedProp, err := network.NewSignedProposal(s.signer, s.channel, s.namespace, "1.0", args)
if err != nil {
t.Fatalf("NewSignedProposal: %v", err)
}
inv, err := endorsement.Parse(signedProp, time.Time{})
inv, err := endorsement.NewInvocation(s.signer, s.channel, s.namespace, args)
if err != nil {
t.Fatalf("endorsement.Parse: %v", err)
t.Fatalf("NewInvocation: %v", err)
}

var responses []*peer.ProposalResponse
Expand Down Expand Up @@ -460,13 +451,9 @@ func testNotifications(t *testing.T, s *testSetup) {
key := t.Name() + "/" + rand.Text()

// Build the endorsement in scope so we can access inv.TxID before submitting.
signedProp, err := network.NewSignedProposal(s.signer, s.channel, s.namespace, "1.0", [][]byte{[]byte("invoke")})
if err != nil {
t.Fatalf("NewSignedProposal: %v", err)
}
inv, err := endorsement.Parse(signedProp, time.Time{})
inv, err := endorsement.NewInvocation(s.signer, s.channel, s.namespace, [][]byte{[]byte("invoke")})
if err != nil {
t.Fatalf("endorsement.Parse: %v", err)
t.Fatalf("NewInvocation: %v", err)
}
var responses []*peer.ProposalResponse
for _, b := range s.builders {
Expand Down
23 changes: 5 additions & 18 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/hyperledger/fabric-protos-go-apiv2/peer"
"github.com/hyperledger/fabric-x-common/protoutil"
sdk "github.com/hyperledger/fabric-x-sdk"
"github.com/hyperledger/fabric-x-sdk/blocks"
bfabx "github.com/hyperledger/fabric-x-sdk/blocks/fabricx"
Expand Down Expand Up @@ -357,13 +356,9 @@ func waitUntilSynced(t *testing.T, sync *network.Synchronizer, timeout time.Dura
}

func (s *testSetup) endorseAndSubmit(ctx context.Context, rws blocks.ReadWriteSet) error {
signedProp, err := network.NewSignedProposal(s.signer, s.channel, s.namespace, "1.0", [][]byte{[]byte("invoke")})
inv, err := endorsement.NewInvocation(s.signer, s.channel, s.namespace, [][]byte{[]byte("invoke")})
if err != nil {
return fmt.Errorf("NewSignedProposal: %w", err)
}
inv, err := endorsement.Parse(signedProp, time.Time{})
if err != nil {
return fmt.Errorf("endorsement.Parse: %w", err)
return fmt.Errorf("NewInvocation: %w", err)
}
result := endorsement.Success(rws, nil, nil)
var responses []*peer.ProposalResponse
Expand All @@ -382,13 +377,9 @@ func (s *testSetup) endorseAndSubmit(ctx context.Context, rws blocks.ReadWriteSe
// and returns the combined sdk.Endorsement (one response per builder).
func (s *testSetup) endorse(t *testing.T, endr *localEndorser, args [][]byte) sdk.Endorsement {
t.Helper()
prop, err := network.NewSignedProposal(s.signer, s.channel, s.namespace, "1.0", args)
if err != nil {
t.Fatalf("NewSignedProposal: %v", err)
}
inv, err := endorsement.Parse(prop, time.Time{})
inv, err := endorsement.NewInvocation(s.signer, s.channel, s.namespace, args)
if err != nil {
t.Fatalf("endorsement.Parse: %v", err)
t.Fatalf("NewInvocation: %v", err)
}
result := endr.result(inv)
var responses []*peer.ProposalResponse
Expand All @@ -399,11 +390,7 @@ func (s *testSetup) endorse(t *testing.T, endr *localEndorser, args [][]byte) sd
}
responses = append(responses, resp)
}
proposal, err := protoutil.UnmarshalProposal(prop.ProposalBytes)
if err != nil {
t.Fatalf("UnmarshalProposal: %v", err)
}
return sdk.Endorsement{Proposal: proposal, Responses: responses}
return sdk.Endorsement{Proposal: inv.Proposal, Responses: responses}
}

// waitForKeyValue polls until the key has the given value in the local DB.
Expand Down
155 changes: 150 additions & 5 deletions network/fabric/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ package fabric

import (
"context"
"crypto/rand"
"errors"
"fmt"
"sync"

"github.com/hyperledger/fabric-protos-go-apiv2/common"
peerpb "github.com/hyperledger/fabric-protos-go-apiv2/peer"
"github.com/hyperledger/fabric-x-common/protoutil"
sdk "github.com/hyperledger/fabric-x-sdk"
"github.com/hyperledger/fabric-x-sdk/blocks"
"github.com/hyperledger/fabric-x-sdk/blocks/fabric"
Expand All @@ -23,28 +29,49 @@ func NewPeer(conf network.PeerConf, channel string, signer sdk.Signer) (*Peer, e
if err != nil {
return nil, err
}
return &Peer{Peer: peer, channel: channel, signer: signer}, nil
return &Peer{
Peer: peer,
channel: channel,
signer: signer,
endorserClient: peerpb.NewEndorserClient(peer.Connection()),
}, nil
}

// Peer is a channel-bound client for a classic Fabric peer.
type Peer struct {
*network.Peer
channel string
signer sdk.Signer
channel string
signer sdk.Signer
endorserClient peerpb.EndorserClient
}

// SubscribeBlocks streams blocks from startBlock, invoking processor for each one.
func (p *Peer) SubscribeBlocks(ctx context.Context, startBlock uint64, processor network.BlockProcessor) error {
return p.Peer.SubscribeBlocks(ctx, p.channel, startBlock, p.signer, processor)
}

// ProcessProposal sends a proposal to the peer and returns the response.
// All non-nil responses are returned as-is regardless of status code — a 400
// or 500 is a valid signed reply from the endorser and the caller decides how
// to handle it. Only a nil Response field (malformed reply) is treated as an error.
func (p *Peer) ProcessProposal(ctx context.Context, signedProp *peerpb.SignedProposal) (*peerpb.ProposalResponse, error) {
resp, err := p.endorserClient.ProcessProposal(ctx, signedProp)
if err != nil {
return nil, fmt.Errorf("peer: %w", err)
}
if resp.Response == nil {
return nil, fmt.Errorf("peer returned nil response")
}
return resp, nil
}

// BlockHeight returns the current block height of the channel by querying the QSCC system chaincode.
func (p *Peer) BlockHeight(ctx context.Context) (uint64, error) {
prop, err := network.NewSignedProposal(p.signer, p.channel, "qscc", "1.0", [][]byte{[]byte("GetChainInfo"), []byte(p.channel)})
prop, err := NewSignedProposal(p.signer, p.channel, "qscc", [][]byte{[]byte("GetChainInfo"), []byte(p.channel)})
if err != nil {
return 0, err
}
res, err := p.Peer.ProcessProposal(ctx, prop)
res, err := p.ProcessProposal(ctx, prop)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -72,3 +99,121 @@ func NewSynchronizer(db network.BlockHeightReader, channel string, conf network.
logger,
)
}

// NewSignedProposal creates a new signed proposal to be submitted to a Fabric peer or endorser.
func NewSignedProposal(signer sdk.Signer, channel, namespace string, args [][]byte) (*peerpb.SignedProposal, error) {
creator, err := signer.Serialize()
if err != nil {
return nil, err
}

nonce := mustNonce()
proposal, _, err := protoutil.CreateChaincodeProposalWithTxIDNonceAndTransient(
protoutil.ComputeTxID(nonce, creator),
common.HeaderType_ENDORSER_TRANSACTION,
channel,
&peerpb.ChaincodeInvocationSpec{
ChaincodeSpec: &peerpb.ChaincodeSpec{
Type: peerpb.ChaincodeSpec_CAR,
ChaincodeId: &peerpb.ChaincodeID{
Name: namespace,
},
Input: &peerpb.ChaincodeInput{
Args: args,
},
},
},
nonce,
creator,
nil,
)
if err != nil {
return nil, err
}

return protoutil.GetSignedProposal(proposal, signer)
}

// EndorsementClient sends a proposal to one or more peers in parallel and collects
// their signed responses into an sdk.Endorsement.
type EndorsementClient struct {
peers []*Peer
signer sdk.Signer
channel string
}

// NewEndorsementClient dials all configured peers and returns a client ready to endorse transactions.
func NewEndorsementClient(config []network.PeerConf, signer sdk.Signer, channel string) (*EndorsementClient, error) {
if len(config) == 0 {
return nil, fmt.Errorf("no peers configured")
}
peers := make([]*Peer, len(config))
for i, c := range config {
var err error
peers[i], err = NewPeer(c, channel, signer)
if err != nil {
return nil, err
}
}
return &EndorsementClient{
peers: peers,
signer: signer,
channel: channel,
}, nil
}

// ExecuteTransaction sends args to all configured peers in parallel, collects their
// responses, and returns an sdk.Endorsement. Returns an error if any peer fails.
func (ec *EndorsementClient) ExecuteTransaction(ctx context.Context, namespace string, args [][]byte) (sdk.Endorsement, error) {
prop, err := NewSignedProposal(ec.signer, ec.channel, namespace, args)
if err != nil {
return sdk.Endorsement{}, err
}

type result struct {
resp *peerpb.ProposalResponse
err error
}
results := make([]result, len(ec.peers))
var wg sync.WaitGroup
for i, p := range ec.peers {
wg.Go(func() {
resp, err := p.ProcessProposal(ctx, prop)
results[i] = result{resp, err}
})
}
wg.Wait()

responses := make([]*peerpb.ProposalResponse, 0, len(ec.peers))
for _, r := range results {
if r.err != nil {
return sdk.Endorsement{}, fmt.Errorf("peer endorsement: %w", r.err)
}
responses = append(responses, r.resp)
}

proposal, err := protoutil.UnmarshalProposal(prop.ProposalBytes)
if err != nil {
return sdk.Endorsement{}, err
}
return sdk.Endorsement{Proposal: proposal, Responses: responses}, nil
}

// Close closes all peer connections.
func (ec *EndorsementClient) Close() error {
var errs []error
for _, p := range ec.peers {
if err := p.Close(); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

func mustNonce() []byte {
key := make([]byte, 24)
if _, err := rand.Read(key); err != nil {
panic(err)
}
return key
}
Loading
Loading