Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
8b84e0e
feat: Add node endpoint /nodes POC
JiangJiaWei1103 Jan 24, 2026
fd5e8ff
docs: Clarify router nodes
JiangJiaWei1103 Jan 24, 2026
7dc3f85
feat: Support dashboard API-compatible response
JiangJiaWei1103 Jan 25, 2026
d093b88
refactor: Clean event processing logic
JiangJiaWei1103 Jan 25, 2026
25f87bf
refactor: Extract lifecycle dedup and sorting helper
JiangJiaWei1103 Jan 25, 2026
7f5a5b6
feat: Support more node fields
JiangJiaWei1103 Jan 25, 2026
8d7ff9a
Support local dev outside cluster (should be reverted)
JiangJiaWei1103 Jan 25, 2026
0a1a558
test: Add e2e tests for /nodes endpoint for both live and dead
JiangJiaWei1103 Jan 25, 2026
73809f4
feat: Support base64 to hex conversion
JiangJiaWei1103 Jan 26, 2026
46bee4e
feat: Support /nodes/node_id endpoint with shared formatters
JiangJiaWei1103 Jan 26, 2026
bbbccd8
docs: Remove redundant comments
JiangJiaWei1103 Jan 26, 2026
eb542fb
test: Add e2e tests for /nodes/node_id endpoint for both live and dead
JiangJiaWei1103 Jan 26, 2026
147da88
Revert "Support local dev outside cluster (should be reverted)"
JiangJiaWei1103 Jan 26, 2026
492a011
fix: Use test namespace instead of default
JiangJiaWei1103 Jan 26, 2026
06240b6
fix: Check empty node ID before conversion
JiangJiaWei1103 Jan 26, 2026
ae4a75a
fix: Fix deepcopy to prevent race conditions
JiangJiaWei1103 Jan 26, 2026
f9a39a2
Merge branch 'my-master' into epic-4374/add-node-endpoint
JiangJiaWei1103 Jan 27, 2026
2589818
Support local dev outside cluster (should be reverted)
JiangJiaWei1103 Jan 27, 2026
05926f1
feat: Add more fields
JiangJiaWei1103 Jan 27, 2026
450c50d
test: Add new test fields
JiangJiaWei1103 Jan 27, 2026
6d2a660
feat: Add storeStats
JiangJiaWei1103 Jan 27, 2026
965c368
feat: Add resource string
JiangJiaWei1103 Jan 27, 2026
6f72a63
Revert "Support local dev outside cluster (should be reverted)"
JiangJiaWei1103 Jan 27, 2026
03c21d3
fix: Follow iter order by using slice not map
JiangJiaWei1103 Jan 27, 2026
e582fe5
Merge branch 'my-master' into epic-4374/add-node-endpoint
JiangJiaWei1103 Jan 28, 2026
16f490d
docs: Add docs
JiangJiaWei1103 Jan 28, 2026
bee9a4f
test: Add back log file e2e tests
JiangJiaWei1103 Jan 28, 2026
357d774
fix: Handle NODE_LIFECYCLE_EVENT without any state transitions
JiangJiaWei1103 Jan 28, 2026
6d2dc71
Merge branch 'my-master' into epic-4374/add-node-endpoint
JiangJiaWei1103 Jan 30, 2026
721a4d7
fix: Fix merging def conflicts
JiangJiaWei1103 Jan 30, 2026
f8574cd
refactor: Add a helper to get node by node id
JiangJiaWei1103 Jan 30, 2026
00fa74e
Remove .env
JiangJiaWei1103 Jan 30, 2026
325beea
fix: Construct resource string only for alive
JiangJiaWei1103 Jan 30, 2026
bb7b067
[history server] use sessionName in task, actor, and job endpoints (#…
Future-Outlier Jan 30, 2026
72101a2
Merge remote-tracking branch 'upstream/master' into epic-4374/add-nod…
Future-Outlier Jan 30, 2026
5e2afa7
fix merge bug
Future-Outlier Jan 30, 2026
63e4be6
fix node endpoints
Future-Outlier Jan 30, 2026
ce06cdf
fix: Fix get one node summary API schema
JiangJiaWei1103 Jan 30, 2026
9f1cb93
update
Future-Outlier Jan 30, 2026
83b474a
fix TODO
Future-Outlier Jan 30, 2026
4ba9f9e
Merge branch 'my-master' into epic-4374/add-node-endpoint
JiangJiaWei1103 Feb 3, 2026
3788bc8
Merge branch 'my-master' into epic-4374/add-node-endpoint
JiangJiaWei1103 Feb 5, 2026
c9a4050
fix: Check rsc str for non-empty string
JiangJiaWei1103 Feb 5, 2026
79684af
fix: Avoid missing key
JiangJiaWei1103 Feb 5, 2026
7fcd1db
fix: Sort resource keys first
JiangJiaWei1103 Feb 5, 2026
9210d0d
test: Verify hostnamelist view and clean code
JiangJiaWei1103 Feb 5, 2026
0847170
refactor: Use existing helper to get cluster session key
JiangJiaWei1103 Feb 6, 2026
21b1d58
refactor: Use existing helper to get cluster session key
JiangJiaWei1103 Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 108 additions & 3 deletions historyserver/pkg/eventserver/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type EventHandler struct {

ClusterTaskMap *types.ClusterTaskMap
ClusterActorMap *types.ClusterActorMap
ClusterNodeMap *types.ClusterNodeMap
}

var eventFilePattern = regexp.MustCompile(`-\d{4}-\d{2}-\d{2}-\d{2}$`)
Expand All @@ -44,6 +45,9 @@ func NewEventHandler(reader storage.StorageReader) *EventHandler {
ClusterActorMap: &types.ClusterActorMap{
ClusterActorMap: make(map[string]*types.ActorMap),
},
ClusterNodeMap: &types.ClusterNodeMap{
ClusterNodeMap: make(map[string]*types.NodeMap),
},
}
}

