Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions internal/daemon/controller/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"github.com/altuslabsxyz/devnet-builder/internal/daemon/store"
Expand All @@ -19,6 +20,7 @@ type TxController struct {
logger *slog.Logger

// Cache for in-flight transactions (unsigned tx bytes)
cacheMu sync.RWMutex
unsignedTxCache map[string]*network.UnsignedTx
}

Expand All @@ -39,6 +41,13 @@ func (c *TxController) SetLogger(logger *slog.Logger) {

// Reconcile processes a single transaction by name.
func (c *TxController) Reconcile(ctx context.Context, key string) error {
// Check context deadline before starting reconciliation
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled before reconciliation: %w", ctx.Err())
default:
}

c.logger.Debug("reconciling transaction", "key", key)

tx, err := c.store.GetTransaction(ctx, key)
Expand Down Expand Up @@ -80,6 +89,13 @@ func (c *TxController) reconcilePending(ctx context.Context, tx *types.Transacti
}

func (c *TxController) reconcileBuilding(ctx context.Context, tx *types.Transaction) error {
// Check context deadline
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled during building: %w", ctx.Err())
default:
}

c.logger.Debug("building transaction", "name", tx.Metadata.Name)

builder, err := c.runtime.GetTxBuilder(ctx, tx.Spec.DevnetRef)
Expand All @@ -98,13 +114,16 @@ func (c *TxController) reconcileBuilding(ctx context.Context, tx *types.Transact
Sender: tx.Spec.Signer,
Payload: tx.Spec.Payload,
GasLimit: gasLimit,
Memo: tx.Spec.Memo,
})
if err != nil {
return c.setFailed(ctx, tx, fmt.Sprintf("failed to build tx: %v", err))
}

// Cache unsigned tx for signing phase
// Cache unsigned tx for signing phase (thread-safe)
c.cacheMu.Lock()
c.unsignedTxCache[tx.Metadata.Name] = unsignedTx
c.cacheMu.Unlock()

tx.Status.Phase = types.TxPhaseSigning
tx.Status.Message = "Signing transaction"
Expand All @@ -114,6 +133,13 @@ func (c *TxController) reconcileBuilding(ctx context.Context, tx *types.Transact
}

func (c *TxController) reconcileSigning(ctx context.Context, tx *types.Transaction) error {
// Check context deadline
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled during signing: %w", ctx.Err())
default:
}

c.logger.Debug("signing transaction", "name", tx.Metadata.Name)

// Get TxBuilder for signing
Expand All @@ -122,10 +148,13 @@ func (c *TxController) reconcileSigning(ctx context.Context, tx *types.Transacti
return c.setFailed(ctx, tx, fmt.Sprintf("failed to get TxBuilder: %v", err))
}

// Get cached unsigned tx
// Get cached unsigned tx (thread-safe read)
c.cacheMu.RLock()
unsignedTx, ok := c.unsignedTxCache[tx.Metadata.Name]
c.cacheMu.RUnlock()

