Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3d9bae4
add meta file
popojk Jan 23, 2026
f2633bf
add metadata collect logic
popojk Jan 26, 2026
a119251
add dataset collect logic
popojk Jan 27, 2026
a620950
minor fix
popojk Jan 27, 2026
896d996
add MetaKeyInfo helper for metadata retrieval.
fweilun Jan 27, 2026
40fd1fd
Merge branch 'master' into add_metadata_write_logic_in_history_server
fweilun Jan 27, 2026
e478fa9
nit fix
popojk Jan 28, 2026
97a442d
Merge remote-tracking branch 'origin/add_metadata_write_logic_in_hist…
popojk Jan 28, 2026
f91d13f
refactor
popojk Jan 28, 2026
f8ce56e
check nil for MetaKeyInfo function
fweilun Jan 28, 2026
1ed1ea0
correct MetaKeyInfo path.
fweilun Jan 28, 2026
1c1c361
fix potential meta collector bug
popojk Jan 29, 2026
688c3eb
Merge remote-tracking branch 'origin/add_metadata_write_logic_in_hist…
popojk Jan 29, 2026
750d7a8
refactor meta.go code
popojk Jan 29, 2026
1f665f6
add meta collect enable and dashboard url envvar
popojk Feb 1, 2026
0f34112
fix url info cache issue
popojk Feb 1, 2026
6e1ce62
fix e2e test error
popojk Feb 1, 2026
e58dae8
resolve conflict
popojk Feb 1, 2026
a2b9fc8
fix meta collect enable logic
popojk Feb 1, 2026
89db5fd
add demo script
popojk Feb 1, 2026
2c49cd6
add enable meta collect config to ray cluster yaml
popojk Feb 1, 2026
d603e09
resolve conflict
popojk Feb 3, 2026
aadca4b
add rayservice test yaml
popojk Feb 5, 2026
3536abe
Merge branch 'master' into add_metadata_write_logic_in_history_server
400Ping Feb 5, 2026
a5cd2a0
fix ci error
400Ping Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
go r.WatchPrevLogsLoops()
if r.EnableMeta {
go r.WatchSessionLatestLoops() // Watch session_latest symlink changes
go r.PersistMetaLoop(stop) //Todo(alex): This should be removed when Ray core implemented events for placement groups, applications, and datasets
}

select {
Expand Down
240 changes: 240 additions & 0 deletions historyserver/pkg/collector/logcollector/runtime/logcollector/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package logcollector

import (
"bytes"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"path"
"sync"
"time"

"github.com/ray-project/kuberay/historyserver/pkg/collector/types"
"github.com/ray-project/kuberay/historyserver/pkg/utils"
"github.com/sirupsen/logrus"
)

// TODO(alex): This file is just a work around because some ray resource events are not implemented yet.
// We should delete this file after history server can get the resources by ray events

// JobResourcesUrlInfoMap is a thread-safe map for storing job resource URL information
// TODO(alex): consider to use lru cache if needed in order to prevent memory leak
type JobResourcesUrlInfoMap struct {
data map[string]*types.JobUrlInfo
mu sync.RWMutex
}

func (j *JobResourcesUrlInfoMap) RLock() {
j.mu.RLock()
}

func (j *JobResourcesUrlInfoMap) RUnlock() {
j.mu.RUnlock()
}

func (j *JobResourcesUrlInfoMap) Lock() {
j.mu.Lock()
}

func (j *JobResourcesUrlInfoMap) Unlock() {
j.mu.Unlock()
}

func (j *JobResourcesUrlInfoMap) Get(key string) (*types.JobUrlInfo, bool) {
j.RLock()
defer j.RUnlock()
val, ok := j.data[key]
return val, ok
}

func (j *JobResourcesUrlInfoMap) Set(key string, val *types.JobUrlInfo) {
j.Lock()
defer j.Unlock()
j.data[key] = val
}

func (j *JobResourcesUrlInfoMap) Delete(key string) {
j.Lock()
defer j.Unlock()
delete(j.data, key)
}

func (j *JobResourcesUrlInfoMap) Keys() []string {
j.RLock()
defer j.RUnlock()
keys := make([]string, 0, len(j.data))
for k := range j.data {
keys = append(keys, k)
}
return keys
}

var metaCommonUrlInfo = []*types.UrlInfo{
{
Key: utils.OssMetaFile_Applications,
Url: "http://localhost:8265/api/serve/applications/",
Type: "URL",
},
{
Key: utils.OssMetaFile_PlacementGroups,
Url: "http://localhost:8265/api/v0/placement_groups",
Type: "URL",
},
}

var jobsUrlInfo = &types.UrlInfo{
Key: utils.OssMetaFile_Jobs,
Url: "http://localhost:8265/api/jobs/",
Type: "URL",
}

var jobResourcesUrlInfo = &JobResourcesUrlInfoMap{
data: make(map[string]*types.JobUrlInfo),
}

func (r *RayLogHandler) PersistMetaLoop(stop <-chan struct{}) {
// create meta directory
if err := r.Writer.CreateDirectory(r.MetaDir); err != nil {
logrus.Errorf("CreateDirectory %s error %v", r.MetaDir, err)
return
}

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := r.PersistMeta(); err != nil {
logrus.Errorf("Failed to persist meta: %v", err)
}
case <-stop:
logrus.Warnf("Received stop signal, returning from PersistMetaLoop")
return
}
}
}

