Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions cmd/trainer-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"flag"
"net/http"
"os"
"sync/atomic"

zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -148,8 +149,30 @@ func main() {
setupLog.Error(err, "Could not initialize runtimes")
os.Exit(1)
}

// Register status server before goroutine starts and before mgr.Start().
// AddHealthzCheck/AddReadyzCheck return errors if called after the manager
// has already started, which is what was happening when this was inside
// the setupManagerComponents goroutine.
var statusServerReady atomic.Bool
if features.Enabled(features.TrainJobStatus) {
if err := mgr.AddHealthzCheck("status-server", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up status server health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("status-server", func(_ *http.Request) error {
if !statusServerReady.Load() {
return errors.New("status server is not ready")
}
return nil
}); err != nil {
setupLog.Error(err, "unable to set up status server ready check")
os.Exit(1)
}
}

// Set up controllers and other components using goroutines to start the manager quickly.
go setupManagerComponents(mgr, runtimes, &cfg, certsReady, enableHTTP2)
go setupManagerComponents(mgr, runtimes, &cfg, certsReady, enableHTTP2, &statusServerReady)

setupLog.Info("Starting manager")
if err = mgr.Start(ctx); err != nil {
Expand All @@ -158,7 +181,7 @@ func main() {
}
}

func setupManagerComponents(mgr ctrl.Manager, runtimes map[string]runtime.Runtime, cfg *configapi.Configuration, certsReady <-chan struct{}, enableHTTP2 bool) {
func setupManagerComponents(mgr ctrl.Manager, runtimes map[string]runtime.Runtime, cfg *configapi.Configuration, certsReady <-chan struct{}, enableHTTP2 bool, statusServerReady *atomic.Bool) {
setupLog.Info("Waiting for certificate generation to complete")
<-certsReady
setupLog.Info("Certs ready")
Expand All @@ -171,12 +194,12 @@ func setupManagerComponents(mgr ctrl.Manager, runtimes map[string]runtime.Runtim
setupLog.Error(err, "Could not create webhook", "webhook", failedWebhook)
os.Exit(1)
}

if features.Enabled(features.TrainJobStatus) {
if err := statusserver.SetupServer(mgr, cfg.StatusServer, enableHTTP2); err != nil {
setupLog.Error(err, "Could not create runtime status server")
os.Exit(1)
}
statusServerReady.Store(true)
}
}

Expand Down
15 changes: 14 additions & 1 deletion pkg/statusserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"log/slog"
"net/http"
"sync/atomic"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -56,6 +57,7 @@ type Server struct {
httpServer *http.Server
client client.Client
authorizer TokenAuthorizer
ready atomic.Bool
}

var (
Expand Down Expand Up @@ -120,7 +122,9 @@ func (s *Server) Start(ctx context.Context) error {
// Handle graceful shutdown in background
serverShutdown := make(chan struct{})
go func() {
defer close(serverShutdown)
<-ctx.Done()
s.ready.Store(false)
s.log.Info("Shutting down runtime status server")
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
Expand All @@ -129,11 +133,12 @@ func (s *Server) Start(ctx context.Context) error {
}
}()

s.ready.Store(true)
defer s.ready.Store(false)
s.log.Info("Starting runtime status server with TLS", "address", s.httpServer.Addr)
if err := s.httpServer.ListenAndServeTLS("", ""); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("runtime status server failed: %w", err)
}

<-serverShutdown
return nil
}
Expand All @@ -143,6 +148,14 @@ func (s *Server) NeedLeaderElection() bool {
return false
}

// Check implements healthz.Checker and returns an error if the status server is not ready.
func (s *Server) Check(_ *http.Request) error {
if !s.ready.Load() {
return fmt.Errorf("runtime status server is not ready")
}
return nil
}

// handleTrainJobRuntimeStatus handles POST requests to update TrainJob status.
// Expected URL format: /apis/trainer.kubeflow.org/v1alpha1/namespaces/{namespace}/trainjobs/{name}/status
func (s *Server) handleTrainJobRuntimeStatus(w http.ResponseWriter, r *http.Request) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/statusserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,32 @@ func TestServerErrorResponses(t *testing.T) {
})
}
}

func TestServerCheck(t *testing.T) {
srv, err := NewServer(
nil,
&configapi.StatusServer{Port: ptr.To[int32](8080)},
&tls.Config{},
fakeAuthorizer{},
)
if err != nil {
t.Fatalf("NewServer() error: %v", err)
}

// Before ready: Check should return error
if err := srv.Check(nil); err == nil {
t.Error("Check() = nil, want error before server is ready")
}

// After marking ready: Check should return nil
srv.ready.Store(true)
if err := srv.Check(nil); err != nil {
t.Errorf("Check() = %v, want nil after server is ready", err)
}

// After marking not ready: Check should return error again
srv.ready.Store(false)
if err := srv.Check(nil); err == nil {
t.Error("Check() = nil, want error after server marked not ready")
}
}
Loading