Expand Down Expand Up @@ -202,6 +206,16 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
return fmt.Errorf("clusterName is not a string, got %T", clusterNameVal)
}

sessionNameVal, ok := eventMap["sessionName"]
if !ok {
return fmt.Errorf("event missing 'sessionName' field")
}
sessionName, ok := sessionNameVal.(string)
if !ok {
return fmt.Errorf("sessionName is not a string, got %T", sessionNameVal)
}
clusterSessionID := currentClusterName + "_" + sessionName

logrus.Infof("current eventType: %v", eventType)
switch eventType {
case types.TASK_DEFINITION_EVENT:
Expand Down Expand Up @@ -229,7 +243,6 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
t.State = existingEvents[len(existingEvents)-1].State
}
})

case types.TASK_LIFECYCLE_EVENT:
lifecycleEvent, ok := eventMap["taskLifecycleEvent"].(map[string]any)
if !ok {
Expand Down Expand Up @@ -324,7 +337,6 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
t.EndTime = lastEvent.Timestamp
}
})

case types.ACTOR_DEFINITION_EVENT:
actorDef, ok := eventMap["actorDefinitionEvent"]
if !ok {
Expand Down Expand Up @@ -520,11 +532,14 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
}
a.NumRestarts = restartCount
})

case types.ACTOR_TASK_DEFINITION_EVENT:
// TODO: Handle actor task definition event
// This is related to GET /api/v0/tasks (type=ACTOR_TASK)
logrus.Debugf("ACTOR_TASK_DEFINITION_EVENT received, not yet implemented")
case types.NODE_DEFINITION_EVENT:
return h.handleNodeDefinitionEvent(eventMap, clusterSessionID)
case types.NODE_LIFECYCLE_EVENT:
return h.handleNodeLifecycleEvent(eventMap, clusterSessionID)
default:
logrus.Infof("Event not supported, skipping: %v", eventMap)
}
Expand Down Expand Up @@ -708,3 +723,93 @@ func (h *EventHandler) GetActorsMap(clusterName string) map[string]types.Actor {
}
return actors
}

// handleNodeDefinitionEvent processes NODE_DEFINITION_EVENT and merges it with the existing node map for a given cluster session.
func (h *EventHandler) handleNodeDefinitionEvent(eventMap map[string]any, clusterSessionID string) error {
nodeDef, exists := eventMap["nodeDefinitionEvent"]
if !exists {
return fmt.Errorf("event does not have 'nodeDefinitionEvent' field")
}

jsonNodeDefinition, err := json.Marshal(nodeDef)
if err != nil {
return fmt.Errorf("failed to marshal node definition event: %w", err)
}

var currNode types.Node
if err := json.Unmarshal(jsonNodeDefinition, &currNode); err != nil {
return fmt.Errorf("failed to unmarshal node definition event: %w", err)
}

currNode.NodeID, err = utils.ConvertBase64ToHex(currNode.NodeID)
if err != nil {
return fmt.Errorf("failed to convert node ID from base64 to hex: %w", err)
}

nodeMap := h.ClusterNodeMap.GetOrCreateNodeMap(clusterSessionID)
nodeMap.CreateOrMergeNode(currNode.NodeID, func(n *types.Node) {
// TODO(jwj): Handle merging of node definition event to prevent overwriting lifecycle-derived fields.
existingStateTransitions := n.StateTransitions

*n = currNode

if len(existingStateTransitions) > 0 {
n.StateTransitions = existingStateTransitions
}
})

return nil
}

