Skip to content

Commit 36c0fb1

Browse files
authored
Merge pull request #10 from icon-project/store-last-tx-info-to-storage
Store last TX info to storage
2 parents 544db8f + cc4d3ab commit 36c0fb1

File tree

6 files changed

+227
-534
lines changed

6 files changed

+227
-534
lines changed

chain/eth2/eth2factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ func NewReceiver(srcCfg link.ChainConfig, dstAddr types.BtpAddress, baseDir stri
4040
return newReceiver(srcCfg.GetAddress(), dstAddr, src.Endpoint, src.Options, l), nil
4141
}
4242

43-
func NewSender(srcAddr types.BtpAddress, dstCfg link.ChainConfig, l log.Logger) (types.Sender, error) {
43+
func NewSender(srcAddr types.BtpAddress, dstCfg link.ChainConfig, baseDir string, l log.Logger) (types.Sender, error) {
4444
dst := dstCfg.(chain.BaseConfig)
4545
w, err := newWallet(dst.KeyStorePass, dst.KeySecret, dst.KeyStore)
4646
if err != nil {
4747
return nil, err
4848
}
4949

50-
return newSender(srcAddr, dst.Address, w, dst.Endpoint, dst.Options, l), nil
50+
return newSender(srcAddr, dst.Address, w, dst.Endpoint, dst.Options, baseDir, l), nil
5151
}
5252

5353
func newWallet(passwd, secret string, keyStorePath string) (types.Wallet, error) {

chain/eth2/receiver_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import (
2222
"github.com/icon-project/btp2-eth2/chain/eth2/proof"
2323
)
2424

25-
func newReceiver(src, dest types.BtpAddress) *receiver {
26-
r := NewReceiver(
25+
func newTestReceiver(src, dest types.BtpAddress) *receiver {
26+
r := newReceiver(
2727
src,
2828
dest,
2929
"https://sepolia.infura.io/v3/ffbf8ebe228f4758ae82e175640275e0",
@@ -36,7 +36,7 @@ func newReceiver(src, dest types.BtpAddress) *receiver {
3636
}
3737

3838
func TestReceiver_BlockUpdate(t *testing.T) {
39-
r := newReceiver(
39+
r := newTestReceiver(
4040
types.BtpAddress("btp://0xaa36a7.eth/0x11167e875E08a113706e8bA3010ac37329b0E6b2"),
4141
types.BtpAddress("btp://0x42.icon/cx8642ab29e608915b43e677d9bcb17ec902b4ec8b"),
4242
)
@@ -113,7 +113,7 @@ func VerifySyncAggregate(t *testing.T, r *receiver, bu *blockUpdateData) {
113113
}
114114

115115
func TestReceiver_BlockProof(t *testing.T) {
116-
r := newReceiver(
116+
r := newTestReceiver(
117117
types.BtpAddress("btp://0xaa36a7.eth/0x11167e875E08a113706e8bA3010ac37329b0E6b2"),
118118
types.BtpAddress("btp://0x42.icon/cx8642ab29e608915b43e677d9bcb17ec902b4ec8b"),
119119
)
@@ -216,7 +216,7 @@ func TestReceiver_BlockProof(t *testing.T) {
216216

217217
func TestReceiver_MessageProof(t *testing.T) {
218218
slot := int64(2091171)
219-
r := newReceiver(
219+
r := newTestReceiver(
220220
types.BtpAddress("btp://0xaa36a7.eth/0x11167e875E08a113706e8bA3010ac37329b0E6b2"),
221221
types.BtpAddress("btp://0x42.icon/cx8642ab29e608915b43e677d9bcb17ec902b4ec8b"),
222222
)

chain/eth2/sender.go

Lines changed: 170 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"math"
2222
"math/big"
23+
"path/filepath"
2324
"strconv"
2425
"strings"
2526
"sync"
@@ -30,63 +31,56 @@ import (
3031
"github.com/ethereum/go-ethereum/accounts/abi/bind"
3132
"github.com/ethereum/go-ethereum/common"
3233
etypes "github.com/ethereum/go-ethereum/core/types"
34+
"github.com/icon-project/btp2/common/codec"
3335
"github.com/icon-project/btp2/common/errors"
3436
"github.com/icon-project/btp2/common/log"
3537
"github.com/icon-project/btp2/common/types"
3638
"github.com/icon-project/btp2/common/wallet"
39+
"github.com/syndtr/goleveldb/leveldb"
3740

3841
"github.com/icon-project/btp2-eth2/chain/eth2/client"
3942
)
4043

4144
const (
42-
txMaxDataSize = 524288 //512 * 1024 // 512kB
43-
txOverheadScale = 0.37 //base64 encoding overhead 0.36, rlp and other fields 0.01
44-
GetTXResultInterval = SecondPerSlot * time.Second
45-
txPendingMAX = 3 // unit: finality
45+
txMaxDataSize = 524288 //512 * 1024 // 512kB
46+
txOverheadScale = 0.37 //base64 encoding overhead 0.36, rlp and other fields 0.01
47+
blockInterval = 12 * time.Second
48+
txExpireDuration = 5 * blockInterval
4649
)
4750

4851
var (
4952
txSizeLimit = int(math.Ceil(txMaxDataSize / (1 + txOverheadScale)))
53+
lastTXKey = []byte("last_tx")
5054
)
5155

5256
type request struct {
53-
rm types.RelayMessage
54-
txHash common.Hash
55-
txPendingCount int
56-
}
57-
58-
func (r *request) RelayMessage() types.RelayMessage {
59-
return r.rm
57+
rm types.RelayMessage
58+
transaction
6059
}
6160

6261
func (r *request) ID() string {
6362
return r.rm.Id()
6463
}
6564

66-
func (r *request) TxHash() common.Hash {
67-
return r.txHash
68-
}
69-
70-
func (r *request) SetTxHash(txHash common.Hash) {
71-
r.txHash = txHash
72-
}
73-
74-
func (r *request) SetTxPendingCount(value int) {
75-
r.txPendingCount = value
76-
}
77-
78-
func (r *request) IncTxPendingCount() int {
79-
r.txPendingCount += 1
80-
return r.txPendingCount
65+
func newRequest(rm types.RelayMessage, txHash common.Hash) *request {
66+
now := time.Now()
67+
return &request{
68+
rm: rm,
69+
transaction: transaction{
70+
TxHash: txHash,
71+
TS: now,
72+
ExpireTS: now.Add(txExpireDuration),
73+
},
74+
}
8175
}
8276

8377
func (r *request) Format(f fmt.State, c rune) {
8478
switch c {
8579
case 'v', 's':
8680
if f.Flag('+') {
87-
fmt.Fprintf(f, "request{id=%s txHash=%#x txPendingCount=%d)", r.ID(), r.txHash, r.txPendingCount)
81+
fmt.Fprintf(f, "request{id=%s tx=%+v)", r.ID(), r.transaction)
8882
} else {
89-
fmt.Fprintf(f, "request{%s %#x %d)", r.ID(), r.txHash, r.txPendingCount)
83+
fmt.Fprintf(f, "request{%s %v)", r.ID(), r.transaction)
9084
}
9185
}
9286
}
@@ -99,6 +93,7 @@ type sender struct {
9993
sc chan *types.RelayResult
10094
reqs []*request
10195
mtx sync.RWMutex
96+
db *leveldb.DB
10297

10398
cl *client.ConsensusLayer
10499
el *client.ExecutionLayer
@@ -107,7 +102,7 @@ type sender struct {
107102
gasLimit uint64
108103
}
109104

110-
func newSender(src, dst types.BtpAddress, w types.Wallet, endpoint string, opt map[string]interface{}, l log.Logger) types.Sender {
105+
func newSender(src, dst types.BtpAddress, w types.Wallet, endpoint string, opt map[string]interface{}, baseDir string, l log.Logger) types.Sender {
111106
var err error
112107
s := &sender{
113108
src: src,
@@ -116,6 +111,11 @@ func newSender(src, dst types.BtpAddress, w types.Wallet, endpoint string, opt m
116111
l: l,
117112
sc: make(chan *types.RelayResult),
118113
}
114+
115+
if err = s.initDatabase(baseDir); err != nil {
116+
l.Panicf("fail to initialize database %v", err)
117+
}
118+
119119
s.el, err = client.NewExecutionLayer(endpoint, l)
120120
if err != nil {
121121
l.Panicf("fail to connect to %s, %v", endpoint, err)
@@ -137,6 +137,24 @@ func newSender(src, dst types.BtpAddress, w types.Wallet, endpoint string, opt m
137137
return s
138138
}
139139

140+
func (s *sender) initDatabase(baseDir string) error {
141+
var err error
142+
//dbDir := filepath.Join(baseDir, s.dst.NetworkAddress(), s.src.NetworkAddress())
143+
dbDir := filepath.Join(baseDir, s.src.NetworkAddress(), s.src.ContractAddress(),
144+
"sender", s.dst.NetworkAddress(), s.dst.ContractAddress())
145+
s.l.Debugln("open database", dbDir)
146+
s.db, err = leveldb.OpenFile(dbDir, nil)
147+
if err != nil {
148+
return errors.Wrap(err, "fail to open database")
149+
}
150+
defer func() {
151+
if err != nil {
152+
s.db.Close()
153+
}
154+
}()
155+
return nil
156+
}
157+
140158
func (s *sender) Start() (<-chan *types.RelayResult, error) {
141159
go s.handleFinalityUpdate()
142160

@@ -155,7 +173,12 @@ func (s *sender) Relay(rm types.RelayMessage) (string, error) {
155173
if tx, err := s.relay(rm); err != nil {
156174
return rm.Id(), err
157175
} else {
158-
s.addRequest(&request{rm: rm, txHash: tx.Hash()})
176+
req := newRequest(rm, tx.Hash())
177+
s.addRequest(req)
178+
if err = s.putLastTransaction(&req.transaction); err != nil {
179+
s.removeRequest(rm.Id())
180+
return rm.Id(), err
181+
}
159182
}
160183
return rm.Id(), nil
161184
}
@@ -201,6 +224,23 @@ func (s *sender) removeRequest(id string) {
201224
}
202225

203226
func (s *sender) getStatus(bn uint64) (*types.BMCLinkStatus, error) {
227+
if ltx, err := s.getLastTransaction(); err != nil {
228+
s.l.Errorf("fail to get last TX. %v", err)
229+
return nil, err
230+
} else if ltx != nil {
231+
s.l.Debugf("last TX. %+v", ltx)
232+
var pending bool
233+
if _, pending, err = s.el.TransactionByHash(ltx.TxHash); err != nil {
234+
s.l.Errorf("fail to query last TX %#x. %v", ltx.TxHash, err)
235+
return nil, err
236+
} else if pending {
237+
if !ltx.IsExpired() {
238+
s.l.Debugf("last TX %#x is not yet executed. sleep to %s", ltx.TxHash, ltx.ExpireTS)
239+
time.Sleep(ltx.GetDurationToExpire())
240+
}
241+
}
242+
}
243+
204244
var status client.TypesLinkStatus
205245
var callOpts *bind.CallOpts
206246
if bn != 0 {
@@ -238,43 +278,41 @@ func (s *sender) handleFinalityUpdate() {
238278
}
239279
}
240280

281+
func (s *sender) sendRelayResult(id string, errCode errors.Code, finalized bool) {
282+
s.sc <- &types.RelayResult{Id: id, Err: errCode, Finalized: finalized}
283+
}
284+
241285
func (s *sender) checkRelayResult(to uint64) {
242286
finished := make([]*request, 0)
243287
s.mtx.RLock()
244-
for i, req := range s.reqs {
245-
_, pending, err := s.el.TransactionByHash(req.TxHash())
288+
for _, req := range s.reqs {
289+
_, pending, err := s.el.TransactionByHash(req.TxHash)
246290
if err != nil {
247-
s.l.Warnf("can't get TX %#x. %v", req.TxHash(), err)
291+
s.l.Errorf("can't get TX via rpc. %+v. Drop %+v.", err, req)
292+
s.sendRelayResult(req.ID(), errors.BMVUnknown, false)
248293
break
249294
}
250295
if pending {
251-
s.l.Debugf("TX %#x is not yet executed.", req.TxHash())
252-
if req.IncTxPendingCount() == txPendingMAX {
253-
s.l.Debugf("resend rm %s", req.ID())
254-
if tx, err := s.relay(req.RelayMessage()); err != nil {
255-
s.l.Errorf("fail to resend relay message %s", req.ID())
256-
} else {
257-
req.SetTxHash(tx.Hash())
258-
req.SetTxPendingCount(0)
259-
}
296+
if req.IsExpired() {
297+
s.l.Errorf("request is still pending. Drop %+v.", req)
298+
s.sendRelayResult(req.ID(), errors.BMVUnknown, false)
299+
break
260300
}
261-
s.reqs[i] = req
262-
s.l.Debugf("update req: %+v", s.reqs[i])
263-
break
264301
}
265-
receipt, err := s.el.TransactionReceipt(req.TxHash())
302+
receipt, err := s.el.TransactionReceipt(req.TxHash)
266303
if err != nil {
267-
s.l.Warnf("can't get TX receipt for %#x. %v", req.TxHash(), err)
304+
s.l.Errorf("can't get TX receipt. %+v. Drop %+v.", err, req)
305+
s.sendRelayResult(req.ID(), errors.BMVUnknown, false)
268306
break
269307
}
270308
if to < receipt.BlockNumber.Uint64() {
271-
s.l.Debugf("%#x is not yet finalized", req.TxHash())
309+
s.l.Debugf("%#x is not yet finalized", req.TxHash)
272310
break
273311
}
274312
err = s.receiptToRevertError(receipt)
275313
errCode := errors.SUCCESS
276314
if err != nil {
277-
s.l.Debugf("result fail %v. %v", req, err)
315+
s.l.Debugf("result fail %+v. %v", req, err)
278316
if ec, ok := errors.CoderOf(err); ok {
279317
errCode = ec.ErrorCode()
280318
} else {
@@ -283,11 +321,7 @@ func (s *sender) checkRelayResult(to uint64) {
283321
} else {
284322
s.l.Debugf("result success. %v", req)
285323
}
286-
s.sc <- &types.RelayResult{
287-
Id: req.ID(),
288-
Err: errCode,
289-
Finalized: true,
290-
}
324+
s.sendRelayResult(req.ID(), errCode, true)
291325
finished = append(finished, req)
292326
}
293327
s.mtx.RUnlock()
@@ -316,3 +350,85 @@ func (s *sender) receiptToRevertError(receipt *etypes.Receipt) error {
316350
}
317351
return nil
318352
}
353+
354+
func (s *sender) putLastTransaction(tx *transaction) error {
355+
if bs, err := tx.Bytes(); err != nil {
356+
return err
357+
} else {
358+
if err = s.db.Put(lastTXKey, bs, nil); err != nil {
359+
return err
360+
}
361+
}
362+
return nil
363+
}
364+
365+
func (s *sender) getLastTransaction() (*transaction, error) {
366+
var bs []byte
367+
var has bool
368+
var err error
369+
if has, err = s.db.Has(lastTXKey, nil); err != nil {
370+
return nil, err
371+
} else if has {
372+
if bs, err = s.db.Get(lastTXKey, nil); err != nil {
373+
return nil, err
374+
} else {
375+
return newTransactionFromBytes(bs)
376+
}
377+
} else {
378+
return nil, nil
379+
}
380+
}
381+
382+
type transaction struct {
383+
TxHash common.Hash
384+
TS time.Time
385+
ExpireTS time.Time // wait for TX propagation and pending time
386+
}
387+
388+
func (tx *transaction) IsExpired() bool {
389+
return time.Now().After(tx.ExpireTS)
390+
}
391+
392+
func (tx *transaction) GetDurationToExpire() time.Duration {
393+
ts := tx.ExpireTS
394+
return ts.Sub(time.Now())
395+
}
396+
397+
func (tx *transaction) Bytes() ([]byte, error) {
398+
var bytes []byte
399+
if bs, err := codec.MarshalToBytes(&tx); err != nil {
400+
return nil, err
401+
} else {
402+
bytes = bs
403+
}
404+
return bytes, nil
405+
}
406+
407+
func (tx *transaction) Format(f fmt.State, c rune) {
408+
switch c {
409+
case 'v', 's':
410+
if f.Flag('+') {
411+
fmt.Fprintf(f, "transaction{txHash=%#x TS=%s expireTS=%s)",
412+
tx.TxHash, tx.TS.Format(time.RFC3339), tx.ExpireTS.Format(time.RFC3339))
413+
} else {
414+
fmt.Fprintf(f, "transaction{%#x %s %s)",
415+
tx.TxHash, tx.TS.Format(time.RFC3339), tx.ExpireTS.Format(time.RFC3339))
416+
}
417+
}
418+
}
419+
420+
func newTransaction(ts time.Time, txHash common.Hash) *transaction {
421+
return &transaction{
422+
TxHash: txHash,
423+
TS: ts,
424+
ExpireTS: ts.Add(txExpireDuration),
425+
}
426+
}
427+
428+
func newTransactionFromBytes(bs []byte) (*transaction, error) {
429+
ltx := new(transaction)
430+
if _, err := codec.UnmarshalFromBytes(bs, &ltx); err != nil {
431+
return nil, err
432+
}
433+
return ltx, nil
434+
}

0 commit comments

Comments
 (0)