# Maximum number of bytes of input handed to a single mapper.
MAX_INPUT_SPLIT_SIZE = 2**20  # 1 MB

# The number of reducers is dynamically determined by the number of unique
# keys, but will not be more than num_reducers.

# Madoop logger
LOGGER = logging.getLogger("madoop")
2827
29- def mapreduce (input_path , output_dir , map_exe , reduce_exe ):
28+ def mapreduce (input_path , output_dir , map_exe , reduce_exe , num_reducers ):
3029 """Madoop API."""
3130 # Do not clobber existing output directory
3231 output_dir = pathlib .Path (output_dir )
@@ -74,6 +73,7 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe):
7473 group_stage (
7574 input_dir = map_output_dir ,
7675 output_dir = reduce_input_dir ,
76+ num_reducers = num_reducers
7777 )
7878
7979 # Run the reducing stage
@@ -240,37 +240,43 @@ def keyhash(key):
240240 return int (hexdigest , base = 16 )
241241
242242
def partition_keys(
        inpath,
        outpaths,
        input_keys_stats,
        output_keys_stats,
        num_reducers):
    """Allocate lines of inpath among outpaths using hash of key.

    Update the data structures provided by the caller, input_keys_stats and
    output_keys_stats.  Both map a filename to a set of keys.

    """
    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
    assert all(path.parent == outparent for path in outpaths)
    with contextlib.ExitStack() as stack:
        # Open every reducer output file for append, plus the input file;
        # ExitStack guarantees all of them are closed on exit.
        outfiles = [stack.enter_context(path.open("a")) for path in outpaths]
        for line in stack.enter_context(inpath.open()):
            key = line.partition('\t')[0]
            input_keys_stats[inpath].add(key)
            # Same key always hashes to the same reducer.
            reducer_idx = keyhash(key) % num_reducers
            outfiles[reducer_idx].write(line)
            outpath = outpaths[reducer_idx]
            output_keys_stats[outpath].add(key)
263268
264- def group_stage (input_dir , output_dir ):
269+ def group_stage (input_dir , output_dir , num_reducers ):
265270 """Run group stage.
266271
267272 Process each mapper output file, allocating lines to grouper output files
268273 using the hash and modulo of the key.
269274
270275 """
271276 # Compute output filenames
277+ LOGGER .debug ("%s reducers" , num_reducers )
272278 outpaths = []
273- for i in range (MAX_NUM_REDUCE ):
279+ for i in range (num_reducers ):
274280 outpaths .append (output_dir / part_filename (i ))
275281
276282 # Track keyspace stats, map filename -> set of keys
@@ -279,7 +285,8 @@ def group_stage(input_dir, output_dir):
279285
280286 # Partition input, appending to output files
281287 for inpath in sorted (input_dir .iterdir ()):
282- partition_keys (inpath , outpaths , input_keys_stats , output_keys_stats )
288+ partition_keys (inpath , outpaths , input_keys_stats ,
289+ output_keys_stats , num_reducers )
283290
284291 # Log input keyspace stats
285292 all_input_keys = set ()
0 commit comments