Skip to content

Commit cc4d3ab

Browse files
committed
Store last TX Info. to storage
- TX is treated as expired 1 minute(= 5 blocks) after send. - If the last TX has not been executed, sender.GetStatus() will sleep until the expiration time of the last TX. - When checking the TX result, if the TX is expired, BMVUnkown error is passed.
1 parent 9822924 commit cc4d3ab

File tree

2 files changed

+210
-53
lines changed

2 files changed

+210
-53
lines changed

chain/eth2/sender.go

Lines changed: 169 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"math"
2222
"math/big"
23+
"path/filepath"
2324
"strconv"
2425
"strings"
2526
"sync"
@@ -30,63 +31,56 @@ import (
3031
"github.com/ethereum/go-ethereum/accounts/abi/bind"
3132
"github.com/ethereum/go-ethereum/common"
3233
etypes "github.com/ethereum/go-ethereum/core/types"
34+
"github.com/icon-project/btp2/common/codec"
3335
"github.com/icon-project/btp2/common/errors"
3436
"github.com/icon-project/btp2/common/log"
3537
"github.com/icon-project/btp2/common/types"
3638
"github.com/icon-project/btp2/common/wallet"
39+
"github.com/syndtr/goleveldb/leveldb"
3740

3841
"github.com/icon-project/btp2-eth2/chain/eth2/client"
3942
)
4043

4144
const (
42-
txMaxDataSize = 524288 //512 * 1024 // 512kB
43-
txOverheadScale = 0.37 //base64 encoding overhead 0.36, rlp and other fields 0.01
44-
GetTXResultInterval = SecondPerSlot * time.Second
45-
txPendingMAX = 3 // unit: finality
45+
txMaxDataSize = 524288 //512 * 1024 // 512kB
46+
txOverheadScale = 0.37 //base64 encoding overhead 0.36, rlp and other fields 0.01
47+
blockInterval = 12 * time.Second
48+
txExpireDuration = 5 * blockInterval
4649
)
4750

4851
var (
4952
txSizeLimit = int(math.Ceil(txMaxDataSize / (1 + txOverheadScale)))
53+
lastTXKey = []byte("last_tx")
5054
)
5155

5256
type request struct {
53-
rm types.RelayMessage
54-
txHash common.Hash
55-
txPendingCount int
56-
}
57-
58-
func (r *request) RelayMessage() types.RelayMessage {
59-
return r.rm
57+
rm types.RelayMessage
58+
transaction
6059
}
6160

6261
func (r *request) ID() string {
6362
return r.rm.Id()
6463
}
6564

