
Re-use unit tests #7469

Merged
merged 21 commits on Mar 19, 2025
6 changes: 6 additions & 0 deletions common/softassert/softassert.go
@@ -46,3 +46,9 @@ func That(logger log.Logger, condition bool, msg string) bool {
}
return condition
}

// Fail logs an error message indicating a failed assertion.
// It works the same as That, but does not require a condition.
func Fail(logger log.Logger, msg string) {
logger.Error("failed assertion: "+msg, tag.FailedAssertion)
}
Comment on lines +52 to +54 (Contributor Author):
Convenience function for when no condition is needed.
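
For illustration, here is a minimal sketch of how Fail might be used alongside That. The dispatch function, its parameters, and the task kinds are hypothetical and not part of this diff; only the two softassert calls reflect the API shown above.

```go
package example

import (
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/softassert"
)

// dispatch is a hypothetical caller: That guards an invariant that can be
// expressed as a condition, while Fail covers a branch that should simply
// never be reached.
func dispatch(logger log.Logger, kind string, payload []byte) {
	if !softassert.That(logger, len(payload) > 0, "dispatch called with empty payload") {
		return
	}
	switch kind {
	case "workflow", "activity":
		// handle the known kinds
	default:
		// There is no condition to assert here; reaching this branch is itself the bug.
		softassert.Fail(logger, "unknown task kind: "+kind)
	}
}
```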

215 changes: 174 additions & 41 deletions service/matching/ack_manager_test.go
@@ -25,56 +25,181 @@
package matching

import (
"context"
"math/rand"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/stretchr/testify/suite"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/testing/testlogger"
"go.temporal.io/server/common/tqid"
)

func TestAckManager_AddingTasksIncreasesBacklogCounter(t *testing.T) {
type AckManagerTestSuite struct {
suite.Suite
logger *testlogger.TestLogger
}

func TestAckManagerTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, &AckManagerTestSuite{})
}

func (s *AckManagerTestSuite) SetupTest() {
s.logger = testlogger.NewTestLogger(s.T(), testlogger.FailOnAnyUnexpectedError)
}

controller := gomock.NewController(t)
backlogMgr := newBacklogMgr(t, controller, false)
func (s *AckManagerTestSuite) TestAddingTasksIncreasesBacklogCounter() {
ackMgr := newTestAckMgr(s.logger)

backlogMgr.taskAckManager.addTask(1)
require.Equal(t, backlogMgr.taskAckManager.getBacklogCountHint(), int64(1))
backlogMgr.taskAckManager.addTask(12)
require.Equal(t, backlogMgr.taskAckManager.getBacklogCountHint(), int64(2))
ackMgr.addTask(1)
s.Equal(ackMgr.getBacklogCountHint(), int64(1))

ackMgr.addTask(12)
s.Equal(ackMgr.getBacklogCountHint(), int64(2))
}

func TestAckManager_CompleteTaskMovesAckLevelUpToGap(t *testing.T) {
t.Parallel()
controller := gomock.NewController(t)
backlogMgr := newBacklogMgr(t, controller, false)
_, err := backlogMgr.db.RenewLease(backlogMgr.tqCtx)
require.NoError(t, err)
func (s *AckManagerTestSuite) TestCompleteTaskMovesAckLevelUpToGap() {
ackMgr := newTestAckMgr(s.logger)

_, err := ackMgr.db.RenewLease(context.Background())
s.NoError(err)

ackMgr.addTask(1)
ackMgr.db.updateApproximateBacklogCount(1) // increment the backlog so that we don't under-count
s.Equal(int64(-1), ackMgr.getAckLevel(), "should only move ack level on completion")

ackLevel, numAcked := ackMgr.completeTask(1)
s.Equal(int64(1), ackLevel, "should move ack level on completion")
s.Equal(int64(1), numAcked, "should move ack level on completion")

ackMgr.addTask(2)
ackMgr.addTask(3)
ackMgr.addTask(12)
ackMgr.db.updateApproximateBacklogCount(3)

ackLevel, numAcked = ackMgr.completeTask(3)
s.Equal(int64(1), ackLevel, "task 2 is not complete, we should not move ack level")
s.Equal(int64(0), numAcked, "task 2 is not complete, we should not move ack level")

ackLevel, numAcked = ackMgr.completeTask(2)
s.Equal(int64(3), ackLevel, "both tasks 2 and 3 are complete")
s.Equal(int64(2), numAcked, "both tasks 2 and 3 are complete")
}

func (s *AckManagerTestSuite) TestAckManager() {
ackMgr := newTestAckMgr(s.logger)

_, err := ackMgr.db.RenewLease(context.Background())
s.NoError(err)

ackMgr.setAckLevel(100)
s.EqualValues(100, ackMgr.getAckLevel())
s.EqualValues(100, ackMgr.getReadLevel())
const t1 = 200
const t2 = 220
const t3 = 320
const t4 = 340
const t5 = 360
const t6 = 380

ackMgr.addTask(t1)
// Increment the backlog so that we don't under-count
// this happens since we decrease the counter on completion of a task
ackMgr.db.updateApproximateBacklogCount(1)
s.EqualValues(100, ackMgr.getAckLevel())
s.EqualValues(t1, ackMgr.getReadLevel())

ackMgr.addTask(t2)
ackMgr.db.updateApproximateBacklogCount(1)
s.EqualValues(100, ackMgr.getAckLevel())
s.EqualValues(t2, ackMgr.getReadLevel())

backlogMgr.taskAckManager.addTask(1)
ackMgr.completeTask(t2)
s.EqualValues(100, ackMgr.getAckLevel())
s.EqualValues(t2, ackMgr.getReadLevel())

ackMgr.completeTask(t1)
s.EqualValues(t2, ackMgr.getAckLevel())
s.EqualValues(t2, ackMgr.getReadLevel())

ackMgr.setAckLevel(300)
s.EqualValues(300, ackMgr.getAckLevel())
s.EqualValues(300, ackMgr.getReadLevel())

ackMgr.addTask(t3)
ackMgr.db.updateApproximateBacklogCount(1)
s.EqualValues(300, ackMgr.getAckLevel())
s.EqualValues(t3, ackMgr.getReadLevel())

ackMgr.addTask(t4)
ackMgr.db.updateApproximateBacklogCount(1)
s.EqualValues(300, ackMgr.getAckLevel())
s.EqualValues(t4, ackMgr.getReadLevel())

ackMgr.completeTask(t3)
s.EqualValues(t3, ackMgr.getAckLevel())
s.EqualValues(t4, ackMgr.getReadLevel())

ackMgr.completeTask(t4)
s.EqualValues(t4, ackMgr.getAckLevel())
s.EqualValues(t4, ackMgr.getReadLevel())

ackMgr.setReadLevel(t5)
s.EqualValues(t5, ackMgr.getReadLevel())

ackMgr.setAckLevel(t5)
ackMgr.setReadLevelAfterGap(t6)
s.EqualValues(t6, ackMgr.getReadLevel())
s.EqualValues(t6, ackMgr.getAckLevel())
}

func (s *AckManagerTestSuite) TestSort() {
ackMgr := newTestAckMgr(s.logger)

_, err := ackMgr.db.RenewLease(context.Background())
s.NoError(err)

const t0 = 100
ackMgr.setAckLevel(t0)
s.EqualValues(t0, ackMgr.getAckLevel())
s.EqualValues(t0, ackMgr.getReadLevel())
const t1 = 200
const t2 = 220
const t3 = 320
const t4 = 340
const t5 = 360

ackMgr.addTask(t1)
ackMgr.addTask(t2)
ackMgr.addTask(t3)
ackMgr.addTask(t4)
ackMgr.addTask(t5)

// Increment the backlog so that we don't under-count
backlogMgr.db.updateApproximateBacklogCount(1)
require.Equal(t, int64(-1), backlogMgr.taskAckManager.getAckLevel(), "should only move ack level on completion")
ackLevel, numAcked := backlogMgr.taskAckManager.completeTask(1)
require.Equal(t, int64(1), ackLevel, "should move ack level on completion")
require.Equal(t, int64(1), numAcked, "should move ack level on completion")

backlogMgr.taskAckManager.addTask(2)
backlogMgr.taskAckManager.addTask(3)
backlogMgr.taskAckManager.addTask(12)
backlogMgr.db.updateApproximateBacklogCount(3)

ackLevel, numAcked = backlogMgr.taskAckManager.completeTask(3)
require.Equal(t, int64(1), ackLevel, "task 2 is not complete, we should not move ack level")
require.Equal(t, int64(0), numAcked, "task 2 is not complete, we should not move ack level")
ackLevel, numAcked = backlogMgr.taskAckManager.completeTask(2)
require.Equal(t, int64(3), ackLevel, "both tasks 2 and 3 are complete")
require.Equal(t, int64(2), numAcked, "both tasks 2 and 3 are complete")
// this happens since we decrease the counter on completion of a task
ackMgr.db.updateApproximateBacklogCount(5)

ackMgr.completeTask(t2)
s.EqualValues(t0, ackMgr.getAckLevel())

ackMgr.completeTask(t1)
s.EqualValues(t2, ackMgr.getAckLevel())

ackMgr.completeTask(t5)
s.EqualValues(t2, ackMgr.getAckLevel())

ackMgr.completeTask(t4)
s.EqualValues(t2, ackMgr.getAckLevel())

ackMgr.completeTask(t3)
s.EqualValues(t5, ackMgr.getAckLevel())
}

func BenchmarkAckManager_AddTask(b *testing.B) {
controller := gomock.NewController(b)
ackMgr := newTestAckMgr(log.NewTestLogger())

tasks := make([]int, 1000)
for i := 0; i < len(tasks); i++ {
@@ -85,40 +85,210 @@ func BenchmarkAckManager_AddTask(b *testing.B) {
// Add 1000 tasks in order and complete them in a random order.
// This will cause our ack level to jump as we complete them
b.StopTimer()
backlogMgr := newBacklogMgr(b, controller, false)
rand.Shuffle(len(tasks), func(i, j int) {
tasks[i], tasks[j] = tasks[j], tasks[i]
})
b.StartTimer()
for i := 0; i < len(tasks); i++ {
tasks[i] = i
backlogMgr.taskAckManager.addTask(int64(i))
ackMgr.addTask(int64(i))
}
}
}

func BenchmarkAckManager_CompleteTask(b *testing.B) {
controller := gomock.NewController(b)
ackMgr := newTestAckMgr(log.NewTestLogger())

tasks := make([]int, 1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Add 1000 tasks in order and complete them in a random order.
// This will cause our ack level to jump as we complete them
b.StopTimer()
backlogMgr := newBacklogMgr(b, controller, false)
for i := 0; i < len(tasks); i++ {
tasks[i] = i
backlogMgr.taskAckManager.addTask(int64(i))
backlogMgr.db.updateApproximateBacklogCount(int64(1)) // Increment the backlog so that we don't under-count
ackMgr.addTask(int64(i))
ackMgr.db.updateApproximateBacklogCount(int64(1)) // Increment the backlog so that we don't under-count
}
rand.Shuffle(len(tasks), func(i, j int) {
tasks[i], tasks[j] = tasks[j], tasks[i]
})
b.StartTimer()

for i := 0; i < len(tasks); i++ {
backlogMgr.taskAckManager.completeTask(int64(i))
ackMgr.completeTask(int64(i))
}
}
}

func newTestAckMgr(logger log.Logger) *ackManager {
tm := newTestTaskManager(logger)
cfg := NewConfig(dynamicconfig.NewNoopCollection())
f, _ := tqid.NewTaskQueueFamily("", "test-queue")
prtn := f.TaskQueue(enumspb.TASK_QUEUE_TYPE_WORKFLOW).NormalPartition(0)
tlCfg := newTaskQueueConfig(prtn.TaskQueue(), cfg, "test-namespace")
db := newTaskQueueDB(tlCfg, tm, UnversionedQueueKey(prtn), logger)
return newAckManager(db, logger)
}
12 changes: 10 additions & 2 deletions service/matching/backlog_manager.go
@@ -67,6 +67,9 @@ type (
TotalApproximateBacklogCount() int64
BacklogHeadAge() time.Duration
InternalStatus() []*taskqueuespb.InternalTaskQueueStatus

// TODO(pri): remove
getDB() *taskQueueDB
}

backlogManagerImpl struct {
@@ -232,8 +235,9 @@ func (c *backlogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQueueS
StartId: c.taskWriter.taskIDBlock.start,
EndId: c.taskWriter.taskIDBlock.end,
},
LoadedTasks: c.taskAckManager.getBacklogCountHint(),
MaxReadLevel: c.db.GetMaxReadLevel(subqueueZero),
LoadedTasks: c.taskAckManager.getBacklogCountHint(),
MaxReadLevel: c.db.GetMaxReadLevel(subqueueZero),
ApproximateBacklogCount: c.db.getApproximateBacklogCount(subqueueZero),
Comment (Contributor Author):
Added this for parity with new backlog manager.

},
}
}
@@ -306,3 +310,7 @@ func executeWithRetry(
func (c *backlogManagerImpl) queueKey() *PhysicalTaskQueueKey {
return c.pqMgr.QueueKey()
}

func (c *backlogManagerImpl) getDB() *taskQueueDB {
return c.db
}
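
Regarding the parity comment above: a minimal sketch of a caller reading the newly populated field. The totalApproximateBacklog helper is hypothetical and not part of this PR; it assumes the interface is named backlogManager and that the returned status protos expose ApproximateBacklogCount as an int64 field, as the diff suggests.

```go
// totalApproximateBacklog is a hypothetical helper that sums the per-subqueue
// approximate backlog counts now reported by InternalStatus.
func totalApproximateBacklog(bm backlogManager) int64 {
	var total int64
	for _, status := range bm.InternalStatus() {
		total += status.ApproximateBacklogCount
	}
	return total
}
```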