Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions token/services/ttx/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,45 @@ type StoreServiceManager = ttxdb.StoreServiceManager

type TokensServiceManager services.ServiceManager[*tokens.Service]

// tokenExistenceChecker reports whether a given txID was already committed to tokenDB.
// It is the read-side dependency of recoverCommittedPending, kept as a one-method
// interface so tests can substitute a stub.
// Satisfied by *tokens.DBStorage.
type tokenExistenceChecker interface {
// TransactionExists returns true when the transaction identified by id is present.
// A non-nil error means the lookup itself failed and the boolean must not be trusted.
TransactionExists(ctx context.Context, id string) (bool, error)
}

// pendingStatusSetter can promote a ttxDB record from Pending to Confirmed.
// It is the write-side dependency of recoverCommittedPending, kept as a one-method
// interface so tests can substitute a stub.
// Satisfied by *ttxdb.StoreService.
type pendingStatusSetter interface {
// SetStatus records status for the transaction txID together with a free-form
// message explaining the transition. A non-nil error means the update was not applied
// (presumably; confirm against the *ttxdb.StoreService implementation).
SetStatus(ctx context.Context, txID string, status storage.TxStatus, message string) error
}

// recoverCommittedPending repairs a record left inconsistent by a crash that happened
// after tokens.Append wrote to tokenDB but before ttxDB.SetStatus(Confirmed) ran.
//
// It asks checker whether txID is already present in tokenDB and, if so, uses setter
// to move the record to storage.Confirmed.
//
// The result is true only when the status was updated successfully; the caller should
// then skip AddFinalityListener because the transaction is already fully committed.
// The result is false in every other case, so the caller can fall back to the normal
// finality-listener path:
//   - the existence check failed (logged at warning level);
//   - the transaction is not in tokenDB (nothing is logged, setter is not called);
//   - the status update failed (logged at error level).
func recoverCommittedPending(ctx context.Context, txID string, checker tokenExistenceChecker, setter pendingStatusSetter) bool {
	inTokenDB, lookupErr := checker.TransactionExists(ctx, txID)
	switch {
	case lookupErr != nil:
		// The lookup result cannot be trusted; leave the record untouched.
		logger.Warnf("recover tx [%s]: failed to check token existence, falling back to finality listener: %v", txID, lookupErr)

		return false
	case !inTokenDB:
		// Ordinary pending transaction: nothing to heal.
		return false
	}

	logger.Infof("recover tx [%s]: tokens committed to tokenDB but ttxDB still Pending; setting Confirmed directly", txID)

	updateErr := setter.SetStatus(ctx, txID, storage.Confirmed, "recovered on restart: tokenDB committed before ttxDB status update")
	if updateErr == nil {
		return true
	}
	logger.Errorf("recover tx [%s]: failed to set Confirmed: %v; falling back to finality listener", txID, updateErr)

	return false
}

