@@ -198,6 +198,10 @@ def part_filename(num):
198198
199199def map_single_chunk (exe , input_path , output_path , chunk ):
200200 """Execute mapper on a single chunk."""
201+ LOGGER .debug (
202+ "%s < %s > %s" ,
203+ exe .name , last_two (input_path ), last_two (output_path ),
204+ )
201205 with output_path .open ("w" ) as outfile :
202206 try :
203207 ret = subprocess .run (
@@ -231,10 +235,6 @@ def map_stage(exe, input_dir, output_dir):
231235 for input_path in normalize_input_paths (input_dir ):
232236 for chunk in split_file (input_path , MAX_INPUT_SPLIT_SIZE ):
233237 output_path = output_dir / part_filename (part_num )
234- LOGGER .debug (
235- "%s < %s > %s" ,
236- exe .name , last_two (input_path ), last_two (output_path ),
237- )
238238 futures .append (pool .submit (
239239 map_single_chunk ,
240240 exe ,
@@ -429,6 +429,10 @@ def group_stage(input_dir, output_dir, num_reducers, partitioner):
429429
430430def reduce_single_file (exe , input_path , output_path ):
431431 """Execute reducer on a single file."""
432+ LOGGER .debug (
433+ "%s < %s > %s" ,
434+ exe .name , last_two (input_path ), last_two (output_path ),
435+ )
432436 with input_path .open () as infile , output_path .open ("w" ) as outfile :
433437 try :
434438 ret = subprocess .run (
@@ -461,10 +465,6 @@ def reduce_stage(exe, input_dir, output_dir):
461465 ) as pool :
462466 for i , input_path in enumerate (sorted (input_dir .iterdir ())):
463467 output_path = output_dir / part_filename (i )
464- LOGGER .debug (
465- "%s < %s > %s" ,
466- exe .name , last_two (input_path ), last_two (output_path ),
467- )
468468 futures .append (pool .submit (
469469 reduce_single_file ,
470470 exe ,
0 commit comments