diff --git a/internal/activity.go b/internal/activity.go index ec56fdc5f..79d7b7d2c 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -25,19 +25,21 @@ type ( // // Exposed as: [go.temporal.io/sdk/activity.Info] ActivityInfo struct { - TaskToken []byte - WorkflowType *WorkflowType - WorkflowNamespace string - WorkflowExecution WorkflowExecution - ActivityID string - ActivityType ActivityType - TaskQueue string - HeartbeatTimeout time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed. - ScheduledTime time.Time // Time of activity scheduled by a workflow - StartedTime time.Time // Time of activity start - Deadline time.Time // Time of activity timeout - Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified. - IsLocalActivity bool // true if it is a local activity + TaskToken []byte + WorkflowType *WorkflowType + WorkflowNamespace string + WorkflowExecution WorkflowExecution + ActivityID string + ActivityType ActivityType + TaskQueue string + HeartbeatTimeout time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed. + ScheduleToCloseTimeout time.Duration // Schedule to close timeout set by the activity options. + StartToCloseTimeout time.Duration // Start to close timeout set by the activity options. + ScheduledTime time.Time // Time of activity scheduled by a workflow + StartedTime time.Time // Time of activity start + Deadline time.Time // Time of activity timeout + Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified. + IsLocalActivity bool // true if it is a local activity // Priority settings that control relative ordering of task processing when activity tasks are backed up in a queue. // If no priority is set, the default value is the zero value. // @@ -318,17 +320,19 @@ func WithActivityTask( workflowExecution: WorkflowExecution{ RunID: task.WorkflowExecution.RunId, ID: task.WorkflowExecution.WorkflowId}, - logger: logger, - metricsHandler: metricsHandler, - deadline: deadline, - heartbeatTimeout: heartbeatTimeout, - scheduledTime: scheduled, - startedTime: started, - taskQueue: taskQueue, - dataConverter: dataConverter, - attempt: task.GetAttempt(), - priority: task.GetPriority(), - heartbeatDetails: task.HeartbeatDetails, + logger: logger, + metricsHandler: metricsHandler, + deadline: deadline, + heartbeatTimeout: heartbeatTimeout, + scheduleToCloseTimeout: scheduleToCloseTimeout, + startToCloseTimeout: startToCloseTimeout, + scheduledTime: scheduled, + startedTime: started, + taskQueue: taskQueue, + dataConverter: dataConverter, + attempt: task.GetAttempt(), + priority: task.GetPriority(), + heartbeatDetails: task.HeartbeatDetails, workflowType: &WorkflowType{ Name: task.WorkflowType.GetName(), }, @@ -380,22 +384,24 @@ func WithLocalActivityTask( deadline = task.expireTime } return newActivityContext(ctx, interceptors, &activityEnvironment{ - workflowType: &workflowTypeLocal, - workflowNamespace: task.params.WorkflowInfo.Namespace, - taskQueue: task.params.WorkflowInfo.TaskQueueName, - activityType: ActivityType{Name: activityType}, - activityID: fmt.Sprintf("%v", task.activityID), - workflowExecution: task.params.WorkflowInfo.WorkflowExecution, - logger: logger, - metricsHandler: metricsHandler, - isLocalActivity: true, - deadline: deadline, - scheduledTime: task.scheduledTime, - startedTime: startedTime, - dataConverter: dataConverter, - attempt: task.attempt, - client: client, - workerStopChannel: workerStopChannel, + workflowType: &workflowTypeLocal, + workflowNamespace: task.params.WorkflowInfo.Namespace, + taskQueue: task.params.WorkflowInfo.TaskQueueName, + activityType: ActivityType{Name: activityType}, + activityID: fmt.Sprintf("%v", task.activityID), + workflowExecution: task.params.WorkflowInfo.WorkflowExecution, + logger: logger, + metricsHandler: metricsHandler, + scheduleToCloseTimeout: scheduleToCloseTimeout, + startToCloseTimeout: startToCloseTimeout, + isLocalActivity: true, + deadline: deadline, + scheduledTime: task.scheduledTime, + startedTime: startedTime, + dataConverter: dataConverter, + attempt: task.attempt, + client: client, + workerStopChannel: workerStopChannel, }) } diff --git a/internal/internal_activity.go b/internal/internal_activity.go index a2188973a..b8f1276cd 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -104,28 +104,30 @@ type ( } activityEnvironment struct { - taskToken []byte - workflowExecution WorkflowExecution - activityID string - activityType ActivityType - serviceInvoker ServiceInvoker - logger log.Logger - metricsHandler metrics.Handler - isLocalActivity bool - heartbeatTimeout time.Duration - deadline time.Time - scheduledTime time.Time - startedTime time.Time - taskQueue string - dataConverter converter.DataConverter - attempt int32 // starts from 1. - heartbeatDetails *commonpb.Payloads - workflowType *WorkflowType - workflowNamespace string - workerStopChannel <-chan struct{} - contextPropagators []ContextPropagator - client *WorkflowClient - priority *commonpb.Priority + taskToken []byte + workflowExecution WorkflowExecution + activityID string + activityType ActivityType + serviceInvoker ServiceInvoker + logger log.Logger + metricsHandler metrics.Handler + isLocalActivity bool + heartbeatTimeout time.Duration + scheduleToCloseTimeout time.Duration + startToCloseTimeout time.Duration + deadline time.Time + scheduledTime time.Time + startedTime time.Time + taskQueue string + dataConverter converter.DataConverter + attempt int32 // starts from 1. + heartbeatDetails *commonpb.Payloads + workflowType *WorkflowType + workflowNamespace string + workerStopChannel <-chan struct{} + contextPropagators []ContextPropagator + client *WorkflowClient + priority *commonpb.Priority } // context.WithValue need this type instead of basic type string to avoid lint error @@ -349,20 +351,22 @@ func (a *activityEnvironmentInterceptor) ExecuteActivity( func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityInfo { return ActivityInfo{ - ActivityID: a.env.activityID, - ActivityType: a.env.activityType, - TaskToken: a.env.taskToken, - WorkflowExecution: a.env.workflowExecution, - HeartbeatTimeout: a.env.heartbeatTimeout, - Deadline: a.env.deadline, - ScheduledTime: a.env.scheduledTime, - StartedTime: a.env.startedTime, - TaskQueue: a.env.taskQueue, - Attempt: a.env.attempt, - WorkflowType: a.env.workflowType, - WorkflowNamespace: a.env.workflowNamespace, - IsLocalActivity: a.env.isLocalActivity, - Priority: convertFromPBPriority(a.env.priority), + ActivityID: a.env.activityID, + ActivityType: a.env.activityType, + TaskToken: a.env.taskToken, + WorkflowExecution: a.env.workflowExecution, + HeartbeatTimeout: a.env.heartbeatTimeout, + ScheduleToCloseTimeout: a.env.scheduleToCloseTimeout, + StartToCloseTimeout: a.env.startToCloseTimeout, + Deadline: a.env.deadline, + ScheduledTime: a.env.scheduledTime, + StartedTime: a.env.startedTime, + TaskQueue: a.env.taskQueue, + Attempt: a.env.attempt, + WorkflowType: a.env.workflowType, + WorkflowNamespace: a.env.workflowNamespace, + IsLocalActivity: a.env.isLocalActivity, + Priority: convertFromPBPriority(a.env.priority), } } diff --git a/test/activity_test.go b/test/activity_test.go index 9a34d2c28..727b23e2e 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -192,7 +192,7 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error { return errFailOnPurpose } -func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool) error { +func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool, scheduleToCloseTimeout, startToCloseTimeout time.Duration) error { a.append("inspectActivityInfo") if !activity.IsActivity(ctx) { return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.IsActivity(ctx)) @@ -220,6 +220,12 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue if info.IsLocalActivity != isLocalActivity { return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity) } + if info.ScheduleToCloseTimeout != scheduleToCloseTimeout { + return fmt.Errorf("expected ScheduleToCloseTimeout %v but got %v", scheduleToCloseTimeout, info.ScheduleToCloseTimeout) + } + if info.StartToCloseTimeout != startToCloseTimeout { + return fmt.Errorf("expected StartToCloseTimeout %v but got %v", startToCloseTimeout, info.StartToCloseTimeout) + } return nil } diff --git a/test/workflow_test.go b/test/workflow_test.go index 1c356a3db..5610cd5e0 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1494,7 +1494,7 @@ func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error { wfType := info.WorkflowType.Name taskQueue := info.TaskQueueName ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) - return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false).Get(ctx, nil) + return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false, 5*time.Second, 5*time.Second).Get(ctx, nil) } func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error { @@ -1505,7 +1505,7 @@ func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error { ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions()) var activities *Activities return workflow.ExecuteLocalActivity( - ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true).Get(ctx, nil) + ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true, 5*time.Second, 5*time.Second).Get(ctx, nil) } func (w *Workflows) WorkflowWithLocalActivityCtxPropagation(ctx workflow.Context) (string, error) {