Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions internal/internal_versioning_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,17 @@ func statsFromResponse(stats *taskqueuepb.TaskQueueStats) *TaskQueueStats {
}
}

func statsByPriorityKeyFromResponse(statsByPriorityKey map[int32]*taskqueuepb.TaskQueueStats) map[int32]TaskQueueStats {
if statsByPriorityKey == nil {
return nil
}
result := make(map[int32]TaskQueueStats, len(statsByPriorityKey))
for priority, stats := range statsByPriorityKey {
result[priority] = *statsFromResponse(stats)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nil pointer dereference when map contains nil stats value

High Severity

The statsByPriorityKeyFromResponse function dereferences the return value of statsFromResponse(stats) directly. Since statsFromResponse returns nil when its input is nil, if any entry in the statsByPriorityKey map has a nil value for stats, the expression *statsFromResponse(stats) will cause a nil pointer dereference panic.

Fix in Cursor Fix in Web

}
return result
}

func taskQueueVersionInfoFromResponse(response *taskqueuepb.TaskQueueVersionInfo) TaskQueueVersionInfo {
if response == nil {
return TaskQueueVersionInfo{}
Expand Down
17 changes: 16 additions & 1 deletion internal/internal_worker_deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,19 @@ func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersi
}
}

func workerDeploymentVersionTaskQueuesFromProto(tqInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue) []WorkerDeploymentTaskQueueInfo {
result := []WorkerDeploymentTaskQueueInfo{}
for _, info := range tqInfos {
result = append(result, WorkerDeploymentTaskQueueInfo{
Name: info.GetName(),
Type: TaskQueueType(info.GetType()),
Stats: statsFromResponse(info.GetStats()),
StatsByPriorityKey: statsByPriorityKeyFromResponse(info.GetStatsByPriorityKey()),
})
}
return result
}

func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, options WorkerDeploymentDescribeVersionOptions) (WorkerDeploymentVersionDescription, error) {

if err := h.validate(); err != nil {
Expand All @@ -381,6 +394,7 @@ func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, option
BuildId: options.BuildID,
DeploymentName: h.Name,
},
ReportTaskQueueStats: options.ReportTaskQueueStats,
}
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()
Expand All @@ -391,7 +405,8 @@ func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, option
}

return WorkerDeploymentVersionDescription{
Info: workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()),
Info: workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()),
TaskQueueInfos: workerDeploymentVersionTaskQueuesFromProto(resp.GetVersionTaskQueues()),
}, nil
}

