Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion relayer/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"runtime"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -439,7 +440,9 @@ func (t *TronTxm) maybeRetry(unconfirmedTx *InflightTx, bumpEnergy bool, isOutOf
}

func (t *TronTxm) checkFinalized() {
startTime := time.Now()
allConfirmed := t.AccountStore.GetAllConfirmed()

for acc, confirmedTxs := range allConfirmed {
store := t.AccountStore.GetTxStore(acc)

Expand Down Expand Up @@ -470,37 +473,78 @@ func (t *TronTxm) checkFinalized() {
}
}
}

duration := time.Since(startTime)
t.Logger.Debugw("checkFinalized completed",
"duration_ms", duration.Milliseconds(),
"total_tx_count", len(allConfirmed),
)
}

func (t *TronTxm) reapLoop() {
defer t.Done.Done()
ticker := time.NewTicker(t.Config.ReapInterval)
t.Logger.Debugw("reapLoop: started with interval", "interval", t.Config.ReapInterval)
defer ticker.Stop()

// Get initial CPU stats
var startCPU runtime.MemStats
runtime.ReadMemStats(&startCPU)
startTime := time.Now()

for {
select {
case <-ticker.C:
loopStartTime := time.Now()
var loopStartCPU runtime.MemStats
runtime.ReadMemStats(&loopStartCPU)

t.Logger.Debugw("reapLoop: reaping finished transactions")
cutoff := time.Now().Add(-t.Config.RetentionPeriod)
allFinished := t.AccountStore.GetAllFinished()
accountTxIds := make(map[string][]string)

for acc, finishedTxs := range allFinished {
t.Logger.Debugw("reapLoop: processing account", "account", acc, "tx_count", len(finishedTxs))
var idsToDelete []string
for _, ft := range finishedTxs {
if ft.RetentionTs.Before(cutoff) {
t.Logger.Debugw("reapLoop: deleting finished transaction", "txID", ft.Tx.ID)
idsToDelete = append(idsToDelete, ft.Tx.ID)
}
}
if len(idsToDelete) > 0 {
t.Logger.Debugw("reapLoop: adding account to delete", "account", acc, "tx_count", len(idsToDelete))
accountTxIds[acc] = idsToDelete
}
}

reapCount := 0
if len(accountTxIds) > 0 {
reapCount := t.AccountStore.DeleteAllFinishedTxs(accountTxIds)
t.Logger.Debugw("reapLoop: deleting finished transactions", "account_count", len(accountTxIds))
reapCount = t.AccountStore.DeleteAllFinishedTxs(accountTxIds)
if reapCount > 0 {
t.Logger.Debugw("reapLoop: reaped finished transactions", "count", reapCount)
}
}

// Calculate CPU and memory stats
loopDuration := time.Since(loopStartTime)
var loopEndCPU runtime.MemStats
runtime.ReadMemStats(&loopEndCPU)

totalDuration := time.Since(startTime)
totalAllocs := loopEndCPU.TotalAlloc - startCPU.TotalAlloc
loopAllocs := loopEndCPU.TotalAlloc - loopStartCPU.TotalAlloc

t.Logger.Debugw("reapLoop: completed",
"duration_ms", loopDuration.Milliseconds(),
"total_duration_ms", totalDuration.Milliseconds(),
"reap_count", reapCount,
"account_count", len(allFinished),
"total_allocations", totalAllocs,
"loop_allocations", loopAllocs,
"memory_heap_mb", loopEndCPU.HeapAlloc / 1024 / 1024)
case <-t.Stop:
t.Logger.Debugw("reapLoop: stopped")
return
Expand Down Expand Up @@ -577,6 +621,7 @@ func (t *TronTxm) estimateEnergy(tx *TronTx) (int64, error) {
// checkReorged attempts to fetch transaction info with retries to distinguish
// between temporary RPC failures and actual chain reorgs.
func (t *TronTxm) checkReorged(txHash string) bool {
startTime := time.Now()
_, err := t.GetClient().GetTransactionInfoByIdFullNode(txHash)
if err == nil {
return false
Expand All @@ -585,13 +630,25 @@ func (t *TronTxm) checkReorged(txHash string) bool {
ticker := time.NewTicker(REORG_RETRY_DELAY)
defer ticker.Stop()

retryCount := 0
for retry := uint(0); retry < REORG_RETRY_COUNT; retry++ {
<-ticker.C
retryCount++
_, err = t.GetClient().GetTransactionInfoByIdFullNode(txHash)
if err == nil {
duration := time.Since(startTime)
t.Logger.Debugw("checkReorged: transaction found after retry",
"txHash", txHash,
"duration_ms", duration.Milliseconds(),
"retry_count", retryCount)
return false
}
}

duration := time.Since(startTime)
t.Logger.Debugw("checkReorged: transaction confirmed missing",
"txHash", txHash,
"duration_ms", duration.Milliseconds(),
"retry_count", retryCount)
return true
}
Loading