Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 159 additions & 2 deletions historyserver/test/e2e/historyserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
)

// Shared constants for the history server e2e tests: a session name and
// the URL paths (appended to the history server base URL) under test.
const (
LiveSessionName = "live"
EndpointLogFile = "/api/v0/logs/file"
// LiveSessionName is the session name that dead-cluster tests assert the
// listed cluster does NOT report.
LiveSessionName = "live"
// EndpointLogFile is the path of the log file endpoint.
EndpointLogFile = "/api/v0/logs/file"
// EndpointLogicalActors is the path of the actors listing endpoint; a single
// actor is addressed by appending "/{actor_id}" to it.
EndpointLogicalActors = "/logical/actors"
)

func TestHistoryServer(t *testing.T) {
Expand All @@ -43,6 +44,10 @@ func TestHistoryServer(t *testing.T) {
name: "/v0/logs/file endpoint (dead cluster)",
testFunc: testLogFileEndpointDeadCluster,
},
{
name: "/logical/actors endpoint (dead cluster)",
testFunc: testLogicalActorsEndpointDeadCluster,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -282,3 +287,155 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names
DeleteS3Bucket(test, g, s3Client)
LogWithTimestamp(test.T(), "Dead cluster log file endpoint tests completed")
}

// testLogicalActorsEndpointDeadCluster verifies that the history server can return actors from the
// in-memory ClusterActorMap after a cluster is deleted.
//
// Data flow explanation:
// The history server does not fetch actors directly from S3. Instead:
//  1. Collector pushes events to S3 on cluster deletion
//  2. Storage Reader reads event files from S3
//  3. Event Handler processes events and populates ClusterActorMap
//  4. The /logical/actors endpoint returns actors from the in-memory ClusterActorMap
//
// The test case follows these steps:
//  1. Prepare test environment by applying a Ray cluster
//  2. Submit a Ray job to the existing cluster (generates actor events)
//  3. Delete RayCluster to trigger log upload to S3 (and event processing)
//  4. Apply History Server and get its URL
//  5. Verify that the history server returns actors via /logical/actors endpoint
//  6. Verify that the history server returns a single actor via /logical/actors/{actor_id} endpoint
//  7. Verify that an unknown actor ID yields HTTP 200 with result=false and "Actor not found."
//  8. Delete S3 bucket to ensure test isolation
func testLogicalActorsEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
	rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
	ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)

	// Delete RayCluster to trigger log upload to S3
	err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
	g.Expect(err).NotTo(HaveOccurred())
	LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)

	// Wait for cluster to be fully deleted (ensures logs are uploaded to S3 and events are processed)
	g.Eventually(func() error {
		_, err := GetRayCluster(test, namespace.Name, rayCluster.Name)
		return err
	}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))

	ApplyHistoryServer(test, g, namespace)
	historyServerURL := GetHistoryServerURL(test, g, namespace)

	// A dead cluster must be listed under a concrete session, not the live one.
	clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
	g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName))

	client := CreateHTTPClientWithCookieJar(g)
	setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)

	// Single source of truth for the endpoint path used by every subtest below.
	actorsURL := historyServerURL + EndpointLogicalActors

	// getJSON issues a GET against url, requires HTTP 200, and decodes the body as a JSON object.
	// All checks go through gg so that a failure inside Eventually is retried instead of aborting.
	getJSON := func(gg Gomega, url string) map[string]any {
		resp, err := client.Get(url)
		gg.Expect(err).NotTo(HaveOccurred())
		defer resp.Body.Close()
		gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))

		body, err := io.ReadAll(resp.Body)
		gg.Expect(err).NotTo(HaveOccurred())

		var result map[string]any
		err = json.Unmarshal(body, &result)
		gg.Expect(err).NotTo(HaveOccurred())
		return result
	}

	// expectAddress checks the nested address object of an actor. The checked type assertion
	// turns a malformed (non-object) address into an assertion failure rather than a panic.
	expectAddress := func(gg Gomega, actor map[string]any) {
		address, ok := actor["address"].(map[string]any)
		gg.Expect(ok).To(BeTrue(), "actor should have an address object")
		gg.Expect(address["node_id"]).NotTo(BeNil(), "address should have node_id")
		gg.Expect(address["ip_address"]).NotTo(BeNil(), "address should have ip_address")
	}

	test.T().Run("should return actors from history server", func(t *testing.T) {
		g := NewWithT(t)
		g.Eventually(func(gg Gomega) {
			result := getJSON(gg, actorsURL)
			gg.Expect(result["result"]).To(Equal(true))
			gg.Expect(result["msg"]).To(Equal("All actors fetched."))

			// Verify data.actors exists and is a map
			data, ok := result["data"].(map[string]any)
			gg.Expect(ok).To(BeTrue())
			actors, ok := data["actors"].(map[string]any)
			gg.Expect(ok).To(BeTrue())
			gg.Expect(len(actors)).To(BeNumerically(">", 0), "should have at least one actor")

			// Verify actor schema matches formatActorForResponse format (for the first actor)
			// Required fields from router.go:formatActorForResponse
			for _, actorData := range actors {
				actor, ok := actorData.(map[string]any)
				gg.Expect(ok).To(BeTrue(), "actor should be a map")
				gg.Expect(actor["actor_id"]).NotTo(BeNil(), "actor should have actor_id")
				gg.Expect(actor["job_id"]).NotTo(BeNil(), "actor should have job_id")
				gg.Expect(actor["state"]).NotTo(BeNil(), "actor should have state")
				expectAddress(gg, actor)
				break // Only verify the first actor
			}

			LogWithTimestamp(test.T(), "Found %d actors from history server", len(actors))
		}, TestTimeoutShort).Should(Succeed())
	})

	test.T().Run("should return single actor from history server", func(t *testing.T) {
		g := NewWithT(t)

		actorID := GetOneOfActorID(g, client, historyServerURL)

		// Now test the single actor endpoint
		g.Eventually(func(gg Gomega) {
			result := getJSON(gg, fmt.Sprintf("%s/%s", actorsURL, actorID))
			gg.Expect(result["result"]).To(Equal(true))
			gg.Expect(result["msg"]).To(Equal("Actor fetched."))

			// Verify data.detail exists and contains actor_id
			data, ok := result["data"].(map[string]any)
			gg.Expect(ok).To(BeTrue())
			detail, ok := data["detail"].(map[string]any)
			gg.Expect(ok).To(BeTrue())

			// Verify actor schema matches formatActorForResponse format
			// Required fields from router.go:formatActorForResponse
			gg.Expect(detail["actor_id"]).To(Equal(actorID))
			gg.Expect(detail["job_id"]).NotTo(BeNil())
			gg.Expect(detail["state"]).NotTo(BeNil())
			expectAddress(gg, detail)

			LogWithTimestamp(test.T(), "Successfully fetched actor %s from history server", actorID)
		}, TestTimeoutShort).Should(Succeed())
	})

	test.T().Run("should handle non-existent actor", func(t *testing.T) {
		g := NewWithT(t)
		g.Eventually(func(gg Gomega) {
			// An ID that no actor of the test job is expected to have.
			fakeActorID := "ffffffffffffffffffffffffffffffffffffffff"

			// The endpoint reports a missing actor in the payload, not via the HTTP status.
			result := getJSON(gg, fmt.Sprintf("%s/%s", actorsURL, fakeActorID))
			gg.Expect(result["result"]).To(Equal(false))
			gg.Expect(result["msg"]).To(Equal("Actor not found."))
		}, TestTimeoutShort).Should(Succeed())
	})

	DeleteS3Bucket(test, g, s3Client)
	LogWithTimestamp(test.T(), "Dead cluster logical actors endpoint tests completed")
}
33 changes: 33 additions & 0 deletions historyserver/test/support/historyserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,36 @@ func GetOneOfNodeID(g *WithT, client *http.Client, historyServerURL string) stri
nodeInfo := summary[0].(map[string]any)
return nodeInfo["raylet"].(map[string]any)["nodeId"].(string)
}

