Skip to content

Commit f91d109

Browse files
authored
Merge branch 'main' into kannan/worker-commands-capability
2 parents f745b68 + 094162e commit f91d109

5 files changed

Lines changed: 84 additions & 8 deletions

File tree

chasm/lib/activity/activity.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -713,9 +713,11 @@ func (a *Activity) RecordHeartbeat(
713713
if err != nil {
714714
return nil, err
715715
}
716+
prevHeartbeat, _ := a.LastHeartbeat.TryGet(ctx)
716717
a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{
717-
RecordedTime: timestamppb.New(ctx.Now(a)),
718-
Details: input.Request.GetHeartbeatRequest().GetDetails(),
718+
RecordedTime: timestamppb.New(ctx.Now(a)),
719+
Details: input.Request.GetHeartbeatRequest().GetDetails(),
720+
TotalHeartbeatCount: prevHeartbeat.GetTotalHeartbeatCount() + 1,
719721
})
720722
if heartbeatTimeout := a.GetHeartbeatTimeout().AsDuration(); heartbeatTimeout > 0 {
721723
ctx.AddTask(
@@ -817,6 +819,7 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) *apiactivitypb.
817819
Header: requestData.GetHeader(),
818820
HeartbeatDetails: heartbeat.GetDetails(),
819821
HeartbeatTimeout: a.GetHeartbeatTimeout(),
822+
TotalHeartbeatCount: heartbeat.GetTotalHeartbeatCount(),
820823
LastAttemptCompleteTime: attempt.GetCompleteTime(),
821824
LastFailure: attempt.GetLastFailureDetails().GetFailure(),
822825
LastHeartbeatTime: heartbeat.GetRecordedTime(),

chasm/lib/activity/activity_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
taskqueuepb "go.temporal.io/api/taskqueue/v1"
1111
"go.temporal.io/server/api/historyservice/v1"
1212
"go.temporal.io/server/chasm"
13-
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
13+
activitypb "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
1414
"go.temporal.io/server/common/dynamicconfig"
1515
"go.temporal.io/server/common/namespace"
1616
serviceerrors "go.temporal.io/server/common/serviceerror"

chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go

Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/activity/proto/v1/activity_state.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ message ActivityHeartbeatState {
162162
temporal.api.common.v1.Payloads details = 1;
163163
// Time the last heartbeat was recorded.
164164
google.protobuf.Timestamp recorded_time = 2;
165+
// Total number of heartbeats recorded across all attempts of this activity, including retries.
166+
int64 total_heartbeat_count = 3;
165167
}
166168

167169
message ActivityRequestData {

tests/standalone_activity_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,6 +1326,7 @@ func (s *standaloneActivityTestSuite) TestRequestCancel() {
13261326
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus(),
13271327
"expected Canceled but is %s", info.GetStatus())
13281328
require.Equal(t, "Test Cancellation", info.GetCanceledReason())
1329+
require.Equal(t, int64(1), info.GetTotalHeartbeatCount(), "total heartbeat count")
13291330
require.Greater(t, info.GetExecutionDuration().AsDuration(), time.Duration(0))
13301331
require.NotNil(t, info.GetCloseTime())
13311332
protorequire.ProtoEqual(t, details, activityResp.GetOutcome().GetFailure().GetCanceledFailureInfo().GetDetails())
@@ -1411,6 +1412,7 @@ func (s *standaloneActivityTestSuite) TestRequestCancel() {
14111412
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus(),
14121413
"expected Canceled but is %s", info.GetStatus())
14131414
require.Equal(t, "Test Cancellation", info.GetCanceledReason())
1415+
require.Equal(t, int64(1), info.GetTotalHeartbeatCount(), "total heartbeat count")
14141416
protorequire.ProtoEqual(t, details, activityResp.GetOutcome().GetFailure().GetCanceledFailureInfo().GetDetails())
14151417
require.Equal(t, identity, activityResp.GetOutcome().GetFailure().GetCanceledFailureInfo().GetIdentity())
14161418
})
@@ -1485,6 +1487,7 @@ func (s *standaloneActivityTestSuite) TestRequestCancel() {
14851487
require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, info.GetRunState(),
14861488
"expected CancelRequested but is %s", info.GetRunState())
14871489
require.Equal(t, "Test Cancellation", info.GetCanceledReason())
1490+
require.Equal(t, int64(1), info.GetTotalHeartbeatCount(), "total heartbeat count")
14881491
})
14891492

