Skip to content

Commit d336ac1

Browse files
committed
[history server][e2e] Add dead cluster tests for Azure Blob Storage
Add e2e tests for Azure Blob Storage that verify history server functionality after cluster deletion: - testAzureDeadClusters: verifies all endpoints work for dead clusters - testAzureLogFileEndpointLiveCluster: verifies log file access for live - testAzureLogFileEndpointDeadCluster: verifies log file access for dead Extract shared test helpers to avoid duplication between S3 and Azure: - VerifyLogFileEndpointReturnsContent - VerifyLogFileEndpointRejectsPathTraversal - DeleteRayClusterAndWait
1 parent d2de310 commit d336ac1

File tree

3 files changed

+146
-92
lines changed

3 files changed

+146
-92
lines changed

historyserver/test/e2e/historyserver_azureblob_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,18 @@ func TestAzureHistoryServer(t *testing.T) {
2222
name: "Live cluster: historyserver endpoints should be accessible",
2323
testFunc: testAzureLiveClusters,
2424
},
25+
{
26+
name: "Dead cluster: historyserver endpoints should be accessible",
27+
testFunc: testAzureDeadClusters,
28+
},
29+
{
30+
name: "/v0/logs/file endpoint (live cluster)",
31+
testFunc: testAzureLogFileEndpointLiveCluster,
32+
},
33+
{
34+
name: "/v0/logs/file endpoint (dead cluster)",
35+
testFunc: testAzureLogFileEndpointDeadCluster,
36+
},
2537
}
2638

2739
for _, tt := range tests {
@@ -51,3 +63,76 @@ func testAzureLiveClusters(test Test, g *WithT, namespace *corev1.Namespace, azu
5163
DeleteAzureBlobContainer(test, g, azureClient)
5264
LogWithTimestamp(test.T(), "Azure live clusters E2E test completed successfully")
5365
}
66+
67+
func testAzureDeadClusters(test Test, g *WithT, namespace *corev1.Namespace, azureClient *azblob.Client) {
68+
rayCluster := PrepareAzureBlobTestEnv(test, g, namespace, azureClient)
69+
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
70+
71+
DeleteRayClusterAndWait(test, g, namespace.Name, rayCluster.Name)
72+
73+
ApplyHistoryServer(test, g, namespace, AzureHistoryServerManifestPath)
74+
historyServerURL := GetHistoryServerURL(test, g, namespace)
75+
76+
clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
77+
g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName), "Dead cluster should not have sessionName='live'")
78+
79+
client := CreateHTTPClientWithCookieJar(g)
80+
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
81+
verifyHistoryServerEndpoints(test, g, client, historyServerURL)
82+
83+
DeleteAzureBlobContainer(test, g, azureClient)
84+
LogWithTimestamp(test.T(), "Azure dead clusters E2E test completed successfully")
85+
}
86+
87+
func testAzureLogFileEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Namespace, azureClient *azblob.Client) {
88+
rayCluster := PrepareAzureBlobTestEnv(test, g, namespace, azureClient)
89+
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
90+
ApplyHistoryServer(test, g, namespace, AzureHistoryServerManifestPath)
91+
historyServerURL := GetHistoryServerURL(test, g, namespace)
92+
93+
clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
94+
client := CreateHTTPClientWithCookieJar(g)
95+
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
96+
97+
nodeID := GetOneOfNodeID(g, client, historyServerURL)
98+
99+
test.T().Run("should return log content", func(t *testing.T) {
100+
VerifyLogFileEndpointReturnsContent(test, NewWithT(t), client, historyServerURL, nodeID)
101+
})
102+
103+
test.T().Run("should reject path traversal", func(t *testing.T) {
104+
VerifyLogFileEndpointRejectsPathTraversal(test, NewWithT(t), client, historyServerURL, nodeID)
105+
})
106+
107+
DeleteAzureBlobContainer(test, g, azureClient)
108+
LogWithTimestamp(test.T(), "Azure log file endpoint tests completed")
109+
}
110+
111+
func testAzureLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, azureClient *azblob.Client) {
112+
rayCluster := PrepareAzureBlobTestEnv(test, g, namespace, azureClient)
113+
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
114+
115+
DeleteRayClusterAndWait(test, g, namespace.Name, rayCluster.Name)
116+
117+
ApplyHistoryServer(test, g, namespace, AzureHistoryServerManifestPath)
118+
historyServerURL := GetHistoryServerURL(test, g, namespace)
119+
120+
clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
121+
g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName))
122+
123+
client := CreateHTTPClientWithCookieJar(g)
124+
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
125+
126+
nodeID := GetOneOfNodeID(g, client, historyServerURL)
127+
128+
test.T().Run("should return log content from Azure Blob", func(t *testing.T) {
129+
VerifyLogFileEndpointReturnsContent(test, NewWithT(t), client, historyServerURL, nodeID)
130+
})
131+
132+
test.T().Run("should reject path traversal from Azure Blob", func(t *testing.T) {
133+
VerifyLogFileEndpointRejectsPathTraversal(test, NewWithT(t), client, historyServerURL, nodeID)
134+
})
135+
136+
DeleteAzureBlobContainer(test, g, azureClient)
137+
LogWithTimestamp(test.T(), "Azure dead cluster log file endpoint tests completed")
138+
}

