@@ -3,8 +3,10 @@ package job
33import (
44 "bufio"
55 "context"
6+ "crypto/rand"
67 "fmt"
78 "log"
9+ "math/big"
810 "net/http"
911 "os"
1012 "os/exec"
@@ -27,21 +29,17 @@ import (
2729 "github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
2830 "github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation"
2931 rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
30- "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
3132 rayscheme "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
3233)
3334
3435const (
3536 dashboardAddr = "http://localhost:8265"
3637 clusterTimeout = 120.0
3738 portforwardtimeout = 60.0
38- jobIDTimeout = 60.0
39- jobIDPollInterval = 1.0
4039)
4140
4241type SubmitJobOptions struct {
4342 cmdFactory cmdutil.Factory
44- dashboardClient utils.RayDashboardClientInterface
4543 ioStreams * genericiooptions.IOStreams
4644 RayJob * rayv1.RayJob
4745 workerNodeSelectors map [string ]string
@@ -471,10 +469,14 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
471469 }
472470 fmt .Printf ("Portforwarding started on %s\n " , dashboardAddr )
473471
474- // Initialize dashboard client after port-forwarding is ready
475- options .dashboardClient = & utils.RayDashboardClient {}
476- if err := options .dashboardClient .InitClient (portforwardctx , strings .TrimPrefix (dashboardAddr , "http://" ), nil ); err != nil {
477- return fmt .Errorf ("failed to initialize dashboard client: %w" , err )
472+ // If submission ID is not provided by the user, generate one.
473+ if options .submissionID == "" {
474+ generatedID , err := generateSubmissionID ()
475+ if err != nil {
476+ return fmt .Errorf ("failed to generate submission ID: %w" , err )
477+ }
478+ options .submissionID = generatedID
479+ fmt .Printf ("Generated submission ID for Ray job: %s\n " , options .submissionID )
478480 }
479481
480482 // Submitting ray job to cluster
@@ -503,38 +505,7 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
503505 }
504506 }()
505507
506- var rayJobID string
507- if options .submissionID != "" {
508- rayJobID = options .submissionID
509- } else {
510- // Create a channel to receive rayJobID from the API
511- rayJobIDChan := make (chan string )
512-
513- // Poll the API for the rayJobID
514- go func () {
515- pollStart := time .Now ()
516- for {
517- jobID , err := options .getJobIDViaAPI (portforwardctx )
518- if err == nil {
519- rayJobIDChan <- jobID
520- break
521- }
522- if time .Since (pollStart ).Seconds () > jobIDTimeout {
523- close (rayJobIDChan )
524- break
525- }
526- sleepDur := time .Duration (jobIDPollInterval * float64 (time .Second ))
527- time .Sleep (sleepDur )
528- }
529- }()
530-
531- // Wait till rayJobID is populated or the timeout occurs
532- jobID , ok := <- rayJobIDChan
533- if ! ok {
534- return fmt .Errorf ("submit failed: timeout waiting for job ID from API after %v" , jobIDTimeout )
535- }
536- rayJobID = jobID
537- }
508+ rayJobID := options .submissionID
538509
539510 rayCmdStdOutScanner := bufio .NewScanner (rayCmdStdOut )
540511 rayCmdStdErrScanner := bufio .NewScanner (rayCmdStdErr )
@@ -693,24 +664,6 @@ func (options *SubmitJobOptions) raySubmitCmd() ([]string, error) {
693664 return raySubmitCmd , nil
694665}
695666
696- // Get the job ID from the dashboard API
697- func (options * SubmitJobOptions ) getJobIDViaAPI (ctx context.Context ) (string , error ) {
698- jobs , err := options .dashboardClient .ListJobs (ctx )
699- if err != nil {
700- return "" , fmt .Errorf ("failed to list jobs via dashboard client: %w" , err )
701- }
702-
703- if jobs == nil || len (* jobs ) == 0 {
704- return "" , fmt .Errorf ("no jobs returned from dashboard" )
705- }
706-
707- // Basically, there is only one job in the list, so we can just return the first one.
708- for _ , job := range * jobs {
709- return job .SubmissionId , nil
710- }
711- return "" , fmt .Errorf ("no jobs found from dashboard" )
712- }
713-
714667// Decode RayJob YAML if we decide to submit job using kube client
715668func decodeRayJobYaml (rayJobFilePath string ) (* rayv1.RayJob , error ) {
716669 decodedRayJob := & rayv1.RayJob {}
@@ -751,3 +704,21 @@ func runtimeEnvHasWorkingDir(runtimePath string) (string, error) {
751704func isRayClusterReady (rayCluster * rayv1.RayCluster ) bool {
752705 return meta .IsStatusConditionTrue (rayCluster .Status .Conditions , "Ready" ) || rayCluster .Status .State == rayv1 .Ready
753706}
707+
708+ // Generates a 16-character random ID with a prefix, mimicking Ray Job submission_id.
709+ // ref: ray/python/ray/dashboard/modules/job/job_manager.py
710+ func generateSubmissionID () (string , error ) {
711+ // ASCII letters and digits, excluding confusing characters I, l, O, 0, o.
712+ const possibleChars = "abcdefghijkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ123456789"
713+
714+ idRunes := make ([]rune , 16 )
715+ for i := range idRunes {
716+ // Securely generate a random index.
717+ idx , err := rand .Int (rand .Reader , big .NewInt (int64 (len (possibleChars ))))
718+ if err != nil {
719+ return "" , err
720+ }
721+ idRunes [i ] = rune (possibleChars [idx .Int64 ()])
722+ }
723+ return fmt .Sprintf ("raysubmit_%s" , string (idRunes )), nil
724+ }
0 commit comments