Skip to content

Commit f0a7d9f

Browse files
committed
feat: destroy stream in CRE flow
1 parent 853c26d commit f0a7d9f

File tree

2 files changed

+199
-12
lines changed

2 files changed

+199
-12
lines changed

core/tnclient/client.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,21 @@ func (c *Client) DeployStream(ctx context.Context, streamId util.StreamId, strea
143143
}
144144

145145
func (c *Client) DestroyStream(ctx context.Context, streamId util.StreamId) (types.Hash, error) {
146-
return tn_api.DestroyStream(ctx, tn_api.DestroyStreamInput{
147-
StreamId: streamId,
148-
KwilClient: c.GetKwilClient(),
149-
})
146+
// For HTTP transport, use the existing implementation (backwards compatible)
147+
// For custom transports (CRE, etc.), use transport.Execute directly
148+
if httpTransport, ok := c.transport.(*HTTPTransport); ok {
149+
return tn_api.DestroyStream(ctx, tn_api.DestroyStreamInput{
150+
StreamId: streamId,
151+
KwilClient: httpTransport.gatewayClient,
152+
})
153+
}
154+
// Use transport.Execute directly for custom transports
155+
// Derive address from signer for delete_stream call
156+
addr, _ := auth.EthSecp256k1Authenticator{}.Identifier(c.signer.CompactID())
157+
return c.transport.Execute(ctx, "", "delete_stream", [][]any{{
158+
addr,
159+
streamId.String(),
160+
}})
150161
}
151162