historyserver/test/e2e/historyserver_test.go

Lines changed: 6 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,14 @@ import (
1010
"github.com/aws/aws-sdk-go/service/s3"
1111
. "github.com/onsi/gomega"
1212
corev1 "k8s.io/api/core/v1"
13-
k8serrors "k8s.io/apimachinery/pkg/api/errors"
14-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1513

1614
. "github.com/ray-project/kuberay/ray-operator/test/support"
1715

1816
"github.com/ray-project/kuberay/historyserver/pkg/utils"
1917
. "github.com/ray-project/kuberay/historyserver/test/support"
2018
)
2119

22-
const (
23-
LiveSessionName = "live"
24-
EndpointLogFile = "/api/v0/logs/file"
25-
)
20+
const LiveSessionName = "live"
2621

2722
func TestHistoryServer(t *testing.T) {
2823
// Share a single S3 client among subtests.
@@ -192,15 +187,6 @@ func getClusterFromList(test Test, g *WithT, historyServerURL, clusterName, name
192187
}
193188

194189
// testLogFileEndpointLiveCluster verifies that the history server can fetch log files from a live cluster.
195-
//
196-
// The test case follows these steps:
197-
// 1. Prepare test environment by applying a Ray cluster
198-
// 2. Submit a Ray job to the existing cluster
199-
// 3. Apply History Server and get its URL
200-
// 4. Get the cluster info from the list
201-
// 5. Verify that the history server can fetch log content (raylet.out)
202-
// 6. Verify that the history server rejects path traversal attempts
203-
// 7. Delete S3 bucket to ensure test isolation
204190
func testLogFileEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
205191
rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
206192
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
@@ -212,70 +198,25 @@ func testLogFileEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Names
212198
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
213199

214200
nodeID := GetOneOfNodeID(g, client, historyServerURL)
215-
// Hardcode "raylet.out" for deterministic testing.
216-
filename := "raylet.out"
217201

218202
test.T().Run("should return log content", func(t *testing.T) {
219-
g := NewWithT(t)
220-
g.Eventually(func(gg Gomega) {
221-
logFileURL := fmt.Sprintf("%s%s?node_id=%s&filename=%s&lines=100", historyServerURL, EndpointLogFile, nodeID, filename)
222-
resp, err := client.Get(logFileURL)
223-
gg.Expect(err).NotTo(HaveOccurred())
224-
defer resp.Body.Close()
225-
gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))
226-
227-
body, err := io.ReadAll(resp.Body)
228-
gg.Expect(err).NotTo(HaveOccurred())
229-
gg.Expect(len(body)).To(BeNumerically(">", 0))
230-
}, TestTimeoutShort).Should(Succeed())
203+
VerifyLogFileEndpointReturnsContent(test, NewWithT(t), client, historyServerURL, nodeID)
231204
})
232205

