Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1eb0afb
test: add more test case for live cluster e2e test
machichima Jan 29, 2026
64df48d
test: more test case for dead cluster e2e test
machichima Jan 29, 2026
4a4f729
refactor: clean up comments
machichima Jan 29, 2026
0d89338
feat: move query options to struct&add more options
machichima Jan 30, 2026
5dc5f12
feat: implement attempt_numbe, download_file, filter_ansi_code function
machichima Jan 30, 2026
6de7601
feat: e2e test for attempt_numbe, download_file, filter_ansi_code
machichima Jan 30, 2026
8d11aff
fix: update live cluster invalide param status code
machichima Jan 30, 2026
ff75e12
feat: add id related param&implement task_id+suffix
machichima Jan 31, 2026
4f56280
test: remove eventual & print body when status code mismatch
machichima Jan 31, 2026
e01c113
feat: logic to find logs based on worker ID
machichima Jan 31, 2026
41e5970
test: for suffix and task_id
machichima Jan 31, 2026
6497225
fix: update rayjob.yaml to ensure produce log.out file
machichima Jan 31, 2026
6b66a30
feat+test: support actor_id query
machichima Jan 31, 2026
d204ef1
feat+test: support pid query
machichima Jan 31, 2026
f739e5a
docs: todo comment for submission_id
machichima Feb 1, 2026
e562958
feat+test: add node_ip support
machichima Feb 1, 2026
f2f0161
test: move pid invalid test to logFileTestCases
machichima Feb 1, 2026
e0151db
fix: add download_filename rather than download_file flag
machichima Feb 1, 2026
892d41a
refactor: Base64 to hex conversion logic to util function
machichima Feb 1, 2026
d254463
fix: skip convert to hex if already is
machichima Feb 1, 2026
ccd8243
fix: close reader to prevent connection leak
machichima Feb 1, 2026
f4dfefb
refactor: remove duplicate import
machichima Feb 1, 2026
a6b76ad
test: fix and refactor test
machichima Feb 1, 2026
4b7a56b
fix: append suffix when deal with filename
machichima Feb 1, 2026
4bd16da
fix: remove duplicate suffix validation
machichima Feb 1, 2026
f8dd9ee
refactor: remove not yet implemented comment
machichima Feb 1, 2026
fe49c92
feat+test: add logs/stream endpoint
machichima Feb 1, 2026
200d56c
fix: remove redundant status code check
machichima Feb 2, 2026
c41d874
fix: update format of worker log in comment
machichima Feb 3, 2026
dd8a77f
Merge branch 'master' of github.com:ray-project/kuberay into improve-…
machichima Feb 3, 2026
d9b7dba
feat: more robust ConvertBase64ToHex and centralize the logic
machichima Feb 3, 2026
f1799d1
fix: return original id if cannot decode
machichima Feb 3, 2026
e848198
fix: escape filename correctly
machichima Feb 3, 2026
4e30f18
fix: add sessionID == "" check in resolveActorLogFilename
machichima Feb 3, 2026
1432037
fix: use correct cluster name to query task and actor id
machichima Feb 3, 2026
aba1b9e
test: filename header use no ""
machichima Feb 3, 2026
145ab04
fix: redundant err and correct regex for base64
machichima Feb 3, 2026
9980d22
fix: properly encode url parameter for task and actor id
machichima Feb 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 202 additions & 4 deletions historyserver/pkg/historyserver/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package historyserver
import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"path"
"regexp"
"sort"
"strings"

"github.com/emicklei/go-restful/v3"
eventtypes "github.com/ray-project/kuberay/historyserver/pkg/eventserver/types"
"github.com/ray-project/kuberay/historyserver/pkg/utils"
"github.com/sirupsen/logrus"
)
Expand All @@ -26,6 +29,15 @@ const (
MAX_LOG_LIMIT = 10000
)

// ANSI escape code pattern for filtering colored output from logs.
// Matches SGR (Select Graphic Rendition) sequences like \x1b[31m (red),
// \x1b[0m (reset), \x1b[1;32m (bold green), and the parameterless reset
// form \x1b[m — the previous `+` quantifier missed the last one.
var ansiEscapePattern = regexp.MustCompile(`\x1b\[[0-9;]*m`)

// filterAnsiEscapeCodes removes ANSI SGR escape sequences from log content,
// leaving the plain text intact. Non-SGR escape sequences are not touched.
func filterAnsiEscapeCodes(content []byte) []byte {
	return ansiEscapePattern.ReplaceAll(content, nil)
}

