1616
1717
1818# Large input files are automatically split
19- MAX_INPUT_SPLIT_SIZE = 2 ** 20 # 1 MB
19+ MAX_INPUT_SPLIT_SIZE = 2 ** 21 # 2 MB
2020
2121# The number of reducers is dynamically determined by the number of unique keys
2222# but will not be more than num_reducers
2525LOGGER = logging .getLogger ("madoop" )
2626
2727
28- def mapreduce (input_path , output_dir , map_exe , reduce_exe , num_reducers ):
28+ def mapreduce (
29+ input_path ,
30+ output_dir ,
31+ map_exe ,
32+ reduce_exe ,
33+ num_reducers ,
34+ partitioner = None ,
35+ ):
2936 """Madoop API."""
37+ # pylint: disable=too-many-arguments
3038 # Do not clobber existing output directory
3139 output_dir = pathlib .Path (output_dir )
3240 if output_dir .exists ():
@@ -73,7 +81,8 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
7381 group_stage (
7482 input_dir = map_output_dir ,
7583 output_dir = reduce_input_dir ,
76- num_reducers = num_reducers
84+ num_reducers = num_reducers ,
85+ partitioner = partitioner ,
7786 )
7887
7988 # Run the reducing stage
@@ -178,13 +187,13 @@ def is_executable(exe):
178187 try :
179188 subprocess .run (
180189 str (exe ),
181- shell = True ,
190+ shell = False ,
182191 input = "" .encode (),
183192 stdout = subprocess .PIPE ,
184193 stderr = subprocess .PIPE ,
185194 check = True ,
186195 )
187- except subprocess .CalledProcessError as err :
196+ except ( subprocess .CalledProcessError , OSError ) as err :
188197 raise MadoopError (f"Failed executable test: { err } " ) from err
189198
190199
@@ -212,7 +221,7 @@ def map_stage(exe, input_dir, output_dir):
212221 try :
213222 subprocess .run (
214223 str (exe ),
215- shell = True ,
224+ shell = False ,
216225 check = True ,
217226 stdin = infile ,
218227 stdout = outfile ,
@@ -240,7 +249,7 @@ def keyhash(key):
240249 return int (hexdigest , base = 16 )
241250
242251
243- def partition_keys (
252+ def partition_keys_default (
244253 inpath ,
245254 outpaths ,
246255 input_keys_stats ,
@@ -250,7 +259,6 @@ def partition_keys(
250259
251260 Update the data structures provided by the caller input_keys_stats and
252261 output_keys_stats. Both map a filename to a set of of keys.
253-
254262 """
255263 assert len (outpaths ) == num_reducers
256264 outparent = outpaths [0 ].parent
@@ -266,7 +274,80 @@ def partition_keys(
266274 output_keys_stats [outpath ].add (key )
267275
268276
269- def group_stage (input_dir , output_dir , num_reducers ):
def partition_keys_custom(
    inpath,
    outpaths,
    input_keys_stats,
    output_keys_stats,
    num_reducers,
    partitioner,
):
    """Allocate lines of inpath among outpaths using a custom partitioner.

    The partitioner executable receives the contents of inpath on stdin and
    num_reducers as its only argument.  It must print exactly one integer in
    [0, num_reducers) per input line.

    Update the data structures provided by the caller input_keys_stats and
    output_keys_stats.  Both map a filename to a set of keys.

    Raise MadoopError if the partitioner prints a non-integer value, an
    out-of-range value, the wrong number of lines, or exits non-zero.
    """
    # pylint: disable=too-many-arguments,too-many-locals
    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
    assert all(i.parent == outparent for i in outpaths)
    with contextlib.ExitStack() as stack:
        outfiles = [stack.enter_context(p.open("a")) for p in outpaths]
        # The partitioner reads inpath on stdin; we open inpath a second
        # time below to pair each input line with its partition number.
        process = stack.enter_context(subprocess.Popen(
            [partitioner, str(num_reducers)],
            stdin=stack.enter_context(inpath.open()),
            stdout=subprocess.PIPE,
            text=True,
        ))
        partitions = stack.enter_context(process.stdout)
        for line in stack.enter_context(inpath.open()):
            # Read one partition number per input line.  zip() would
            # silently drop trailing lines on a count mismatch, so read
            # explicitly and detect short output.
            partition = partitions.readline()
            if not partition:
                raise MadoopError(
                    "Partition executable produced too few lines: "
                    f"{str(partitioner)}"
                )
            try:
                partition = int(partition)
            except ValueError as err:
                raise MadoopError(
                    "Partition executable returned non-integer value: "
                    f"{partition} for line '{line}'."
                ) from err
            if not 0 <= partition < num_reducers:
                raise MadoopError(
                    "Partition executable returned invalid value: "
                    f"0 <= {partition} < {num_reducers} for line '{line}'."
                )
            key = line.partition('\t')[0]
            input_keys_stats[inpath].add(key)
            outfiles[partition].write(line)
            outpath = outpaths[partition]
            output_keys_stats[outpath].add(key)
        if partitions.readline():
            raise MadoopError(
                "Partition executable produced too many lines: "
                f"{str(partitioner)}"
            )

    return_code = process.wait()
    if return_code:
        raise MadoopError(
            f"Partition executable returned non-zero: {str(partitioner)}"
        )
329+
330+
def log_input_key_stats(input_keys_stats, input_dir):
    """Log the unique-key count for each input file and for all inputs."""
    union_of_keys = set()
    for path, keyset in sorted(input_keys_stats.items()):
        LOGGER.debug("%s unique_keys=%s", last_two(path), len(keyset))
        union_of_keys |= keyset
    LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(union_of_keys))
338+
339+
def log_output_key_stats(output_keys_stats, output_dir):
    """Log the unique-key count for each output file and for all outputs."""
    union_of_keys = set()
    for path, keyset in sorted(output_keys_stats.items()):
        LOGGER.debug("%s unique_keys=%s", last_two(path), len(keyset))
        union_of_keys |= keyset
    LOGGER.debug("%s all_unique_keys=%s", output_dir.name,
                 len(union_of_keys))
348+
349+
350+ def group_stage (input_dir , output_dir , num_reducers , partitioner ):
270351 """Run group stage.
271352
272353 Process each mapper output file, allocating lines to grouper output files
@@ -285,15 +366,14 @@ def group_stage(input_dir, output_dir, num_reducers):
285366
286367 # Partition input, appending to output files
287368 for inpath in sorted (input_dir .iterdir ()):
288- partition_keys (inpath , outpaths , input_keys_stats ,
289- output_keys_stats , num_reducers )
369+ if not partitioner :
370+ partition_keys_default (inpath , outpaths , input_keys_stats ,
371+ output_keys_stats , num_reducers )
372+ else :
373+ partition_keys_custom (inpath , outpaths , input_keys_stats ,
374+ output_keys_stats , num_reducers , partitioner )
290375
291- # Log input keyspace stats
292- all_input_keys = set ()
293- for inpath , keys in sorted (input_keys_stats .items ()):
294- all_input_keys .update (keys )
295- LOGGER .debug ("%s unique_keys=%s" , last_two (inpath ), len (keys ))
296- LOGGER .debug ("%s all_unique_keys=%s" , input_dir .name , len (all_input_keys ))
376+ log_input_key_stats (input_keys_stats , input_dir )
297377
298378 # Log partition input and output filenames
299379 outnames = [i .name for i in outpaths ]
@@ -315,13 +395,7 @@ def group_stage(input_dir, output_dir, num_reducers):
315395 for path in sorted (output_dir .iterdir ()):
316396 sort_file (path )
317397
318- # Log output keyspace stats
319- all_output_keys = set ()
320- for outpath , keys in sorted (output_keys_stats .items ()):
321- all_output_keys .update (keys )
322- LOGGER .debug ("%s unique_keys=%s" , last_two (outpath ), len (keys ))
323- LOGGER .debug ("%s all_unique_keys=%s" , output_dir .name ,
324- len (all_output_keys ))
398+ log_output_key_stats (output_keys_stats , output_dir )
325399
326400
327401def reduce_stage (exe , input_dir , output_dir ):
@@ -337,7 +411,7 @@ def reduce_stage(exe, input_dir, output_dir):
337411 try :
338412 subprocess .run (
339413 str (exe ),
340- shell = True ,
414+ shell = False ,
341415 check = True ,
342416 stdin = infile ,
343417 stdout = outfile ,
0 commit comments