if !ok {
// If not in cache, rebuild it with same gas limit as building phase
// If not in cache, rebuild it with same parameters as building phase
gasLimit := tx.Spec.GasLimit
if gasLimit == 0 {
gasLimit = 200000
Expand All @@ -135,6 +164,7 @@ func (c *TxController) reconcileSigning(ctx context.Context, tx *types.Transacti
Sender: tx.Spec.Signer,
Payload: tx.Spec.Payload,
GasLimit: gasLimit,
Memo: tx.Spec.Memo,
})
if err != nil {
return c.setFailed(ctx, tx, fmt.Sprintf("failed to rebuild tx: %v", err))
Expand All @@ -159,8 +189,10 @@ func (c *TxController) reconcileSigning(ctx context.Context, tx *types.Transacti
return c.setFailed(ctx, tx, fmt.Sprintf("failed to broadcast tx: %v", err))
}

// Clean up cache
// Clean up cache (thread-safe delete)
c.cacheMu.Lock()
delete(c.unsignedTxCache, tx.Metadata.Name)
c.cacheMu.Unlock()

tx.Status.Phase = types.TxPhaseSubmitted
tx.Status.TxHash = result.TxHash
Expand All @@ -171,6 +203,13 @@ func (c *TxController) reconcileSigning(ctx context.Context, tx *types.Transacti
}

func (c *TxController) reconcileSubmitted(ctx context.Context, tx *types.Transaction) error {
// Check context deadline
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled during confirmation: %w", ctx.Err())
default:
}

c.logger.Debug("waiting for confirmation", "name", tx.Metadata.Name, "txHash", tx.Status.TxHash)

receipt, err := c.runtime.WaitForConfirmation(ctx, tx.Spec.DevnetRef, tx.Status.TxHash)
Expand Down Expand Up @@ -204,8 +243,10 @@ func (c *TxController) setFailed(ctx context.Context, tx *types.Transaction, err
tx.Status.Message = "Transaction failed"
tx.Metadata.UpdatedAt = time.Now()

// Clean up cache
// Clean up cache (thread-safe delete)
c.cacheMu.Lock()
delete(c.unsignedTxCache, tx.Metadata.Name)
c.cacheMu.Unlock()

return c.store.UpdateTransaction(ctx, tx)
}
230 changes: 230 additions & 0 deletions internal/daemon/controller/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,233 @@ func TestTxController_Reconcile_BuildError(t *testing.T) {
t.Error("Error message is empty")
}
}

// TestTxController_ConcurrentReconciliation tests that concurrent reconciliation
// of the same transaction is safe (tests mutex protection).
func TestTxController_ConcurrentReconciliation(t *testing.T) {
ms := store.NewMemoryStore()
runtime := &mockTxRuntime{
builder: plugin.NewMockTxBuilder(),
signingKey: &network.SigningKey{Address: "cosmos1abc", PrivKey: []byte("test")},
receipt: &TxReceipt{TxHash: "abc123", Height: 100, Success: true},
}
tc := NewTxController(ms, runtime)

// Create transaction in Building phase (triggers cache write)
tx := &types.Transaction{
Metadata: types.ResourceMeta{
Name: "tx-concurrent-1",
CreatedAt: time.Now(),
},
Spec: types.TransactionSpec{
DevnetRef: "mydevnet",
TxType: "gov/vote",
Signer: "validator:0",
Payload: json.RawMessage(`{"proposal_id":1,"vote_option":"yes"}`),
},
Status: types.TransactionStatus{
Phase: types.TxPhaseBuilding,
},
}
if err := ms.CreateTransaction(context.Background(), tx); err != nil {
t.Fatalf("CreateTransaction: %v", err)
}

// Run 10 concurrent reconciliations (tests race detector)
const goroutines = 10
errCh := make(chan error, goroutines)
for i := 0; i < goroutines; i++ {
go func() {
errCh <- tc.Reconcile(context.Background(), "tx-concurrent-1")
}()
}

// Collect results
for i := 0; i < goroutines; i++ {
if err := <-errCh; err != nil {
t.Errorf("Reconcile goroutine %d failed: %v", i, err)
}
}

// Verify final state is consistent
got, err := ms.GetTransaction(context.Background(), "tx-concurrent-1")
if err != nil {
t.Fatalf("GetTransaction: %v", err)
}

// Should have progressed past Building (cache operations succeeded)
if got.Status.Phase == types.TxPhaseBuilding {
t.Error("Transaction stuck in Building phase - possible concurrency issue")
}
}

// TestTxController_ContextCancellation_BeforeReconcile tests that context
// cancellation is detected before reconciliation starts.
func TestTxController_ContextCancellation_BeforeReconcile(t *testing.T) {
ms := store.NewMemoryStore()
runtime := &mockTxRuntime{
builder: plugin.NewMockTxBuilder(),
}
tc := NewTxController(ms, runtime)

tx := &types.Transaction{
Metadata: types.ResourceMeta{Name: "tx-cancel-1"},
Spec: types.TransactionSpec{
DevnetRef: "mydevnet",
TxType: "gov/vote",
},
Status: types.TransactionStatus{
Phase: types.TxPhasePending,
},
}
ms.CreateTransaction(context.Background(), tx)

// Create cancelled context
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately

// Reconcile should fail with context error
err := tc.Reconcile(ctx, "tx-cancel-1")
if err == nil {
t.Error("Expected error from cancelled context, got nil")
}
if !errors.Is(err, context.Canceled) {
t.Errorf("Expected context.Canceled error, got: %v", err)
}
}

// TestTxController_ContextCancellation_DuringBuilding tests that context
// cancellation is detected during the building phase.
func TestTxController_ContextCancellation_DuringBuilding(t *testing.T) {
ms := store.NewMemoryStore()
runtime := &mockTxRuntime{
builder: plugin.NewMockTxBuilder(),
}
tc := NewTxController(ms, runtime)

tx := &types.Transaction{
Metadata: types.ResourceMeta{Name: "tx-cancel-2"},
Spec: types.TransactionSpec{
DevnetRef: "mydevnet",
TxType: "gov/vote",
Payload: json.RawMessage(`{"proposal_id":1,"vote_option":"yes"}`),
},
Status: types.TransactionStatus{
Phase: types.TxPhaseBuilding,
},
}
ms.CreateTransaction(context.Background(), tx)

// Create context with immediate cancellation
ctx, cancel := context.WithCancel(context.Background())
cancel()

// Reconcile should detect cancellation
err := tc.Reconcile(ctx, "tx-cancel-2")
if err == nil {
t.Error("Expected error from cancelled context, got nil")
}
if !errors.Is(err, context.Canceled) {
t.Errorf("Expected context.Canceled error, got: %v", err)
}
}

// TestTxController_ContextDeadline tests timeout handling.
func TestTxController_ContextDeadline(t *testing.T) {
ms := store.NewMemoryStore()
runtime := &mockTxRuntime{
builder: plugin.NewMockTxBuilder(),
}
tc := NewTxController(ms, runtime)

tx := &types.Transaction{
Metadata: types.ResourceMeta{Name: "tx-deadline-1"},
Spec: types.TransactionSpec{
DevnetRef: "mydevnet",
TxType: "gov/vote",
},
Status: types.TransactionStatus{
Phase: types.TxPhasePending,
},
}
ms.CreateTransaction(context.Background(), tx)

// Create context with deadline in the past
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
defer cancel()

// Reconcile should fail with deadline exceeded
err := tc.Reconcile(ctx, "tx-deadline-1")
if err == nil {
t.Error("Expected error from deadline exceeded, got nil")
}
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("Expected context.DeadlineExceeded error, got: %v", err)
}
}

