Skip to content

Commit 3999773

Browse files
committed
Fix: Log links with showWhilePending: true are not shown while pending for Ray plugin
Signed-off-by: Fabio Grätz <fabio@cusp.ai>
1 parent de65211 commit 3999773

2 files changed

Lines changed: 26 additions & 4 deletions

File tree

flyteplugins/go/tasks/plugins/k8s/ray/ray.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -671,14 +671,12 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont
671671
return pluginsCore.PhaseInfoUndefined, err
672672
}
673673

674-
if len(rayJob.Status.JobDeploymentStatus) == 0 {
675-
return pluginsCore.PhaseInfoQueuedWithTaskInfo(pluginsCore.DefaultPhaseVersion, "Scheduling", info), nil
676-
}
677-
678674
var phaseInfo pluginsCore.PhaseInfo
679675

680676
// KubeRay creates a Ray cluster first, and then submits a Ray job to the cluster
681677
switch rayJob.Status.JobDeploymentStatus {
678+
case rayv1.JobDeploymentStatusNew:
679+
phaseInfo, err = pluginsCore.PhaseInfoQueuedWithTaskInfo(pluginsCore.DefaultPhaseVersion, "Scheduling", info), nil
682680
case rayv1.JobDeploymentStatusInitializing:
683681
phaseInfo, err = pluginsCore.PhaseInfoInitializing(pluginsCore.DefaultPhaseVersion, "cluster is creating", info), nil
684682
case rayv1.JobDeploymentStatusRunning:

flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1254,6 +1254,30 @@ func TestGetTaskPhaseIncreasePhaseVersion(t *testing.T) {
12541254
assert.Equal(t, phaseInfo.Version(), pluginsCore.DefaultPhaseVersion+1)
12551255
}
12561256

1257+
// A new RayJob (empty JobDeploymentStatus) must still go through
1258+
// MaybeUpdatePhaseVersionFromPluginContext so that updates (e.g. log links) are
1259+
// reflected via a phase version bump rather than being dropped by an early exit.
1260+
func TestGetTaskPhaseNewJobIncreasePhaseVersion(t *testing.T) {
1261+
rayJobResourceHandler := rayJobResourceHandler{}
1262+
1263+
ctx := context.TODO()
1264+
1265+
pluginState := k8s.PluginState{
1266+
Phase: pluginsCore.PhaseQueued,
1267+
PhaseVersion: pluginsCore.DefaultPhaseVersion,
1268+
Reason: "task submitted to K8s",
1269+
}
1270+
pluginCtx := newPluginContext(pluginState)
1271+
1272+
rayObject := &rayv1.RayJob{}
1273+
rayObject.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusNew
1274+
phaseInfo, err := rayJobResourceHandler.GetTaskPhase(ctx, pluginCtx, rayObject)
1275+
1276+
assert.NoError(t, err)
1277+
assert.Equal(t, pluginsCore.PhaseQueued.String(), phaseInfo.Phase().String())
1278+
assert.Equal(t, pluginsCore.DefaultPhaseVersion+1, phaseInfo.Version())
1279+
}
1280+
12571281
func TestGetEventInfo_LogTemplates(t *testing.T) {
12581282
pluginCtx := newPluginContext(k8s.PluginState{})
12591283
testCases := []struct {

0 commit comments

Comments
 (0)