Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 81 additions & 30 deletions input/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -41,6 +42,12 @@ const (
kupoHealthTimeout = 3 * time.Second
)

// pollTx holds a transaction and its hash for one mempool poll.
type pollTx struct {
hash string
tx ledger.Transaction
}

type Mempool struct {
logger plugin.Logger
network string
Expand Down Expand Up @@ -101,13 +108,26 @@ func (m *Mempool) Start() error {
}
m.doneChan = make(chan struct{})

// Reset Kupo state on each start so configuration changes or temporary
// errors don't permanently disable input resolution.
m.kupoClient = nil
m.kupoDisabled = false
m.kupoInvalidPatternLogged = false

if m.kupoUrl == "" {
m.kupoUrl = config.GetConfig().KupoUrl
}
if m.kupoUrl == "" {
m.logger.Info("Kupo URL not set; resolved inputs will be omitted (set KUPO_URL or --input-mempool-kupo-url)")
} else {
m.logger.Info("Using Kupo for input resolution", "url", m.kupoUrl)
if m.logger != nil {
if m.kupoUrl == "" {
m.logger.Info(
"Kupo URL not set; inputs will be resolved from mempool only (chained txs). Set KUPO_URL or --input-mempool-kupo-url to also resolve on-chain inputs.",
)
} else {
m.logger.Info(
"Using Kupo for input resolution (on-chain); mempool chained txs resolved from poll",
"url", m.kupoUrl,
)
}
}

if err := m.setupConnection(); err != nil {
Expand Down Expand Up @@ -297,10 +317,6 @@ func (m *Mempool) pollOnce() {

// Collect all txs this poll. We only need to remember last poll's hashes
// to emit events only for newly seen transactions.
type pollTx struct {
hash string
tx ledger.Transaction
}
var pollTxs []pollTx
for {
select {
Expand Down Expand Up @@ -334,21 +350,22 @@ func (m *Mempool) pollOnce() {
thisPollHashes[p.hash] = struct{}{}
}

// Build UTxO set from this poll's transactions so chained mempool txs can
// resolve inputs (e.g. tx A spends an output of tx B, both in mempool).
mempoolUtxo := m.buildMempoolUtxo(pollTxs)

for _, p := range pollTxs {
if _, seen := m.seenTxHashes[p.hash]; seen {
continue
}
ctx := event.NewMempoolTransactionContext(p.tx, 0, m.networkMagic)
payload := event.NewTransactionEventFromTx(p.tx, m.includeCbor)
if m.kupoUrl != "" && !m.kupoDisabled {
resolvedInputs, err := m.resolveTransactionInputs(p.tx)
if err != nil {
if m.logger != nil {
m.logger.Warn("failed to resolve transaction inputs via Kupo, emitting without resolved inputs", "error", err)
}
} else if len(resolvedInputs) > 0 {
payload.ResolvedInputs = resolvedInputs
}
resolvedInputs, resolveErr := m.resolveTransactionInputs(p.tx, mempoolUtxo)
if len(resolvedInputs) > 0 {
payload.ResolvedInputs = resolvedInputs
}
if resolveErr != nil && m.logger != nil {
m.logger.Warn("some transaction inputs could not be resolved; partial resolved inputs may be set", "error", resolveErr)
}
evt := event.New("input.transaction", time.Now(), ctx, payload)
select {
Expand Down Expand Up @@ -419,16 +436,48 @@ func (m *Mempool) getKupoClient() (*kugo.Client, error) {
return k, nil
}

func (m *Mempool) resolveTransactionInputs(tx ledger.Transaction) ([]ledger.TransactionOutput, error) {
var resolvedInputs []ledger.TransactionOutput
k, err := m.getKupoClient()
if err != nil {
return nil, err
// buildMempoolUtxo builds a map from "txHash:outputIndex" to the transaction
// output so that chained mempool transactions (tx A spends an output of tx B,
// both in the same poll) can resolve inputs without requiring Kupo.
func (m *Mempool) buildMempoolUtxo(pollTxs []pollTx) map[string]ledger.TransactionOutput {
utxo := make(map[string]ledger.TransactionOutput)
for _, p := range pollTxs {
txID := p.hash
for idx, out := range p.tx.Outputs() {
key := txID + ":" + strconv.Itoa(idx)
utxo[key] = out
}
}
return utxo
}

// resolveTransactionInputs resolves each input from the mempool UTxO set (chained
// txs) or Kupo (on-chain). It always returns whatever could be resolved. If any
// input failed to resolve (e.g. Kupo error), the second return is a non-nil error
// so the caller can log it; partial results are still returned.
func (m *Mempool) resolveTransactionInputs(tx ledger.Transaction, mempoolUtxo map[string]ledger.TransactionOutput) ([]ledger.TransactionOutput, error) {
var resolvedInputs []ledger.TransactionOutput
var resolveErrs []error
for _, input := range tx.Inputs() {
txID := input.Id().String()
txIndex := int(input.Index())
// Kupo output-reference pattern: output_index@transaction_id (see Kupo Patterns doc)
key := txID + ":" + strconv.Itoa(txIndex)

// Resolve from mempool first (chained txs: both in same poll).
if out, ok := mempoolUtxo[key]; ok {
resolvedInputs = append(resolvedInputs, out)
continue
}

// Fall back to Kupo for on-chain outputs.
if m.kupoUrl == "" || m.kupoDisabled {
continue
}
k, err := m.getKupoClient()
if err != nil {
resolveErrs = append(resolveErrs, fmt.Errorf("input %s:%d kupo client: %w", txID, txIndex, err))
continue
}
Comment on lines +476 to +480
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid per-input Kupo client initialization retries.

m.getKupoClient() is called inside the input loop. When Kupo is down, each unresolved input repeats client init/health-check work, amplifying latency and error volume for a single tx.

🔧 Suggested fix (fetch Kupo client once per transaction resolution)
 func (m *Mempool) resolveTransactionInputs(tx ledger.Transaction, mempoolUtxo map[string]ledger.TransactionOutput) ([]ledger.TransactionOutput, error) {
 	var resolvedInputs []ledger.TransactionOutput
 	var resolveErrs []error
+	var kupoClient *kugo.Client
+	var kupoClientErr error
+	kupoChecked := false
 	for _, input := range tx.Inputs() {
 		txID := input.Id().String()
 		txIndex := int(input.Index())
 		key := txID + ":" + strconv.Itoa(txIndex)

 		// Resolve from mempool first (chained txs: both in same poll).
 		if out, ok := mempoolUtxo[key]; ok {
 			resolvedInputs = append(resolvedInputs, out)
 			continue
 		}

 		// Fall back to Kupo for on-chain outputs.
 		if m.kupoUrl == "" || m.kupoDisabled {
 			continue
 		}
-		k, err := m.getKupoClient()
-		if err != nil {
-			resolveErrs = append(resolveErrs, fmt.Errorf("input %s:%d kupo client: %w", txID, txIndex, err))
-			continue
-		}
+		if !kupoChecked {
+			kupoChecked = true
+			kupoClient, kupoClientErr = m.getKupoClient()
+			if kupoClientErr != nil {
+				resolveErrs = append(resolveErrs, fmt.Errorf("kupo client: %w", kupoClientErr))
+			}
+		}
+		if kupoClientErr != nil {
+			continue
+		}
 		pattern := fmt.Sprintf("%d@%s", txIndex, txID)
 		ctx, cancel := context.WithTimeout(context.Background(), defaultKupoTimeout)
-		matches, err := k.Matches(ctx, kugo.Pattern(pattern))
+		matches, err := kupoClient.Matches(ctx, kugo.Pattern(pattern))
 		cancel()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@input/mempool/mempool.go` around lines 476 - 480, The code calls
m.getKupoClient() inside the per-input loop, causing repeated client
initialization/health checks when Kupo is down; move the call out of the loop so
you obtain the Kupo client once per transaction resolution (e.g., call k, err :=
m.getKupoClient() before iterating inputs), handle a single error case by
appending to resolveErrs and skipping the whole input-processing loop for that
tx, and then reuse the same k variable inside the loop where resolveErrs
currently appends errors (references: m.getKupoClient(), resolveErrs, txID,
txIndex).

pattern := fmt.Sprintf("%d@%s", txIndex, txID)
ctx, cancel := context.WithTimeout(context.Background(), defaultKupoTimeout)
matches, err := k.Matches(ctx, kugo.Pattern(pattern))
Expand All @@ -439,27 +488,29 @@ func (m *Mempool) resolveTransactionInputs(tx ledger.Transaction) ([]ledger.Tran
if !m.kupoInvalidPatternLogged {
m.kupoInvalidPatternLogged = true
if m.logger != nil {
m.logger.Debug("Kupo does not support output-reference pattern, disabling input resolution", "error", err)
m.logger.Debug("Kupo does not support output-reference pattern, disabling Kupo input resolution", "error", err)
}
}
m.kupoDisabled = true
return resolvedInputs, nil
}
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("kupo matches query timed out after %v", defaultKupoTimeout)
continue
}
return nil, fmt.Errorf("error fetching matches for input TxId: %s, Index: %d: %w", txID, txIndex, err)
resolveErrs = append(resolveErrs, fmt.Errorf("input %s:%d: %w", txID, txIndex, err))
continue
}
for _, match := range matches {
out, err := chainsync.NewResolvedTransactionOutput(match)
if err != nil {
return nil, err
resolveErrs = append(resolveErrs, fmt.Errorf("input %s:%d match: %w", txID, txIndex, err))
continue
}
resolvedInputs = append(resolvedInputs, out)
}
if len(matches) == 0 && m.logger != nil {
m.logger.Debug("Kupo returned no matches for input; ensure Kupo is run with a pattern that indexes this output (e.g. --match \"*\")", "pattern", pattern)
}
}
if len(resolveErrs) > 0 {
return resolvedInputs, errors.Join(resolveErrs...)
}
return resolvedInputs, nil
}