151 changes: 149 additions & 2 deletions historyserver/test/e2e/historyserver_test.go
@@ -19,8 +19,9 @@ import (
)

const (
	LiveSessionName = "live"
	EndpointLogFile = "/api/v0/logs/file"
	LiveSessionName       = "live"
	EndpointLogFile       = "/api/v0/logs/file"
	EndpointLogicalActors = "/logical/actors"
)

func TestHistoryServer(t *testing.T) {
@@ -43,6 +44,10 @@ func TestHistoryServer(t *testing.T) {
name: "/v0/logs/file endpoint (dead cluster)",
testFunc: testLogFileEndpointDeadCluster,
},
{
name: "/logical/actors endpoint (dead cluster)",
testFunc: testLogicalActorsEndpointDeadCluster,
},
}

	for _, tt := range tests {
@@ -282,3 +287,145 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names
	DeleteS3Bucket(test, g, s3Client)
	LogWithTimestamp(test.T(), "Dead cluster log file endpoint tests completed")
}

// testLogicalActorsEndpointDeadCluster verifies that the history server can fetch actors from S3 after a cluster is deleted.
//
// The test follows these steps:
// 1. Prepare the test environment by applying a RayCluster
// 2. Submit a Ray job to the existing cluster (generates actor events)
// 3. Delete the RayCluster to trigger log upload to S3
// 4. Apply the History Server and get its URL
// 5. Verify that the history server can fetch all actors from S3 via the /logical/actors endpoint
// 6. Verify that the history server can fetch a single actor via the /logical/actors/{actor_id} endpoint
// 7. Delete the S3 bucket to ensure test isolation
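//
// Based on the assertions below, both endpoints are expected to return a JSON envelope of roughly the form
// {"result": <bool>, "msg": <string>, "data": {...}}, with the actor map under data.actors and a single actor under data.detail.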
func testLogicalActorsEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
	rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
	ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)

	// Delete RayCluster to trigger log upload
	err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
	g.Expect(err).NotTo(HaveOccurred())
	LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)

	// Wait for cluster to be fully deleted (ensures logs are uploaded to S3)
	g.Eventually(func() error {
		_, err := GetRayCluster(test, namespace.Name, rayCluster.Name)
		return err
	}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))

	ApplyHistoryServer(test, g, namespace)
	historyServerURL := GetHistoryServerURL(test, g, namespace)

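	// A session name other than "live" indicates the history server is serving the cluster's archived data from S3, not a running cluster.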
	clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
	g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName))

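	// Reuse one HTTP client with a cookie jar so the cluster context selected below is retained for the requests in the subtests.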
	client := CreateHTTPClientWithCookieJar(g)
	setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)

test.T().Run("should return actors from S3", func(t *testing.T) {
g := NewWithT(t)
g.Eventually(func(gg Gomega) {
resp, err := client.Get(historyServerURL + EndpointLogicalActors)
gg.Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
gg.Expect(err).NotTo(HaveOccurred())

var result map[string]any
err = json.Unmarshal(body, &result)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(result["result"]).To(Equal(true))
gg.Expect(result["msg"]).To(Equal("All actors fetched."))

// Verify data.actors exists and is a map
data, ok := result["data"].(map[string]any)
gg.Expect(ok).To(BeTrue())
actors, ok := data["actors"].(map[string]any)
gg.Expect(ok).To(BeTrue())

LogWithTimestamp(test.T(), "Found %d actors in dead cluster", len(actors))
}, TestTimeoutShort).Should(Succeed())
})

test.T().Run("should return single actor from S3", func(t *testing.T) {
g := NewWithT(t)

// First, get an actor ID from the actors list
var actorID string
g.Eventually(func(gg Gomega) {
resp, err := client.Get(historyServerURL + EndpointLogicalActors)
gg.Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
gg.Expect(err).NotTo(HaveOccurred())

var result map[string]any
err = json.Unmarshal(body, &result)
gg.Expect(err).NotTo(HaveOccurred())

data := result["data"].(map[string]any)
actors := data["actors"].(map[string]any)

// Get the first actor ID from the map
for id := range actors {
actorID = id
break
}
gg.Expect(actorID).NotTo(BeEmpty(), "At least one actor should exist")
}, TestTimeoutShort).Should(Succeed())

		// Now test the single actor endpoint
		g.Eventually(func(gg Gomega) {
			singleActorURL := fmt.Sprintf("%s/logical/actors/%s", historyServerURL, actorID)
			resp, err := client.Get(singleActorURL)
			gg.Expect(err).NotTo(HaveOccurred())
			defer resp.Body.Close()
			gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))

			body, err := io.ReadAll(resp.Body)
			gg.Expect(err).NotTo(HaveOccurred())

			var result map[string]any
			err = json.Unmarshal(body, &result)
			gg.Expect(err).NotTo(HaveOccurred())
			gg.Expect(result["result"]).To(Equal(true))
			gg.Expect(result["msg"]).To(Equal("Actor fetched."))

			// Verify data.detail exists and contains actor_id
			data, ok := result["data"].(map[string]any)
			gg.Expect(ok).To(BeTrue())
			detail, ok := data["detail"].(map[string]any)
			gg.Expect(ok).To(BeTrue())
			gg.Expect(detail["actor_id"]).To(Equal(actorID))

			LogWithTimestamp(test.T(), "Successfully fetched actor %s from dead cluster", actorID)
		}, TestTimeoutShort).Should(Succeed())
	})

test.T().Run("should handle non-existent actor", func(t *testing.T) {
g := NewWithT(t)
g.Eventually(func(gg Gomega) {
fakeActorID := "ffffffffffffffffffffffffffffffffffffffff"
singleActorURL := fmt.Sprintf("%s/logical/actors/%s", historyServerURL, fakeActorID)
resp, err := client.Get(singleActorURL)
gg.Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
gg.Expect(err).NotTo(HaveOccurred())

var result map[string]any
err = json.Unmarshal(body, &result)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(result["result"]).To(Equal(false))
gg.Expect(result["msg"]).To(Equal("Actor not found."))
}, TestTimeoutShort).Should(Succeed())
})

	DeleteS3Bucket(test, g, s3Client)
	LogWithTimestamp(test.T(), "Dead cluster logical actors endpoint tests completed")
}