Skip to content

Commit 63dca5f

Browse files
committed
change tle_peer block listener interface
1 parent 69e702c commit 63dca5f

File tree

4 files changed

+142
-73
lines changed

4 files changed

+142
-73
lines changed

tle_go/listener/listener.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,19 @@ func ListenBlock(channelID string, serverAddr string, seek int, quiet bool, caCe
9696
fmt.Println("failed to load config:", err)
9797
os.Exit(1)
9898
}
99+
// fmt.Println("conf:", conf)
100+
fmt.Println("localMSPDir:", conf.General.LocalMSPDir)
101+
fmt.Println("BCCSP:", conf.General.BCCSP)
102+
fmt.Println("localMSPID:", conf.General.LocalMSPID)
99103

100104
// Load local MSP
101105
mspConfig, err := msp.GetLocalMspConfig(conf.General.LocalMSPDir, conf.General.BCCSP, conf.General.LocalMSPID)
102106
if err != nil {
103107
fmt.Println("Failed to load MSP config:", err)
104108
os.Exit(0)
105109
}
110+
fmt.Println("mspConfig:", mspConfig)
111+
fmt.Println("factoryGetDefault:", factory.GetDefault())
106112
err = mspmgmt.GetLocalMSP(factory.GetDefault()).Setup(mspConfig)
107113
if err != nil { // Handle errors reading the config file
108114
fmt.Println("Failed to initialize local MSP:", err)

tle_go/tlecore/block_listener.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package tlecore
2+
3+
import (
4+
"fmt"
5+
"io/ioutil"
6+
"os"
7+
"path/filepath"
8+
"strconv"
9+
"time"
10+
11+
"github.com/hyperledger/fabric-private-chaincode/tle_go/listener"
12+
"github.com/hyperledger/fabric-protos-go/common"
13+
"github.com/hyperledger/fabric/protoutil"
14+
)
15+
16+
type BlockListener interface {
17+
GetNextBlockNum() int
18+
GetNextBlock() (*common.Block, error)
19+
NotifySuccess() error
20+
}
21+
22+
type FileBlockGetter struct {
23+
nextBlockNum int
24+
}
25+
26+
func NewFileBlockGetter() BlockListener {
27+
return &FileBlockGetter{
28+
nextBlockNum: 0,
29+
}
30+
}
31+
32+
func (f *FileBlockGetter) GetNextBlockNum() int {
33+
return f.nextBlockNum
34+
}
35+
36+
func (f *FileBlockGetter) GetNextBlock() (*common.Block, error) {
37+
// Simulating data retrieval from somewhere
38+
fmt.Printf("Start to get block num: %d\n", f.nextBlockNum)
39+
blockPath := os.Getenv("BLOCK_PATH")
40+
if blockPath == "" {
41+
blockPath = "tmpBlocks"
42+
}
43+
waitTime := 5
44+
maxWaitTime := 120
45+
for {
46+
rawBlock, err := ioutil.ReadFile(filepath.Join(blockPath, "t"+strconv.Itoa(int(f.nextBlockNum))+".block"))
47+
if err != nil {
48+
fmt.Printf("FileBlockGetter GetBlock Failed, %v, wait for a while...\n", err)
49+
50+
time.Sleep(time.Duration(waitTime) * time.Second)
51+
waitTime += 5
52+
if waitTime > maxWaitTime {
53+
waitTime = maxWaitTime
54+
}
55+
continue
56+
}
57+
return protoutil.UnmarshalBlock(rawBlock)
58+
}
59+
}
60+
61+
func (f *FileBlockGetter) NotifySuccess() error {
62+
f.nextBlockNum += 1
63+
return nil
64+
}
65+
66+
type OrdererBlockGetter struct {
67+
blockChan chan common.Block
68+
nextBlockNum int
69+
}
70+
71+
func NewOrdererBlockGetter() BlockListener {
72+
channelID := "testchannel"
73+
serverAddr := "127.0.0.1:20000"
74+
seek := -2 // -2 is load from oldest, -1 load from newest, other int -> load from the int.
75+
quiet := true // true = only print block number, false = print whole block.
76+
caCertPath := "/Users/lew/go/src/github.com/hyperledger/fabric-private-chaincode/samples/deployment/fabric-smart-client/the-simple-testing-network/testdata/fabric.default/crypto/ca-certs.pem"
77+
blockChan := make(chan common.Block, 1)
78+
79+
go listener.ListenBlock(channelID, serverAddr, seek, quiet, caCertPath, blockChan)
80+
81+
return &OrdererBlockGetter{
82+
blockChan: blockChan,
83+
nextBlockNum: 0,
84+
}
85+
}
86+
87+
func (o *OrdererBlockGetter) GetNextBlockNum() int {
88+
return o.nextBlockNum
89+
}
90+
91+
func (o *OrdererBlockGetter) GetNextBlock() (*common.Block, error) {
92+
// TODO will need to get the same block if previous failed.
93+
waitTime := 5
94+
maxWaitTime := 120
95+
for {
96+
select {
97+
case block := <-o.blockChan:
98+
return &block, nil
99+
case <-time.After(time.Duration(waitTime) * time.Second):
100+
fmt.Println("Still waiting for new block: block", o.nextBlockNum)
101+
}
102+
waitTime += 5
103+
if waitTime > maxWaitTime {
104+
waitTime = maxWaitTime
105+
}
106+
}
107+
}
108+
109+
func (o *OrdererBlockGetter) NotifySuccess() error {
110+
o.nextBlockNum += 1
111+
return nil
112+
}

tle_go/tlecore/tle_peer.go

Lines changed: 24 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,6 @@ package tlecore
33
import (
44
"crypto/sha256"
55
"fmt"
6-
"io/ioutil"
7-
"os"
8-
"path/filepath"
9-
"strconv"
10-
"sync"
11-
"time"
126

137
"github.com/hyperledger/fabric-protos-go/common"
148
"github.com/hyperledger/fabric/common/policies"
@@ -20,9 +14,8 @@ import (
2014
)
2115

2216
type TlePeer struct {
23-
tleState *Tlestate
24-
nextBlockNum uint
25-
mutex sync.Mutex
17+
tleState *Tlestate
18+
blockListener BlockListener
2619

2720
channelName string
2821
lc *committer.LedgerCommitter
@@ -75,13 +68,13 @@ func (p *TlePeer) vsccExtractRwsetRaw(block *common.Block, txPosition int, actio
7568
}
7669

7770
func (p *TlePeer) UpdateState(block *common.Block) error {
78-
for tIdx, _ := range block.Data.Data {
71+
for tIdx := range block.Data.Data {
7972
// TODO: continue if current txn is invalid.
8073
txsfltr := ValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
8174
fmt.Printf("blockNum %d, tIdx: %d, validationCode: %v\n", tIdx, block.Header.Number, txsfltr.Flag(tIdx))
8275
if txsfltr.IsInvalid(tIdx) {
8376
fmt.Println("The current txn is not valid!")
84-
// continue
77+
continue
8578
}
8679

8780
// extract rwset
@@ -118,26 +111,12 @@ func (p *TlePeer) UpdateState(block *common.Block) error {
118111
return nil
119112
}
120113

121-
func (p *TlePeer) GetBlock() (*common.Block, error) {
122-
// Simulating data retrieval from somewhere
123-
fmt.Printf("Start to get block num: %d\n", p.GetNextBlockNum())
124-
blockPath := os.Getenv("BLOCK_PATH")
125-
if blockPath == "" {
126-
blockPath = "tmpBlocks"
127-
}
128-
rawBlock, err := ioutil.ReadFile(filepath.Join(blockPath, "t"+strconv.Itoa(int(p.GetNextBlockNum()))+".block"))
129-
if err != nil {
130-
return nil, err
131-
}
132-
return protoutil.UnmarshalBlock(rawBlock)
133-
}
134-
135-
func (p *TlePeer) ProcessBlock(block *common.Block) error {
136-
err := VerifyBlock(p.policyMgr, []byte(p.channelName), uint64(p.GetNextBlockNum()), block)
114+
func (p *TlePeer) ProcessBlock(block *common.Block, blockNum int) error {
115+
err := VerifyBlock(p.policyMgr, []byte(p.channelName), uint64(blockNum), block)
137116
if err != nil {
138117
return err
139118
}
140-
fmt.Printf("--- Verify Block %d success, start verify txn ---\n", p.GetNextBlockNum())
119+
fmt.Printf("--- Verify Block %d success, start verify txn ---\n", uint64(blockNum))
141120
err = p.validator.Validate(block)
142121
if err != nil {
143122
return err
@@ -149,24 +128,10 @@ func (p *TlePeer) ProcessBlock(block *common.Block) error {
149128
}
150129

151130
// update state
152-
p.IncrementNextBlockNum()
153131
return p.UpdateState(block)
154132
}
155133

156-
func (p *TlePeer) GetNextBlockNum() uint {
157-
p.mutex.Lock()
158-
defer p.mutex.Unlock()
159-
return p.nextBlockNum
160-
}
161-
162-
func (p *TlePeer) IncrementNextBlockNum() {
163-
p.mutex.Lock()
164-
defer p.mutex.Unlock()
165-
p.nextBlockNum += 1
166-
}
167-
168-
func (p *TlePeer) InitFabricPart() func() {
169-
genesisBlock := GetGenesisBlock()
134+
func (p *TlePeer) InitFabricPart(genesisBlock *common.Block) func() {
170135
peerInstance, cleanup := peer.NewFabricPeer()
171136

172137
err := InitializeFabricPeer(peerInstance)
@@ -191,56 +156,42 @@ func (p *TlePeer) InitFabricPart() func() {
191156
p.policyMgr = policyMgr
192157
p.validator = validator
193158
p.channelName = channelName
194-
195159
return cleanup
196160
}
197161

198162
func (p *TlePeer) Start() {
199-
cleanup := p.InitFabricPart()
163+
genesisBlock, err := p.blockListener.GetNextBlock()
164+
if err != nil {
165+
fmt.Println("Get genesis block failed: ", err)
166+
}
167+
p.blockListener.NotifySuccess()
168+
169+
cleanup := p.InitFabricPart(genesisBlock)
200170
defer cleanup()
201171

202-
waitTime := 1
203172
for {
204-
// wait several second to update one block
205-
time.Sleep(time.Duration(waitTime) * time.Second)
206-
207-
block, err := p.GetBlock()
173+
blocknum := p.blockListener.GetNextBlockNum()
174+
block, err := p.blockListener.GetNextBlock()
208175
if err != nil {
209176
fmt.Printf("TlePeer GetBlock Failed, %v\n", err)
210-
waitTime = waitTime * 2
211177
continue
212178
}
213-
err = p.ProcessBlock(block)
179+
err = p.ProcessBlock(block, blocknum)
214180
if err != nil {
215181
fmt.Printf("TlePeer Process Block error, %v\n", err)
182+
continue
216183
}
217-
waitTime = 1
218-
}
219-
}
220-
221-
func GetGenesisBlock() *common.Block {
222-
// TODO get from somewhere else.
223-
blockPath := os.Getenv("BLOCK_PATH")
224-
if blockPath == "" {
225-
blockPath = "tmpBlocks"
226-
}
227-
rawBlock0, err := ioutil.ReadFile(filepath.Join(blockPath, "t0.block"))
228-
if err != nil {
229-
panic(fmt.Sprintf("read genesis block error, err: %v, blockPath: %s", err, blockPath))
230-
}
231-
fmt.Println("Finish reading genesis block!!!")
232-
genesisBlock, err := protoutil.UnmarshalBlock(rawBlock0)
233-
if err != nil {
234-
panic("Unmarshal genesis block error")
184+
p.blockListener.NotifySuccess()
235185
}
236-
return genesisBlock
237186
}
238187

239188
func ServePeer(tleState *Tlestate) {
240189
// TODO change the logic here.
190+
blockListener := NewFileBlockGetter()
191+
// blockListener := NewOrdererBlockGetter()
241192
peer := &TlePeer{
242-
tleState: tleState,
243-
nextBlockNum: 1,
193+
tleState: tleState,
194+
blockListener: blockListener,
244195
}
245196
go peer.Start()
246197
}

0 commit comments

Comments
 (0)