Expand Down
17 changes: 16 additions & 1 deletion internal/worker_deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ type (
ConflictToken []byte

// PreviousVersion - The Version that was current before executing this operation, if any.
//
//
// Deprecated: in favor of API idempotency. Use `Describe` before this API to get the previous
// state. Pass the `ConflictToken` returned by `Describe` to this API to avoid race conditions.
PreviousVersion *WorkerDeploymentVersion
Expand Down Expand Up @@ -322,6 +322,8 @@ type (
WorkerDeploymentDescribeVersionOptions struct {
// BuildID - A Build ID within this deployment to describe.
BuildID string
// ReportTaskQueueStats - Whether to report stats for task queues which have been polled by this version.
ReportTaskQueueStats bool
}

// WorkerDeploymentTaskQueueInfo describes properties of the Task Queues involved
Expand All @@ -336,6 +338,13 @@ type (

// Type - The type of this task queue.
Type TaskQueueType

// Stats - Only set if ReportTaskQueueStats is set on the request.
Stats *TaskQueueStats

// StatsByPriorityKey - Task queue stats breakdown by priority key. Only contains actively used priority keys.
// Only set if ReportTaskQueueStats is set on the request.
StatsByPriorityKey map[int32]TaskQueueStats
}

// WorkerDeploymentVersionDrainageInfo describes drainage properties of a Deployment Version.
Expand Down Expand Up @@ -382,6 +391,8 @@ type (
RampPercentage float32

// TaskQueuesInfos - List of task queues polled by workers in this Deployment Version.
//
// Deprecated: Use WorkerDeploymentVersionDescription.TaskQueueInfos instead.
TaskQueuesInfos []WorkerDeploymentTaskQueueInfo

// DrainageInfo - Drainage information for a Worker Deployment Version, enabling users to
Expand All @@ -403,6 +414,10 @@ type (
WorkerDeploymentVersionDescription struct {
// Info - Information about this Version.
Info WorkerDeploymentVersionInfo

// All the Task Queues that have ever polled from this Deployment version.
// Stats are only reported if explicitly requested.
TaskQueueInfos []WorkerDeploymentTaskQueueInfo
}

// WorkerDeploymentDeleteVersionOptions provides options for
Expand Down
173 changes: 172 additions & 1 deletion test/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
Expand Down Expand Up @@ -1208,6 +1208,177 @@ func (ts *WorkerDeploymentTestSuite) TestCurrentVersion_AllowNoPollers() {
ts.Nil(response2.Info.RoutingConfig.RampingVersion)
}

func (ts *WorkerDeploymentTestSuite) TestDescribeVersionWithBacklogStats_NoPriority() {
ts.testDescribeVersionWithBacklogStats(false)
}

func (ts *WorkerDeploymentTestSuite) TestDescribeVersionWithBacklogStats_YesPriority() {
ts.testDescribeVersionWithBacklogStats(true)
}

func (ts *WorkerDeploymentTestSuite) testDescribeVersionWithBacklogStats(withPriority bool) {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

deploymentName := "deploy-test-" + uuid.NewString()
v1 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "1.0",
}

// Start a worker briefly to register the deployment version
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: v1,
},
})
worker1.RegisterWorkflowWithOptions(ts.workflows.WaitSignalToStartVersionedOne, workflow.RegisterOptions{
Name: "WaitSignalToStartVersioned",
VersioningBehavior: workflow.VersioningBehaviorPinned,
})

ts.NoError(worker1.Start())

// Wait for the worker deployment and version to exist
dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
ts.waitForWorkerDeployment(ctx, dHandle)
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
ts.NoError(err)
ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1)

// SetCurrent to v1 so that workflows start on that version
_, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
BuildID: v1.BuildID,
ConflictToken: response1.ConflictToken,
})
ts.NoError(err)

// Stop the worker so workflows create backlog
worker1.Stop()

// Start workflows with different priority keys to create task queue backlog
// Priority keys: 1 (high), 3 (medium/default), 5 (low)
priorityKeys := []int{1, 3, 5}
workflowsPerPriority := 2
wfHandles := make([]client.WorkflowRun, 0, len(priorityKeys)*workflowsPerPriority)
for _, priorityKey := range priorityKeys {
for j := 0; j < workflowsPerPriority; j++ {
wfID := "backlog-test-" + uuid.NewString()
opts := ts.startWorkflowOptions(wfID)
// Only use priority keys if we are testing with priority keys so that we can verify
// that per-priority stats are not returned when priority keys are not set.
if withPriority {
opts.Priority = temporal.Priority{PriorityKey: priorityKey}
}
handle, err := ts.client.ExecuteWorkflow(ctx, opts, "WaitSignalToStartVersioned")
ts.NoError(err)
wfHandles = append(wfHandles, handle)
}
}