// CheckServiceProvider produces CheckService instances.
type CheckServiceProvider interface {
// CheckService returns a CheckService for the given TMS id, transaction store (adb)
// and tokens service (tdb), or an error if one cannot be provided.
// NOTE(review): whether instances are created per call or cached is not visible
// here — confirm against the implementations.
CheckService(id token.TMSID, adb *ttxdb.StoreService, tdb *tokens.Service) (CheckService, error)
}
Expand Down Expand Up @@ -114,6 +153,12 @@ func (m *ServiceManager) RestoreTMS(ctx context.Context, tmsID token.TMSID) erro
return iterators.ForEach(it, func(record *storage.TokenRequestRecord) error {
logger.Debugf("restore transaction [%s] with status [%s]", record.TxID, TxStatusMessage[record.Status])

// Crash-recovery: heal the split-brain state that arises when the node crashes
// after tokens.Append (tokenDB write) succeeds but before ttxDB.SetStatus runs.
if recoverCommittedPending(ctx, record.TxID, db.tokensService.Storage, db.ttxStoreService) {
return nil
}

return net.AddFinalityListener(
tmsID.Namespace,
record.TxID,
Expand Down
120 changes: 120 additions & 0 deletions token/services/ttx/manager_recover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package ttx

import (
"context"
"errors"
"testing"

"github.com/hyperledger-labs/fabric-token-sdk/token/services/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// --- stub types ---

// stubExistenceChecker is a canned implementation of the existence check used by
// recoverCommittedPending: every lookup yields the same preconfigured answer.
type stubExistenceChecker struct {
	// exists is the boolean handed back by TransactionExists.
	exists bool
	// err is the error handed back by TransactionExists.
	err error
}

// TransactionExists disregards both arguments and returns the configured
// (exists, err) pair unchanged.
func (c *stubExistenceChecker) TransactionExists(context.Context, string) (bool, error) {
	return c.exists, c.err
}

// stubStatusSetter is a recording test double for the status-update dependency of
// recoverCommittedPending: it remembers every SetStatus invocation and answers each
// one with a preconfigured error.
type stubStatusSetter struct {
// calls holds one entry per SetStatus invocation, in call order.
calls []setStatusCall
// returnErr is returned by every SetStatus call; nil simulates success.
returnErr error
}

// setStatusCall captures the non-context arguments of a single
// stubStatusSetter.SetStatus invocation so tests can assert on them.
type setStatusCall struct {
// txID is the transaction identifier passed to SetStatus.
txID string
// status is the status passed to SetStatus.
status storage.TxStatus
// message is the explanatory message passed to SetStatus.
message string
}

// SetStatus records the arguments of this invocation (the context is ignored) and
// then returns the stub's configured error. The call is recorded even when that
// error is non-nil.
func (r *stubStatusSetter) SetStatus(_ context.Context, txID string, status storage.TxStatus, message string) error {
	invocation := setStatusCall{
		txID:    txID,
		status:  status,
		message: message,
	}
	r.calls = append(r.calls, invocation)

	return r.returnErr
}

// --- tests ---

// TestRecoverCommittedPending_TokensAlreadyCommitted is the primary regression test.
//
// Scenario: node crashed after tokens.Append wrote to tokenDB but before
// ttxDB.SetStatus(Confirmed) ran. On restart, recoverCommittedPending must
// detect that tokenDB already has the txID and call SetStatus(Confirmed)
// directly — without relying on block re-delivery.
func TestRecoverCommittedPending_TokensAlreadyCommitted(t *testing.T) {
checker := &stubExistenceChecker{exists: true}
setter := &stubStatusSetter{}

recovered := recoverCommittedPending(t.Context(), "tx-abc", checker, setter)

require.True(t, recovered, "should report recovery when tokens are already committed")
require.Len(t, setter.calls, 1)
assert.Equal(t, "tx-abc", setter.calls[0].txID)
assert.Equal(t, storage.Confirmed, setter.calls[0].status)
assert.NotEmpty(t, setter.calls[0].message)
}

// TestRecoverCommittedPending_TokensNotYetCommitted exercises the ordinary restart
// path: the txID is absent from tokenDB, so recoverCommittedPending must return false
// and must not touch the status setter, leaving the work to the finality listener.
func TestRecoverCommittedPending_TokensNotYetCommitted(t *testing.T) {
	var (
		absent   = &stubExistenceChecker{}
		recorder = &stubStatusSetter{}
	)

	healed := recoverCommittedPending(t.Context(), "tx-xyz", absent, recorder)

	assert.False(t, healed, "should not recover when tokens are not yet in tokenDB")
	assert.Empty(t, recorder.calls, "SetStatus must not be called when tokens are absent")
}

// TestRecoverCommittedPending_ExistenceCheckError confirms that a transient
// storage error on TransactionExists causes a graceful fallback (returns false)
// rather than a panic or incorrect recovery.
func TestRecoverCommittedPending_ExistenceCheckError(t *testing.T) {
checker := &stubExistenceChecker{err: errors.New("db unavailable")}
setter := &stubStatusSetter{}

recovered := recoverCommittedPending(t.Context(), "tx-err", checker, setter)

assert.False(t, recovered, "error in existence check must not be treated as recovery")
assert.Empty(t, setter.calls)
}

// TestRecoverCommittedPending_SetStatusError checks the fallback when the status
// update fails: the txID is present in tokenDB, SetStatus is attempted exactly once,
// and its error makes recoverCommittedPending return false so the caller still
// registers a finality listener.
func TestRecoverCommittedPending_SetStatusError(t *testing.T) {
	present := &stubExistenceChecker{exists: true}
	failingSetter := &stubStatusSetter{returnErr: errors.New("write failed")}

	healed := recoverCommittedPending(t.Context(), "tx-fail", present, failingSetter)

	assert.False(t, healed, "SetStatus failure must cause fallback, not silent recovery")
	require.Len(t, failingSetter.calls, 1, "SetStatus should have been attempted once")
}

// TestRecoverCommittedPending_Idempotent checks that recovering the same txID twice
// is harmless: each invocation returns true and each one issues its own SetStatus
// call (two recorded calls in total).
func TestRecoverCommittedPending_Idempotent(t *testing.T) {
	const (
		txID     = "tx-dup"
		attempts = 2
	)
	present := &stubExistenceChecker{exists: true}
	recorder := &stubStatusSetter{}

	for range attempts {
		assert.True(t, recoverCommittedPending(t.Context(), txID, present, recorder))
	}

	assert.Len(t, recorder.calls, attempts, "both calls should attempt SetStatus (SQL UPDATE is idempotent)")
}
Loading