func (s *ServerHandler) listClusters(limit int) []utils.ClusterInfo {
// Initial continuation marker
logrus.Debugf("Prepare to get list clusters info ...")
Expand Down Expand Up @@ -73,18 +85,39 @@ func (s *ServerHandler) _getNodeLogs(rayClusterNameID, sessionId, nodeId, dir st
return json.Marshal(ret)
}

func (s *ServerHandler) _getNodeLogFile(rayClusterNameID, sessionID, nodeID, filename string, maxLines int) ([]byte, error) {
func (s *ServerHandler) _getNodeLogFile(rayClusterNameID, sessionID string, options GetLogFileOptions) ([]byte, error) {
// Resolve node_id and filename based on options
nodeID, filename, err := s.resolveLogFilename(rayClusterNameID, sessionID, options)
if err != nil {
return nil, utils.NewHTTPError(err, http.StatusBadRequest)
}

// Build log path
logPath := path.Join(sessionID, "logs", nodeID, filename)

// Append attempt_number if specified and not using task_id
// (task_id already includes attempt_number in resolution)
if options.AttemptNumber > 0 && options.TaskID == "" {
logPath = fmt.Sprintf("%s.%d", logPath, options.AttemptNumber)
}

reader := s.reader.GetContent(rayClusterNameID, logPath)

if reader == nil {
return nil, utils.NewHTTPError(fmt.Errorf("log file not found: %s", logPath), http.StatusNotFound)
}

maxLines := options.Lines
if maxLines < 0 {
// -1 means read all lines, match Ray Dashboard API behavior
return io.ReadAll(reader)
// -1 means read all lines
content, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
if options.FilterAnsiCode {
content = filterAnsiEscapeCodes(content)
}
return content, nil
}

if maxLines == 0 {
Expand Down Expand Up @@ -132,7 +165,172 @@ func (s *ServerHandler) _getNodeLogFile(rayClusterNameID, sessionID, nodeID, fil
lines = append(buffer[start:], buffer[:start]...)
}

return []byte(strings.Join(lines, "\n")), nil
result := []byte(strings.Join(lines, "\n"))
if options.FilterAnsiCode {
result = filterAnsiEscapeCodes(result)
}

return result, nil
}

// resolveLogFilename resolves the log file node_id and filename based on the provided options.
// This mirrors Ray Dashboard's resolve_filename logic.
// The sessionID parameter is required for task_id resolution to search worker log files.
func (s *ServerHandler) resolveLogFilename(clusterNameID, sessionID string, options GetLogFileOptions) (nodeID, filename string, err error) {
	// The suffix selects between stdout ("out") and stderr ("err") logs and is
	// always validated, even when an explicit filename makes it unused.
	if options.Suffix != "out" && options.Suffix != "err" {
		return "", "", fmt.Errorf("invalid suffix: %s (must be 'out' or 'err')", options.Suffix)
	}

	// Dispatch on whichever identifying option the caller supplied, in the
	// same precedence order as Ray Dashboard: filename, task_id, actor_id, pid.
	switch {
	case options.Filename != "":
		// An explicit filename wins and the suffix is ignored, but the node
		// hosting the file must be named so the log path can be built.
		if options.NodeID == "" {
			return "", "", fmt.Errorf("node_id is required when filename is provided")
		}
		return options.NodeID, options.Filename, nil

	case options.TaskID != "":
		// Resolve node and worker log filename from recorded task events.
		return s.resolveTaskLogFilename(clusterNameID, sessionID, options.TaskID, options.AttemptNumber, options.Suffix)

	case options.ActorID != "":
		// TODO: not implemented
		return "", "", fmt.Errorf("actor_id resolution not yet implemented")

	case options.PID > 0:
		// TODO: not implemented
		return "", "", fmt.Errorf("pid resolution not yet implemented")

	default:
		return "", "", fmt.Errorf("must provide one of: filename, task_id, actor_id, or pid")
	}
}

// resolveTaskLogFilename resolves log file for a task by querying task events.
// This mirrors Ray Dashboard's _resolve_task_filename logic.
// The sessionID parameter is required for searching worker log files when task_log_info is not available.
//
// It returns the node ID (hex when resolved via the worker-file fallback) and
// the log filename for the requested attempt, or an error when the task,
// attempt, node, or worker information needed to locate the log is missing.
func (s *ServerHandler) resolveTaskLogFilename(clusterNameID, sessionID, taskID string, attemptNumber int, suffix string) (nodeID, filename string, err error) {
	// Get all recorded attempts for this task ID.
	taskAttempts, found := s.eventHandler.GetTaskByID(clusterNameID, taskID)
	if !found {
		return "", "", fmt.Errorf("task not found: task_id=%s", taskID)
	}

	// Find the specific attempt. Index into the slice (instead of ranging by
	// value) so we take the address of the stored element, not of a copy.
	var foundTask *eventtypes.Task
	for i := range taskAttempts {
		if taskAttempts[i].AttemptNumber == attemptNumber {
			foundTask = &taskAttempts[i]
			break
		}
	}

	if foundTask == nil {
		return "", "", fmt.Errorf("task attempt not found: task_id=%s, attempt_number=%d", taskID, attemptNumber)
	}

	// A task only gets a node_id once it has been scheduled somewhere.
	if foundTask.NodeID == "" {
		return "", "", fmt.Errorf("task %s (attempt %d) has no node_id (task not scheduled yet)", taskID, attemptNumber)
	}

	// Actor tasks log to the actor's files; redirect the caller to actor_id.
	if foundTask.ActorID != "" {
		return "", "", fmt.Errorf(
			"for actor task, please query actor log for actor(%s) by providing actor_id query parameter",
			foundTask.ActorID,
		)
	}

	// Without a worker_id we cannot match any worker-*.{out,err} file.
	if foundTask.WorkerID == "" {
		return "", "", fmt.Errorf(
			"task %s (attempt %d) has no worker_id",
			taskID, attemptNumber,
		)
	}

	// Try to use task_log_info if available.
	// NOTE: task_log_info is currently not supported in ray export event, so we will always
	// fallback to following logic. (len() on a nil map is 0, so no nil check is needed.)
	if len(foundTask.TaskLogInfo) > 0 {
		filenameKey := "stdout_file"
		if suffix == "err" {
			filenameKey = "stderr_file"
		}

		if logFilename, ok := foundTask.TaskLogInfo[filenameKey]; ok && logFilename != "" {
			return foundTask.NodeID, logFilename, nil
		}
	}

	// Fallback: Find worker log file by worker_id.
	// Worker log files follow the pattern: worker-{worker_id_hex}-{pid}-{worker_startup_token}.{suffix}
	// We need to search for files matching this pattern, which requires the
	// session directory to list the node's log folder.
	if sessionID == "" {
		return "", "", fmt.Errorf(
			"task %s (attempt %d) has no task_log_info and sessionID is required to search for worker log files",
			taskID, attemptNumber,
		)
	}

	nodeIDHex, logFilename, err := s.findWorkerLogFile(clusterNameID, sessionID, foundTask.NodeID, foundTask.WorkerID, suffix)
	if err != nil {
		return "", "", fmt.Errorf(
			"failed to find worker log file for task %s (attempt %d, worker_id=%s, node_id=%s): %w",
			taskID, attemptNumber, foundTask.WorkerID, foundTask.NodeID, err,
		)
	}

	return nodeIDHex, logFilename, nil
}

// decodeBase64IDToHex converts a Ray ID from its Base64 event representation
// to the lowercase hex form used in the log directory layout. Ray events use
// URL-safe unpadded Base64, so that alphabet is tried first, with standard
// Base64 as a fallback.
func decodeBase64IDToHex(id string) (string, error) {
	raw, err := base64.RawURLEncoding.DecodeString(id)
	if err != nil {
		// Try standard Base64 if URL-safe fails.
		raw, err = base64.StdEncoding.DecodeString(id)
		if err != nil {
			return "", err
		}
	}
	return fmt.Sprintf("%x", raw), nil
}

// findWorkerLogFile searches for a worker log file by worker_id.
// Worker log files follow the pattern: worker-{worker_id_hex}-{pid}-{worker_startup_token}.{suffix}
// Returns (nodeIDHex, filename, error).
func (s *ServerHandler) findWorkerLogFile(clusterNameID, sessionID, nodeID, workerID, suffix string) (string, string, error) {
	// Ray stores IDs in Base64 (URL-safe) in events, but uses hex in the log
	// directory structure, so convert both IDs before touching the filesystem.
	nodeIDHex, err := decodeBase64IDToHex(nodeID)
	if err != nil {
		return "", "", fmt.Errorf("failed to decode node_id: %w", err)
	}

	workerIDHex, err := decodeBase64IDToHex(workerID)
	if err != nil {
		return "", "", fmt.Errorf("failed to decode worker_id: %w", err)
	}

	// List all files in the node's log directory.
	logPath := path.Join(sessionID, "logs", nodeIDHex)
	files := s.reader.ListFiles(clusterNameID, logPath)

	// Search for files matching pattern: worker-{worker_id_hex}-*.{suffix};
	// the middle segment ({pid}-{startup_token}) is unknown, so match on
	// prefix and suffix only. The first match wins.
	workerPrefix := fmt.Sprintf("worker-%s-", workerIDHex)
	workerSuffix := fmt.Sprintf(".%s", suffix)

	for _, file := range files {
		if strings.HasPrefix(file, workerPrefix) && strings.HasSuffix(file, workerSuffix) {
			return nodeIDHex, file, nil
		}
	}

	return "", "", fmt.Errorf("worker log file not found: worker_id=%s (hex=%s), suffix=%s, searched in %s", workerID, workerIDHex, suffix, logPath)
}

func (s *ServerHandler) GetNodes(rayClusterNameID, sessionId string) ([]byte, error) {
Expand Down
Loading
Loading