func (r *RayLogHandler) PersistMeta() error {
for _, metaurl := range metaCommonUrlInfo {
if _, err := r.PersistUrlInfo(metaurl); err != nil {
logrus.Errorf("Failed to persist URL info for %s: %v", metaurl.Url, err)
// no need break or return
}
}
// Datasets API is called by job ID, so we should handle it in a separate function
r.PersistDatasetsMeta()

return nil
}

func (r *RayLogHandler) PersistUrlInfo(urlinfo *types.UrlInfo) ([]byte, error) {

logrus.Infof("Requesting URL %s for key file %s", urlinfo.Url, urlinfo.Key)

resp, err := r.HttpClient.Get(urlinfo.Url)
if err != nil {
logrus.Errorf("Failed to request %s: %v", urlinfo.Url, err)
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
logrus.Errorf("Failed to read response body from %s: %v", urlinfo.Url, err)
return nil, err
}

// check in memory cache, if the response body is the same with that in cache, skip writting into object store
md5Hash := md5.Sum(body)
md5Hex := hex.EncodeToString(md5Hash[:])
if md5Hex == urlinfo.Hash {
logrus.Debugf("Meta URL %s response data has not changed, no need to persist", urlinfo.Url)
return body, nil
}

objectName := path.Join(r.MetaDir, urlinfo.Key)
logrus.Debugf("Creating object %s...", objectName)
err = r.Writer.WriteFile(objectName, bytes.NewReader(body))
if err != nil {
logrus.Errorf("Failed to create object '%s': %v", objectName, err)
return body, err
}
// Write hash after object store persisted to prevent data inconsistency
urlinfo.Hash = md5Hex

logrus.Debugf("Successfully created object %s", objectName)
return body, nil
}

