Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions historyserver/pkg/eventserver/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type EventHandler struct {
ClusterTaskMap *types.ClusterTaskMap
ClusterActorMap *types.ClusterActorMap
ClusterJobMap *types.ClusterJobMap
ClusterEventMap *types.ClusterEventMap // For /events API
}

var eventFilePattern = regexp.MustCompile(`-\d{4}-\d{2}-\d{2}-\d{2}$`)
Expand All @@ -48,6 +49,7 @@ func NewEventHandler(reader storage.StorageReader) *EventHandler {
ClusterJobMap: &types.ClusterJobMap{
ClusterJobMap: make(map[string]*types.JobMap),
},
ClusterEventMap: types.NewClusterEventMap(),
}
}

Expand Down Expand Up @@ -186,6 +188,133 @@ func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error {
return nil
}

// transformToEvent converts a RayEvent map to the API response Event format.
// This extracts common fields and puts event-type-specific data into customFields.
// It also extracts sourceHostname and sourcePid from nested events where available.
func transformToEvent(eventMap map[string]any) *types.Event {
event := &types.Event{
Severity: "INFO",
CustomFields: make(map[string]any),
}

// Extract common fields (already camelCase)
if v, ok := eventMap["eventId"].(string); ok {
event.EventID = v
}
if v, ok := eventMap["sourceType"].(string); ok {
event.SourceType = v
}
// Convert ISO 8601 timestamp to Unix milliseconds to match Ray Dashboard format
if v, ok := eventMap["timestamp"].(string); ok {
if t, err := time.Parse(time.RFC3339Nano, v); err == nil {
event.Timestamp = fmt.Sprintf("%d", t.UnixMilli())
} else {
logrus.Warnf("failed to parse timestamp %q: %v", v, err)
}
}
if v, ok := eventMap["severity"].(string); ok {
event.Severity = v
}
if v, ok := eventMap["message"].(string); ok {
event.Message = v
}
if v, ok := eventMap["sessionName"].(string); ok {
event.CustomFields["sessionName"] = v
}
// Extract nodeId from base RayEvent (field #18)
if v, ok := eventMap["nodeId"].(string); ok {
event.NodeID = v
}

// Extract eventType and set as Label for filtering
eventType, _ := eventMap["eventType"].(string)
event.EventType = eventType
event.Label = eventType // Use eventType as label for Dashboard compatibility

// Map eventType to its corresponding nested data field
eventDataFields := map[string]string{
string(types.TASK_DEFINITION_EVENT): "taskDefinitionEvent",
string(types.TASK_LIFECYCLE_EVENT): "taskLifecycleEvent",
string(types.TASK_PROFILE_EVENT): "taskProfileEvents",
string(types.ACTOR_DEFINITION_EVENT): "actorDefinitionEvent",
string(types.ACTOR_LIFECYCLE_EVENT): "actorLifecycleEvent",
string(types.ACTOR_TASK_DEFINITION_EVENT): "actorTaskDefinitionEvent",
string(types.NODE_DEFINITION_EVENT): "nodeDefinitionEvent",
string(types.NODE_LIFECYCLE_EVENT): "nodeLifecycleEvent",
string(types.DRIVER_JOB_DEFINITION_EVENT): "driverJobDefinitionEvent",
string(types.DRIVER_JOB_LIFECYCLE_EVENT): "driverJobLifecycleEvent",
}

if dataField, ok := eventDataFields[eventType]; ok {
if data, ok := eventMap[dataField].(map[string]any); ok {
event.CustomFields[dataField] = data

// Extract sourceHostname and sourcePid from nested events where available
extractHostnameAndPid(event, eventType, data)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task profile event data may be silently lost

Medium Severity

The field taskProfileEvents is uniquely plural among all event data fields (others are singular like taskDefinitionEvent, actorLifecycleEvent). This naming suggests it may contain an array of profile events rather than a single map. The code only handles map[string]any type assertions - if the actual structure is []any, the type assertion silently fails, causing task profile data to not be captured in customFields and jobId extraction to fail (defaulting to "global" grouping).

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concern is not valid. The taskProfileEvents field is a single object (map), not an array. Here's the evidence:

  1. Proto Definition:
message TaskProfileEvents {
  bytes task_id = 1;
  int32 attempt_number = 2;
  bytes job_id = 3;
  ray.rpc.ProfileEvents profile_events = 4;  // the "events" array is INSIDE here
}
  1. Actual JSON Data
{
  "eventType": "TASK_PROFILE_EVENT",
  "taskProfileEvents": {
    "attemptNumber": 0,
    "jobId": "BAAAAA==",
    "taskId": "...",
    "profileEvents": {
      "componentId": "...",
      "componentType": "worker",
      "events": [...]  // <-- the array is HERE, inside profileEvents
    }
  }
}

}

return event
}

// extractHostnameAndPid extracts sourceHostname and sourcePid from nested event data.
// These fields are available in specific event types as discovered from Ray protos.
func extractHostnameAndPid(event *types.Event, eventType string, data map[string]any) {
switch eventType {
case string(types.NODE_DEFINITION_EVENT):
// NodeDefinitionEvent has: hostname, node_name, node_ip_address
if hostname, ok := data["hostname"].(string); ok && hostname != "" {
event.SourceHostname = hostname
}
case string(types.TASK_LIFECYCLE_EVENT):
// TaskLifecycleEvent has: worker_pid, worker_id, node_id
if pid, ok := data["workerPid"].(float64); ok {
event.SourcePid = int(pid)
}
case string(types.ACTOR_LIFECYCLE_EVENT):
// ActorLifecycleEvent has pid in StateTransition when ALIVE
if transitions, ok := data["stateTransitions"].([]any); ok && len(transitions) > 0 {
// Get the latest transition
if lastTransition, ok := transitions[len(transitions)-1].(map[string]any); ok {
if pid, ok := lastTransition["pid"].(float64); ok {
event.SourcePid = int(pid)
}
}
}
case string(types.DRIVER_JOB_DEFINITION_EVENT):
// DriverJobDefinitionEvent has: driver_pid, driver_node_id
if pid, ok := data["driverPid"].(float64); ok {
event.SourcePid = int(pid)
} else if pidStr, ok := data["driverPid"].(string); ok && pidStr != "" {
// Sometimes stored as string
var pidInt int
if _, err := fmt.Sscanf(pidStr, "%d", &pidInt); err == nil {
event.SourcePid = pidInt
}
}
}
}

// extractJobIDFromEvent extracts jobId from various event type payloads
func extractJobIDFromEvent(eventMap map[string]any) string {
// Check common nested structures for jobId
nestedFields := []string{
"taskDefinitionEvent", "taskLifecycleEvent",
"actorDefinitionEvent", "actorLifecycleEvent",
"driverJobDefinitionEvent", "driverJobLifecycleEvent",
"taskProfileEvents", "actorTaskDefinitionEvent",
}

for _, field := range nestedFields {
if nested, ok := eventMap[field].(map[string]any); ok {
if jobID, ok := nested["jobId"].(string); ok && jobID != "" {
return jobID
}
}
}
return "" // Will be grouped under "global"
}

// storeEvent unmarshals the event map into the correct actor/task struct and then stores it into the corresonding list
func (h *EventHandler) storeEvent(eventMap map[string]any) error {
eventTypeVal, ok := eventMap["eventType"]
Expand All @@ -207,6 +336,14 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
return fmt.Errorf("clusterName is not a string, got %T", clusterNameVal)
}

// ========== Store RayEvent for /events API ==========
if event := transformToEvent(eventMap); event != nil && event.EventID != "" {
jobID := extractJobIDFromEvent(eventMap)
clusterEventMap := h.ClusterEventMap.GetOrCreateEventMap(currentClusterName)
clusterEventMap.AddEvent(jobID, *event)
}
// ====================================================

logrus.Infof("current eventType: %v", eventType)
switch eventType {
case types.TASK_DEFINITION_EVENT:
Expand Down
51 changes: 51 additions & 0 deletions historyserver/pkg/eventserver/eventserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,3 +928,54 @@ func TestMultipleReprocessingCycles(t *testing.T) {
}
}
}