233206
test.T().Run("should reject path traversal", func(t *testing.T) {
234-
g := NewWithT(t)
235-
maliciousPaths := []string{"../etc/passwd", "..", "/etc/passwd", "../../secret"}
236-
237-
for _, malicious := range maliciousPaths {
238-
g.Eventually(func(gg Gomega) {
239-
url := fmt.Sprintf("%s%s?node_id=%s&filename=%s", historyServerURL, EndpointLogFile, nodeID, malicious)
240-
resp, err := client.Get(url)
241-
gg.Expect(err).NotTo(HaveOccurred())
242-
defer func() {
243-
io.Copy(io.Discard, resp.Body)
244-
resp.Body.Close()
245-
}()
246-
gg.Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
247-
}, TestTimeoutShort).Should(Succeed())
248-
}
207+
VerifyLogFileEndpointRejectsPathTraversal(test, NewWithT(t), client, historyServerURL, nodeID)
249208
})
250209

251210
DeleteS3Bucket(test, g, s3Client)
252211
LogWithTimestamp(test.T(), "Log file endpoint tests completed")
253212
}
254213

255214
// testLogFileEndpointDeadCluster verifies that the history server can fetch log files from S3 after a cluster is deleted.
256-
//
257-
// The test case follows these steps:
258-
// 1. Prepare test environment by applying a Ray cluster
259-
// 2. Submit a Ray job to the existing cluster
260-
// 3. Delete RayCluster to trigger log upload to S3
261-
// 4. Apply History Server and get its URL
262-
// 5. Verify that the history server can fetch log content from S3 (raylet.out)
263-
// 6. Verify that the history server rejects path traversal attempts from S3
264-
// 7. Delete S3 bucket to ensure test isolation
265215
func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
266216
rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
267217
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
268218

269-
// Delete RayCluster to trigger log upload
270-
err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
271-
g.Expect(err).NotTo(HaveOccurred())
272-
LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)
273-
274-
// Wait for cluster to be fully deleted (ensures logs are uploaded to S3)
275-
g.Eventually(func() error {
276-
_, err := GetRayCluster(test, namespace.Name, rayCluster.Name)
277-
return err
278-
}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
219+
DeleteRayClusterAndWait(test, g, namespace.Name, rayCluster.Name)
279220

280221
ApplyHistoryServer(test, g, namespace, "")
281222
historyServerURL := GetHistoryServerURL(test, g, namespace)
@@ -287,40 +228,13 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names
287228
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
288229

289230
nodeID := GetOneOfNodeID(g, client, historyServerURL)
290-
// Hardcode "raylet.out" for deterministic testing.
291-
filename := "raylet.out"
292231

293232
test.T().Run("should return log content from S3", func(t *testing.T) {
294-
g := NewWithT(t)
295-
g.Eventually(func(gg Gomega) {
296-
logFileURL := fmt.Sprintf("%s%s?node_id=%s&filename=%s&lines=100", historyServerURL, EndpointLogFile, nodeID, filename)
297-
resp, err := client.Get(logFileURL)
298-
gg.Expect(err).NotTo(HaveOccurred())
299-
defer resp.Body.Close()
300-
gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))
301-
302-
body, err := io.ReadAll(resp.Body)
303-
gg.Expect(err).NotTo(HaveOccurred())
304-
gg.Expect(len(body)).To(BeNumerically(">", 0))
305-
}, TestTimeoutShort).Should(Succeed())
233+
VerifyLogFileEndpointReturnsContent(test, NewWithT(t), client, historyServerURL, nodeID)
306234
})
307235

