Sources from slack: https://dagster.slack.com/archives/C01U954MEER/p1693262962775569

What is the best way to model non-strict dependencies? That is, I have an op, op, that takes the results of several other (potentially complex) subgraphs (say three subgraphs a, b and c) and combines them into one result. op only fails if all of the subgraphs a, b and c fail. If, for example, a fails but b and c both return a result, op still succeeds and returns a result that combines the results of b and c.

Since each of a, b and c can be a complex graph that may fail at different stages of the computation, I need to model op so that it waits until, for each of a, b and c, either the result is ready or it is determined that no result will be produced; only then is op run, with whichever results are available passed to it.

A potential use case is ensemble learning, where predictions from several estimators of the same quantity are combined. If one predictor fails and cannot provide a value, it should not stop the whole prediction process.
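For concreteness, a minimal sketch of the shape being described (the predictor names and the averaging logic are hypothetical stand-ins for the subgraphs a, b and c). With Dagster's default behavior, a failure in any one predictor causes combine to be skipped, which is exactly what should be avoided here:

from dagster import graph, op

# Hypothetical stand-ins for the subgraphs a, b and c.
@op
def predictor_a() -> float:
    return 0.7

@op
def predictor_b() -> float:
    return 0.4

@op
def predictor_c() -> float:
    return 0.9

@op
def combine(predictions: list[float]) -> float:
    # Desired behavior: succeed as long as at least one prediction arrived.
    return sum(predictions) / len(predictions)

@graph
def ensemble():
    combine([predictor_a(), predictor_b(), predictor_c()])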
Replies: 2 comments 2 replies
Currently, upstream failure will always cause downstream ops to be skipped. Here's an issue where we're tracking loosening this: #15761. One workaround is a combination of optional outputs (Out(is_required=False)) on the upstream ops and a fan-in input on the downstream op, so the downstream op receives only the outputs that were actually produced.
Here's an example:

from dagster import op, job, Out, Output

@op(out=Out(is_required=False))
def op1():
    try:
        yield Output(5)
    except Exception:
        # Swallow the failure so the optional output is simply not emitted.
        ...

@op(out=Out(is_required=False))
def op2():
    try:
        raise RuntimeError("something")
        yield Output(6)
    except Exception:
        # op2 fails internally, so it emits no output and op3 never sees 6.
        ...

@op
def op3(in1: list[int]):
    # Fan-in input: only the outputs that were actually produced arrive here.
    assert in1 == [5]

@job
def job1():
    op3([op1(), op2()])
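To see the workaround in action, a quick in-process run (a minimal sketch; it only checks that op3 still executes even though op2 raised):

result = job1.execute_in_process()
assert result.success  # op2's failure is swallowed, so op3 runs with just [5]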
For our use case, we generate an ephemeral PVC for a run, in which we cache model weights. If our inferencing fails, we want to clean up all of our ephemeral volumes regardless. We can set time-based cleanups for these PVCs, but it seems a lot cleaner to pass the PVC name down as an output and have the cleanup run after the rest of my ops complete. Something that could work in the interim is:
from dagster import op, job, Out, Output, DynamicOut, DynamicOutput

@op(out={'pvc_ref': Out(str), 'inference_workload': DynamicOut(Workload)})
def cache_model_weights():
    # cache model weights into the pvc
    yield Output('pvc-name', output_name='pvc_ref')
    # configure and dispatch inferencing steps
    for idx, workload in enumerate(workloads):
        yield DynamicOutput(workload, mapping_key=str(idx), output_name='inference_workload')

@op
def wait_and_terminate_cache(context, pvc_ref: str):
    # (not an existing context API today; sketches the desired "wait for these ops" behavior)
    context.wait_for_ops_to_complete({'run_workload'})
    delete_pvc(pvc_ref)

@op
def run_workload(context, workload: Workload):
    weights_cache_dir = workload.pvc_cache
    # run inference step
    ...

@job
def model_inference_job():
    pvc_ref, workloads = cache_model_weights()
    wait_and_terminate_cache(pvc_ref)
    workloads.map(run_workload).collect()
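For the ordering part specifically (running the cleanup only after the mapped workloads finish), one way to express it with today's API is a Nothing dependency on the collected dynamic outputs. This is a sketch that assumes delete_pvc and the ops above are defined as in the snippet; note that with current behavior the cleanup op would still be skipped if any workload fails, which is the limitation tracked in #15761:

from dagster import In, Nothing, job, op

@op(ins={'workloads_done': In(Nothing)})
def terminate_cache(pvc_ref: str):
    # Ordered to run only after every mapped run_workload step has finished.
    delete_pvc(pvc_ref)

@job
def model_inference_job():
    pvc_ref, workloads = cache_model_weights()
    results = workloads.map(run_workload).collect()
    terminate_cache(pvc_ref, workloads_done=results)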