Skip to content

Commit f255d16

Browse files
fix: Align all nullable fields
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
1 parent a4604db commit f255d16

File tree

4 files changed

+67
-29
lines changed

4 files changed

+67
-29
lines changed

historyserver/pkg/eventserver/eventserver.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -972,17 +972,6 @@ func normalizeTaskIDsToHex(task *types.Task) {
972972
return ""
973973
}
974974

975-
// Check if the Ray ID is nil.
976-
isNil, err := utils.IsBase64Nil(base64ID)
977-
if err != nil {
978-
logrus.Errorf("Failed to check if Ray ID is nil: %v", err)
979-
return base64ID
980-
}
981-
if isNil {
982-
logrus.Infof("Ray ID is nil, keeping original base64 ID: %s", base64ID)
983-
return base64ID
984-
}
985-
986975
hexID, err := utils.ConvertBase64ToHex(base64ID)
987976
if err != nil {
988977
logrus.Errorf("Failed to convert ID from base64 to hex, keeping original: %v", err)

historyserver/pkg/eventserver/types/task.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ type TaskLogInfo struct {
101101

102102
// Task's fields are populated from the TASK_DEFINITION_EVENT, ACTOR_TASK_DEFINITION_EVENT, and TASK_LIFECYCLE_EVENT.
103103
// A TASK_DEFINITION_EVENT or an ACTOR_TASK_DEFINITION_EVENT is expected to be emitted once per task attempt.
104+
// For proto definitions, please refer to:
105+
// https://github.com/ray-project/ray/tree/master/src/ray/protobuf/public.
106+
// For field population, please refer to:
107+
// https://github.com/ray-project/ray/blob/36be009ae360788550e541d81806493f52963730/src/ray/core_worker/task_event_buffer.cc#L189-L295.
104108
type Task struct {
105109
// TaskID and TaskAttempt form the unique identifier for a task.
106110
TaskID string `json:"taskId"`
@@ -130,21 +134,23 @@ type Task struct {
130134
RefIDs map[string]string `json:"refIds,omitempty"`
131135
SerializedRuntimeEnv string `json:"serializedRuntimeEnv,omitempty"`
132136
// CallSite is the human-readable stack trace of the actor task invocation.
133-
CallSite string `json:"callSite,omitempty"`
137+
CallSite *string `json:"callSite,omitempty"`
134138
// LabelSelector is the key-value label constraints of the node to schedule this actor task on.
135139
LabelSelector map[string]string `json:"labelSelector,omitempty"`
136140

137141
// The task execution information, populated from TASK_LIFECYCLE_EVENT.
138142
StateTransitions []TaskStateTransition `json:"stateTransitions,omitempty"`
139-
RayErrorInfo RayErrorInfo `json:"rayErrorInfo,omitempty"`
143+
// RayErrorInfo is populated only when the state's error info carries a value; otherwise it is nil.
144+
// Ref: https://github.com/ray-project/ray/blob/36be009ae360788550e541d81806493f52963730/src/ray/core_worker/task_event_buffer.cc#L263-L265.
145+
RayErrorInfo *RayErrorInfo `json:"rayErrorInfo,omitempty"`
140146

141147
NodeID string `json:"nodeId,omitempty"`
142148
WorkerID string `json:"workerId,omitempty"`
143149
WorkerPID int `json:"workerPid,omitempty"`
144150
// Whether the task is paused by the debugger.
145-
IsDebuggerPaused bool `json:"isDebuggerPaused,omitempty"`
151+
IsDebuggerPaused *bool `json:"isDebuggerPaused,omitempty"`
146152
// Actor task repr name, if applicable.
147-
ActorReprName string `json:"actorReprName,omitempty"`
153+
ActorReprName *string `json:"actorReprName,omitempty"`
148154

149155
// TaskLogInfo was only recently added in https://github.com/ray-project/ray/pull/60287.
150156
// TODO(jwj): Add support for TaskLogInfo.
@@ -340,6 +346,21 @@ func (t Task) DeepCopy() Task {
340346
copy(cp.StateTransitions, t.StateTransitions)
341347
}
342348

349+
if t.RayErrorInfo != nil {
350+
rayErrorInfo := *t.RayErrorInfo
351+
cp.RayErrorInfo = &rayErrorInfo
352+
}
353+
354+
if t.IsDebuggerPaused != nil {
355+
cp.IsDebuggerPaused = new(bool)
356+
*cp.IsDebuggerPaused = *t.IsDebuggerPaused
357+
}
358+
359+
if t.ActorReprName != nil {
360+
actorReprName := *t.ActorReprName
361+
cp.ActorReprName = &actorReprName
362+
}
363+
343364
if t.TaskLogInfo != nil {
344365
taskLogInfo := *t.TaskLogInfo
345366
cp.TaskLogInfo = &taskLogInfo

historyserver/pkg/historyserver/router.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,14 @@ func (s *ServerHandler) getTasks(req *restful.Request, resp *restful.Response) {
939939
// The schema aligns with the Ray Dashboard API.
940940
// Ref: https://github.com/ray-project/ray/blob/d0b1d151d8ea964a711e451d0ae736f8bf95b629/python/ray/util/state/common.py#L730-L819.
941941
func formatTaskForResponse(task eventtypes.Task, detail bool) map[string]interface{} {
942+
setNullableField := func(result map[string]interface{}, key string, value interface{}) {
943+
if value != nil {
944+
result[key] = value
945+
} else {
946+
result[key] = nil
947+
}
948+
}
949+
942950
// TODO(jwj): Maybe define result schema in types.go.
943951
result := map[string]interface{}{
944952
"task_id": task.TaskID,
@@ -953,7 +961,11 @@ func formatTaskForResponse(task eventtypes.Task, detail bool) map[string]interfa
953961
"node_id": task.NodeID,
954962
"worker_id": task.WorkerID,
955963
"worker_pid": task.WorkerPID,
956-
"error_type": string(task.RayErrorInfo.ErrorType),
964+
}
965+
if task.RayErrorInfo != nil {
966+
result["error_type"] = string(task.RayErrorInfo.ErrorType)
967+
} else {
968+
result["error_type"] = nil
957969
}
958970

959971
if detail {
@@ -969,7 +981,12 @@ func formatTaskForResponse(task eventtypes.Task, detail bool) map[string]interfa
969981
"log_files": []string{},
970982
},
971983
}
972-
result["placement_group_id"] = task.PlacementGroupID
984+
isNil, err := utils.IsHexNil(task.PlacementGroupID)
985+
if isNil || err != nil {
986+
result["placement_group_id"] = nil
987+
} else {
988+
result["placement_group_id"] = task.PlacementGroupID
989+
}
973990

974991
events := make([]map[string]interface{}, 0, len(task.StateTransitions))
975992
for _, event := range task.StateTransitions {
@@ -983,9 +1000,13 @@ func formatTaskForResponse(task eventtypes.Task, detail bool) map[string]interfa
9831000
// Ref: https://github.com/ray-project/ray/blob/d0b1d151d8ea964a711e451d0ae736f8bf95b629/python/ray/util/state/common.py#L1616-L1622.
9841001
// result["profiling_data"] = task.ProfilingData
9851002
result["task_log_info"] = task.TaskLogInfo
986-
result["error_message"] = task.RayErrorInfo.ErrorMessage
987-
result["is_debugger_paused"] = task.IsDebuggerPaused
988-
result["call_site"] = task.CallSite
1003+
if task.RayErrorInfo != nil {
1004+
result["error_message"] = task.RayErrorInfo.ErrorMessage
1005+
} else {
1006+
result["error_message"] = nil
1007+
}
1008+
setNullableField(result, "is_debugger_paused", task.IsDebuggerPaused)
1009+
setNullableField(result, "call_site", task.CallSite)
9891010
if task.LabelSelector != nil {
9901011
result["label_selector"] = task.LabelSelector
9911012
} else {

historyserver/pkg/utils/utils.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -254,22 +254,29 @@ func ConvertBase64ToHex(input string) (string, error) {
254254
return hexStr, nil
255255
}
256256

257-
func IsBase64Nil(base64Str string) (bool, error) {
258-
bytes, err := base64.StdEncoding.DecodeString(base64Str)
259-
if err != nil {
260-
return false, err
261-
}
257+
// IsHexNil returns true if hexStr decodes to a non-empty byte slice where every byte is 0xff.
258+
func IsHexNil(hexStr string) (bool, error) {
259+
s := strings.TrimSpace(hexStr)
262260

263-
if len(bytes) == 0 {
261+
if len(s) == 0 {
264262
return false, nil
265263
}
266264

267-
for i := range bytes {
268-
if bytes[i] != 0xff {
265+
// Hex string must have even length.
266+
if len(s)%2 != 0 {
267+
return false, hex.ErrLength
268+
}
269+
270+
bytes, err := hex.DecodeString(s)
271+
if err != nil {
272+
return false, err
273+
}
274+
275+
for _, v := range bytes {
276+
if v != 0xff {
269277
return false, nil
270278
}
271279
}
272-
273280
return true, nil
274281
}
275282

0 commit comments

Comments
 (0)