44
55"""
66import contextlib
7+ import collections
78import hashlib
89import logging
910import math
@@ -84,15 +85,15 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
8485 )
8586
8687 # Move files from temporary output dir to user-specified output dir
87- for filename in reduce_output_dir .glob ("*" ):
88+ total_size = 0
89+ for filename in sorted (reduce_output_dir .glob ("*" )):
90+ st_size = filename .stat ().st_size
91+ total_size += st_size
8892 shutil .copy (filename , output_dir )
93+ output_path = output_dir .parent / last_two (filename )
94+ LOGGER .debug ("%s size=%sB" , output_path , st_size )
8995
9096 # Remind user where to find output
91- total_size = 0
92- for outpath in sorted (output_dir .iterdir ()):
93- st_size = outpath .stat ().st_size
94- total_size += st_size
95- LOGGER .debug ("%s size=%sB" , outpath , st_size )
9697 LOGGER .debug ("total output size=%sB" , total_size )
9798 LOGGER .info ("Output directory: %s" , output_dir )
9899
@@ -220,37 +221,25 @@ def keyhash(key):
220221 return int (hexdigest , base = 16 )
221222
222223
223- def partition_keys (inpath , outpaths ):
224- """Allocate lines of inpath among outpaths using hash of key."""
224+ def partition_keys (inpath , outpaths , input_keys_stats , output_keys_stats ):
225+ """Allocate lines of inpath among outpaths using hash of key.
226+
227+ Update the data structures provided by the caller input_keys_stats and
228+ output_keys_stats . Both map a filename to a set of keys .
229+
230+ """
225231 assert len (outpaths ) == MAX_NUM_REDUCE
226232 outparent = outpaths [0 ].parent
227233 assert all (i .parent == outparent for i in outpaths )
228- outnames = [i .name for i in outpaths ]
229- LOGGER .debug (
230- "partition %s >> %s/{%s}" ,
231- last_two (inpath ), outparent .name , "," .join (outnames ),
232- )
233234 with contextlib .ExitStack () as stack :
234235 outfiles = [stack .enter_context (p .open ("a" )) for p in outpaths ]
235236 for line in stack .enter_context (inpath .open ()):
236237 key = line .partition ('\t ' )[0 ]
238+ input_keys_stats [inpath ].add (key )
237239 reducer_idx = keyhash (key ) % MAX_NUM_REDUCE
238240 outfiles [reducer_idx ].write (line )
239-
240-
241- def keyspace (path ):
242- """Return the number of unique keys in {path}.
243-
244- WARNING: This is a terribly slow implementation. It would be faster to
245- record this information while grouping.x
246-
247- """
248- keys = set ()
249- with path .open () as infile :
250- for line in infile :
251- key = line .partition ('\t ' )[0 ]
252- keys .add (key )
253- return keys
241+ outpath = outpaths [reducer_idx ]
242+ output_keys_stats [outpath ].add (key )
254243
255244
256245def group_stage (input_dir , output_dir ):
@@ -260,22 +249,34 @@ def group_stage(input_dir, output_dir):
260249 using the hash and modulo of the key.
261250
262251 """
263- # Detailed keyspace debug output THIS IS SLOW
264- all_keys = set ()
265- for inpath in sorted (input_dir .iterdir ()):
266- keys = keyspace (inpath )
267- all_keys .update (keys )
268- LOGGER .debug ("%s unique_keys=%s" , last_two (inpath ), len (keys ))
269- LOGGER .debug ("%s all_unique_keys=%s" , input_dir .name , len (all_keys ))
270-
271252 # Compute output filenames
272253 outpaths = []
273254 for i in range (MAX_NUM_REDUCE ):
274255 outpaths .append (output_dir / part_filename (i ))
275256
276- # Parition input, appending to output files
257+ # Track keyspace stats, map filename -> set of keys
258+ input_keys_stats = collections .defaultdict (set )
259+ output_keys_stats = collections .defaultdict (set )
260+
261+ # Partition input, appending to output files
277262 for inpath in sorted (input_dir .iterdir ()):
278- partition_keys (inpath , outpaths )
263+ partition_keys (inpath , outpaths , input_keys_stats , output_keys_stats )
264+
265+ # Log input keyspace stats
266+ all_input_keys = set ()
267+ for inpath , keys in sorted (input_keys_stats .items ()):
268+ all_input_keys .update (keys )
269+ LOGGER .debug ("%s unique_keys=%s" , last_two (inpath ), len (keys ))
270+ LOGGER .debug ("%s all_unique_keys=%s" , input_dir .name , len (all_input_keys ))
271+
272+ # Log partition input and output filenames
273+ outnames = [i .name for i in outpaths ]
274+ outparent = outpaths [0 ].parent
275+ for inpath in sorted (input_keys_stats .keys ()):
276+ LOGGER .debug (
277+ "partition %s >> %s/{%s}" ,
278+ last_two (inpath ), outparent .name , "," .join (outnames ),
279+ )
279280
280281 # Remove empty output files. We won't always use the maximum number of
281282 # reducers because some MapReduce programs have fewer intermediate keys.
@@ -288,13 +289,13 @@ def group_stage(input_dir, output_dir):
288289 for path in sorted (output_dir .iterdir ()):
289290 sort_file (path )
290291
291- # Detailed keyspace debug output THIS IS SLOW
292- all_keys = set ()
293- for outpath in sorted (output_dir .iterdir ()):
294- keys = keyspace (outpath )
295- all_keys .update (keys )
292+ # Log output keyspace stats
293+ all_output_keys = set ()
294+ for outpath , keys in sorted (output_keys_stats .items ()):
295+ all_output_keys .update (keys )
296296 LOGGER .debug ("%s unique_keys=%s" , last_two (outpath ), len (keys ))
297- LOGGER .debug ("%s all_unique_keys=%s" , output_dir .name , len (all_keys ))
297+ LOGGER .debug ("%s all_unique_keys=%s" , output_dir .name ,
298+ len (all_output_keys ))
298299
299300
300301def reduce_stage (exe , input_dir , output_dir ):
0 commit comments