// Wait for backlog stats to be reflected - stats may take time to propagate to version info
ts.Eventuallyf(func() bool {
desc, err := dHandle.DescribeVersion(ctx, client.WorkerDeploymentDescribeVersionOptions{
BuildID: v1.BuildID,
ReportTaskQueueStats: true,
})
if err != nil {
return false
}
if desc.Info.Version != v1 {
return false
}
// Find the workflow task queue and check backlog stats
for _, tqInfo := range desc.TaskQueueInfos {
if tqInfo.Name == ts.taskQueueName && tqInfo.Type == client.TaskQueueTypeWorkflow && tqInfo.Stats != nil {
if !(tqInfo.Stats.ApproximateBacklogCount > 0 &&
tqInfo.Stats.ApproximateBacklogAge.Nanoseconds() > 0 &&
tqInfo.Stats.TasksAddRate > 0 &&
tqInfo.Stats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers
tqInfo.Stats.BacklogIncreaseRate > 0) {
ts.T().Logf("Unexpected backlog stats for version: %+v", tqInfo.Stats)
return false
}
// If the backlog stats have propagated to the version info, check that per-priority stats are as expected.
for priorityKey, priorityKeyStats := range tqInfo.StatsByPriorityKey {
// If PriorityKey is set on the workflow, StatsByPriorityKey will be non-nil
// even if the Priority feature is not enabled in the test namespace.
// Until Priority is enabled by default in all SDK test namespaces, this
// test will be relaxed so that it can pass against a non-Priority-enabled namespace.
// Instead of checking that each StatsByPriorityKey entry has a non-empty ApproximateBacklogCount,
// We just check that for each expected priority key the entry exists and has a non-zero
// BacklogIncreaseRate and TasksAddRate. For the default priority key (3), we check that it has an
// ApproximateBacklogCount of either 2 (if Priority is enabled) or 6 (if Priority is not enabled).
if priorityKey != 3 {
if withPriority {
if !(priorityKeyStats.TasksAddRate > 0 &&
priorityKeyStats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers
priorityKeyStats.BacklogIncreaseRate > 0) {
ts.T().Logf("Unexpected backlog stats for priority key %v: %+v", priorityKey, priorityKeyStats)
return false
}
} else {
ts.T().Logf("No priority keys set, so only the default priority key should be present, but found key %v", priorityKey)
return false
}
} else {
if !((tqInfo.Stats.ApproximateBacklogCount == 2 || tqInfo.Stats.ApproximateBacklogCount == 6) &&
tqInfo.Stats.ApproximateBacklogAge.Nanoseconds() > 0 &&
tqInfo.Stats.TasksAddRate > 0 &&
tqInfo.Stats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers
tqInfo.Stats.BacklogIncreaseRate > 0) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test checks aggregate stats instead of per-priority stats

Medium Severity

When checking the default priority key (3), the test verifies tqInfo.Stats (aggregate task queue stats) instead of priorityKeyStats (per-priority stats). The comment states the intent is to check the priority key's ApproximateBacklogCount, but the code checks the aggregate stats. This mismatch means the test doesn't actually validate that per-priority stats are correctly reported for the default priority key.

Fix in Cursor Fix in Web

ts.T().Logf("Unexpected backlog stats for default priority key %v: %+v", priorityKey, priorityKeyStats)
return false
}
}
}
if withPriority {
return len(tqInfo.StatsByPriorityKey) == len(priorityKeys)
} else {
return len(tqInfo.StatsByPriorityKey) == 1
}
}
}
return false
}, 10*time.Second, 500*time.Millisecond, "timeout waiting for expected backlog stats to be reflected in version info")

// Also test that stats are NOT returned when ReportTaskQueueStats is false
descNoStats, err := dHandle.DescribeVersion(ctx, client.WorkerDeploymentDescribeVersionOptions{
BuildID: v1.BuildID,
ReportTaskQueueStats: false,
})
ts.NoError(err)
for _, tqInfo := range descNoStats.TaskQueueInfos {
ts.Nil(tqInfo.Stats, "Stats should be nil when ReportTaskQueueStats is false")
}

// Cleanup: restart worker and complete workflows
worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: v1,
},
})
worker2.RegisterWorkflowWithOptions(ts.workflows.WaitSignalToStartVersionedOne, workflow.RegisterOptions{
Name: "WaitSignalToStartVersioned",
VersioningBehavior: workflow.VersioningBehaviorPinned,
})

ts.NoError(worker2.Start())
defer worker2.Stop()

// Signal all workflows to complete
for _, handle := range wfHandles {
ts.NoError(ts.client.SignalWorkflow(ctx, handle.GetID(), handle.GetRunID(), "start-signal", "prefix"))
}
// Wait for workflows to complete
for _, handle := range wfHandles {
var result string
ts.NoError(handle.Get(ctx, &result))
}
}

func (ts *WorkerDeploymentTestSuite) TestDeleteDeployment() {
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
ts.T().Skip("temporal server 1.27+ required")
Expand Down
Loading