diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 641476701..b2b2a8a8f 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1498,6 +1498,17 @@ func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskFailed(event *hi weh.GetFailureConverter().FailureToError(attributes.GetFailure()), ) + var panicErr *PanicError + if errors.As(activityTaskErr, &panicErr) { + weh.logger.Error("Activity panic.", + tagWorkflowID, weh.workflowInfo.WorkflowExecution.ID, + tagRunID, weh.workflowInfo.WorkflowExecution.RunID, + tagActivityType, activity.activityType.Name, + tagActivityID, activityID, + tagPanicError, panicErr.Error(), + tagPanicStack, panicErr.StackTrace()) + } + activity.handle(nil, activityTaskErr) return nil } @@ -1705,6 +1716,17 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(details lar.Attempt = lamd.Attempt lar.Backoff = lamd.Backoff lar.Err = weh.GetFailureConverter().FailureToError(failure) + + var panicErr *PanicError + if errors.As(lar.Err, &panicErr) { + weh.logger.Error("LocalActivity panic.", + tagWorkflowID, weh.workflowInfo.WorkflowExecution.ID, + tagRunID, weh.workflowInfo.WorkflowExecution.RunID, + tagActivityType, la.params.ActivityType, + tagActivityID, lamd.ActivityID, + tagPanicError, panicErr.Error(), + tagPanicStack, panicErr.StackTrace()) + } } else { // Result might not be there if local activity doesn't have return value. lar.Result = details[localActivityResultName] diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index a4e142d0d..91407d12d 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -21,6 +21,7 @@ import ( commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + failurepb "go.temporal.io/api/failure/v1" historypb "go.temporal.io/api/history/v1" protocolpb "go.temporal.io/api/protocol/v1" querypb "go.temporal.io/api/query/v1" @@ -1145,6 +1146,57 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowPanics() { t.True(ok) } +func (t *TaskHandlersTestSuite) TestActivityTask_ActivityPanics() { + taskQueue := "taskQueue" + workflowType := "HelloWorld_Workflow" + activityType := "Greeter_Activity" + activityID := "0" + + testEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{ + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + }), + createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}), + createTestEventWorkflowTaskStarted(3), + createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{ScheduledEventId: 2}), + createTestEventActivityTaskScheduled(5, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: activityID, + ActivityType: &commonpb.ActivityType{Name: activityType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + }), + createTestEventActivityTaskStarted(6, &historypb.ActivityTaskStartedEventAttributes{}), + { + EventId: 7, + EventType: enumspb.EVENT_TYPE_ACTIVITY_TASK_FAILED, + Attributes: &historypb.HistoryEvent_ActivityTaskFailedEventAttributes{ + ActivityTaskFailedEventAttributes: &historypb.ActivityTaskFailedEventAttributes{ + ScheduledEventId: 5, + StartedEventId: 6, + Failure: &failurepb.Failure{ + Message: "panicError", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ + ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ + Type: "PanicError", + }, + }, + StackTrace: "stackTrace", + }, + }, + }, + }, + } + task := createWorkflowTask(testEvents, 4, workflowType) + params := t.getTestWorkerExecutionParams() + + taskHandler := newWorkflowTaskHandler(params, nil, t.registry) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + _, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) + t.NoError(err) +} + func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { parentID := "parentID" parentRunID := "parentRun"