Skip to content

Commit de81593

Browse files
committed
chore: merge develop into branch, resolve cmd/meridian/main.go conflict
Kept the full migration runner implementation from our branch over the stub main() that was added to develop.
2 parents 08ea5b8 + e342018 commit de81593

13 files changed

Lines changed: 2684 additions & 133 deletions

File tree

cmd/meridian/lock.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Package main provides the unified Meridian binary entry point.
2+
package main
3+
4+
import (
5+
"context"
6+
"sync"
7+
8+
"github.com/meridianhub/meridian/shared/platform/scheduler"
9+
)
10+
11+
// Compile-time assertion: localLockManager satisfies the scheduler.DistributedLock interface.
12+
var _ scheduler.DistributedLock = (*localLockManager)(nil)
13+
14+
// lockKey is a composite key for per-resource locks. Using a struct avoids
15+
// delimiter-based collisions when tenantID or resourceID contain ":".
16+
type lockKey struct {
17+
tenantID string
18+
resourceID string
19+
}
20+
21+
// localLockManager is an in-process mutex-based lock manager satisfying
22+
// the scheduler.DistributedLock interface. It is suitable for single-process
23+
// deployments where Redis is not available.
24+
type localLockManager struct {
25+
mu sync.Mutex
26+
locks map[lockKey]uint64
27+
nextToken uint64
28+
}
29+
30+
func newLocalLockManager() *localLockManager {
31+
return &localLockManager{locks: make(map[lockKey]uint64)}
32+
}
33+
34+
// Acquire attempts to acquire the lock for the given tenant and resource.
35+
// Returns (true, release, nil) if acquired, (false, nil, nil) if already held.
36+
// The release function is safe to call multiple times; only the first call from
37+
// the original acquirer has effect - stale releases are ignored via token check.
38+
func (m *localLockManager) Acquire(_ context.Context, tenantID, resourceID string) (bool, func(), error) {
39+
k := lockKey{tenantID: tenantID, resourceID: resourceID}
40+
41+
m.mu.Lock()
42+
if _, ok := m.locks[k]; ok {
43+
m.mu.Unlock()
44+
return false, nil, nil
45+
}
46+
m.nextToken++
47+
token := m.nextToken
48+
m.locks[k] = token
49+
m.mu.Unlock()
50+
51+
release := func() {
52+
m.mu.Lock()
53+
if m.locks[k] == token {
54+
delete(m.locks, k)
55+
}
56+
m.mu.Unlock()
57+
}
58+
return true, release, nil
59+
}
60+
61+
// alwaysLeader is a no-op leader election stub that always reports the current
62+
// instance as leader. Valid for single-process deployments where there is only
63+
// one instance to coordinate.
64+
type alwaysLeader struct{}
65+
66+
func (a *alwaysLeader) IsLeader() bool { return true }
67+
func (a *alwaysLeader) Start(_ context.Context) {}
68+
func (a *alwaysLeader) Stop() {}

