@@ -43,8 +43,9 @@ def mapreduce(
4343 # Executable scripts must have valid shebangs
4444 is_executable (map_exe )
4545 is_executable (reduce_exe )
46+ # The partitioner executable expects to receive num_reducers as an arg
4647 if partitioner :
47- is_executable (partitioner )
48+ is_executable (partitioner , str ( num_reducers ) )
4849
4950 # Create a tmp directory which will be automatically cleaned up
5051 with tempfile .TemporaryDirectory (prefix = "madoop-" ) as tmpdir :
@@ -157,7 +158,7 @@ def normalize_input_paths(input_path):
157158 return input_paths
158159
159160
160- def is_executable (exe ):
161+ def is_executable (exe , * args ):
161162 """Verify exe is executable and raise exception if it is not.
162163
163164 Execute exe with an empty string input and verify that it returns zero. We
@@ -168,7 +169,7 @@ def is_executable(exe):
168169 exe = pathlib .Path (exe ).resolve ()
169170 try :
170171 subprocess .run (
171- str (exe ),
172+ [ str (exe ), * args ] ,
172173 shell = False ,
173174 input = "" .encode (),
174175 stdout = subprocess .PIPE ,
@@ -198,9 +199,13 @@ def part_filename(num):
198199
199200def map_single_chunk (exe , input_path , output_path , chunk ):
200201 """Execute mapper on a single chunk."""
202+ LOGGER .debug (
203+ "%s < %s > %s" ,
204+ exe .name , last_two (input_path ), last_two (output_path ),
205+ )
201206 with output_path .open ("w" ) as outfile :
202207 try :
203- subprocess .run (
208+ ret = subprocess .run (
204209 str (exe ),
205210 shell = False ,
206211 check = True ,
@@ -217,6 +222,8 @@ def map_single_chunk(exe, input_path, output_path, chunk):
217222 ) from err
218223 except OSError as err :
219224 raise MadoopError (f"Command returned non-zero: { err } " ) from err
225+ if ret .stderr :
226+ LOGGER .warning ("stderr: %s" , ret .stderr .decode ().rstrip ())
220227
221228
222229def map_stage (exe , input_dir , output_dir ):
@@ -229,10 +236,6 @@ def map_stage(exe, input_dir, output_dir):
229236 for input_path in normalize_input_paths (input_dir ):
230237 for chunk in split_file (input_path , MAX_INPUT_SPLIT_SIZE ):
231238 output_path = output_dir / part_filename (part_num )
232- LOGGER .debug (
233- "%s < %s > %s" ,
234- exe .name , last_two (input_path ), last_two (output_path ),
235- )
236239 futures .append (pool .submit (
237240 map_single_chunk ,
238241 exe ,
@@ -427,9 +430,13 @@ def group_stage(input_dir, output_dir, num_reducers, partitioner):
427430
428431def reduce_single_file (exe , input_path , output_path ):
429432 """Execute reducer on a single file."""
433+ LOGGER .debug (
434+ "%s < %s > %s" ,
435+ exe .name , last_two (input_path ), last_two (output_path ),
436+ )
430437 with input_path .open () as infile , output_path .open ("w" ) as outfile :
431438 try :
432- subprocess .run (
439+ ret = subprocess .run (
433440 str (exe ),
434441 shell = False ,
435442 check = True ,
@@ -446,6 +453,8 @@ def reduce_single_file(exe, input_path, output_path):
446453 ) from err
447454 except OSError as err :
448455 raise MadoopError (f"Command returned non-zero: { err } " ) from err
456+ if ret .stderr :
457+ LOGGER .warning ("stderr: %s" , ret .stderr .decode ().rstrip ())
449458
450459
451460def reduce_stage (exe , input_dir , output_dir ):
@@ -457,10 +466,6 @@ def reduce_stage(exe, input_dir, output_dir):
457466 ) as pool :
458467 for i , input_path in enumerate (sorted (input_dir .iterdir ())):
459468 output_path = output_dir / part_filename (i )
460- LOGGER .debug (
461- "%s < %s > %s" ,
462- exe .name , last_two (input_path ), last_two (output_path ),
463- )
464469 futures .append (pool .submit (
465470 reduce_single_file ,
466471 exe ,
0 commit comments