Description
Hi,
First things first: thanks for the awesome work, I truly appreciate it! I'm evaluating the use of Ray with Dagster, and dagster-ray fits almost all the requirements I need in order to use a Ray cluster as the executor engine for Dagster operations.
To clarify the problem: what I'm looking to do is install specific libraries, with specific versions, for certain operations.
I've dug a bit into how jobs are submitted to Ray and, from what I can gather, the pip/conda package dependencies are indeed sent to the Ray cluster, where conda environments are built at runtime with the set of packages specified for each Ray job. However, these environments are not used when the Ray jobs actually run: the jobs execute in the default conda environment on the Ray head node rather than in the "custom" conda envs. I've tried to figure out which configuration would make the job pick up the generated conda environment at runtime, but I couldn't find an answer.
The workaround I use at the moment is a deployment that pre-installs every package needed by any of my Dagster jobs, so all resources running on Dagster share a single set of packages. Not ideal, but it works.
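For reference, this is the plain Ray behavior I expected the op-level runtime_env to translate into. This is only a rough sketch outside of Dagster (the head node address is a placeholder): the submitted job runs inside an environment that actually contains the requested packages.

import faker  # noqa: F401 -- only to illustrate the expectation below
from ray.job_submission import JobSubmissionClient

# Placeholder address for the Ray head node's dashboard/job API
client = JobSubmissionClient("http://<ray-head>:8265")

# When submitting directly to Ray, the runtime_env pip packages are built
# and the entrypoint runs inside that generated environment.
client.submit_job(
    entrypoint='python -c "import faker; print(faker.__version__)"',
    runtime_env={"pip": ["faker"]},
)

My expectation was that the "dagster-ray/config" op tag would forward the runtime_env in the same way, so that the op body runs inside the generated environment.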
The issue can be reproduced with the following script, which I've slightly modified from one of the examples in this repo:
import time

from dagster import Config, Definitions, OpExecutionContext, job, op
from dagster_ray.executor import ray_executor


class MyConfig(Config):
    sleep_for: int = 3


@op
def return_one(context: OpExecutionContext, config: MyConfig) -> int:
    context.log.debug(f"sleeping for {config.sleep_for} seconds...")
    time.sleep(config.sleep_for)
    return 1


@op
def return_two(context: OpExecutionContext, config: MyConfig) -> int:
    context.log.info(f"sleeping for {config.sleep_for} seconds...")
    time.sleep(config.sleep_for)
    context.log.info("Waking up!")
    return 2


@op(
    tags={
        "dagster-ray/config": {
            "runtime_env": {"pip": {"packages": ["dagster", "faker"]}},
        }
    }
)
def return_three(context: OpExecutionContext, config: MyConfig) -> int:
    context.log.info(f"sleeping for {config.sleep_for} seconds...")
    time.sleep(config.sleep_for)
    context.log.info("Waking up!")

    # faker is only listed in this op's runtime_env, so this import should
    # only succeed if the op runs inside the generated environment.
    import faker

    print(faker.Faker())
    return 3


@op
def sum_one_and_two(a: int, b: int) -> int:
    res = a + b
    assert res == 3
    return res


@op
def sum_results(a: int, b: int) -> int:
    res = a + b
    return res


@job(executor_def=ray_executor)
def my_job():
    return_two_result = return_two()
    return_one_result = return_one()
    return_three_result = return_three()
    result = sum_one_and_two(return_one_result, return_two_result)
    sum_results(return_three_result, result)


definitions = Definitions(
    jobs=[my_job],
)
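For anyone trying to reproduce this, the following snippet (a rough stdlib-only sketch, not part of the original example) can be dropped into the body of return_three to check which interpreter the op is actually using and whether faker is importable there:

import importlib.util
import sys

# Path of the Python interpreter the op is running under; for me this points
# at the head node's default environment rather than a generated one.
print("python executable:", sys.executable)
# Whether the per-op pip dependency is importable in that environment.
print("faker available:", importlib.util.find_spec("faker") is not None)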
I don't know whether it is intended behavior for dagster-ray to rely on the base environment already having all the necessary packages pre-installed and shared between all jobs. If it isn't, some clarification on this would help me assess whether per-op environments should/will be supported by dagster-ray.
Cheers
Edit: I'm testing the latest release, dagster-ray==0.0.11, with Python 3.12.