@@ -22,6 +22,7 @@ type EventHandler struct {
2222
2323 ClusterTaskMap * types.ClusterTaskMap
2424 ClusterActorMap * types.ClusterActorMap
25+ ClusterJobMap * types.ClusterJobMap
2526}
2627
2728var eventFilePattern = regexp .MustCompile (`-\d{4}-\d{2}-\d{2}-\d{2}$` )
@@ -44,6 +45,9 @@ func NewEventHandler(reader storage.StorageReader) *EventHandler {
4445 ClusterActorMap : & types.ClusterActorMap {
4546 ClusterActorMap : make (map [string ]* types.ActorMap ),
4647 },
48+ ClusterJobMap : & types.ClusterJobMap {
49+ ClusterJobMap : make (map [string ]* types.JobMap ),
50+ },
4751 }
4852}
4953
@@ -115,6 +119,7 @@ func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error {
115119 clusterList := h .reader .List ()
116120 for _ , clusterInfo := range clusterList {
117121 clusterNameNamespace := clusterInfo .Name + "_" + clusterInfo .Namespace
122+ clusterSessionKey := utils .BuildClusterSessionKey (clusterInfo .Name , clusterInfo .Namespace , clusterInfo .SessionName )
118123 eventFileList := append (h .getAllJobEventFiles (clusterInfo ), h .getAllNodeEventFiles (clusterInfo )... )
119124
120125 logrus .Infof ("current eventFileList for cluster %s is: %v" , clusterInfo .Name , eventFileList )
@@ -145,7 +150,7 @@ func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error {
145150 if curr == nil {
146151 continue
147152 }
148- curr ["clusterName" ] = clusterInfo . Name + "_" + clusterInfo . Namespace
153+ curr ["clusterName" ] = clusterSessionKey
149154 eventProcessorChannels [i % numOfEventProcessors ] <- curr
150155 }
151156 }
@@ -525,6 +530,170 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
525530 // TODO: Handle actor task definition event
526531 // This is related to GET /api/v0/tasks (type=ACTOR_TASK)
527532 logrus .Debugf ("ACTOR_TASK_DEFINITION_EVENT received, not yet implemented" )
533+
534+ case types .DRIVER_JOB_DEFINITION_EVENT :
535+ // NOTE: When event comes in, JobID will be in base64, processing will convert it to Hex
536+ jobDef , ok := eventMap ["driverJobDefinitionEvent" ]
537+ if ! ok {
538+ return fmt .Errorf ("event does not have 'driverJobDefinitionEvent'" )
539+ }
540+
541+ jsonDriverJobDefinition , err := json .Marshal (jobDef )
542+ if err != nil {
543+ return err
544+ }
545+
546+ var currJob types.Job
547+ if err := json .Unmarshal (jsonDriverJobDefinition , & currJob ); err != nil {
548+ return err
549+ }
550+
551+ // Convert JobID from base64 to hex
552+ currJob .JobID , err = utils .ConvertBase64ToHex (currJob .JobID )
553+ if err != nil {
554+ logrus .Errorf ("Failed to convert JobID from base64 to Hex, will keep JobID in base64: %v" , err )
555+ }
556+
557+ // Convert DriverNodeID from base64 to hex
558+ currJob .DriverNodeID , err = utils .ConvertBase64ToHex (currJob .DriverNodeID )
559+ if err != nil {
560+ logrus .Errorf ("Failed to convert DriverNodeID from base64 to hex, will keep DriverNodeID in base64: %v" , err )
561+ }
562+
563+ jobMap := h .ClusterJobMap .GetOrCreateJobMap (currentClusterName )
564+ jobMap .CreateOrMergeJob (currJob .JobID , func (j * types.Job ) {
565+ // If for some reason jobID is empty, we will keep whatever is in 'j'
566+ var existingJobID string
567+ if currJob .JobID == "" {
568+ existingJobID = j .JobID
569+ }
570+
571+ // ========== Lifecycle-derived fields ==========
572+ // These fields are set by DRIVER_JOB_LIFECYCLE_EVENT
573+ // We need to preserve them if lifecycle event arrived before definition event
574+ existingStateTransitions := j .StateTransitions
575+ existingState := j .State
576+ existingStartTime := j .StartTime
577+ existingEndTime := j .EndTime
578+
579+ // Overwrite with definition fields
580+ * j = currJob
581+
582+ // Restore lifecycle-derived fields if they existed
583+ if len (existingStateTransitions ) > 0 {
584+ if existingJobID != "" {
585+ // This means that jobID was somehow empty.
586+ j .JobID = existingJobID
587+ }
588+ j .StateTransitions = existingStateTransitions
589+ j .State = existingState
590+ j .StartTime = existingStartTime
591+ j .EndTime = existingEndTime
592+ }
593+
594+ // ========== Definition-only fields ==========
595+ // Status, StatusTransitions, Message, DriverExitCode
596+ // are ONLY set by DRIVER_JOB_DEFINITION_EVENT
597+ // They are already in currJob, no need to restore
598+ })
599+ case types .DRIVER_JOB_LIFECYCLE_EVENT :
600+ // NOTE: When event comes in, JobID will be in base64, processing will convert it to Hex
601+ jobLifecycleEvent , ok := eventMap ["driverJobLifecycleEvent" ].(map [string ]any )
602+ if ! ok {
603+ return fmt .Errorf ("invalid driverJobLifecycleEvent format" )
604+ }
605+
606+ // Get JobID and also convert JobID to hex from base64
607+ // Will leave it as it if it's empty or somehow not a string
608+ jobId , ok := jobLifecycleEvent ["jobId" ].(string )
609+ if ! ok {
610+ logrus .Errorf ("jobID is missing or is not a string, leaving it as is" )
611+ } else {
612+ jobId , _ = utils .ConvertBase64ToHex (jobId )
613+ }
614+ stateTransitionUnstructed , _ := jobLifecycleEvent ["stateTransitions" ].([]any )
615+
616+ if len (stateTransitionUnstructed ) == 0 || jobId == "" {
617+ return nil
618+ }
619+
620+ // TODO(chiayi): Will need to convert status timeline once it is added as well
621+ // Following fields are related to status transition:
622+ // - status
623+ // - message
624+ // - errorType
625+ // - driverExitCode
626+ var stateTransitions []types.JobStateTransition
627+ for _ , transition := range stateTransitionUnstructed {
628+ tr , ok := transition .(map [string ]any )
629+ if ! ok {
630+ continue
631+ }
632+ state , _ := tr ["state" ].(string )
633+ timestampStr , _ := tr ["timestamp" ].(string )
634+
635+ var timestamp time.Time
636+ if timestampStr != "" {
637+ timestamp , _ = time .Parse (time .RFC3339Nano , timestampStr )
638+ }
639+
640+ stateTransitions = append (stateTransitions , types.JobStateTransition {
641+ State : types .JobState (state ),
642+ Timestamp : timestamp ,
643+ })
644+ }
645+
646+ if len (stateTransitions ) == 0 {
647+ return nil
648+ }
649+
650+ jobMap := h .ClusterJobMap .GetOrCreateJobMap (currentClusterName )
651+ jobMap .CreateOrMergeJob (jobId , func (j * types.Job ) {
652+ // TODO(chiayi): take care of status (job progress) state if part of DriverJobLifecycleEvent
653+ j .JobID = jobId
654+
655+ type stateTransitionKey struct {
656+ State string
657+ Timestamp int64
658+ }
659+
660+ existingStateKeys := make (map [stateTransitionKey ]bool )
661+ for _ , t := range j .StateTransitions {
662+ existingStateKeys [stateTransitionKey {string (t .State ), t .Timestamp .UnixNano ()}] = true
663+ }
664+
665+ for _ , t := range stateTransitions {
666+ key := stateTransitionKey {string (t .State ), t .Timestamp .UnixNano ()}
667+ if ! existingStateKeys [key ] {
668+ j .StateTransitions = append (j .StateTransitions , t )
669+ existingStateKeys [key ] = true
670+ }
671+ }
672+
673+ sort .Slice (j .StateTransitions , func (i , k int ) bool {
674+ return j .StateTransitions [i ].Timestamp .Before (j .StateTransitions [k ].Timestamp )
675+ })
676+
677+ if len (j .StateTransitions ) == 0 {
678+ return
679+ }
680+
681+ lastStateTransition := j .StateTransitions [len (j .StateTransitions )- 1 ]
682+ j .State = lastStateTransition .State
683+
684+ if j .StartTime .IsZero () {
685+ for _ , t := range j .StateTransitions {
686+ if t .State == types .CREATED {
687+ j .StartTime = t .Timestamp
688+ break
689+ }
690+ }
691+ }
692+
693+ if lastStateTransition .State == types .JOBFINISHED {
694+ j .EndTime = lastStateTransition .Timestamp
695+ }
696+ })
528697 default :
529698 logrus .Infof ("Event not supported, skipping: %v" , eventMap )
530699 }
@@ -708,3 +877,41 @@ func (h *EventHandler) GetActorsMap(clusterName string) map[string]types.Actor {
708877 }
709878 return actors
710879}
880+
881+ func (h * EventHandler ) GetJobsMap (clusterName string ) map [string ]types.Job {
882+ h .ClusterJobMap .RLock ()
883+ defer h .ClusterJobMap .RUnlock ()
884+
885+ jobMap , ok := h .ClusterJobMap .ClusterJobMap [clusterName ]
886+ if ! ok {
887+ return map [string ]types.Job {}
888+ }
889+
890+ jobMap .Lock ()
891+ defer jobMap .Unlock ()
892+
893+ jobs := make (map [string ]types.Job , len (jobMap .JobMap ))
894+ for id , job := range jobMap .JobMap {
895+ jobs [id ] = job .DeepCopy ()
896+ }
897+ return jobs
898+ }
899+
900+ func (h * EventHandler ) GetJobByJobID (clusterName , jobID string ) (types.Job , bool ) {
901+ h .ClusterJobMap .RLock ()
902+ defer h .ClusterJobMap .RUnlock ()
903+
904+ jobMap , ok := h .ClusterJobMap .ClusterJobMap [clusterName ]
905+ if ! ok {
906+ return types.Job {}, false
907+ }
908+
909+ jobMap .Lock ()
910+ defer jobMap .Unlock ()
911+
912+ job , ok := jobMap .JobMap [jobID ]
913+ if ! ok {
914+ return types.Job {}, false
915+ }
916+ return job .DeepCopy (), true
917+ }
0 commit comments