Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 46 additions & 40 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@ type (
//
// Exposed as: [go.temporal.io/sdk/activity.Info]
ActivityInfo struct {
TaskToken []byte
WorkflowType *WorkflowType
WorkflowNamespace string
WorkflowExecution WorkflowExecution
ActivityID string
ActivityType ActivityType
TaskQueue string
HeartbeatTimeout time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed.
ScheduledTime time.Time // Time of activity scheduled by a workflow
StartedTime time.Time // Time of activity start
Deadline time.Time // Time of activity timeout
Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
IsLocalActivity bool // true if it is a local activity
TaskToken []byte
WorkflowType *WorkflowType
WorkflowNamespace string
WorkflowExecution WorkflowExecution
ActivityID string
ActivityType ActivityType
TaskQueue string
HeartbeatTimeout time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed.
ScheduleToCloseTimeout time.Duration // Schedule to close timeout set by the activity options.
StartToCloseTimeout time.Duration // Start to close timeout set by the activity options.
ScheduledTime time.Time // Time of activity scheduled by a workflow
StartedTime time.Time // Time of activity start
Deadline time.Time // Time of activity timeout
Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
IsLocalActivity bool // true if it is a local activity
// Priority settings that control relative ordering of task processing when activity tasks are backed up in a queue.
// If no priority is set, the default value is the zero value.
//
Expand Down Expand Up @@ -318,17 +320,19 @@ func WithActivityTask(
workflowExecution: WorkflowExecution{
RunID: task.WorkflowExecution.RunId,
ID: task.WorkflowExecution.WorkflowId},
logger: logger,
metricsHandler: metricsHandler,
deadline: deadline,
heartbeatTimeout: heartbeatTimeout,
scheduledTime: scheduled,
startedTime: started,
taskQueue: taskQueue,
dataConverter: dataConverter,
attempt: task.GetAttempt(),
priority: task.GetPriority(),
heartbeatDetails: task.HeartbeatDetails,
logger: logger,
metricsHandler: metricsHandler,
deadline: deadline,
heartbeatTimeout: heartbeatTimeout,
scheduleToCloseTimeout: scheduleToCloseTimeout,
startToCloseTimeout: startToCloseTimeout,
scheduledTime: scheduled,
startedTime: started,
taskQueue: taskQueue,
dataConverter: dataConverter,
attempt: task.GetAttempt(),
priority: task.GetPriority(),
heartbeatDetails: task.HeartbeatDetails,
workflowType: &WorkflowType{
Name: task.WorkflowType.GetName(),
},
Expand Down Expand Up @@ -380,22 +384,24 @@ func WithLocalActivityTask(
deadline = task.expireTime
}
return newActivityContext(ctx, interceptors, &activityEnvironment{
workflowType: &workflowTypeLocal,
workflowNamespace: task.params.WorkflowInfo.Namespace,
taskQueue: task.params.WorkflowInfo.TaskQueueName,
activityType: ActivityType{Name: activityType},
activityID: fmt.Sprintf("%v", task.activityID),
workflowExecution: task.params.WorkflowInfo.WorkflowExecution,
logger: logger,
metricsHandler: metricsHandler,
isLocalActivity: true,
deadline: deadline,
scheduledTime: task.scheduledTime,
startedTime: startedTime,
dataConverter: dataConverter,
attempt: task.attempt,
client: client,
workerStopChannel: workerStopChannel,
workflowType: &workflowTypeLocal,
workflowNamespace: task.params.WorkflowInfo.Namespace,
taskQueue: task.params.WorkflowInfo.TaskQueueName,
activityType: ActivityType{Name: activityType},
activityID: fmt.Sprintf("%v", task.activityID),
workflowExecution: task.params.WorkflowInfo.WorkflowExecution,
logger: logger,
metricsHandler: metricsHandler,
scheduleToCloseTimeout: scheduleToCloseTimeout,
startToCloseTimeout: startToCloseTimeout,
isLocalActivity: true,
deadline: deadline,
scheduledTime: task.scheduledTime,
startedTime: startedTime,
dataConverter: dataConverter,
attempt: task.attempt,
client: client,
workerStopChannel: workerStopChannel,
})
}

Expand Down
76 changes: 40 additions & 36 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,28 +104,30 @@ type (
}

activityEnvironment struct {
taskToken []byte
workflowExecution WorkflowExecution
activityID string
activityType ActivityType
serviceInvoker ServiceInvoker
logger log.Logger
metricsHandler metrics.Handler
isLocalActivity bool
heartbeatTimeout time.Duration
deadline time.Time
scheduledTime time.Time
startedTime time.Time
taskQueue string
dataConverter converter.DataConverter
attempt int32 // starts from 1.
heartbeatDetails *commonpb.Payloads
workflowType *WorkflowType
workflowNamespace string
workerStopChannel <-chan struct{}
contextPropagators []ContextPropagator
client *WorkflowClient
priority *commonpb.Priority
taskToken []byte
workflowExecution WorkflowExecution
activityID string
activityType ActivityType
serviceInvoker ServiceInvoker
logger log.Logger
metricsHandler metrics.Handler
isLocalActivity bool
heartbeatTimeout time.Duration
scheduleToCloseTimeout time.Duration
startToCloseTimeout time.Duration
deadline time.Time
scheduledTime time.Time
startedTime time.Time
taskQueue string
dataConverter converter.DataConverter
attempt int32 // starts from 1.
heartbeatDetails *commonpb.Payloads
workflowType *WorkflowType
workflowNamespace string
workerStopChannel <-chan struct{}
contextPropagators []ContextPropagator
client *WorkflowClient
priority *commonpb.Priority
}

// context.WithValue need this type instead of basic type string to avoid lint error
Expand Down Expand Up @@ -349,20 +351,22 @@ func (a *activityEnvironmentInterceptor) ExecuteActivity(

func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityInfo {
return ActivityInfo{
ActivityID: a.env.activityID,
ActivityType: a.env.activityType,
TaskToken: a.env.taskToken,
WorkflowExecution: a.env.workflowExecution,
HeartbeatTimeout: a.env.heartbeatTimeout,
Deadline: a.env.deadline,
ScheduledTime: a.env.scheduledTime,
StartedTime: a.env.startedTime,
TaskQueue: a.env.taskQueue,
Attempt: a.env.attempt,
WorkflowType: a.env.workflowType,
WorkflowNamespace: a.env.workflowNamespace,
IsLocalActivity: a.env.isLocalActivity,
Priority: convertFromPBPriority(a.env.priority),
ActivityID: a.env.activityID,
ActivityType: a.env.activityType,
TaskToken: a.env.taskToken,
WorkflowExecution: a.env.workflowExecution,
HeartbeatTimeout: a.env.heartbeatTimeout,
ScheduleToCloseTimeout: a.env.scheduleToCloseTimeout,
StartToCloseTimeout: a.env.startToCloseTimeout,
Deadline: a.env.deadline,
ScheduledTime: a.env.scheduledTime,
StartedTime: a.env.startedTime,
TaskQueue: a.env.taskQueue,
Attempt: a.env.attempt,
WorkflowType: a.env.workflowType,
WorkflowNamespace: a.env.workflowNamespace,
IsLocalActivity: a.env.isLocalActivity,
Priority: convertFromPBPriority(a.env.priority),
}
}

Expand Down
8 changes: 7 additions & 1 deletion test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error {
return errFailOnPurpose
}

func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool) error {
func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool, scheduleToCloseTimeout, startToCloseTimeout time.Duration) error {
a.append("inspectActivityInfo")
if !activity.IsActivity(ctx) {
return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.IsActivity(ctx))
Expand Down Expand Up @@ -220,6 +220,12 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue
if info.IsLocalActivity != isLocalActivity {
return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity)
}
if info.ScheduleToCloseTimeout != scheduleToCloseTimeout {
return fmt.Errorf("expected ScheduleToCloseTimeout %v but got %v", scheduleToCloseTimeout, info.ScheduleToCloseTimeout)
}
if info.StartToCloseTimeout != startToCloseTimeout {
return fmt.Errorf("expected StartToCloseTimeout %v but got %v", startToCloseTimeout, info.StartToCloseTimeout)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,7 @@ func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error {
wfType := info.WorkflowType.Name
taskQueue := info.TaskQueueName
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false).Get(ctx, nil)
return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false, 5*time.Second, 5*time.Second).Get(ctx, nil)
}

func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error {
Expand All @@ -1505,7 +1505,7 @@ func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error {
ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions())
var activities *Activities
return workflow.ExecuteLocalActivity(
ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true).Get(ctx, nil)
ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true, 5*time.Second, 5*time.Second).Get(ctx, nil)
}

func (w *Workflows) WorkflowWithLocalActivityCtxPropagation(ctx workflow.Context) (string, error) {
Expand Down