diff --git a/historyserver/cmd/collector/main.go b/historyserver/cmd/collector/main.go index c45165a5949..f7cef2b66d7 100644 --- a/historyserver/cmd/collector/main.go +++ b/historyserver/cmd/collector/main.go @@ -3,11 +3,13 @@ package main import ( "encoding/json" "flag" + "fmt" "os" "os/signal" "path" "sync" "syscall" + "strconv" "time" "github.com/sirupsen/logrus" @@ -30,6 +32,7 @@ func main() { logBatching := 1000 eventsPort := 8080 pushInterval := time.Minute + supportUnsupportedData := false runtimeClassConfigPath := "/var/collector-config/data" flag.StringVar(&role, "role", "Worker", "") @@ -44,6 +47,20 @@ func main() { flag.Parse() + // Read SUPPORT_RAY_EVENT_UNSUPPORTED_DATA environment variable if collector runs in head node + if envValue := os.Getenv("SUPPORT_RAY_EVENT_UNSUPPORTED_DATA"); role == "Head" && envValue != "" { + if parsed, err := strconv.ParseBool(envValue); err == nil { + supportUnsupportedData = parsed + } else { + logrus.Warnf("Invalid value for SUPPORT_RAY_EVENT_UNSUPPORTED_DATA: %s, using default: %v", envValue, supportUnsupportedData) + } + } + + dashboardAddress := os.Getenv("RAY_DASHBOARD_ADDRESS") + if dashboardAddress == "" { + panic(fmt.Errorf("missing RAY_DASHBOARD_ADDRESS in environment variables")) + } + sessionDir, err := utils.GetSessionDir() if err != nil { panic("Failed to get session dir: " + err.Error()) @@ -75,14 +92,16 @@ func main() { } globalConfig := types.RayCollectorConfig{ - RootDir: rayRootDir, - SessionDir: sessionDir, - RayNodeName: rayNodeId, - Role: role, - RayClusterName: rayClusterName, - RayClusterID: rayClusterId, - PushInterval: pushInterval, - LogBatching: logBatching, + RootDir: rayRootDir, + SessionDir: sessionDir, + RayNodeName: rayNodeId, + Role: role, + RayClusterName: rayClusterName, + RayClusterID: rayClusterId, + PushInterval: pushInterval, + LogBatching: logBatching, + DashboardAddress: dashboardAddress, + SupportRayEventUnSupportData: supportUnsupportedData, } logrus.Info("Using collector config: ", globalConfig) diff --git a/historyserver/config/raycluster-azureblob.yaml b/historyserver/config/raycluster-azureblob.yaml index e57494695f6..8382315736f 100644 --- a/historyserver/config/raycluster-azureblob.yaml +++ b/historyserver/config/raycluster-azureblob.yaml @@ -75,11 +75,15 @@ spec: image: collector:v0.1.0 imagePullPolicy: IfNotPresent env: + - name: RAY_DASHBOARD_ADDRESS + value: "http://localhost:8265" # reference: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite#connect-to-the-emulator-by-using-the-azure-storage-explorer - name: AZURE_STORAGE_CONNECTION_STRING value: "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite-service.azurite-dev.svc.cluster.local:10000/devstoreaccount1;" - name: AZURE_STORAGE_CONTAINER value: ray-historyserver + - name: SUPPORT_RAY_EVENT_UNSUPPORTED_DATA + value: "true" command: [collector, --role=Head, --runtime-class-name=azureblob, --ray-cluster-name=raycluster-historyserver, --ray-root-dir=log, --events-port=8084] volumeMounts: - name: historyserver @@ -161,6 +165,8 @@ spec: image: collector:v0.1.0 imagePullPolicy: IfNotPresent env: + - name: RAY_DASHBOARD_ADDRESS + value: "http://raycluster-historyserver-head-svc:8265" # reference: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite#connect-to-the-emulator-by-using-the-azure-storage-explorer - name: AZURE_STORAGE_CONNECTION_STRING value: 
"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite-service.azurite-dev.svc.cluster.local:10000/devstoreaccount1;" diff --git a/historyserver/config/raycluster.yaml b/historyserver/config/raycluster.yaml index 1e3dfc33d30..0acffe3fb40 100644 --- a/historyserver/config/raycluster.yaml +++ b/historyserver/config/raycluster.yaml @@ -77,6 +77,8 @@ spec: image: collector:v0.1.0 imagePullPolicy: IfNotPresent env: + - name: RAY_DASHBOARD_ADDRESS + value: "http://localhost:8265" - name: S3DISABLE_SSL value: "true" - name: AWS_S3ID @@ -93,6 +95,8 @@ spec: value: "test" - name: S3FORCE_PATH_STYLE value: "true" + - name: SUPPORT_RAY_EVENT_UNSUPPORTED_DATA + value: "true" command: - collector - --role=Head @@ -182,6 +186,8 @@ spec: image: collector:v0.1.0 imagePullPolicy: IfNotPresent env: + - name: RAY_DASHBOARD_ADDRESS + value: "http://raycluster-historyserver-head-svc:8265" - name: AWS_S3ID value: minioadmin - name: AWS_S3SECRET diff --git a/historyserver/config/rayjob_workaround.yaml b/historyserver/config/rayjob_workaround.yaml new file mode 100644 index 00000000000..600dc1c2268 --- /dev/null +++ b/historyserver/config/rayjob_workaround.yaml @@ -0,0 +1,48 @@ +# RayJob that demonstrates endpoints used by the "persist metadata" workaround: +# - Placement groups (api/v0/placement_groups) +# - Datasets (api/jobs/ + api/data/datasets/{job_id}) +# +# Use this with the History Server setup guide (Step 6). Ensure the collector +# has SUPPORT_RAY_EVENT_UNSUPPORTED_DATA enabled so it persists these to blob storage. +# After the job runs, you can view the data in Ray Dashboard, and after cluster +# teardown, via History Server (from persisted meta). +# +# Note: RayJob connects to the RayCluster created by RayService above via clusterSelector +# +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: rayjob-workaround +spec: + entrypoint: | + python -c " + import ray + import time + + ray.init() + + # --- 1. Placement group (visible under api/v0/placement_groups) --- + pg = ray.util.placement_group( + name='workaround_pg', + strategy='PACK', + bundles=[{'CPU': 0.2}] + ) + ray.get(pg.ready()) + print('Placement group created:', pg.id.hex()) + + # --- 2. Dataset (creates job + dataset; visible under api/jobs/ and api/data/datasets/) --- + ds = ray.data.range(10) + n = ds.count() + print('Dataset created, count:', n) + + # Keep running so the collector can poll Dashboard API and persist meta (e.g. every 5s). + # Wait long enough for at least one full persist cycle. + print('Waiting 15s for collector to persist metadata...') + time.sleep(15) + + # Align with existing guide: allow events to be sent before shutdown. 
+ time.sleep(5) + print('Done.') + " + clusterSelector: + ray.io/cluster: raycluster-historyserver diff --git a/historyserver/config/rayservice_workaround.yaml b/historyserver/config/rayservice_workaround.yaml new file mode 100644 index 00000000000..a8a89cfebbf --- /dev/null +++ b/historyserver/config/rayservice_workaround.yaml @@ -0,0 +1,260 @@ +# RayService that deploys a simple serve application and creates a RayCluster +# The RayCluster created by this RayService will have the label ray.io/cluster: raycluster-historyserver +# so that RayJob can connect to it via clusterSelector +apiVersion: ray.io/v1 +kind: RayService +metadata: + name: rayservice-workaround + labels: + ray.io/cluster: raycluster-historyserver +spec: + serviceUnhealthySecondThreshold: 300 + deploymentUnhealthySecondThreshold: 300 + serveConfigV2: | + applications: + - name: fruit_app + import_path: fruit.deployment_graph + route_prefix: /fruit + runtime_env: + working_dir: "https://github.com/ray-project/test_dag/archive/78b4a5da38796123d9f9ffff59bab2792a043e95.zip" + deployments: + - name: MangoStand + num_replicas: 1 + user_config: + price: 3 + ray_actor_options: + num_cpus: 0.1 + - name: OrangeStand + num_replicas: 1 + user_config: + price: 2 + ray_actor_options: + num_cpus: 0.1 + - name: PearStand + num_replicas: 1 + user_config: + price: 1 + ray_actor_options: + num_cpus: 0.1 + - name: FruitMarket + num_replicas: 1 + ray_actor_options: + num_cpus: 0.1 + rayClusterConfig: + rayVersion: '2.52.0' + headGroupSpec: + rayStartParams: + dashboard-host: 0.0.0.0 + num-cpus: "0" + serviceType: ClusterIP + template: + metadata: + labels: + test: rayservice-workaround + ray.io/cluster: raycluster-historyserver + spec: + imagePullSecrets: + affinity: + containers: + - env: + - name: RAY_enable_ray_event + value: "true" + - name: RAY_enable_core_worker_ray_event_to_aggregator + value: "true" + - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR + value: "http://localhost:8084/v1/events" + - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES + value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT, + TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT, + ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT" + image: rayproject/ray:2.52.0 + imagePullPolicy: IfNotPresent + command: + - 'echo "=========================================="; [ -d "/tmp/ray/session_latest" ] && dest="/tmp/ray/prev-logs/$(basename $(readlink /tmp/ray/session_latest))/$(cat /tmp/ray/raylet_node_id)" && echo "dst is $dest" && mkdir -p "$dest" && mv /tmp/ray/session_latest/logs "$dest/logs"; echo "========================================="' + securityContext: + allowPrivilegeEscalation: true + privileged: true + name: ray-head + lifecycle: + postStart: + exec: + command: + - /bin/sh + - -lc + - -- + - | + GetNodeId(){ + while true; + do + nodeid=$(ps -ef | grep raylet | grep node_id | grep -v grep | grep -oP '(?<=--node_id=)[^ ]*') + if [ -n "$nodeid" ]; then + echo "$(date) raylet started: \"$(ps -ef | grep raylet | grep node_id | grep -v grep | grep -oP '(?<=--node_id=)[^ ]*')\" => ${nodeid}" >> /tmp/ray/init.log + echo $nodeid > /tmp/ray/raylet_node_id + break + else + echo "$(date) raylet not start >> /tmp/ray/init.log" + sleep 1 + fi + done + } + GetNodeId + resources: + limits: + cpu: "5" + memory: 10G + requests: + cpu: "50m" + memory: 1G + volumeMounts: + - name: historyserver + mountPath: /tmp/ray + - name: collector + image: collector:v0.1.0 + 
imagePullPolicy: IfNotPresent + env: + - name: RAY_DASHBOARD_ADDRESS + value: "http://localhost:8265" + - name: S3DISABLE_SSL + value: "true" + - name: AWS_S3ID + value: minioadmin + - name: AWS_S3SECRET + value: minioadmin + - name: AWS_S3TOKEN + value: "" + - name: S3_BUCKET + value: "ray-historyserver" + - name: S3_ENDPOINT + value: "minio-service.minio-dev:9000" + - name: S3_REGION + value: "test" + - name: S3FORCE_PATH_STYLE + value: "true" + - name: SUPPORT_RAY_EVENT_UNSUPPORTED_DATA + value: "true" + command: + - collector + - --role=Head + - --runtime-class-name=s3 + - --ray-cluster-name=rayservice-workaround + - --ray-root-dir=log + - --events-port=8084 + volumeMounts: + - name: historyserver + mountPath: /tmp/ray + tolerations: + - key: ray + operator: Equal + value: cpu + volumes: + - name: historyserver + emptyDir: {} + - name: serve-code + configMap: + name: workaround-serve-code + workerGroupSpecs: + - groupName: cpu + maxReplicas: 1000 + minReplicas: 1 + numOfHosts: 1 + rayStartParams: {} + replicas: 1 + template: + metadata: + labels: + test: rayservice-workaround + ray.io/cluster: raycluster-historyserver + spec: + imagePullSecrets: + containers: + - env: + - name: RAY_enable_ray_event + value: "true" + - name: RAY_enable_core_worker_ray_event_to_aggregator + value: "true" + - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR + value: "http://localhost:8084/v1/events" + - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES + value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT, + TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT, + ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT" + image: rayproject/ray:2.52.0 + command: + - 'echo "=========================================="; [ -d "/tmp/ray/session_latest" ] && dest="/tmp/ray/prev-logs/$(basename $(readlink /tmp/ray/session_latest))/$(cat /tmp/ray/raylet_node_id)" && echo "dst is $dest" && mkdir -p "$dest" && mv /tmp/ray/session_latest/logs "$dest/logs"; echo "========================================="' + imagePullPolicy: IfNotPresent + name: ray-worker + securityContext: + allowPrivilegeEscalation: true + privileged: true + lifecycle: + postStart: + exec: + command: + - /bin/sh + - -lc + - -- + - | + GetNodeId(){ + while true; + do + nodeid=$(ps -ef | grep raylet | grep node_id | grep -v grep | grep -oP '(?<=--node_id=)[^ ]*') + if [ -n "$nodeid" ]; then + echo "$(date) raylet started: \"$(ps -ef | grep raylet | grep node_id | grep -v grep | grep -oP '(?<=--node_id=)[^ ]*')\" => ${nodeid}" >> /tmp/ray/init.log + echo $nodeid > /tmp/ray/raylet_node_id + break + else + echo "$(date) raylet not start >> /tmp/ray/init.log" + sleep 1 + fi + done + } + GetNodeId + resources: + limits: + cpu: "2" + memory: 2G + requests: + cpu: "50m" + memory: 1G + volumeMounts: + - name: historyserver + mountPath: /tmp/ray + - name: collector + image: collector:v0.1.0 + imagePullPolicy: IfNotPresent + env: + - name: RAY_DASHBOARD_ADDRESS + value: "http://rayservice-workaround-head-svc:8265" + - name: AWS_S3ID + value: minioadmin + - name: AWS_S3SECRET + value: minioadmin + - name: AWS_S3TOKEN + value: "" + - name: S3_BUCKET + value: "ray-historyserver" + - name: S3_ENDPOINT + value: "minio-service.minio-dev:9000" + - name: S3_REGION + value: "test" + - name: S3FORCE_PATH_STYLE + value: "true" + - name: S3DISABLE_SSL + value: "true" + command: + - collector + - --role=Worker + - --runtime-class-name=s3 + - 
--ray-cluster-name=rayservice-workaround + - --ray-root-dir=log + - --events-port=8084 + volumeMounts: + - name: historyserver + mountPath: /tmp/ray + tolerations: + - key: ray + operator: Equal + value: cpu + volumes: + - name: historyserver + emptyDir: {} diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index ac091aa9470..53fc610e717 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -15,29 +15,34 @@ import ( "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" + "github.com/ray-project/kuberay/historyserver/pkg/collector/types" "github.com/ray-project/kuberay/historyserver/pkg/storage" "github.com/ray-project/kuberay/historyserver/pkg/utils" ) type RayLogHandler struct { - Writer storage.StorageWriter - LogFiles chan string - HttpClient *http.Client - ShutdownChan chan struct{} - logFilePaths map[string]bool - MetaDir string - RayClusterName string - LogDir string - RayNodeName string - RayClusterID string - RootDir string - SessionDir string - prevLogsDir string - persistCompleteLogsDir string - PushInterval time.Duration - LogBatching int - filePathMu sync.Mutex - EnableMeta bool + Writer storage.StorageWriter + LogFiles chan string + HttpClient *http.Client + ShutdownChan chan struct{} + logFilePaths map[string]bool + MetaDir string + RayClusterName string + LogDir string + RayNodeName string + RayClusterID string + RootDir string + SessionDir string + prevLogsDir string + persistCompleteLogsDir string + PushInterval time.Duration + LogBatching int + filePathMu sync.Mutex + EnableMeta bool + DashboardAddress string + SupportRayEventUnSupportData bool + MetaCommonUrlInfo []*types.UrlInfo + JobsUrlInfo *types.UrlInfo } func (r *RayLogHandler) Run(stop <-chan struct{}) error { @@ -62,6 +67,10 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error { if r.EnableMeta { go r.WatchSessionLatestLoops() // Watch session_latest symlink changes } + //Todo(alex): This should be removed when Ray core implemented events for placement groups, applications, and datasets + if r.SupportRayEventUnSupportData { + go r.PersistMetaLoop(stop) + } <-stop logrus.Info("Received stop signal, processing all logs...") diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/meta.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/meta.go new file mode 100644 index 00000000000..73b66411e01 --- /dev/null +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/meta.go @@ -0,0 +1,241 @@ +package logcollector + +import ( + "bytes" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "path" + "sync" + "time" + + "github.com/ray-project/kuberay/historyserver/pkg/collector/types" + "github.com/ray-project/kuberay/historyserver/pkg/utils" + "github.com/sirupsen/logrus" +) + +// TODO(alex): This file is just a work around because some ray resource events are not implemented yet. 
+// We should delete this file once the history server can get these resources from Ray events + +// JobResourcesUrlInfoMap is a thread-safe map for storing job resource URL information +// TODO(alex): consider using an LRU cache if needed to prevent a memory leak +type JobResourcesUrlInfoMap struct { + data map[string]*types.JobUrlInfo + mu sync.RWMutex +} + +func (j *JobResourcesUrlInfoMap) RLock() { + j.mu.RLock() +} + +func (j *JobResourcesUrlInfoMap) RUnlock() { + j.mu.RUnlock() +} + +func (j *JobResourcesUrlInfoMap) Lock() { + j.mu.Lock() +} + +func (j *JobResourcesUrlInfoMap) Unlock() { + j.mu.Unlock() +} + +func (j *JobResourcesUrlInfoMap) Get(key string) (*types.JobUrlInfo, bool) { + j.RLock() + defer j.RUnlock() + val, ok := j.data[key] + return val, ok +} + +func (j *JobResourcesUrlInfoMap) Set(key string, val *types.JobUrlInfo) { + j.Lock() + defer j.Unlock() + j.data[key] = val +} + +func (j *JobResourcesUrlInfoMap) Delete(key string) { + j.Lock() + defer j.Unlock() + delete(j.data, key) +} + +func (j *JobResourcesUrlInfoMap) Keys() []string { + j.RLock() + defer j.RUnlock() + keys := make([]string, 0, len(j.data)) + for k := range j.data { + keys = append(keys, k) + } + return keys +} + +var jobResourcesUrlInfo = &JobResourcesUrlInfoMap{ + data: make(map[string]*types.JobUrlInfo), +} + +// InitMetaUrlInfo initializes the meta URL info with the dashboard address +func (r *RayLogHandler) InitMetaUrlInfo() { + r.MetaCommonUrlInfo = []*types.UrlInfo{ + { + Key: utils.OssMetaFile_Applications, + Url: fmt.Sprintf("%s/api/serve/applications/", r.DashboardAddress), + Type: "URL", + }, + { + Key: utils.OssMetaFile_PlacementGroups, + Url: fmt.Sprintf("%s/api/v0/placement_groups", r.DashboardAddress), + Type: "URL", + }, + } + r.JobsUrlInfo = &types.UrlInfo{ + Key: utils.OssMetaFile_Jobs, + Url: fmt.Sprintf("%s/api/jobs/", r.DashboardAddress), + Type: "URL", + } +} + +func (r *RayLogHandler) PersistMetaLoop(stop <-chan struct{}) { + // create meta directory + if err := r.Writer.CreateDirectory(r.MetaDir); err != nil { + logrus.Errorf("CreateDirectory %s error %v", r.MetaDir, err) + return + } + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := r.PersistMeta(); err != nil { + logrus.Errorf("Failed to persist meta: %v", err) + } + case <-stop: + logrus.Warnf("Received stop signal, returning from PersistMetaLoop") + return + } + } +} + +func (r *RayLogHandler) PersistMeta() error { + for _, metaurl := range r.MetaCommonUrlInfo { + if _, err := r.PersistUrlInfo(metaurl); err != nil { + logrus.Errorf("Failed to persist URL info for %s: %v", metaurl.Url, err) + // no need to break or return; continue with the remaining URLs + } + } + // The datasets API is queried per job ID, so it is handled in a separate function + r.PersistDatasetsMeta() + + return nil +} + +func (r *RayLogHandler) PersistUrlInfo(urlinfo *types.UrlInfo) ([]byte, error) { + + logrus.Infof("Requesting URL %s for key file %s", urlinfo.Url, urlinfo.Key) + + resp, err := r.HttpClient.Get(urlinfo.Url) + if err != nil { + logrus.Errorf("Failed to request %s: %v", urlinfo.Url, err) + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + logrus.Errorf("Failed to read response body from %s: %v", urlinfo.Url, err) + return nil, err + } + + // check the in-memory cache; if the response body is unchanged, skip writing to the object store + md5Hash := md5.Sum(body) + md5Hex := hex.EncodeToString(md5Hash[:]) + if md5Hex == urlinfo.Hash {
logrus.Debugf("Meta URL %s response data has not changed, no need to persist", urlinfo.Url) + return body, nil + } + + objectName := path.Join(r.MetaDir, urlinfo.Key) + logrus.Debugf("Creating object %s...", objectName) + err = r.Writer.WriteFile(objectName, bytes.NewReader(body)) + if err != nil { + logrus.Errorf("Failed to create object '%s': %v", objectName, err) + return body, err + } + // Write hash after object store persisted to prevent data inconsistency + urlinfo.Hash = md5Hex + + logrus.Debugf("Successfully created object %s", objectName) + return body, nil +} + +func (r *RayLogHandler) PersistDatasetsMeta() { + body, err := r.PersistUrlInfo(r.JobsUrlInfo) + if err != nil { + logrus.Errorf("Failed to persist meta url %s: %v", r.JobsUrlInfo.Url, err) + return + } + var jobsData = []interface{}{} + if err := json.Unmarshal(body, &jobsData); err != nil { + logrus.Errorf("Ummarshal resp body error %v. key: %s response body: %v", err, r.JobsUrlInfo.Key, jobsData) + return + } + currentJobIDs := make(map[string]string, 0) + for _, jobinfo := range jobsData { + job := jobinfo.(map[string]interface{}) + jobid, ok := job["job_id"].(string) + if !ok { + continue + } + status, ok := job["status"].(string) + if !ok { + continue + } + currentJobIDs[jobid] = status + } + + for jobID, status := range currentJobIDs { + if existingJob, ok := jobResourcesUrlInfo.Get(jobID); !ok { + // Add new job + jobResourcesUrlInfo.Set(jobID, &types.JobUrlInfo{ + Url: &types.UrlInfo{ + Key: fmt.Sprintf("%s%s", utils.OssMetaFile_JOBDATASETS_Prefix, jobID), + Url: fmt.Sprintf("%s/api/data/datasets/%s", r.DashboardAddress, jobID), + }, + Status: status, + }) + } else if !existingJob.StopPersist { + // Update status for existing jobs only if not already stopped persisting + existingJob.Status = status + } + } + + // Process each job individually to avoid holding lock for too long + allJobIDs := jobResourcesUrlInfo.Keys() + for _, jobID := range allJobIDs { + urlInfo, ok := jobResourcesUrlInfo.Get(jobID) + if !ok { + continue + } + + if urlInfo.StopPersist { + continue + } + + var isPersistentSuccess = true + if _, err := r.PersistUrlInfo(urlInfo.Url); err != nil { + logrus.Errorf("Persis task UrlInfo %s failed, error %v", urlInfo.Url.Url, err) + isPersistentSuccess = false + } + + if urlInfo.Status == types.JOBSTATUS_FAILED || + urlInfo.Status == types.JOBSTATUS_STOPPED || + urlInfo.Status == types.JOBSTATUS_SUCCEEDED { + // Only mark StopPersist when persistent success in order to prevent data inconsistency + if isPersistentSuccess { + urlInfo.StopPersist = true + } + } + } +} diff --git a/historyserver/pkg/collector/logcollector/runtime/runtime.go b/historyserver/pkg/collector/logcollector/runtime/runtime.go index caa77ca4100..db2a1c60516 100644 --- a/historyserver/pkg/collector/logcollector/runtime/runtime.go +++ b/historyserver/pkg/collector/logcollector/runtime/runtime.go @@ -36,10 +36,14 @@ func NewCollector(config *types.RayCollectorConfig, writer storage.StorageWriter IdleConnTimeout: 90 * time.Second, // Idle connection timeout }, }, - Writer: writer, - ShutdownChan: make(chan struct{}), + Writer: writer, + ShutdownChan: make(chan struct{}), + DashboardAddress: config.DashboardAddress, + SupportRayEventUnSupportData: config.SupportRayEventUnSupportData, } logDir := strings.TrimSpace(filepath.Join(config.SessionDir, utils.RAY_SESSIONDIR_LOGDIR_NAME)) + // Initialize meta URL info + handler.InitMetaUrlInfo() handler.LogDir = logDir // rootMetaDir uses flat key format (name_id) for S3/OSS performance 
optimization. // See utils.connector for the design rationale. diff --git a/historyserver/pkg/collector/types/types.go b/historyserver/pkg/collector/types/types.go index c8457fec70e..54627be063f 100644 --- a/historyserver/pkg/collector/types/types.go +++ b/historyserver/pkg/collector/types/types.go @@ -20,6 +20,30 @@ type RayCollectorConfig struct { RayClusterID string LogBatching int PushInterval time.Duration + + DashboardAddress string + SupportRayEventUnSupportData bool +} + +type UrlInfo struct { + Key string + Url string + Hash string + Type string +} + +const ( + JOBSTATUS_PENDING = "PENDING" + JOBSTATUS_RUNNING = "RUNNING" + JOBSTATUS_STOPPED = "STOPPED" + JOBSTATUS_SUCCEEDED = "SUCCEEDED" + JOBSTATUS_FAILED = "FAILED" +) + +type JobUrlInfo struct { + Url *UrlInfo + Status string + StopPersist bool } // ValidateRayHanderConfig is diff --git a/historyserver/pkg/historyserver/reader.go b/historyserver/pkg/historyserver/reader.go index 644e71c39e9..a959187b434 100644 --- a/historyserver/pkg/historyserver/reader.go +++ b/historyserver/pkg/historyserver/reader.go @@ -158,3 +158,19 @@ func (s *ServerHandler) GetNodes(rayClusterNameID, sessionId string) ([]byte, er templ["data"].(map[string]interface{})["summary"] = nodeSummary return json.Marshal(templ) } + +func (s *ServerHandler) MetaKeyInfo(rayClusterNameID, key string) []byte { + baseObject := path.Join(utils.RAY_SESSIONDIR_METADIR_NAME, key) + logrus.Infof("Prepare to get object %s info ...", baseObject) + body := s.reader.GetContent(rayClusterNameID, baseObject) + if body == nil { + logrus.Warnf("Failed to get content from object %s : body is nil", baseObject) + return nil + } + data, err := io.ReadAll(body) + if err != nil { + logrus.Errorf("Failed to read all data from object %s : %v", baseObject, err) + return nil + } + return data +} diff --git a/historyserver/pkg/historyserver/router.go b/historyserver/pkg/historyserver/router.go index e705d191f7a..9dc56c64371 100644 --- a/historyserver/pkg/historyserver/router.go +++ b/historyserver/pkg/historyserver/router.go @@ -96,7 +96,7 @@ func routerAPI(s *ServerHandler) { Writes("")) // Placeholder for specific return type ws.Route(ws.GET("/serve/applications/").To(s.getServeApplications).Filter(s.CookieHandle). - Doc("get appliations"). + Doc("get applications"). Writes("")) // Placeholder for specific return type ws.Route(ws.GET("/v0/placement_groups/").To(s.getPlacementGroups).Filter(s.CookieHandle). @@ -104,7 +104,7 @@ func routerAPI(s *ServerHandler) { Writes("")) // Placeholder for specific return type ws.Route(ws.GET("/v0/logs").To(s.getNodeLogs).Filter(s.CookieHandle). - Doc("get appliations").Param(ws.QueryParameter("node_id", "node_id")). + Doc("get logs").Param(ws.QueryParameter("node_id", "node_id")). Writes("")) // Placeholder for specific return type ws.Route(ws.GET("/v0/logs/file").To(s.getNodeLogFile).Filter(s.CookieHandle). Doc("get logfile").Param(ws.QueryParameter("node_id", "node_id")). 
@@ -478,14 +478,40 @@ func (s *ServerHandler) getDatasets(req *restful.Request, resp *restful.Response } func (s *ServerHandler) getServeApplications(req *restful.Request, resp *restful.Response) { + clusterName := req.Attribute(COOKIE_CLUSTER_NAME_KEY).(string) + clusterNamespace := req.Attribute(COOKIE_CLUSTER_NAMESPACE_KEY).(string) sessionName := req.Attribute(COOKIE_SESSION_NAME_KEY).(string) + if sessionName == "live" { s.redirectRequest(req, resp) return } - // Return "not yet supported" for serve applications - resp.WriteErrorString(http.StatusNotImplemented, "Serve applications not yet supported") + // Dead (historical) cluster: + // Collector has already periodically persisted the live Ray Dashboard + // /api/serve/applications/ response into the meta directory under the + // OssMetaFile_Applications key. Here we simply read that snapshot and + // return it so that live and historical clusters share the same response + // format. + rayClusterNameID := fmt.Sprintf("%s_%s", clusterName, clusterNamespace) + data := s.MetaKeyInfo(rayClusterNameID, utils.OssMetaFile_Applications) + + // If no metadata has been persisted yet, return an empty applications + // payload matching Ray's ServeDetails schema. + if len(data) == 0 { + empty := map[string]interface{}{ + "applications": map[string]interface{}{}, + } + var err error + data, err = json.Marshal(empty) + if err != nil { + logrus.Errorf("Failed to marshal empty serve applications response: %v", err) + resp.WriteErrorString(http.StatusInternalServerError, err.Error()) + return + } + } + + resp.Write(data) } func (s *ServerHandler) getPlacementGroups(req *restful.Request, resp *restful.Response) { diff --git a/historyserver/test/e2e/historyserver_test.go b/historyserver/test/e2e/historyserver_test.go index 449340ac6ae..b71aa0d6fb0 100644 --- a/historyserver/test/e2e/historyserver_test.go +++ b/historyserver/test/e2e/historyserver_test.go @@ -48,6 +48,10 @@ func TestHistoryServer(t *testing.T) { name: "/v0/logs/file endpoint (dead cluster)", testFunc: testLogFileEndpointDeadCluster, }, + { + name: "/api/serve/applications endpoint (dead cluster)", + testFunc: testServeApplicationsEndpointDeadCluster, + }, } for _, tt := range tests { @@ -326,3 +330,64 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Names DeleteS3Bucket(test, g, s3Client) LogWithTimestamp(test.T(), "Dead cluster log file endpoint tests completed") } + +// testServeApplicationsEndpointDeadCluster verifies that the history server can +// serve the /api/serve/applications endpoint from object storage after a +// cluster is deleted. +// +// The test case follows these steps: +// 1. Prepare test environment by applying a Ray cluster +// 2. Submit a Ray job to the existing cluster +// 3. Delete RayCluster to trigger meta upload to storage +// 4. Apply History Server and get its URL +// 5. Verify that the history server can serve /api/serve/applications +// (status 200 and JSON body with an "applications" field) +// 6. 
Delete S3 bucket to ensure test isolation +func testServeApplicationsEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { + rayCluster := PrepareTestEnv(test, g, namespace, s3Client) + ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster) + + // Delete RayCluster to trigger meta upload + err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s for serve applications test", namespace.Name, rayCluster.Name) + + // Wait for cluster to be fully deleted to ensure data has been uploaded. + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayCluster.Name) + return err + }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + + ApplyHistoryServer(test, g, namespace) + historyServerURL := GetHistoryServerURL(test, g, namespace) + + clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name) + g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName), "Deleted cluster should not have sessionName='live'") + + client := CreateHTTPClientWithCookieJar(g) + setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName) + + test.T().Run("should return serve applications from storage", func(t *testing.T) { + g := NewWithT(t) + g.Eventually(func(gg Gomega) { + url := fmt.Sprintf("%s/api/serve/applications/", historyServerURL) + resp, err := client.Get(url) + gg.Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(resp.StatusCode).To(Equal(http.StatusOK), + "Endpoint %s should return 200, got %d: %s", url, resp.StatusCode, string(body)) + + var payload map[string]any + err = json.Unmarshal(body, &payload) + gg.Expect(err).NotTo(HaveOccurred()) + _, ok := payload["applications"] + gg.Expect(ok).To(BeTrue(), "response should contain 'applications' field") + }, TestTimeoutShort).Should(Succeed()) + }) + + DeleteS3Bucket(test, g, s3Client) + LogWithTimestamp(test.T(), "Serve applications endpoint (dead cluster) test completed") +} diff --git a/historyserver/test/support/historyserver.go b/historyserver/test/support/historyserver.go index 67b5bdaff3f..e2f5effc8e7 100644 --- a/historyserver/test/support/historyserver.go +++ b/historyserver/test/support/historyserver.go @@ -59,10 +59,10 @@ const ( // - /api/prometheus_health // - /api/data/datasets/{job_id} // - /api/jobs -// - /api/serve/applications // - /api/v0/placement_groups var HistoryServerEndpoints = []string{ "/nodes?view=summary", + "/api/serve/applications/", "/api/v0/tasks", "/api/v0/tasks/summarize", "/logical/actors",
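Note (not part of the diff): the core of the new meta.go is a poll/hash/persist cycle against the Ray Dashboard HTTP API — fetch an endpoint, MD5 the body, and write it to the meta directory only when the hash differs from the last persisted value, updating the cached hash only after a successful write. Below is a minimal, self-contained Go sketch of that pattern under stated assumptions: persistIfChanged and fakeWriter are hypothetical stand-ins for PersistUrlInfo and storage.StorageWriter, and a local httptest server stands in for the Ray Dashboard.

// Illustrative sketch only; not part of the diff.
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// fakeWriter is a hypothetical stand-in for the collector's storage writer.
type fakeWriter struct{ writes int }

func (w *fakeWriter) WriteFile(name string, r io.Reader) error {
	w.writes++
	fmt.Println("persisted", name)
	return nil
}

// persistIfChanged fetches url and writes the body under key only when its
// MD5 differs from prevHash. It returns the hash to cache for the next poll.
func persistIfChanged(c *http.Client, w *fakeWriter, url, key, prevHash string) (string, error) {
	resp, err := c.Get(url)
	if err != nil {
		return prevHash, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return prevHash, err
	}
	sum := md5.Sum(body)
	hash := hex.EncodeToString(sum[:])
	if hash == prevHash {
		return prevHash, nil // unchanged response; skip the object-store write
	}
	if err := w.WriteFile(key, bytes.NewReader(body)); err != nil {
		return prevHash, err // keep the old hash so the next tick retries the write
	}
	return hash, nil
}

func main() {
	// A stub "dashboard" endpoint that always returns the same payload.
	srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
		rw.Write([]byte(`{"applications":{}}`))
	}))
	defer srv.Close()

	w := &fakeWriter{}
	hash := ""
	for i := 0; i < 3; i++ {
		var err error
		hash, err = persistIfChanged(srv.Client(), w, srv.URL, "meta/applications", hash)
		if err != nil {
			fmt.Println("persist error:", err)
		}
	}
	fmt.Println("writes:", w.writes) // prints 1: only the first poll persists
}

Returning the previous hash on a failed write mirrors the diff's choice to set urlinfo.Hash only after the object-store write succeeds, so a failed persist is retried on the next ticker interval rather than silently dropped.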