// GetOneOfActorID queries the history server's /logical/actors endpoint and returns the ID of
// one of the actors it lists. Which actor is picked is unspecified, because the actors arrive
// as a JSON object keyed by actor ID and Go map iteration order is not defined.
//
// Any failure (request error, non-200 status, unreadable or malformed body, missing fields,
// or an empty actor set) is reported through g.
func GetOneOfActorID(g *WithT, client *http.Client, historyServerURL string) string {
	actorsURL := historyServerURL + "/logical/actors"

	resp, err := client.Get(actorsURL)
	g.Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	g.Expect(resp.StatusCode).To(Equal(http.StatusOK))

	raw, err := io.ReadAll(resp.Body)
	g.Expect(err).NotTo(HaveOccurred())

	// Expected payload shape:
	// {"result": true, "msg": "...", "data": {"actors": {actor_id: {...}, ...}}}
	var payload map[string]any
	g.Expect(json.Unmarshal(raw, &payload)).NotTo(HaveOccurred())

	dataField, hasData := payload["data"].(map[string]any)
	g.Expect(hasData).To(BeTrue(), "response should have 'data' field")

	actorsByID, hasActors := dataField["actors"].(map[string]any)
	g.Expect(hasActors).To(BeTrue(), "data should have 'actors' field")
	g.Expect(len(actorsByID)).To(BeNumerically(">", 0), "should have at least one actor")

	// Take whichever key the map yields first; stays empty only if the map is empty,
	// which the length check above already rules out.
	var picked string
	for id := range actorsByID {
		picked = id
		break
	}
	return picked
}
Loading