MAX_INPUT_SPLIT_SIZE = 2 ** 20  # 1 MB

# The number of reducers is dynamically determined by the number of unique keys
-# but will not be more than MAX_NUM_REDUCE
-MAX_NUM_REDUCE = 4
+# but will not be more than num_reducers

# Madoop logger
LOGGER = logging.getLogger("madoop")


-def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
+def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
    """Madoop API."""
    # Do not clobber existing output directory
    output_dir = pathlib.Path(output_dir)
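
For reference, a minimal sketch of calling the updated API (assuming mapreduce is exported at the package level; the paths and executables below are hypothetical):

    import madoop

    madoop.mapreduce(
        input_path="input",        # now a file OR a directory (hypothetical path)
        output_dir="output",
        map_exe="./map.py",        # hypothetical mapper executable
        reduce_exe="./reduce.py",  # hypothetical reducer executable
        num_reducers=4,            # replaces the fixed MAX_NUM_REDUCE = 4
    )
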
@@ -54,8 +53,8 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
    reduce_output_dir.mkdir()

    # Copy and rename input files: part-00000, part-00001, etc.
-    input_dir = pathlib.Path(input_dir)
-    prepare_input_files(input_dir, map_input_dir)
+    input_path = pathlib.Path(input_path)
+    prepare_input_files(input_path, map_input_dir)

    # Executables must be absolute paths
    map_exe = pathlib.Path(map_exe).resolve()
@@ -74,6 +73,7 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
    group_stage(
        input_dir=map_output_dir,
        output_dir=reduce_input_dir,
+        num_reducers=num_reducers
    )

    # Run the reducing stage
@@ -98,25 +98,23 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
    LOGGER.info("Output directory: %s", output_dir)


-def prepare_input_files(input_dir, output_dir):
+def prepare_input_files(input_path, output_dir):
    """Copy and split input files. Rename to part-00000, part-00001, etc.

-    If a file in input_dir is smaller than MAX_INPUT_SPLIT_SIZE, then copy it
-    to output_dir. For larger files, split into blocks of MAX_INPUT_SPLIT_SIZE
-    bytes and write block to output_dir. Input files will never be combined.
+    The input_path can be a file or a directory of files. If a file is smaller
+    than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files,
+    split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write each block to
+    output_dir. Input files will never be combined.

    The number of files created will be the number of mappers, since we
    assume that the number of tasks per mapper is 1. Apache Hadoop has a
    configurable number of tasks per mapper; however, for both simplicity and
    because our use case has smaller inputs, we use 1.

    """
-    assert input_dir.is_dir(), f"Can't find input_dir '{input_dir}'"
-
-    # Split and copy input files
    part_num = 0
    total_size = 0
-    for inpath in sorted(input_dir.glob('*')):
+    for inpath in normalize_input_paths(input_path):
        assert inpath.is_file()

        # Compute output filenames
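
The split behavior documented above determines the mapper count. A back-of-the-envelope sketch of that arithmetic (an illustration, not code from this commit):

    import math

    MAX_INPUT_SPLIT_SIZE = 2 ** 20  # 1 MB, as defined at the top of the module

    # Number of part files one input file contributes: small files are copied
    # whole; larger files are split into 1 MB blocks and never combined.
    def num_parts(file_size):
        return max(1, math.ceil(file_size / MAX_INPUT_SPLIT_SIZE))

    assert num_parts(100) == 1            # small file: copied as a single part
    assert num_parts(3 * 2**20 + 1) == 4  # just over 3 MB: four blocks
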
@@ -148,6 +146,26 @@ def prepare_input_files(input_dir, output_dir):
    LOGGER.debug("total input size=%sB", total_size)


+def normalize_input_paths(input_path):
+    """Return a list of filtered input files.
+
+    If input_path is a file, then use it. If input_path is a directory, then
+    grab all the *files* inside. Ignore subdirectories.
+
+    """
+    input_paths = []
+    if input_path.is_dir():
+        for path in sorted(input_path.glob('*')):
+            if path.is_file():
+                input_paths.append(path)
+            else:
+                LOGGER.warning("Ignoring non-file: %s", path)
+    elif input_path.is_file():
+        input_paths.append(input_path)
+    assert input_paths, f"No input: {input_path}"
+    return input_paths
+
+
def is_executable(exe):
    """Verify exe is executable and raise exception if it is not.

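
A short usage sketch of the new helper (hypothetical paths; behavior as documented in the docstring above, with normalize_input_paths assumed in scope):

    import pathlib

    # A single file comes back as a one-element list.
    paths = normalize_input_paths(pathlib.Path("input.txt"))

    # A directory yields its regular files, sorted; subdirectories are
    # skipped with a warning.
    paths = normalize_input_paths(pathlib.Path("input"))
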
@@ -222,37 +240,43 @@ def keyhash(key):
    return int(hexdigest, base=16)


-def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats):
+def partition_keys(
+        inpath,
+        outpaths,
+        input_keys_stats,
+        output_keys_stats,
+        num_reducers):
    """Allocate lines of inpath among outpaths using hash of key.

    Update the data structures provided by the caller, input_keys_stats and
    output_keys_stats. Both map a filename to a set of keys.

    """
-    assert len(outpaths) == MAX_NUM_REDUCE
+    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
    assert all(i.parent == outparent for i in outpaths)
    with contextlib.ExitStack() as stack:
        outfiles = [stack.enter_context(p.open("a")) for p in outpaths]
        for line in stack.enter_context(inpath.open()):
            key = line.partition('\t')[0]
            input_keys_stats[inpath].add(key)
-            reducer_idx = keyhash(key) % MAX_NUM_REDUCE
+            reducer_idx = keyhash(key) % num_reducers
            outfiles[reducer_idx].write(line)
            outpath = outpaths[reducer_idx]
            output_keys_stats[outpath].add(key)


-def group_stage(input_dir, output_dir):
+def group_stage(input_dir, output_dir, num_reducers):
    """Run group stage.

    Process each mapper output file, allocating lines to grouper output files
    using the hash and modulo of the key.

    """
    # Compute output filenames
+    LOGGER.debug("%s reducers", num_reducers)
    outpaths = []
-    for i in range(MAX_NUM_REDUCE):
+    for i in range(num_reducers):
        outpaths.append(output_dir / part_filename(i))

    # Track keyspace stats, map filename -> set of keys
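
The correctness of the group stage rests on keyhash being deterministic: every line with the same key hashes to the same reducer index, so all of a key's values land in one part file. A toy illustration (assuming keyhash is built on a hashlib hexdigest, per the return statement shown above; the MD5 choice here is an assumption):

    import hashlib

    def keyhash(key):
        # Assumption: a hashlib digest; only determinism matters here.
        hexdigest = hashlib.md5(key.encode()).hexdigest()
        return int(hexdigest, base=16)

    num_reducers = 4
    for key in ["apple", "banana", "apple"]:
        # Repeated keys always map to the same reducer index.
        print(key, keyhash(key) % num_reducers)
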
@@ -261,7 +285,8 @@ def group_stage(input_dir, output_dir):

    # Partition input, appending to output files
    for inpath in sorted(input_dir.iterdir()):
-        partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats)
+        partition_keys(inpath, outpaths, input_keys_stats,
+                       output_keys_stats, num_reducers)

    # Log input keyspace stats
    all_input_keys = set()