Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-274: Optimize Timer creation #529

Merged
merged 11 commits into from
Feb 5, 2025
40 changes: 40 additions & 0 deletions integ/any_timer_signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ func TestAnyTimerSignalWorkflowTemporal(t *testing.T) {
}
}

func TestGreedyAnyTimerSignalWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, minimumGreedyTimerConfig())
smallWaitForFastTest()
}
}

func TestAnyTimerSignalWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
Expand All @@ -32,6 +42,16 @@ func TestAnyTimerSignalWorkflowCadence(t *testing.T) {
}
}

func TestGreedyAnyTimerSignalWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, minimumGreedyTimerConfig())
smallWaitForFastTest()
}
}

func TestAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
Expand All @@ -42,6 +62,16 @@ func TestAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) {
}
}

func TestGreedyAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, greedyTimerConfig(true))
smallWaitForFastTest()
}
}

func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
Expand All @@ -52,6 +82,16 @@ func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) {
}
}

func TestGreedyAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, greedyTimerConfig(true))
smallWaitForFastTest()
}
}

func doTestAnyTimerSignalWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
wfHandler := anytimersignal.NewHandler()
Expand Down
188 changes: 188 additions & 0 deletions integ/greedy_timer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package integ

import (
"context"
"encoding/json"
"github.com/indeedeng/iwf/integ/workflow/greedy_timer"
uclient "github.com/indeedeng/iwf/service/client"
"github.com/stretchr/testify/assert"
"log"
"strconv"
"testing"
"time"

"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/ptr"
)

func TestGreedyTimerWorkflowBaseTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestGreedyTimerWorkflow(t, service.BackendTypeTemporal)
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowBaseCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestGreedyTimerWorkflow(t, service.BackendTypeCadence)
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowBaseTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestGreedyTimerWorkflowCustomConfig(t, service.BackendTypeTemporal, greedyTimerConfig(true))
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowBaseCadenceContinueAsNew(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestGreedyTimerWorkflowCustomConfig(t, service.BackendTypeCadence, greedyTimerConfig(true))
smallWaitForFastTest()
}
}

func doTestGreedyTimerWorkflow(t *testing.T, backendType service.BackendType) {
doTestGreedyTimerWorkflowCustomConfig(t, backendType, minimumGreedyTimerConfig())
}

func doTestGreedyTimerWorkflowCustomConfig(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
assertions := assert.New(t)
// start test workflow server
wfHandler := greedy_timer.NewHandler()
closeFunc1 := startWorkflowWorkerWithRpc(wfHandler, t)
defer closeFunc1()

uClient, closeFunc2 := startIwfServiceWithClient(backendType)
defer closeFunc2()

apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})

// start a workflow
durations := []int64{15, 30}
input := greedy_timer.Input{Durations: durations}

wfId := greedy_timer.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
inputData, _ := json.Marshal(input)

//schedule-1
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: greedy_timer.WorkflowType,
WorkflowTimeoutSeconds: 30,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(greedy_timer.ScheduleTimerState),
StateInput: &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString(string(inputData)),
},
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
},
}).Execute()
failTestAtHttpError(err, httpResp, t)

time.Sleep(time.Second * 1)

// assertions
debug := service.DebugDumpResponse{}
err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
}
assertions.Equal(1, len(debug.FiringTimersUnixTimestamps))
singleTimerScheduled := debug.FiringTimersUnixTimestamps[0]

scheduleTimerAndAssertExpectedScheduled(t, apiClient, uClient, wfId, 20, 1)

// skip next timer for state: schedule-1
skipReq := apiClient.DefaultApi.ApiV1WorkflowTimerSkipPost(context.Background())
httpResp, err = skipReq.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{
WorkflowId: wfId,
WorkflowStateExecutionId: greedy_timer.ScheduleTimerState + "-1",
TimerCommandId: iwfidl.PtrString("duration-15"),
}).Execute()
failTestAtHttpError(err, httpResp, t)

time.Sleep(time.Second * 1)

err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
}

// no second timer started
assertions.Equal(1, len(debug.FiringTimersUnixTimestamps))
// LessOrEqual due to continue as new workflow scheduling the next, not skipped timer
assertions.LessOrEqual(singleTimerScheduled, debug.FiringTimersUnixTimestamps[0])
scheduleTimerAndAssertExpectedScheduled(t, apiClient, uClient, wfId, 5, 2)

// wait for the workflow
req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
_, httpResp, err = req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
failTestAtHttpError(err, httpResp, t)

history, _ := wfHandler.GetTestResult()
assertions.Equalf(map[string]int64{
"schedule_start": 3,
"schedule_decide": 1,
}, history, "history does not match expected")
}

func scheduleTimerAndAssertExpectedScheduled(
t *testing.T,
apiClient *iwfidl.APIClient,
uClient uclient.UnifiedClient,
wfId string,
duration int64,
noMoreThan int) {

assertions := assert.New(t)
input := greedy_timer.Input{Durations: []int64{duration}}
inputData, _ := json.Marshal(input)

reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
_, httpResp, err := reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
WorkflowId: wfId,
RpcName: greedy_timer.SubmitDurationsRPC,
Input: &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString(string(inputData)),
},
TimeoutSeconds: iwfidl.PtrInt32(2),
}).Execute()
failTestAtHttpError(err, httpResp, t)

time.Sleep(time.Second * 1)

debug := service.DebugDumpResponse{}
err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
}

assertions.LessOrEqual(len(debug.FiringTimersUnixTimestamps), noMoreThan)
}
41 changes: 41 additions & 0 deletions integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,47 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) {
}
}

// NOTE: greedy timers should have the same result in these timer tests
func TestGreedyTimerWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumGreedyTimerConfig())
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestTimerWorkflow(t, service.BackendTypeCadence, minimumGreedyTimerConfig())
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestTimerWorkflow(t, service.BackendTypeTemporal, greedyTimerConfig(true))
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowCadenceContinueAsNew(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestTimerWorkflow(t, service.BackendTypeCadence, greedyTimerConfig(true))
smallWaitForFastTest()
}
}

func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
wfHandler := timer.NewHandler()
Expand Down
17 changes: 17 additions & 0 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,23 @@ func minimumContinueAsNewConfig(optimizeActivity bool) *iwfidl.WorkflowConfig {
}
}

func minimumGreedyTimerConfig() *iwfidl.WorkflowConfig {
return greedyTimerConfig(false)
}

func greedyTimerConfig(continueAsNew bool) *iwfidl.WorkflowConfig {
if continueAsNew {
return &iwfidl.WorkflowConfig{
ContinueAsNewThreshold: iwfidl.PtrInt32(1),
OptimizeTimer: iwfidl.PtrBool(true),
}
}

return &iwfidl.WorkflowConfig{
OptimizeTimer: iwfidl.PtrBool(true),
}
}

func minimumContinueAsNewConfigV0() *iwfidl.WorkflowConfig {
return minimumContinueAsNewConfig(false)
}
Expand Down
Loading