Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,17 @@ def build_dags(self) -> Dict[str, DAG]:
dag_config = {**dag_level_args, **dag_config}

dag_config["task_groups"] = dag_config.get("task_groups", {})
dag_builder: DagBuilder = DagBuilder(
dag_name=dag_name,
dag_config=dag_config,
default_config=default_config,
yml_dag=self._serialise_config_md(dag_name, dag_config, default_config),
)
dag: Dict[str, Union[str, DAG]] = dag_builder.build()
dags[dag["dag_id"]]: DAG = dag["dag"]
try:
dag_builder: DagBuilder = DagBuilder(
dag_name=dag_name,
dag_config=dag_config,
default_config=default_config,
yml_dag=self._serialise_config_md(dag_name, dag_config, default_config),
)
dag: Dict[str, Union[str, DAG]] = dag_builder.build()
dags[dag["dag_id"]]: DAG = dag["dag"]
except Exception as e:
logging.exception("Failed to build DAG '%s': %s", dag_name, e)

return dags

Expand Down Expand Up @@ -305,9 +308,17 @@ def load_yaml_dags(
suffix = [".yaml", ".yml"]
candidate_dag_files = []

def _load_from_source(factory: _DagFactory, config_path: str) -> None:
try:
factory._generate_dags(globals_dict)
except Exception: # pylint: disable=broad-except
logging.exception("Failed to load dag from %s", config_path)
else:
logging.info("DAG loaded: %s", config_path)

if config_filepath:
factory = _DagFactory(config_filepath=config_filepath, defaults_config_path=defaults_config_path)
factory._generate_dags(globals_dict)
_load_from_source(factory, config_filepath)
elif config_dict:
factory = _DagFactory(config_dict=config_dict, defaults_config_dict=defaults_config_dict)
factory._generate_dags(globals_dict)
Expand All @@ -317,10 +328,5 @@ def load_yaml_dags(
for config_file_path in candidate_dag_files:
config_file_abs_path = str(config_file_path.absolute())
logging.info("Loading %s", config_file_abs_path)
try:
factory = _DagFactory(config_file_abs_path, defaults_config_dict=defaults_config_dict)
factory._generate_dags(globals_dict)
except Exception: # pylint: disable=broad-except
logging.exception("Failed to load dag from %s", config_file_path)
else:
logging.info("DAG loaded: %s", config_file_path)
factory = _DagFactory(config_file_abs_path, defaults_config_dict=defaults_config_dict)
_load_from_source(factory, config_file_abs_path)
Loading