Skip to content

Commit e562958

Browse files
committed
feat+test: add node_ip support
Signed-off-by: machichima <nary12321@gmail.com>
1 parent f739e5a commit e562958

File tree

4 files changed

+164
-10
lines changed

4 files changed

+164
-10
lines changed

historyserver/pkg/historyserver/reader.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"strings"
1616

1717
"github.com/emicklei/go-restful/v3"
18+
"github.com/ray-project/kuberay/historyserver/pkg/eventserver/types"
1819
eventtypes "github.com/ray-project/kuberay/historyserver/pkg/eventserver/types"
1920
"github.com/ray-project/kuberay/historyserver/pkg/utils"
2021
"github.com/sirupsen/logrus"
@@ -437,6 +438,81 @@ func (s *ServerHandler) GetNodes(rayClusterNameID, sessionId string) ([]byte, er
437438
return json.Marshal(templ)
438439
}
439440

441+
// ipToNodeId resolves node_id from node_ip by querying node_events from storage.
442+
// This mirrors Ray Dashboard's ip_to_node_id logic.
443+
// Returns node_id in hex format if found, error otherwise.
444+
func (s *ServerHandler) ipToNodeId(rayClusterNameID, sessionID, nodeIP string) (string, error) {
445+
if nodeIP == "" {
446+
return "", fmt.Errorf("node_ip is empty")
447+
}
448+
449+
// List all node_events files
450+
nodeEventsPath := path.Join(sessionID, "node_events")
451+
files := s.reader.ListFiles(rayClusterNameID, nodeEventsPath)
452+
453+
// Parse each node event file to find matching node_ip
454+
for _, file := range files {
455+
filePath := path.Join(nodeEventsPath, file)
456+
reader := s.reader.GetContent(rayClusterNameID, filePath)
457+
if reader == nil {
458+
continue
459+
}
460+
461+
data, err := io.ReadAll(reader)
462+
if err != nil {
463+
logrus.Warnf("Failed to read node event file %s: %v", filePath, err)
464+
continue
465+
}
466+
467+
var events []map[string]interface{}
468+
if err := json.Unmarshal(data, &events); err != nil {
469+
logrus.Warnf("Failed to unmarshal node events from %s: %v", filePath, err)
470+
continue
471+
}
472+
473+
// Search for NODE_DEFINITION_EVENT with matching node_ip
474+
for _, event := range events {
475+
eventType, ok := event["eventType"].(string)
476+
if !ok || eventType != string(types.NODE_DEFINITION_EVENT) {
477+
continue
478+
}
479+
480+
nodeDefEvent, ok := event["nodeDefinitionEvent"].(map[string]interface{})
481+
if !ok {
482+
continue
483+
}
484+
485+
ipAddr, ok := nodeDefEvent["nodeIpAddress"].(string)
486+
if !ok || ipAddr != nodeIP {
487+
continue
488+
}
489+
490+
// Found matching node, extract node_id
491+
nodeIDBytes, ok := nodeDefEvent["nodeId"].(string)
492+
if !ok {
493+
continue
494+
}
495+
496+
// Convert base64 node_id to hex (Ray stores node_id as base64 in events)
497+
decoded, err := base64.StdEncoding.DecodeString(nodeIDBytes)
498+
if err != nil {
499+
// Try URL-safe base64 encoding
500+
decoded, err = base64.RawURLEncoding.DecodeString(nodeIDBytes)
501+
if err != nil {
502+
logrus.Warnf("Failed to decode node_id %s: %v", nodeIDBytes, err)
503+
continue
504+
}
505+
}
506+
507+
nodeIDHex := fmt.Sprintf("%x", decoded)
508+
logrus.Infof("Resolved node_ip %s to node_id %s", nodeIP, nodeIDHex)
509+
return nodeIDHex, nil
510+
}
511+
}
512+
513+
return "", fmt.Errorf("node_id not found for node_ip=%s", nodeIP)
514+
}
515+
440516
// TODO: implement this
441517
func (h *ServerHandler) getGrafanaHealth(req *restful.Request, resp *restful.Response) {
442518
resp.WriteErrorString(http.StatusNotImplemented, "Grafana health not yet supported")

historyserver/pkg/historyserver/router.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ func routerAPI(s *ServerHandler) {
107107
Writes("")) // Placeholder for specific return type
108108
ws.Route(ws.GET("/v0/logs/file").To(s.getNodeLogFile).Filter(s.CookieHandle).
109109
Doc("get logfile").
110-
Param(ws.QueryParameter("node_id", "node_id (optional if task_id or actor_id is provided)")).
110+
Param(ws.QueryParameter("node_id", "node_id (optional if node_ip/task_id/actor_id is provided)")).
111+
Param(ws.QueryParameter("node_ip", "node_ip (optional, resolve to node_id)")).
111112
Param(ws.QueryParameter("filename", "filename (explicit log file path)")).
112113
Param(ws.QueryParameter("task_id", "task_id (resolve log file from task)")).
113114
Param(ws.QueryParameter("actor_id", "actor_id (resolve log file from actor)")).
@@ -599,19 +600,19 @@ func (s *ServerHandler) getNodeLogFile(req *restful.Request, resp *restful.Respo
599600
return
600601
}
601602

603+
// node_id or node_ip is required when not using actor_id or task_id (they can auto-resolve node_id)
604+
if options.NodeID == "" && options.NodeIP == "" && options.ActorID == "" && options.TaskID == "" {
605+
resp.WriteErrorString(http.StatusBadRequest, "node_id or node_ip is required when actor_id or task_id is not provided")
606+
return
607+
}
608+
602609
// Validate required parameters following Ray Dashboard logic
603610
// At least one of: actor_id, task_id, pid, filename, submission_id must be provided
604611
if options.ActorID == "" && options.TaskID == "" && options.PID == 0 && options.Filename == "" {
605612
resp.WriteErrorString(http.StatusBadRequest, "At least one of actor_id, task_id, pid, or filename is required")
606613
return
607614
}
608615

609-
// node_id is required when not using actor_id or task_id (they can auto-resolve node_id)
610-
if options.NodeID == "" && options.ActorID == "" && options.TaskID == "" {
611-
resp.WriteErrorString(http.StatusBadRequest, "node_id is required when actor_id or task_id is not provided")
612-
return
613-
}
614-
615616
// Prevent path traversal attacks (e.g., ../../etc/passwd)
616617
if options.NodeID != "" && !fs.ValidPath(options.NodeID) {
617618
resp.WriteErrorString(http.StatusBadRequest, fmt.Sprintf("invalid path: path traversal not allowed (node_id=%s)", options.NodeID))
@@ -622,11 +623,23 @@ func (s *ServerHandler) getNodeLogFile(req *restful.Request, resp *restful.Respo
622623
return
623624
}
624625

626+
// For live cluster, proxy the request directly to Ray Dashboard without any processing
625627
if sessionName == "live" {
626628
s.redirectRequest(req, resp)
627629
return
628630
}
629631

632+
// Only resolve node_ip to node_id from stored events for dead cluster
633+
if options.NodeID == "" && options.NodeIP != "" {
634+
nodeID, err := s.ipToNodeId(clusterNameID+"_"+clusterNamespace, sessionName, options.NodeIP)
635+
if err != nil {
636+
resp.WriteErrorString(http.StatusNotFound,
637+
fmt.Sprintf("Cannot find matching node_id for a given node ip %s", options.NodeIP))
638+
return
639+
}
640+
options.NodeID = nodeID
641+
}
642+
630643
content, err := s._getNodeLogFile(clusterNameID+"_"+clusterNamespace, sessionName, options)
631644
if err != nil {
632645
var httpErr *utils.HTTPError
@@ -652,6 +665,7 @@ func (s *ServerHandler) getNodeLogFile(req *restful.Request, resp *restful.Respo
652665
func parseGetLogFileOptions(req *restful.Request) (GetLogFileOptions, error) {
653666
options := GetLogFileOptions{
654667
NodeID: req.QueryParameter("node_id"),
668+
NodeIP: req.QueryParameter("node_ip"),
655669
Filename: req.QueryParameter("filename"),
656670
TaskID: req.QueryParameter("task_id"),
657671
ActorID: req.QueryParameter("actor_id"),

historyserver/pkg/historyserver/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package historyserver
44
type GetLogFileOptions struct {
55
// Node identification (one of these is required if not using task_id/actor_id)
66
NodeID string // The node id where the log file is located
7+
NodeIP string // The node ip address (will be resolved to node_id)
78

89
// Log file identification (provide one of: Filename, TaskID, ActorID, PID)
910
Filename string // The log file name (explicit path)

historyserver/test/e2e/historyserver_test.go

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func testLogFileEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Names
209209
{"lines+timeout+filter", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s&filename=%s&lines=50&timeout=10&filter_ansi_code=true", u, EndpointLogFile, n, filename) }, http.StatusOK},
210210

211211
// Missing mandatory parameters
212-
{"missing node_id", func(u, n string) string { return fmt.Sprintf("%s%s?filename=%s", u, EndpointLogFile, filename) }, http.StatusBadRequest},
212+
{"missing node_id and node_ip", func(u, n string) string { return fmt.Sprintf("%s%s?filename=%s", u, EndpointLogFile, filename) }, http.StatusBadRequest},
213213
{"missing filename", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s", u, EndpointLogFile, n) }, http.StatusBadRequest},
214214
{"missing both", func(u, n string) string { return fmt.Sprintf("%s%s", u, EndpointLogFile) }, http.StatusBadRequest},
215215

@@ -223,6 +223,9 @@ func testLogFileEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Names
223223
{"file not found", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s&filename=nonexistent.log", u, EndpointLogFile, n) }, http.StatusInternalServerError},
224224
{"task_id invalid (not found)", func(u, n string) string { return fmt.Sprintf("%s%s?task_id=nonexistent-task-id", u, EndpointLogFile) }, http.StatusInternalServerError},
225225

226+
// node_ip parameter tests
227+
{"node_ip invalid (non-existent)", func(u, n string) string { return fmt.Sprintf("%s%s?node_ip=192.168.255.255&filename=%s", u, EndpointLogFile, filename) }, http.StatusInternalServerError},
228+
226229
// Path traversal attacks
227230
{"traversal ../etc/passwd", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s&filename=../etc/passwd", u, EndpointLogFile, n) }, http.StatusBadRequest},
228231
{"traversal ..", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s&filename=..", u, EndpointLogFile, n) }, http.StatusBadRequest},
@@ -379,6 +382,30 @@ func testLogFileEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Names
379382
g.Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError), "Expected 500 for non-existent pid, got %d: %s", resp.StatusCode, string(body))
380383
})
381384

385+
// Sub-test for node_ip parameter (live cluster)
386+
test.T().Run("node_ip parameter", func(t *testing.T) {
387+
g := NewWithT(t)
388+
389+
// Get node IP from head pod (use Pod IP, not Host IP)
390+
// Ray registers nodes with Pod IP (--node-ip-address flag)
391+
headPod, err := GetHeadPod(test, rayCluster)
392+
g.Expect(err).NotTo(HaveOccurred())
393+
nodeIP := headPod.Status.PodIP
394+
g.Expect(nodeIP).NotTo(BeEmpty(), "Head pod should have a pod IP")
395+
LogWithTimestamp(t, "Found head pod with IP: %s", nodeIP)
396+
397+
// Test successful case: node_ip + filename
398+
url := fmt.Sprintf("%s%s?node_ip=%s&filename=%s", historyServerURL, EndpointLogFile, nodeIP, filename)
399+
resp, err := client.Get(url)
400+
g.Expect(err).NotTo(HaveOccurred())
401+
body, _ := io.ReadAll(resp.Body)
402+
resp.Body.Close()
403+
// For live cluster, the request is proxied to Ray Dashboard
404+
// The dashboard should be able to resolve node_ip to node_id
405+
g.Expect(resp.StatusCode).To(Equal(http.StatusOK), "Expected OK for valid node_ip, got %d: %s", resp.StatusCode, string(body))
406+
g.Expect(len(body)).To(BeNumerically(">", 0))
407+
})
408+
382409
DeleteS3Bucket(test, g, s3Client)
383410
LogWithTimestamp(test.T(), "Log file endpoint tests completed")
384411
}
@@ -398,8 +425,15 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names
398425
rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
399426
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
400427

428+
// Capture node IP and ID before deleting cluster (for node_ip tests later)
429+
headPod, err := GetHeadPod(test, rayCluster)
430+
g.Expect(err).NotTo(HaveOccurred())
431+
savedNodeIP := headPod.Status.PodIP
432+
savedNodeID := GetNodeIDFromHeadPod(test, g, rayCluster)
433+
LogWithTimestamp(test.T(), "Captured node IP %s and node ID %s before cluster deletion", savedNodeIP, savedNodeID)
434+
401435
// Delete RayCluster to trigger log upload
402-
err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
436+
err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
403437
g.Expect(err).NotTo(HaveOccurred())
404438
LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)
405439

@@ -458,7 +492,7 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names
458492
{"all parameters", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s&filename=%s&lines=100&timeout=15&attempt_number=0&download_file=true&filter_ansi_code=true", u, EndpointLogFile, n, filename) }, http.StatusOK},
459493

460494
// Missing mandatory parameters
461-
{"missing node_id", func(u, n string) string { return fmt.Sprintf("%s%s?filename=%s", u, EndpointLogFile, filename) }, http.StatusBadRequest},
495+
{"missing node_id and node_ip", func(u, n string) string { return fmt.Sprintf("%s%s?filename=%s", u, EndpointLogFile, filename) }, http.StatusBadRequest},
462496
{"missing filename", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s", u, EndpointLogFile, n) }, http.StatusBadRequest},
463497
{"missing both", func(u, n string) string { return fmt.Sprintf("%s%s", u, EndpointLogFile) }, http.StatusBadRequest},
464498

@@ -471,6 +505,9 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names
471505
{"task_id invalid (not found)", func(u, n string) string { return fmt.Sprintf("%s%s?task_id=nonexistent-task-id", u, EndpointLogFile) }, http.StatusBadRequest},
472506
{"non-existent pid", func(u, n string) string { return fmt.Sprintf("%s%s?pid=999999&node_id=%s", u, EndpointLogFile, n) }, http.StatusNotFound},
473507

508+
// node_ip parameter tests
509+
{"node_ip invalid (non-existent)", func(u, n string) string { return fmt.Sprintf("%s%s?node_ip=192.168.255.255&filename=%s", u, EndpointLogFile, filename) }, http.StatusNotFound},
510+
474511
// Path traversal attacks
475512
{"traversal ../etc/passwd", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s&filename=../etc/passwd", u, EndpointLogFile, n) }, http.StatusBadRequest},
476513
{"traversal ..", func(u, n string) string { return fmt.Sprintf("%s%s?node_id=%s&filename=..", u, EndpointLogFile, n) }, http.StatusBadRequest},
@@ -687,6 +724,32 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names
687724
t.Skip("Skipping pid parameter test for dead cluster: worker_pid not available in Ray export events (see https://github.com/ray-project/ray/issues/60129)")
688725
})
689726

727+
// Sub-test for node_ip parameter (dead cluster)
728+
test.T().Run("node_ip parameter", func(t *testing.T) {
729+
g := NewWithT(t)
730+
731+
// Use the captured node IP and ID from before cluster deletion
732+
LogWithTimestamp(t, "Testing node_ip parameter with IP: %s, ID: %s", savedNodeIP, savedNodeID)
733+
734+
// Test successful case: node_ip + filename
735+
url := fmt.Sprintf("%s%s?node_ip=%s&filename=%s", historyServerURL, EndpointLogFile, savedNodeIP, filename)
736+
resp, err := client.Get(url)
737+
g.Expect(err).NotTo(HaveOccurred())
738+
body, _ := io.ReadAll(resp.Body)
739+
resp.Body.Close()
740+
g.Expect(resp.StatusCode).To(Equal(http.StatusOK), "Expected OK for valid node_ip, got %d: %s", resp.StatusCode, string(body))
741+
g.Expect(len(body)).To(BeNumerically(">", 0))
742+
743+
// Test that node_ip and node_id point to the same node (should return same content)
744+
urlWithNodeID := fmt.Sprintf("%s%s?node_id=%s&filename=%s", historyServerURL, EndpointLogFile, savedNodeID, filename)
745+
resp2, err := client.Get(urlWithNodeID)
746+
g.Expect(err).NotTo(HaveOccurred())
747+
bodyWithNodeID, _ := io.ReadAll(resp2.Body)
748+
resp2.Body.Close()
749+
g.Expect(resp2.StatusCode).To(Equal(http.StatusOK))
750+
g.Expect(len(body)).To(Equal(len(bodyWithNodeID)), "node_ip and node_id should return same content")
751+
})
752+
690753
DeleteS3Bucket(test, g, s3Client)
691754
LogWithTimestamp(test.T(), "Dead cluster log file endpoint tests completed")
692755
}

0 commit comments

Comments
 (0)