func TestTransformToEventTimestamp(t *testing.T) {
// Test that transformToEvent correctly converts timestamp format
eventMap := map[string]any{
"eventId": "test-event-id",
"eventType": "TASK_DEFINITION_EVENT",
"sourceType": "CORE_WORKER",
"timestamp": "2026-01-16T19:22:49.414579427Z",
"severity": "INFO",
"taskDefinitionEvent": map[string]any{
"taskId": "task-123",
"jobId": "AQAAAA==",
},
}

event := transformToEvent(eventMap)

// Verify timestamp is converted to Unix milliseconds
expectedTimestamp := "1768591369414"
if event.Timestamp != expectedTimestamp {
t.Errorf("transformToEvent timestamp = %q, want %q", event.Timestamp, expectedTimestamp)
}

// Verify other fields are preserved
if event.EventID != "test-event-id" {
t.Errorf("EventID = %q, want %q", event.EventID, "test-event-id")
}
if event.SourceType != "CORE_WORKER" {
t.Errorf("SourceType = %q, want %q", event.SourceType, "CORE_WORKER")
}
}

func TestTransformToEventInvalidTimestamp(t *testing.T) {
// Test that transformToEvent handles invalid timestamp gracefully (consistent with Task/Actor/Job handling)
eventMap := map[string]any{
"eventId": "test-event-id",
"eventType": "TASK_DEFINITION_EVENT",
"timestamp": "invalid-timestamp",
}

event := transformToEvent(eventMap)

// Invalid timestamp should result in empty string (not an error)
if event.Timestamp != "" {
t.Errorf("transformToEvent with invalid timestamp should have empty Timestamp, got %q", event.Timestamp)
}
// Other fields should still be populated
if event.EventID != "test-event-id" {
t.Errorf("EventID = %q, want %q", event.EventID, "test-event-id")
}
}
149 changes: 149 additions & 0 deletions historyserver/pkg/eventserver/types/event.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package types

import (
"sort"
"sync"
)

// EventType is the Ray event type.
type EventType string

