Removes race condition in cluster Get PID (issue 1131) #1140

Open · wants to merge 3 commits into dev
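
This change guards the Manager's rendezvous state (pm.rdv) with a sync.RWMutex: onClusterTopology takes the write lock while it rebuilds the rendezvous from the new member list, and Get takes the read lock while it resolves a ClusterIdentity to an owner address. Previously both paths touched pm.rdv without synchronization, so a topology update could replace the rendezvous while a lookup was reading it, which is the data race reported in issue 1131.

The sketch below shows the same pattern in isolation. It is illustrative only: the ring and manager types and their methods are stand-ins, not the protoactor-go API. Run it with go run -race; with the locks in place the detector should stay quiet, and removing them reproduces the kind of report this PR fixes.

package main

import (
	"fmt"
	"sync"
)

// ring is an illustrative stand-in for *clustering.Rendezvous.
type ring struct{ members []string }

func (r *ring) ownerOf(identity string) string {
	if len(r.members) == 0 {
		return ""
	}
	// Not a real rendezvous hash; just a deterministic pick for the demo.
	return r.members[len(identity)%len(r.members)]
}

// manager mirrors the shape of disthash.Manager: a pointer that one goroutine
// replaces on topology changes and other goroutines read on every lookup.
type manager struct {
	mu  sync.RWMutex
	rdv *ring
}

// onTopology swaps the ring under the write lock, like onClusterTopology in this PR.
func (m *manager) onTopology(members []string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.rdv = &ring{members: members}
}

// get reads the ring under the read lock, like Manager.Get in this PR.
func (m *manager) get(identity string) string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.rdv.ownerOf(identity)
}

func main() {
	m := &manager{rdv: &ring{}}

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			m.onTopology([]string{"node-1", "node-2"})
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_ = m.get("some-grain")
		}
	}()
	wg.Wait()

	fmt.Println("owner:", m.get("some-grain"))
}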
13 changes: 10 additions & 3 deletions cluster/identitylookup/disthash/manager.go
@@ -1,12 +1,12 @@
 package disthash
 
 import (
-	"log/slog"
-	"time"
-
 	"github.com/asynkron/protoactor-go/actor"
 	clustering "github.com/asynkron/protoactor-go/cluster"
 	"github.com/asynkron/protoactor-go/eventstream"
+	"log/slog"
+	"sync"
+	"time"
 )
 
 const (
@@ -17,6 +17,7 @@ type Manager struct {
 	cluster        *clustering.Cluster
 	topologySub    *eventstream.Subscription
 	placementActor *actor.PID
+	rdvMutex       sync.RWMutex
 	rdv            *clustering.Rendezvous
 }
@@ -60,6 +61,9 @@ func (pm *Manager) PidOfActivatorActor(addr string) *actor.PID {
 }
 
 func (pm *Manager) onClusterTopology(tplg *clustering.ClusterTopology) {
+	pm.rdvMutex.Lock()
+	defer pm.rdvMutex.Unlock()
+
 	pm.cluster.Logger().Info("onClusterTopology", slog.Uint64("topology-hash", tplg.TopologyHash))
 
 	for _, m := range tplg.Members {
@@ -72,6 +76,9 @@ func (pm *Manager) onClusterTopology(tplg *clustering.ClusterTopology) {
 }
 
 func (pm *Manager) Get(identity *clustering.ClusterIdentity) *actor.PID {
+	pm.rdvMutex.RLock()
+	defer pm.rdvMutex.RUnlock()
+
 	ownerAddress := pm.rdv.GetByClusterIdentity(identity)
 
 	if ownerAddress == "" {
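
Using a sync.RWMutex rather than a plain sync.Mutex keeps the lookup path cheap: concurrent Get calls take only the read lock and can proceed in parallel, while the comparatively rare topology updates take the exclusive write lock. Running the package tests with the race detector (go test -race ./cluster/identitylookup/disthash/) should confirm the fix; the new concurrency test below drives topology updates and lookups at the same time so the old unsynchronized access would be flagged.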
156 changes: 156 additions & 0 deletions cluster/identitylookup/disthash/manager_test.go
@@ -0,0 +1,156 @@
package disthash

import (
	"fmt"
	"github.com/asynkron/protoactor-go/actor"
	"github.com/asynkron/protoactor-go/cluster"
	"github.com/asynkron/protoactor-go/cluster/clusterproviders/test"
	"github.com/asynkron/protoactor-go/remote"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/suite"
	"sync"
	"testing"
	"time"
)

func TestManagerConcurrentAccess(t *testing.T) {
	system := actor.NewActorSystem()
	provider := test.NewTestProvider(test.NewInMemAgent())
	lookup := New()
	config := cluster.Configure("test-cluster", provider, lookup, remote.Configure("127.0.0.1", 0))
	c := cluster.New(system, config)

	manager := newPartitionManager(c)
	manager.Start()
	defer manager.Stop()

	// Create a WaitGroup to synchronize goroutines
	var wg sync.WaitGroup
	iterations := 1000

	// Simulate concurrent topology updates and lookups
	wg.Add(2)

	// Goroutine 1: Continuously update topology
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			members := []*cluster.Member{
				{Id: "1", Host: "localhost", Port: 1},
				{Id: "2", Host: "localhost", Port: 2},
			}
			topology := &cluster.ClusterTopology{
				Members:      members,
				TopologyHash: uint64(i),
			}
			manager.onClusterTopology(topology)
		}
	}()

	// Goroutine 2: Continuously perform lookups
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			identity := &cluster.ClusterIdentity{
				Identity: "test",
				Kind:     "test",
			}
			_ = manager.Get(identity)
		}
	}()

	// Wait with timeout
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		// Test completed successfully
	case <-time.After(5 * time.Second):
		t.Fatal("Test timed out")
	}
}

// Integration test suite
type DistHashManagerTestSuite struct {
	suite.Suite
	clusters []*cluster.Cluster
}

func (suite *DistHashManagerTestSuite) SetupTest() {
	// Create 3 cluster nodes for testing
	suite.clusters = make([]*cluster.Cluster, 3)
	inMemAgent := test.NewInMemAgent()

	for i := 0; i < 3; i++ {
		system := actor.NewActorSystem()
		provider := test.NewTestProvider(inMemAgent)
		config := cluster.Configure("test-cluster",
			provider,
			New(),
			remote.Configure("localhost", 0),
		)

		c := cluster.New(system, config)
		c.StartMember()
		suite.clusters[i] = c
	}
}

func (suite *DistHashManagerTestSuite) TearDownTest() {
	for _, c := range suite.clusters {
		c.Shutdown(true)
	}
}

func (suite *DistHashManagerTestSuite) TestConcurrentClusterOperations() {
	assert.Equal(suite.T(), 3, len(suite.clusters))

	// Create multiple concurrent operations
	var wg sync.WaitGroup
	iterations := 100

	for i := 0; i < iterations; i++ {
		wg.Add(1)
		go func(iteration int) {
			defer wg.Done()

			// Pick a cluster node round-robin (named node to avoid shadowing the cluster package)
			node := suite.clusters[iteration%len(suite.clusters)]

			// Perform a Get operation
			identity := fmt.Sprintf("test-%d", iteration)
			pid := node.Get(identity, "test-kind")

			// Verify the operation completed without panicking
			assert.NotPanics(suite.T(), func() {
				if pid != nil {
					// Optionally verify the PID properties
					assert.NotEmpty(suite.T(), pid.Address)
					assert.NotEmpty(suite.T(), pid.Id)
				}
			})
		}(i)
	}

	// Wait with timeout
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		// Test completed successfully
	case <-time.After(10 * time.Second):
		suite.T().Fatal("Test timed out")
	}
}

func TestDistHashManager(t *testing.T) {
	suite.Run(t, new(DistHashManagerTestSuite))
}