Commit 7889ccb

add e2e test for endpoint api/v0/tasks/timeline
Signed-off-by: AndySung320 <andysung0320@gmail.com>
1 parent 85323ba

File tree: 3 files changed, +159 −3 lines

historyserver/test/e2e/collector_test.go

Lines changed: 3 additions & 3 deletions

@@ -87,7 +87,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev
 	_ = ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
 
 	// Define variables for constructing S3 object prefix.
-	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, RayClusterID)
+	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayCluster.Namespace)
 	sessionID := GetSessionIDFromHeadPod(test, g, rayCluster)
 	headNodeID := GetNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head")
 	workerNodeID := GetNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")
@@ -134,7 +134,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1
 	// Submit a Ray job to the existing cluster.
 	_ = ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
 
-	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, RayClusterID)
+	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayCluster.Namespace)
 	sessionID := GetSessionIDFromHeadPod(test, g, rayCluster)
 	headNodeID := GetNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head")
 	workerNodeID := GetNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")
@@ -196,7 +196,7 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1
 	// Use namespace name to ensure test isolation (avoid conflicts from previous test runs)
 	dummySessionID := fmt.Sprintf("test-recovery-session-%s", namespace.Name)
 	dummyNodeID := fmt.Sprintf("head-node-%s", namespace.Name)
-	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, RayClusterID)
+	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayCluster.Namespace)
 	sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, dummySessionID)
 
 	// Inject "leftover" logs BEFORE killing collector.
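All three hunks make the same substitution, so the net effect is easiest to see in the S3 prefix the tests construct. Below is a minimal sketch of that composition, with hypothetical values standing in for rayCluster.Name, rayCluster.Namespace, and the session ID (in the real tests these come from the cluster object and GetSessionIDFromHeadPod):

    package main

    import "fmt"

    func main() {
    	// Hypothetical stand-ins; the e2e tests derive these from the live cluster.
    	clusterName := "raycluster-sample"
    	namespace := "e2e-ns-abc12"
    	sessionID := "session_2024-01-01_00-00-00_000000_1"

    	// After this commit: <cluster name>_<namespace> instead of <cluster name>_<RayClusterID>.
    	clusterNameID := fmt.Sprintf("%s_%s", clusterName, namespace)

    	// Same layout as sessionPrefix in testCollectorResumesUploadsOnRestart.
    	sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID)
    	fmt.Println(sessionPrefix)
    	// log/raycluster-sample_e2e-ns-abc12/session_2024-01-01_00-00-00_000000_1/
    }

Deriving the ID from the namespace keeps object prefixes unique across test runs, which the previous fixed RayClusterID constant did not.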

historyserver/test/e2e/historyserver_test.go

Lines changed: 136 additions & 0 deletions

@@ -10,6 +10,8 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3"
 	. "github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/ray-project/kuberay/historyserver/pkg/utils"
 	. "github.com/ray-project/kuberay/historyserver/test/support"
@@ -30,6 +32,14 @@ func TestHistoryServer(t *testing.T) {
 			name:     "Live cluster: historyserver endpoints should be accessible",
 			testFunc: testLiveClusters,
 		},
+		{
+			name:     "/api/v0/tasks/timeline endpoint (live cluster)",
+			testFunc: testTimelineEndpointLiveCluster,
+		},
+		{
+			name:     "/api/v0/tasks/timeline endpoint (dead cluster)",
+			testFunc: testTimelineEndpointDeadCluster,
+		},
 	}
 
 	for _, tt := range tests {
@@ -133,3 +143,129 @@ func getClusterFromList(test Test, g *WithT, historyServerURL, clusterName, name
 
 	return result
 }
+
+// testTimelineEndpointLiveCluster verifies that the history server can return timeline data from a live cluster.
+//
+// The test case follows these steps:
+// 1. Prepare test environment by applying a Ray cluster
+// 2. Submit a Ray job to the existing cluster
+// 3. Apply History Server and get its URL
+// 4. Verify that the timeline endpoint returns valid Chrome Tracing format
+func testTimelineEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
+	rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
+	ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
+	ApplyHistoryServer(test, g, namespace)
+	historyServerURL := GetHistoryServerURL(test, g, namespace)
+
+	clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
+	g.Expect(clusterInfo.SessionName).To(Equal(LiveSessionName), "Live cluster should have sessionName='live'")
+
+	client := CreateHTTPClientWithCookieJar(g)
+	setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
+	test.T().Run("should return valid timeline data", func(t *testing.T) {
+		g := NewWithT(t)
+		verifyTimelineResponse(g, client, historyServerURL)
+	})
+
+	DeleteS3Bucket(test, g, s3Client)
+	LogWithTimestamp(test.T(), "Live cluster timeline endpoint test completed")
+}
+
+// testTimelineEndpointDeadCluster verifies that the history server can return timeline data from S3 after a cluster is deleted.
+//
+// The test case follows these steps:
+// 1. Prepare test environment by applying a Ray cluster
+// 2. Submit a Ray job to the existing cluster
+// 3. Delete RayCluster to trigger event upload to S3
+// 4. Apply History Server and get its URL
+// 5. Verify that the timeline endpoint returns valid Chrome Tracing format from S3 data
+func testTimelineEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
+	rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
+	ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
+
+	// Delete RayCluster to trigger event upload
+	err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
+	g.Expect(err).NotTo(HaveOccurred())
+	LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)
+
+	// Wait for cluster to be fully deleted (ensures events are uploaded to S3)
+	g.Eventually(func() error {
+		_, err := GetRayCluster(test, namespace.Name, rayCluster.Name)
+		return err
+	}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+
+	ApplyHistoryServer(test, g, namespace)
+	historyServerURL := GetHistoryServerURL(test, g, namespace)
+
+	clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
+	g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName))
+
+	client := CreateHTTPClientWithCookieJar(g)
+	setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
+
+	test.T().Run("should return timeline data from S3", func(t *testing.T) {
+		g := NewWithT(t)
+		verifyTimelineResponse(g, client, historyServerURL)
+	})
+
+	DeleteS3Bucket(test, g, s3Client)
+	LogWithTimestamp(test.T(), "Dead cluster timeline endpoint test completed")
+}
+
+// verifyTimelineResponse verifies the timeline endpoint returns valid Chrome Tracing format
+func verifyTimelineResponse(g *WithT, client *http.Client, historyServerURL string) {
+	g.Eventually(func(gg Gomega) {
+		resp, err := client.Get(historyServerURL + "/api/v0/tasks/timeline")
+		gg.Expect(err).NotTo(HaveOccurred())
+		defer resp.Body.Close()
+		gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))
+
+		body, err := io.ReadAll(resp.Body)
+		gg.Expect(err).NotTo(HaveOccurred())
+
+		var events []map[string]any
+		err = json.Unmarshal(body, &events)
+		gg.Expect(err).NotTo(HaveOccurred())
+
+		// Should have at least some events
+		gg.Expect(len(events)).To(BeNumerically(">", 0), "Timeline should have at least one event")
+
+		// Verify metadata and trace events exist
+		hasProcessName := false
+		hasThreadName := false
+		hasTraceEvent := false
+
+		for _, event := range events {
+			name, _ := event["name"].(string)
+			ph, _ := event["ph"].(string)
+
+			if name == "process_name" && ph == "M" {
+				hasProcessName = true
+				args, ok := event["args"].(map[string]any)
+				gg.Expect(ok).To(BeTrue(), "process_name should have args")
+				gg.Expect(args["name"]).NotTo(BeNil(), "process_name args should have 'name'")
+			}
+			if name == "thread_name" && ph == "M" {
+				hasThreadName = true
+			}
+			if ph == "X" {
+				hasTraceEvent = true
+				// Verify trace event has required fields
+				gg.Expect(event["cat"]).NotTo(BeNil(), "Trace event should have 'cat'")
+				gg.Expect(event["ts"]).NotTo(BeNil(), "Trace event should have 'ts'")
+				gg.Expect(event["dur"]).NotTo(BeNil(), "Trace event should have 'dur'")
+				gg.Expect(event["args"]).NotTo(BeNil(), "Trace event should have 'args'")
+
+				// Verify args structure
+				args, ok := event["args"].(map[string]any)
+				gg.Expect(ok).To(BeTrue())
+				gg.Expect(args["task_id"]).NotTo(BeNil(), "args should have 'task_id'")
+				gg.Expect(args["job_id"]).NotTo(BeNil(), "args should have 'job_id'")
+			}
+		}
+
+		gg.Expect(hasProcessName).To(BeTrue(), "Should have process_name metadata event")
+		gg.Expect(hasThreadName).To(BeTrue(), "Should have thread_name metadata event")
+		gg.Expect(hasTraceEvent).To(BeTrue(), "Should have at least one trace event")
+	}, TestTimeoutShort).Should(Succeed())
}
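verifyTimelineResponse checks structure only, so any Chrome Tracing JSON array with process_name/thread_name metadata events (ph == "M") and at least one complete event (ph == "X") carrying cat, ts, dur, and args.task_id/args.job_id would pass. For illustration, here is a self-contained sketch with an entirely made-up payload that satisfies those checks (all IDs and names are invented, only the shape matters):

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // Hypothetical response body in Chrome Tracing format: two metadata
    // events followed by one complete ("X") trace event.
    const sampleTimeline = `[
      {"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "worker-node-1"}},
      {"name": "thread_name", "ph": "M", "pid": 1, "tid": 1, "args": {"name": "worker"}},
      {"name": "sample_task", "ph": "X", "cat": "task", "pid": 1, "tid": 1,
       "ts": 0, "dur": 1500, "args": {"task_id": "abc123", "job_id": "01000000"}}
    ]`

    func main() {
    	// Decode the same way the test does: a JSON array of loosely-typed events.
    	var events []map[string]any
    	if err := json.Unmarshal([]byte(sampleTimeline), &events); err != nil {
    		panic(err)
    	}
    	fmt.Printf("parsed %d timeline events\n", len(events))
    }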

historyserver/test/support/raycluster.go

Lines changed: 20 additions & 0 deletions

@@ -17,11 +17,31 @@ const (
 	RayClusterID = "default"
 )
 
+// injectCollectorRayClusterID injects the ray-cluster-id argument into all collector containers.
+func injectCollectorRayClusterID(containers []corev1.Container, rayClusterID string) {
+	for i := range containers {
+		if containers[i].Name == "collector" {
+			containers[i].Command = append(
+				containers[i].Command,
+				fmt.Sprintf("--ray-cluster-id=%s", rayClusterID),
+			)
+		}
+	}
+}
+
 // ApplyRayClusterWithCollector deploys a Ray cluster with the collector sidecar into the test namespace.
 func ApplyRayClusterWithCollector(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.RayCluster {
 	rayClusterFromYaml := DeserializeRayClusterYAML(test, RayClusterManifestPath)
 	rayClusterFromYaml.Namespace = namespace.Name
 
+	// Inject namespace name as ray-cluster-id for head group collector
+	injectCollectorRayClusterID(rayClusterFromYaml.Spec.HeadGroupSpec.Template.Spec.Containers, namespace.Name)
+
+	// Inject namespace name as ray-cluster-id for worker group collectors
+	for wg := range rayClusterFromYaml.Spec.WorkerGroupSpecs {
+		injectCollectorRayClusterID(rayClusterFromYaml.Spec.WorkerGroupSpecs[wg].Template.Spec.Containers, namespace.Name)
+	}
+
 	rayCluster, err := test.Client().Ray().RayV1().
 		RayClusters(namespace.Name).
 		Create(test.Ctx(), rayClusterFromYaml, metav1.CreateOptions{})
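The helper only touches containers named "collector", so pod specs without the sidecar pass through unchanged. A small usage sketch of the same logic (assuming the k8s.io/api module is available; the container commands and namespace name below are hypothetical):

    package main

    import (
    	"fmt"

    	corev1 "k8s.io/api/core/v1"
    )

    // Same shape as the helper added above: append --ray-cluster-id to every
    // container named "collector", leaving other containers alone.
    func injectCollectorRayClusterID(containers []corev1.Container, rayClusterID string) {
    	for i := range containers {
    		if containers[i].Name == "collector" {
    			containers[i].Command = append(
    				containers[i].Command,
    				fmt.Sprintf("--ray-cluster-id=%s", rayClusterID),
    			)
    		}
    	}
    }

    func main() {
    	containers := []corev1.Container{
    		{Name: "ray-head", Command: []string{"ray", "start", "--head"}},
    		{Name: "collector", Command: []string{"/collector"}},
    	}
    	injectCollectorRayClusterID(containers, "e2e-ns-abc12") // namespace name as the ID
    	fmt.Println(containers[0].Command) // unchanged: [ray start --head]
    	fmt.Println(containers[1].Command) // [/collector --ray-cluster-id=e2e-ns-abc12]
    }

This is what lets the e2e tests above build clusterNameID from rayCluster.Namespace and find the collector's uploads under a per-namespace prefix.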
