Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions event/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package event

import (
"encoding/json"

"github.com/blinklabs-io/gouroboros/ledger"
lcommon "github.com/blinklabs-io/gouroboros/ledger/common"
)
Expand Down Expand Up @@ -43,6 +45,18 @@ type TransactionEvent struct {
TTL uint64 `json:"ttl,omitempty"`
}

// MarshalJSON implements json.Marshaler so that Witnesses (not JSON-serializable) are omitted
// when logging or serializing events.
func (e TransactionEvent) MarshalJSON() ([]byte, error) {
type alias TransactionEvent
return json.Marshal(&struct {
Witnesses *struct{} `json:"witnesses,omitempty"` // override: omit non-JSON-serializable TransactionWitnessSet
*alias
}{
alias: (*alias)(&e),
})
}

func NewTransactionContext(
block ledger.Block,
tx ledger.Transaction,
Expand Down
7 changes: 6 additions & 1 deletion input/chainsync/transaction_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ func ExtractAssetDetailsFromMatch(
continue // Skip adding "lovelace" to assetsMap, as it is handled separately
}

byteStringAssetName := cbor.NewByteString([]byte(assetName))
// Kupo returns asset names as hex; decode so JSON gets correct name (UTF-8) and nameHex
assetNameBytes := []byte(assetName)
if decoded, err := hex.DecodeString(assetName); err == nil && len(decoded) > 0 {
assetNameBytes = decoded
}
byteStringAssetName := cbor.NewByteString(assetNameBytes)
policyAssets[byteStringAssetName] = amount.BigInt()
slog.Debug("Get policyId, assetName, assetAmount from match.Value")
slog.Debug(
Expand Down
127 changes: 127 additions & 0 deletions input/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@
package mempool

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/SundaeSwap-finance/kugo"
"github.com/blinklabs-io/adder/event"
"github.com/blinklabs-io/adder/input/chainsync"
"github.com/blinklabs-io/adder/internal/config"
"github.com/blinklabs-io/adder/internal/logging"
"github.com/blinklabs-io/adder/plugin"
ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/ledger"
Expand All @@ -29,6 +37,8 @@ import (

const (
defaultPollInterval = 5 * time.Second
defaultKupoTimeout = 30 * time.Second
kupoHealthTimeout = 3 * time.Second
)

type Mempool struct {
Expand All @@ -41,6 +51,7 @@ type Mempool struct {
includeCbor bool
pollIntervalStr string
pollInterval time.Duration
kupoUrl string

eventChan chan event.Event
errorChan chan error
Expand All @@ -51,6 +62,10 @@ type Mempool struct {
dialFamily string
dialAddress string
seenTxHashes map[string]struct{}

kupoClient *kugo.Client
kupoDisabled bool
kupoInvalidPatternLogged bool
}

// New returns a new Mempool input plugin
Expand Down Expand Up @@ -86,6 +101,15 @@ func (m *Mempool) Start() error {
}
m.doneChan = make(chan struct{})

if m.kupoUrl == "" {
m.kupoUrl = config.GetConfig().KupoUrl
}
if m.kupoUrl == "" {
m.logger.Info("Kupo URL not set; resolved inputs will be omitted (set KUPO_URL or --input-mempool-kupo-url)")
} else {
m.logger.Info("Using Kupo for input resolution", "url", m.kupoUrl)
}

if err := m.setupConnection(); err != nil {
return err
}
Expand Down Expand Up @@ -316,6 +340,16 @@ func (m *Mempool) pollOnce() {
}
ctx := event.NewMempoolTransactionContext(p.tx, 0, m.networkMagic)
payload := event.NewTransactionEventFromTx(p.tx, m.includeCbor)
if m.kupoUrl != "" && !m.kupoDisabled {
resolvedInputs, err := m.resolveTransactionInputs(p.tx)
if err != nil {
if m.logger != nil {
m.logger.Warn("failed to resolve transaction inputs via Kupo, emitting without resolved inputs", "error", err)
}
} else if len(resolvedInputs) > 0 {
payload.ResolvedInputs = resolvedInputs
}
}
evt := event.New("input.transaction", time.Now(), ctx, payload)
select {
case <-m.doneChan:
Expand All @@ -335,3 +369,96 @@ func (m *Mempool) parseTx(data []byte) (ledger.Transaction, error) {
}
return ledger.NewTransactionFromCbor(txType, data)
}

func (m *Mempool) getKupoClient() (*kugo.Client, error) {
if m.kupoClient != nil {
return m.kupoClient, nil
}
urlStr := m.kupoUrl
if urlStr == "" {
return nil, errors.New("kupo URL not configured")
}
_, err := url.ParseRequestURI(urlStr)
if err != nil {
return nil, fmt.Errorf("invalid kupo URL: %w", err)
}
kugoLogger := logging.NewKugoCustomLogger(logging.LevelInfo)
k := kugo.New(
kugo.WithEndpoint(urlStr),
kugo.WithLogger(kugoLogger),
kugo.WithTimeout(defaultKupoTimeout),
)
healthURL := strings.TrimRight(urlStr, "/") + "/health"
ctx, cancel := context.WithTimeout(context.Background(), kupoHealthTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create health check request: %w", err)
}
httpClient := &http.Client{Timeout: kupoHealthTimeout}
resp, err := httpClient.Do(req)
if err != nil {
switch {
case errors.Is(err, context.DeadlineExceeded):
return nil, errors.New("kupo health check timed out after 3 seconds")
case strings.Contains(err.Error(), "no such host"):
return nil, fmt.Errorf("failed to resolve kupo host: %w", err)
default:
return nil, fmt.Errorf("failed to perform health check: %w", err)
}
}
if resp == nil {
return nil, errors.New("health check failed with nil response")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
return nil, fmt.Errorf("health check failed with status code: %d", resp.StatusCode)
}
m.kupoClient = k
return k, nil
}

func (m *Mempool) resolveTransactionInputs(tx ledger.Transaction) ([]ledger.TransactionOutput, error) {
var resolvedInputs []ledger.TransactionOutput
k, err := m.getKupoClient()
if err != nil {
return nil, err
}
for _, input := range tx.Inputs() {
txID := input.Id().String()
txIndex := int(input.Index())
// Kupo output-reference pattern: output_index@transaction_id (see Kupo Patterns doc)
pattern := fmt.Sprintf("%d@%s", txIndex, txID)
ctx, cancel := context.WithTimeout(context.Background(), defaultKupoTimeout)
matches, err := k.Matches(ctx, kugo.Pattern(pattern))
cancel()
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, "Invalid pattern!") || strings.Contains(errStr, "cannot unmarshal object into Go value of type []kugo.Match") {
if !m.kupoInvalidPatternLogged {
m.kupoInvalidPatternLogged = true
if m.logger != nil {
m.logger.Debug("Kupo does not support output-reference pattern, disabling input resolution", "error", err)
}
}
m.kupoDisabled = true
return resolvedInputs, nil
}
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("kupo matches query timed out after %v", defaultKupoTimeout)
}
return nil, fmt.Errorf("error fetching matches for input TxId: %s, Index: %d: %w", txID, txIndex, err)
}
for _, match := range matches {
out, err := chainsync.NewResolvedTransactionOutput(match)
if err != nil {
return nil, err
}
resolvedInputs = append(resolvedInputs, out)
}
if len(matches) == 0 && m.logger != nil {
m.logger.Debug("Kupo returned no matches for input; ensure Kupo is run with a pattern that indexes this output (e.g. --match \"*\")", "pattern", pattern)
}
}
return resolvedInputs, nil
}
7 changes: 7 additions & 0 deletions input/mempool/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,10 @@ func WithPollInterval(duration string) MempoolOptionFunc {
m.pollIntervalStr = duration
}
}

// WithKupoUrl specifies the Kupo API URL for resolving transaction inputs (e.g. http://localhost:1442).
func WithKupoUrl(kupoUrl string) MempoolOptionFunc {
return func(m *Mempool) {
m.kupoUrl = kupoUrl
}
}
10 changes: 10 additions & 0 deletions input/mempool/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var cmdlineOptions struct {
ntcTcp bool
includeCbor bool
pollInterval string
kupoUrl string
}

func init() {
Expand Down Expand Up @@ -90,6 +91,14 @@ func init() {
DefaultValue: "5s",
Dest: &(cmdlineOptions.pollInterval),
},
{
Name: "kupo-url",
Type: plugin.PluginOptionTypeString,
CustomEnvVar: "KUPO_URL",
Description: "Kupo API URL for resolving transaction inputs (e.g. http://localhost:1442). Kupo must index the outputs you need (e.g. run with --match \"*\") or resolution will be empty.",
DefaultValue: "",
Dest: &(cmdlineOptions.kupoUrl),
},
},
},
)
Expand All @@ -111,5 +120,6 @@ func NewFromCmdlineOptions() plugin.Plugin {
WithNtcTcp(cmdlineOptions.ntcTcp),
WithIncludeCbor(cmdlineOptions.includeCbor),
WithPollInterval(cmdlineOptions.pollInterval),
WithKupoUrl(cmdlineOptions.kupoUrl),
)
}