cmd/meridian/lock_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestLocalLockManager_AcquireAndRelease(t *testing.T) {
13+
m := newLocalLockManager()
14+
ctx := context.Background()
15+
16+
acquired, release, err := m.Acquire(ctx, "tenant-1", "resource-a")
17+
require.NoError(t, err)
18+
assert.True(t, acquired)
19+
assert.NotNil(t, release)
20+
21+
release()
22+
23+
// After release, can acquire again
24+
acquired, release, err = m.Acquire(ctx, "tenant-1", "resource-a")
25+
require.NoError(t, err)
26+
assert.True(t, acquired)
27+
assert.NotNil(t, release)
28+
release()
29+
}
30+
31+
func TestLocalLockManager_DoubleAcquireReturnsFalse(t *testing.T) {
32+
m := newLocalLockManager()
33+
ctx := context.Background()
34+
35+
acquired1, release1, err := m.Acquire(ctx, "tenant-1", "resource-a")
36+
require.NoError(t, err)
37+
assert.True(t, acquired1)
38+
defer release1()
39+
40+
// Second acquire on same key should fail
41+
acquired2, release2, err := m.Acquire(ctx, "tenant-1", "resource-a")
42+
require.NoError(t, err)
43+
assert.False(t, acquired2)
44+
assert.Nil(t, release2)
45+
}
46+
47+
func TestLocalLockManager_ReleaseOfUnheldLockIsIdempotent(t *testing.T) {
48+
m := newLocalLockManager()
49+
ctx := context.Background()
50+
51+
acquired, release, err := m.Acquire(ctx, "tenant-1", "resource-a")
52+
require.NoError(t, err)
53+
assert.True(t, acquired)
54+
55+
// Call release twice - should not panic
56+
release()
57+
release()
58+
59+
// Lock should be acquirable again
60+
acquired2, release2, err := m.Acquire(ctx, "tenant-1", "resource-a")
61+
require.NoError(t, err)
62+
assert.True(t, acquired2)
63+
release2()
64+
}
65+
66+
func TestLocalLockManager_DifferentKeysAreIndependent(t *testing.T) {
67+
m := newLocalLockManager()
68+
ctx := context.Background()
69+
70+
acquired1, release1, err := m.Acquire(ctx, "tenant-1", "resource-a")
71+
require.NoError(t, err)
72+
assert.True(t, acquired1)
73+
defer release1()
74+
75+
// Different resource on same tenant can be acquired
76+
acquired2, release2, err := m.Acquire(ctx, "tenant-1", "resource-b")
77+
require.NoError(t, err)
78+
assert.True(t, acquired2)
79+
defer release2()
80+
81+
// Different tenant, same resource can be acquired
82+
acquired3, release3, err := m.Acquire(ctx, "tenant-2", "resource-a")
83+
require.NoError(t, err)
84+
assert.True(t, acquired3)
85+
defer release3()
86+
}
87+
88+
func TestLocalLockManager_StaleReleaseDoesNotUnlockNewHolder(t *testing.T) {
89+
m := newLocalLockManager()
90+
ctx := context.Background()
91+
92+
// First acquisition
93+
acquired1, release1, err := m.Acquire(ctx, "tenant-1", "resource-a")
94+
require.NoError(t, err)
95+
assert.True(t, acquired1)
96+
97+
// Release - lock is now free
98+
release1()
99+
100+
// Second acquisition of same key
101+
acquired2, release2, err := m.Acquire(ctx, "tenant-1", "resource-a")
102+
require.NoError(t, err)
103+
assert.True(t, acquired2)
104+
defer release2()
105+
106+
// Stale release1 called again - must NOT release the new holder's lock
107+
release1()
108+
109+
// Lock should still be held: a third acquire must fail
110+
acquired3, release3, err := m.Acquire(ctx, "tenant-1", "resource-a")
111+
require.NoError(t, err)
112+
assert.False(t, acquired3, "stale release should not have freed the lock held by the second acquirer")
113+
assert.Nil(t, release3)
114+
}
115+
116+
func TestLocalLockManager_ColonInResourceIDNoCollision(t *testing.T) {
117+
m := newLocalLockManager()
118+
ctx := context.Background()
119+
120+
// These two pairs would produce the same string with a ":" delimiter:
121+
// "tenant" + ":" + "a:b" == "tenant:a" + ":" + "b"
122+
// The struct key prevents this collision.
123+
acquired1, release1, err := m.Acquire(ctx, "tenant", "a:b")
124+
require.NoError(t, err)
125+
assert.True(t, acquired1)
126+
defer release1()
127+
128+
acquired2, release2, err := m.Acquire(ctx, "tenant:a", "b")
129+
require.NoError(t, err)
130+
assert.True(t, acquired2, "keys with ':' in resourceID must not collide with different tenantID")
131+
defer release2()
132+
}
133+
134+
func TestAlwaysLeader_IsLeaderReturnsTrue(t *testing.T) {
135+
ctx, cancel := context.WithCancel(context.Background())
136+
defer cancel()
137+
138+
l := &alwaysLeader{}
139+
l.Start(ctx)
140+
assert.True(t, l.IsLeader())
141+
l.Stop()
142+
assert.True(t, l.IsLeader())
143+
}
144+
145+
func TestLocalLockManager_Concurrency(t *testing.T) {
146+
m := newLocalLockManager()
147+
ctx := context.Background()
148+
149+
const goroutines = 50
150+
acquisitions := make([]bool, goroutines)
151+
var wg sync.WaitGroup
152+
153+
for i := range goroutines {
154+
wg.Add(1)
155+
go func(idx int) {
156+
defer wg.Done()
157+
acquired, release, err := m.Acquire(ctx, "tenant-1", "resource-a")
158+
if err == nil && acquired {
159+
acquisitions[idx] = true
160+
release()
161+
}
162+
}(i)
163+
}
164+
165+
wg.Wait()
166+
167+
// At least one goroutine should have acquired the lock
168+
var count int
169+
for _, a := range acquisitions {
170+
if a {
171+
count++
172+
}
173+
}
174+
assert.Greater(t, count, 0)
175+
}

0 commit comments

Comments
 (0)