Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions token/services/auditor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,42 @@ type TokenManagementServiceProvider interface {
GetManagementService(opts ...token.ServiceOption) (*token.ManagementService, error)
}

// auditTokenExistenceChecker can tell whether a given txID was already committed to tokenDB.
type auditTokenExistenceChecker interface {
TransactionExists(ctx context.Context, id string) (bool, error)
}

// auditStatusSetter can promote an auditDB record from Pending to Confirmed.
type auditStatusSetter interface {
SetStatus(ctx context.Context, txID string, status TxStatus, message string) error
}

// recoverAuditCommittedPending heals the split-brain state that arises when the node crashes
// after auditDB.Append (tokenDB write) succeeds but before auditDB.SetStatus(Confirmed) runs.
//
// Returns true if the record was healed — the caller should then skip AddFinalityListener
// because the transaction is already fully committed.
// Returns false on any error so the caller falls back to the normal finality-listener path.
func recoverAuditCommittedPending(ctx context.Context, txID string, checker auditTokenExistenceChecker, setter auditStatusSetter) bool {
committed, err := checker.TransactionExists(ctx, txID)
if err != nil {
logger.Warnf("recover audit tx [%s]: failed to check token existence, falling back to finality listener: %v", txID, err)

return false
}
if !committed {
return false
}
logger.Infof("recover audit tx [%s]: tokens committed to tokenDB but auditDB still Pending; setting Confirmed directly", txID)
if err := setter.SetStatus(ctx, txID, auditdb.Confirmed, "recovered on restart: tokenDB committed before auditDB status update"); err != nil {
logger.Errorf("recover audit tx [%s]: failed to set Confirmed: %v; falling back to finality listener", txID, err)

return false
}

return true
}

type StoreServiceManager = auditdb.StoreServiceManager

type TokensServiceManager = services.ServiceManager[*tokens.Service]
Expand Down Expand Up @@ -126,6 +162,14 @@ func (cm *ServiceManager) RestoreTMS(tmsID token.TMSID) error {
return iterators.ForEach(it, func(record *storage.TokenRequestRecord) error {
logger.Debugf("restore transaction [%s] with status [%s]", record.TxID, TxStatusMessage[record.Status])

// Crash-recovery: if tokens were already committed to tokenDB but the audit
// status was never flipped to Confirmed (crash between Append and SetStatus),
// heal the record directly instead of waiting for a finality event that may
// never be re-delivered.
if recoverAuditCommittedPending(context.Background(), record.TxID, tokenDB.Storage, auditor.auditDB) {
return nil
}

return net.AddFinalityListener(
tmsID.Namespace,
record.TxID,
Expand Down
1 change: 1 addition & 0 deletions token/services/storage/db/sql/common/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ func (w *AtomicWrite) AddTokenRequest(ctx context.Context, txID string, tr []byt
query, args := q.InsertInto(w.table.Requests).
Fields("tx_id", "request", "status", "status_message", "application_metadata", "public_metadata", "pp_hash").
Row(txID, tr, driver4.Pending, "", ja, jp, ppHash).
OnConflictDoNothing().
Format()
logging.Debug(logger, query, txID, fmt.Sprintf("(%d bytes)", len(tr)), len(applicationMetadata), len(publicMetadata), len(ppHash))
_, err = w.txn.ExecContext(ctx, query, args...)
Expand Down
Loading