
Commit 3536abe

Merge branch 'master' into add_metadata_write_logic_in_history_server
Signed-off-by: Jie-Kai Chang <jiekaichang@apache.org>
2 parents: aadca4b + 0e64bd8

File tree: 8 files changed (+111, -71 lines)


historyserver/cmd/collector/main.go

Lines changed: 32 additions & 14 deletions
@@ -1,11 +1,11 @@
 package main
 
 import (
-    "context"
     "encoding/json"
     "flag"
     "fmt"
     "os"
+    "os/signal"
     "path"
     "strconv"
     "time"
@@ -108,18 +108,36 @@ func main() {
         panic("Failed to create writer for runtime class name: " + runtimeClassName + " for role: " + role + ".")
     }
 
+    var wg sync.WaitGroup
+
+    sigChan := make(chan os.Signal, 1)
+    stop := make(chan struct{}, 1)
+    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+    wg.Add(1)
     // Create and initialize EventServer
-    eventServer := eventserver.NewEventServer(writer, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName)
-    eventServer.InitServer(eventsPort)
-
-    collector := runtime.NewCollector(&globalConfig, writer)
-    _ = collector.Start(context.TODO().Done())
-
-    eventStop := eventServer.WaitForStop()
-    logStop := collector.WaitForStop()
-    <-eventStop
-    logrus.Info("Event server shutdown")
-    <-logStop
-    logrus.Info("Log server shutdown")
-    logrus.Info("All servers shutdown")
+    go func() {
+        defer wg.Done()
+        eventServer := eventserver.NewEventServer(writer, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName)
+        eventServer.InitServer(stop, eventsPort)
+        logrus.Info("Event server shutdown")
+    }()
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        collector := runtime.NewCollector(&globalConfig, writer)
+        collector.Run(stop)
+        logrus.Info("Log server shutdown")
+    }()
+
+    <-sigChan
+    logrus.Info("Received shutdown signal, initiating graceful shutdown...")
+
+    // Stop both the event server and the collector
+    close(stop)
+
+    // Wait for both goroutines to complete
+    wg.Wait()
+    logrus.Info("Graceful shutdown complete")
 }
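
This change replaces the old Start/WaitForStop wiring with a single shared stop channel: main owns signal handling, each component blocks inside its own goroutine, and close(stop) broadcasts shutdown to everything at once. A minimal, self-contained sketch of that pattern (the worker names, tick interval, and log lines below are illustrative, not from the KubeRay code):

package main

import (
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

// worker loops until the shared stop channel is closed, then cleans up.
func worker(name string, stop <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-stop:
            fmt.Println(name, "shutting down")
            return
        case <-time.After(time.Second):
            fmt.Println(name, "working")
        }
    }
}

func main() {
    var wg sync.WaitGroup
    stop := make(chan struct{})

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    for _, name := range []string{"event-server", "log-collector"} {
        wg.Add(1)
        go worker(name, stop, &wg)
    }

    <-sigChan   // block until SIGINT/SIGTERM arrives
    close(stop) // closing broadcasts to every receiver at once
    wg.Wait()   // wait for all workers to finish cleanup
    fmt.Println("graceful shutdown complete")
}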

historyserver/cmd/historyserver/main.go

Lines changed: 6 additions & 8 deletions
@@ -63,24 +63,23 @@ func main() {
     // WaitGroup to track goroutine completion
     var wg sync.WaitGroup
 
+    sigChan := make(chan os.Signal, 1)
+    stop := make(chan struct{}, 1)
+    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
     // Start EventHandler in background goroutine
-    eventStop := make(chan struct{}, 1)
     wg.Add(1)
     go func() {
         defer wg.Done()
         logrus.Info("Starting EventHandler in background...")
-        if err := eventHandler.Run(eventStop, 2); err != nil {
+        if err := eventHandler.Run(stop, 2); err != nil {
             logrus.Errorf("EventHandler stopped with error: %v", err)
         }
         logrus.Info("EventHandler shutdown complete")
     }()
 
     handler := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler)
 
-    sigChan := make(chan os.Signal, 1)
-    stop := make(chan struct{}, 1)
-    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
-
     wg.Add(1)
     go func() {
         defer wg.Done()
@@ -92,8 +91,7 @@ func main() {
     logrus.Info("Received shutdown signal, initiating graceful shutdown...")
 
     // Stop both the server and the event handler
-    stop <- struct{}{}
-    eventStop <- struct{}{}
+    close(stop)
 
     // Wait for both goroutines to complete
     wg.Wait()
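
The key fix here is replacing the two one-shot sends (stop <- struct{}{} and eventStop <- struct{}{}) with a single close(stop); hoisting the channel creation above the EventHandler goroutine lets both goroutines share it. A send wakes exactly one receiver, so signaling several goroutines that way needs one channel per goroutine and breaks silently if a receiver is added; closing a channel releases every receiver at once. A small sketch (hypothetical goroutine count, not from the PR) showing the difference:

package main

import (
    "fmt"
    "sync"
)

func main() {
    stop := make(chan struct{})
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            <-stop // a closed channel yields immediately to every receiver
            fmt.Println("goroutine", id, "stopped")
        }(i)
    }

    // With `stop <- struct{}{}` only one of the three goroutines would wake,
    // and the program would deadlock on wg.Wait(). close(stop) wakes all.
    close(stop)
    wg.Wait()
}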

historyserver/pkg/collector/eventserver/eventserver.go

Lines changed: 5 additions & 17 deletions
@@ -7,11 +7,9 @@ import (
     "io"
     "net/http"
     "os"
-    "os/signal"
     "path"
     "strings"
     "sync"
-    "syscall"
     "time"
 
     "github.com/emicklei/go-restful/v3"
@@ -83,7 +81,7 @@ func NewEventServer(writer storage.StorageWriter, rootDir, sessionDir, nodeID, c
     return server
 }
 
-func (es *EventServer) InitServer(port int) {
+func (es *EventServer) InitServer(stop <-chan struct{}, port int) {
     ws := new(restful.WebService)
     ws.Path("/v1")
     ws.Consumes(restful.MIME_JSON)
@@ -101,16 +99,10 @@ func (es *EventServer) InitServer(port int) {
         es.periodicFlush()
     }()
 
-    // Handle SIGTERM signal
-    sigChan := make(chan os.Signal, 1)
-    signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
-
-    go func() {
-        <-sigChan
-        logrus.Info("Received SIGTERM, flushing events to storage")
-        es.flushEvents()
-        close(es.stopped)
-    }()
+    <-stop
+    logrus.Info("Received stop signal, flushing events to storage")
+    es.flushEvents()
+    close(es.stopped)
 }
 
 // watchNodeIDFile watches /tmp/ray/raylet_node_id for content changes
@@ -297,10 +289,6 @@ func (es *EventServer) periodicFlush() {
     }
 }
 
-func (es *EventServer) WaitForStop() <-chan struct{} {
-    return es.stopped
-}
-
 func (es *EventServer) flushEvents() {
     es.mutex.Lock()
     if len(es.events) == 0 {
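
With this change InitServer no longer installs its own signal handler: it blocks on the caller-provided stop channel, does a final flush, and closes es.stopped, which is why collector/main.go above now wraps the call in a goroutine. A hedged sketch of that flush-on-stop shape (the eventServer type, fields, and intervals below are simplified stand-ins, not the real EventServer):

package main

import (
    "fmt"
    "sync"
    "time"
)

type eventServer struct {
    mu      sync.Mutex
    events  []string
    stopped chan struct{}
}

func (es *eventServer) flushEvents() {
    es.mu.Lock()
    defer es.mu.Unlock()
    fmt.Printf("flushing %d buffered events\n", len(es.events))
    es.events = nil
}

// run mirrors the new InitServer shape: flush periodically in the background,
// block until stop closes, flush once more, then signal es.stopped.
func (es *eventServer) run(stop <-chan struct{}) {
    go func() {
        ticker := time.NewTicker(100 * time.Millisecond) // periodicFlush stand-in
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                es.flushEvents()
            case <-stop:
                return
            }
        }
    }()

    <-stop
    es.flushEvents() // final flush so buffered events are not lost on shutdown
    close(es.stopped)
}

func main() {
    es := &eventServer{stopped: make(chan struct{})}
    stop := make(chan struct{})
    go es.run(stop)

    es.mu.Lock()
    es.events = append(es.events, "event-a", "event-b")
    es.mu.Unlock()

    time.Sleep(150 * time.Millisecond) // let one periodic flush fire
    close(stop)
    <-es.stopped // wait until the final flush has completed
}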
Lines changed: 1 addition & 2 deletions
@@ -1,6 +1,5 @@
 package runtime
 
 type RayLogCollector interface {
-    Start(stop <-chan struct{}) error
-    WaitForStop() <-chan struct{}
+    Run(stop <-chan struct{}) error
 }
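
Collapsing Start (spawn-and-return) and WaitForStop (completion channel) into a single blocking Run moves concurrency to the caller: an implementation simply runs until the stop channel closes and then returns, and main decides whether to wrap it in a goroutine, exactly as collector/main.go now does.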

historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go

Lines changed: 5 additions & 26 deletions
@@ -6,12 +6,10 @@ import (
     "io/fs"
     "net/http"
     "os"
-    "os/signal"
     "path"
     "path/filepath"
     "strings"
     "sync"
-    "syscall"
     "time"
 
     "github.com/fsnotify/fsnotify"
@@ -47,11 +45,6 @@ type RayLogHandler struct {
     JobsUrlInfo *types.UrlInfo
 }
 
-func (r *RayLogHandler) Start(stop <-chan struct{}) error {
-    go r.Run(stop)
-    return nil
-}
-
 func (r *RayLogHandler) Run(stop <-chan struct{}) error {
     // watchPath := r.LogDir
     r.prevLogsDir = filepath.Join("/tmp", "ray", "prev-logs")
@@ -66,10 +59,6 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
     }
     defer watcher.Close()
 
-    // Setup signal handling for SIGTERM
-    sigChan := make(chan os.Signal, 1)
-    signal.Notify(sigChan, syscall.SIGTERM)
-
     // WatchPrevLogsLoops performs an initial scan of the prev-logs directory on startup
     // to process leftover log files in prev-logs/{sessionID}/{nodeID}/logs/ directories.
     // After scanning, it watches for new directories and files. This ensures incomplete
@@ -83,22 +72,12 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
         go r.PersistMetaLoop(stop)
     }
 
-    select {
-    case <-sigChan:
-        logrus.Info("Received SIGTERM, processing all logs...")
-        r.processSessionLatestLogs()
-        close(r.ShutdownChan)
-    case <-stop:
-        logrus.Info("Received stop signal, processing all logs...")
-        r.processSessionLatestLogs()
-        close(r.ShutdownChan)
-    }
-    logrus.Warnf("Receive stop single, so stop ray collector ")
-    return nil
-}
+    <-stop
+    logrus.Info("Received stop signal, processing all logs...")
+    r.processSessionLatestLogs()
+    close(r.ShutdownChan)
 
-func (r *RayLogHandler) WaitForStop() <-chan struct{} {
-    return r.ShutdownChan
+    return nil
 }
 
 // processSessionLatestLogs processes logs in /tmp/ray/session_latest/logs directory
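
Since main now owns SIGTERM/SIGINT handling, the select over sigChan and stop collapses to a single <-stop wait, and the duplicated shutdown branches and the misspelled "Receive stop single" log line disappear with it. The handler still drains /tmp/ray/session_latest/logs via processSessionLatestLogs before closing its ShutdownChan.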

historyserver/pkg/historyserver/server.go

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ func NewServerHandler(c *types.RayHistoryServerConfig, dashboardDir string, read
     }
 }
 
-func (s *ServerHandler) Run(stop chan struct{}) error {
+func (s *ServerHandler) Run(stop <-chan struct{}) error {
     s.RegisterRouter()
     port := ":8080"
     server := &http.Server{
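
Tightening Run's parameter from chan struct{} to the receive-only <-chan struct{} makes the compiler guarantee the handler can only wait on the stop channel, never send on or close it (both are compile errors on a receive-only channel); the cmd entrypoints keep the bidirectional channel and with it the exclusive right to close it.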

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go

Lines changed: 12 additions & 3 deletions
@@ -36,9 +36,18 @@ func GetPluginName() string { return "kai-scheduler" }
 func (k *KaiScheduler) Name() string { return GetPluginName() }
 
 func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, object metav1.Object) error {
-    _, ok := object.(*rayv1.RayCluster)
-    if !ok {
-        return fmt.Errorf("currently only RayCluster is supported, got %T", object)
+    // In K8sJobMode and InteractiveMode, RayJob first creates a RayCluster,
+    // and then creates the submitter pod after the RayCluster is ready.
+    // KAI-Scheduler does not handle this two-phase creation pattern.
+    // Other schedulers like Yunikorn and Volcano support this by pre-creating PodGroups.
+    // For more details, see https://github.com/ray-project/kuberay/pull/4418#pullrequestreview-3751609041
+    if rayJob, ok := object.(*rayv1.RayJob); ok {
+        switch rayJob.Spec.SubmissionMode {
+        case rayv1.K8sJobMode:
+            return fmt.Errorf("KAI-Scheduler does not support RayJob with K8sJobMode: the submitter pod is created after RayCluster is ready, preventing proper gang scheduling")
+        case rayv1.InteractiveMode:
+            return fmt.Errorf("KAI-Scheduler does not support RayJob with InteractiveMode: the user-provided submitter runs after RayCluster is ready, preventing proper gang scheduling")
+        }
     }
     return nil
 }
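
Behaviorally, this widens what DoBatchSchedulingOnSubmission accepts: previously any non-RayCluster object was rejected outright, whereas now RayCluster and most RayJob objects pass validation, and only RayJob submissions in K8sJobMode or InteractiveMode are refused, because their submitter is created only after the RayCluster is ready and therefore cannot be gang-scheduled with it. The tests below exercise the RayJob path with and without the queue label.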

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go

Lines changed: 49 additions & 0 deletions
@@ -21,6 +21,19 @@ func createTestRayCluster(labels map[string]string) *rayv1.RayCluster {
     }
 }
 
+func createTestRayJob(labels map[string]string) *rayv1.RayJob {
+    return &rayv1.RayJob{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "test-job",
+            Namespace: "default",
+            Labels:    labels,
+        },
+        Spec: rayv1.RayJobSpec{
+            RayClusterSpec: &rayv1.RayClusterSpec{},
+        },
+    }
+}
+
 func createTestPod() *corev1.Pod {
     return &corev1.Pod{
         ObjectMeta: metav1.ObjectMeta{
@@ -139,3 +152,39 @@ func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) {
     a.Equal("existing-value", pod.Labels["existing-label"])
     a.Equal("ray", pod.Labels["app"])
 }
+
+func TestAddMetadataToChildResource_WithRayJob(t *testing.T) {
+    a := assert.New(t)
+    scheduler := &KaiScheduler{}
+    ctx := context.Background()
+
+    rayJob := createTestRayJob(map[string]string{
+        QueueLabelName: "test-queue",
+    })
+    pod := createTestPod()
+
+    scheduler.AddMetadataToChildResource(ctx, rayJob, pod, "test-group")
+
+    a.Equal("kai-scheduler", pod.Spec.SchedulerName)
+
+    a.NotNil(pod.Labels)
+    a.Equal("test-queue", pod.Labels[QueueLabelName])
+}
+
+func TestAddMetadataToChildResource_WithRayJob_WithoutQueueLabel(t *testing.T) {
+    a := assert.New(t)
+    scheduler := &KaiScheduler{}
+    ctx := context.Background()
+
+    rayJob := createTestRayJob(map[string]string{})
+    pod := createTestPod()
+
+    scheduler.AddMetadataToChildResource(ctx, rayJob, pod, "test-group")
+
+    a.Equal("kai-scheduler", pod.Spec.SchedulerName)
+
+    if pod.Labels != nil {
+        _, exists := pod.Labels[QueueLabelName]
+        a.False(exists)
+    }
+}