func (r *RayLogHandler) PersistDatasetsMeta() {

body, err := r.PersistUrlInfo(jobsUrlInfo)
if err != nil {
logrus.Errorf("Failed to persist meta url %s: %v", jobsUrlInfo.Url, err)
return
}
var jobsData = []interface{}{}
if err := json.Unmarshal(body, &jobsData); err != nil {
logrus.Errorf("Ummarshal resp body error %v. key: %s response body: %v", err, jobsUrlInfo.Key, jobsData)
return
}
currentJobIDs := make(map[string]string, 0)
for _, jobinfo := range jobsData {
job := jobinfo.(map[string]interface{})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsafe type assertion may cause panic

Medium Severity

The type assertion jobinfo.(map[string]interface{}) uses the bare form without checking if the assertion succeeded. If the API returns unexpected data (e.g., a null element in the jobs array or a different data structure), this will panic. Other code in this codebase uses the ok-form pattern for similar assertions.

Fix in Cursor Fix in Web

jobid, ok := job["job_id"].(string)
if !ok {
continue
}
status, ok := job["status"].(string)
if !ok {
continue
}
currentJobIDs[jobid] = status
}

for jobID, status := range currentJobIDs {
if existingJob, ok := jobResourcesUrlInfo.Get(jobID); !ok {
// Add new job
jobResourcesUrlInfo.Set(jobID, &types.JobUrlInfo{
Url: &types.UrlInfo{
Key: fmt.Sprintf("%s%s", utils.OssMetaFile_JOBDATASETS_Prefix, jobID),
Url: fmt.Sprintf("http://localhost:8265/api/data/datasets/%s", jobID),
},
Status: status,
})
} else if !existingJob.StopPersist {
// Update status for existing jobs only if not already stopped persisting
existingJob.Status = status
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition modifying struct fields outside mutex

Medium Severity

After calling jobResourcesUrlInfo.Get(), the returned *types.JobUrlInfo pointer is modified (existingJob.Status and urlInfo.StopPersist) outside of any lock. The Get method releases its read lock before the modifications occur, creating a data race if these fields are accessed concurrently.

Additional Locations (1)

Fix in Cursor Fix in Web

}
}

// Process each job individually to avoid holding lock for too long
allJobIDs := jobResourcesUrlInfo.Keys()
for _, jobID := range allJobIDs {
urlInfo, ok := jobResourcesUrlInfo.Get(jobID)
if !ok {
continue
}

if urlInfo.StopPersist {
continue
}

var isPersistentSuccess = true
if _, err := r.PersistUrlInfo(urlInfo.Url); err != nil {
logrus.Errorf("Persis task UrlInfo %s failed, error %v", urlInfo.Url.Url, err)
isPersistentSuccess = false
}

if urlInfo.Status == types.JOBSTATUS_FAILED ||
urlInfo.Status == types.JOBSTATUS_STOPPED ||
urlInfo.Status == types.JOBSTATUS_SUCCEEDED {
// Only mark StopPersist when persistent success in order to prevent data inconsistency
if isPersistentSuccess {
urlInfo.StopPersist = true
}
}
}
}
21 changes: 21 additions & 0 deletions historyserver/pkg/collector/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ type RayCollectorConfig struct {
PushInterval time.Duration
}

type UrlInfo struct {
Key string
Url string
Hash string
Type string
}

const (
JOBSTATUS_PENDING = "PENDING"
JOBSTATUS_RUNNING = "RUNNING"
JOBSTATUS_STOPPED = "STOPPED"
JOBSTATUS_SUCCEEDED = "SUCCEEDED"
JOBSTATUS_FAILED = "FAILED"
)

type JobUrlInfo struct {
Url *UrlInfo
Status string
StopPersist bool
}

// ValidateRayHanderConfig is
func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.Path) field.ErrorList {
var allErrs field.ErrorList
Expand Down
16 changes: 16 additions & 0 deletions historyserver/pkg/historyserver/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,19 @@ func (s *ServerHandler) GetNodes(rayClusterNameID, sessionId string) ([]byte, er
func (h *ServerHandler) getGrafanaHealth(req *restful.Request, resp *restful.Response) {
resp.WriteErrorString(http.StatusNotImplemented, "Grafana health not yet supported")
}

func (s *ServerHandler) MetaKeyInfo(rayClusterNameID, key string) []byte {
baseObject := path.Join(utils.RAY_SESSIONDIR_METADIR_NAME, key)
logrus.Infof("Prepare to get object %s info ...", baseObject)
body := s.reader.GetContent(rayClusterNameID, baseObject)
if body == nil {
logrus.Warnf("Failed to get content from object %s : body is nil", baseObject)
return nil
}
data, err := io.ReadAll(body)
if err != nil {
logrus.Errorf("Failed to read all data from object %s : %v", baseObject, err)
return nil
}
return data
}
Loading