// handleNodeLifecycleEvent processes NODE_LIFECYCLE_EVENT and merges state transitions.
func (h *EventHandler) handleNodeLifecycleEvent(eventMap map[string]any, clusterSessionID string) error {
nodeLifecycle, exists := eventMap["nodeLifecycleEvent"]
if !exists {
return fmt.Errorf("event does not have 'nodeLifecycleEvent' field")
}

jsonNodeLifecycle, err := json.Marshal(nodeLifecycle)
if err != nil {
return fmt.Errorf("failed to marshal node lifecycle event: %w", err)
}

var currNode types.Node
if err := json.Unmarshal(jsonNodeLifecycle, &currNode); err != nil {
return fmt.Errorf("failed to unmarshal node lifecycle event: %w", err)
}

currNode.NodeID, err = utils.ConvertBase64ToHex(currNode.NodeID)
if err != nil {
return fmt.Errorf("failed to convert node ID from base64 to hex: %w", err)
}

nodeMap := h.ClusterNodeMap.GetOrCreateNodeMap(clusterSessionID)
nodeMap.CreateOrMergeNode(currNode.NodeID, func(n *types.Node) {
// Merge state transitions.
n.StateTransitions = MergeStateTransitions[types.NodeStateTransition](
n.StateTransitions,
currNode.StateTransitions,
)
})

return nil
}

func (h *EventHandler) GetNodeMap(clusterSessionID string) map[string]types.Node {
h.ClusterNodeMap.RLock()
defer h.ClusterNodeMap.RUnlock()

nodeMap, ok := h.ClusterNodeMap.ClusterNodeMap[clusterSessionID]
if !ok {
return map[string]types.Node{}
}

nodeMap.Lock()
defer nodeMap.Unlock()

nodes := make(map[string]types.Node, len(nodeMap.NodeMap))
for id, node := range nodeMap.NodeMap {
nodes[id] = node.DeepCopy()
}
return nodes
}
52 changes: 52 additions & 0 deletions historyserver/pkg/eventserver/state_transition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package eventserver
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file defines the interface and processing helpers (e.g., MergeStateTransitions) that can be reused across different events' state transitions.


import (
"sort"
"time"
)

// StateTransition defines a common interface for state transitions.
type StateTransition interface {
GetState() string
GetTimestamp() time.Time
}

// StateTransitionKey is a unique key for deduplication.
type StateTransitionKey struct {
State string
Timestamp int64
}

// MergeStateTransitions merges new state transitions into existing ones, avoiding duplicates and maintaining chronological order.
func MergeStateTransitions[T StateTransition](existingStateTransitions []T, newStateTransitions []T) []T {
// Build a map of existing transition keys for deduplication.
existingKeys := make(map[StateTransitionKey]bool, len(existingStateTransitions))
for _, tr := range existingStateTransitions {
key := StateTransitionKey{
State: tr.GetState(),
Timestamp: tr.GetTimestamp().UnixNano(),
}
existingKeys[key] = true
}

// Append new state transitions that haven't seen before.
mergedStateTransitions := make([]T, len(existingStateTransitions))
copy(mergedStateTransitions, existingStateTransitions)
for _, tr := range newStateTransitions {
key := StateTransitionKey{
State: tr.GetState(),
Timestamp: tr.GetTimestamp().UnixNano(),
}
if !existingKeys[key] {
mergedStateTransitions = append(mergedStateTransitions, tr)
existingKeys[key] = true
}
}

// Sort state transitions by timestamp to maintain chronological order.
sort.Slice(mergedStateTransitions, func(i, j int) bool {
return mergedStateTransitions[i].GetTimestamp().Before(mergedStateTransitions[j].GetTimestamp())
})

return mergedStateTransitions
}
155 changes: 155 additions & 0 deletions historyserver/pkg/eventserver/types/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package types

import (
"sync"
"time"
)

// NodeState is the state of a node.
type NodeState string

// TODO(jwj): Handle redeclaration of states with the actor type.
// actor.go also defines ALIVE and DEAD.
const (
NODE_ALIVE NodeState = "ALIVE"
NODE_DEAD NodeState = "DEAD"
)

// NodeAliveSubState provides more granular state information for nodes in the ALIVE state.
type NodeAliveSubState string

