diff --git a/token/services/auditor/manager.go b/token/services/auditor/manager.go index fce2bf612f..a422190847 100644 --- a/token/services/auditor/manager.go +++ b/token/services/auditor/manager.go @@ -29,6 +29,42 @@ type TokenManagementServiceProvider interface { GetManagementService(opts ...token.ServiceOption) (*token.ManagementService, error) } +// auditTokenExistenceChecker can tell whether a given txID was already committed to tokenDB. +type auditTokenExistenceChecker interface { + TransactionExists(ctx context.Context, id string) (bool, error) +} + +// auditStatusSetter can promote an auditDB record from Pending to Confirmed. +type auditStatusSetter interface { + SetStatus(ctx context.Context, txID string, status TxStatus, message string) error +} + +// recoverAuditCommittedPending heals the split-brain state that arises when the node crashes +// after auditDB.Append (tokenDB write) succeeds but before auditDB.SetStatus(Confirmed) runs. +// +// Returns true if the record was healed — the caller should then skip AddFinalityListener +// because the transaction is already fully committed. +// Returns false on any error so the caller falls back to the normal finality-listener path. +func recoverAuditCommittedPending(ctx context.Context, txID string, checker auditTokenExistenceChecker, setter auditStatusSetter) bool { + committed, err := checker.TransactionExists(ctx, txID) + if err != nil { + logger.Warnf("recover audit tx [%s]: failed to check token existence, falling back to finality listener: %v", txID, err) + + return false + } + if !committed { + return false + } + logger.Infof("recover audit tx [%s]: tokens committed to tokenDB but auditDB still Pending; setting Confirmed directly", txID) + if err := setter.SetStatus(ctx, txID, auditdb.Confirmed, "recovered on restart: tokenDB committed before auditDB status update"); err != nil { + logger.Errorf("recover audit tx [%s]: failed to set Confirmed: %v; falling back to finality listener", txID, err) + + return false + } + + return true +} + type StoreServiceManager = auditdb.StoreServiceManager type TokensServiceManager = services.ServiceManager[*tokens.Service] @@ -126,6 +162,14 @@ func (cm *ServiceManager) RestoreTMS(tmsID token.TMSID) error { return iterators.ForEach(it, func(record *storage.TokenRequestRecord) error { logger.Debugf("restore transaction [%s] with status [%s]", record.TxID, TxStatusMessage[record.Status]) + // Crash-recovery: if tokens were already committed to tokenDB but the audit + // status was never flipped to Confirmed (crash between Append and SetStatus), + // heal the record directly instead of waiting for a finality event that may + // never be re-delivered. + if recoverAuditCommittedPending(context.Background(), record.TxID, tokenDB.Storage, auditor.auditDB) { + return nil + } + return net.AddFinalityListener( tmsID.Namespace, record.TxID, diff --git a/token/services/storage/db/sql/common/transactions.go b/token/services/storage/db/sql/common/transactions.go index 7d12f88063..b2b5dc99ec 100644 --- a/token/services/storage/db/sql/common/transactions.go +++ b/token/services/storage/db/sql/common/transactions.go @@ -507,6 +507,7 @@ func (w *AtomicWrite) AddTokenRequest(ctx context.Context, txID string, tr []byt query, args := q.InsertInto(w.table.Requests). Fields("tx_id", "request", "status", "status_message", "application_metadata", "public_metadata", "pp_hash"). Row(txID, tr, driver4.Pending, "", ja, jp, ppHash). + OnConflictDoNothing(). Format() logging.Debug(logger, query, txID, fmt.Sprintf("(%d bytes)", len(tr)), len(applicationMetadata), len(publicMetadata), len(ppHash)) _, err = w.txn.ExecContext(ctx, query, args...)