Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 64 additions & 26 deletions input/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -41,6 +42,12 @@
kupoHealthTimeout = 3 * time.Second
)

// pollTx holds a transaction and its hash for one mempool poll.
// It lets the poll loop carry the precomputed hash alongside the
// transaction so hashes are not recomputed for dedup and UTxO indexing.
type pollTx struct {
	hash string // transaction hash, used as the seen-set key and the UTxO map key prefix
	tx   ledger.Transaction
}

type Mempool struct {
logger plugin.Logger
network string
Expand Down Expand Up @@ -105,9 +112,9 @@
m.kupoUrl = config.GetConfig().KupoUrl
}
if m.kupoUrl == "" {
m.logger.Info("Kupo URL not set; resolved inputs will be omitted (set KUPO_URL or --input-mempool-kupo-url)")
m.logger.Info("Kupo URL not set; inputs will be resolved from mempool only (chained txs). Set KUPO_URL or --input-mempool-kupo-url to also resolve on-chain inputs.")
} else {
m.logger.Info("Using Kupo for input resolution", "url", m.kupoUrl)
m.logger.Info("Using Kupo for input resolution (on-chain); mempool chained txs resolved from poll", "url", m.kupoUrl)
}

if err := m.setupConnection(); err != nil {
Expand Down Expand Up @@ -297,10 +304,6 @@

// Collect all txs this poll. We only need to remember last poll's hashes
// to emit events only for newly seen transactions.
type pollTx struct {
hash string
tx ledger.Transaction
}
var pollTxs []pollTx
for {
select {
Expand Down Expand Up @@ -334,21 +337,23 @@
thisPollHashes[p.hash] = struct{}{}
}

// Build UTxO set from this poll's transactions so chained mempool txs can
// resolve inputs (e.g. tx A spends an output of tx B, both in mempool).
mempoolUtxo := m.buildMempoolUtxo(pollTxs)

for _, p := range pollTxs {
if _, seen := m.seenTxHashes[p.hash]; seen {
continue
}
ctx := event.NewMempoolTransactionContext(p.tx, 0, m.networkMagic)
payload := event.NewTransactionEventFromTx(p.tx, m.includeCbor)
if m.kupoUrl != "" && !m.kupoDisabled {
resolvedInputs, err := m.resolveTransactionInputs(p.tx)
if err != nil {
if m.logger != nil {
m.logger.Warn("failed to resolve transaction inputs via Kupo, emitting without resolved inputs", "error", err)
}
} else if len(resolvedInputs) > 0 {
payload.ResolvedInputs = resolvedInputs
resolvedInputs, err := m.resolveTransactionInputs(p.tx, mempoolUtxo)
if err != nil {
if m.logger != nil {
m.logger.Warn("failed to resolve transaction inputs, emitting without resolved inputs", "error", err)
}
} else if len(resolvedInputs) > 0 {
payload.ResolvedInputs = resolvedInputs
}
evt := event.New("input.transaction", time.Now(), ctx, payload)
select {
Expand Down Expand Up @@ -419,16 +424,45 @@
return k, nil
}

func (m *Mempool) resolveTransactionInputs(tx ledger.Transaction) ([]ledger.TransactionOutput, error) {
var resolvedInputs []ledger.TransactionOutput
k, err := m.getKupoClient()
if err != nil {
return nil, err
// buildMempoolUtxo indexes every output of this poll's transactions by a
// "txHash:outputIndex" key, so chained mempool transactions (tx A spends an
// output of tx B, both observed in the same poll) can resolve their inputs
// without requiring Kupo.
func (m *Mempool) buildMempoolUtxo(pollTxs []pollTx) map[string]ledger.TransactionOutput {
	result := make(map[string]ledger.TransactionOutput)
	for i := range pollTxs {
		outputs := pollTxs[i].tx.Outputs()
		for outIdx := range outputs {
			result[pollTxs[i].hash+":"+strconv.Itoa(outIdx)] = outputs[outIdx]
		}
	}
	return result
}

func (m *Mempool) resolveTransactionInputs(tx ledger.Transaction, mempoolUtxo map[string]ledger.TransactionOutput) ([]ledger.TransactionOutput, error) {

Check failure on line 442 in input/mempool/mempool.go

View workflow job for this annotation

GitHub Actions / lint

(*Mempool).resolveTransactionInputs - result 1 (error) is always nil (unparam)
var resolvedInputs []ledger.TransactionOutput
for _, input := range tx.Inputs() {
txID := input.Id().String()
txIndex := int(input.Index())
// Kupo output-reference pattern: output_index@transaction_id (see Kupo Patterns doc)
key := txID + ":" + strconv.Itoa(txIndex)

// Resolve from mempool first (chained txs: both in same poll).
if out, ok := mempoolUtxo[key]; ok {
resolvedInputs = append(resolvedInputs, out)
continue
}

// Fall back to Kupo for on-chain outputs.
if m.kupoUrl == "" || m.kupoDisabled {
continue
}
k, err := m.getKupoClient()
if err != nil {
if m.logger != nil {
m.logger.Debug("Kupo client unavailable, skipping Kupo resolution for this input", "error", err, "txId", txID, "index", txIndex)
}
continue
}
Comment on lines +476 to +480
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid per-input Kupo client initialization retries.

m.getKupoClient() is called inside the input loop. When Kupo is down, each unresolved input repeats client init/health-check work, amplifying latency and error volume for a single tx.

🔧 Suggested fix (fetch Kupo client once per transaction resolution)
 func (m *Mempool) resolveTransactionInputs(tx ledger.Transaction, mempoolUtxo map[string]ledger.TransactionOutput) ([]ledger.TransactionOutput, error) {
 	var resolvedInputs []ledger.TransactionOutput
 	var resolveErrs []error
+	var kupoClient *kugo.Client
+	var kupoClientErr error
+	kupoChecked := false
 	for _, input := range tx.Inputs() {
 		txID := input.Id().String()
 		txIndex := int(input.Index())
 		key := txID + ":" + strconv.Itoa(txIndex)

 		// Resolve from mempool first (chained txs: both in same poll).
 		if out, ok := mempoolUtxo[key]; ok {
 			resolvedInputs = append(resolvedInputs, out)
 			continue
 		}

 		// Fall back to Kupo for on-chain outputs.
 		if m.kupoUrl == "" || m.kupoDisabled {
 			continue
 		}
-		k, err := m.getKupoClient()
-		if err != nil {
-			resolveErrs = append(resolveErrs, fmt.Errorf("input %s:%d kupo client: %w", txID, txIndex, err))
-			continue
-		}
+		if !kupoChecked {
+			kupoChecked = true
+			kupoClient, kupoClientErr = m.getKupoClient()
+			if kupoClientErr != nil {
+				resolveErrs = append(resolveErrs, fmt.Errorf("kupo client: %w", kupoClientErr))
+			}
+		}
+		if kupoClientErr != nil {
+			continue
+		}
 		pattern := fmt.Sprintf("%d@%s", txIndex, txID)
 		ctx, cancel := context.WithTimeout(context.Background(), defaultKupoTimeout)
-		matches, err := k.Matches(ctx, kugo.Pattern(pattern))
+		matches, err := kupoClient.Matches(ctx, kugo.Pattern(pattern))
 		cancel()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@input/mempool/mempool.go` around lines 476 - 480, The code calls
m.getKupoClient() inside the per-input loop, causing repeated client
initialization/health checks when Kupo is down; move the call out of the loop so
you obtain the Kupo client once per transaction resolution (e.g., call k, err :=
m.getKupoClient() before iterating inputs), handle a single error case by
appending to resolveErrs and skipping the whole input-processing loop for that
tx, and then reuse the same k variable inside the loop where resolveErrs
currently appends errors (references: m.getKupoClient(), resolveErrs, txID,
txIndex).

pattern := fmt.Sprintf("%d@%s", txIndex, txID)
ctx, cancel := context.WithTimeout(context.Background(), defaultKupoTimeout)
matches, err := k.Matches(ctx, kugo.Pattern(pattern))
Expand All @@ -439,21 +473,25 @@
if !m.kupoInvalidPatternLogged {
m.kupoInvalidPatternLogged = true
if m.logger != nil {
m.logger.Debug("Kupo does not support output-reference pattern, disabling input resolution", "error", err)
m.logger.Debug("Kupo does not support output-reference pattern, disabling Kupo input resolution", "error", err)
}
}
m.kupoDisabled = true
return resolvedInputs, nil
continue
}
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("kupo matches query timed out after %v", defaultKupoTimeout)
// Log and skip this input so we keep any already-resolved (e.g. mempool) inputs.
if m.logger != nil {
m.logger.Warn("Kupo lookup failed for input, skipping; partial resolved inputs preserved", "error", err, "txId", txID, "index", txIndex)
}
return nil, fmt.Errorf("error fetching matches for input TxId: %s, Index: %d: %w", txID, txIndex, err)
continue
}
for _, match := range matches {
out, err := chainsync.NewResolvedTransactionOutput(match)
if err != nil {
return nil, err
if m.logger != nil {
m.logger.Debug("failed to build resolved output from Kupo match, skipping", "error", err, "txId", txID, "index", txIndex)
}
continue
}
resolvedInputs = append(resolvedInputs, out)
}
Expand Down
Loading