const (
UNSPECIFIED NodeAliveSubState = "UNSPECIFIED"
DRAINING NodeAliveSubState = "DRAINING"
)

// NodeDeathInfoReason specifies the reason why a node died.
type NodeDeathInfoReason string

const (
DEATH_REASON_UNSPECIFIED NodeDeathInfoReason = "UNSPECIFIED"
EXPECTED_TERMINATION NodeDeathInfoReason = "EXPECTED_TERMINATION"
UNEXPECTED_TERMINATION NodeDeathInfoReason = "UNEXPECTED_TERMINATION"
AUTOSCALER_DRAIN_PREEMPTED NodeDeathInfoReason = "AUTOSCALER_DRAIN_PREEMPTED"
AUTOSCALER_DRAIN_IDLE NodeDeathInfoReason = "AUTOSCALER_DRAIN_IDLE"
)

type NodeDeathInfo struct {
Reason NodeDeathInfoReason `json:"reason"`
ReasonMessage string `json:"reasonMessage"`
}

// NodeStateTransition represents a change in a node's state at a specific timestamp.
type NodeStateTransition struct {
// State of a node (ALIVE, DEAD).
State NodeState `json:"state"`

// Timestamp when the state transition occurred.
Timestamp time.Time `json:"timestamp"`

// Resources available on a node (cpu, gpu, etc.), available only in the ALIVE state.
Resources map[string]float64 `json:"resources,omitempty"`

// Reason why a node died (UNSPECIFIED, EXPECTED_TERMINATION, UNEXPECTED_TERMINATION, AUTOSCALER_DRAIN_PREEMPTED, AUTOSCALER_DRAIN_IDLE),
// available only in the DEAD state
DeathInfo *NodeDeathInfo `json:"deathInfo,omitempty"`

// Sub-state of a node in the ALIVE state (UNSPECIFIED, DRAINING), available only in the ALIVE state.
AliveSubState NodeAliveSubState `json:"aliveSubState,omitempty"`
}

func (n NodeStateTransition) GetState() string {
return string(n.State)
}

func (n NodeStateTransition) GetTimestamp() time.Time {
return n.Timestamp
}

type Node struct {
// NodeID is the hexadecimal representation of the node ID.
NodeID string `json:"nodeId"`
NodeIPAddress string `json:"nodeIpAddress,omitempty"`
StartTimestamp time.Time `json:"startTimestamp,omitempty"`
Labels map[string]string `json:"labels,omitempty"`

// TODO(jwj): Make it clearer.
// Available only when there's at least one NODE_LIFECYCLE_EVENT.
StateTransitions []NodeStateTransition `json:"stateTransitions,omitempty"`
}

type NodeMap struct {
NodeMap map[string]Node
Mu sync.Mutex
}

func (n *NodeMap) Lock() {
n.Mu.Lock()
}

func (n *NodeMap) Unlock() {
n.Mu.Unlock()
}

func NewNodeMap() *NodeMap {
return &NodeMap{
NodeMap: make(map[string]Node),
}
}

type ClusterNodeMap struct {
ClusterNodeMap map[string]*NodeMap
Mu sync.RWMutex
}

func (c *ClusterNodeMap) RLock() {
c.Mu.RLock()
}

func (c *ClusterNodeMap) RUnlock() {
c.Mu.RUnlock()
}

func (c *ClusterNodeMap) Lock() {
c.Mu.Lock()
}

func (c *ClusterNodeMap) Unlock() {
c.Mu.Unlock()
}

// GetOrCreateNodeMap retrieves the NodeMap for the given cluster session, creating it if it doesn't exist.
func (c *ClusterNodeMap) GetOrCreateNodeMap(clusterSessionID string) *NodeMap {
c.Lock()
defer c.Unlock()

nodeMap, exists := c.ClusterNodeMap[clusterSessionID]
if !exists {
nodeMap = NewNodeMap()
c.ClusterNodeMap[clusterSessionID] = nodeMap
}
return nodeMap
}

// CreateOrMergeNode retrieves or creates a Node and applies the merge function.
func (n *NodeMap) CreateOrMergeNode(nodeId string, mergeFn func(*Node)) {
n.Lock()
defer n.Unlock()

node, exists := n.NodeMap[nodeId]
if !exists {
node = Node{NodeID: nodeId}
}

mergeFn(&node)
n.NodeMap[nodeId] = node
}

func (n Node) DeepCopy() Node {
cp := n
if len(n.StateTransitions) > 0 {
cp.StateTransitions = make([]NodeStateTransition, len(n.StateTransitions))
copy(cp.StateTransitions, n.StateTransitions)
}
return cp
}
Loading
Loading