Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/tnclient/options_cre.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,25 @@ func WithCRETransportAndSigner(runtime cre.NodeRuntime, endpoint string, signer
c.transport, _ = NewCRETransport(runtime, endpoint, signer)
}
}

// WithCRETransportAndSignerWithHTTPCache is a convenience function that combines
// WithSigner and a CRE transport configured with an HTTP cache in the correct
// order.
//
// This ensures the signer is set before creating the CRE transport, which is
// necessary for write operations. The provided cacheCfg controls the HTTP cache
// behavior used by the underlying CRE transport.
//
// Example:
//
// client, err := tnclient.NewClient(ctx, endpoint,
// tnclient.WithCRETransportAndSignerWithHTTPCache(nodeRuntime, endpoint, signer, cacheCfg),
// )
func WithCRETransportAndSignerWithHTTPCache(runtime cre.NodeRuntime, endpoint string, signer auth.Signer, cacheCfg *CREHTTPCacheConfig) Option {
return func(c *Client) {
// Set signer first
c.signer = signer
// Then create CRE transport with the signer and HTTP cache
c.transport, _ = NewCRETransportWithHTTPCache(runtime, endpoint, signer, cacheCfg)
}
}
137 changes: 119 additions & 18 deletions core/tnclient/transport_cre.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package tnclient
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
Expand All @@ -24,6 +26,7 @@ import (

"github.com/smartcontractkit/cre-sdk-go/capabilities/networking/http"
"github.com/smartcontractkit/cre-sdk-go/cre"
"google.golang.org/protobuf/types/known/durationpb"
)

// CRETransport implements Transport using Chainlink CRE's HTTP client.
Expand Down Expand Up @@ -67,6 +70,8 @@ type CRETransport struct {
currentNonce int64 // Track nonce for sequential transactions
nonceMu sync.Mutex
nonceFetched bool
httpCacheStore bool
httpCacheMaxAge time.Duration
}

// Verify CRETransport implements Transport interface at compile time
Expand Down Expand Up @@ -98,20 +103,99 @@ func NewCRETransport(runtime cre.NodeRuntime, endpoint string, signer auth.Signe
}

return &CRETransport{
runtime: runtime,
client: &http.Client{},
endpoint: endpoint,
signer: signer,
chainID: "", // Will be fetched on first call if needed
runtime: runtime,
client: &http.Client{},
endpoint: endpoint,
signer: signer,
chainID: "", // Will be fetched on first call if needed
httpCacheStore: defaultHTTPCacheStore,
httpCacheMaxAge: defaultHTTPCacheMaxAge,
}, nil
}