152163
func (c *Client) LoadActions() (clientType.IAction, error) {

core/tnclient/transport_cre.go

Lines changed: 184 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"encoding/json"
99
"fmt"
10+
"math/big"
1011
"net/url"
1112
"regexp"
1213
"strconv"
@@ -31,6 +32,14 @@ import (
3132
// workflows where standard net/http is not available. It uses CRE's consensus-aware
3233
// HTTP client with Promise-based async operations.
3334
//
35+
// KNOWN LIMITATION - CRE WASM Simulation Mode:
36+
// The CRE simulator has a bug where transaction signatures get corrupted due to
37+
// Go WASM pointer handling limitations (golang/go#59156, golang/go#66984).
38+
// Symptoms: "invalid signature: expected address X, received address Y"
39+
// This affects local testing with `cre workflow simulate` but does NOT affect
40+
// production deployments to Chainlink DON nodes. For local testing, use the
41+
// HTTP transport version (see examples/truf-cre-demo/test-http-version).
42+
//
3443
// Example usage in CRE workflow:
3544
//
3645
// func onCronTrigger(config *Config, runtime cre.Runtime, trigger *cron.Payload) (*Result, error) {
@@ -63,6 +72,9 @@ type CRETransport struct {
6372
reqID atomic.Uint64
6473
authCookie string // Cookie value for gateway authentication
6574
authCookieMu sync.RWMutex
75+
currentNonce int64 // Track nonce for sequential transactions
76+
nonceMu sync.Mutex
77+
nonceFetched bool
6678
}
6779

6880
// Verify CRETransport implements Transport interface at compile time
@@ -192,6 +204,19 @@ func (t *CRETransport) doJSONRPC(ctx context.Context, method string, params any,
192204

193205
// Check for JSON-RPC errors
194206
if rpcResp.Error != nil {
207+
// For broadcast errors (-201), decode the BroadcastError details
208+
if rpcResp.Error.Code == -201 && len(rpcResp.Error.Data) > 0 {
209+
var broadcastErr struct {
210+
Code uint32 `json:"code"`
211+
Hash string `json:"hash"`
212+
Message string `json:"message"`
213+
}
214+
if err := json.Unmarshal(rpcResp.Error.Data, &broadcastErr); err == nil {
215+
return fmt.Errorf("JSON-RPC error: %s (code: %d) [Broadcast: code=%d, hash=%s, msg=%s]",
216+
rpcResp.Error.Message, rpcResp.Error.Code,
217+
broadcastErr.Code, broadcastErr.Hash, broadcastErr.Message)
218+
}
219+
}
195220
return fmt.Errorf("JSON-RPC error: %s (code: %d)", rpcResp.Error.Message, rpcResp.Error.Code)
196221
}
197222

@@ -259,12 +284,40 @@ func (t *CRETransport) Call(ctx context.Context, namespace string, action string
259284
//
260285
// This method builds a signed transaction and broadcasts it to the TRUF.NETWORK.
261286
// The transaction is signed using the configured signer and executed within CRE's
262-
// consensus mechanism.
287+
// consensus mechanism. Automatically retries on nonce errors.
263288
func (t *CRETransport) Execute(ctx context.Context, namespace string, action string, inputs [][]any, opts ...clientType.TxOpt) (types.Hash, error) {
264289
if t.signer == nil {
265290
return types.Hash{}, fmt.Errorf("signer required for Execute operations")
266291
}
267292

293+
// Retry loop for nonce errors
294+
const maxRetries = 3
295+
for attempt := 0; attempt < maxRetries; attempt++ {
296+
fmt.Printf("[DEBUG] Execute attempt %d/%d for action=%s\n", attempt+1, maxRetries, action)
297+
txHash, err := t.executeOnce(ctx, namespace, action, inputs, opts...)
298+
if err != nil {
299+
fmt.Printf("[DEBUG] Execute error on attempt %d: %v\n", attempt+1, err)
300+
// Check if it's a nonce error
301+
if strings.Contains(err.Error(), "invalid nonce") && attempt < maxRetries-1 {
302+
fmt.Printf("[DEBUG] Nonce error detected, resetting nonceFetched and retrying\n")
303+
// Reset nonce tracking to refetch on next attempt
304+
t.nonceMu.Lock()
305+
t.nonceFetched = false
306+
t.nonceMu.Unlock()
307+
continue // Retry
308+
}
309+
return types.Hash{}, err
310+
}
311+
return txHash, nil
312+
}
313+
314+
return types.Hash{}, fmt.Errorf("max retries exceeded")
315+
}
316+
317+
// executeOnce performs a single execute attempt (internal helper)
318+
func (t *CRETransport) executeOnce(ctx context.Context, namespace string, action string, inputs [][]any, opts ...clientType.TxOpt) (types.Hash, error) {
319+
fmt.Printf("[DEBUG] executeOnce called: action=%s\n", action)
320+
268321
// Convert inputs to EncodedValue arrays
269322
var encodedInputs [][]*types.EncodedValue
270323
for _, inputRow := range inputs {
@@ -298,6 +351,54 @@ func (t *CRETransport) Execute(ctx context.Context, namespace string, action str
298351
opt(txOpts)
299352
}
300353

354+
// Auto-manage nonce if not explicitly provided
355+
if txOpts.Nonce == 0 {
356+
t.nonceMu.Lock()
357+
358+
// Fetch nonce from gateway on first transaction only
359+
if !t.nonceFetched {
360+
// Create AccountID from signer
361+
acctID := &types.AccountID{
362+
Identifier: t.signer.CompactID(),
363+
KeyType: t.signer.PubKey().Type(),
364+
}
365+
366+
// Fetch account info via user.account RPC call
367+
params := map[string]any{
368+
"id": acctID,
369+
}
370+
371+
var accountResp struct {
372+
ID *types.AccountID `json:"id"`
373+
Balance string `json:"balance"`
374+
Nonce int64 `json:"nonce"`
375+
}
376+
377+
err := t.callJSONRPC(ctx, "user.account", params, &accountResp)
378+
if err != nil {
379+
// If account doesn't exist yet, start with nonce 0
380+
if !strings.Contains(err.Error(), "not found") && !strings.Contains(err.Error(), "does not exist") {
381+
t.nonceMu.Unlock()
382+
return types.Hash{}, fmt.Errorf("failed to fetch account nonce: %w", err)
383+
}
384+
t.currentNonce = 0
385+
fmt.Printf("[DEBUG] Account not found, starting with nonce=0\n")
386+
} else {
387+
// Account nonce is the LAST used nonce, so NEXT nonce is nonce+1
388+
t.currentNonce = accountResp.Nonce + 1
389+
fmt.Printf("[DEBUG] Fetched account nonce=%d, using next nonce=%d\n", accountResp.Nonce, t.currentNonce)
390+
}
391+
t.nonceFetched = true
392+
}
393+
394+
// Use current nonce and increment
395+
txOpts.Nonce = t.currentNonce
396+
fmt.Printf("[DEBUG] Using nonce=%d for transaction\n", t.currentNonce)
397+
t.currentNonce++
398+
399+
t.nonceMu.Unlock()
400+
}
401+
301402
// Ensure chain ID is fetched before building transaction
302403
// This prevents transactions with empty chain IDs
303404
// Check if already initialized (read lock)
@@ -322,33 +423,108 @@ func (t *CRETransport) Execute(ctx context.Context, namespace string, action str
322423
t.chainIDMu.Unlock()
323424
}
324425

426+
// Ensure Fee is not nil to prevent signature verification mismatch
427+
// When Fee is nil, SerializeMsg produces "Fee: <nil>" but after JSON
428+
// marshaling/unmarshaling it becomes "Fee: 0", causing signature mismatch
429+
fee := txOpts.Fee
430+
if fee == nil {
431+
fee = big.NewInt(0)
432+
}
433+
325434
// Build unsigned transaction
326435
tx := &types.Transaction{
327436
Body: &types.TransactionBody{
328437
Payload: payloadBytes,
329438
PayloadType: payload.Type(),
330-
Fee: txOpts.Fee,
439+
Fee: fee,
331440
Nonce: uint64(txOpts.Nonce),
332441
ChainID: chainID,
333442
},
443+
Serialization: types.DefaultSignedMsgSerType, // Required for EthPersonalSigner
334444
}
335445

336446
// Sign transaction
337447
if err := tx.Sign(t.signer); err != nil {
338448
return types.Hash{}, fmt.Errorf("failed to sign transaction: %w", err)
339449
}
340450

341-
// Broadcast transaction
342-
params := map[string]any{
343-
"tx": tx,
451+
// CRITICAL WORKAROUND: Pre-serialize transaction to avoid WASM pointer corruption
452+
// Go WASM uses 64-bit pointers but WASM runtime uses 32-bit pointers.
453+
// Transaction struct contains pointer fields (Signature, Body) which get
454+
// corrupted when crossing the WASM boundary (golang/go#59156, golang/go#66984).
455+
// Solution: Manually construct JSON-RPC request to avoid struct traversal in WASM.
456+
txJSON, err := json.Marshal(tx)
457+
if err != nil {
458+
return types.Hash{}, fmt.Errorf("failed to marshal transaction: %w", err)
344459
}
345460

461+
// Manually construct JSON-RPC request to bypass params map
462+
reqID := t.nextReqID()
463+
rpcReqJSON := fmt.Sprintf(
464+
`{"jsonrpc":"2.0","id":"%s","method":"user.broadcast","params":{"tx":%s}}`,
465+
reqID, string(txJSON))
466+
467+
// Create headers
468+
headers := map[string]string{
469+
"Content-Type": "application/json",
470+
}
471+
472+
// Add auth cookie if we have one
473+
t.authCookieMu.RLock()
474+
if t.authCookie != "" {
475+
headers["Cookie"] = t.authCookie
476+
}
477+
t.authCookieMu.RUnlock()
478+
479+
// Create CRE HTTP request
480+
httpReq := &http.Request{
481+
Url: t.endpoint,
482+
Method: "POST",
483+
Body: []byte(rpcReqJSON),
484+
Headers: headers,
485+
}
486+
487+
// Execute via CRE client
488+
httpResp, err := t.client.SendRequest(t.runtime, httpReq).Await()
489+
if err != nil {
490+
return types.Hash{}, fmt.Errorf("CRE HTTP request failed: %w", err)
491+
}
492+
493+
// Check HTTP status
494+
if httpResp.StatusCode != 200 {
495+
return types.Hash{}, fmt.Errorf("unexpected HTTP status code: %d", httpResp.StatusCode)
496+
}
497+
498+
// Parse JSON-RPC response
499+
var rpcResp jsonrpc.Response
500+
if err := json.Unmarshal(httpResp.Body, &rpcResp); err != nil {
501+
return types.Hash{}, fmt.Errorf("failed to unmarshal JSON-RPC response: %w", err)
502+
}
503+
504+
// Check for JSON-RPC errors
505+
if rpcResp.Error != nil {
506+
// For broadcast errors (-201), decode the BroadcastError details
507+
if rpcResp.Error.Code == -201 && len(rpcResp.Error.Data) > 0 {
508+
var broadcastErr struct {
509+
Code uint32 `json:"code"`
510+
Hash string `json:"hash"`
511+
Message string `json:"message"`
512+
}
513+
if err := json.Unmarshal(rpcResp.Error.Data, &broadcastErr); err == nil {
514+
return types.Hash{}, fmt.Errorf("JSON-RPC error: %s (code: %d) [Broadcast: code=%d, hash=%s, msg=%s]",
515+
rpcResp.Error.Message, rpcResp.Error.Code,
516+
broadcastErr.Code, broadcastErr.Hash, broadcastErr.Message)
517+
}
518+
}
519+
return types.Hash{}, fmt.Errorf("JSON-RPC error: %s (code: %d)", rpcResp.Error.Message, rpcResp.Error.Code)
520+
}
521+
522+
// Unmarshal result
346523
var result struct {
347524
TxHash types.Hash `json:"tx_hash"`
348525
}
349-
350-
if err := t.callJSONRPC(ctx, "user.broadcast", params, &result); err != nil {
351-
return types.Hash{}, err
526+
if err := json.Unmarshal(rpcResp.Result, &result); err != nil {
527+
return types.Hash{}, fmt.Errorf("failed to unmarshal result: %w", err)
352528
}
353529

354530
return result.TxHash, nil

0 commit comments

Comments
 (0)