From a780a0fecc6728b5f8ec54ae7a1a288c2dbe7578 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 23 Dec 2025 17:59:41 +0700 Subject: [PATCH 1/5] feat: call TN action in ChainLink CRE --- core/tnclient/options_cre.go | 109 ++++++ core/tnclient/transport.go | 2 +- core/tnclient/transport_cre.go | 384 +++++++++++++++++++++ core/tnclient/transport_cre_test.go | 185 ++++++++++ docs/api-reference.md | 2 +- docs/stream-permissions.md | 2 +- examples/stream_cache_demo/go.mod | 4 +- go.mod | 8 +- go.sum | 14 + tests/integration/composed_actions_test.go | 2 +- 10 files changed, 704 insertions(+), 8 deletions(-) create mode 100644 core/tnclient/options_cre.go create mode 100644 core/tnclient/transport_cre.go create mode 100644 core/tnclient/transport_cre_test.go diff --git a/core/tnclient/options_cre.go b/core/tnclient/options_cre.go new file mode 100644 index 0000000..14f73ab --- /dev/null +++ b/core/tnclient/options_cre.go @@ -0,0 +1,109 @@ +//go:build wasip1 + +package tnclient + +import ( + "github.com/smartcontractkit/cre-sdk-go/cre" + "github.com/trufnetwork/kwil-db/core/crypto/auth" +) + +// WithCRETransport configures the client to use Chainlink CRE's HTTP client. +// +// This option is used when running the SDK in Chainlink Runtime Environment (CRE) +// workflows. CRE provides its own HTTP client with consensus and caching features +// that must be used instead of standard net/http. +// +// The runtime parameter is provided by the CRE workflow execution context via +// cre.RunInNodeMode(). The endpoint should match the TRUF.NETWORK gateway URL. +// +// Example usage in CRE workflow: +// +// func onCronTrigger(config *Config, runtime cre.Runtime, trigger *cron.Payload) (*Result, error) { +// logger := runtime.Logger() +// +// return cre.RunInNodeMode(config, runtime, +// func(config *Config, nodeRuntime cre.NodeRuntime) (*Result, error) { +// // Create TRUF client with CRE transport +// client, err := tnclient.NewClient(context.Background(), config.TRUFEndpoint, +// tnclient.WithSigner(signer), +// tnclient.WithCRETransport(nodeRuntime, config.TRUFEndpoint), // ← CRE transport +// ) +// if err != nil { +// logger.Error("Failed to create TRUF client", "error", err) +// return nil, err +// } +// +// // Use SDK normally - all methods work! +// actions, err := client.LoadActions() +// if err != nil { +// logger.Error("Failed to load actions", "error", err) +// return nil, err +// } +// +// fromTime := int(time.Now().Add(-24 * time.Hour).Unix()) +// toTime := int(time.Now().Unix()) +// +// result, err := actions.GetRecord(context.Background(), types.GetRecordInput{ +// DataProvider: "0x1234...", +// StreamId: "stai0000000000000000000000000000", +// From: &fromTime, +// To: &toTime, +// }) +// if err != nil { +// logger.Error("GetRecord failed", "error", err) +// return nil, err +// } +// +// logger.Info("Successfully fetched records", "count", len(result.Results)) +// return &Result{Records: result.Results}, nil +// }, +// cre.ConsensusAggregationFromTags[*Result](), +// ).Await() +// } +// +// Parameters: +// - runtime: CRE NodeRuntime from the workflow execution context +// - endpoint: TRUF.NETWORK gateway URL (e.g., "https://gateway.example.com") +// +// Note: When using WithCRETransport, you must also provide WithSigner if you need +// to perform write operations (InsertRecords, DeployStream, etc.). The signer should +// be created before the CRE workflow execution. +// +// Note: The provider URL passed to NewClient is ignored when using WithCRETransport, +// since the endpoint is provided directly to this option. +func WithCRETransport(runtime cre.NodeRuntime, endpoint string) Option { + return func(c *Client) { + // Note: Transport is created immediately with the current signer (if set) + // If WithSigner is applied after this option, the signer won't be available yet + // For guaranteed signer availability, use WithCRETransportAndSigner instead + c.transport, _ = NewCRETransport(runtime, endpoint, c.signer) + } +} + +// WithCRETransportAndSigner is a convenience function that combines WithSigner +// and WithCRETransport in the correct order. +// +// This ensures the signer is set before creating the CRE transport, which is +// necessary for write operations. +// +// Example: +// +// client, err := tnclient.NewClient(ctx, endpoint, +// tnclient.WithCRETransportAndSigner(nodeRuntime, endpoint, signer), +// ) +// +// This is equivalent to: +// +// client, err := tnclient.NewClient(ctx, endpoint, +// tnclient.WithSigner(signer), +// tnclient.WithCRETransport(nodeRuntime, endpoint), +// ) +func WithCRETransportAndSigner(runtime cre.NodeRuntime, endpoint string, signer auth.Signer) Option { + return func(c *Client) { + // Set signer first + c.signer = signer + + // Then create CRE transport with the signer + c.transport, _ = NewCRETransport(runtime, endpoint, signer) + } +} diff --git a/core/tnclient/transport.go b/core/tnclient/transport.go index 463ca75..6a7c1f4 100644 --- a/core/tnclient/transport.go +++ b/core/tnclient/transport.go @@ -9,7 +9,7 @@ import ( "github.com/trufnetwork/kwil-db/core/types" ) -// Transport abstracts the communication layer for TRUF Network operations. +// Transport abstracts the communication layer for TRUF.NETWORK operations. // This interface allows using different transport implementations without changing SDK code. // // The default implementation (HTTPTransport) uses standard net/http via kwil-db's diff --git a/core/tnclient/transport_cre.go b/core/tnclient/transport_cre.go new file mode 100644 index 0000000..bdedc4a --- /dev/null +++ b/core/tnclient/transport_cre.go @@ -0,0 +1,384 @@ +//go:build wasip1 + +package tnclient + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "sync" + "sync/atomic" + "time" + + clientType "github.com/trufnetwork/kwil-db/core/client/types" + "github.com/trufnetwork/kwil-db/core/crypto/auth" + jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" + "github.com/trufnetwork/kwil-db/core/types" + + "github.com/smartcontractkit/cre-sdk-go/capabilities/networking/http" + "github.com/smartcontractkit/cre-sdk-go/cre" +) + +// CRETransport implements Transport using Chainlink CRE's HTTP client. +// +// This transport is designed for use in Chainlink Runtime Environment (CRE) +// workflows where standard net/http is not available. It uses CRE's consensus-aware +// HTTP client with Promise-based async operations. +// +// Example usage in CRE workflow: +// +// func onCronTrigger(config *Config, runtime cre.Runtime, trigger *cron.Payload) (*Result, error) { +// return cre.RunInNodeMode(config, runtime, +// func(config *Config, nodeRuntime cre.NodeRuntime) (*Result, error) { +// // Create TN client with CRE transport +// client, err := tnclient.NewClient(context.Background(), config.TRUFEndpoint, +// tnclient.WithCRETransport(nodeRuntime), +// ) +// if err != nil { +// return nil, err +// } +// +// // Use client normally - all methods work! +// actions, _ := client.LoadActions() +// result, err := actions.GetRecord(context.Background(), ...) +// return &Result{Records: result.Results}, nil +// }, +// cre.ConsensusAggregationFromTags[*Result](), +// ).Await() +// } +type CRETransport struct { + runtime cre.NodeRuntime + client *http.Client + endpoint string + signer auth.Signer + chainID string + chainIDOnce sync.Once + chainIDErr error + reqID atomic.Uint64 +} + +// Verify CRETransport implements Transport interface at compile time +var _ Transport = (*CRETransport)(nil) + +// NewCRETransport creates a new CRE transport for use in Chainlink workflows. +// +// Parameters: +// - runtime: The CRE NodeRuntime provided by the workflow execution context +// - endpoint: HTTP(S) endpoint URL (e.g., "https://gateway.example.com") +// - signer: Cryptographic signer for transaction authentication (can be nil for read-only) +// +// Returns: +// - Configured CRETransport instance +// - Error if initialization fails +// +// Example: +// +// transport, err := NewCRETransport(nodeRuntime, "https://gateway.example.com", signer) +// if err != nil { +// return err +// } +func NewCRETransport(runtime cre.NodeRuntime, endpoint string, signer auth.Signer) (*CRETransport, error) { + return &CRETransport{ + runtime: runtime, + client: &http.Client{}, + endpoint: endpoint, + signer: signer, + chainID: "", // Will be fetched on first call if needed + }, nil +} + +// nextReqID generates the next JSON-RPC request ID +func (t *CRETransport) nextReqID() string { + id := t.reqID.Add(1) + return strconv.FormatUint(id, 10) +} + +// callJSONRPC makes a JSON-RPC call via CRE HTTP client +func (t *CRETransport) callJSONRPC(ctx context.Context, method string, params any, result any) error { + // Marshal the params + paramsJSON, err := json.Marshal(params) + if err != nil { + return fmt.Errorf("failed to marshal params: %w", err) + } + + // Create JSON-RPC request + reqID := t.nextReqID() + rpcReq := jsonrpc.NewRequest(reqID, method, paramsJSON) + + // Marshal the full request + requestBody, err := json.Marshal(rpcReq) + if err != nil { + return fmt.Errorf("failed to marshal JSON-RPC request: %w", err) + } + + // Create CRE HTTP request + httpReq := &http.Request{ + Url: t.endpoint, + Method: "POST", + Body: requestBody, + Headers: map[string]string{ + "Content-Type": "application/json", + }, + } + + // Execute via CRE client (returns Promise) + httpResp, err := t.client.SendRequest(t.runtime, httpReq).Await() + if err != nil { + return fmt.Errorf("CRE HTTP request failed: %w", err) + } + + // Check HTTP status + if httpResp.StatusCode != 200 { + return fmt.Errorf("unexpected HTTP status code: %d", httpResp.StatusCode) + } + + // Parse JSON-RPC response + var rpcResp jsonrpc.Response + if err := json.Unmarshal(httpResp.Body, &rpcResp); err != nil { + return fmt.Errorf("failed to unmarshal JSON-RPC response: %w", err) + } + + // Check for JSON-RPC errors + if rpcResp.Error != nil { + return fmt.Errorf("JSON-RPC error: %s (code: %d)", rpcResp.Error.Message, rpcResp.Error.Code) + } + + // Verify JSON-RPC version + if rpcResp.JSONRPC != "2.0" { + return fmt.Errorf("invalid JSON-RPC response version: %s", rpcResp.JSONRPC) + } + + // Unmarshal result into provided struct + if result != nil { + if err := json.Unmarshal(rpcResp.Result, result); err != nil { + return fmt.Errorf("failed to unmarshal result: %w", err) + } + } + + return nil +} + +// Call executes a read-only action and returns results. +// +// This method uses CRE's HTTP client to make a JSON-RPC call to the TRUF.NETWORK gateway. +// The call is executed within CRE's consensus mechanism, ensuring all nodes in the DON +// reach agreement on the result. +func (t *CRETransport) Call(ctx context.Context, namespace string, action string, inputs []any) (*types.CallResult, error) { + // Build call params matching kwil-db's user/call endpoint + params := map[string]any{ + "dbid": namespace, + "action": action, + "inputs": inputs, + } + + var result types.CallResult + if err := t.callJSONRPC(ctx, "user.call", params, &result); err != nil { + return nil, err + } + + return &result, nil +} + +// Execute performs a write action and returns the transaction hash. +// +// This method builds a signed transaction and broadcasts it to the TRUF.NETWORK. +// The transaction is signed using the configured signer and executed within CRE's +// consensus mechanism. +func (t *CRETransport) Execute(ctx context.Context, namespace string, action string, inputs [][]any, opts ...clientType.TxOpt) (types.Hash, error) { + if t.signer == nil { + return types.Hash{}, fmt.Errorf("signer required for Execute operations") + } + + // Convert inputs to EncodedValue arrays + var encodedInputs [][]*types.EncodedValue + for _, inputRow := range inputs { + var encodedRow []*types.EncodedValue + for _, val := range inputRow { + encoded, err := types.EncodeValue(val) + if err != nil { + return types.Hash{}, fmt.Errorf("failed to encode input value: %w", err) + } + encodedRow = append(encodedRow, encoded) + } + encodedInputs = append(encodedInputs, encodedRow) + } + + // Build transaction payload using ActionExecution + payload := &types.ActionExecution{ + Namespace: namespace, + Action: action, + Arguments: encodedInputs, + } + + // Serialize payload + payloadBytes, err := payload.MarshalBinary() + if err != nil { + return types.Hash{}, fmt.Errorf("failed to marshal payload: %w", err) + } + + // Apply transaction options (nonce, fee, etc.) + txOpts := &clientType.TxOptions{} + for _, opt := range opts { + opt(txOpts) + } + + // Ensure chain ID is fetched before building transaction + // This prevents transactions with empty chain IDs + t.chainIDOnce.Do(func() { + t.chainIDErr = t.fetchChainID(ctx) + }) + if t.chainIDErr != nil { + return types.Hash{}, fmt.Errorf("failed to get chain ID: %w", t.chainIDErr) + } + if t.chainID == "" { + return types.Hash{}, fmt.Errorf("chain ID is empty") + } + + // Build unsigned transaction + tx := &types.Transaction{ + Body: &types.TransactionBody{ + Payload: payloadBytes, + PayloadType: payload.Type(), + Fee: txOpts.Fee, + Nonce: uint64(txOpts.Nonce), + ChainID: t.chainID, + }, + } + + // Sign transaction + if err := tx.Sign(t.signer); err != nil { + return types.Hash{}, fmt.Errorf("failed to sign transaction: %w", err) + } + + // Broadcast transaction + params := map[string]any{ + "tx": tx, + } + + var result struct { + TxHash types.Hash `json:"tx_hash"` + } + + if err := t.callJSONRPC(ctx, "user.broadcast", params, &result); err != nil { + return types.Hash{}, err + } + + return result.TxHash, nil +} + +// WaitTx polls for transaction confirmation with the specified interval. +// +// This method repeatedly queries the transaction status until it's confirmed, +// rejected, or the context is cancelled. It uses CRE's HTTP client for each poll. +func (t *CRETransport) WaitTx(ctx context.Context, txHash types.Hash, interval time.Duration) (*types.TxQueryResponse, error) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + // Query transaction status + params := map[string]any{ + "tx_hash": txHash, + } + + var result types.TxQueryResponse + if err := t.callJSONRPC(ctx, "user.tx_query", params, &result); err != nil { + // Distinguish between transient errors (not indexed yet) and permanent errors + // JSON-RPC errors with specific codes or messages about "not found" are transient + errMsg := err.Error() + // Common transient error indicators: not found, not indexed, pending + isTransient := containsAny(errMsg, []string{"not found", "not indexed", "pending", "unknown transaction"}) + + if !isTransient { + // Permanent error - authentication failure, network issues, malformed request + return nil, fmt.Errorf("transaction query failed: %w", err) + } + // Transient error - continue polling + continue + } + + // Check if transaction is finalized (either committed or rejected) + if result.Height > 0 { + return &result, nil + } + } + } +} + +// containsAny checks if a string contains any of the specified substrings (case-insensitive) +func containsAny(s string, substrings []string) bool { + lowerS := s + for _, substr := range substrings { + if len(substr) == 0 { + continue + } + // Simple case-insensitive substring check + for i := 0; i <= len(lowerS)-len(substr); i++ { + match := true + for j := 0; j < len(substr); j++ { + c1 := lowerS[i+j] + c2 := substr[j] + // Convert to lowercase for comparison + if c1 >= 'A' && c1 <= 'Z' { + c1 += 'a' - 'A' + } + if c2 >= 'A' && c2 <= 'Z' { + c2 += 'a' - 'A' + } + if c1 != c2 { + match = false + break + } + } + if match { + return true + } + } + } + return false +} + +// fetchChainID fetches and caches the chain ID from the gateway. +// This is called once via sync.Once to ensure thread-safe lazy initialization. +// Returns error if the fetch fails, which can be checked before critical operations. +func (t *CRETransport) fetchChainID(ctx context.Context) error { + // Fetch chain info from gateway + var result struct { + ChainID string `json:"chain_id"` + } + + if err := t.callJSONRPC(ctx, "user.chain_info", map[string]any{}, &result); err != nil { + return fmt.Errorf("failed to fetch chain ID: %w", err) + } + + // Cache the chain ID + t.chainID = result.ChainID + return nil +} + +// ChainID returns the network chain identifier. +// +// The chain ID is fetched from the gateway on first call and cached. +// This is used to ensure transactions are sent to the correct network. +// Returns empty string if the chain ID fetch fails. +func (t *CRETransport) ChainID() string { + // Use sync.Once to ensure thread-safe lazy initialization + t.chainIDOnce.Do(func() { + // Use a background context since this is a cached operation + t.chainIDErr = t.fetchChainID(context.Background()) + }) + + // Return cached chain ID (will be empty if fetch failed) + return t.chainID +} + +// Signer returns the cryptographic signer used for transaction authentication. +// +// Returns nil if no signer is configured (read-only mode). +func (t *CRETransport) Signer() auth.Signer { + return t.signer +} diff --git a/core/tnclient/transport_cre_test.go b/core/tnclient/transport_cre_test.go new file mode 100644 index 0000000..b40557f --- /dev/null +++ b/core/tnclient/transport_cre_test.go @@ -0,0 +1,185 @@ +//go:build wasip1 + +package tnclient + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// Note: These are basic structural tests for CRE transport. +// Full integration tests require running in actual CRE environment. +// See the examples/cre_integration/ directory for complete working examples. + +func TestNewCRETransport(t *testing.T) { + // Note: We cannot create a real NodeRuntime outside of CRE environment, + // so this test just verifies the function signature and basic structure. + + t.Run("constructor_exists", func(t *testing.T) { + // This test just verifies that the NewCRETransport function exists + // and has the expected signature. + // Actual testing requires CRE simulation environment. + + // Verify the function is not nil + assert.NotNil(t, NewCRETransport) + }) +} + +func TestCRETransport_Implements_Transport_Interface(t *testing.T) { + // This compile-time check verifies that CRETransport implements Transport + // The var _ Transport = (*CRETransport)(nil) line in transport_cre.go + // ensures this at compile time, but we include this test for documentation. + + t.Run("implements_interface", func(t *testing.T) { + // If this compiles, the interface is implemented + var _ Transport = (*CRETransport)(nil) + }) +} + +func TestWithCRETransport(t *testing.T) { + t.Run("option_exists", func(t *testing.T) { + // Verify the WithCRETransport option function exists + assert.NotNil(t, WithCRETransport) + }) + + t.Run("option_signature", func(t *testing.T) { + // Verify the function returns an Option + // This test documents the expected signature + var _ Option = WithCRETransport(nil, "http://example.com") + }) +} + +func TestWithCRETransportAndSigner(t *testing.T) { + t.Run("option_exists", func(t *testing.T) { + // Verify the WithCRETransportAndSigner option function exists + assert.NotNil(t, WithCRETransportAndSigner) + }) + + t.Run("option_signature", func(t *testing.T) { + // Verify the function returns an Option + var _ Option = WithCRETransportAndSigner(nil, "http://example.com", nil) + }) +} + +// Example documentation for CRE usage + +func ExampleWithCRETransport() { + // This example shows how to use WithCRETransport in a CRE workflow + // Note: This code will only run in actual CRE environment + + /* + import ( + "context" + "time" + + "github.com/smartcontractkit/cre-sdk-go/capabilities/scheduler/cron" + "github.com/smartcontractkit/cre-sdk-go/cre" + "github.com/trufnetwork/sdk-go/core/tnclient" + "github.com/trufnetwork/sdk-go/core/types" + ) + + type Config struct { + TRUFEndpoint string `json:"trufEndpoint"` + Schedule string `json:"schedule"` + } + + type Result struct { + Records []types.StreamResult `json:"records"` + } + + func onCronTrigger(config *Config, runtime cre.Runtime, trigger *cron.Payload) (*Result, error) { + logger := runtime.Logger() + + return cre.RunInNodeMode(config, runtime, + func(config *Config, nodeRuntime cre.NodeRuntime) (*Result, error) { + // Create TN client with CRE transport + client, err := tnclient.NewClient(context.Background(), config.TRUFEndpoint, + tnclient.WithCRETransport(nodeRuntime, config.TRUFEndpoint), + ) + if err != nil { + logger.Error("Failed to create TRUF client", "error", err) + return nil, err + } + + // Load actions + actions, err := client.LoadActions() + if err != nil { + logger.Error("Failed to load actions", "error", err) + return nil, err + } + + // Get records + fromTime := int(time.Now().Add(-24 * time.Hour).Unix()) + toTime := int(time.Now().Unix()) + + result, err := actions.GetRecord(context.Background(), types.GetRecordInput{ + DataProvider: "0x1234...", + StreamId: "stai0000000000000000000000000000", + From: &fromTime, + To: &toTime, + }) + if err != nil { + logger.Error("GetRecord failed", "error", err) + return nil, err + } + + logger.Info("Fetched records", "count", len(result.Results)) + return &Result{Records: result.Results}, nil + }, + cre.ConsensusAggregationFromTags[*Result](), + ).Await() + } + */ +} + +func ExampleWithCRETransportAndSigner() { + // This example shows how to use WithCRETransportAndSigner for write operations + // Note: This code will only run in actual CRE environment + + /* + import ( + "context" + "time" + + "github.com/smartcontractkit/cre-sdk-go/cre" + "github.com/trufnetwork/kwil-db/core/crypto/auth" + "github.com/trufnetwork/sdk-go/core/tnclient" + "github.com/trufnetwork/sdk-go/core/types" + ) + + type Config struct { + TRUFEndpoint string `json:"trufEndpoint"` + PrivateKey string `json:"privateKey"` // Should be from secrets in production + } + + func deployStream(config *Config, nodeRuntime cre.NodeRuntime) error { + // Create signer from private key + signer, err := auth.EthSecp256k1SignerFromKey([]byte(config.PrivateKey)) + if err != nil { + return err + } + + // Create client with CRE transport and signer + client, err := tnclient.NewClient(context.Background(), config.TRUFEndpoint, + tnclient.WithCRETransportAndSigner(nodeRuntime, config.TRUFEndpoint, signer), + ) + if err != nil { + return err + } + + // Deploy a stream (write operation) + txHash, err := client.DeployStream(context.Background(), + "stai0000000000000000000000000000", + types.PRIMITIVE, + ) + if err != nil { + return err + } + + // Wait for transaction confirmation + _, err = client.WaitForTx(context.Background(), txHash, 5*time.Second) + return err + } + */ +} diff --git a/docs/api-reference.md b/docs/api-reference.md index 2ff176a..029187a 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -172,7 +172,7 @@ if gwClient := client.GetKwilClient(); gwClient != nil { The SDK uses a pluggable transport layer that allows different communication implementations: -- **HTTPTransport** (default): Standard `net/http` communication with the TRUF Network +- **HTTPTransport** (default): Standard `net/http` communication with the TRUF.NETWORK - **Custom transports**: For specialized runtime environments (e.g., Chainlink CRE) - **Mock transports**: For testing without network dependencies diff --git a/docs/stream-permissions.md b/docs/stream-permissions.md index 1d9b31c..9a9185c 100644 --- a/docs/stream-permissions.md +++ b/docs/stream-permissions.md @@ -139,7 +139,7 @@ stream.AllowComposeStream(ctx, allowedStreamLocator) ## Network Writer Role for Stream Creation -To ensure the integrity and quality of data streams on the Truf Network, the creation of new streams (both primitive and composed) is a permissioned operation. This process is governed by a **`system:network_writer`** role within the network's role-based access control (RBAC) system. +To ensure the integrity and quality of data streams on the TRUF.NETWORK, the creation of new streams (both primitive and composed) is a permissioned operation. This process is governed by a **`system:network_writer`** role within the network's role-based access control (RBAC) system. Only wallets that are members of the `system:network_writer` role are authorized to deploy new streams using the SDK's `DeployStream` or `BatchDeployStreams` functions. diff --git a/examples/stream_cache_demo/go.mod b/examples/stream_cache_demo/go.mod index 590fe43..1e436e8 100644 --- a/examples/stream_cache_demo/go.mod +++ b/examples/stream_cache_demo/go.mod @@ -1,8 +1,6 @@ module stream_cache_demo -go 1.24.1 - -toolchain go1.24.4 +go 1.25.3 require ( github.com/trufnetwork/kwil-db/core v0.4.3-0.20250714184403-500f214cd4b5 diff --git a/go.mod b/go.mod index 8ff28cc..3b68c33 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/trufnetwork/sdk-go -go 1.24.1 +go 1.25.3 require ( github.com/cockroachdb/apd/v3 v3.2.1 @@ -9,6 +9,8 @@ require ( github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 github.com/joho/godotenv v1.5.1 github.com/pkg/errors v0.9.1 + github.com/smartcontractkit/cre-sdk-go v1.1.2 + github.com/smartcontractkit/cre-sdk-go/capabilities/networking/http v0.10.0 github.com/stretchr/testify v1.11.1 github.com/trufnetwork/kwil-db v0.10.3-0.20250714184403-500f214cd4b5 github.com/trufnetwork/kwil-db/core v0.4.3-0.20250714184403-500f214cd4b5 @@ -23,15 +25,19 @@ require ( github.com/gabriel-vasile/mimetype v1.4.11 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/holiman/uint256 v1.3.2 // indirect github.com/jrick/logrotate v1.1.2 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/shopspring/decimal v1.4.0 // indirect + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251021010742-3f8d3dba17d8 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.44.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/text v0.31.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 039f59e..a7ee1a9 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,12 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.28.0 h1:Q7ibns33JjyW48gHkuFT91qX48KG0ktULL6FgHdG688= github.com/go-playground/validator/v10 v10.28.0/go.mod h1:GoI6I1SjPBh9p7ykNE/yj3fFYbyDOpwMn5KXd+m2hUU= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -48,6 +52,14 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251021010742-3f8d3dba17d8 h1:hPeEwcvRVtwhyNXH45qbzqmscqlbygu94cROwbjyzNQ= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251021010742-3f8d3dba17d8/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q= +github.com/smartcontractkit/cre-sdk-go v1.1.2 h1:YwfBLNqC8ei+6lJE8BCrL/kqZ/IvvfUimomw52+1xMM= +github.com/smartcontractkit/cre-sdk-go v1.1.2/go.mod h1:sgiRyHUiPcxp1e/EMnaJ+ddMFL4MbE3UMZ2MORAAS9U= +github.com/smartcontractkit/cre-sdk-go/capabilities/networking/http v0.10.0 h1:nP6PVWrrTIICvjwQuFitsQecQWbqpPaYzaTEjx92eTQ= +github.com/smartcontractkit/cre-sdk-go/capabilities/networking/http v0.10.0/go.mod h1:M83m3FsM1uqVu06OO58mKUSZJjjH8OGJsmvFpFlRDxI= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/trufnetwork/kwil-db v0.10.3-0.20250714184403-500f214cd4b5 h1:EZjLztH5YOEoWEUdOs+pZAH0SclnEaMd7Igv0Ivp24A= @@ -66,6 +78,8 @@ golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/tests/integration/composed_actions_test.go b/tests/integration/composed_actions_test.go index 2222dc9..54f0d46 100644 --- a/tests/integration/composed_actions_test.go +++ b/tests/integration/composed_actions_test.go @@ -13,7 +13,7 @@ import ( "github.com/trufnetwork/sdk-go/core/util" ) -// This file contains integration tests for composed streams in the Truf Network (TN). +// This file contains integration tests for composed streams in the TRUF.NETWORK (TN). // It demonstrates the process of deploying, initializing, and querying a composed stream // that aggregates data from multiple primitive streams. From be948c2df991af069e1244df601041031f30aa2b Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 23 Dec 2025 18:11:11 +0700 Subject: [PATCH 2/5] chore: apply suggestion --- core/tnclient/transport_cre.go | 77 ++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 21 deletions(-) diff --git a/core/tnclient/transport_cre.go b/core/tnclient/transport_cre.go index bdedc4a..d81570a 100644 --- a/core/tnclient/transport_cre.go +++ b/core/tnclient/transport_cre.go @@ -48,14 +48,14 @@ import ( // ).Await() // } type CRETransport struct { - runtime cre.NodeRuntime - client *http.Client - endpoint string - signer auth.Signer - chainID string - chainIDOnce sync.Once - chainIDErr error - reqID atomic.Uint64 + runtime cre.NodeRuntime + client *http.Client + endpoint string + signer auth.Signer + chainID string + chainIDMu sync.RWMutex + chainIDInitialized bool + reqID atomic.Uint64 } // Verify CRETransport implements Transport interface at compile time @@ -225,13 +225,28 @@ func (t *CRETransport) Execute(ctx context.Context, namespace string, action str // Ensure chain ID is fetched before building transaction // This prevents transactions with empty chain IDs - t.chainIDOnce.Do(func() { - t.chainIDErr = t.fetchChainID(ctx) - }) - if t.chainIDErr != nil { - return types.Hash{}, fmt.Errorf("failed to get chain ID: %w", t.chainIDErr) + // Check if already initialized (read lock) + t.chainIDMu.RLock() + initialized := t.chainIDInitialized + chainID := t.chainID + t.chainIDMu.RUnlock() + + if !initialized { + // Need to fetch chain ID (write lock) + t.chainIDMu.Lock() + // Double-check after acquiring write lock + if !t.chainIDInitialized { + if err := t.fetchChainID(ctx); err != nil { + t.chainIDMu.Unlock() + return types.Hash{}, fmt.Errorf("failed to fetch chain ID: %w", err) + } + t.chainIDInitialized = true + } + chainID = t.chainID + t.chainIDMu.Unlock() } - if t.chainID == "" { + + if chainID == "" { return types.Hash{}, fmt.Errorf("chain ID is empty") } @@ -242,7 +257,7 @@ func (t *CRETransport) Execute(ctx context.Context, namespace string, action str PayloadType: payload.Type(), Fee: txOpts.Fee, Nonce: uint64(txOpts.Nonce), - ChainID: t.chainID, + ChainID: chainID, }, } @@ -365,14 +380,34 @@ func (t *CRETransport) fetchChainID(ctx context.Context) error { // The chain ID is fetched from the gateway on first call and cached. // This is used to ensure transactions are sent to the correct network. // Returns empty string if the chain ID fetch fails. +// Unlike sync.Once, this will retry on transient failures. func (t *CRETransport) ChainID() string { - // Use sync.Once to ensure thread-safe lazy initialization - t.chainIDOnce.Do(func() { - // Use a background context since this is a cached operation - t.chainIDErr = t.fetchChainID(context.Background()) - }) + // Fast path: check if already initialized (read lock) + t.chainIDMu.RLock() + if t.chainIDInitialized { + chainID := t.chainID + t.chainIDMu.RUnlock() + return chainID + } + t.chainIDMu.RUnlock() + + // Slow path: fetch chain ID (write lock) + t.chainIDMu.Lock() + defer t.chainIDMu.Unlock() + + // Double-check after acquiring write lock (another goroutine might have initialized it) + if t.chainIDInitialized { + return t.chainID + } + + // Fetch chain ID using background context + if err := t.fetchChainID(context.Background()); err != nil { + // Don't set initialized flag - allow retry on next call + return "" + } - // Return cached chain ID (will be empty if fetch failed) + // Mark as successfully initialized only on success + t.chainIDInitialized = true return t.chainID } From 8fe8cf9d322641ed1674fe9f61d1d5133be076e8 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 23 Dec 2025 18:47:30 +0700 Subject: [PATCH 3/5] chore: apply suggestion --- core/tnclient/transport_cre.go | 131 ++++++++++++----- core/tnclient/transport_cre_test.go | 211 ++++++++++++---------------- 2 files changed, 186 insertions(+), 156 deletions(-) diff --git a/core/tnclient/transport_cre.go b/core/tnclient/transport_cre.go index d81570a..9b4a11a 100644 --- a/core/tnclient/transport_cre.go +++ b/core/tnclient/transport_cre.go @@ -302,17 +302,12 @@ func (t *CRETransport) WaitTx(ctx context.Context, txHash types.Hash, interval t var result types.TxQueryResponse if err := t.callJSONRPC(ctx, "user.tx_query", params, &result); err != nil { - // Distinguish between transient errors (not indexed yet) and permanent errors - // JSON-RPC errors with specific codes or messages about "not found" are transient - errMsg := err.Error() - // Common transient error indicators: not found, not indexed, pending - isTransient := containsAny(errMsg, []string{"not found", "not indexed", "pending", "unknown transaction"}) - - if !isTransient { + // Distinguish between transient errors (retry-able) and permanent errors + if !isTransientTxError(err) { // Permanent error - authentication failure, network issues, malformed request return nil, fmt.Errorf("transaction query failed: %w", err) } - // Transient error - continue polling + // Transient error (tx not indexed yet) - continue polling continue } @@ -324,39 +319,97 @@ func (t *CRETransport) WaitTx(ctx context.Context, txHash types.Hash, interval t } } -// containsAny checks if a string contains any of the specified substrings (case-insensitive) -func containsAny(s string, substrings []string) bool { - lowerS := s - for _, substr := range substrings { - if len(substr) == 0 { - continue +// isTransientTxError determines if an error from tx_query is transient (retry-able). +// +// Strategy: +// 1. First, try to parse as JSON-RPC error and check error code +// 2. Fall back to substring matching if not a structured JSON-RPC error +// +// Known transient error codes: +// - -202 (ErrorTxNotFound): Transaction not yet indexed +// - -32001 (ErrorTimeout): Temporary timeout +// +// Fragility warning: The substring fallback is brittle and may misclassify errors. +// Consider adding structured error codes to the gateway API for better reliability. +func isTransientTxError(err error) bool { + if err == nil { + return false + } + + // Try to extract JSON-RPC error code from the error message + // The error from callJSONRPC is formatted as "JSON-RPC error: (code: )" + errMsg := err.Error() + + // Parse error code from message format + var code int32 + if n, _ := fmt.Sscanf(errMsg, "JSON-RPC error: %*s (code: %d)", &code); n == 1 { + // Check known transient error codes + switch jsonrpc.ErrorCode(code) { + case jsonrpc.ErrorTxNotFound: // -202: Transaction not indexed yet + return true + case jsonrpc.ErrorTimeout: // -32001: Temporary timeout + return true } - // Simple case-insensitive substring check - for i := 0; i <= len(lowerS)-len(substr); i++ { - match := true - for j := 0; j < len(substr); j++ { - c1 := lowerS[i+j] - c2 := substr[j] - // Convert to lowercase for comparison - if c1 >= 'A' && c1 <= 'Z' { - c1 += 'a' - 'A' - } - if c2 >= 'A' && c2 <= 'Z' { - c2 += 'a' - 'A' - } - if c1 != c2 { - match = false - break - } - } - if match { - return true - } + // Other structured errors are likely permanent + return false + } + + // Fallback: Check for transient error patterns in message + // This is fragile and may need updates as error messages change + transientPatterns := []string{ + "not found", + "not indexed", + "pending", + "unknown transaction", + "timeout", + } + + lowerMsg := toLower(errMsg) + for _, pattern := range transientPatterns { + if contains(lowerMsg, pattern) { + return true } } + return false } +// toLower converts a string to lowercase (simple ASCII implementation) +func toLower(s string) string { + b := []byte(s) + for i := 0; i < len(b); i++ { + if b[i] >= 'A' && b[i] <= 'Z' { + b[i] += 'a' - 'A' + } + } + return string(b) +} + +// contains checks if a string contains a substring (case-sensitive) +func contains(s, substr string) bool { + return len(s) >= len(substr) && indexOfSubstring(s, substr) >= 0 +} + +// indexOfSubstring returns the index of substr in s, or -1 if not found +func indexOfSubstring(s, substr string) int { + if len(substr) == 0 { + return 0 + } + for i := 0; i <= len(s)-len(substr); i++ { + match := true + for j := 0; j < len(substr); j++ { + if s[i+j] != substr[j] { + match = false + break + } + } + if match { + return i + } + } + return -1 +} + // fetchChainID fetches and caches the chain ID from the gateway. // This is called once via sync.Once to ensure thread-safe lazy initialization. // Returns error if the fetch fails, which can be checked before critical operations. @@ -400,8 +453,12 @@ func (t *CRETransport) ChainID() string { return t.chainID } - // Fetch chain ID using background context - if err := t.fetchChainID(context.Background()); err != nil { + // Fetch chain ID with timeout to prevent indefinite hanging + // Use a reasonable timeout since this is a lightweight metadata query + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := t.fetchChainID(ctx); err != nil { // Don't set initialized flag - allow retry on next call return "" } diff --git a/core/tnclient/transport_cre_test.go b/core/tnclient/transport_cre_test.go index b40557f..d9c1499 100644 --- a/core/tnclient/transport_cre_test.go +++ b/core/tnclient/transport_cre_test.go @@ -3,6 +3,7 @@ package tnclient import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -62,124 +63,96 @@ func TestWithCRETransportAndSigner(t *testing.T) { }) } -// Example documentation for CRE usage - -func ExampleWithCRETransport() { - // This example shows how to use WithCRETransport in a CRE workflow - // Note: This code will only run in actual CRE environment - - /* - import ( - "context" - "time" - - "github.com/smartcontractkit/cre-sdk-go/capabilities/scheduler/cron" - "github.com/smartcontractkit/cre-sdk-go/cre" - "github.com/trufnetwork/sdk-go/core/tnclient" - "github.com/trufnetwork/sdk-go/core/types" - ) - - type Config struct { - TRUFEndpoint string `json:"trufEndpoint"` - Schedule string `json:"schedule"` - } - - type Result struct { - Records []types.StreamResult `json:"records"` - } - - func onCronTrigger(config *Config, runtime cre.Runtime, trigger *cron.Payload) (*Result, error) { - logger := runtime.Logger() - - return cre.RunInNodeMode(config, runtime, - func(config *Config, nodeRuntime cre.NodeRuntime) (*Result, error) { - // Create TN client with CRE transport - client, err := tnclient.NewClient(context.Background(), config.TRUFEndpoint, - tnclient.WithCRETransport(nodeRuntime, config.TRUFEndpoint), - ) - if err != nil { - logger.Error("Failed to create TRUF client", "error", err) - return nil, err - } - - // Load actions - actions, err := client.LoadActions() - if err != nil { - logger.Error("Failed to load actions", "error", err) - return nil, err - } - - // Get records - fromTime := int(time.Now().Add(-24 * time.Hour).Unix()) - toTime := int(time.Now().Unix()) - - result, err := actions.GetRecord(context.Background(), types.GetRecordInput{ - DataProvider: "0x1234...", - StreamId: "stai0000000000000000000000000000", - From: &fromTime, - To: &toTime, - }) - if err != nil { - logger.Error("GetRecord failed", "error", err) - return nil, err - } - - logger.Info("Fetched records", "count", len(result.Results)) - return &Result{Records: result.Results}, nil - }, - cre.ConsensusAggregationFromTags[*Result](), - ).Await() - } - */ -} - -func ExampleWithCRETransportAndSigner() { - // This example shows how to use WithCRETransportAndSigner for write operations - // Note: This code will only run in actual CRE environment - - /* - import ( - "context" - "time" - - "github.com/smartcontractkit/cre-sdk-go/cre" - "github.com/trufnetwork/kwil-db/core/crypto/auth" - "github.com/trufnetwork/sdk-go/core/tnclient" - "github.com/trufnetwork/sdk-go/core/types" - ) - - type Config struct { - TRUFEndpoint string `json:"trufEndpoint"` - PrivateKey string `json:"privateKey"` // Should be from secrets in production - } - - func deployStream(config *Config, nodeRuntime cre.NodeRuntime) error { - // Create signer from private key - signer, err := auth.EthSecp256k1SignerFromKey([]byte(config.PrivateKey)) - if err != nil { - return err - } - - // Create client with CRE transport and signer - client, err := tnclient.NewClient(context.Background(), config.TRUFEndpoint, - tnclient.WithCRETransportAndSigner(nodeRuntime, config.TRUFEndpoint, signer), - ) - if err != nil { - return err +// Unit tests for error classification + +func TestIsTransientTxError(t *testing.T) { + tests := []struct { + name string + err error + want bool + reasoning string + }{ + { + name: "nil error", + err: nil, + want: false, + reasoning: "nil errors are not transient", + }, + { + name: "ErrorTxNotFound code", + err: fmt.Errorf("JSON-RPC error: transaction not found (code: -202)"), + want: true, + reasoning: "ErrorTxNotFound (-202) is transient - tx not indexed yet", + }, + { + name: "ErrorTimeout code", + err: fmt.Errorf("JSON-RPC error: request timeout (code: -32001)"), + want: true, + reasoning: "ErrorTimeout (-32001) is transient", + }, + { + name: "ErrorInvalidParams code", + err: fmt.Errorf("JSON-RPC error: invalid parameters (code: -32602)"), + want: false, + reasoning: "ErrorInvalidParams is permanent - malformed request", + }, + { + name: "ErrorInternal code", + err: fmt.Errorf("JSON-RPC error: internal error (code: -32603)"), + want: false, + reasoning: "ErrorInternal is permanent - server issue", + }, + { + name: "Fallback: not found message", + err: fmt.Errorf("transaction not found in mempool"), + want: true, + reasoning: "Contains 'not found' pattern", + }, + { + name: "Fallback: not indexed message", + err: fmt.Errorf("transaction not indexed yet"), + want: true, + reasoning: "Contains 'not indexed' pattern", + }, + { + name: "Fallback: pending message", + err: fmt.Errorf("transaction is pending"), + want: true, + reasoning: "Contains 'pending' pattern", + }, + { + name: "Fallback: timeout message", + err: fmt.Errorf("connection timeout"), + want: true, + reasoning: "Contains 'timeout' pattern", + }, + { + name: "Permanent: authentication error", + err: fmt.Errorf("authentication failed"), + want: false, + reasoning: "Does not match transient patterns", + }, + { + name: "Permanent: network error", + err: fmt.Errorf("network unreachable"), + want: false, + reasoning: "Network errors should be handled by caller retries", + }, + { + name: "Case insensitive: NOT FOUND", + err: fmt.Errorf("Transaction NOT FOUND"), + want: true, + reasoning: "Should match case-insensitively", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isTransientTxError(tt.err) + if got != tt.want { + t.Errorf("isTransientTxError() = %v, want %v\nReasoning: %s\nError: %v", + got, tt.want, tt.reasoning, tt.err) } - - // Deploy a stream (write operation) - txHash, err := client.DeployStream(context.Background(), - "stai0000000000000000000000000000", - types.PRIMITIVE, - ) - if err != nil { - return err - } - - // Wait for transaction confirmation - _, err = client.WaitForTx(context.Background(), txHash, 5*time.Second) - return err - } - */ + }) + } } From 573ac4c6533c076f9d803819dc824230abb72849 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 23 Dec 2025 19:05:11 +0700 Subject: [PATCH 4/5] chore: use standard library --- core/tnclient/transport_cre.go | 72 +++++++++-------------------- core/tnclient/transport_cre_test.go | 6 +++ 2 files changed, 29 insertions(+), 49 deletions(-) diff --git a/core/tnclient/transport_cre.go b/core/tnclient/transport_cre.go index 9b4a11a..1e1e330 100644 --- a/core/tnclient/transport_cre.go +++ b/core/tnclient/transport_cre.go @@ -6,7 +6,9 @@ import ( "context" "encoding/json" "fmt" + "regexp" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -340,18 +342,26 @@ func isTransientTxError(err error) bool { // The error from callJSONRPC is formatted as "JSON-RPC error: (code: )" errMsg := err.Error() - // Parse error code from message format - var code int32 - if n, _ := fmt.Sscanf(errMsg, "JSON-RPC error: %*s (code: %d)", &code); n == 1 { - // Check known transient error codes - switch jsonrpc.ErrorCode(code) { - case jsonrpc.ErrorTxNotFound: // -202: Transaction not indexed yet - return true - case jsonrpc.ErrorTimeout: // -32001: Temporary timeout - return true + // Use regex to extract error code from "(code: )" pattern + // This handles multi-word error messages unlike fmt.Sscanf with %*s + codePattern := regexp.MustCompile(`\(code:\s*(-?\d+)\)`) + matches := codePattern.FindStringSubmatch(errMsg) + + if len(matches) >= 2 { + // Parse the captured code number + if codeInt, err := strconv.ParseInt(matches[1], 10, 32); err == nil { + code := jsonrpc.ErrorCode(int32(codeInt)) + + // Check known transient error codes + switch code { + case jsonrpc.ErrorTxNotFound: // -202: Transaction not indexed yet + return true + case jsonrpc.ErrorTimeout: // -32001: Temporary timeout + return true + } + // Other structured errors are likely permanent + return false } - // Other structured errors are likely permanent - return false } // Fallback: Check for transient error patterns in message @@ -364,9 +374,9 @@ func isTransientTxError(err error) bool { "timeout", } - lowerMsg := toLower(errMsg) + lowerMsg := strings.ToLower(errMsg) for _, pattern := range transientPatterns { - if contains(lowerMsg, pattern) { + if strings.Contains(lowerMsg, pattern) { return true } } @@ -374,42 +384,6 @@ func isTransientTxError(err error) bool { return false } -// toLower converts a string to lowercase (simple ASCII implementation) -func toLower(s string) string { - b := []byte(s) - for i := 0; i < len(b); i++ { - if b[i] >= 'A' && b[i] <= 'Z' { - b[i] += 'a' - 'A' - } - } - return string(b) -} - -// contains checks if a string contains a substring (case-sensitive) -func contains(s, substr string) bool { - return len(s) >= len(substr) && indexOfSubstring(s, substr) >= 0 -} - -// indexOfSubstring returns the index of substr in s, or -1 if not found -func indexOfSubstring(s, substr string) int { - if len(substr) == 0 { - return 0 - } - for i := 0; i <= len(s)-len(substr); i++ { - match := true - for j := 0; j < len(substr); j++ { - if s[i+j] != substr[j] { - match = false - break - } - } - if match { - return i - } - } - return -1 -} - // fetchChainID fetches and caches the chain ID from the gateway. // This is called once via sync.Once to ensure thread-safe lazy initialization. // Returns error if the fetch fails, which can be checked before critical operations. diff --git a/core/tnclient/transport_cre_test.go b/core/tnclient/transport_cre_test.go index d9c1499..b1d9ff6 100644 --- a/core/tnclient/transport_cre_test.go +++ b/core/tnclient/transport_cre_test.go @@ -84,6 +84,12 @@ func TestIsTransientTxError(t *testing.T) { want: true, reasoning: "ErrorTxNotFound (-202) is transient - tx not indexed yet", }, + { + name: "Multi-word message with code", + err: fmt.Errorf("JSON-RPC error: transaction not found in mempool or ledger (code: -202)"), + want: true, + reasoning: "Regex should handle multi-word messages (fixed from %*s limitation)", + }, { name: "ErrorTimeout code", err: fmt.Errorf("JSON-RPC error: request timeout (code: -32001)"), From 9add162a40ce516c4e44a035e3ef778ada8d689c Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 23 Dec 2025 19:30:13 +0700 Subject: [PATCH 5/5] chore: apply suggestion --- core/tnclient/transport_cre.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/tnclient/transport_cre.go b/core/tnclient/transport_cre.go index 1e1e330..b6e95d1 100644 --- a/core/tnclient/transport_cre.go +++ b/core/tnclient/transport_cre.go @@ -242,16 +242,13 @@ func (t *CRETransport) Execute(ctx context.Context, namespace string, action str t.chainIDMu.Unlock() return types.Hash{}, fmt.Errorf("failed to fetch chain ID: %w", err) } + // Only mark as initialized if fetchChainID succeeded (returned non-empty chainID) t.chainIDInitialized = true } chainID = t.chainID t.chainIDMu.Unlock() } - if chainID == "" { - return types.Hash{}, fmt.Errorf("chain ID is empty") - } - // Build unsigned transaction tx := &types.Transaction{ Body: &types.TransactionBody{ @@ -397,6 +394,11 @@ func (t *CRETransport) fetchChainID(ctx context.Context) error { return fmt.Errorf("failed to fetch chain ID: %w", err) } + // Validate that chain ID is not empty + if result.ChainID == "" { + return fmt.Errorf("gateway returned empty chain ID") + } + // Cache the chain ID t.chainID = result.ChainID return nil