Expand Down Expand Up @@ -33,3 +38,147 @@ var AllEventTypes = []EventType{
ACTOR_DEFINITION_EVENT,
ACTOR_LIFECYCLE_EVENT,
}

// MaxEventsPerJob is the maximum number of events to cache per job.
// This matches Ray Dashboard's MAX_EVENTS_TO_CACHE constant.
const MaxEventsPerJob = 10000

// Event represents an event returned by the /events API endpoint.
// Fields use camelCase JSON tags to match Ray Dashboard's format.
// This struct is derived from RayEvents stored in object storage.
type Event struct {
EventID string `json:"eventId"`
EventType string `json:"eventType"` // e.g., TASK_DEFINITION_EVENT
SourceType string `json:"sourceType"` // e.g., GCS, CORE_WORKER
Timestamp string `json:"timestamp"` // Unix milliseconds (e.g., "1768591369414")
Severity string `json:"severity"` // INFO, WARNING, ERROR
Message string `json:"message,omitempty"` // Usually empty in RayEvents
Label string `json:"label,omitempty"` // Same as EventType for filtering
NodeID string `json:"nodeId,omitempty"` // Node where event originated
SourceHostname string `json:"sourceHostname,omitempty"` // Extracted from NodeDefinitionEvent
SourcePid int `json:"sourcePid,omitempty"` // Extracted from lifecycle events
CustomFields map[string]any `json:"customFields,omitempty"` // Event-specific nested data
}

// EventMap stores events grouped by jobId with FIFO eviction.
// Key is jobId (base64 encoded) or "global" for cluster-wide events.
// This follows the same pattern as TaskMap and ActorMap.
type EventMap struct {
events map[string][]Event
mu sync.RWMutex
}

// NewEventMap creates a new EventMap instance.
func NewEventMap() *EventMap {
return &EventMap{
events: make(map[string][]Event),
}
}

// AddEvent adds an event to the map and enforces the per-job limit.
// Empty jobID is stored under "global" key.
func (e *EventMap) AddEvent(jobID string, event Event) {
e.mu.Lock()
defer e.mu.Unlock()

if jobID == "" {
jobID = "global"
}

e.events[jobID] = append(e.events[jobID], event)

// Enforce limit: keep newest events (FIFO - drop oldest)
if len(e.events[jobID]) > MaxEventsPerJob {
e.events[jobID] = e.events[jobID][len(e.events[jobID])-MaxEventsPerJob:]
}
}

// GetAll returns all events grouped by jobId, sorted by timestamp.
func (e *EventMap) GetAll() map[string][]Event {
e.mu.RLock()
defer e.mu.RUnlock()

result := make(map[string][]Event, len(e.events))
for jobID, events := range e.events {
sorted := make([]Event, len(events))
copy(sorted, events)
sortEventsByTimestamp(sorted)
result[jobID] = sorted
}
return result
}

// GetByJobID returns events for a specific job, sorted by timestamp.
func (e *EventMap) GetByJobID(jobID string) []Event {
e.mu.RLock()
defer e.mu.RUnlock()

events, ok := e.events[jobID]
if !ok {
return []Event{}
}

sorted := make([]Event, len(events))
copy(sorted, events)
sortEventsByTimestamp(sorted)
return sorted
}

// sortEventsByTimestamp sorts events in ascending order by timestamp.
// Timestamps are Unix milliseconds strings (e.g., "1768591369414").
func sortEventsByTimestamp(events []Event) {
sort.Slice(events, func(i, j int) bool {
return events[i].Timestamp < events[j].Timestamp
})
}

// ClusterEventMap stores EventMaps per cluster.
// This follows the same pattern as ClusterTaskMap and ClusterActorMap.
type ClusterEventMap struct {
clusterEvents map[string]*EventMap
mu sync.RWMutex
}

// NewClusterEventMap creates a new ClusterEventMap instance.
func NewClusterEventMap() *ClusterEventMap {
return &ClusterEventMap{
clusterEvents: make(map[string]*EventMap),
}
}

// GetOrCreateEventMap returns the EventMap for the given cluster, creating it if needed.
func (c *ClusterEventMap) GetOrCreateEventMap(clusterName string) *EventMap {
c.mu.Lock()
defer c.mu.Unlock()

if eventMap, ok := c.clusterEvents[clusterName]; ok {
return eventMap
}
eventMap := NewEventMap()
c.clusterEvents[clusterName] = eventMap
return eventMap
}

// GetAll returns all events for a cluster, grouped by jobId.
func (c *ClusterEventMap) GetAll(clusterName string) map[string][]Event {
c.mu.RLock()
eventMap, ok := c.clusterEvents[clusterName]
c.mu.RUnlock()

if !ok {
return map[string][]Event{}
}
return eventMap.GetAll()
}

// GetByJobID returns events for a specific job in a cluster.
func (c *ClusterEventMap) GetByJobID(clusterName, jobID string) []Event {
c.mu.RLock()
eventMap, ok := c.clusterEvents[clusterName]
c.mu.RUnlock()

if !ok {
return []Event{}
}
return eventMap.GetByJobID(jobID)
}
Loading
Loading