Commits (50)
1d02d7e
For dev outside the cluster (should be reverted)
JiangJiaWei1103 Jan 30, 2026
3bb03b5
feat: Add basic actor task def event processing
JiangJiaWei1103 Jan 30, 2026
4c3c701
Merge branch 'my-master' into epic-4374/add-actor-task-endpoint
JiangJiaWei1103 Jan 31, 2026
f1c15e2
feat: Preserve actor task attempt ordering with a dummy merge fn
JiangJiaWei1103 Jan 31, 2026
a4922be
refactor: Merge actor task struct def and processing with other task …
JiangJiaWei1103 Jan 31, 2026
651eb6a
refactor: Extract handling TaskLifecycleEvent
JiangJiaWei1103 Jan 31, 2026
d3fb8d0
feat: Support deepcopy of all fields
JiangJiaWei1103 Jan 31, 2026
b7271a9
feat: Support complete fields in resp task res
JiangJiaWei1103 Jan 31, 2026
631af66
feat: Support limited error info and task log
JiangJiaWei1103 Jan 31, 2026
194221e
feat: Finish all fields for now
JiangJiaWei1103 Jan 31, 2026
2f61f31
fix: Fix field names in unit tests
JiangJiaWei1103 Jan 31, 2026
94a799f
test: Verify detailed task info of /api/v0/tasks
JiangJiaWei1103 Jan 31, 2026
1cb8348
Remove redundant logs
JiangJiaWei1103 Jan 31, 2026
46b1716
feat: Support query params
JiangJiaWei1103 Feb 1, 2026
5afd201
revert: Restore orig filter's impl for compatibility
JiangJiaWei1103 Feb 1, 2026
7555dd8
fix: Correct the number of tasks
JiangJiaWei1103 Feb 2, 2026
c1b9731
test: Add test cases for query params for dead cluster
JiangJiaWei1103 Feb 3, 2026
452cc44
fix: Validate int val
JiangJiaWei1103 Feb 3, 2026
4a4ab9b
fix: Align default vals with Ray
JiangJiaWei1103 Feb 3, 2026
0af806f
fix: Fix typo
JiangJiaWei1103 Feb 3, 2026
b4f2ca1
fix: Fix typo
JiangJiaWei1103 Feb 3, 2026
50fccaa
refactor: Rename filterTasks function for readability
JiangJiaWei1103 Feb 3, 2026
e78d3a4
Merge branch 'epic-4374/add-actor-task-endpoint' of github.com:JiangJ…
JiangJiaWei1103 Feb 3, 2026
8ac7a32
Merge branch 'my-master' into epic-4374/add-actor-task-endpoint
JiangJiaWei1103 Feb 3, 2026
b750272
fix: Convert base64 to hex for task related IDs
JiangJiaWei1103 Feb 3, 2026
80bc021
fix: Preserve all lifecycle-derived fields
JiangJiaWei1103 Feb 3, 2026
bbd7145
fix: Add nil handling to callstring
JiangJiaWei1103 Feb 3, 2026
3ca7290
fix: Enable groupby funcName task summary
JiangJiaWei1103 Feb 3, 2026
961a46e
fix: Overwrite all lifecycle-related fields
JiangJiaWei1103 Feb 3, 2026
88715ba
docs: Clarify why consts are not used
JiangJiaWei1103 Feb 3, 2026
b73d622
fix: Use defined Language type
JiangJiaWei1103 Feb 3, 2026
60f9366
feat: Convert placement group ID to hex
JiangJiaWei1103 Feb 3, 2026
b2ecfe6
docs: Remove redundant docs
JiangJiaWei1103 Feb 3, 2026
193e536
test: Tweak detail fields and add closure for verification
JiangJiaWei1103 Feb 3, 2026
ad05f7a
refactor: Add get name and funcname helpers
JiangJiaWei1103 Feb 3, 2026
9164f3c
docs
JiangJiaWei1103 Feb 3, 2026
6cc9c9f
refactor: Use existing helpers
JiangJiaWei1103 Feb 3, 2026
22ea667
fix: Preserve IDs and PID during overwrite
JiangJiaWei1103 Feb 4, 2026
7c26db1
fix: Align live cluster API schema
JiangJiaWei1103 Feb 4, 2026
af647b8
Merge branch 'my-master' into epic-4374/add-actor-task-endpoint
JiangJiaWei1103 Feb 5, 2026
b8ac8a8
fix: Align runtime env
JiangJiaWei1103 Feb 5, 2026
81559f6
docs
JiangJiaWei1103 Feb 5, 2026
45c2b04
docs: Nullify task_log_info and enable in the future
JiangJiaWei1103 Feb 5, 2026
bc7ae3c
fix: Correct format
JiangJiaWei1103 Feb 5, 2026
be7d954
fix: Overwrite jobid conditionally
JiangJiaWei1103 Feb 5, 2026
a24d3a8
fix: Deepcopy task info
JiangJiaWei1103 Feb 5, 2026
a4604db
docs
JiangJiaWei1103 Feb 5, 2026
f255d16
fix: Align all nullable fields
JiangJiaWei1103 Feb 5, 2026
d968837
fix: Fix deepcopy
JiangJiaWei1103 Feb 5, 2026
f42de48
fix: Handle empty placement group id
JiangJiaWei1103 Feb 5, 2026
14 changes: 12 additions & 2 deletions historyserver/cmd/historyserver/main.go
@@ -21,14 +21,20 @@ func main() {
kubeconfigs := ""
runtimeClassConfigPath := "/var/collector-config/data"
dashboardDir := ""
useKubernetesProxy := false
flag.StringVar(&runtimeClassName, "runtime-class-name", "", "")
flag.StringVar(&rayRootDir, "ray-root-dir", "", "")
flag.StringVar(&kubeconfigs, "kubeconfigs", "", "")
flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "")
flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data"
flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "")
flag.Parse()

cliMgr := historyserver.NewClientManager(kubeconfigs)
cliMgr, err := historyserver.NewClientManager(kubeconfigs, useKubernetesProxy)
if err != nil {
logrus.Errorf("Failed to create client manager: %v", err)
os.Exit(1)
}

jsonData := make(map[string]interface{})
if runtimeClassConfigPath != "" {
@@ -75,7 +81,11 @@ func main() {
logrus.Info("EventHandler shutdown complete")
}()

handler := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler)
handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, useKubernetesProxy)
if err != nil {
logrus.Errorf("Failed to create server handler: %v", err)
os.Exit(1)
}

sigChan := make(chan os.Signal, 1)
stop := make(chan struct{}, 1)
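Taken together, the main.go changes switch both constructors to a fail-fast (value, error) signature and thread the new -use-kubernetes-proxy flag through them. A minimal sketch of that wiring pattern follows; the validation rule inside newClientManager is invented purely for illustration and is not the project's real logic:

```go
package main

import (
	"errors"
	"flag"
	"os"

	"github.com/sirupsen/logrus"
)

type clientManager struct{ useProxy bool }

// newClientManager mirrors the (value, error) constructor shape adopted in the
// diff. The emptiness check is a placeholder validation, not the real one.
func newClientManager(kubeconfigs string, useProxy bool) (*clientManager, error) {
	if kubeconfigs == "" && !useProxy {
		return nil, errors.New("no kubeconfigs given and proxy mode disabled")
	}
	return &clientManager{useProxy: useProxy}, nil
}

func main() {
	useKubernetesProxy := false
	kubeconfigs := ""
	flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "route cluster traffic through the API server proxy")
	flag.StringVar(&kubeconfigs, "kubeconfigs", "", "comma-separated kubeconfig paths")
	flag.Parse()

	// Fail fast at startup instead of deferring a nil-pointer panic to first use.
	if _, err := newClientManager(kubeconfigs, useKubernetesProxy); err != nil {
		logrus.Errorf("Failed to create client manager: %v", err)
		os.Exit(1)
	}
}
```

Exiting in main keeps the constructors side-effect free and surfaces misconfiguration at startup rather than at the first request.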
283 changes: 149 additions & 134 deletions historyserver/pkg/eventserver/eventserver.go
@@ -198,6 +198,7 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
}
eventType := types.EventType(eventTypeStr)

// clusterNameVal is actually the cluster session key.
clusterNameVal, ok := eventMap["clusterName"]
if !ok {
return fmt.Errorf("event missing 'clusterName' field")
@@ -210,126 +211,9 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
logrus.Infof("current eventType: %v", eventType)
switch eventType {
case types.TASK_DEFINITION_EVENT:
taskDef, ok := eventMap["taskDefinitionEvent"]
if !ok {
return fmt.Errorf("event does not have 'taskDefinitionEvent'")
}
jsonTaskDefinition, err := json.Marshal(taskDef)
if err != nil {
return err
}

var currTask types.Task
if err := json.Unmarshal(jsonTaskDefinition, &currTask); err != nil {
return err
}

taskMap := h.ClusterTaskMap.GetOrCreateTaskMap(currentClusterName)
taskMap.CreateOrMergeAttempt(currTask.TaskID, currTask.AttemptNumber, func(t *types.Task) {
// Merge definition fields (preserve existing Events if any)
existingEvents := t.Events
*t = currTask
if len(existingEvents) > 0 {
t.Events = existingEvents
t.State = existingEvents[len(existingEvents)-1].State
}
})

return h.handleTaskDefinitionEvent(eventMap, currentClusterName, false)
case types.TASK_LIFECYCLE_EVENT:
lifecycleEvent, ok := eventMap["taskLifecycleEvent"].(map[string]any)
if !ok {
return fmt.Errorf("invalid taskLifecycleEvent format")
}

taskId, _ := lifecycleEvent["taskId"].(string)
taskAttempt, _ := lifecycleEvent["taskAttempt"].(float64)
transitions, _ := lifecycleEvent["stateTransitions"].([]any)

nodeId, _ := lifecycleEvent["nodeId"].(string)
workerId, _ := lifecycleEvent["workerId"].(string)

if len(transitions) == 0 || taskId == "" {
return nil
}

// Parse state transitions
var stateEvents []types.StateEvent
for _, transition := range transitions {
tr, ok := transition.(map[string]any)
if !ok {
continue
}
state, _ := tr["state"].(string)
timestampStr, _ := tr["timestamp"].(string)

var timestamp time.Time
if timestampStr != "" {
timestamp, _ = time.Parse(time.RFC3339Nano, timestampStr)
}

stateEvents = append(stateEvents, types.StateEvent{
State: types.TaskStatus(state),
Timestamp: timestamp,
})
}

if len(stateEvents) == 0 {
return nil
}

taskMap := h.ClusterTaskMap.GetOrCreateTaskMap(currentClusterName)
taskMap.CreateOrMergeAttempt(taskId, int(taskAttempt), func(t *types.Task) {
// --- DEDUPLICATION using (State + Timestamp) as unique key ---
// Build a set of existing event keys to detect duplicates
type eventKey struct {
State string
Timestamp int64
}
existingKeys := make(map[eventKey]bool)
for _, e := range t.Events {
existingKeys[eventKey{string(e.State), e.Timestamp.UnixNano()}] = true
}

// Only append events that haven't been seen before
for _, e := range stateEvents {
key := eventKey{string(e.State), e.Timestamp.UnixNano()}
if !existingKeys[key] {
t.Events = append(t.Events, e)
existingKeys[key] = true
}
}

// Sort events by timestamp to ensure correct order
sort.Slice(t.Events, func(i, j int) bool {
return t.Events[i].Timestamp.Before(t.Events[j].Timestamp)
})

if len(t.Events) == 0 {
return
}

t.State = t.Events[len(t.Events)-1].State

if nodeId != "" {
t.NodeID = nodeId
}
if workerId != "" {
t.WorkerID = workerId
}
if t.StartTime.IsZero() {
for _, e := range t.Events {
if e.State == types.RUNNING {
t.StartTime = e.Timestamp
break
}
}
}
lastEvent := t.Events[len(t.Events)-1]
if lastEvent.State == types.FINISHED || lastEvent.State == types.FAILED {
t.EndTime = lastEvent.Timestamp
}
})

return h.handleTaskLifecycleEvent(eventMap, currentClusterName)
case types.ACTOR_DEFINITION_EVENT:
actorDef, ok := eventMap["actorDefinitionEvent"]
if !ok {
@@ -525,12 +409,8 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
}
a.NumRestarts = restartCount
})

case types.ACTOR_TASK_DEFINITION_EVENT:
// TODO: Handle actor task definition event
// This is related to GET /api/v0/tasks (type=ACTOR_TASK)
logrus.Debugf("ACTOR_TASK_DEFINITION_EVENT received, not yet implemented")

return h.handleTaskDefinitionEvent(eventMap, currentClusterName, true)
case types.DRIVER_JOB_DEFINITION_EVENT:
// NOTE: When event comes in, JobID will be in base64, processing will convert it to Hex
jobDef, ok := eventMap["driverJobDefinitionEvent"]
@@ -743,29 +623,31 @@ func (h *EventHandler) getAllNodeEventFiles(clusterInfo utils.ClusterInfo) []string
return nodeEventFiles
}
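The NOTE in the DRIVER_JOB_DEFINITION_EVENT branch above, along with several commits in this PR ("Convert base64 to hex for task related IDs", "Convert placement group ID to hex"), refers to converting base64-encoded IDs to hex. A self-contained sketch of that conversion, assuming the IDs arrive as standard padded base64:

```go
package main

import (
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// base64ToHex decodes a base64 ID and re-encodes the raw bytes as lowercase hex,
// the representation Ray's dashboard APIs use for task/job/node IDs.
func base64ToHex(s string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return "", fmt.Errorf("decode base64 ID %q: %w", s, err)
	}
	return hex.EncodeToString(raw), nil
}

func main() {
	// A 4-byte example ID: base64 "AQIDBA==" decodes to bytes 0x01 0x02 0x03 0x04.
	hexID, err := base64ToHex("AQIDBA==")
	if err != nil {
		panic(err)
	}
	fmt.Println(hexID) // 01020304
}
```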

// GetTasks returns a thread-safe deep copy of all tasks (including all attempts) for a given cluster.
// Each task attempt is returned as a separate element in the slice.
// GetTasks returns a slice of thread-safe deep copies of all task attempts for a given cluster session.
// Deep copy ensures the returned data is safe to use after the lock is released.
func (h *EventHandler) GetTasks(clusterName string) []types.Task {
func (h *EventHandler) GetTasks(clusterSessionKey string) []types.Task {
h.ClusterTaskMap.RLock()
defer h.ClusterTaskMap.RUnlock()

taskMap, ok := h.ClusterTaskMap.ClusterTaskMap[clusterName]
taskMap, ok := h.ClusterTaskMap.ClusterTaskMap[clusterSessionKey]
if !ok {
// TODO(jwj): Add error handling.
logrus.Errorf("Task map not found for cluster session: %s", clusterSessionKey)
return []types.Task{}
}

taskMap.Lock()
defer taskMap.Unlock()

// Flatten all attempts into a single slice with deep copy
var tasks []types.Task
for _, attempts := range taskMap.TaskMap {
for _, task := range attempts {
tasks = append(tasks, task.DeepCopy())
// Flatten all task attempts into a single slice with deep copy.
allTasks := make([]types.Task, 0)
for _, taskAttempts := range taskMap.TaskMap {
for _, taskAttempt := range taskAttempts {
allTasks = append(allTasks, taskAttempt.DeepCopy())
}
}
return tasks

return allTasks
}

// GetTaskByID returns all attempts for a specific task ID in a given cluster.
@@ -915,3 +797,136 @@ func (h *EventHandler) GetJobByJobID(clusterName, jobID string) (types.Job, bool)
}
return job.DeepCopy(), true
}

// handleTaskDefinitionEvent processes TASK_DEFINITION_EVENT or ACTOR_TASK_DEFINITION_EVENT and preserves the task attempt ordering.
func (h *EventHandler) handleTaskDefinitionEvent(eventMap map[string]any, clusterSessionKey string, isActorTask bool) error {
var taskDefField string
if isActorTask {
taskDefField = "actorTaskDefinitionEvent"
} else {
taskDefField = "taskDefinitionEvent"
}

taskDef, ok := eventMap[taskDefField]
if !ok {
return fmt.Errorf("event does not have '%s' field", taskDefField)
}

jsonTaskDefinition, err := json.Marshal(taskDef)
if err != nil {
return fmt.Errorf("failed to marshal %s event: %w", taskDefField, err)
}

var currTask types.Task
if err := json.Unmarshal(jsonTaskDefinition, &currTask); err != nil {
return fmt.Errorf("failed to unmarshal %s event: %w", taskDefField, err)
}

// Manually set the task type for an actor task.
if isActorTask {
currTask.TaskType = types.ACTOR_TASK
}

taskMap := h.ClusterTaskMap.GetOrCreateTaskMap(clusterSessionKey)
taskMap.CreateOrMergeAttempt(currTask.TaskID, currTask.TaskAttempt, func(task *types.Task) {
// Preserve existing state transitions.
existingStateTransitions := task.StateTransitions

*task = currTask

if len(existingStateTransitions) > 0 {
task.StateTransitions = existingStateTransitions
task.State = task.GetLastState()
}
})

return nil
}
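handleTaskDefinitionEvent relies on a marshal/unmarshal round-trip to move from the loosely typed map[string]any event into a typed struct. A stripped-down sketch of that technique with illustrative field names (the real types.Task carries many more fields):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Task struct {
	TaskID      string `json:"taskId"`
	TaskAttempt int    `json:"taskAttempt"`
}

func main() {
	// Decoded event payloads arrive as nested map[string]any.
	eventMap := map[string]any{
		"taskDefinitionEvent": map[string]any{"taskId": "abc123", "taskAttempt": 0},
	}

	// Re-encode the sub-object to JSON...
	raw, err := json.Marshal(eventMap["taskDefinitionEvent"])
	if err != nil {
		panic(err)
	}

	// ...then decode it into the typed struct, letting the json tags do the mapping.
	var task Task
	if err := json.Unmarshal(raw, &task); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", task) // {TaskID:abc123 TaskAttempt:0}
}
```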

// handleTaskLifecycleEvent processes TASK_LIFECYCLE_EVENT and merges state transitions for a given task attempt.
func (h *EventHandler) handleTaskLifecycleEvent(eventMap map[string]any, clusterSessionKey string) error {
taskLifecycle, ok := eventMap["taskLifecycleEvent"]
if !ok {
return fmt.Errorf("event does not have 'taskLifecycleEvent' field")
}

jsonTaskLifecycle, err := json.Marshal(taskLifecycle)
if err != nil {
return fmt.Errorf("failed to marshal task lifecycle event: %w", err)
}

var currTask types.Task
if err := json.Unmarshal(jsonTaskLifecycle, &currTask); err != nil {
return fmt.Errorf("failed to unmarshal task lifecycle event: %w", err)
}
if currTask.TaskID == "" {
return fmt.Errorf("task ID is empty")
}

// TODO(jwj): Clarify if there must be at least one state transition. Can one task have more than one state transition?
if len(currTask.StateTransitions) == 0 {
return fmt.Errorf("TASK_LIFECYCLE_EVENT must have at least one state transition")
}

taskMap := h.ClusterTaskMap.GetOrCreateTaskMap(clusterSessionKey)
taskMap.CreateOrMergeAttempt(currTask.TaskID, currTask.TaskAttempt, func(task *types.Task) {
// --- DEDUPLICATION using (State + Timestamp) as unique key ---
[Review comment, JiangJiaWei1103 (author): Deduplication logic can be reused from here after the node endpoint PR is merged.]
// Build a set of existing event keys to detect duplicates
type eventKey struct {
State string
Timestamp int64
}
existingKeys := make(map[eventKey]bool)
for _, e := range task.StateTransitions {
existingKeys[eventKey{string(e.State), e.Timestamp.UnixNano()}] = true
}

// Only append events that haven't been seen before
for _, e := range currTask.StateTransitions {
key := eventKey{string(e.State), e.Timestamp.UnixNano()}
if !existingKeys[key] {
task.StateTransitions = append(task.StateTransitions, e)
existingKeys[key] = true
}
}

// Sort events by timestamp to ensure correct order
sort.Slice(task.StateTransitions, func(i, j int) bool {
return task.StateTransitions[i].Timestamp.Before(task.StateTransitions[j].Timestamp)
})

if len(task.StateTransitions) == 0 {
return
}

// TODO(jwj): Before beta, the lifecycle-related fields are overwritten.
// In beta, the complete historical replay will be supported.
task.JobID = currTask.JobID
task.NodeID = currTask.NodeID
task.WorkerID = currTask.WorkerID
task.WorkerPID = currTask.WorkerPID
task.IsDebuggerPaused = currTask.IsDebuggerPaused
task.ActorReprName = currTask.ActorReprName
task.State = task.GetLastState()

// Derive creation time, start time and end time from state transitions.
// Ref: https://github.com/ray-project/ray/blob/d0b1d151d8ea964a711e451d0ae736f8bf95b629/python/ray/util/state/common.py#L1660-L1685
for _, tr := range task.StateTransitions {
switch tr.State {
case types.PENDING_ARGS_AVAIL:
if task.CreationTime.IsZero() {
task.CreationTime = tr.Timestamp
}
case types.RUNNING:
if task.StartTime.IsZero() {
task.StartTime = tr.Timestamp
}
case types.FINISHED, types.FAILED:
// Take the latest timestamp as the end time.
task.EndTime = tr.Timestamp
}
}
})

return nil
}
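The deduplication-and-sort logic inside the merge callback above, which is also the part the inline review comment suggests extracting for reuse, boils down to treating (State, Timestamp) as a transition's identity: duplicates are dropped and the merged slice is re-sorted by timestamp. A self-contained sketch with a stand-in StateEvent type:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type StateEvent struct {
	State     string
	Timestamp time.Time
}

// mergeTransitions appends only unseen (State, Timestamp) pairs from incoming
// and keeps the result ordered by timestamp.
func mergeTransitions(existing, incoming []StateEvent) []StateEvent {
	type key struct {
		State string
		Nanos int64
	}
	seen := make(map[key]bool, len(existing))
	for _, e := range existing {
		seen[key{e.State, e.Timestamp.UnixNano()}] = true
	}
	merged := existing
	for _, e := range incoming {
		k := key{e.State, e.Timestamp.UnixNano()}
		if !seen[k] {
			merged = append(merged, e)
			seen[k] = true
		}
	}
	sort.Slice(merged, func(i, j int) bool {
		return merged[i].Timestamp.Before(merged[j].Timestamp)
	})
	return merged
}

func main() {
	t0 := time.Unix(0, 0)
	existing := []StateEvent{{"RUNNING", t0.Add(time.Second)}}
	incoming := []StateEvent{
		{"RUNNING", t0.Add(time.Second)}, // duplicate, dropped
		{"PENDING_ARGS_AVAIL", t0},       // earlier, sorted to front
	}
	fmt.Println(mergeTransitions(existing, incoming))
}
```

Keying on UnixNano rather than on time.Time itself sidesteps time.Time's monotonic-clock and location fields, which can make equal wall-clock instants compare unequal when used directly as map keys.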