@@ -72,7 +72,6 @@ def __init__(
7272 fs : AbstractFileSystem | None = None ,
7373 cfg_dir : str | None = CONFIG_DIR ,
7474 pipelines_dir : str | None = PIPELINES_DIR ,
75-
7675 log_level : str | None = None ,
7776 ) -> None :
7877 """Initialize the PipelineManager.
@@ -91,7 +90,7 @@ def __init__(
9190 Example: "config" or "settings".
9291 pipelines_dir: Override default pipelines directory name ('pipelines').
9392 Example: "flows" or "dags".
94-
93+
9594 log_level: Set logging level for the manager.
9695 Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
9796
@@ -111,7 +110,7 @@ def __init__(
111110 ... "key": "ACCESS_KEY",
112111 ... "secret": "SECRET_KEY"
113112 ... },
114-
113+
115114 ... log_level="DEBUG"
116115 ... )
117116 """
@@ -129,7 +128,7 @@ def _setup_filesystem(
129128 storage_options : dict | Munch | BaseStorageOptions | None ,
130129 fs : AbstractFileSystem | None ,
131130 cfg_dir : str | None ,
132- pipelines_dir : str | None
131+ pipelines_dir : str | None ,
133132 ) -> None :
134133 """Setup filesystem and configuration directories.
135134
@@ -185,7 +184,7 @@ def _initialize_managers(self) -> None:
185184 base_dir = self ._base_dir ,
186185 fs = self ._fs ,
187186 storage_options = self ._storage_options ,
188- cfg_dir = self ._cfg_dir
187+ cfg_dir = self ._cfg_dir ,
189188 )
190189
191190 # Load project configuration
@@ -201,25 +200,21 @@ def _initialize_managers(self) -> None:
201200
202201 # Initialize specialized managers
203202 self ._executor = PipelineExecutor (
204- config_manager = self ._config_manager ,
205- registry = self .registry
203+ config_manager = self ._config_manager , registry = self .registry
206204 )
207205 self ._lifecycle_manager = PipelineLifecycleManager (registry = self .registry )
208206
209207 # Initialize other components
210208 self ._project_context = None
211209 self .visualizer = PipelineVisualizer (
212- project_cfg = self ._config_manager .project_config ,
213- fs = self ._fs
210+ project_cfg = self ._config_manager .project_config , fs = self ._fs
214211 )
215212 self .io = PipelineIOManager (registry = self .registry )
216213
217214 def _ensure_directories_exist (self ) -> None :
218215 """Ensure essential directories exist."""
219216 self ._fs_helper .ensure_directories_exist (
220- self ._fs ,
221- self ._cfg_dir ,
222- self ._pipelines_dir
217+ self ._fs , self ._cfg_dir , self ._pipelines_dir
223218 )
224219
225220 def _add_modules_path (self ) -> None :
@@ -289,7 +284,6 @@ def __exit__(
289284 # Add cleanup code if needed
290285 pass
291286
292-
293287 def load_pipeline (self , name : str , reload : bool = False ) -> PipelineConfig :
294288 """Load or reload configuration for a specific pipeline.
295289
@@ -353,12 +347,8 @@ def pipeline_cfg(self) -> PipelineConfig:
353347
354348 # --- Core Execution Method ---
355349
356-
357350 def run (
358- self ,
359- name : str ,
360- run_config : RunConfig | None = None ,
361- ** kwargs
351+ self , name : str , run_config : RunConfig | None = None , ** kwargs
362352 ) -> dict [str , Any ]:
363353 """Execute a pipeline synchronously and return its results.
364354
@@ -433,7 +423,7 @@ def run(
433423 # Set project context for executor
434424 if hasattr (self , "_project_context" ) and self ._project_context is not None :
435425 self ._executor ._project_context = self ._project_context
436-
426+
437427 # Delegate to executor
438428 return self ._executor .run (name = name , run_config = run_config , ** kwargs )
439429
@@ -600,10 +590,12 @@ def show_pipelines(self, format: str = "table") -> None:
600590 try :
601591 if fmt == "json" :
602592 import json
593+
603594 print (json .dumps (names ))
604595 return None
605596 if fmt == "yaml" :
606597 import yaml # type: ignore
598+
607599 print (yaml .safe_dump (names , sort_keys = False ))
608600 return None
609601 except Exception as e :
@@ -1054,8 +1046,13 @@ def save_dag(
10541046 ... reload=True
10551047 ... )
10561048 """
1049+
10571050 return self .visualizer .save_dag (
1058- name = name , format = format , reload = reload , output_path = output_path
1051+ name = name ,
1052+ format = format ,
1053+ reload = reload ,
1054+ output_path = output_path ,
1055+ base_dir = self ._base_dir ,
10591056 )
10601057
10611058 def show_dag (
0 commit comments