// nextReqID generates the next JSON-RPC request ID
func (t *CRETransport) nextReqID() string {
func NewCRETransportWithHTTPCache(runtime cre.NodeRuntime, endpoint string, signer auth.Signer, cacheCfg *CREHTTPCacheConfig) (*CRETransport, error) {
t, err := NewCRETransport(runtime, endpoint, signer)
if err != nil {
return nil, err
}
t.ApplyHTTPCacheConfig(cacheCfg)
return t, nil
}

func (t *CRETransport) ApplyHTTPCacheConfig(cfg *CREHTTPCacheConfig) {
if cfg == nil {
return
}

if cfg.Store != nil {
t.httpCacheStore = *cfg.Store
}

if cfg.MaxAgeSeconds != nil {
secs := *cfg.MaxAgeSeconds
if secs < 0 {
secs = 0
}
d := time.Duration(secs) * time.Second

// Clamp to CRE max.
if d > maxHTTPCacheMaxAge {
d = maxHTTPCacheMaxAge
}
t.httpCacheMaxAge = d
}
}

func (t *CRETransport) nextReqID(method string, paramsJSON []byte) string {
// Cache is "active" if we store or we attempt cache reads (MaxAge > 0).
cacheActive := t.httpCacheStore || (t.httpCacheMaxAge > 0)

if cacheActive {
//required because multiple node triggers could mean reqID overrides intended caching
h := sha256.New()
h.Write([]byte(method))
h.Write([]byte{0})
h.Write(paramsJSON)
sum := h.Sum(nil)
return "tn:" + hex.EncodeToString(sum[:8])
}

// Default behavior (unchanged): monotonically increasing per process
id := t.reqID.Add(1)
return strconv.FormatUint(id, 10)
}

// Transport-wide defaults (applied when workflow config does not specify cache settings).
const (
defaultHTTPCacheStore = true
defaultHTTPCacheMaxAge = 60 * time.Second

// CRE documented max for cache MaxAge.
maxHTTPCacheMaxAge = 10 * time.Minute
)

// CREHTTPCacheConfig is intended to be populated from workflow config and then
// passed into the SDK client wiring (Option A).
type CREHTTPCacheConfig struct {
// If nil, defaults to true.
Store *bool `json:"store,omitempty"`

// If nil, defaults to 60 seconds. Values > 600 are clamped to 600 (10 minutes).
MaxAgeSeconds *int64 `json:"maxAgeSeconds,omitempty"`
}

// cacheSettingsForJSONRPC determines caching behavior per JSON-RPC method.
// Uses paramsJSON so callers can evolve this policy without changing call sites.
func (t *CRETransport) cacheSettingsForJSONRPC(method string, paramsJSON []byte) *http.CacheSettings {
// Always attach cache settings so every CRE HTTP request participates.
// Note: per CRE docs, if MaxAge is nil/zero it won't read from cache,
// but may still store if Store is true.
return &http.CacheSettings{
Store: t.httpCacheStore,
MaxAge: durationpb.New(t.httpCacheMaxAge),
}
}

// callJSONRPC makes a JSON-RPC call via CRE HTTP client
// It automatically handles authentication if the endpoint returns 401
func (t *CRETransport) callJSONRPC(ctx context.Context, method string, params any, result any) error {
Expand Down Expand Up @@ -147,8 +231,10 @@ func (t *CRETransport) doJSONRPC(ctx context.Context, method string, params any,
return fmt.Errorf("failed to marshal params: %w", err)
}

cacheSettings := t.cacheSettingsForJSONRPC(method, paramsJSON)

// Create JSON-RPC request
reqID := t.nextReqID()
reqID := t.nextReqID(method, paramsJSON)
rpcReq := jsonrpc.NewRequest(reqID, method, paramsJSON)

// Marshal the full request
Expand All @@ -171,10 +257,11 @@ func (t *CRETransport) doJSONRPC(ctx context.Context, method string, params any,

// Create CRE HTTP request
httpReq := &http.Request{
Url: t.endpoint,
Method: "POST",
Body: requestBody,
Headers: headers,
Url: t.endpoint,
Method: "POST",
Body: requestBody,
Headers: headers,
CacheSettings: cacheSettings,
}

// Execute via CRE client (returns Promise)
Expand Down Expand Up @@ -443,11 +530,21 @@ func (t *CRETransport) executeOnce(ctx context.Context, namespace string, action
}

// Manually construct JSON-RPC request to bypass params map
reqID := t.nextReqID()
reqID := t.nextReqID("user.broadcast", txJSON)
rpcReqJSON := fmt.Sprintf(
`{"jsonrpc":"2.0","id":"%s","method":"user.broadcast","params":{"tx":%s}}`,
reqID, string(txJSON))

// Build paramsJSON for cache policy evaluation (without changing request construction)
type broadcastParams struct {
Tx json.RawMessage `json:"tx"`
}
paramsJSON, err := json.Marshal(&broadcastParams{Tx: txJSON})
if err != nil {
return types.Hash{}, fmt.Errorf("failed to marshal broadcast params: %w", err)
}
cacheSettings := t.cacheSettingsForJSONRPC("user.broadcast", paramsJSON)

// Create headers
headers := map[string]string{
"Content-Type": "application/json",
Expand All @@ -462,10 +559,11 @@ func (t *CRETransport) executeOnce(ctx context.Context, namespace string, action

// Create CRE HTTP request
httpReq := &http.Request{
Url: t.endpoint,
Method: "POST",
Body: []byte(rpcReqJSON),
Headers: headers,
Url: t.endpoint,
Method: "POST",
Body: []byte(rpcReqJSON),
Headers: headers,
CacheSettings: cacheSettings,
}

// Execute via CRE client
Expand Down Expand Up @@ -767,8 +865,10 @@ func (t *CRETransport) doJSONRPCWithResponse(ctx context.Context, method string,
return nil, fmt.Errorf("failed to marshal params: %w", err)
}

cacheSettings := t.cacheSettingsForJSONRPC(method, paramsJSON)

// Create JSON-RPC request
reqID := t.nextReqID()
reqID := t.nextReqID(method, paramsJSON)
rpcReq := jsonrpc.NewRequest(reqID, method, paramsJSON)

// Marshal the full request
Expand All @@ -785,6 +885,7 @@ func (t *CRETransport) doJSONRPCWithResponse(ctx context.Context, method string,
Headers: map[string]string{
"Content-Type": "application/json",
},
CacheSettings: cacheSettings,
}

// Execute via CRE client (returns Promise)
Expand Down
Loading
Loading