Skip to content

Conversation

@astro-anand
Copy link

Description

Adding a cluster policy that support retries on a different worker queue for watcher sensor tasks. This does not currently work due to a circular import issue when applying the cluster policy.

Breaking Change?

No

Checklist

  • I have made corresponding changes to the documentation (if required)
  • I have added tests that prove my fix is effective or that my feature works

current issues

Circular import - I think the cluster policy may need to be packaged separately

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/usr/local/lib/python3.12/site-packages/airflow/__init__.py", line 74, in <module>
    settings.initialize()
  File "/usr/local/lib/python3.12/site-packages/airflow/settings.py", line 781, in initialize
    load_policy_plugins(POLICY_PLUGIN_MANAGER)
  File "/usr/local/lib/python3.12/site-packages/airflow/settings.py", line 200, in load_policy_plugins
    pm.load_setuptools_entrypoints("airflow.policy")
  File "/usr/local/lib/python3.12/site-packages/pluggy/_manager.py", line 416, in load_setuptools_entrypoints
    plugin = ep.load()
             ^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/metadata/__init__.py", line 205, in load
    module = import_module(match.group('module'))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/astronomer_cosmos/cosmos/__init__.py", line 17, in <module>
    from cosmos.airflow.dag import DbtDag
  File "/usr/local/airflow/astronomer_cosmos/cosmos/airflow/dag.py", line 11, in <module>
    from cosmos.converter import DbtToAirflowConverter, airflow_kwargs, specific_kwargs
  File "/usr/local/airflow/astronomer_cosmos/cosmos/converter.py", line 35, in <module>
    from cosmos.listeners.task_instance_listener import _get_profile_config_attribute
  File "/usr/local/airflow/astronomer_cosmos/cosmos/listeners/task_instance_listener.py", line 15, in <module>
    from cosmos.operators.base import AbstractDbtBase
  File "/usr/local/airflow/astronomer_cosmos/cosmos/operators/__init__.py", line 1, in <module>
    from .local import DbtBuildLocalOperator as DbtBuildOperator
  File "/usr/local/airflow/astronomer_cosmos/cosmos/operators/local.py", line 20, in <module>
    from airflow import DAG
ImportError: cannot import name 'DAG' from partially initialized module 'airflow' (most likely due to a circular import) (/usr/local/lib/python3.12/site-packages/airflow/__init__.py)

@netlify
Copy link

netlify bot commented Jan 27, 2026

Deploy Preview for astronomer-cosmos canceled.

Name Link
🔨 Latest commit 2c41bac
🔍 Latest deploy log https://app.netlify.com/projects/astronomer-cosmos/deploys/697a28d94c36700008d8312a

@tatiana
Copy link
Collaborator

tatiana commented Jan 28, 2026

Hi @astro-anand , this is really exciting!

I get an impression we may be missing a file. Could you confirm, please?

Regarding the circular dependency issue, I noticed the cosmos/operators/local.py only actually uses DAG in one specific function:

DAG.bulk_write_to_db([self.dag], session=session) # type: ignore[attr-defined, call-arg, arg-type]

Could you give a try to move the import into the top of that function instead, and see if it works?

@astro-anand
Copy link
Author

astro-anand commented Jan 28, 2026

Hey @tatiana, I changed the code but the circular import was then caused by a different import that eventually resolved to needing DAG. I noticed that the source of this issue is the cosmos init.py file. I set AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=TRUE and the circular import issue went away.

If I'm reading the logs correctly, Airflow installs plugins as part of initializing the airflow package (in its __init__.py). Cosmos requires the airflow package, so if cosmos defines a cluster policy, then we end up with a circular import issue. What do you think would be the best way to handle this?

I suppose the cluster policy could set the cosmos configuration value to ensure that the imports resolve correctly, however, if users want to adopt the cluster policy they'd need to refactor all their cosmos imports to use the full import path. Do you think that's a fair ask?

Update: I did some testing on the above, but I haven't figured out how to only conditionally load the plugin if the retry config exists.

@pankajastro pankajastro added this to the Cosmos 1.13.0 milestone Jan 29, 2026
@pankajastro
Copy link
Contributor

Hi @astro-anand, could you please check whether the changes here fix the circular import issue? I tested this locally and didn’t encounter circular import.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants