diff --git a/historyserver/pkg/eventserver/eventserver.go b/historyserver/pkg/eventserver/eventserver.go
index 88a09536f23..fbea47583c2 100644
--- a/historyserver/pkg/eventserver/eventserver.go
+++ b/historyserver/pkg/eventserver/eventserver.go
@@ -23,6 +23,7 @@ type EventHandler struct {
 	ClusterTaskMap  *types.ClusterTaskMap
 	ClusterActorMap *types.ClusterActorMap
 	ClusterJobMap   *types.ClusterJobMap
+	ClusterEventMap *types.ClusterEventMap // For /events API
 }

 var eventFilePattern = regexp.MustCompile(`-\d{4}-\d{2}-\d{2}-\d{2}$`)
@@ -48,6 +49,7 @@ func NewEventHandler(reader storage.StorageReader) *EventHandler {
 		ClusterJobMap: &types.ClusterJobMap{
 			ClusterJobMap: make(map[string]*types.JobMap),
 		},
+		ClusterEventMap: types.NewClusterEventMap(),
 	}
 }

@@ -186,6 +188,133 @@ func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error {
 	return nil
 }

+// transformToEvent converts a RayEvent map to the API response Event format.
+// This extracts common fields and puts event-type-specific data into customFields.
+// It also extracts sourceHostname and sourcePid from nested events where available.
+func transformToEvent(eventMap map[string]any) *types.Event {
+	event := &types.Event{
+		Severity:     "INFO",
+		CustomFields: make(map[string]any),
+	}
+
+	// Extract common fields (already camelCase)
+	if v, ok := eventMap["eventId"].(string); ok {
+		event.EventID = v
+	}
+	if v, ok := eventMap["sourceType"].(string); ok {
+		event.SourceType = v
+	}
+	// Convert ISO 8601 timestamp to Unix milliseconds to match Ray Dashboard format
+	if v, ok := eventMap["timestamp"].(string); ok {
+		if t, err := time.Parse(time.RFC3339Nano, v); err == nil {
+			event.Timestamp = fmt.Sprintf("%d", t.UnixMilli())
+		} else {
+			logrus.Warnf("failed to parse timestamp %q: %v", v, err)
+		}
+	}
+	if v, ok := eventMap["severity"].(string); ok {
+		event.Severity = v
+	}
+	if v, ok := eventMap["message"].(string); ok {
+		event.Message = v
+	}
+	if v, ok := eventMap["sessionName"].(string); ok {
+		event.CustomFields["sessionName"] = v
+	}
+	// Extract nodeId from base RayEvent (field #18)
+	if v, ok := eventMap["nodeId"].(string); ok {
+		event.NodeID = v
+	}
+
+	// Extract eventType and set as Label for filtering
+	eventType, _ := eventMap["eventType"].(string)
+	event.EventType = eventType
+	event.Label = eventType // Use eventType as label for Dashboard compatibility
+
+	// Map eventType to its corresponding nested data field
+	eventDataFields := map[string]string{
+		string(types.TASK_DEFINITION_EVENT):       "taskDefinitionEvent",
+		string(types.TASK_LIFECYCLE_EVENT):        "taskLifecycleEvent",
+		string(types.TASK_PROFILE_EVENT):          "taskProfileEvents",
+		string(types.ACTOR_DEFINITION_EVENT):      "actorDefinitionEvent",
+		string(types.ACTOR_LIFECYCLE_EVENT):       "actorLifecycleEvent",
+		string(types.ACTOR_TASK_DEFINITION_EVENT): "actorTaskDefinitionEvent",
+		string(types.NODE_DEFINITION_EVENT):       "nodeDefinitionEvent",
+		string(types.NODE_LIFECYCLE_EVENT):        "nodeLifecycleEvent",
+		string(types.DRIVER_JOB_DEFINITION_EVENT): "driverJobDefinitionEvent",
+		string(types.DRIVER_JOB_LIFECYCLE_EVENT):  "driverJobLifecycleEvent",
+	}
+
+	if dataField, ok := eventDataFields[eventType]; ok {
+		if data, ok := eventMap[dataField].(map[string]any); ok {
+			event.CustomFields[dataField] = data
+
+			// Extract sourceHostname and sourcePid from nested events where available
+			extractHostnameAndPid(event, eventType, data)
+		}
+	}
+
+	return event
+}
+
+// extractHostnameAndPid extracts sourceHostname and sourcePid from nested event data.
+// These fields are available in specific event types as discovered from Ray protos.
+func extractHostnameAndPid(event *types.Event, eventType string, data map[string]any) {
+	switch eventType {
+	case string(types.NODE_DEFINITION_EVENT):
+		// NodeDefinitionEvent has: hostname, node_name, node_ip_address
+		if hostname, ok := data["hostname"].(string); ok && hostname != "" {
+			event.SourceHostname = hostname
+		}
+	case string(types.TASK_LIFECYCLE_EVENT):
+		// TaskLifecycleEvent has: worker_pid, worker_id, node_id
+		if pid, ok := data["workerPid"].(float64); ok {
+			event.SourcePid = int(pid)
+		}
+	case string(types.ACTOR_LIFECYCLE_EVENT):
+		// ActorLifecycleEvent has pid in StateTransition when ALIVE
+		if transitions, ok := data["stateTransitions"].([]any); ok && len(transitions) > 0 {
+			// Get the latest transition
+			if lastTransition, ok := transitions[len(transitions)-1].(map[string]any); ok {
+				if pid, ok := lastTransition["pid"].(float64); ok {
+					event.SourcePid = int(pid)
+				}
+			}
+		}
+	case string(types.DRIVER_JOB_DEFINITION_EVENT):
+		// DriverJobDefinitionEvent has: driver_pid, driver_node_id
+		if pid, ok := data["driverPid"].(float64); ok {
+			event.SourcePid = int(pid)
+		} else if pidStr, ok := data["driverPid"].(string); ok && pidStr != "" {
+			// Sometimes stored as string
+			var pidInt int
+			if _, err := fmt.Sscanf(pidStr, "%d", &pidInt); err == nil {
+				event.SourcePid = pidInt
+			}
+		}
+	}
+}
+
+// extractJobIDFromEvent extracts jobId from various event type payloads
+func extractJobIDFromEvent(eventMap map[string]any) string {
+	// Check common nested structures for jobId
+	nestedFields := []string{
+		"taskDefinitionEvent", "taskLifecycleEvent",
+		"actorDefinitionEvent", "actorLifecycleEvent",
+		"driverJobDefinitionEvent", "driverJobLifecycleEvent",
+		"taskProfileEvents", "actorTaskDefinitionEvent",
+	}
+
+	for _, field := range nestedFields {
+		if nested, ok := eventMap[field].(map[string]any); ok {
+			if jobID, ok := nested["jobId"].(string); ok && jobID != "" {
+				return jobID
+			}
+		}
+	}
+	return "" // Will be grouped under "global"
+}
+
 // storeEvent unmarshals the event map into the correct actor/task struct and then stores it into the corresonding list
 func (h *EventHandler) storeEvent(eventMap map[string]any) error {
 	eventTypeVal, ok := eventMap["eventType"]
@@ -207,6 +336,14 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
 		return fmt.Errorf("clusterName is not a string, got %T", clusterNameVal)
 	}

+	// ========== Store RayEvent for /events API ==========
+	if event := transformToEvent(eventMap); event != nil && event.EventID != "" {
+		jobID := extractJobIDFromEvent(eventMap)
+		clusterEventMap := h.ClusterEventMap.GetOrCreateEventMap(currentClusterName)
+		clusterEventMap.AddEvent(jobID, *event)
+	}
+	// ====================================================
+
 	logrus.Infof("current eventType: %v", eventType)
 	switch eventType {
 	case types.TASK_DEFINITION_EVENT:
diff --git a/historyserver/pkg/eventserver/eventserver_test.go b/historyserver/pkg/eventserver/eventserver_test.go
index 9e63ecd6f64..30c488c7b52 100644
--- a/historyserver/pkg/eventserver/eventserver_test.go
+++ b/historyserver/pkg/eventserver/eventserver_test.go
@@ -928,3 +928,54 @@ func TestMultipleReprocessingCycles(t *testing.T) {
 		}
 	}
 }
+
+func TestTransformToEventTimestamp(t *testing.T) {
+	// Test that transformToEvent correctly converts timestamp format
+	eventMap := map[string]any{
+		"eventId":    "test-event-id",
+		"eventType":  "TASK_DEFINITION_EVENT",
+		"sourceType": "CORE_WORKER",
+		"timestamp":  "2026-01-16T19:22:49.414579427Z",
+		"severity":   "INFO",
+		"taskDefinitionEvent": map[string]any{
+			"taskId": "task-123",
+			"jobId":  "AQAAAA==",
+		},
+	}
+
+	event := transformToEvent(eventMap)
+
+	// Verify timestamp is converted to Unix milliseconds
+	expectedTimestamp := "1768591369414"
+	if event.Timestamp != expectedTimestamp {
+		t.Errorf("transformToEvent timestamp = %q, want %q", event.Timestamp, expectedTimestamp)
+	}
+
+	// Verify other fields are preserved
+	if event.EventID != "test-event-id" {
+		t.Errorf("EventID = %q, want %q", event.EventID, "test-event-id")
+	}
+	if event.SourceType != "CORE_WORKER" {
+		t.Errorf("SourceType = %q, want %q", event.SourceType, "CORE_WORKER")
+	}
+}
+
+func TestTransformToEventInvalidTimestamp(t *testing.T) {
+	// Test that transformToEvent handles invalid timestamp gracefully (consistent with Task/Actor/Job handling)
+	eventMap := map[string]any{
+		"eventId":   "test-event-id",
+		"eventType": "TASK_DEFINITION_EVENT",
+		"timestamp": "invalid-timestamp",
+	}
+
+	event := transformToEvent(eventMap)
+
+	// Invalid timestamp should result in empty string (not an error)
+	if event.Timestamp != "" {
+		t.Errorf("transformToEvent with invalid timestamp should have empty Timestamp, got %q", event.Timestamp)
+	}
+	// Other fields should still be populated
+	if event.EventID != "test-event-id" {
+		t.Errorf("EventID = %q, want %q", event.EventID, "test-event-id")
+	}
+}
diff --git a/historyserver/pkg/eventserver/types/event.go b/historyserver/pkg/eventserver/types/event.go
index e38616fdedb..657543642ea 100644
--- a/historyserver/pkg/eventserver/types/event.go
+++ b/historyserver/pkg/eventserver/types/event.go
@@ -1,5 +1,10 @@
 package types

+import (
+	"sort"
+	"sync"
+)
+
 // EventType is the Ray event type.
 type EventType string

@@ -33,3 +38,147 @@ var AllEventTypes = []EventType{
 	ACTOR_DEFINITION_EVENT,
 	ACTOR_LIFECYCLE_EVENT,
 }
+
+// MaxEventsPerJob is the maximum number of events to cache per job.
+// This matches Ray Dashboard's MAX_EVENTS_TO_CACHE constant.
+const MaxEventsPerJob = 10000
+
+// Event represents an event returned by the /events API endpoint.
+// Fields use camelCase JSON tags to match Ray Dashboard's format.
+// This struct is derived from RayEvents stored in object storage.
+type Event struct {
+	EventID        string         `json:"eventId"`
+	EventType      string         `json:"eventType"`                // e.g., TASK_DEFINITION_EVENT
+	SourceType     string         `json:"sourceType"`               // e.g., GCS, CORE_WORKER
+	Timestamp      string         `json:"timestamp"`                // Unix milliseconds (e.g., "1768591369414")
+	Severity       string         `json:"severity"`                 // INFO, WARNING, ERROR
+	Message        string         `json:"message,omitempty"`        // Usually empty in RayEvents
+	Label          string         `json:"label,omitempty"`          // Same as EventType for filtering
+	NodeID         string         `json:"nodeId,omitempty"`         // Node where event originated
+	SourceHostname string         `json:"sourceHostname,omitempty"` // Extracted from NodeDefinitionEvent
+	SourcePid      int            `json:"sourcePid,omitempty"`      // Extracted from lifecycle events
+	CustomFields   map[string]any `json:"customFields,omitempty"`   // Event-specific nested data
+}
+
+// EventMap stores events grouped by jobId with FIFO eviction.
+// Key is jobId (base64 encoded) or "global" for cluster-wide events.
+// This follows the same pattern as TaskMap and ActorMap.
+type EventMap struct {
+	events map[string][]Event
+	mu     sync.RWMutex
+}
+
+// NewEventMap creates a new EventMap instance.
+func NewEventMap() *EventMap {
+	return &EventMap{
+		events: make(map[string][]Event),
+	}
+}
+
+// AddEvent adds an event to the map and enforces the per-job limit.
+// Empty jobID is stored under "global" key.
+func (e *EventMap) AddEvent(jobID string, event Event) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	if jobID == "" {
+		jobID = "global"
+	}
+
+	e.events[jobID] = append(e.events[jobID], event)
+
+	// Enforce limit: keep newest events (FIFO - drop oldest)
+	if len(e.events[jobID]) > MaxEventsPerJob {
+		e.events[jobID] = e.events[jobID][len(e.events[jobID])-MaxEventsPerJob:]
+	}
+}
+
+// GetAll returns all events grouped by jobId, sorted by timestamp.
+func (e *EventMap) GetAll() map[string][]Event {
+	e.mu.RLock()
+	defer e.mu.RUnlock()
+
+	result := make(map[string][]Event, len(e.events))
+	for jobID, events := range e.events {
+		sorted := make([]Event, len(events))
+		copy(sorted, events)
+		sortEventsByTimestamp(sorted)
+		result[jobID] = sorted
+	}
+	return result
+}
+
+// GetByJobID returns events for a specific job, sorted by timestamp.
+func (e *EventMap) GetByJobID(jobID string) []Event {
+	e.mu.RLock()
+	defer e.mu.RUnlock()
+
+	events, ok := e.events[jobID]
+	if !ok {
+		return []Event{}
+	}
+
+	sorted := make([]Event, len(events))
+	copy(sorted, events)
+	sortEventsByTimestamp(sorted)
+	return sorted
+}
+
+// sortEventsByTimestamp sorts events in ascending order by timestamp.
+// Timestamps are Unix milliseconds strings (e.g., "1768591369414").
+func sortEventsByTimestamp(events []Event) {
+	sort.Slice(events, func(i, j int) bool {
+		return events[i].Timestamp < events[j].Timestamp
+	})
+}
+
+// ClusterEventMap stores EventMaps per cluster.
+// This follows the same pattern as ClusterTaskMap and ClusterActorMap.
+type ClusterEventMap struct {
+	clusterEvents map[string]*EventMap
+	mu            sync.RWMutex
+}
+
+// NewClusterEventMap creates a new ClusterEventMap instance.
+func NewClusterEventMap() *ClusterEventMap {
+	return &ClusterEventMap{
+		clusterEvents: make(map[string]*EventMap),
+	}
+}
+
+// GetOrCreateEventMap returns the EventMap for the given cluster, creating it if needed.
+func (c *ClusterEventMap) GetOrCreateEventMap(clusterName string) *EventMap {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if eventMap, ok := c.clusterEvents[clusterName]; ok {
+		return eventMap
+	}
+	eventMap := NewEventMap()
+	c.clusterEvents[clusterName] = eventMap
+	return eventMap
+}
+
+// GetAll returns all events for a cluster, grouped by jobId.
+func (c *ClusterEventMap) GetAll(clusterName string) map[string][]Event {
+	c.mu.RLock()
+	eventMap, ok := c.clusterEvents[clusterName]
+	c.mu.RUnlock()
+
+	if !ok {
+		return map[string][]Event{}
+	}
+	return eventMap.GetAll()
+}
+
+// GetByJobID returns events for a specific job in a cluster.
+func (c *ClusterEventMap) GetByJobID(clusterName, jobID string) []Event {
+	c.mu.RLock()
+	eventMap, ok := c.clusterEvents[clusterName]
+	c.mu.RUnlock()
+
+	if !ok {
+		return []Event{}
+	}
+	return eventMap.GetByJobID(jobID)
+}
diff --git a/historyserver/pkg/eventserver/types/event_test.go b/historyserver/pkg/eventserver/types/event_test.go
new file mode 100644
index 00000000000..4c3a8fc1f51
--- /dev/null
+++ b/historyserver/pkg/eventserver/types/event_test.go
@@ -0,0 +1,182 @@
+package types
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestEventMap(t *testing.T) {
+	t.Run("NewEventMap creates empty map", func(t *testing.T) {
+		em := NewEventMap()
+		require.NotNil(t, em)
+		assert.Empty(t, em.GetAll())
+	})
+
+	t.Run("AddEvent stores events by jobID", func(t *testing.T) {
+		em := NewEventMap()
+		em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591375000"})
+		em.AddEvent("job2", Event{EventID: "2", Timestamp: "1768591376000"})
+
+		all := em.GetAll()
+		assert.Len(t, all, 2)
+		assert.Len(t, all["job1"], 1)
+		assert.Len(t, all["job2"], 1)
+	})
+
+	t.Run("AddEvent with empty jobID stores under global", func(t *testing.T) {
+		em := NewEventMap()
+		em.AddEvent("", Event{EventID: "1"})
+		em.AddEvent("", Event{EventID: "2"})
+
+		all := em.GetAll()
+		assert.Len(t, all["global"], 2)
+		_, hasEmpty := all[""]
+		assert.False(t, hasEmpty)
+	})
+
+	t.Run("AddEvent enforces MaxEventsPerJob limit", func(t *testing.T) {
+		em := NewEventMap()
+		for i := 0; i < MaxEventsPerJob+100; i++ {
+			em.AddEvent("job1", Event{EventID: fmt.Sprintf("%d", i)})
+		}
+
+		events := em.GetByJobID("job1")
+		assert.Len(t, events, MaxEventsPerJob)
+		// Verify oldest events are dropped (FIFO)
+		assert.Equal(t, "100", events[0].EventID)
+	})
+
+	t.Run("GetByJobID returns sorted events", func(t *testing.T) {
+		em := NewEventMap()
+		em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591390000"}) // 30s
+		em.AddEvent("job1", Event{EventID: "2", Timestamp: "1768591370000"}) // 10s
+		em.AddEvent("job1", Event{EventID: "3", Timestamp: "1768591380000"}) // 20s
+
+		events := em.GetByJobID("job1")
+		assert.Equal(t, "1768591370000", events[0].Timestamp)
+		assert.Equal(t, "1768591380000", events[1].Timestamp)
+		assert.Equal(t, "1768591390000", events[2].Timestamp)
+	})
+
+	t.Run("GetByJobID returns empty slice for nonexistent job", func(t *testing.T) {
+		em := NewEventMap()
+		events := em.GetByJobID("nonexistent")
+		require.NotNil(t, events)
+		assert.Empty(t, events)
+	})
+
+	t.Run("GetAll returns sorted events for each job", func(t *testing.T) {
+		em := NewEventMap()
+		em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591380000"}) // 20s
+		em.AddEvent("job1", Event{EventID: "2", Timestamp: "1768591370000"}) // 10s
+
+		all := em.GetAll()
+		assert.Equal(t, "2", all["job1"][0].EventID)
+		assert.Equal(t, "1", all["job1"][1].EventID)
+	})
+}
+
+func TestClusterEventMap(t *testing.T) {
+	t.Run("NewClusterEventMap creates empty map", func(t *testing.T) {
+		cm := NewClusterEventMap()
+		require.NotNil(t, cm)
+		assert.Empty(t, cm.GetAll("any"))
+	})
+
+	t.Run("GetOrCreateEventMap creates and reuses EventMap", func(t *testing.T) {
+		cm := NewClusterEventMap()
+
+		em1 := cm.GetOrCreateEventMap("cluster1")
+		require.NotNil(t, em1)
+
+		em2 := cm.GetOrCreateEventMap("cluster1")
+		assert.Same(t, em1, em2, "should return same instance")
+
+		em3 := cm.GetOrCreateEventMap("cluster2")
+		assert.NotSame(t, em1, em3, "different cluster should get different instance")
+	})
+
+	t.Run("GetAll returns events for cluster", func(t *testing.T) {
+		cm := NewClusterEventMap()
+		em := cm.GetOrCreateEventMap("cluster1")
+		em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591375000"})
+		em.AddEvent("", Event{EventID: "2", Timestamp: "1768591370000"})
+
+		all := cm.GetAll("cluster1")
+		assert.Len(t, all, 2)
+		assert.Contains(t, all, "job1")
+		assert.Contains(t, all, "global")
+	})
+
+	t.Run("GetAll returns empty map for nonexistent cluster", func(t *testing.T) {
+		cm := NewClusterEventMap()
+		all := cm.GetAll("nonexistent")
+		require.NotNil(t, all)
+		assert.Empty(t, all)
+	})
+
+	t.Run("GetByJobID returns events for cluster and job", func(t *testing.T) {
+		cm := NewClusterEventMap()
+		em := cm.GetOrCreateEventMap("cluster1")
+		em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591375000"}) // 15s
+		em.AddEvent("job1", Event{EventID: "2", Timestamp: "1768591370000"}) // 10s
+
+		events := cm.GetByJobID("cluster1", "job1")
+		assert.Len(t, events, 2)
+		assert.Equal(t, "2", events[0].EventID)
+		assert.Equal(t, "1", events[1].EventID)
+	})
+
+	t.Run("GetByJobID returns empty for nonexistent cluster or job", func(t *testing.T) {
+		cm := NewClusterEventMap()
+		cm.GetOrCreateEventMap("cluster1")
+
+		events := cm.GetByJobID("nonexistent", "job1")
+		assert.Empty(t, events)
+
+		events = cm.GetByJobID("cluster1", "nonexistent")
+		assert.Empty(t, events)
+	})
+}
+
+func TestEvent(t *testing.T) {
+	event := Event{
+		EventID:        "LrBbQwLLTK2+SSsBX+AG4EDK02CAVnvtVMD2MA==",
+		EventType:      "ACTOR_DEFINITION_EVENT",
+		SourceType:     "GCS",
+		Timestamp:      "2026-01-16T19:16:15.210327633Z",
+		Severity:       "INFO",
+		Label:          "ACTOR_DEFINITION_EVENT",
+		NodeID:         "531134a446a4f1b4d07301c0ee09b0ca32593dbb",
+		SourceHostname: "ray-head-node-0",
+		SourcePid:      12345,
+		CustomFields: map[string]any{
+			"sessionName": "session_2026-01-16_11-06-54_467309_1",
+		},
+	}
+
+	assert.Equal(t, "LrBbQwLLTK2+SSsBX+AG4EDK02CAVnvtVMD2MA==", event.EventID)
+	assert.Equal(t, "ACTOR_DEFINITION_EVENT", event.EventType)
+	assert.Equal(t, "GCS", event.SourceType)
+	assert.Equal(t, "INFO", event.Severity)
+	assert.Equal(t, "ray-head-node-0", event.SourceHostname)
+	assert.Equal(t, 12345, event.SourcePid)
+	assert.NotNil(t, event.CustomFields["sessionName"])
+}
+
+func TestSortEventsByTimestamp(t *testing.T) {
+	events := []Event{
+		{EventID: "3", Timestamp: "1768591390000"}, // 30s
+		{EventID: "1", Timestamp: "1768591370000"}, // 10s
+		{EventID: "2", Timestamp: "1768591380000"}, // 20s
+	}
+
+	sortEventsByTimestamp(events)
+
+	assert.Equal(t, "1", events[0].EventID)
+	assert.Equal(t, "2", events[1].EventID)
+	assert.Equal(t, "3", events[2].EventID)
+}
diff --git a/historyserver/pkg/historyserver/router.go b/historyserver/pkg/historyserver/router.go
index e705d191f7a..638d7af2749 100644
--- a/historyserver/pkg/historyserver/router.go
+++ b/historyserver/pkg/historyserver/router.go
@@ -322,13 +322,49 @@ func (s *ServerHandler) getNodes(req *restful.Request, resp *restful.Response) {
 }

 func (s *ServerHandler) getEvents(req *restful.Request, resp *restful.Response) {
+	clusterName := req.Attribute(COOKIE_CLUSTER_NAME_KEY).(string)
+	clusterNamespace := req.Attribute(COOKIE_CLUSTER_NAMESPACE_KEY).(string)
 	sessionName := req.Attribute(COOKIE_SESSION_NAME_KEY).(string)
+
+	// Live cluster: proxy to Ray Dashboard
 	if sessionName == "live" {
 		s.redirectRequest(req, resp)
 		return
 	}
-	// Return "not yet supported" for historical data
-	resp.WriteErrorString(http.StatusNotImplemented, "Historical events not yet supported")
+
+	clusterSessionKey := utils.BuildClusterSessionKey(clusterName, clusterNamespace, sessionName)
+	jobID := req.QueryParameter("job_id")
+
+	var response map[string]any
+
+	if jobID != "" {
+		events := s.eventHandler.ClusterEventMap.GetByJobID(clusterSessionKey, jobID)
+		response = map[string]any{
+			"result": true,
+			"msg":    "Job events fetched.",
+			"data": map[string]any{
+				"jobId":  jobID,
+				"events": events,
+			},
+		}
+	} else {
+		events := s.eventHandler.ClusterEventMap.GetAll(clusterSessionKey)
+		response = map[string]any{
+			"result": true,
+			"msg":    "All events fetched.",
+			"data": map[string]any{
+				"events": events,
+			},
+		}
+	}
+
+	respData, err := json.Marshal(response)
+	if err != nil {
+		logrus.Errorf("Failed to marshal events response: %v", err)
+		resp.WriteErrorString(http.StatusInternalServerError, err.Error())
+		return
+	}
+	resp.Write(respData)
 }

 func (s *ServerHandler) getPrometheusHealth(req *restful.Request, resp *restful.Response) {
diff --git a/historyserver/test/e2e/historyserver_test.go b/historyserver/test/e2e/historyserver_test.go
index 449340ac6ae..d9053d0b206 100644
--- a/historyserver/test/e2e/historyserver_test.go
+++ b/historyserver/test/e2e/historyserver_test.go
@@ -48,6 +48,14 @@ func TestHistoryServer(t *testing.T) {
 			name:     "/v0/logs/file endpoint (dead cluster)",
 			testFunc: testLogFileEndpointDeadCluster,
 		},
+		{
+			name:     "/events endpoint (live cluster)",
+			testFunc: testEventsEndpointLiveCluster,
+		},
+		{
+			name:     "/events endpoint (dead cluster)",
+			testFunc: testEventsEndpointDeadCluster,
+		},
 	}

 	for _, tt := range tests {
@@ -326,3 +334,141 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names
 	DeleteS3Bucket(test, g, s3Client)
 	LogWithTimestamp(test.T(), "Dead cluster log file endpoint tests completed")
 }
+
+// testEventsEndpointLiveCluster verifies that the /events endpoint works for a live cluster.
+// For live clusters, the request is proxied to Ray Dashboard.
+func testEventsEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
+	rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
+	ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
+	ApplyHistoryServer(test, g, namespace)
+	historyServerURL := GetHistoryServerURL(test, g, namespace)
+
+	clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
+	g.Expect(clusterInfo.SessionName).To(Equal(LiveSessionName), "Live cluster should have sessionName='live'")
+
+	client := CreateHTTPClientWithCookieJar(g)
+	setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
+
+	test.T().Run("should return events", func(t *testing.T) {
+		g := NewWithT(t)
+		g.Eventually(func(gg Gomega) {
+			resp, err := client.Get(historyServerURL + "/events")
+			gg.Expect(err).NotTo(HaveOccurred())
+			defer resp.Body.Close()
+			gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))
+
+			body, err := io.ReadAll(resp.Body)
+			gg.Expect(err).NotTo(HaveOccurred())
+
+			var result map[string]any
+			err = json.Unmarshal(body, &result)
+			gg.Expect(err).NotTo(HaveOccurred())
+			gg.Expect(result["result"]).To(Equal(true))
+
+			// Verify data.events exists
+			data, ok := result["data"].(map[string]any)
+			gg.Expect(ok).To(BeTrue(), "response should have 'data' field")
+			_, ok = data["events"].(map[string]any)
+			gg.Expect(ok).To(BeTrue(), "data should have 'events' field")
+		}, TestTimeoutShort).Should(Succeed())
+	})
+
+	DeleteS3Bucket(test, g, s3Client)
+	LogWithTimestamp(test.T(), "Live cluster events endpoint test completed")
+}
+
+// testEventsEndpointDeadCluster verifies that the /events endpoint works for a dead cluster.
+// Events are retrieved from the EventHandler's in-memory ClusterEventMap.
+func testEventsEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
+	rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
+	ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
+
+	// Delete RayCluster to trigger event upload
+	err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
+	g.Expect(err).NotTo(HaveOccurred())
+	LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)
+
+	// Wait for cluster to be fully deleted
+	g.Eventually(func() error {
+		_, err := GetRayCluster(test, namespace.Name, rayCluster.Name)
+		return err
+	}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+
+	ApplyHistoryServer(test, g, namespace)
+	historyServerURL := GetHistoryServerURL(test, g, namespace)
+
+	clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
+	g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName))
+
+	client := CreateHTTPClientWithCookieJar(g)
+	setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
+
+	test.T().Run("should return events from storage", func(t *testing.T) {
+		g := NewWithT(t)
+		g.Eventually(func(gg Gomega) {
+			resp, err := client.Get(historyServerURL + "/events")
+			gg.Expect(err).NotTo(HaveOccurred())
+			defer resp.Body.Close()
+			gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))
+
+			body, err := io.ReadAll(resp.Body)
+			gg.Expect(err).NotTo(HaveOccurred())
+
+			var result map[string]any
+			err = json.Unmarshal(body, &result)
+			gg.Expect(err).NotTo(HaveOccurred())
+			gg.Expect(result["result"]).To(Equal(true))
+			gg.Expect(result["msg"]).To(Equal("All events fetched."))
+
+			// Verify data.events exists (may be empty if no events were collected)
+			data, ok := result["data"].(map[string]any)
+			gg.Expect(ok).To(BeTrue(), "response should have 'data' field")
+			events, ok := data["events"].(map[string]any)
+			gg.Expect(ok).To(BeTrue(), "data should have 'events' field")
+
+			// If we have events, verify their structure
+			for jobID, jobEvents := range events {
+				eventsList, ok := jobEvents.([]any)
+				gg.Expect(ok).To(BeTrue(), "events for job %s should be an array", jobID)
+				for _, event := range eventsList {
+					eventMap, ok := event.(map[string]any)
+					gg.Expect(ok).To(BeTrue(), "each event should be an object")
+					// Verify required fields exist
+					gg.Expect(eventMap).To(HaveKey("eventId"))
+					gg.Expect(eventMap).To(HaveKey("eventType"))
+					gg.Expect(eventMap).To(HaveKey("timestamp"))
+				}
+			}
+		}, TestTimeoutMedium).Should(Succeed())
+	})
+
+	test.T().Run("should support job_id filter", func(t *testing.T) {
+		g := NewWithT(t)
+		g.Eventually(func(gg Gomega) {
+			// Use a non-existent job_id to test the filter
+			resp, err := client.Get(historyServerURL + "/events?job_id=nonexistent")
+			gg.Expect(err).NotTo(HaveOccurred())
+			defer resp.Body.Close()
+			gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))
+
+			body, err := io.ReadAll(resp.Body)
+			gg.Expect(err).NotTo(HaveOccurred())
+
+			var result map[string]any
+			err = json.Unmarshal(body, &result)
+			gg.Expect(err).NotTo(HaveOccurred())
+			gg.Expect(result["result"]).To(Equal(true))
+			gg.Expect(result["msg"]).To(Equal("Job events fetched."))
+
+			data, ok := result["data"].(map[string]any)
+			gg.Expect(ok).To(BeTrue())
+			gg.Expect(data["jobId"]).To(Equal("nonexistent"))
+			events, ok := data["events"].([]any)
+			gg.Expect(ok).To(BeTrue())
+			gg.Expect(events).To(BeEmpty()) // No events for non-existent job
+		}, TestTimeoutShort).Should(Succeed())
+	})
+
+	DeleteS3Bucket(test, g, s3Client)
+	LogWithTimestamp(test.T(), "Dead cluster events endpoint tests completed")
+}
diff --git a/historyserver/test/support/historyserver.go b/historyserver/test/support/historyserver.go
index 67b5bdaff3f..4443d14c9dd 100644
--- a/historyserver/test/support/historyserver.go
+++ b/historyserver/test/support/historyserver.go
@@ -54,7 +54,6 @@ const (
 // - /logical/actors/{actor_id}
 //
 // Excluded endpoints that are not yet implemented:
-// - /events
 // - /api/cluster_status
 // - /api/prometheus_health
 // - /api/data/datasets/{job_id}
@@ -66,6 +65,7 @@ var HistoryServerEndpoints = []string{
 	"/api/v0/tasks",
 	"/api/v0/tasks/summarize",
 	"/logical/actors",
+	"/events",
 }

 // HistoryServerEndpointGrafanaHealth is a standalone constant
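
For reference, a minimal sketch of how the new event cache is intended to be used, mirroring what storeEvent and getEvents do in this patch. The import path, the cluster-session key, and the job ID below are placeholders assumed for illustration only.

package main

import (
	"fmt"

	// Import path assumed from the repository layout; adjust to the actual module path.
	"github.com/ray-project/kuberay/historyserver/pkg/eventserver/types"
)

func main() {
	// Events are grouped per cluster, then per job, the way storeEvent does it.
	cem := types.NewClusterEventMap()
	em := cem.GetOrCreateEventMap("default/raycluster-sample/session-xyz") // placeholder cluster-session key
	em.AddEvent("AQAAAA==", types.Event{EventID: "e1", Timestamp: "1768591369414"})
	em.AddEvent("", types.Event{EventID: "e2", Timestamp: "1768591369500"}) // empty jobID is stored under "global"

	// The /events handler reads the cache back through the same two accessors.
	fmt.Println(len(cem.GetByJobID("default/raycluster-sample/session-xyz", "AQAAAA=="))) // 1
	fmt.Println(len(cem.GetAll("default/raycluster-sample/session-xyz")["global"]))       // 1
}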