 import collections
 import hashlib
 import logging
-import math
 import pathlib
 import shutil
 import subprocess
 import tempfile
+import multiprocessing
+import concurrent.futures
 from .exceptions import MadoopError


 # Large input files are automatically split
-MAX_INPUT_SPLIT_SIZE = 2**21  # 2 MB
-
-# The number of reducers is dynamically determined by the number of unique keys
-# but will not be more than num_reducers
+MAX_INPUT_SPLIT_SIZE = 10 * 1024 * 1024  # 10 MB

 # Madoop logger
 LOGGER = logging.getLogger("madoop")
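With the larger split size, a 25 MB input file now feeds roughly three mappers: two chunks of a little under 10 MB each (split_file below trims each chunk back to the last newline) and a final chunk of about 5 MB. Under the old 2 MB limit the same file would have been divided into thirteen map tasks.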
@@ -52,18 +50,15 @@ def mapreduce(
         LOGGER.debug("tmpdir=%s", tmpdir)

         # Create stage input and output directory
-        map_input_dir = tmpdir / 'input'
         map_output_dir = tmpdir / 'mapper-output'
         reduce_input_dir = tmpdir / 'reducer-input'
         reduce_output_dir = tmpdir / 'output'
-        map_input_dir.mkdir()
         map_output_dir.mkdir()
         reduce_input_dir.mkdir()
         reduce_output_dir.mkdir()

         # Copy and rename input files: part-00000, part-00001, etc.
         input_path = pathlib.Path(input_path)
-        prepare_input_files(input_path, map_input_dir)

         # Executables must be absolute paths
         map_exe = pathlib.Path(map_exe).resolve()
@@ -73,7 +68,7 @@ def mapreduce(
         LOGGER.info("Starting map stage")
         map_stage(
             exe=map_exe,
-            input_dir=map_input_dir,
+            input_dir=input_path,
             output_dir=map_output_dir,
         )

@@ -99,7 +94,7 @@ def mapreduce(
         for filename in sorted(reduce_output_dir.glob("*")):
             st_size = filename.stat().st_size
             total_size += st_size
-            shutil.copy(filename, output_dir)
+            shutil.move(filename, output_dir)
             output_path = output_dir.parent / last_two(filename)
             LOGGER.debug("%s size=%sB", output_path, st_size)

@@ -108,52 +103,36 @@ def mapreduce(
         LOGGER.info("Output directory: %s", output_dir)


-def prepare_input_files(input_path, output_dir):
-    """Copy and split input files. Rename to part-00000, part-00001, etc.
+def split_file(input_filename, max_chunksize):
+    """Iterate over the data in a file one chunk at a time."""
+    with open(input_filename, "rb") as input_file:
+        buffer = b""

-    The input_path can be a file or a directory of files. If a file is smaller
-    than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files,
-    split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write block to
-    output_dir. Input files will never be combined.
+        while True:
+            chunk = input_file.read(max_chunksize)
+            # Break if no more data remains.
+            if not chunk:
+                break

-    The number of files created will be the number of mappers since we will
-    assume that the number of tasks per mapper is 1. Apache Hadoop has a
-    configurable number of tasks per mapper, however for both simplicity and
-    because our use case has smaller inputs we use 1.
+            # Add the chunk to the buffer.
+            buffer += chunk

-    """
-    part_num = 0
-    total_size = 0
-    for inpath in normalize_input_paths(input_path):
-        assert inpath.is_file()
-
-        # Compute output filenames
-        st_size = inpath.stat().st_size
-        total_size += st_size
-        n_splits = math.ceil(st_size / MAX_INPUT_SPLIT_SIZE)
-        n_splits = 1 if not n_splits else n_splits  # Handle empty input file
-        LOGGER.debug(
-            "input %s size=%sB partitions=%s", inpath, st_size, n_splits
-        )
-        outpaths = [
-            output_dir / part_filename(part_num + i) for i in range(n_splits)
-        ]
-        part_num += n_splits
-
-        # Copy to new output files
-        with contextlib.ExitStack() as stack:
-            outfiles = [stack.enter_context(i.open('w')) for i in outpaths]
-            infile = stack.enter_context(inpath.open(encoding="utf-8"))
-            outparent = outpaths[0].parent
-            assert all(i.parent == outparent for i in outpaths)
-            outnames = [i.name for i in outpaths]
-            logging.debug(
-                "partition %s >> %s/{%s}",
-                last_two(inpath), outparent.name, ",".join(outnames),
-            )
-            for i, line in enumerate(infile):
-                outfiles[i % n_splits].write(line)
-    LOGGER.debug("total input size=%sB", total_size)
+            # Find the last newline character in the buffer. We don't want to
+            # yield a chunk that ends in the middle of a line; we have to
+            # respect line boundaries or we'll corrupt the input.
+            last_newline = buffer.rfind(b"\n")
+            if last_newline != -1:
+                # Yield the content up to the last newline, saving the rest
+                # for the next chunk.
+                yield buffer[:last_newline + 1]
+
+                # Remove processed data from the buffer. The next chunk will
+                # start with whatever data came after the last newline.
+                buffer = buffer[last_newline + 1:]
+
+        # Yield any remaining data.
+        if buffer:
+            yield buffer


 def normalize_input_paths(input_path):
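The chunking logic added above is easy to sanity-check on its own. Here is a minimal sketch (not part of the commit; _demo_split_file is a made-up name for illustration) that writes newline-terminated records to a temporary file, then confirms that every chunk ends on a line boundary and that the chunks concatenate back to the original bytes:

    import pathlib
    import tempfile

    def _demo_split_file():
        # 1000 newline-terminated records, a few kilobytes in total.
        data = b"".join(b"key%d\tvalue\n" % i for i in range(1000))
        with tempfile.TemporaryDirectory() as tmp:
            path = pathlib.Path(tmp) / "input.txt"
            path.write_bytes(data)
            chunks = list(split_file(path, max_chunksize=4096))
        # Chunks respect line boundaries; nothing is lost or duplicated.
        assert all(chunk.endswith(b"\n") for chunk in chunks)
        assert b"".join(chunks) == data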
@@ -209,30 +188,51 @@ def part_filename(num):
     return f"part-{num:05d}"


+def map_single_chunk(exe, input_path, output_path, chunk):
+    """Execute mapper on a single chunk."""
+    with output_path.open("w") as outfile:
+        try:
+            subprocess.run(
+                str(exe),
+                shell=False,
+                check=True,
+                input=chunk,
+                stdout=outfile,
+            )
+        except subprocess.CalledProcessError as err:
+            raise MadoopError(
+                f"Command returned non-zero: "
+                f"{exe} < {input_path} > {output_path}"
+            ) from err
+
+
 def map_stage(exe, input_dir, output_dir):
     """Execute mappers."""
-    i = 0
-    for i, input_path in enumerate(sorted(input_dir.iterdir()), 1):
-        output_path = output_dir / part_filename(i)
-        LOGGER.debug(
-            "%s < %s > %s",
-            exe.name, last_two(input_path), last_two(output_path),
-        )
-        with input_path.open() as infile, output_path.open('w') as outfile:
-            try:
-                subprocess.run(
-                    str(exe),
-                    shell=False,
-                    check=True,
-                    stdin=infile,
-                    stdout=outfile,
+    part_num = 0
+    futures = []
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=multiprocessing.cpu_count()
+    ) as pool:
+        for input_path in normalize_input_paths(input_dir):
+            for chunk in split_file(input_path, MAX_INPUT_SPLIT_SIZE):
+                output_path = output_dir / part_filename(part_num)
+                LOGGER.debug(
+                    "%s < %s > %s",
+                    exe.name, last_two(input_path), last_two(output_path),
                 )
-            except subprocess.CalledProcessError as err:
-                raise MadoopError(
-                    f"Command returned non-zero: "
-                    f"{exe} < {input_path} > {output_path}"
-                ) from err
-    LOGGER.info("Finished map executions: %s", i)
+                futures.append(pool.submit(
+                    map_single_chunk,
+                    exe,
+                    input_path,
+                    output_path,
+                    chunk,
+                ))
+                part_num += 1
+        for future in concurrent.futures.as_completed(futures):
+            exception = future.exception()
+            if exception:
+                raise exception
+    LOGGER.info("Finished map executions: %s", part_num)


 def sort_file(path):
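A note on the executor choice in map_stage above: a ThreadPoolExecutor is enough because each worker thread spends its time blocked in subprocess.run() while the mapper runs as a separate OS process, so the GIL is not a bottleneck and the mappers genuinely run in parallel. A rough, self-contained sketch of the same pattern (illustration only; cat stands in for a real mapper and assumes a POSIX system):

    import concurrent.futures
    import multiprocessing
    import subprocess

    def run_external(cmd, data):
        """Feed bytes to an external command and return its stdout."""
        return subprocess.run(
            cmd, input=data, check=True, capture_output=True
        ).stdout

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=multiprocessing.cpu_count()
    ) as pool:
        futures = [
            pool.submit(run_external, ["cat"], b"chunk %d\n" % i)
            for i in range(4)
        ]
        # result() re-raises any exception from the worker, much like the
        # future.exception() check in map_stage.
        results = [future.result() for future in futures]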
@@ -395,35 +395,61 @@ def group_stage(input_dir, output_dir, num_reducers, partitioner):
             path.unlink()

     # Sort output files
-    for path in sorted(output_dir.iterdir()):
-        sort_file(path)
+    try:
+        # Don't use a with statement here, because Coverage won't be able to
+        # detect code running in a subprocess if we do.
+        # https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html
+        # pylint: disable=consider-using-with
+        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
+        pool.map(sort_file, sorted(output_dir.iterdir()))
+    finally:
+        pool.close()
+        pool.join()

     log_output_key_stats(output_keys_stats, output_dir)


+def reduce_single_file(exe, input_path, output_path):
+    """Execute reducer on a single file."""
+    with input_path.open() as infile, output_path.open("w") as outfile:
+        try:
+            subprocess.run(
+                str(exe),
+                shell=False,
+                check=True,
+                stdin=infile,
+                stdout=outfile,
+            )
+        except subprocess.CalledProcessError as err:
+            raise MadoopError(
+                f"Command returned non-zero: "
+                f"{exe} < {input_path} > {output_path}"
+            ) from err
+
+
 def reduce_stage(exe, input_dir, output_dir):
     """Execute reducers."""
     i = 0
-    for i, input_path in enumerate(sorted(input_dir.iterdir())):
-        output_path = output_dir / part_filename(i)
-        LOGGER.debug(
-            "%s < %s > %s",
-            exe.name, last_two(input_path), last_two(output_path),
-        )
-        with input_path.open() as infile, output_path.open('w') as outfile:
-            try:
-                subprocess.run(
-                    str(exe),
-                    shell=False,
-                    check=True,
-                    stdin=infile,
-                    stdout=outfile,
-                )
-            except subprocess.CalledProcessError as err:
-                raise MadoopError(
-                    f"Command returned non-zero: "
-                    f"{exe} < {input_path} > {output_path}"
-                ) from err
+    futures = []
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=multiprocessing.cpu_count()
+    ) as pool:
+        for i, input_path in enumerate(sorted(input_dir.iterdir())):
+            output_path = output_dir / part_filename(i)
+            LOGGER.debug(
+                "%s < %s > %s",
+                exe.name, last_two(input_path), last_two(output_path),
+            )
+            futures.append(pool.submit(
+                reduce_single_file,
+                exe,
+                input_path,
+                output_path,
+            ))
+        for future in concurrent.futures.as_completed(futures):
+            exception = future.exception()
+            if exception:
+                raise exception
     LOGGER.info("Finished reduce executions: %s", i + 1)

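Two contrasts are worth noting in the hunks above. The group stage sorts with multiprocessing.Pool rather than threads, presumably because sort_file does its sorting in Python and would be serialized by the GIL, while the reduce stage reuses the ThreadPoolExecutor pattern from the map stage since its workers only wait on external reducer processes. The try/finally around the pool stands in for the with statement that the coverage comment rules out; absent that constraint, an equivalent sketch (sort_all is a made-up name, assuming sort_file(path) sorts a single file in place) would be:

    import multiprocessing
    import pathlib

    def sort_all(output_dir):
        """With-statement form of the sort pool used in group_stage."""
        paths = sorted(pathlib.Path(output_dir).iterdir())
        with multiprocessing.Pool(
            processes=multiprocessing.cpu_count()
        ) as pool:
            pool.map(sort_file, paths)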