Skip to content

Commit 6c845b6

Browse files
committed
address timestamp comment
1 parent 7ddb657 commit 6c845b6

File tree

4 files changed

+49
-27
lines changed

4 files changed

+49
-27
lines changed

historyserver/pkg/eventserver/eventserver.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error {
191191
// transformToEvent converts a RayEvent map to the API response Event format.
192192
// This extracts common fields and puts event-type-specific data into customFields.
193193
// It also extracts sourceHostname and sourcePid from nested events where available.
194-
func transformToEvent(eventMap map[string]any) *types.Event {
194+
func transformToEvent(eventMap map[string]any) (*types.Event, error) {
195195
event := &types.Event{
196196
Severity: "INFO",
197197
CustomFields: make(map[string]any),
@@ -206,11 +206,11 @@ func transformToEvent(eventMap map[string]any) *types.Event {
206206
}
207207
// Convert ISO 8601 timestamp to Unix milliseconds to match Ray Dashboard format
208208
if v, ok := eventMap["timestamp"].(string); ok {
209-
if t, err := time.Parse(time.RFC3339Nano, v); err == nil {
210-
event.Timestamp = fmt.Sprintf("%d", t.UnixMilli())
211-
} else {
212-
event.Timestamp = v // Keep original if parsing fails
209+
t, err := time.Parse(time.RFC3339Nano, v)
210+
if err != nil {
211+
return nil, fmt.Errorf("failed to parse timestamp %q: %w", v, err)
213212
}
213+
event.Timestamp = fmt.Sprintf("%d", t.UnixMilli())
214214
}
215215
if v, ok := eventMap["severity"].(string); ok {
216216
event.Severity = v
@@ -254,7 +254,7 @@ func transformToEvent(eventMap map[string]any) *types.Event {
254254
}
255255
}
256256

257-
return event
257+
return event, nil
258258
}
259259

260260
// extractHostnameAndPid extracts sourceHostname and sourcePid from nested event data.
@@ -337,7 +337,11 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
337337
}
338338

339339
// ========== Store RayEvent for /events API ==========
340-
if event := transformToEvent(eventMap); event != nil && event.EventID != "" {
340+
event, err := transformToEvent(eventMap)
341+
if err != nil {
342+
return fmt.Errorf("failed to transform event: %w", err)
343+
}
344+
if event != nil && event.EventID != "" {
341345
jobID := extractJobIDFromEvent(eventMap)
342346
clusterEventMap := h.ClusterEventMap.GetOrCreateEventMap(currentClusterName)
343347
clusterEventMap.AddEvent(jobID, *event)

historyserver/pkg/eventserver/eventserver_test.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -943,7 +943,10 @@ func TestTransformToEventTimestamp(t *testing.T) {
943943
},
944944
}
945945

946-
event := transformToEvent(eventMap)
946+
event, err := transformToEvent(eventMap)
947+
if err != nil {
948+
t.Fatalf("transformToEvent failed: %v", err)
949+
}
947950

948951
// Verify timestamp is converted to Unix milliseconds
949952
expectedTimestamp := "1768591369414"
@@ -959,3 +962,17 @@ func TestTransformToEventTimestamp(t *testing.T) {
959962
t.Errorf("SourceType = %q, want %q", event.SourceType, "CORE_WORKER")
960963
}
961964
}
965+
966+
func TestTransformToEventTimestampError(t *testing.T) {
967+
// Test that transformToEvent returns error for invalid timestamp
968+
eventMap := map[string]any{
969+
"eventId": "test-event-id",
970+
"eventType": "TASK_DEFINITION_EVENT",
971+
"timestamp": "invalid-timestamp",
972+
}
973+
974+
_, err := transformToEvent(eventMap)
975+
if err == nil {
976+
t.Error("transformToEvent should return error for invalid timestamp")
977+
}
978+
}

historyserver/pkg/eventserver/types/event.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type Event struct {
5050
EventID string `json:"eventId"`
5151
EventType string `json:"eventType"` // e.g., TASK_DEFINITION_EVENT
5252
SourceType string `json:"sourceType"` // e.g., GCS, CORE_WORKER
53-
Timestamp string `json:"timestamp"` // ISO 8601 format
53+
Timestamp string `json:"timestamp"` // Unix milliseconds (e.g., "1768591369414")
5454
Severity string `json:"severity"` // INFO, WARNING, ERROR
5555
Message string `json:"message,omitempty"` // Usually empty in RayEvents
5656
Label string `json:"label,omitempty"` // Same as EventType for filtering
@@ -124,7 +124,8 @@ func (e *EventMap) GetByJobID(jobID string) []Event {
124124
return sorted
125125
}
126126

127-
// sortEventsByTimestamp sorts events in ascending order by timestamp (ISO 8601 string comparison).
127+
// sortEventsByTimestamp sorts events in ascending order by timestamp.
128+
// Timestamps are Unix milliseconds strings (e.g., "1768591369414").
128129
func sortEventsByTimestamp(events []Event) {
129130
sort.Slice(events, func(i, j int) bool {
130131
return events[i].Timestamp < events[j].Timestamp

historyserver/pkg/eventserver/types/event_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ func TestEventMap(t *testing.T) {
1717

1818
t.Run("AddEvent stores events by jobID", func(t *testing.T) {
1919
em := NewEventMap()
20-
em.AddEvent("job1", Event{EventID: "1", Timestamp: "2026-01-16T19:16:15Z"})
21-
em.AddEvent("job2", Event{EventID: "2", Timestamp: "2026-01-16T19:16:16Z"})
20+
em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591375000"})
21+
em.AddEvent("job2", Event{EventID: "2", Timestamp: "1768591376000"})
2222

2323
all := em.GetAll()
2424
assert.Len(t, all, 2)
@@ -51,14 +51,14 @@ func TestEventMap(t *testing.T) {
5151

5252
t.Run("GetByJobID returns sorted events", func(t *testing.T) {
5353
em := NewEventMap()
54-
em.AddEvent("job1", Event{EventID: "1", Timestamp: "2026-01-16T19:16:30Z"})
55-
em.AddEvent("job1", Event{EventID: "2", Timestamp: "2026-01-16T19:16:10Z"})
56-
em.AddEvent("job1", Event{EventID: "3", Timestamp: "2026-01-16T19:16:20Z"})
54+
em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591390000"}) // 30s
55+
em.AddEvent("job1", Event{EventID: "2", Timestamp: "1768591370000"}) // 10s
56+
em.AddEvent("job1", Event{EventID: "3", Timestamp: "1768591380000"}) // 20s
5757

5858
events := em.GetByJobID("job1")
59-
assert.Equal(t, "2026-01-16T19:16:10Z", events[0].Timestamp)
60-
assert.Equal(t, "2026-01-16T19:16:20Z", events[1].Timestamp)
61-
assert.Equal(t, "2026-01-16T19:16:30Z", events[2].Timestamp)
59+
assert.Equal(t, "1768591370000", events[0].Timestamp)
60+
assert.Equal(t, "1768591380000", events[1].Timestamp)
61+
assert.Equal(t, "1768591390000", events[2].Timestamp)
6262
})
6363

6464
t.Run("GetByJobID returns empty slice for nonexistent job", func(t *testing.T) {
@@ -70,8 +70,8 @@ func TestEventMap(t *testing.T) {
7070

7171
t.Run("GetAll returns sorted events for each job", func(t *testing.T) {
7272
em := NewEventMap()
73-
em.AddEvent("job1", Event{EventID: "1", Timestamp: "2026-01-16T19:16:20Z"})
74-
em.AddEvent("job1", Event{EventID: "2", Timestamp: "2026-01-16T19:16:10Z"})
73+
em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591380000"}) // 20s
74+
em.AddEvent("job1", Event{EventID: "2", Timestamp: "1768591370000"}) // 10s
7575

7676
all := em.GetAll()
7777
assert.Equal(t, "2", all["job1"][0].EventID)
@@ -102,8 +102,8 @@ func TestClusterEventMap(t *testing.T) {
102102
t.Run("GetAll returns events for cluster", func(t *testing.T) {
103103
cm := NewClusterEventMap()
104104
em := cm.GetOrCreateEventMap("cluster1")
105-
em.AddEvent("job1", Event{EventID: "1", Timestamp: "2026-01-16T19:16:15Z"})
106-
em.AddEvent("", Event{EventID: "2", Timestamp: "2026-01-16T19:16:10Z"})
105+
em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591375000"})
106+
em.AddEvent("", Event{EventID: "2", Timestamp: "1768591370000"})
107107

108108
all := cm.GetAll("cluster1")
109109
assert.Len(t, all, 2)
@@ -121,8 +121,8 @@ func TestClusterEventMap(t *testing.T) {
121121
t.Run("GetByJobID returns events for cluster and job", func(t *testing.T) {
122122
cm := NewClusterEventMap()
123123
em := cm.GetOrCreateEventMap("cluster1")
124-
em.AddEvent("job1", Event{EventID: "1", Timestamp: "2026-01-16T19:16:15Z"})
125-
em.AddEvent("job1", Event{EventID: "2", Timestamp: "2026-01-16T19:16:10Z"})
124+
em.AddEvent("job1", Event{EventID: "1", Timestamp: "1768591375000"}) // 15s
125+
em.AddEvent("job1", Event{EventID: "2", Timestamp: "1768591370000"}) // 10s
126126

127127
events := cm.GetByJobID("cluster1", "job1")
128128
assert.Len(t, events, 2)
@@ -169,9 +169,9 @@ func TestEvent(t *testing.T) {
169169

170170
func TestSortEventsByTimestamp(t *testing.T) {
171171
events := []Event{
172-
{EventID: "3", Timestamp: "2026-01-16T19:16:30Z"},
173-
{EventID: "1", Timestamp: "2026-01-16T19:16:10Z"},
174-
{EventID: "2", Timestamp: "2026-01-16T19:16:20Z"},
172+
{EventID: "3", Timestamp: "1768591390000"}, // 30s
173+
{EventID: "1", Timestamp: "1768591370000"}, // 10s
174+
{EventID: "2", Timestamp: "1768591380000"}, // 20s
175175
}
176176

177177
sortEventsByTimestamp(events)

0 commit comments

Comments
 (0)