15
15
import sys
16
16
import yaml
17
17
from .config_generator import ConfigProcessor
18
+ from multiprocessing import Pool , cpu_count
18
19
19
20
logger = logging .getLogger (__name__ )
20
21
@@ -63,50 +64,69 @@ def __traverse_path(self, path: str, yaml_dict: dict):
63
64
if isinstance (yaml_dict [current_key ], dict ):
64
65
return self .__traverse_path (path = "." .join (keys ), yaml_dict = yaml_dict [current_key ])
65
66
else :
66
- raise Exception ("{1}[{0}] is not traversable." .format (current_key , yaml_dict ))
67
+ raise Exception ("{1}[{0}] is not traversable." .format (
68
+ current_key , yaml_dict ))
67
69
else :
68
- raise Exception ("Key not found for {0} in dictionary {1}." .format (current_key , yaml_dict ))
70
+ raise Exception ("Key not found for {0} in dictionary {1}." .format (
71
+ current_key , yaml_dict ))
69
72
70
73
71
def merge_configs(directories, levels, output_dir, enable_parallel=False):
    """
    Method for running the merge configuration logic under different formats
    :param directories: list of paths for leaf directories
    :param levels: list of hierarchy levels to traverse
    :param output_dir: where to save the generated configs
    :param enable_parallel: to enable parallel config generation; defaults to False so
        that callers passing only the first three arguments keep working
    """
    config_processor = ConfigProcessor()

    # one work item per leaf directory, in the tuple layout merge_logic expects
    process_config = [
        (config_processor, path, levels, output_dir) for path in directories
    ]

    if enable_parallel and process_config:
        logger.info("Processing config in parallel")
        # never start more worker processes than there are directories to process
        with Pool(min(cpu_count(), len(process_config))) as p:
            p.map(merge_logic, process_config)
    else:
        for config in process_config:
            merge_logic(config)
80
94
81
95
82
def merge_logic(process_params):
    """
    Method implementing the merge config logic for a single directory
    :param process_params: tuple that contains config for running the merge_logic, in the
        order (config_processor, path, levels, output_dir)
    """
    config_processor, path, levels, output_dir = process_params

    # The loader setup lives here (not in the caller) so that it is applied in whichever
    # process executes this function; merge_configs may dispatch it to Pool workers.
    # load the !include tag
    Loader.add_constructor('!include', Loader.include)

    # override the Yaml SafeLoader with our custom loader
    yaml.SafeLoader = Loader

    # use the HIML deep merge functionality
    output = dict(
        config_processor.process(path=path, output_format="yaml", print_data=False, multi_line_string=True))

    # exchange the levels to which to run for with the values extracted from the yaml structure
    # NOTE(review): a level that is missing from the merged output yields None here, which
    # makes the join below fail (or names the file "None.yaml" for the last level) - confirm
    # whether that should be reported explicitly.
    level_values = [output.get(level) for level in levels]

    # create the publish path and all level_values except the last one
    publish_path = os.path.join(output_dir, '') + '/'.join(level_values[:-1])
    # exist_ok avoids the check-then-create race when several processes publish under
    # the same parent directory at the same time
    os.makedirs(publish_path, exist_ok=True)

    # create the yaml file for output using the publish_path and last level_values element
    filename = "{0}/{1}.yaml".format(publish_path, level_values[-1])
    logger.info("Found input config directory: %s", path)
    logger.info("Storing generated config to: %s", filename)
    with open(filename, "w") as f:
        f.write(yaml.dump(output))
110
130
111
131
112
132
def is_leaf_directory (dir , leaf_directories ):
@@ -116,7 +136,7 @@ def is_leaf_directory(dir, leaf_directories):
116
136
def get_leaf_directories (src , leaf_directories ):
117
137
"""
118
138
Method for doing a deep search of directories matching either the desired
119
- leaf directorie .
139
+ leaf directories .
120
140
:param src: the source path to start looking from
121
141
:return: the list of absolute paths
122
142
"""
@@ -149,20 +169,17 @@ def parser_options(args):
149
169
help = 'hierarchy levels, for instance: env, region, cluster' , required = True )
150
170
parser .add_argument ('--leaf-directories' , dest = 'leaf_directories' , nargs = '+' ,
151
171
help = 'leaf directories, for instance: cluster' , required = True )
172
+ parser .add_argument ('--enable-parallel' , dest = 'enable_parallel' , default = False ,
173
+ action = 'store_true' , help = 'Process config using multiprocessing' )
152
174
return parser .parse_args (args )
153
175
154
176
155
177
def run(args=None):
    """
    Method for parsing the command line options and running the config merge
    :param args: arguments handed over to parser_options; defaults to None
    """
    options = parser_options(args)

    # collect the absolute paths of the leaf directories found under the source path
    leaf_dirs = get_leaf_directories(options.path, options.leaf_directories)

    # let HIML merge the configs of every leaf directory
    merge_configs(
        leaf_dirs,
        options.hierarchy_levels,
        options.output_dir,
        options.enable_parallel,
    )
0 commit comments