[History Server][Feat] Add metadata collect logic #4431
popojk wants to merge 25 commits into ray-project:master from add_metadata_write_logic_in_history_server
Conversation
I think overall looks good.
fweilun
left a comment
nit. Overall, looks great.
```go
var metaCommonUrlInfo = []*types.UrlInfo{
	&types.UrlInfo{Key: utils.OssMetaFile_Applications,
		Url:  "http://localhost:8265/api/serve/applications/",
		Type: "URL",
	},

	&types.UrlInfo{
		Key:  utils.OssMetaFile_PlacementGroups,
		Url:  "http://localhost:8265/api/v0/placement_groups",
		Type: "URL",
	},
}
```
Is &types.UrlInfo removable?
The slice type is already declared as []*types.UrlInfo.
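For reference, a minimal sketch of the elision the reviewer is suggesting, using a stand-in `UrlInfo` type and illustrative values rather than the repo's actual `types.UrlInfo`:

```go
package main

import "fmt"

// UrlInfo is a stand-in for types.UrlInfo; the fields mirror the diff above,
// but the values here are purely illustrative.
type UrlInfo struct {
	Key, Url, Type string
}

// Because the slice's element type is already *UrlInfo, Go lets each inner
// `&UrlInfo{...}` be shortened to `{...}`; the compiler infers the
// pointer-to-struct type for every element.
var metaCommonUrlInfo = []*UrlInfo{
	{Key: "applications", Url: "http://localhost:8265/api/serve/applications/", Type: "URL"},
	{Key: "placement_groups", Url: "http://localhost:8265/api/v0/placement_groups", Type: "URL"},
}

func main() {
	for _, info := range metaCommonUrlInfo {
		fmt.Println(info.Key, "->", info.Url)
	}
}
```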
```go
	return body, nil
}

func (r *RayLogHandler) PersisDatasetsMeta() {
```
Looks like a typo here. (Persist)
400Ping
left a comment
Overall LGTM, please resolve conflicts.
```go
	}
	currentJobIDs := make(map[string]string, 0)
	for _, jobinfo := range jobsData {
		job := jobinfo.(map[string]interface{})
```
Unsafe type assertion may cause panic
Medium Severity
The type assertion jobinfo.(map[string]interface{}) uses the bare form without checking if the assertion succeeded. If the API returns unexpected data (e.g., a null element in the jobs array or a different data structure), this will panic. Other code in this codebase uses the ok-form pattern for similar assertions.
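A minimal sketch of the ok-form pattern the bot refers to; `collectJobIDs` and the `"job_id"` key are illustrative assumptions, not the collector's actual code:

```go
package main

import "log"

// collectJobIDs is a hypothetical helper showing the comma-ok pattern;
// jobsData stands for the decoded JSON array returned by the jobs endpoint.
func collectJobIDs(jobsData []interface{}) map[string]string {
	currentJobIDs := make(map[string]string)
	for _, jobinfo := range jobsData {
		// Comma-ok form: a failed assertion sets ok to false instead of
		// panicking, so unexpected elements (e.g. nulls) can be skipped.
		job, ok := jobinfo.(map[string]interface{})
		if !ok {
			log.Printf("skipping unexpected job entry of type %T", jobinfo)
			continue
		}
		if id, ok := job["job_id"].(string); ok {
			currentJobIDs[id] = id
		}
	}
	return currentJobIDs
}

func main() {
	jobs := []interface{}{map[string]interface{}{"job_id": "raysubmit_123"}, nil}
	log.Println(collectJobIDs(jobs))
}
```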
historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go (outdated review thread, resolved)
historyserver/pkg/collector/logcollector/runtime/logcollector/meta.go (3 outdated review threads, resolved)
Future-Outlier
left a comment
Hi @popojk @400Ping @fweilun, can you give me an example to test these 3 endpoints? Currently I only see one example:
```shell
ray job submit --address http://localhost:8265 -- python -c "
import ray
ray.init(address='auto')
pg = ray.util.placement_group(name='my_pg', strategy='PACK', bundles=[{'CPU': 0.2}])
ray.get(pg.ready())
print(f'Created: {pg.id.hex()}')
"
```
Future-Outlier
left a comment
1. I need you to provide an example that can run end-to-end and lets me view, via the Ray Dashboard, information for:
   - Serve applications
   - Dataset endpoints
   - Placement group endpoints

   And please record a video showing it.

   a. If possible, add a new file named `rayjob-workaround.yaml` and include the example there. Try to make it look as similar to `rayjob.yaml` as possible.
2. There's a mistake in the PR description: running `kubectl apply -f historyserver/config/historyserver.yaml` will not create a RayCluster.
3. For the collector, please add an environment variable to enable this "persist metadata" function (when enabled, it should try to fetch data from blob storage):

   ```yaml
   env:
     - name: SUPPORT_RAY_EVENT_UNSUPPORTED_DATA
       value: 1 # or true
   ```

4. The endpoints will hit port 8265. Is there a way to know the Ray Dashboard port ahead of time? Hardcoding it doesn't feel great.
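Regarding items 3 and 4 above, a minimal sketch of how a collector might gate metadata persistence on that env var and avoid a hardcoded dashboard port. The helper names and the `RAY_DASHBOARD_ADDRESS` fallback are illustrative assumptions, not the code merged in this PR:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// metadataCollectionEnabled reports whether the "persist metadata" path should
// run, based on the env var proposed in this thread ("1" or "true" enable it).
func metadataCollectionEnabled() bool {
	enabled, err := strconv.ParseBool(os.Getenv("SUPPORT_RAY_EVENT_UNSUPPORTED_DATA"))
	return err == nil && enabled
}

// dashboardAddress prefers an explicitly configured address and only falls
// back to the conventional localhost:8265 default, instead of hardcoding it.
func dashboardAddress() string {
	if addr := os.Getenv("RAY_DASHBOARD_ADDRESS"); addr != "" {
		return addr
	}
	return "http://localhost:8265"
}

func main() {
	if !metadataCollectionEnabled() {
		fmt.Println("metadata collection disabled")
		return
	}
	fmt.Println("collecting metadata from", dashboardAddress())
}
```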
```go
		})
	} else if !existingJob.StopPersist {
		// Update status for existing jobs only if not already stopped persisting
		existingJob.Status = status
```
Race condition modifying struct fields outside mutex
Medium Severity
After calling jobResourcesUrlInfo.Get(), the returned *types.JobUrlInfo pointer is modified (existingJob.Status and urlInfo.StopPersist) outside of any lock. The Get method releases its read lock before the modifications occur, creating a data race if these fields are accessed concurrently.
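A minimal sketch of one way to avoid this race, using a simplified `JobUrlInfo` with only the two fields mentioned above: the store mutates the entry while holding its own lock (e.g. via an `UpdateStatus` method) rather than after `Get` returns. This is an illustrative pattern, not the PR's actual types:

```go
package main

import "sync"

// Simplified stand-ins for the types discussed above.
type JobUrlInfo struct {
	Status      string
	StopPersist bool
}

type jobStore struct {
	mu   sync.RWMutex
	jobs map[string]*JobUrlInfo
}

// UpdateStatus mutates the entry while holding the write lock, so the write
// cannot race with concurrent readers of Status/StopPersist.
func (s *jobStore) UpdateStatus(jobID, status string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if job, ok := s.jobs[jobID]; ok && !job.StopPersist {
		job.Status = status
	}
}

func main() {
	s := &jobStore{jobs: map[string]*JobUrlInfo{"j1": {Status: "RUNNING"}}}
	s.UpdateStatus("j1", "SUCCEEDED")
}
```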
Here's the yaml I used to test:
historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go (review thread resolved)
```go
	dashboardAddress := os.Getenv("RAY_DASHBOARD_ADDRESS")
	if dashboardAddress == "" {
		panic(fmt.Errorf("missing RAY_DASHBOARD_ADDRESS in environment variables"))
	}
```
Worker nodes panic when RAY_DASHBOARD_ADDRESS is unset
Medium Severity
The collector unconditionally panics if RAY_DASHBOARD_ADDRESS is not set, regardless of node role. However, the dashboard address is only needed when SupportRayEventUnSupportData is enabled, which only applies to Head nodes. Worker nodes will crash unnecessarily if this environment variable is missing, even though they never use it for metadata collection.
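A sketch of the gating the bot suggests, assuming a node-role value and a `supportRayEventUnSupportData` flag roughly matching the names mentioned above; the exact fields and wiring in the collector may differ:

```go
package main

import (
	"fmt"
	"os"
)

// resolveDashboardAddress only insists on RAY_DASHBOARD_ADDRESS when the
// metadata-collection path actually needs it: head nodes with the feature
// flag enabled. Worker nodes simply skip the lookup instead of panicking.
func resolveDashboardAddress(nodeRole string, supportRayEventUnSupportData bool) (string, error) {
	if nodeRole != "Head" || !supportRayEventUnSupportData {
		return "", nil // not needed on this node
	}
	addr := os.Getenv("RAY_DASHBOARD_ADDRESS")
	if addr == "" {
		return "", fmt.Errorf("missing RAY_DASHBOARD_ADDRESS in environment variables")
	}
	return addr, nil
}

func main() {
	addr, err := resolveDashboardAddress("Head", true)
	fmt.Println(addr, err)
}
```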
@Future-Outlier Changes made:


Why are these changes needed?
We need to implement metadata collection logic in order to implement dead-cluster behavior for the history server API endpoints below:
- /api/data/datasets/{job_id}
- /api/serve/applications/
- /api/v0/placement_groups/
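For context on what "collect and persist" means for these endpoints, a minimal sketch that assumes a local dashboard at localhost:8265 and plain files as the destination; the real collector writes to blob storage, handles the per-job `/api/data/datasets/{job_id}` endpoint separately, and uses different helpers and file names:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
)

// Endpoints whose responses are snapshotted so the history server can still
// serve them after the cluster is gone ("dead cluster" behavior).
var metaEndpoints = map[string]string{
	"serve_applications.json": "/api/serve/applications/",
	"placement_groups.json":   "/api/v0/placement_groups",
}

// collectOnce fetches each endpoint from the dashboard and writes the raw
// JSON body to outDir; a real collector would write to blob storage instead.
func collectOnce(dashboard, outDir string) error {
	for file, path := range metaEndpoints {
		resp, err := http.Get(dashboard + path)
		if err != nil {
			return err
		}
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return err
		}
		if err := os.WriteFile(filepath.Join(outDir, file), body, 0o644); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	if err := collectOnce("http://localhost:8265", "."); err != nil {
		fmt.Println("collect failed:", err)
	}
}
```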
Related issue number
#4384
#4385
#4386
Tests
The newly added features are mostly web API calls, which are hard to cover with unit tests, so we did the following manual tests:
1. Execute `kubectl apply -f historyserver/config/raycluster.yaml`, which runs our collector as a sidecar container.
2. Execute `kubectl port-forward svc/raycluster-historyserver-head-svc 8265:8265` in order to access our Ray cluster.
3. Execute the placement-group command (the `ray job submit` example quoted earlier in this conversation) to add a placement group to the Ray cluster.
4. Check MinIO: the placement group data is collected and stored.
5. Execute `curl -sS http://localhost:8265/api/v0/placement_groups` to get the data from the Ray cluster directly; we get the same data.
6. Execute `kubectl apply -f historyserver/config/rayjob.yaml` to test whether datasets data will be collected. Datasets data were collected as expected.
Checks