Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
786d495
log reader without running full lp service
Farber98 Sep 10, 2025
9b7c312
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Sep 10, 2025
6bd7919
fix comms
Farber98 Sep 10, 2025
762d836
fix coms
Farber98 Sep 10, 2025
37535f2
nix lint
Farber98 Sep 10, 2025
36cf6a2
nix lint
Farber98 Sep 10, 2025
9e94c59
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Sep 10, 2025
4d15f1b
remove filters from utils and break dep cycle
Farber98 Sep 18, 2025
dd050d2
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Sep 18, 2025
b7b451e
make iface match
Farber98 Sep 18, 2025
a1194eb
lint
Farber98 Sep 18, 2025
5f419d2
o11y log provider
Farber98 Sep 19, 2025
298d437
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Sep 23, 2025
b2e05cb
raw log instead of o11y log
Farber98 Sep 24, 2025
bf57282
expose accessor configs + typeVersion
Farber98 Oct 1, 2025
24a9b4f
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Oct 1, 2025
8865657
expose accessor configs + typeVersion
Farber98 Oct 1, 2025
c6aa276
nix lint
Farber98 Oct 1, 2025
85f11a9
nix lint
Farber98 Oct 1, 2025
047878b
Merge branch 'main' into juan/accessor-expose-configs
Farber98 Oct 1, 2025
e793f6d
return all source chain configs
Farber98 Oct 3, 2025
cdd3006
Merge branch 'juan/accessor-expose-configs' into juan/fetch-logs-for-…
Farber98 Oct 3, 2025
0a4b7e2
remove type and version
Farber98 Oct 3, 2025
3659dec
fix ci dup
Farber98 Oct 3, 2025
6314050
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Oct 7, 2025
1efd23f
fix conflict
Farber98 Oct 7, 2025
cd892f3
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Oct 13, 2025
e639045
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Oct 14, 2025
e21bfc8
load cross chain snake data
Farber98 Oct 14, 2025
8dba13d
fix terms
Farber98 Oct 14, 2025
af646ad
Merge branch 'main' into juan/fetch-logs-for-o11y
Farber98 Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/logpoller/backend/loader/account/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (l *accountTxLoader) LoadTxsForAddresses(ctx context.Context, blockRange *t
for _, addr := range srcAddrs {
currAddr := addr
eg.Go(func() error {
txs, err := l.fetchTxsForAddress(egCtx, currAddr, blockRange)
txs, err := l.FetchTxsForAddress(egCtx, currAddr, blockRange)
if err != nil {
return fmt.Errorf("failed to fetch for %s: %w", currAddr.String(), err)
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func (l *accountTxLoader) LoadTxsForAddresses(ctx context.Context, blockRange *t
//
// Note: Block range (prevBlock, toBlock] is exclusive of prevBlock, inclusive of toBlock
// TODO: stream tx back to log poller to avoid memory overhead in production
func (l *accountTxLoader) fetchTxsForAddress(ctx context.Context, addr *address.Address, blockRange *types.BlockRange) ([]types.TxWithBlock, error) {
func (l *accountTxLoader) FetchTxsForAddress(ctx context.Context, addr *address.Address, blockRange *types.BlockRange) ([]types.TxWithBlock, error) {
if blockRange.Prev != nil && blockRange.Prev.SeqNo >= blockRange.To.SeqNo {
return nil, fmt.Errorf("prevBlock %d is not before toBlock %d", blockRange.Prev.SeqNo, blockRange.To.SeqNo)
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/logpoller/backend/txparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/xssnick/tonutils-go/tlb"
"github.com/xssnick/tonutils-go/tvm/cell"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-ton/pkg/logpoller"
"github.com/smartcontractkit/chainlink-ton/pkg/logpoller/types"
)
Expand Down Expand Up @@ -101,3 +102,15 @@ func (p *txParser) parseTx(ctx context.Context, tx types.TxWithBlock) ([]types.L
}
return allLogs, nil
}

// messageParserWithoutFilters provides a way to parse messages without using log poller filters
type messageParserWithoutFilters struct{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we'll need to add this if we remove filtering from parsers


func NewMessageParser() logpoller.MessageParser {
return &messageParserWithoutFilters{}
}

func (p *messageParserWithoutFilters) ParseExtMsgOut(msg *tlb.ExternalMessageOut) (uint32, *cell.Cell, error) {
// eventSig=0 to bypass filtering
return ParseExtMsgOut(msg, 0)
}
6 changes: 4 additions & 2 deletions pkg/logpoller/backend/txparser/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ func ParseExtMsgOut(msg *tlb.ExternalMessageOut, eventSig uint32) (sig uint32, b
return 0, nil, errors.New("failed to decode event topic")
}

if topic != eventSig {
// eventSig == 0 bypasses filtering
if eventSig != 0 && topic != eventSig {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀 here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I scoped this utility incorrectly when I first wrote it. This parsing utility should only handle parsing and return the sig, body, and error. The filtering and skipping should really be handled in each domain’s logic instead.

Could you update it accordingly? Otherwise, I can push the changes to this branch myself

return 0, nil, nil // topic doesn't match this event sig
}

Expand All @@ -41,7 +42,8 @@ func ParseInternalMsg(msg *tlb.InternalMessage, eventSig uint32) (sig uint32, bo
return 0, nil, fmt.Errorf("failed to extract opcode and body: %w", err)
}

if opcode != eventSig {
// eventSig == 0 bypasses filtering
Copy link
Contributor Author

@Farber98 Farber98 Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀 here too. Added to be consistent despite I'm not using internal messages right now

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

if eventSig != 0 && opcode != eventSig {
return 0, nil, nil // opcode doesn't match this event sig
}

Expand Down
19 changes: 18 additions & 1 deletion pkg/logpoller/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"github.com/xssnick/tonutils-go/address"
"github.com/xssnick/tonutils-go/tlb"
"github.com/xssnick/tonutils-go/ton"
"github.com/xssnick/tonutils-go/tvm/cell"

"github.com/smartcontractkit/chainlink-common/pkg/services"

Expand Down Expand Up @@ -38,9 +40,12 @@ type FilterStore interface {

// TxLoader defines the interface for loading transactions from the TON blockchain.
type TxLoader interface {
// LoadTxsForAddresses retrieves all transactions from the specified source addresses
// LoadTxsForAddresses retrieves all transactions from multiple source addresses concurrently
// within the given block range (prevBlock, toBlock] - exclusive of prevBlock, inclusive of toBlock.
LoadTxsForAddresses(ctx context.Context, blockRange *types.BlockRange, srcAddrs []*address.Address) ([]types.TxWithBlock, error)
// FetchTxsForAddress retrieves all transactions from single source address
// within the given block range (prevBlock, toBlock] - exclusive of prevBlock, inclusive of toBlock.
FetchTxsForAddress(ctx context.Context, addr *address.Address, blockRange *types.BlockRange) ([]types.TxWithBlock, error)
}

// TxParser defines the interface for parsing raw blockchain transactions into structured logs.
Expand All @@ -52,6 +57,12 @@ type TxParser interface {
ParseTransactions(ctx context.Context, txs []types.TxWithBlock) ([]types.Log, error)
}

// MessageParser defines an interface for parsing TON messages
type MessageParser interface {
// ParseExtMsgOut extracts event signature and payload from an external message output.
ParseExtMsgOut(msg *tlb.ExternalMessageOut) (eventSig uint32, body *cell.Cell, err error)
}

// LogStore defines the interface for storing and retrieving logs.
type LogStore interface {
SaveLog(log types.Log)
Expand Down Expand Up @@ -102,3 +113,9 @@ type QueryBuilder[T any] interface {
// Execute runs the constructed query and returns the results.
Execute(ctx context.Context, store LogStore) (query.Result[T], error)
}

// LogReader provides a way to retrieve logs without running the full LogPoller service.
type LogReader interface {
// GetLogs retrieves all external message outputs for an address between fromBlockSeqNo (exclusive) and toBlock (inclusive).
GetLogs(ctx context.Context, addr *address.Address, fromBlockSeqNo uint32, toBlock *ton.BlockIDExt) ([]types.Log, error)
}
116 changes: 116 additions & 0 deletions pkg/logpoller/log_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package logpoller

import (
"context"
"fmt"
"strconv"
"time"

"github.com/xssnick/tonutils-go/address"
"github.com/xssnick/tonutils-go/tlb"
"github.com/xssnick/tonutils-go/ton"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-ton/pkg/logpoller/types"
)

var _ LogReader = (*logReader)(nil)

type logReader struct {
client ton.APIClientWrapped
lggr logger.Logger
loader TxLoader
msgParser MessageParser
}

// NewLogReader creates a new LogReader instance.
func NewLogReader(client ton.APIClientWrapped, lggr logger.Logger, loader TxLoader, msgParser MessageParser) LogReader {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the new LogReader is essentially taking on the responsibilties of both the Loader and Parser from the existing LogPoller. I’m not entirely convinced that introducing a separate subcomponent is justified just for the sake of merging those functions — is there a specific motivation for creating this abstraction?

Another point I’d like to bring up is how we’re currently structuring O11Y ingestion. Right now, O11Y will consume types.Log into its ingestion pipeline based on the TON LogPoller. But we should think carefully about whether we want to pass along the refined Log type (which is domain-specific to log polling) or instead rely on each chain’s native raw transaction type.

In either case, O11Y ultimately performs the same step of converting input into its own log type. The question is whether it’s better to give O11Y a domain-specific log that might need constant updates (e.g., adding fee information or other chain-specific fields in the future), or just pass the raw transactions directly and let O11Y handle the conversion in a consistent way.

My concern is that if we continue extending the LogPoller type every time O11Y needs more fieds, we’ll end up overloading that type with responsibilities that don’t really belong to it. That’s why I’m leaning toward passing raw transactions instead — it feels more future-proof and keeps responsibilities clearer.

And if O11Y only consumes raw transactions, then there’s no need to introduce a separate structured type at all. In that case, a single component would be sufficient to handle ingestion, which would further simplify the design and reduce unnecessary abstractions. What do you think?

return &logReader{
client: client,
lggr: lggr,
loader: loader,
msgParser: msgParser,
}
}

// GetLogs retrieves all ExternalMsgOutLogs for an address between fromBlockSeqNo (exclusive) and toBlock (inclusive).
func (lr *logReader) GetLogs(ctx context.Context, addr *address.Address, fromBlockSeqNo uint32, toBlock *ton.BlockIDExt) ([]types.Log, error) {
// No new logs to fetch
if toBlock.SeqNo <= fromBlockSeqNo {
return nil, nil
}

// Resolve previous block if exists
var prevBlock *ton.BlockIDExt
var err error
if fromBlockSeqNo == 0 {
prevBlock = nil // genesis has no prevBlock
} else {
prevBlock, err = lr.client.LookupBlock(ctx, toBlock.Workchain, toBlock.Shard, fromBlockSeqNo)
if err != nil {
return nil, fmt.Errorf("failed to lookup block for address=%s, fromSeqNo=%d: %w", addr.String(), fromBlockSeqNo, err)
}
}

// Fetch tx for address on given blockRange
blockRange := &types.BlockRange{Prev: prevBlock, To: toBlock}
txs, err := lr.loader.FetchTxsForAddress(ctx, addr, blockRange)
if err != nil {
// display "genesis" if nil and don't panic
fromSeqNoStr := "genesis"
if prevBlock != nil {
fromSeqNoStr = strconv.FormatUint(uint64(prevBlock.SeqNo), 10)
}

return nil, fmt.Errorf("failed to fetch transactions fromSeqNo=%s, toSeqNo=%d: %w", fromSeqNoStr, toBlock.SeqNo, err)
}

// Extract only externalMsgOut logs that we found in all these txes.
logs, err := lr.extractExternalMsgOutLogs(txs)
if err != nil {
return nil, fmt.Errorf("failed to extract logs for address=%s: %w", addr.String(), err)
}

return logs, nil
}

func (lr *logReader) extractExternalMsgOutLogs(txs []types.TxWithBlock) ([]types.Log, error) {
var allLogs []types.Log

for _, tx := range txs {
msgs, _ := tx.Tx.IO.Out.ToSlice()
for _, msg := range msgs {
// Skip any message that's not an external out message
if msg.MsgType != tlb.MsgTypeExternalOut {
continue
}

srcAddr := msg.Msg.SenderAddr()
extMsg := msg.AsExternalOut()

// Fail hard so we don't skip events. We want at-least-once delivery guarantees on events
eventSig, body, err := lr.msgParser.ParseExtMsgOut(extMsg)
if err != nil {
return nil, fmt.Errorf("failed to parse external message out for txHash=%v, LT=%d: %w", tx.Tx.Hash, tx.Tx.LT, err)
}

// If we got a valid event and body
if body != nil && eventSig != 0 {
log := types.Log{
EventSig: eventSig,
Address: srcAddr,
Data: body,
TxHash: types.TxHash(tx.Tx.Hash),
TxLT: tx.Tx.LT,
TxTimestamp: time.Unix(int64(tx.Tx.Now), 0).UTC(),
Block: tx.Block,
}

allLogs = append(allLogs, log)
}
}
}

return allLogs, nil
}
Loading