14901493
t.Run("DifferentRequestIDFails", func(t *testing.T) {
@@ -4648,6 +4651,22 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() {
46484651

46494652
// Verify: heartbeat details from first attempt are available
46504653
protorequire.ProtoEqual(t, heartbeatDetails, pollResp2.HeartbeatDetails)
4654+
4655+
_, err = s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{
4656+
Namespace: s.Namespace().String(),
4657+
TaskToken: pollResp2.TaskToken,
4658+
Details: heartbeatDetails,
4659+
})
4660+
require.NoError(t, err)
4661+
4662+
desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{
4663+
Namespace: s.Namespace().String(),
4664+
ActivityId: activityID,
4665+
IncludeOutcome: true,
4666+
})
4667+
require.NoError(t, err)
4668+
4669+
require.Equal(t, int64(2), desc.Info.GetTotalHeartbeatCount(), "total heartbeat count")
46514670
})
46524671

46534672
t.Run("ActivityTimesOutWithoutHeartbeat", func(t *testing.T) {
@@ -4793,6 +4812,7 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() {
47934812
require.NoError(t, err)
47944813
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, pollResp.GetInfo().GetStatus(),
47954814
"expected status=Completed but is %s", pollResp.GetInfo().GetStatus())
4815+
require.Equal(t, int64(2), pollResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count")
47964816
protorequire.ProtoEqual(t, defaultResult, pollResp.GetOutcome().GetResult())
47974817
})
47984818

@@ -4842,6 +4862,7 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() {
48424862
require.NoError(t, err)
48434863
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, descResp.GetInfo().GetStatus(),
48444864
"activity should still be running but is %s", descResp.GetInfo().GetStatus())
4865+
require.Equal(t, int64(1), descResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count")
48454866

48464867
// Complete the activity to confirm it's still operable.
48474868
_, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{
@@ -4860,6 +4881,7 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() {
48604881
require.NoError(t, err)
48614882
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, descResp.GetInfo().GetStatus(),
48624883
"expected status=Completed but is %s", descResp.GetInfo().GetStatus())
4884+
require.Equal(t, int64(1), descResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count")
48634885
protorequire.ProtoEqual(t, defaultResult, descResp.GetOutcome().GetResult())
48644886
})
48654887

@@ -4918,8 +4940,47 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() {
49184940
require.NoError(t, err)
49194941
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, pollResp.GetInfo().GetStatus(),
49204942
"expected status=Completed but is %s", pollResp.GetInfo().GetStatus())
4943+
require.Equal(t, int64(1), pollResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count")
49214944
protorequire.ProtoEqual(t, defaultResult, pollResp.GetOutcome().GetResult())
49224945
})
4946+
4947+
t.Run("HeartbeatCountIncrementsPerHeartbeat", func(t *testing.T) {
4948+
activityID := testcore.RandomizeStr(t.Name())
4949+
taskQueue := testcore.RandomizeStr(t.Name())
4950+
4951+
startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
4952+
Namespace: s.Namespace().String(),
4953+
ActivityId: activityID,
4954+
ActivityType: s.tv.ActivityType(),
4955+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
4956+
StartToCloseTimeout: durationpb.New(1 * time.Minute),
4957+
})
4958+
require.NoError(t, err)
4959+
4960+
pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
4961+
Namespace: s.Namespace().String(),
4962+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
4963+
})
4964+
require.NoError(t, err)
4965+
4966+
const numHeartbeats = 5
4967+
for i := range numHeartbeats {
4968+
_, err = s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{
4969+
Namespace: s.Namespace().String(),
4970+
TaskToken: pollTaskResp.TaskToken,
4971+
Details: heartbeatDetails,
4972+
})
4973+
require.NoError(t, err)
4974+
4975+
descResp, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{
4976+
Namespace: s.Namespace().String(),
4977+
ActivityId: activityID,
4978+
RunId: startResp.RunId,
4979+
})
4980+
require.NoError(t, err)
4981+
require.Equal(t, int64(i+1), descResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count after heartbeat %d", i+1)
4982+
}
4983+
})
49234984
}
49244985

49254986
func (s *standaloneActivityTestSuite) TestStartDelay() {

0 commit comments

Comments
 (0)