Skip to content

Commit 9a7264a

Browse files
committed
Add SequenceTracker for efficiently tracking insertions.
To be more exact, it can: - Insert a number. Complexity: Amortized O(1). - Check if all numbers upto a given number are inserted. Complexity: O(1). - Efficiently wait until all numbers inserted up to a given number.
1 parent 43ac462 commit 9a7264a

File tree

2 files changed

+280
-0
lines changed

2 files changed

+280
-0
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package predicate
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
)
9+
10+
// sequenceTracker is a helper structure that tracks a sequence of integers and allows waiting until all
11+
// non-negative integers up to a certain value have been inserted.
12+
type sequenceTracker struct {
13+
received map[int]bool
14+
currentContiguous int // All non-negative numbers until this is guaranteed to be inserted.
15+
broadcaster map[int]chan struct{} // Used to efficiently signal when a all the non-negative numbers until the number in the key are inserted.
16+
mutex sync.Mutex
17+
}
18+
19+
func newSequenceTracker() *sequenceTracker {
20+
return &sequenceTracker{
21+
received: make(map[int]bool),
22+
currentContiguous: -1,
23+
broadcaster: make(map[int]chan struct{}),
24+
}
25+
}
26+
27+
// Complexity: Amortized O(1).
28+
func (s *sequenceTracker) Insert(i int) {
29+
s.mutex.Lock()
30+
defer s.mutex.Unlock()
31+
32+
s.received[i] = true
33+
34+
// Slide the window forward
35+
for s.received[s.currentContiguous+1] {
36+
s.currentContiguous++
37+
if ch, ok := s.broadcaster[s.currentContiguous]; ok {
38+
close(ch)
39+
delete(s.broadcaster, s.currentContiguous)
40+
}
41+
delete(s.received, s.currentContiguous)
42+
}
43+
}
44+
45+
// Complexity: O(1).
46+
func (s *sequenceTracker) allUpToInserted(n int) bool {
47+
return s.currentContiguous >= n
48+
}
49+
50+
// WaitUntilAllUpToInserted waits until all non-negative integers up to n have been inserted into the tracker.
51+
// If all numbers are already inserted, it returns immediately.
52+
// If not, it waits until the condition is met, the context is cancelled, or the timeout is reached.
53+
func (s *sequenceTracker) WaitUntilAllUpToInserted(ctx context.Context, n int, timeout time.Duration) error {
54+
ctx, _ = context.WithTimeout(ctx, timeout)
55+
s.mutex.Lock()
56+
// After entering the mutex, there are only 2 cases:
57+
// - Either all up to the given number is inserted. In such case we exit quickly.
58+
// - Not all up to the given number is inserted. In such case, we create a channel if it doesn't exist, and rely on Insert to close it when all below are inserted.
59+
if s.allUpToInserted(n) {
60+
s.mutex.Unlock()
61+
return nil
62+
}
63+
ch, ok := s.broadcaster[n]
64+
if !ok {
65+
ch = make(chan struct{})
66+
s.broadcaster[n] = ch
67+
}
68+
s.mutex.Unlock()
69+
70+
select {
71+
case <-ctx.Done():
72+
return fmt.Errorf("context is done: %v", ctx.Err())
73+
case <-ch:
74+
return nil
75+
}
76+
}
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package predicate
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
const (
14+
testWaitTimeout = 3 * time.Second
15+
)
16+
17+
func TestNewSequenceTracker(t *testing.T) {
18+
tracker := newSequenceTracker()
19+
assert.NotNil(t, tracker)
20+
assert.Equal(t, -1, tracker.currentContiguous)
21+
assert.NotNil(t, tracker.received)
22+
assert.NotNil(t, tracker.broadcaster)
23+
}
24+
25+
func TestAllUpToInserted(t *testing.T) {
26+
type testStep struct {
27+
numToInsert int
28+
assertions map[int]bool
29+
}
30+
31+
testCases := []struct {
32+
name string
33+
insertions []testStep
34+
}{
35+
{
36+
name: "multiple insertions",
37+
insertions: []testStep{
38+
{
39+
numToInsert: 0,
40+
assertions: map[int]bool{
41+
0: true,
42+
1: false,
43+
2: false,
44+
3: false,
45+
4: false,
46+
5: false,
47+
},
48+
},
49+
{
50+
numToInsert: 1,
51+
assertions: map[int]bool{
52+
0: true,
53+
1: true,
54+
2: false,
55+
3: false,
56+
4: false,
57+
5: false,
58+
},
59+
},
60+
{
61+
numToInsert: 2,
62+
assertions: map[int]bool{
63+
0: true,
64+
1: true,
65+
2: true,
66+
3: false,
67+
4: false,
68+
5: false,
69+
},
70+
},
71+
{
72+
numToInsert: 5,
73+
assertions: map[int]bool{
74+
0: true,
75+
1: true,
76+
2: true,
77+
3: false,
78+
4: false,
79+
5: false,
80+
},
81+
},
82+
},
83+
},
84+
{
85+
name: "multiple insertions - reverse order",
86+
insertions: []testStep{
87+
{
88+
numToInsert: 5,
89+
assertions: map[int]bool{
90+
0: false,
91+
1: false,
92+
2: false,
93+
3: false,
94+
4: false,
95+
5: false,
96+
},
97+
},
98+
{
99+
numToInsert: 2,
100+
assertions: map[int]bool{
101+
0: false,
102+
1: false,
103+
2: false,
104+
3: false,
105+
4: false,
106+
5: false,
107+
},
108+
},
109+
{
110+
numToInsert: 1,
111+
assertions: map[int]bool{
112+
0: false,
113+
1: false,
114+
2: false,
115+
3: false,
116+
4: false,
117+
5: false,
118+
},
119+
},
120+
{
121+
numToInsert: 0,
122+
assertions: map[int]bool{
123+
0: true,
124+
1: true,
125+
2: true,
126+
3: false,
127+
4: false,
128+
5: false,
129+
},
130+
},
131+
},
132+
},
133+
}
134+
for _, tc := range testCases {
135+
t.Run(tc.name, func(t *testing.T) {
136+
tracker := newSequenceTracker()
137+
for _, insertion := range tc.insertions {
138+
tracker.Insert(insertion.numToInsert)
139+
for i, expected := range insertion.assertions {
140+
assert.Equal(t, expected, tracker.allUpToInserted(i), fmt.Sprintf("allUpToInserted(%d) should be %v after inserting %d", i, expected, insertion.numToInsert))
141+
}
142+
}
143+
})
144+
}
145+
}
146+
147+
func TestWaitUntilAllUpToInserted(t *testing.T) {
148+
tracker := newSequenceTracker()
149+
var wg sync.WaitGroup
150+
wg.Add(1)
151+
waitFinished := false
152+
go func() {
153+
defer wg.Done()
154+
ctx := context.Background()
155+
err := tracker.WaitUntilAllUpToInserted(ctx, 2, testWaitTimeout)
156+
assert.NoError(t, err)
157+
waitFinished = true
158+
}()
159+
160+
assert.False(t, waitFinished)
161+
162+
tracker.Insert(0)
163+
assert.False(t, waitFinished)
164+
165+
tracker.Insert(1)
166+
assert.False(t, waitFinished)
167+
168+
tracker.Insert(2)
169+
wg.Wait()
170+
assert.True(t, waitFinished)
171+
}
172+
173+
func TestWaitUntilAllUpToInserted_AlreadyInserted(t *testing.T) {
174+
tracker := newSequenceTracker()
175+
var wg sync.WaitGroup
176+
wg.Add(1)
177+
178+
tracker.Insert(0)
179+
tracker.Insert(1)
180+
tracker.Insert(2)
181+
182+
go func() {
183+
defer wg.Done()
184+
ctx := context.Background()
185+
err := tracker.WaitUntilAllUpToInserted(ctx, 2, testWaitTimeout)
186+
assert.NoError(t, err)
187+
}()
188+
189+
wg.Wait()
190+
}
191+
192+
func TestWaitUntilAllUpToInserted_WithContext(t *testing.T) {
193+
tracker := newSequenceTracker()
194+
ctx, cancel := context.WithCancel(context.Background())
195+
var wg sync.WaitGroup
196+
wg.Add(1)
197+
go func() {
198+
defer wg.Done()
199+
err := tracker.WaitUntilAllUpToInserted(ctx, 0, testWaitTimeout)
200+
assert.Error(t, err)
201+
}()
202+
cancel()
203+
wg.Wait()
204+
}

0 commit comments

Comments
 (0)