Description
What happened + What you expected to happen
I am running an actor that returns a large NumPy array and a boolean. I want to wait for the task to complete and then pass the NumPy array to another actor, so in my script I call ray.wait with fetch_local=False.
I have a Ray head node and a Ray worker node. The actor runs on the worker, so I would expect the head node's memory usage to stay the same.
However, the dashboard shows a large part of the head node's object store filled with data, even though none of it should be there.
When I replace ray.wait with ray.get on the small reference only (the boolean), the head node's object store does not fill up.
I am sure this is not just a dashboard issue: I initially gave the head node's Docker container a small memory limit, and it crashed because of this.
As a side note, even with ray.get, running "ray memory" shows large references on the head node, even though the data no longer seems to be copied there.
Versions / Dependencies
Python: 3.12
Linux (Ubuntu): 22.04
Ray: 2.44.0
I run two Ray containers, each started with one of the following commands (head node first, worker node second):
ray start --head --port=9654 --num-cpus=0 --block --dashboard-host=0.0.0.0 --object-store-memory=4294967296
ray start --address=172.17.0.4:9654 --num-cpus=0 --block --resources='{"Foo":7}' --object-store-memory=4294967296
Reproduction script
import time

import numpy as np
import ray


@ray.remote(num_cpus=0, max_restarts=-1, max_task_retries=1)
class TestActor:
    def __init__(self):
        pass

    @ray.method(num_returns=1)
    def wait(self):
        return 1

    @ray.method(num_returns=2)
    def run_task_return_big_nd_array(self):
        # ~200 MB uint16 array plus a small boolean flag
        big_array = np.zeros((10000, 10000), dtype=np.uint16)
        done = True
        return done, big_array


def run_and_wait():
    # Connect through Ray Client on the head node
    ray.init(address="ray://172.17.0.4:10001")
    # Only the worker node has the custom "Foo" resource, so the actor is placed there
    ray_actor = TestActor.options(resources={"Foo": 1}).remote()
    ray_actor.wait.remote()
    refs = []
    for i in range(10):
        done, res_ref = ray_actor.run_task_return_big_nd_array.remote()
        # Wait for both returns without fetching them locally
        ready, not_ready = ray.wait([done, res_ref], num_returns=2, fetch_local=False)
        # Alternative: ray.get on the small flag only; with this the head object store did not fill up
        # d = ray.get(done)
        refs.append([done, res_ref])  # keep the references alive
    time.sleep(10)
    ray.kill(ray_actor)


if __name__ == '__main__':
    run_and_wait()
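For scale, the payload that the script keeps alive can be computed from its parameters and the `--object-store-memory` value above. This is a rough sketch that assumes raw array bytes only, with no serialization or metadata overhead. The constant names are illustrative, not from the report.

```python
# Sizes taken from the reproduction script and the `ray start` commands.
ROWS, COLS = 10_000, 10_000
BYTES_PER_ELEMENT = 2                # np.uint16
ITERATIONS = 10                      # range(10) in run_and_wait()
OBJECT_STORE_BYTES = 4_294_967_296   # --object-store-memory (4 GiB)


def array_bytes(rows: int, cols: int, itemsize: int) -> int:
    """Payload size of one dense array, ignoring serialization overhead."""
    return rows * cols * itemsize


per_array = array_bytes(ROWS, COLS, BYTES_PER_ELEMENT)
total = per_array * ITERATIONS  # all refs stay alive in `refs`
fraction = total / OBJECT_STORE_BYTES

print(f"per array: {per_array / 2**20:.1f} MiB")
print(f"total:     {total / 2**30:.2f} GiB ({fraction:.0%} of the object store)")
```

This works out to about 190.7 MiB per array and about 1.86 GiB for all ten, roughly 47% of the 4 GiB object store. If copies of these objects do land on the head node, that is consistent with a small Docker memory limit there being exceeded.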
Issue Severity
Medium: It is a significant difficulty but I can work around it.