diff --git a/core/tnclient/options_cre.go b/core/tnclient/options_cre.go index 14f73ab..42e97c0 100644 --- a/core/tnclient/options_cre.go +++ b/core/tnclient/options_cre.go @@ -107,3 +107,25 @@ func WithCRETransportAndSigner(runtime cre.NodeRuntime, endpoint string, signer c.transport, _ = NewCRETransport(runtime, endpoint, signer) } } + +// WithCRETransportAndSignerWithHTTPCache is a convenience function that combines +// WithSigner and a CRE transport configured with an HTTP cache in the correct +// order. +// +// This ensures the signer is set before creating the CRE transport, which is +// necessary for write operations. The provided cacheCfg controls the HTTP cache +// behavior used by the underlying CRE transport. +// +// Example: +// +// client, err := tnclient.NewClient(ctx, endpoint, +// tnclient.WithCRETransportAndSignerWithHTTPCache(nodeRuntime, endpoint, signer, cacheCfg), +// ) +func WithCRETransportAndSignerWithHTTPCache(runtime cre.NodeRuntime, endpoint string, signer auth.Signer, cacheCfg *CREHTTPCacheConfig) Option { + return func(c *Client) { + // Set signer first + c.signer = signer + // Then create CRE transport with the signer and HTTP cache + c.transport, _ = NewCRETransportWithHTTPCache(runtime, endpoint, signer, cacheCfg) + } +} diff --git a/core/tnclient/transport_cre.go b/core/tnclient/transport_cre.go index 504e4fa..b1907a3 100644 --- a/core/tnclient/transport_cre.go +++ b/core/tnclient/transport_cre.go @@ -5,6 +5,8 @@ package tnclient import ( "bytes" "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" "math/big" @@ -24,6 +26,7 @@ import ( "github.com/smartcontractkit/cre-sdk-go/capabilities/networking/http" "github.com/smartcontractkit/cre-sdk-go/cre" + "google.golang.org/protobuf/types/known/durationpb" ) // CRETransport implements Transport using Chainlink CRE's HTTP client. @@ -67,6 +70,8 @@ type CRETransport struct { currentNonce int64 // Track nonce for sequential transactions nonceMu sync.Mutex nonceFetched bool + httpCacheStore bool + httpCacheMaxAge time.Duration } // Verify CRETransport implements Transport interface at compile time @@ -98,20 +103,99 @@ func NewCRETransport(runtime cre.NodeRuntime, endpoint string, signer auth.Signe } return &CRETransport{ - runtime: runtime, - client: &http.Client{}, - endpoint: endpoint, - signer: signer, - chainID: "", // Will be fetched on first call if needed + runtime: runtime, + client: &http.Client{}, + endpoint: endpoint, + signer: signer, + chainID: "", // Will be fetched on first call if needed + httpCacheStore: defaultHTTPCacheStore, + httpCacheMaxAge: defaultHTTPCacheMaxAge, }, nil } -// nextReqID generates the next JSON-RPC request ID -func (t *CRETransport) nextReqID() string { +func NewCRETransportWithHTTPCache(runtime cre.NodeRuntime, endpoint string, signer auth.Signer, cacheCfg *CREHTTPCacheConfig) (*CRETransport, error) { + t, err := NewCRETransport(runtime, endpoint, signer) + if err != nil { + return nil, err + } + t.ApplyHTTPCacheConfig(cacheCfg) + return t, nil +} + +func (t *CRETransport) ApplyHTTPCacheConfig(cfg *CREHTTPCacheConfig) { + if cfg == nil { + return + } + + if cfg.Store != nil { + t.httpCacheStore = *cfg.Store + } + + if cfg.MaxAgeSeconds != nil { + secs := *cfg.MaxAgeSeconds + if secs < 0 { + secs = 0 + } + d := time.Duration(secs) * time.Second + + // Clamp to CRE max. + if d > maxHTTPCacheMaxAge { + d = maxHTTPCacheMaxAge + } + t.httpCacheMaxAge = d + } +} + +func (t *CRETransport) nextReqID(method string, paramsJSON []byte) string { + // Cache is "active" if we store or we attempt cache reads (MaxAge > 0). + cacheActive := t.httpCacheStore || (t.httpCacheMaxAge > 0) + + if cacheActive { + //required because multiple node triggers could mean reqID overrides intended caching + h := sha256.New() + h.Write([]byte(method)) + h.Write([]byte{0}) + h.Write(paramsJSON) + sum := h.Sum(nil) + return "tn:" + hex.EncodeToString(sum[:8]) + } + + // Default behavior (unchanged): monotonically increasing per process id := t.reqID.Add(1) return strconv.FormatUint(id, 10) } +// Transport-wide defaults (applied when workflow config does not specify cache settings). +const ( + defaultHTTPCacheStore = true + defaultHTTPCacheMaxAge = 60 * time.Second + + // CRE documented max for cache MaxAge. + maxHTTPCacheMaxAge = 10 * time.Minute +) + +// CREHTTPCacheConfig is intended to be populated from workflow config and then +// passed into the SDK client wiring (Option A). +type CREHTTPCacheConfig struct { + // If nil, defaults to true. + Store *bool `json:"store,omitempty"` + + // If nil, defaults to 60 seconds. Values > 600 are clamped to 600 (10 minutes). + MaxAgeSeconds *int64 `json:"maxAgeSeconds,omitempty"` +} + +// cacheSettingsForJSONRPC determines caching behavior per JSON-RPC method. +// Uses paramsJSON so callers can evolve this policy without changing call sites. +func (t *CRETransport) cacheSettingsForJSONRPC(method string, paramsJSON []byte) *http.CacheSettings { + // Always attach cache settings so every CRE HTTP request participates. + // Note: per CRE docs, if MaxAge is nil/zero it won't read from cache, + // but may still store if Store is true. + return &http.CacheSettings{ + Store: t.httpCacheStore, + MaxAge: durationpb.New(t.httpCacheMaxAge), + } +} + // callJSONRPC makes a JSON-RPC call via CRE HTTP client // It automatically handles authentication if the endpoint returns 401 func (t *CRETransport) callJSONRPC(ctx context.Context, method string, params any, result any) error { @@ -147,8 +231,10 @@ func (t *CRETransport) doJSONRPC(ctx context.Context, method string, params any, return fmt.Errorf("failed to marshal params: %w", err) } + cacheSettings := t.cacheSettingsForJSONRPC(method, paramsJSON) + // Create JSON-RPC request - reqID := t.nextReqID() + reqID := t.nextReqID(method, paramsJSON) rpcReq := jsonrpc.NewRequest(reqID, method, paramsJSON) // Marshal the full request @@ -171,10 +257,11 @@ func (t *CRETransport) doJSONRPC(ctx context.Context, method string, params any, // Create CRE HTTP request httpReq := &http.Request{ - Url: t.endpoint, - Method: "POST", - Body: requestBody, - Headers: headers, + Url: t.endpoint, + Method: "POST", + Body: requestBody, + Headers: headers, + CacheSettings: cacheSettings, } // Execute via CRE client (returns Promise) @@ -443,11 +530,21 @@ func (t *CRETransport) executeOnce(ctx context.Context, namespace string, action } // Manually construct JSON-RPC request to bypass params map - reqID := t.nextReqID() + reqID := t.nextReqID("user.broadcast", txJSON) rpcReqJSON := fmt.Sprintf( `{"jsonrpc":"2.0","id":"%s","method":"user.broadcast","params":{"tx":%s}}`, reqID, string(txJSON)) + // Build paramsJSON for cache policy evaluation (without changing request construction) + type broadcastParams struct { + Tx json.RawMessage `json:"tx"` + } + paramsJSON, err := json.Marshal(&broadcastParams{Tx: txJSON}) + if err != nil { + return types.Hash{}, fmt.Errorf("failed to marshal broadcast params: %w", err) + } + cacheSettings := t.cacheSettingsForJSONRPC("user.broadcast", paramsJSON) + // Create headers headers := map[string]string{ "Content-Type": "application/json", @@ -462,10 +559,11 @@ func (t *CRETransport) executeOnce(ctx context.Context, namespace string, action // Create CRE HTTP request httpReq := &http.Request{ - Url: t.endpoint, - Method: "POST", - Body: []byte(rpcReqJSON), - Headers: headers, + Url: t.endpoint, + Method: "POST", + Body: []byte(rpcReqJSON), + Headers: headers, + CacheSettings: cacheSettings, } // Execute via CRE client @@ -767,8 +865,10 @@ func (t *CRETransport) doJSONRPCWithResponse(ctx context.Context, method string, return nil, fmt.Errorf("failed to marshal params: %w", err) } + cacheSettings := t.cacheSettingsForJSONRPC(method, paramsJSON) + // Create JSON-RPC request - reqID := t.nextReqID() + reqID := t.nextReqID(method, paramsJSON) rpcReq := jsonrpc.NewRequest(reqID, method, paramsJSON) // Marshal the full request @@ -785,6 +885,7 @@ func (t *CRETransport) doJSONRPCWithResponse(ctx context.Context, method string, Headers: map[string]string{ "Content-Type": "application/json", }, + CacheSettings: cacheSettings, } // Execute via CRE client (returns Promise) diff --git a/core/tnclient/transport_cre_test.go b/core/tnclient/transport_cre_test.go index b1d9ff6..4ce592a 100644 --- a/core/tnclient/transport_cre_test.go +++ b/core/tnclient/transport_cre_test.go @@ -4,9 +4,16 @@ package tnclient import ( "fmt" + "go/ast" + "go/parser" + "go/token" + "os" + "strconv" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // Note: These are basic structural tests for CRE transport. @@ -25,13 +32,31 @@ func TestNewCRETransport(t *testing.T) { // Verify the function is not nil assert.NotNil(t, NewCRETransport) }) + + t.Run("defaults_and_endpoint_normalization", func(t *testing.T) { + tr, err := NewCRETransport(nil, "https://example.com", nil) + require.NoError(t, err) + + // Endpoint should have /rpc/v1 suffix + assert.Equal(t, "https://example.com/rpc/v1", tr.endpoint) + + // Cache defaults should be applied + assert.Equal(t, defaultHTTPCacheStore, tr.httpCacheStore) + assert.Equal(t, defaultHTTPCacheMaxAge, tr.httpCacheMaxAge) + + // Sanity: cache settings should be non-nil for any method + cs := tr.cacheSettingsForJSONRPC("user.call", []byte(`{}`)) + require.NotNil(t, cs) + assert.Equal(t, defaultHTTPCacheStore, cs.Store) + require.NotNil(t, cs.MaxAge) + assert.Equal(t, defaultHTTPCacheMaxAge, cs.MaxAge.AsDuration()) + }) } func TestCRETransport_Implements_Transport_Interface(t *testing.T) { // This compile-time check verifies that CRETransport implements Transport // The var _ Transport = (*CRETransport)(nil) line in transport_cre.go // ensures this at compile time, but we include this test for documentation. - t.Run("implements_interface", func(t *testing.T) { // If this compiles, the interface is implemented var _ Transport = (*CRETransport)(nil) @@ -162,3 +187,292 @@ func TestIsTransientTxError(t *testing.T) { }) } } + +func TestCRETransport_ApplyHTTPCacheConfig(t *testing.T) { + t.Run("nil_config_is_noop", func(t *testing.T) { + tr, err := NewCRETransport(nil, "https://example.com", nil) + require.NoError(t, err) + + beforeStore := tr.httpCacheStore + beforeAge := tr.httpCacheMaxAge + + tr.ApplyHTTPCacheConfig(nil) + + assert.Equal(t, beforeStore, tr.httpCacheStore) + assert.Equal(t, beforeAge, tr.httpCacheMaxAge) + }) + + t.Run("overrides_are_applied", func(t *testing.T) { + tr, err := NewCRETransport(nil, "https://example.com", nil) + require.NoError(t, err) + + store := false + secs := int64(30) + cfg := &CREHTTPCacheConfig{Store: &store, MaxAgeSeconds: &secs} + + tr.ApplyHTTPCacheConfig(cfg) + + assert.False(t, tr.httpCacheStore) + assert.Equal(t, 30*time.Second, tr.httpCacheMaxAge) + + cs := tr.cacheSettingsForJSONRPC("user.call", []byte(`{}`)) + require.NotNil(t, cs) + assert.False(t, cs.Store) + require.NotNil(t, cs.MaxAge) + assert.Equal(t, 30*time.Second, cs.MaxAge.AsDuration()) + }) + + t.Run("negative_max_age_is_clamped_to_zero", func(t *testing.T) { + tr, err := NewCRETransport(nil, "https://example.com", nil) + require.NoError(t, err) + + secs := int64(-5) + cfg := &CREHTTPCacheConfig{MaxAgeSeconds: &secs} + tr.ApplyHTTPCacheConfig(cfg) + + assert.Equal(t, 0*time.Second, tr.httpCacheMaxAge) + + cs := tr.cacheSettingsForJSONRPC("user.call", []byte(`{}`)) + require.NotNil(t, cs) + require.NotNil(t, cs.MaxAge) + assert.Equal(t, 0*time.Second, cs.MaxAge.AsDuration()) + }) + + t.Run("max_age_is_clamped_to_cre_max", func(t *testing.T) { + tr, err := NewCRETransport(nil, "https://example.com", nil) + require.NoError(t, err) + + secs := int64(999999) + cfg := &CREHTTPCacheConfig{MaxAgeSeconds: &secs} + tr.ApplyHTTPCacheConfig(cfg) + + assert.Equal(t, maxHTTPCacheMaxAge, tr.httpCacheMaxAge) + + cs := tr.cacheSettingsForJSONRPC("user.call", []byte(`{}`)) + require.NotNil(t, cs) + require.NotNil(t, cs.MaxAge) + assert.Equal(t, maxHTTPCacheMaxAge, cs.MaxAge.AsDuration()) + }) +} + +func TestCRETransport_CacheSettingsForJSONRPC(t *testing.T) { + t.Run("all_methods_return_cache_settings", func(t *testing.T) { + tr, err := NewCRETransport(nil, "https://example.com", nil) + require.NoError(t, err) + + methods := []string{ + "user.call", + "user.tx_query", + "user.account", + "user.chain_info", + "kgw.authn_param", + "kgw.authn", + "user.broadcast", + } + + for _, m := range methods { + cs := tr.cacheSettingsForJSONRPC(m, []byte(`{}`)) + require.NotNil(t, cs, "method %s should return non-nil CacheSettings", m) + assert.Equal(t, defaultHTTPCacheStore, cs.Store, "method %s Store should match transport default", m) + require.NotNil(t, cs.MaxAge, "method %s should set MaxAge", m) + assert.Equal(t, defaultHTTPCacheMaxAge, cs.MaxAge.AsDuration(), "method %s MaxAge should match transport default", m) + } + }) +} + +func TestCRETransport_NextReqID(t *testing.T) { + t.Run("deterministic_when_caching_active", func(t *testing.T) { + tr, err := NewCRETransport(nil, "https://example.com", nil) + require.NoError(t, err) + + params := []byte(`{"a":1}`) + id1 := tr.nextReqID("user.call", params) + id2 := tr.nextReqID("user.call", params) + + assert.Equal(t, id1, id2) + assert.True(t, stringsHasPrefix(id1, "tn:"), "expected deterministic id to have tn: prefix") + assert.Equal(t, 19, len(id1), "expected tn: + 16 hex chars") + + id3 := tr.nextReqID("user.call", []byte(`{"a":2}`)) + assert.NotEqual(t, id1, id3) + + // Different method -> different id + id4 := tr.nextReqID("user.tx_query", params) + assert.NotEqual(t, id1, id4) + }) + + t.Run("monotonic_sequence_when_caching_disabled", func(t *testing.T) { + // Construct minimal transport with caching disabled and fresh reqID counter. + tr := &CRETransport{ + httpCacheStore: false, + httpCacheMaxAge: 0, + } + + id1 := tr.nextReqID("user.call", []byte(`{}`)) + id2 := tr.nextReqID("user.call", []byte(`{}`)) + + assert.Equal(t, "1", id1) + assert.Equal(t, "2", id2) + }) +} + +// Test that the caching change is actually wired into HTTP request construction. +// +// We cannot execute the CRE HTTP client outside a CRE runtime, so this test parses +// transport_cre.go and verifies that: +// - doJSONRPC, doJSONRPCWithResponse, and executeOnce each set CacheSettings in +// their &http.Request literals +// - the CacheSettings value is the local variable named `cacheSettings` +// - `cacheSettings` is assigned from t.cacheSettingsForJSONRPC(..., paramsJSON) +func TestCRETransport_HTTPRequestsIncludeCacheSettings(t *testing.T) { + src, err := os.ReadFile("transport_cre.go") + require.NoError(t, err, "failed to read transport_cre.go") + + fset := token.NewFileSet() + f, err := parser.ParseFile(fset, "transport_cre.go", src, 0) + require.NoError(t, err, "failed to parse transport_cre.go") + + targets := []struct { + funcName string + expectFirstArgIsMethod bool + expectFirstArgString string + }{ + {funcName: "doJSONRPC", expectFirstArgIsMethod: true}, + {funcName: "doJSONRPCWithResponse", expectFirstArgIsMethod: true}, + {funcName: "executeOnce", expectFirstArgString: "user.broadcast"}, + } + + for _, tc := range targets { + t.Run(tc.funcName, func(t *testing.T) { + fd := findFuncDecl(t, f, tc.funcName) + require.NotNil(t, fd, "function %s not found", tc.funcName) + + assert.True(t, hasCacheSettingsAssignment(t, fd, tc.expectFirstArgIsMethod, tc.expectFirstArgString), + "%s should assign cacheSettings := t.cacheSettingsForJSONRPC(..., paramsJSON)", tc.funcName) + + assert.True(t, hasHttpRequestWithCacheSettings(t, fd), + "%s should set CacheSettings on http.Request literal", tc.funcName) + }) + } +} + +func findFuncDecl(t *testing.T, file *ast.File, name string) *ast.FuncDecl { + t.Helper() + for _, decl := range file.Decls { + if fd, ok := decl.(*ast.FuncDecl); ok && fd.Name != nil && fd.Name.Name == name { + return fd + } + } + return nil +} + +func hasCacheSettingsAssignment(t *testing.T, fd *ast.FuncDecl, firstArgIsMethod bool, firstArgString string) bool { + t.Helper() + + found := false + ast.Inspect(fd, func(n ast.Node) bool { + as, ok := n.(*ast.AssignStmt) + if !ok || len(as.Lhs) != 1 || len(as.Rhs) != 1 { + return true + } + + lhs, ok := as.Lhs[0].(*ast.Ident) + if !ok || lhs.Name != "cacheSettings" { + return true + } + + call, ok := as.Rhs[0].(*ast.CallExpr) + if !ok { + return true + } + sel, ok := call.Fun.(*ast.SelectorExpr) + if !ok { + return true + } + recv, ok := sel.X.(*ast.Ident) + if !ok || recv.Name != "t" || sel.Sel == nil || sel.Sel.Name != "cacheSettingsForJSONRPC" { + return true + } + + if len(call.Args) != 2 { + return true + } + + if id2, ok := call.Args[1].(*ast.Ident); !ok || id2.Name != "paramsJSON" { + return true + } + + if firstArgIsMethod { + id1, ok := call.Args[0].(*ast.Ident) + if !ok || id1.Name != "method" { + return true + } + } + if firstArgString != "" { + bl, ok := call.Args[0].(*ast.BasicLit) + if !ok || bl.Kind != token.STRING { + return true + } + unquoted, err := strconv.Unquote(bl.Value) + if err != nil || unquoted != firstArgString { + return true + } + } + + found = true + return true + }) + return found +} + +func hasHttpRequestWithCacheSettings(t *testing.T, fd *ast.FuncDecl) bool { + t.Helper() + + found := false + ast.Inspect(fd, func(n ast.Node) bool { + ue, ok := n.(*ast.UnaryExpr) + if !ok || ue.Op != token.AND { + return true + } + cl, ok := ue.X.(*ast.CompositeLit) + if !ok { + return true + } + + se, ok := cl.Type.(*ast.SelectorExpr) + if !ok { + return true + } + pkg, ok := se.X.(*ast.Ident) + if !ok || pkg.Name != "http" || se.Sel == nil || se.Sel.Name != "Request" { + return true + } + + for _, elt := range cl.Elts { + kv, ok := elt.(*ast.KeyValueExpr) + if !ok { + continue + } + key, ok := kv.Key.(*ast.Ident) + if !ok || key.Name != "CacheSettings" { + continue + } + val, ok := kv.Value.(*ast.Ident) + if !ok || val.Name != "cacheSettings" { + continue + } + found = true + return true + } + + return true + }) + return found +} + +func stringsHasPrefix(s, prefix string) bool { + if len(prefix) > len(s) { + return false + } + return s[:len(prefix)] == prefix +} diff --git a/docs/CRE_INTEGRATION.md b/docs/CRE_INTEGRATION.md index 06d1d0c..da351f8 100644 --- a/docs/CRE_INTEGRATION.md +++ b/docs/CRE_INTEGRATION.md @@ -168,6 +168,36 @@ Use for: --- +### CRE HTTP caching (recommended for non-idempotent writes) + +CRE executes workflow logic across multiple DON nodes. Without additional controls, **each node may independently issue the same HTTP request**, including requests that have side effects. For non-idempotent operations (for example, submitting transactions or creating resources), this can result in duplicate external calls. + +CRE’s Go HTTP client supports best-effort request de-duplication via `CacheSettings` on `http.Request`. When enabled, one node performs the request and stores the response; other nodes can reuse it if it is still fresh (`MaxAge`). This is most appropriate for **POST/PUT/PATCH/DELETE-style** operations that should not be executed multiple times. + +Example (raw CRE HTTP request): + +```go +import ( + "time" + "google.golang.org/protobuf/types/known/durationpb" + crehttp "github.com/smartcontractkit/cre-sdk-go/capabilities/networking/http" +) + +// ... +req := &crehttp.Request{ + Url: "https://example.com/api", + Method: "POST", + Body: payloadBytes, + CacheSettings: &crehttp.CacheSettings{ + Store: true, + MaxAge: durationpb.New(2 * time.Minute), + }, +} +resp, err := client.SendRequest(nodeRuntime, req).Await() + +``` +--- + ## API Reference For detailed API documentation including function signatures, parameters, and usage examples, see: @@ -186,6 +216,7 @@ For detailed API documentation including function signatures, parameters, and us - [Chainlink CRE Documentation](https://docs.chain.link/cre) - [TRUF.NETWORK SDK Documentation](./api-reference.md) - [CRE SDK Go Documentation](https://pkg.go.dev/github.com/smartcontractkit/cre-sdk-go) +- [CRE HTTP documentation](https://docs.chain.link/cre/guides/workflow/using-http-client/post-request-go) ### Examples - [TRUF + CRE Complete Demo](../examples/truf-cre-demo/) - 3-workflow pattern demonstrating full CRUD lifecycle