-
Notifications
You must be signed in to change notification settings - Fork 698
[Feat] [history server] Add actor task endpoint #4463
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
JiangJiaWei1103
wants to merge
50
commits into
ray-project:master
Choose a base branch
from
JiangJiaWei1103:epic-4374/add-actor-task-endpoint
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 12 commits
Commits
Show all changes
50 commits
Select commit
Hold shift + click to select a range
1d02d7e
For dev outside the cluster (should be reverted)
JiangJiaWei1103 3bb03b5
feat: Add basic actor task def event processing
JiangJiaWei1103 4c3c701
Merge branch 'my-master' into epic-4374/add-actor-task-endpoint
JiangJiaWei1103 f1c15e2
feat: Preserve actor task attempt ordering with a dummy merge fn
JiangJiaWei1103 a4922be
refactor: Merge actor task struct def and processing with other task …
JiangJiaWei1103 651eb6a
refactor: Extract handling TaskLifecycleEvent
JiangJiaWei1103 d3fb8d0
feat: Support deepcopy of all fields
JiangJiaWei1103 b7271a9
feat: Support complete fields in resp task res
JiangJiaWei1103 631af66
feat: Support limited error info and task log
JiangJiaWei1103 194221e
feat: Finish all fields for now
JiangJiaWei1103 2f61f31
fix: Fix field names in unit tests
JiangJiaWei1103 94a799f
test: Verify detailed task info of /api/v0/tasks
JiangJiaWei1103 1cb8348
Remove redundant logs
JiangJiaWei1103 46b1716
feat: Support query params
JiangJiaWei1103 5afd201
revert: Restore orig filter's impl for compatibility
JiangJiaWei1103 7555dd8
fix: Correct the number of tasks
JiangJiaWei1103 c1b9731
test: Add test cases for query params for dead cluster
JiangJiaWei1103 452cc44
fix: Validate int val
JiangJiaWei1103 4a4ab9b
fix: Align default vals with Ray
JiangJiaWei1103 0af806f
fix: Fix typo
JiangJiaWei1103 b4f2ca1
fix: Fix typo
JiangJiaWei1103 50fccaa
refactor: Rename filterTasks function for readability
JiangJiaWei1103 e78d3a4
Merge branch 'epic-4374/add-actor-task-endpoint' of github.com:JiangJ…
JiangJiaWei1103 8ac7a32
Merge branch 'my-master' into epic-4374/add-actor-task-endpoint
JiangJiaWei1103 b750272
fix: Convert base64 to hex for task related IDs
JiangJiaWei1103 80bc021
fix: Preserve all lifecycle-derived fields
JiangJiaWei1103 bbd7145
fix: Add nil handling to callstring
JiangJiaWei1103 3ca7290
fix: Enable groupby funcName task summary
JiangJiaWei1103 961a46e
fix: Overwrite all lifecycle-related fields
JiangJiaWei1103 88715ba
docs: Clarify why consts are not used
JiangJiaWei1103 b73d622
fix: Use defined Language type
JiangJiaWei1103 60f9366
feat: Convert placement group ID to hex
JiangJiaWei1103 b2ecfe6
docs: Remove redundant docs
JiangJiaWei1103 193e536
test: Tweak detail fields and add closure for verification
JiangJiaWei1103 ad05f7a
refactor: Add get name and funcname helpers
JiangJiaWei1103 9164f3c
docs
JiangJiaWei1103 6cc9c9f
refactor: Use existing helpers
JiangJiaWei1103 22ea667
fix: Preserve IDs and PID during overwrite
JiangJiaWei1103 7c26db1
fix: Align live cluster API shcema
JiangJiaWei1103 af647b8
Merge branch 'my-master' into epic-4374/add-actor-task-endpoint
JiangJiaWei1103 b8ac8a8
fix: Align runtime env
JiangJiaWei1103 81559f6
docs
JiangJiaWei1103 45c2b04
docs: Nullify task_log_info and enable in the future
JiangJiaWei1103 bc7ae3c
fix: Correct format
JiangJiaWei1103 be7d954
fix: Overwrite jobid conditionally
JiangJiaWei1103 a24d3a8
fix: Deepcopy task info
JiangJiaWei1103 a4604db
docs
JiangJiaWei1103 f255d16
fix: Align all nullable fields
JiangJiaWei1103 d968837
fix: Fix deepcopy
JiangJiaWei1103 f42de48
fix: Handle empty placement group id
JiangJiaWei1103 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -198,6 +198,7 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error { | |
| } | ||
| eventType := types.EventType(eventTypeStr) | ||
|
|
||
| // clusterNameVal is actually the cluster session key. | ||
| clusterNameVal, ok := eventMap["clusterName"] | ||
| if !ok { | ||
| return fmt.Errorf("event missing 'clusterName' field") | ||
|
|
@@ -210,126 +211,9 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error { | |
| logrus.Infof("current eventType: %v", eventType) | ||
| switch eventType { | ||
| case types.TASK_DEFINITION_EVENT: | ||
| taskDef, ok := eventMap["taskDefinitionEvent"] | ||
| if !ok { | ||
| return fmt.Errorf("event does not have 'taskDefinitionEvent'") | ||
| } | ||
| jsonTaskDefinition, err := json.Marshal(taskDef) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| var currTask types.Task | ||
| if err := json.Unmarshal(jsonTaskDefinition, &currTask); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| taskMap := h.ClusterTaskMap.GetOrCreateTaskMap(currentClusterName) | ||
| taskMap.CreateOrMergeAttempt(currTask.TaskID, currTask.AttemptNumber, func(t *types.Task) { | ||
| // Merge definition fields (preserve existing Events if any) | ||
| existingEvents := t.Events | ||
| *t = currTask | ||
| if len(existingEvents) > 0 { | ||
| t.Events = existingEvents | ||
| t.State = existingEvents[len(existingEvents)-1].State | ||
| } | ||
| }) | ||
|
|
||
| return h.handleTaskDefinitionEvent(eventMap, currentClusterName, false) | ||
| case types.TASK_LIFECYCLE_EVENT: | ||
| lifecycleEvent, ok := eventMap["taskLifecycleEvent"].(map[string]any) | ||
| if !ok { | ||
| return fmt.Errorf("invalid taskLifecycleEvent format") | ||
| } | ||
|
|
||
| taskId, _ := lifecycleEvent["taskId"].(string) | ||
| taskAttempt, _ := lifecycleEvent["taskAttempt"].(float64) | ||
| transitions, _ := lifecycleEvent["stateTransitions"].([]any) | ||
|
|
||
| nodeId, _ := lifecycleEvent["nodeId"].(string) | ||
| workerId, _ := lifecycleEvent["workerId"].(string) | ||
|
|
||
| if len(transitions) == 0 || taskId == "" { | ||
| return nil | ||
| } | ||
|
|
||
| // Parse state transitions | ||
| var stateEvents []types.StateEvent | ||
| for _, transition := range transitions { | ||
| tr, ok := transition.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| state, _ := tr["state"].(string) | ||
| timestampStr, _ := tr["timestamp"].(string) | ||
|
|
||
| var timestamp time.Time | ||
| if timestampStr != "" { | ||
| timestamp, _ = time.Parse(time.RFC3339Nano, timestampStr) | ||
| } | ||
|
|
||
| stateEvents = append(stateEvents, types.StateEvent{ | ||
| State: types.TaskStatus(state), | ||
| Timestamp: timestamp, | ||
| }) | ||
| } | ||
|
|
||
| if len(stateEvents) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| taskMap := h.ClusterTaskMap.GetOrCreateTaskMap(currentClusterName) | ||
| taskMap.CreateOrMergeAttempt(taskId, int(taskAttempt), func(t *types.Task) { | ||
| // --- DEDUPLICATION using (State + Timestamp) as unique key --- | ||
| // Build a set of existing event keys to detect duplicates | ||
| type eventKey struct { | ||
| State string | ||
| Timestamp int64 | ||
| } | ||
| existingKeys := make(map[eventKey]bool) | ||
| for _, e := range t.Events { | ||
| existingKeys[eventKey{string(e.State), e.Timestamp.UnixNano()}] = true | ||
| } | ||
|
|
||
| // Only append events that haven't been seen before | ||
| for _, e := range stateEvents { | ||
| key := eventKey{string(e.State), e.Timestamp.UnixNano()} | ||
| if !existingKeys[key] { | ||
| t.Events = append(t.Events, e) | ||
| existingKeys[key] = true | ||
| } | ||
| } | ||
|
|
||
| // Sort events by timestamp to ensure correct order | ||
| sort.Slice(t.Events, func(i, j int) bool { | ||
| return t.Events[i].Timestamp.Before(t.Events[j].Timestamp) | ||
| }) | ||
|
|
||
| if len(t.Events) == 0 { | ||
| return | ||
| } | ||
|
|
||
| t.State = t.Events[len(t.Events)-1].State | ||
|
|
||
| if nodeId != "" { | ||
| t.NodeID = nodeId | ||
| } | ||
| if workerId != "" { | ||
| t.WorkerID = workerId | ||
| } | ||
| if t.StartTime.IsZero() { | ||
| for _, e := range t.Events { | ||
| if e.State == types.RUNNING { | ||
| t.StartTime = e.Timestamp | ||
| break | ||
| } | ||
| } | ||
| } | ||
| lastEvent := t.Events[len(t.Events)-1] | ||
| if lastEvent.State == types.FINISHED || lastEvent.State == types.FAILED { | ||
| t.EndTime = lastEvent.Timestamp | ||
| } | ||
| }) | ||
|
|
||
| return h.handleTaskLifecycleEvent(eventMap, currentClusterName) | ||
| case types.ACTOR_DEFINITION_EVENT: | ||
| actorDef, ok := eventMap["actorDefinitionEvent"] | ||
| if !ok { | ||
|
|
@@ -525,12 +409,8 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error { | |
| } | ||
| a.NumRestarts = restartCount | ||
| }) | ||
|
|
||
| case types.ACTOR_TASK_DEFINITION_EVENT: | ||
| // TODO: Handle actor task definition event | ||
| // This is related to GET /api/v0/tasks (type=ACTOR_TASK) | ||
| logrus.Debugf("ACTOR_TASK_DEFINITION_EVENT received, not yet implemented") | ||
|
|
||
| return h.handleTaskDefinitionEvent(eventMap, currentClusterName, true) | ||
| case types.DRIVER_JOB_DEFINITION_EVENT: | ||
| // NOTE: When event comes in, JobID will be in base64, processing will convert it to Hex | ||
| jobDef, ok := eventMap["driverJobDefinitionEvent"] | ||
|
|
@@ -743,29 +623,31 @@ func (h *EventHandler) getAllNodeEventFiles(clusterInfo utils.ClusterInfo) []str | |
| return nodeEventFiles | ||
| } | ||
|
|
||
| // GetTasks returns a thread-safe deep copy of all tasks (including all attempts) for a given cluster. | ||
| // Each task attempt is returned as a separate element in the slice. | ||
| // GetTasks returns a slice of thread-safe deep copies of all task attempts for a given cluster session. | ||
| // Deep copy ensures the returned data is safe to use after the lock is released. | ||
| func (h *EventHandler) GetTasks(clusterName string) []types.Task { | ||
| func (h *EventHandler) GetTasks(clusterSessionKey string) []types.Task { | ||
| h.ClusterTaskMap.RLock() | ||
| defer h.ClusterTaskMap.RUnlock() | ||
|
|
||
| taskMap, ok := h.ClusterTaskMap.ClusterTaskMap[clusterName] | ||
| taskMap, ok := h.ClusterTaskMap.ClusterTaskMap[clusterSessionKey] | ||
| if !ok { | ||
| // TODO(jwj): Add error handling. | ||
| logrus.Errorf("Task map not found for cluster session: %s", clusterSessionKey) | ||
| return []types.Task{} | ||
| } | ||
|
|
||
| taskMap.Lock() | ||
| defer taskMap.Unlock() | ||
|
|
||
| // Flatten all attempts into a single slice with deep copy | ||
| var tasks []types.Task | ||
| for _, attempts := range taskMap.TaskMap { | ||
| for _, task := range attempts { | ||
| tasks = append(tasks, task.DeepCopy()) | ||
| // Flatten all task attempts into a single slice with deep copy. | ||
| allTasks := make([]types.Task, 0) | ||
| for _, taskAttempts := range taskMap.TaskMap { | ||
| for _, taskAttempt := range taskAttempts { | ||
| allTasks = append(allTasks, taskAttempt.DeepCopy()) | ||
| } | ||
| } | ||
| return tasks | ||
|
|
||
| return allTasks | ||
| } | ||
|
|
||
| // GetTaskByID returns all attempts for a specific task ID in a given cluster. | ||
|
|
@@ -915,3 +797,143 @@ func (h *EventHandler) GetJobByJobID(clusterName, jobID string) (types.Job, bool | |
| } | ||
| return job.DeepCopy(), true | ||
| } | ||
|
|
||
| // handleTaskDefinitionEvent processes TASK_DEFINITION_EVENT or ACTOR_TASK_DEFINITION_EVENT and preserves the task attempt ordering. | ||
| func (h *EventHandler) handleTaskDefinitionEvent(eventMap map[string]any, clusterSessionKey string, isActorTask bool) error { | ||
| var taskDefField string | ||
| if isActorTask { | ||
| taskDefField = "actorTaskDefinitionEvent" | ||
| } else { | ||
| taskDefField = "taskDefinitionEvent" | ||
| } | ||
|
|
||
| taskDef, ok := eventMap[taskDefField] | ||
| if !ok { | ||
| return fmt.Errorf("event does not have '%s' field", taskDefField) | ||
| } | ||
|
|
||
| jsonTaskDefinition, err := json.Marshal(taskDef) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to marshal %s event: %w", taskDefField, err) | ||
| } | ||
|
|
||
| var currTask types.Task | ||
| if err := json.Unmarshal(jsonTaskDefinition, &currTask); err != nil { | ||
| return fmt.Errorf("failed to unmarshal %s event: %w", taskDefField, err) | ||
| } | ||
|
|
||
| // Manually set the task type for an actor task. | ||
| if isActorTask { | ||
| currTask.TaskType = types.ACTOR_TASK | ||
| } | ||
|
|
||
| taskMap := h.ClusterTaskMap.GetOrCreateTaskMap(clusterSessionKey) | ||
| taskMap.CreateOrMergeAttempt(currTask.TaskID, currTask.TaskAttempt, func(task *types.Task) { | ||
| // Preserve existing state transitions. | ||
| existingStateTransitions := task.StateTransitions | ||
|
|
||
| *task = currTask | ||
|
|
||
| if len(existingStateTransitions) > 0 { | ||
| task.StateTransitions = existingStateTransitions | ||
| task.State = task.GetLastState() | ||
| } | ||
| }) | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| logrus.Infof("Task definition event processed: %v", currTask) | ||
| logrus.Infof("Is actor task: %t", isActorTask) | ||
| logrus.Infof("Task map: %v", taskMap.TaskMap[currTask.TaskID]) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // handleTaskLifecycleEvent processes TASK_LIFECYCLE_EVENT and merges state transitions for a given task attempt. | ||
| func (h *EventHandler) handleTaskLifecycleEvent(eventMap map[string]any, clusterSessionKey string) error { | ||
| taskLifecycle, ok := eventMap["taskLifecycleEvent"] | ||
| if !ok { | ||
| return fmt.Errorf("event does not have 'taskLifecycleEvent' field") | ||
| } | ||
|
|
||
| jsonTaskLifecycle, err := json.Marshal(taskLifecycle) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to marshal task lifecycle event: %w", err) | ||
| } | ||
|
|
||
| var currTask types.Task | ||
| if err := json.Unmarshal(jsonTaskLifecycle, &currTask); err != nil { | ||
| return fmt.Errorf("failed to unmarshal task lifecycle event: %w", err) | ||
| } | ||
| if currTask.TaskID == "" { | ||
| return fmt.Errorf("task ID is empty") | ||
| } | ||
|
|
||
| // TODO(jwj): Clarify if there must be at least one state transition. Can one task have more than one state transition? | ||
| if len(currTask.StateTransitions) == 0 { | ||
| return fmt.Errorf("TASK_LIFECYCLE_EVENT must have at least one state transition") | ||
| } | ||
|
|
||
| taskMap := h.ClusterTaskMap.GetOrCreateTaskMap(clusterSessionKey) | ||
| taskMap.CreateOrMergeAttempt(currTask.TaskID, currTask.TaskAttempt, func(task *types.Task) { | ||
| // --- DEDUPLICATION using (State + Timestamp) as unique key --- | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deduplication logic can be reused from here after the node endpoint pr is merged. |
||
| // Build a set of existing event keys to detect duplicates | ||
| type eventKey struct { | ||
| State string | ||
| Timestamp int64 | ||
| } | ||
| existingKeys := make(map[eventKey]bool) | ||
| for _, e := range task.StateTransitions { | ||
| existingKeys[eventKey{string(e.State), e.Timestamp.UnixNano()}] = true | ||
| } | ||
|
|
||
| // Only append events that haven't been seen before | ||
| for _, e := range currTask.StateTransitions { | ||
| key := eventKey{string(e.State), e.Timestamp.UnixNano()} | ||
| if !existingKeys[key] { | ||
| task.StateTransitions = append(task.StateTransitions, e) | ||
| existingKeys[key] = true | ||
| } | ||
| } | ||
|
|
||
| // Sort events by timestamp to ensure correct order | ||
| sort.Slice(task.StateTransitions, func(i, j int) bool { | ||
| return task.StateTransitions[i].Timestamp.Before(task.StateTransitions[j].Timestamp) | ||
| }) | ||
|
|
||
| if len(task.StateTransitions) == 0 { | ||
| return | ||
| } | ||
|
|
||
| // TODO(jwj): Before beta, the lifecycle-related fields are overwritten. | ||
| // In beta, the complete historical replay will be supported. | ||
| task.JobID = currTask.JobID | ||
JiangJiaWei1103 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
JiangJiaWei1103 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
JiangJiaWei1103 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
JiangJiaWei1103 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| task.NodeID = currTask.NodeID | ||
| task.WorkerID = currTask.WorkerID | ||
| task.WorkerPID = currTask.WorkerPID | ||
| task.IsDebuggerPaused = currTask.IsDebuggerPaused | ||
| task.ActorReprName = currTask.ActorReprName | ||
| task.State = task.GetLastState() | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Derive creation time, start time and end time from state transitions. | ||
| // Ref: https://github.com/ray-project/ray/blob/d0b1d151d8ea964a711e451d0ae736f8bf95b629/python/ray/util/state/common.py#L1660-L1685 | ||
| for _, tr := range task.StateTransitions { | ||
| switch tr.State { | ||
| case types.PENDING_ARGS_AVAIL: | ||
| if task.CreationTime.IsZero() { | ||
| task.CreationTime = tr.Timestamp | ||
| } | ||
| case types.RUNNING: | ||
| if task.StartTime.IsZero() { | ||
| task.StartTime = tr.Timestamp | ||
| } | ||
| case types.FINISHED, types.FAILED: | ||
| // Take the latest timestamp as the end time. | ||
| task.EndTime = tr.Timestamp | ||
| } | ||
| } | ||
| }) | ||
|
|
||
| logrus.Infof("Task lifecycle event processed: %v", currTask) | ||
| logrus.Infof("Task map: %v", taskMap.TaskMap) | ||
|
|
||
| return nil | ||
| } | ||
JiangJiaWei1103 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.