-
Notifications
You must be signed in to change notification settings - Fork 2
log reader without running full lp service #172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
786d495
9b7c312
6bd7919
762d836
37535f2
36cf6a2
9e94c59
4d15f1b
dd050d2
b7b451e
a1194eb
5f419d2
298d437
b2e05cb
bf57282
24a9b4f
8865657
c6aa276
85f11a9
047878b
e793f6d
cdd3006
0a4b7e2
3659dec
6314050
1efd23f
cd892f3
e639045
e21bfc8
8dba13d
af646ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,8 @@ func ParseExtMsgOut(msg *tlb.ExternalMessageOut, eventSig uint32) (sig uint32, b | |
return 0, nil, errors.New("failed to decode event topic") | ||
} | ||
|
||
if topic != eventSig { | ||
// eventSig == 0 bypasses filtering | ||
if eventSig != 0 && topic != eventSig { | ||
|
||
return 0, nil, nil // topic doesn't match this event sig | ||
} | ||
|
||
|
@@ -41,7 +42,8 @@ func ParseInternalMsg(msg *tlb.InternalMessage, eventSig uint32) (sig uint32, bo | |
return 0, nil, fmt.Errorf("failed to extract opcode and body: %w", err) | ||
} | ||
|
||
if opcode != eventSig { | ||
// eventSig == 0 bypasses filtering | ||
|
||
if eventSig != 0 && opcode != eventSig { | ||
return 0, nil, nil // opcode doesn't match this event sig | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package logpoller | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/xssnick/tonutils-go/address" | ||
"github.com/xssnick/tonutils-go/tlb" | ||
"github.com/xssnick/tonutils-go/ton" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
"github.com/smartcontractkit/chainlink-ton/pkg/logpoller/types" | ||
) | ||
|
||
var _ LogReader = (*logReader)(nil) | ||
|
||
type logReader struct { | ||
client ton.APIClientWrapped | ||
lggr logger.Logger | ||
loader TxLoader | ||
msgParser MessageParser | ||
} | ||
|
||
// NewLogReader creates a new LogReader instance. | ||
func NewLogReader(client ton.APIClientWrapped, lggr logger.Logger, loader TxLoader, msgParser MessageParser) LogReader { | ||
|
||
return &logReader{ | ||
client: client, | ||
lggr: lggr, | ||
loader: loader, | ||
msgParser: msgParser, | ||
} | ||
} | ||
|
||
// GetLogs retrieves all ExternalMsgOutLogs for an address between fromBlockSeqNo (exclusive) and toBlockSeqNo (inclusive). | ||
Farber98 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
func (lr *logReader) GetLogs(ctx context.Context, addr *address.Address, fromBlockSeqNo uint32, toBlock *ton.BlockIDExt) ([]types.Log, error) { | ||
// No new logs to fetch | ||
if toBlock.SeqNo <= fromBlockSeqNo { | ||
return nil, nil | ||
} | ||
|
||
// Resolve previous block if exists | ||
var prevBlock *ton.BlockIDExt | ||
var err error | ||
if fromBlockSeqNo == 0 { | ||
prevBlock = nil // genesis has no prevBlock | ||
} else { | ||
prevBlock, err = lr.client.LookupBlock(ctx, toBlock.Workchain, toBlock.Shard, fromBlockSeqNo) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to lookup block for address=%s, fromSeqNo=%d: %w", addr.String(), fromBlockSeqNo, err) | ||
} | ||
} | ||
|
||
// Fetch tx for address on given blockRange | ||
blockRange := &types.BlockRange{Prev: prevBlock, To: toBlock} | ||
txs, err := lr.loader.FetchTxsForAddress(ctx, addr, blockRange) | ||
if err != nil { | ||
// display "genesis" if nil and don't panic | ||
fromSeqNoStr := "genesis" | ||
if prevBlock != nil { | ||
fromSeqNoStr = fmt.Sprintf("%d", prevBlock.SeqNo) | ||
} | ||
|
||
return nil, fmt.Errorf("failed to fetch transactions fromSeqNo=%s, toSeqNo=%d: %w", fromSeqNoStr, toBlock.SeqNo, err) | ||
} | ||
|
||
// Extract only externalMsgOut logs that we found in all these txes. | ||
logs, err := lr.extractExternalMsgOutLogs(txs) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to extract logs for address=%s: %w", addr.String(), err) | ||
} | ||
|
||
return logs, nil | ||
} | ||
|
||
func (lr *logReader) extractExternalMsgOutLogs(txs []types.TxWithBlock) ([]types.Log, error) { | ||
var allLogs []types.Log | ||
|
||
for _, tx := range txs { | ||
msgs, _ := tx.Tx.IO.Out.ToSlice() | ||
for _, msg := range msgs { | ||
// Skip any message that's not an external out message | ||
if msg.MsgType != tlb.MsgTypeExternalOut { | ||
continue | ||
} | ||
|
||
srcAddr := msg.Msg.SenderAddr() | ||
extMsg := msg.AsExternalOut() | ||
|
||
// Fail hard so we don't skip events. We want at-least-once delivery guarantees on events | ||
eventSig, body, err := lr.msgParser.ParseExtMsgOut(extMsg) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to parse external message out for txHash=%v, LT=%d: %w", tx.Tx.Hash, tx.Tx.LT, err) | ||
} | ||
|
||
// If we got a valid event and body | ||
if body != nil && eventSig != 0 { | ||
log := types.Log{ | ||
EventSig: eventSig, | ||
Address: srcAddr, | ||
Data: body, | ||
TxHash: types.TxHash(tx.Tx.Hash), | ||
TxLT: tx.Tx.LT, | ||
TxTimestamp: time.Unix(int64(tx.Tx.Now), 0).UTC(), | ||
Block: tx.Block, | ||
} | ||
|
||
allLogs = append(allLogs, log) | ||
} | ||
} | ||
} | ||
|
||
return allLogs, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we'll need to add this if we remove filtering from parsers