Skip to content

Commit bf81c66

Browse files
committed
new EnqueueUint64 function to support uint64 priority variable
Signed-off-by: Juan Peña <[email protected]>
1 parent f89dfed commit bf81c66

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

client/v3/experimental/recipes/priority_queue.go

+7
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
4141
return err
4242
}
4343

44+
// Enqueue puts a value into a queue with a given uint64 priority.
45+
func (q *PriorityQueue) EnqueueUint64(val string, pr uint64) error {
46+
prefix := fmt.Sprintf("%s%05d", q.key, pr)
47+
_, err := newSequentialKV(q.client, prefix, val)
48+
return err
49+
}
50+
4451
// Dequeue returns Enqueue()'d items in FIFO order. If the
4552
// queue is empty, Dequeue blocks until items are available.
4653
func (q *PriorityQueue) Dequeue() (string, error) {

tests/integration/clientv3/experimental/recipes/v3_queue_test.go

+64
Original file line numberDiff line numberDiff line change
@@ -237,3 +237,67 @@ func (q *flatPriorityQueue) Enqueue(val string) error {
237237
func (q *flatPriorityQueue) Dequeue() (string, error) {
238238
return q.PriorityQueue.Dequeue()
239239
}
240+
241+
type flatUint64PriorityQueue struct{ *recipe.PriorityQueue }
242+
243+
func (q *flatUint64PriorityQueue) Enqueue(val string) error {
244+
// randomized to stress dequeuing logic; order isn't important
245+
return q.PriorityQueue.EnqueueUint64(val, uint64(rand.Intn(2)))
246+
}
247+
func (q *flatUint64PriorityQueue) Dequeue() (string, error) {
248+
return q.PriorityQueue.Dequeue()
249+
}
250+
251+
func newUint64PriorityQueues(clus *integration2.Cluster, n int) (qs []testQueue) {
252+
for i := 0; i < n; i++ {
253+
etcdc := clus.RandClient()
254+
q := &flatUint64PriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")}
255+
qs = append(qs, q)
256+
}
257+
return qs
258+
}
259+
260+
// TestUint64PrQueueOneReaderOneWriter tests whether uint64 priority queues respect priorities.
261+
func TestUint64PrQueueOneReaderOneWriter(t *testing.T) {
262+
integration2.BeforeTest(t)
263+
264+
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})
265+
defer clus.Terminate(t)
266+
267+
// write out five items with random priority
268+
etcdc := clus.RandClient()
269+
q := recipe.NewPriorityQueue(etcdc, "testprq")
270+
for i := 0; i < 5; i++ {
271+
// [0, 2] priority for priority collision to test seq keys
272+
pr := uint64(rand.Intn(3))
273+
if err := q.EnqueueUint64(fmt.Sprintf("%d", pr), pr); err != nil {
274+
t.Fatalf("error enqueuing (%v)", err)
275+
}
276+
}
277+
278+
// read back items; confirm priority order is respected
279+
lastPr := -1
280+
for i := 0; i < 5; i++ {
281+
s, err := q.Dequeue()
282+
if err != nil {
283+
t.Fatalf("error dequeueing (%v)", err)
284+
}
285+
curPr := 0
286+
if _, err := fmt.Sscanf(s, "%d", &curPr); err != nil {
287+
t.Fatalf(`error parsing item "%s" (%v)`, s, err)
288+
}
289+
if lastPr > curPr {
290+
t.Fatalf("expected priority %v > %v", curPr, lastPr)
291+
}
292+
}
293+
}
294+
295+
func TestUint64PrQueueManyReaderManyWriter(t *testing.T) {
296+
integration2.BeforeTest(t)
297+
298+
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
299+
defer clus.Terminate(t)
300+
rqs := newUint64PriorityQueues(clus, manyQueueClients)
301+
wqs := newUint64PriorityQueues(clus, manyQueueClients)
302+
testReadersWriters(t, rqs, wqs)
303+
}

0 commit comments

Comments
 (0)