Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion token/services/selector/sherdlock/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type tokenSelectorUnlocker interface {
type manager struct {
selectorCache lazy2.Provider[transaction.ID, tokenSelectorUnlocker]
locker Locker
localLocks *localTokenLockTracker
leaseExpiry time.Duration
leaseCleanupTickPeriod time.Duration
metrics *Metrics
Expand All @@ -62,15 +63,17 @@ func NewManager(
m *Metrics,
) *manager {
ctx, cancel := context.WithCancel(context.Background())
sharedLocalLocks := newLocalTokenLockTracker()
mgr := &manager{
locker: locker,
localLocks: sharedLocalLocks,
leaseExpiry: leaseExpiry,
leaseCleanupTickPeriod: leaseCleanupTickPeriod,
metrics: m,
cancel: cancel,
cleanerDone: make(chan struct{}),
selectorCache: lazy2.NewProvider(func(txID transaction.ID) (tokenSelectorUnlocker, error) {
return NewSherdSelector(txID, fetcher, locker, precision, backoff, maxRetriesAfterBackOff, m), nil
return newSherdSelector(txID, fetcher, locker, precision, backoff, maxRetriesAfterBackOff, sharedLocalLocks, m), nil
}),
}
if leaseCleanupTickPeriod > 0 && leaseExpiry > 0 {
Expand Down
48 changes: 48 additions & 0 deletions token/services/selector/sherdlock/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/storage/driver/common"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/storage/driver/multiplexed"
Expand Down Expand Up @@ -236,6 +237,53 @@ func TestManager_NewSelector(t *testing.T) {
})
}

func TestManager_NewSelector_SharesLocalLocksAcrossSelectors(t *testing.T) {
tok := &token2.UnspentTokenInWallet{
Id: token2.ID{TxId: "tx-shared", Index: 0},
Type: "USD",
Quantity: "1",
}

var lockCalls atomic.Int32
mockFetcher := &mockTokenFetcher{
unspentTokensIteratorByFunc: func(_ context.Context, _ string, _ token2.Type) (iterator[*token2.UnspentTokenInWallet], error) {
return collections.NewSliceIterator([]*token2.UnspentTokenInWallet{tok}), nil
},
}
mockLocker := &mockLocker{
lockFunc: func(_ context.Context, _ *token2.ID, _ transaction.ID) error {
lockCalls.Add(1)

return nil
},
}

m := NewManager(
mockFetcher,
mockLocker,
100,
NoBackoff,
0,
0,
0,
NewMetrics(&disabled.Provider{}),
)

selector1, err := m.NewSelector("tx-1")
require.NoError(t, err)

_, _, err = selector1.Select(t.Context(), &ownerFilter{id: "wallet1"}, "1", "USD")
require.NoError(t, err)

selector2, err := m.NewSelector("tx-2")
require.NoError(t, err)

_, _, err = selector2.Select(t.Context(), &ownerFilter{id: "wallet1"}, "1", "USD")
require.Error(t, err)
require.ErrorIs(t, err, token.SelectorSufficientButLockedFunds)
assert.Equal(t, int32(1), lockCalls.Load(), "second selector should skip DB lock attempt for locally known locked token")
}

func TestManager_Unlock(t *testing.T) {
mockFetcher := &mockTokenFetcher{}
mockLocker := &mockLocker{}
Expand Down
122 changes: 114 additions & 8 deletions token/services/selector/sherdlock/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,54 @@ type selector struct {
cache iterator[*token2.UnspentTokenInWallet]
fetcher tokenFetcher
locker tokenLocker
txID transaction.ID
precision uint64
metrics *Metrics
mu sync.Mutex // protects cache field for concurrent Close() calls
lockMu sync.RWMutex
locks map[token2.ID]struct{}
shared *localTokenLockTracker
}

type localTokenLockTracker struct {
mu sync.RWMutex
locked map[token2.ID]transaction.ID
}

func newLocalTokenLockTracker() *localTokenLockTracker {
return &localTokenLockTracker{locked: make(map[token2.ID]transaction.ID)}
}

func (l *localTokenLockTracker) txIDFor(tokenID token2.ID) (transaction.ID, bool) {
l.mu.RLock()
txID, ok := l.locked[tokenID]
l.mu.RUnlock()

return txID, ok
}

func (l *localTokenLockTracker) lock(tokenID token2.ID, txID transaction.ID) bool {
l.mu.Lock()
defer l.mu.Unlock()

current, ok := l.locked[tokenID]
if ok {
return current == txID
}

l.locked[tokenID] = txID

return true
}

func (l *localTokenLockTracker) unlockIfOwned(tokenID token2.ID, txID transaction.ID) {
l.mu.Lock()
defer l.mu.Unlock()

current, ok := l.locked[tokenID]
if ok && current == txID {
delete(l.locked, tokenID)
}
}

type stubbornSelector struct {
Expand Down Expand Up @@ -85,7 +130,7 @@ func (m *stubbornSelector) Select(ctx context.Context, ownerFilter token.OwnerFi
select {
case <-time.After(backoffDuration):
case <-ctx.Done():
if err := m.locker.UnlockAll(ctx); err != nil {
if err := m.UnlockAll(ctx); err != nil {
m.logger.Errorf("failed to unlock tokens on context cancellation: %s", err)
}
m.metrics.SelectionDuration.Observe(time.Since(start).Seconds())
Expand All @@ -103,21 +148,36 @@ func (m *stubbornSelector) Select(ctx context.Context, ownerFilter token.OwnerFi
}

func NewStubbornSelector(logger logging.Logger, tokenDB tokenFetcher, lockDB tokenLocker, precision uint64, backoff time.Duration, retries int, m *Metrics) *stubbornSelector {
return newStubbornSelector(logger, tokenDB, lockDB, "", precision, backoff, retries, newLocalTokenLockTracker(), m)
}

func newStubbornSelector(logger logging.Logger, tokenDB tokenFetcher, lockDB tokenLocker, txID transaction.ID, precision uint64, backoff time.Duration, retries int, shared *localTokenLockTracker, m *Metrics) *stubbornSelector {
return &stubbornSelector{
selector: NewSelector(logger, tokenDB, lockDB, precision, m),
selector: newSelector(logger, tokenDB, lockDB, txID, precision, shared, m),
backoffInterval: backoff,
maxRetriesAfterBackoff: retries,
}
}

func NewSelector(logger logging.Logger, tokenDB tokenFetcher, lockDB tokenLocker, precision uint64, m *Metrics) *selector {
return newSelector(logger, tokenDB, lockDB, "", precision, newLocalTokenLockTracker(), m)
}

func newSelector(logger logging.Logger, tokenDB tokenFetcher, lockDB tokenLocker, txID transaction.ID, precision uint64, shared *localTokenLockTracker, m *Metrics) *selector {
if shared == nil {
shared = newLocalTokenLockTracker()
}

return &selector{
logger: logger,
cache: collections.NewEmptyIterator[*token2.UnspentTokenInWallet](),
fetcher: tokenDB,
locker: lockDB,
txID: txID,
precision: precision,
metrics: m,
locks: make(map[token2.ID]struct{}),
shared: shared,
}
}

Expand Down Expand Up @@ -157,7 +217,7 @@ func (s *selector) selectInternal(ctx context.Context, owner token.OwnerFilter,
sum, selected, tokensLockedByOthersExist, immediateRetries := token2.NewZeroQuantity(s.precision), collections.NewSet[*token2.ID](), true, 0
for {
if t, err := s.cache.Next(); err != nil {
err2 := s.locker.UnlockAll(ctx)
err2 := s.UnlockAll(ctx)

return nil, nil, immediateRetries, errors.Wrapf(err, "failed to get tokens for [%s:%s] - unlock: %v", owner.ID(), tokenType, err2)
} else if t == nil {
Expand All @@ -173,7 +233,7 @@ func (s *selector) selectInternal(ctx context.Context, owner token.OwnerFilter,

if immediateRetries > maxImmediateRetries {
s.logger.Warnf("Exceeded max number of immediate retries. Unlock tokens and abort...")
if err := s.locker.UnlockAll(ctx); err != nil {
if err := s.UnlockAll(ctx); err != nil {
return nil, nil, immediateRetries, errors.Wrapf(err, "exceeded number of retries: %d and unlock failed", maxImmediateRetries)
}

Expand All @@ -187,18 +247,26 @@ func (s *selector) selectInternal(ctx context.Context, owner token.OwnerFilter,

s.logger.DebugfContext(ctx, "Fetch all non-deleted tokens from the DB and refresh the token cache.")
if s.cache, err = s.fetcher.UnspentTokensIteratorBy(ctx, owner.ID(), tokenType); err != nil {
err2 := s.locker.UnlockAll(ctx)
err2 := s.UnlockAll(ctx)

return nil, nil, immediateRetries, errors.Wrapf(err, "failed to reload tokens for retry %d [%s:%s] - unlock: %v", immediateRetries, owner.ID(), tokenType, err2)
}

immediateRetries++
tokensLockedByOthersExist = false
} else if s.isLockedLocally(t.Id) {
s.logger.DebugfContext(ctx, "Token [%v] already locked locally, skipping lock attempt", t.Id)
} else if txID, locked := s.shared.txIDFor(t.Id); locked {
s.logger.DebugfContext(ctx, "Token [%v] already locked by selector [%s], skipping lock attempt", t.Id, txID)
if txID != s.txID {
tokensLockedByOthersExist = true
}
} else if locked := s.locker.TryLock(ctx, &t.Id); !locked {
s.logger.DebugfContext(ctx, "Tried to lock token [%v], but it was already locked by another process", t)
tokensLockedByOthersExist = true
} else {
s.logger.DebugfContext(ctx, "Got the lock on token [%v]", t)
s.recordLock(t.Id)
q, err := token2.ToQuantity(t.Quantity, s.precision)
if err != nil {
return nil, nil, immediateRetries, errors.Wrapf(err, "invalid token [%s] found", t.Id)
Expand Down Expand Up @@ -235,7 +303,41 @@ func (s *selector) isClosed() bool {
}

func (s *selector) UnlockAll(ctx context.Context) error {
return s.locker.UnlockAll(ctx)
if err := s.locker.UnlockAll(ctx); err != nil {
return err
}
s.clearLocks()

return nil
}

func (s *selector) isLockedLocally(tokenID token2.ID) bool {
s.lockMu.RLock()
_, ok := s.locks[tokenID]
s.lockMu.RUnlock()

return ok
}

func (s *selector) recordLock(tokenID token2.ID) {
if ok := s.shared.lock(tokenID, s.txID); !ok {
s.logger.Warnf("token [%v] was locked in DB but shared local tracker already assigned to another selector", tokenID)
}

s.lockMu.Lock()
s.locks[tokenID] = struct{}{}
s.lockMu.Unlock()
}

func (s *selector) clearLocks() {
s.lockMu.Lock()
for tokenID := range s.locks {
s.shared.unlockIfOwned(tokenID, s.txID)
}
if len(s.locks) > 0 {
s.locks = make(map[token2.ID]struct{})
}
s.lockMu.Unlock()
}

func tokenKey(walletID string, typ token2.Type) string {
Expand All @@ -261,11 +363,15 @@ func (l *locker) UnlockAll(ctx context.Context) error {
}

func NewSherdSelector(txID transaction.ID, fetcher tokenFetcher, lockDB Locker, precision uint64, backoff time.Duration, maxRetriesAfterBackoff int, m *Metrics) tokenSelectorUnlocker {
return newSherdSelector(txID, fetcher, lockDB, precision, backoff, maxRetriesAfterBackoff, newLocalTokenLockTracker(), m)
}

func newSherdSelector(txID transaction.ID, fetcher tokenFetcher, lockDB Locker, precision uint64, backoff time.Duration, maxRetriesAfterBackoff int, shared *localTokenLockTracker, m *Metrics) tokenSelectorUnlocker {
logger := logger.Named("selector-" + txID)
locker := &locker{txID: txID, Locker: lockDB}
if backoff < 0 {
return NewSelector(logger, fetcher, locker, precision, m)
return newSelector(logger, fetcher, locker, txID, precision, shared, m)
} else {
return NewStubbornSelector(logger, fetcher, locker, precision, backoff, maxRetriesAfterBackoff, m)
return newStubbornSelector(logger, fetcher, locker, txID, precision, backoff, maxRetriesAfterBackoff, shared, m)
}
}
58 changes: 58 additions & 0 deletions token/services/selector/sherdlock/selector_local_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package sherdlock

import (
"context"
"testing"

"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled"
"github.com/hyperledger-labs/fabric-token-sdk/token"
token2 "github.com/hyperledger-labs/fabric-token-sdk/token/token"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSelector_SkipsLocallyLockedTokens(t *testing.T) {
dup := &token2.UnspentTokenInWallet{
Id: token2.ID{TxId: "tx1", Index: 0},
Type: "USD",
Quantity: "1",
}

mockFetcher := &mockTokenFetcher{
unspentTokensIteratorByFunc: func(_ context.Context, _ string, _ token2.Type) (iterator[*token2.UnspentTokenInWallet], error) {
return collections.NewSliceIterator([]*token2.UnspentTokenInWallet{dup, dup}), nil
},
}

lockCounter := &countingLocker{}
m := NewMetrics(&disabled.Provider{})
sel := NewSelector(logger, mockFetcher, lockCounter, 64, m)

_, _, err := sel.Select(context.Background(), &ownerFilter{id: "wallet1"}, "2", "USD")

require.Error(t, err)
assert.True(t, errors.Is(err, token.SelectorInsufficientFunds))
assert.Equal(t, 1, lockCounter.tryLockCalls, "expected a single lock attempt for duplicated token")
}

type countingLocker struct {
tryLockCalls int
}

func (l *countingLocker) TryLock(_ context.Context, _ *token2.ID) bool {
l.tryLockCalls++

return true
}

func (l *countingLocker) UnlockAll(_ context.Context) error {
return nil
}