308236
test.T().Run("should reject path traversal from S3", func(t *testing.T) {
309-
g := NewWithT(t)
310-
maliciousPaths := []string{"../etc/passwd", "..", "/etc/passwd", "../../secret"}
311-
312-
for _, malicious := range maliciousPaths {
313-
g.Eventually(func(gg Gomega) {
314-
url := fmt.Sprintf("%s%s?node_id=%s&filename=%s", historyServerURL, EndpointLogFile, nodeID, malicious)
315-
resp, err := client.Get(url)
316-
gg.Expect(err).NotTo(HaveOccurred())
317-
defer func() {
318-
io.Copy(io.Discard, resp.Body)
319-
resp.Body.Close()
320-
}()
321-
gg.Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
322-
}, TestTimeoutShort).Should(Succeed())
323-
}
237+
VerifyLogFileEndpointRejectsPathTraversal(test, NewWithT(t), client, historyServerURL, nodeID)
324238
})
325239

326240
DeleteS3Bucket(test, g, s3Client)

historyserver/test/support/historyserver.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,58 @@ func GetOneOfNodeID(g *WithT, client *http.Client, historyServerURL string) stri
217217
nodeInfo := summary[0].(map[string]any)
218218
return nodeInfo["raylet"].(map[string]any)["nodeId"].(string)
219219
}
220+
221+
// VerifyLogFileEndpointReturnsContent verifies that the log file endpoint returns content.
222+
func VerifyLogFileEndpointReturnsContent(test Test, g *WithT, client *http.Client, historyServerURL, nodeID string) {
223+
filename := "raylet.out"
224+
endpointLogFile := "/api/v0/logs/file"
225+
226+
g.Eventually(func(gg Gomega) {
227+
logFileURL := fmt.Sprintf("%s%s?node_id=%s&filename=%s&lines=100", historyServerURL, endpointLogFile, nodeID, filename)
228+
resp, err := client.Get(logFileURL)
229+
gg.Expect(err).NotTo(HaveOccurred())
230+
defer resp.Body.Close()
231+
gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))
232+
233+
body, err := io.ReadAll(resp.Body)
234+
gg.Expect(err).NotTo(HaveOccurred())
235+
gg.Expect(len(body)).To(BeNumerically(">", 0))
236+
}, TestTimeoutShort).Should(Succeed())
237+
238+
LogWithTimestamp(test.T(), "Log file endpoint returned content successfully")
239+
}
240+
241+
// VerifyLogFileEndpointRejectsPathTraversal verifies that the log file endpoint rejects path traversal attempts.
242+
func VerifyLogFileEndpointRejectsPathTraversal(test Test, g *WithT, client *http.Client, historyServerURL, nodeID string) {
243+
endpointLogFile := "/api/v0/logs/file"
244+
maliciousPaths := []string{"../etc/passwd", "..", "/etc/passwd", "../../secret"}
245+
246+
for _, malicious := range maliciousPaths {
247+
g.Eventually(func(gg Gomega) {
248+
url := fmt.Sprintf("%s%s?node_id=%s&filename=%s", historyServerURL, endpointLogFile, nodeID, malicious)
249+
resp, err := client.Get(url)
250+
gg.Expect(err).NotTo(HaveOccurred())
251+
defer func() {
252+
io.Copy(io.Discard, resp.Body)
253+
resp.Body.Close()
254+
}()
255+
gg.Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
256+
}, TestTimeoutShort).Should(Succeed())
257+
}
258+
259+
LogWithTimestamp(test.T(), "Log file endpoint correctly rejected path traversal attempts")
260+
}
261+
262+
// DeleteRayClusterAndWait deletes a RayCluster and waits for it to be fully deleted.
263+
func DeleteRayClusterAndWait(test Test, g *WithT, namespace string, clusterName string) {
264+
err := test.Client().Ray().RayV1().RayClusters(namespace).Delete(test.Ctx(), clusterName, metav1.DeleteOptions{})
265+
g.Expect(err).NotTo(HaveOccurred())
266+
LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace, clusterName)
267+
268+
g.Eventually(func() error {
269+
_, err := GetRayCluster(test, namespace, clusterName)
270+
return err
271+
}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
272+
273+
LogWithTimestamp(test.T(), "RayCluster %s/%s fully deleted", namespace, clusterName)
274+
}

0 commit comments

Comments
 (0)