22
22
import click
23
23
from click_default_group import DefaultGroup
24
24
from datahub .configuration .config_loader import load_config_file
25
+ from expandvars import UnboundVariable
25
26
26
27
import datahub_actions as datahub_actions_package
27
28
from datahub_actions .pipeline .pipeline import Pipeline
28
29
from datahub_actions .pipeline .pipeline_manager import PipelineManager
29
30
30
31
logger = logging .getLogger (__name__ )
31
32
32
-
33
33
# Instantiate a singleton instance of the Pipeline Manager.
34
34
pipeline_manager = PipelineManager ()
35
35
@@ -52,6 +52,44 @@ def actions() -> None:
52
52
pass
53
53
54
54
55
+ def load_raw_config_file (config_file : pathlib .Path ) -> dict :
56
+ """
57
+ Load a config file as raw YAML/JSON without variable expansion.
58
+
59
+ Args:
60
+ config_file: Path to the configuration file
61
+
62
+ Returns:
63
+ dict: Raw configuration dictionary
64
+
65
+ Raises:
66
+ Exception: If the file cannot be loaded or is invalid YAML/JSON
67
+ """
68
+ try :
69
+ with open (config_file , "r" ) as f :
70
+ import yaml
71
+
72
+ return yaml .safe_load (f )
73
+ except Exception as e :
74
+ raise Exception (
75
+ f"Failed to load raw configuration file { config_file } : { e } "
76
+ ) from e
77
+
78
+
79
+ def is_pipeline_enabled (config : dict ) -> bool :
80
+ """
81
+ Check if a pipeline configuration is enabled.
82
+
83
+ Args:
84
+ config: Raw configuration dictionary
85
+
86
+ Returns:
87
+ bool: True if pipeline is enabled, False otherwise
88
+ """
89
+ enabled = config .get ("enabled" , True )
90
+ return not (enabled == "false" or enabled is False )
91
+
92
+
55
93
@actions .command (
56
94
name = "run" ,
57
95
context_settings = dict (
@@ -70,43 +108,70 @@ def run(ctx: Any, config: List[str], debug: bool) -> None:
70
108
)
71
109
72
110
if debug :
73
- # Set root logger settings to debug mode.
74
111
logging .getLogger ().setLevel (logging .DEBUG )
75
112
else :
76
- # Set root logger settings to info mode.
77
113
logging .getLogger ().setLevel (logging .INFO )
78
114
79
- # Statically configured to be registered with the pipeline Manager.
80
115
pipelines : List [Pipeline ] = []
81
-
82
116
logger .debug ("Creating Actions Pipelines..." )
83
117
84
- # If individual pipeline config was provided, create a pipeline from it.
85
- if config is not None :
86
- for pipeline_config in config :
87
- pipeline_config_file = pathlib .Path (pipeline_config )
88
- pipeline_config_dict = load_config_file (pipeline_config_file )
89
- enabled = pipeline_config_dict .get ("enabled" , True )
90
- if enabled == "false" or enabled is False :
118
+ # Phase 1: Initial validation of configs
119
+ valid_configs = []
120
+ for pipeline_config in config :
121
+ pipeline_config_file = pathlib .Path (pipeline_config )
122
+ try :
123
+ # First just load the raw config to check if it's enabled
124
+ raw_config = load_raw_config_file (pipeline_config_file )
125
+
126
+ if not is_pipeline_enabled (raw_config ):
91
127
logger .warning (
92
- f"Skipping pipeline { pipeline_config_dict .get ('name' )} as it is not enabled"
128
+ f"Skipping pipeline { raw_config .get ('name' ) or pipeline_config } as it is not enabled"
93
129
)
94
130
continue
95
131
96
- # now load the config with variable expansion
132
+ valid_configs .append (pipeline_config_file )
133
+
134
+ except Exception as e :
135
+ if len (config ) == 1 :
136
+ raise Exception (
137
+ f"Failed to load raw configuration file { pipeline_config_file } "
138
+ ) from e
139
+ logger .warning (
140
+ f"Failed to load pipeline configuration! Skipping action config file { pipeline_config_file } ...: { e } "
141
+ )
142
+
143
+ # Phase 2: Full config loading and pipeline creation
144
+ for pipeline_config_file in valid_configs :
145
+ try :
146
+ # Now load the full config with variable expansion
97
147
pipeline_config_dict = load_config_file (pipeline_config_file )
98
- pipelines .append (
99
- pipeline_config_to_pipeline (pipeline_config_dict )
100
- ) # Now, instantiate the pipeline.
148
+ pipelines .append (pipeline_config_to_pipeline (pipeline_config_dict ))
149
+ except UnboundVariable as e :
150
+ if len (valid_configs ) == 1 :
151
+ raise Exception (
152
+ "Failed to load action configuration. Unbound variable(s) provided in config YAML."
153
+ ) from e
154
+ logger .warning (
155
+ f"Failed to resolve variables in config file { pipeline_config_file } ...: { e } "
156
+ )
157
+ continue
158
+
159
+ # Exit early if no valid pipelines were created
160
+ if not pipelines :
161
+ logger .error (
162
+ f"No valid pipelines were started from { len (config )} config(s). "
163
+ "Check that at least one pipeline is enabled and all required environment variables are set."
164
+ )
165
+ sys .exit (1 )
101
166
102
167
logger .debug ("Starting Actions Pipelines" )
103
168
104
- # Start each pipeline.
169
+ # Start each pipeline
105
170
for p in pipelines :
106
171
pipeline_manager .start_pipeline (p .name , p )
107
172
logger .info (f"Action Pipeline with name '{ p .name } ' is now running." )
108
173
109
- # Now, simply run forever.
174
+ # Now, run forever only if we have valid pipelines
110
175
while True :
111
176
time .sleep (5 )
112
177
@@ -121,10 +186,10 @@ def version() -> None:
121
186
122
187
123
188
# Handle shutdown signal. (ctrl-c)
124
- def handle_shutdown (signum , frame ) :
189
+ def handle_shutdown (signum : int , frame : Any ) -> None :
125
190
logger .info ("Stopping all running Action Pipelines..." )
126
191
pipeline_manager .stop_all ()
127
- exit (1 )
192
+ sys . exit (1 )
128
193
129
194
130
195
signal .signal (signal .SIGINT , handle_shutdown )
0 commit comments