Pass DagRun to task_instance_mutation_hook for run-aware task mutation#68198

Open
dheerajturaga wants to merge 2 commits into
apache:mainfrom
dheerajturaga:feature/mutation-hook-dag-run

@dheerajturaga
Member

Cluster policies could mutate a TaskInstance before scheduling but had no supported way to read the DagRun it belongs to -- so routing a task to a queue based on the run's configuration was impossible. Reaching the DagRun via TaskInstance.get_dagrun() opens a nested committing session that trips the scheduler's prohibit_commit guard and crashes the scheduler.

Add an optional dag_run argument to the task_instance_mutation_hook policy hook and thread the in-scope DagRun through every scheduler-side call site (no extra database access). A hook can now route on dag_run.conf, for example:

def task_instance_mutation_hook(task_instance, dag_run=None):
    if dag_run is not None and (dag_run.conf or {}).get("route") == "high":
        task_instance.queue = "high_priority_queue"

Backward compatibility is preserved through the local-settings plugin shim. Pluggy passes a hookimpl only the parameters it declares without a default, so this commit also makes make_plugin_from_local_settings register a function as-is when its parameters are a subset of the hookspec's, and shim it (forwarding the call positionally) when it renames a parameter or gives a hookspec parameter a default value. As a result every signature keeps working: the legacy single-argument hook, an explicit (task_instance, dag_run) hook, and the ergonomic (task_instance, dag_run=None) hook all behave correctly; a hook declaring an unknown parameter is still rejected.
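The register-or-shim decision described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in this PR: adapt_hook and HOOKSPEC_PARAMS are hypothetical names, and rejecting any function that takes more parameters than the hookspec is this sketch's guess at how "unknown parameter" is detected.

```python
import inspect

# Assumed hookspec argument order (hypothetical constant for this sketch).
HOOKSPEC_PARAMS = ("task_instance", "dag_run")


def adapt_hook(func, spec_params=HOOKSPEC_PARAMS):
    """Return (callable_to_register, was_shimmed) for a user-defined hook.

    Hypothetical helper: it mirrors the rule in the description, not the
    actual make_plugin_from_local_settings implementation.
    """
    params = list(inspect.signature(func).parameters.values())

    # Assumption: more parameters than the hookspec means an unknown one.
    if len(params) > len(spec_params):
        raise TypeError(f"{func.__name__} takes parameters the hookspec does not define")

    names = {p.name for p in params}
    has_default = any(p.default is not inspect.Parameter.empty for p in params)

    # Names are a subset of the hookspec's and none has a default:
    # name-based matching works, so the function is registered unchanged.
    if names <= set(spec_params) and not has_default:
        return func, False

    # Renamed parameter or a default value: wrap the function in a shim with
    # the hookspec's exact signature and forward the call positionally.
    def shim(task_instance, dag_run):
        args = (task_instance, dag_run)[: len(params)]
        return func(*args)

    return shim, True
```

With this sketch, a legacy `def hook(task_instance)` comes back unchanged. Both `def hook(ti)` and `def hook(task_instance, dag_run=None)` come back wrapped, so the default no longer hides dag_run from name-based matching.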

The dag_run may be None during early task-instance construction (when the hook is re-applied from refresh_from_task before the instance is bound to a run), and the hook must not open a new session; both are documented on the hookspec and in the cluster-policies guide, which also corrects the prior claim that the hook runs on the worker.
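A hook that respects both constraints might look like the sketch below. The SimpleNamespace objects are stand-ins for TaskInstance and DagRun used only for illustration, and the "default" queue name is an assumption; the hook reads only from the dag_run it is handed and leaves the task untouched when no run is available.

```python
from types import SimpleNamespace


def task_instance_mutation_hook(task_instance, dag_run=None):
    # dag_run can be None early in task-instance construction:
    # leave the task as it is in that case.
    if dag_run is None:
        return
    # Read only from the object passed in; do not call get_dagrun()
    # or open a session from inside the hook.
    conf = dag_run.conf or {}
    if conf.get("route") == "high":
        task_instance.queue = "high_priority_queue"


# Stand-in objects (assumptions for this sketch, not Airflow classes).
unbound_ti = SimpleNamespace(queue="default")
task_instance_mutation_hook(unbound_ti)  # no run yet: queue unchanged

no_conf_ti = SimpleNamespace(queue="default")
task_instance_mutation_hook(no_conf_ti, SimpleNamespace(conf=None))  # conf is None: unchanged

routed_ti = SimpleNamespace(queue="default")
task_instance_mutation_hook(routed_ti, SimpleNamespace(conf={"route": "high"}))
```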


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    ClaudeCode Opus 4.8

…it conf

Previously, when a DAG run was created without an explicit conf dict (all
scheduled runs, and manual triggers that omit config), dag_run.conf was stored
as None even though the param defaults had already been merged and validated.
Any runtime consumer reading dag_run.conf -- tasks using {{ dag_run.conf }}, the
task_instance_mutation_hook reading dag_run.conf for queue routing, sensors
checking conf values -- silently got None for those runs.

The fix is one line: pass the already-computed copied_params dict as conf when
the caller supplied none. copied_params is the result of deep_merge(conf) and
validate(), so no extra work is done -- the resolved value is just preserved
instead of discarded.

Manual/triggered runs that supply an explicit conf are unaffected: their
caller-supplied dict is stored as before.
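
The fallback can be pictured with the minimal sketch below. resolve_stored_conf is a hypothetical function, the shallow dict update is only a stand-in for the real deep_merge and validate steps, and treating only None (not an empty dict) as "no conf supplied" is an assumption the commit message does not confirm.

```python
import copy


def resolve_stored_conf(conf, param_defaults):
    """Return the value that would be stored as dag_run.conf (hypothetical helper)."""
    # Stand-in for deep_merge(conf) followed by validate(): start from the
    # param defaults and overlay whatever the caller supplied.
    copied_params = copy.deepcopy(param_defaults)
    copied_params.update(conf or {})

    # The fix: keep the resolved params when the caller supplied no conf,
    # instead of storing None. Assumption: only None counts as "not supplied".
    if conf is None:
        return copied_params
    # An explicit caller-supplied conf is stored as before.
    return conf
```

Under these assumptions, a scheduled run with defaults `{"route": "low"}` stores `{"route": "low"}` rather than None, while a manual run with `{"route": "high"}` stores exactly the dict the caller passed.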