// memoCapturingBuilder captures BuildTx parameters for verification.
type memoCapturingBuilder struct {
plugin.MockTxBuilder
capturedMemo string
capturedGasLimit uint64
}

func (m *memoCapturingBuilder) BuildTx(ctx context.Context, req *network.TxBuildRequest) (*network.UnsignedTx, error) {
// Capture the fields we're testing
m.capturedMemo = req.Memo
m.capturedGasLimit = req.GasLimit
// Delegate to parent for actual mock behavior
return m.MockTxBuilder.BuildTx(ctx, req)
}

// TestTxController_MemoPropagation verifies that Memo and GasLimit fields
// are properly propagated from TransactionSpec to TxBuildRequest.
func TestTxController_MemoPropagation(t *testing.T) {
ms := store.NewMemoryStore()
mockBuilder := &memoCapturingBuilder{
MockTxBuilder: *plugin.NewMockTxBuilder(),
}
runtime := &mockTxRuntime{
builder: mockBuilder,
signingKey: &network.SigningKey{Address: "cosmos1abc", PrivKey: []byte("test")},
receipt: &TxReceipt{TxHash: "abc123", Height: 100, Success: true},
}
tc := NewTxController(ms, runtime)

// Create transaction with Memo and GasLimit
tx := &types.Transaction{
Metadata: types.ResourceMeta{
Name: "tx-memo-test",
CreatedAt: time.Now(),
},
Spec: types.TransactionSpec{
DevnetRef: "mydevnet",
TxType: "gov/vote",
Signer: "validator:0",
Payload: json.RawMessage(`{"proposal_id":1,"vote_option":"yes"}`),
Memo: "reward-tag:pool-alpha",
GasLimit: 300000,
},
Status: types.TransactionStatus{
Phase: types.TxPhaseBuilding,
},
}
if err := ms.CreateTransaction(context.Background(), tx); err != nil {
t.Fatalf("CreateTransaction: %v", err)
}

// Reconcile - triggers BuildTx
err := tc.Reconcile(context.Background(), "tx-memo-test")
if err != nil {
t.Fatalf("Reconcile: %v", err)
}

// Verify Memo and GasLimit were passed to BuildTx
if mockBuilder.capturedMemo != "reward-tag:pool-alpha" {
t.Errorf("Memo = %q, want %q", mockBuilder.capturedMemo, "reward-tag:pool-alpha")
}
if mockBuilder.capturedGasLimit != 300000 {
t.Errorf("GasLimit = %d, want %d", mockBuilder.capturedGasLimit, 300000)
}
}
2 changes: 2 additions & 0 deletions internal/daemon/server/transaction_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (s *TransactionService) SubmitTransaction(ctx context.Context, req *v1.Subm
TxType: req.TxType,
Signer: req.Signer,
Payload: req.Payload,
GasLimit: req.GasLimit,
Memo: req.Memo,
},
Status: types.TransactionStatus{
Phase: types.TxPhasePending,
Expand Down
1 change: 1 addition & 0 deletions internal/daemon/tx_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,4 @@ func TestTransactionE2E_ErrorHandling(t *testing.T) {
t.Log("Server shutdown timed out")
}
}

4 changes: 4 additions & 0 deletions internal/daemon/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type TransactionSpec struct {
// GasLimit sets the gas limit for the transaction.
// If not specified, defaults to 200000.
GasLimit uint64 `json:"gasLimit,omitempty"`

// Memo is an optional transaction memo.
// Used for validator reward distribution tags or other metadata.
Memo string `json:"memo,omitempty"`
}

// TransactionStatus defines the observed state of a Transaction.
Expand Down
Loading
Loading