66-
func (r *request) TxHash() common.Hash {
67-
return r.txHash
68-
}
69-
70-
func (r *request) SetTxHash(txHash common.Hash) {
71-
r.txHash = txHash
72-
}
73-
74-
func (r *request) SetTxPendingCount(value int) {
75-
r.txPendingCount = value
76-
}
77-
78-
func (r *request) IncTxPendingCount() int {
79-
r.txPendingCount += 1
80-
return r.txPendingCount
65+
func newRequest(rm types.RelayMessage, txHash common.Hash) *request {
66+
now := time.Now()
67+
return &request{
68+
rm: rm,
69+
transaction: transaction{
70+
TxHash: txHash,
71+
TS: now,
72+
ExpireTS: now.Add(txExpireDuration),
73+
},
74+
}
8175
}
8276

8377
func (r *request) Format(f fmt.State, c rune) {
8478
switch c {
8579
case 'v', 's':
8680
if f.Flag('+') {
87-
fmt.Fprintf(f, "request{id=%s txHash=%#x txPendingCount=%d)", r.ID(), r.txHash, r.txPendingCount)
81+
fmt.Fprintf(f, "request{id=%s tx=%+v)", r.ID(), r.transaction)
8882
} else {
89-
fmt.Fprintf(f, "request{%s %#x %d)", r.ID(), r.txHash, r.txPendingCount)
83+
fmt.Fprintf(f, "request{%s %v)", r.ID(), r.transaction)
9084
}
9185
}
9286
}
@@ -99,6 +93,7 @@ type sender struct {
9993
sc chan *types.RelayResult
10094
reqs []*request
10195
mtx sync.RWMutex
96+
db *leveldb.DB
10297

10398
cl *client.ConsensusLayer
10499
el *client.ExecutionLayer
@@ -116,6 +111,11 @@ func newSender(src, dst types.BtpAddress, w types.Wallet, endpoint string, opt m
116111
l: l,
117112
sc: make(chan *types.RelayResult),
118113
}
114+
115+
if err = s.initDatabase(baseDir); err != nil {
116+
l.Panicf("fail to initialize database %v", err)
117+
}
118+
119119
s.el, err = client.NewExecutionLayer(endpoint, l)
120120
if err != nil {
121121
l.Panicf("fail to connect to %s, %v", endpoint, err)
@@ -137,6 +137,24 @@ func newSender(src, dst types.BtpAddress, w types.Wallet, endpoint string, opt m
137137
return s
138138
}
139139

140+
func (s *sender) initDatabase(baseDir string) error {
141+
var err error
142+
//dbDir := filepath.Join(baseDir, s.dst.NetworkAddress(), s.src.NetworkAddress())
143+
dbDir := filepath.Join(baseDir, s.src.NetworkAddress(), s.src.ContractAddress(),
144+
"sender", s.dst.NetworkAddress(), s.dst.ContractAddress())
145+
s.l.Debugln("open database", dbDir)
146+
s.db, err = leveldb.OpenFile(dbDir, nil)
147+
if err != nil {
148+
return errors.Wrap(err, "fail to open database")
149+
}
150+
defer func() {
151+
if err != nil {
152+
s.db.Close()
153+
}
154+
}()
155+
return nil
156+
}
157+
140158
func (s *sender) Start() (<-chan *types.RelayResult, error) {
141159
go s.handleFinalityUpdate()
142160

@@ -155,7 +173,12 @@ func (s *sender) Relay(rm types.RelayMessage) (string, error) {
155173
if tx, err := s.relay(rm); err != nil {
156174
return rm.Id(), err
157175
} else {
158-
s.addRequest(&request{rm: rm, txHash: tx.Hash()})
176+
req := newRequest(rm, tx.Hash())
177+
s.addRequest(req)
178+
if err = s.putLastTransaction(&req.transaction); err != nil {
179+
s.removeRequest(rm.Id())
180+
return rm.Id(), err
181+
}
159182
}
160183
return rm.Id(), nil
161184
}
@@ -201,6 +224,23 @@ func (s *sender) removeRequest(id string) {
201224
}
202225

203226
func (s *sender) getStatus(bn uint64) (*types.BMCLinkStatus, error) {
227+
if ltx, err := s.getLastTransaction(); err != nil {
228+
s.l.Errorf("fail to get last TX. %v", err)
229+
return nil, err
230+
} else if ltx != nil {
231+
s.l.Debugf("last TX. %+v", ltx)
232+
var pending bool
233+
if _, pending, err = s.el.TransactionByHash(ltx.TxHash); err != nil {
234+
s.l.Errorf("fail to query last TX %#x. %v", ltx.TxHash, err)
235+
return nil, err
236+
} else if pending {
237+
if !ltx.IsExpired() {
238+
s.l.Debugf("last TX %#x is not yet executed. sleep to %s", ltx.TxHash, ltx.ExpireTS)
239+
time.Sleep(ltx.GetDurationToExpire())
240+
}
241+
}
242+
}
243+
204244
var status client.TypesLinkStatus
205245
var callOpts *bind.CallOpts
206246
if bn != 0 {
@@ -238,43 +278,41 @@ func (s *sender) handleFinalityUpdate() {
238278
}
239279
}
240280

281+
func (s *sender) sendRelayResult(id string, errCode errors.Code, finalized bool) {
282+
s.sc <- &types.RelayResult{Id: id, Err: errCode, Finalized: finalized}
283+
}
284+
241285
func (s *sender) checkRelayResult(to uint64) {
242286
finished := make([]*request, 0)
243287
s.mtx.RLock()
244-
for i, req := range s.reqs {
245-
_, pending, err := s.el.TransactionByHash(req.TxHash())
288+
for _, req := range s.reqs {
289+
_, pending, err := s.el.TransactionByHash(req.TxHash)
246290
if err != nil {
247-
s.l.Warnf("can't get TX %#x. %v", req.TxHash(), err)
291+
s.l.Errorf("can't get TX via rpc. %+v. Drop %+v.", err, req)
292+
s.sendRelayResult(req.ID(), errors.BMVUnknown, false)
248293
break
249294
}
250295
if pending {
251-
s.l.Debugf("TX %#x is not yet executed.", req.TxHash())
252-
if req.IncTxPendingCount() == txPendingMAX {
253-
s.l.Debugf("resend rm %s", req.ID())
254-
if tx, err := s.relay(req.RelayMessage()); err != nil {
255-
s.l.Errorf("fail to resend relay message %s", req.ID())
256-
} else {
257-
req.SetTxHash(tx.Hash())
258-
req.SetTxPendingCount(0)
259-
}
296+
if req.IsExpired() {
297+
s.l.Errorf("request is still pending. Drop %+v.", req)
298+
s.sendRelayResult(req.ID(), errors.BMVUnknown, false)
299+
break
260300
}
261-
s.reqs[i] = req
262-
s.l.Debugf("update req: %+v", s.reqs[i])
263-
break
264301
}
265-
receipt, err := s.el.TransactionReceipt(req.TxHash())
302+
receipt, err := s.el.TransactionReceipt(req.TxHash)
266303
if err != nil {
267-
s.l.Warnf("can't get TX receipt for %#x. %v", req.TxHash(), err)
304+
s.l.Errorf("can't get TX receipt. %+v. Drop %+v.", err, req)
305+
s.sendRelayResult(req.ID(), errors.BMVUnknown, false)
268306
break
269307
}
270308
if to < receipt.BlockNumber.Uint64() {
271-
s.l.Debugf("%#x is not yet finalized", req.TxHash())
309+
s.l.Debugf("%#x is not yet finalized", req.TxHash)
272310
break
273311
}
274312
err = s.receiptToRevertError(receipt)
275313
errCode := errors.SUCCESS
276314
if err != nil {
277-
s.l.Debugf("result fail %v. %v", req, err)
315+
s.l.Debugf("result fail %+v. %v", req, err)
278316
if ec, ok := errors.CoderOf(err); ok {
279317
errCode = ec.ErrorCode()
280318
} else {
@@ -283,11 +321,7 @@ func (s *sender) checkRelayResult(to uint64) {
283321
} else {
284322
s.l.Debugf("result success. %v", req)
285323
}
286-
s.sc <- &types.RelayResult{
287-
Id: req.ID(),
288-
Err: errCode,
289-
Finalized: true,
290-
}
324+
s.sendRelayResult(req.ID(), errCode, true)
291325
finished = append(finished, req)
292326
}
293327
s.mtx.RUnlock()
@@ -316,3 +350,85 @@ func (s *sender) receiptToRevertError(receipt *etypes.Receipt) error {
316350
}
317351
return nil
318352
}
353+
354+
func (s *sender) putLastTransaction(tx *transaction) error {
355+
if bs, err := tx.Bytes(); err != nil {
356+
return err
357+
} else {
358+
if err = s.db.Put(lastTXKey, bs, nil); err != nil {
359+
return err
360+
}
361+
}
362+
return nil
363+
}
364+
365+
func (s *sender) getLastTransaction() (*transaction, error) {
366+
var bs []byte
367+
var has bool
368+
var err error
369+
if has, err = s.db.Has(lastTXKey, nil); err != nil {
370+
return nil, err
371+
} else if has {
372+
if bs, err = s.db.Get(lastTXKey, nil); err != nil {
373+
return nil, err
374+
} else {
375+
return newTransactionFromBytes(bs)
376+
}
377+
} else {
378+
return nil, nil
379+
}
380+
}
381+
382+
type transaction struct {
383+
TxHash common.Hash
384+
TS time.Time
385+
ExpireTS time.Time // wait for TX propagation and pending time
386+
}
387+
388+
func (tx *transaction) IsExpired() bool {
389+
return time.Now().After(tx.ExpireTS)
390+
}
391+
392+
func (tx *transaction) GetDurationToExpire() time.Duration {
393+
ts := tx.ExpireTS
394+
return ts.Sub(time.Now())
395+
}
396+
397+
func (tx *transaction) Bytes() ([]byte, error) {
398+
var bytes []byte
399+
if bs, err := codec.MarshalToBytes(&tx); err != nil {
400+
return nil, err
401+
} else {
402+
bytes = bs
403+
}
404+
return bytes, nil
405+
}
406+
407+
func (tx *transaction) Format(f fmt.State, c rune) {
408+
switch c {
409+
case 'v', 's':
410+
if f.Flag('+') {
411+
fmt.Fprintf(f, "transaction{txHash=%#x TS=%s expireTS=%s)",
412+
tx.TxHash, tx.TS.Format(time.RFC3339), tx.ExpireTS.Format(time.RFC3339))
413+
} else {
414+
fmt.Fprintf(f, "transaction{%#x %s %s)",
415+
tx.TxHash, tx.TS.Format(time.RFC3339), tx.ExpireTS.Format(time.RFC3339))
416+
}
417+
}
418+
}
419+
420+
func newTransaction(ts time.Time, txHash common.Hash) *transaction {
421+
return &transaction{
422+
TxHash: txHash,
423+
TS: ts,
424+
ExpireTS: ts.Add(txExpireDuration),
425+
}
426+
}
427+
428+
func newTransactionFromBytes(bs []byte) (*transaction, error) {
429+
ltx := new(transaction)
430+
if _, err := codec.UnmarshalFromBytes(bs, &ltx); err != nil {
431+
return nil, err
432+
}
433+
return ltx, nil
434+
}

chain/eth2/sender_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2023 ICON Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package eth2
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"github.com/ethereum/go-ethereum/common"
24+
"github.com/stretchr/testify/assert"
25+
)
26+
27+
func TestTransaction(t *testing.T) {
28+
ts := time.Now()
29+
txHash := common.HexToHash("0xfb9a68914db86ef3aef1465130692aab0f0c290058e2ae1339ac145f764dd73b")
30+
31+
ltx := newTransaction(ts, txHash)
32+
bs, err := ltx.Bytes()
33+
assert.NoError(t, err)
34+
35+
ltx2, err := newTransactionFromBytes(bs)
36+
assert.NoError(t, err)
37+
38+
assert.Equal(t, ltx.TxHash, ltx2.TxHash)
39+
assert.True(t, ltx.TS.Equal(ltx2.TS))
40+
assert.True(t, ltx.ExpireTS.Equal(ltx2.ExpireTS))
41+
}

0 commit comments

Comments
 (0)