Skip to content

Commit 3270caf

Browse files
committed
Merge remote-tracking branch 'origin/develop'
2 parents b8cd5a2 + 6c6d7fe commit 3270caf

File tree

26 files changed

+97
-118
lines changed

26 files changed

+97
-118
lines changed

madoop/__main__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ def __call__(self, parser, *args, **kwargs):
8787
src = madoop_dir/"example"
8888
dst = pathlib.Path("example")
8989
if dst.exists():
90-
print(f"Error: directory already exists: {dst}")
91-
parser.exit(1)
90+
parser.error(f"directory already exists: {dst}")
9291
shutil.copytree(src, dst)
9392
print(textwrap.dedent(f"""\
9493
Created {dst}, try:

madoop/mapreduce.py

Lines changed: 45 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
"""
66
import contextlib
7+
import collections
78
import hashlib
89
import logging
910
import math
@@ -84,15 +85,15 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
8485
)
8586

8687
# Move files from temporary output dir to user-specified output dir
87-
for filename in reduce_output_dir.glob("*"):
88+
total_size = 0
89+
for filename in sorted(reduce_output_dir.glob("*")):
90+
st_size = filename.stat().st_size
91+
total_size += st_size
8892
shutil.copy(filename, output_dir)
93+
output_path = output_dir.parent/last_two(filename)
94+
LOGGER.debug("%s size=%sB", output_path, st_size)
8995

9096
# Remind user where to find output
91-
total_size = 0
92-
for outpath in sorted(output_dir.iterdir()):
93-
st_size = outpath.stat().st_size
94-
total_size += st_size
95-
LOGGER.debug("%s size=%sB", outpath, st_size)
9697
LOGGER.debug("total output size=%sB", total_size)
9798
LOGGER.info("Output directory: %s", output_dir)
9899

@@ -220,37 +221,25 @@ def keyhash(key):
220221
return int(hexdigest, base=16)
221222

222223

223-
def partition_keys(inpath, outpaths):
224-
"""Allocate lines of inpath among outpaths using hash of key."""
224+
def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats):
225+
"""Allocate lines of inpath among outpaths using hash of key.
226+
227+
Update the data structures provided by the caller input_keys_stats and
228+
output_keys_stats.  Both map a filename to a set of keys.
229+
230+
"""
225231
assert len(outpaths) == MAX_NUM_REDUCE
226232
outparent = outpaths[0].parent
227233
assert all(i.parent == outparent for i in outpaths)
228-
outnames = [i.name for i in outpaths]
229-
LOGGER.debug(
230-
"partition %s >> %s/{%s}",
231-
last_two(inpath), outparent.name, ",".join(outnames),
232-
)
233234
with contextlib.ExitStack() as stack:
234235
outfiles = [stack.enter_context(p.open("a")) for p in outpaths]
235236
for line in stack.enter_context(inpath.open()):
236237
key = line.partition('\t')[0]
238+
input_keys_stats[inpath].add(key)
237239
reducer_idx = keyhash(key) % MAX_NUM_REDUCE
238240
outfiles[reducer_idx].write(line)
239-
240-
241-
def keyspace(path):
242-
"""Return the number of unique keys in {path}.
243-
244-
WARNING: This is a terribly slow implementation. It would be faster to
245-
record this information while grouping.
246-
247-
"""
248-
keys = set()
249-
with path.open() as infile:
250-
for line in infile:
251-
key = line.partition('\t')[0]
252-
keys.add(key)
253-
return keys
241+
outpath = outpaths[reducer_idx]
242+
output_keys_stats[outpath].add(key)
254243

255244

256245
def group_stage(input_dir, output_dir):
@@ -260,22 +249,34 @@ def group_stage(input_dir, output_dir):
260249
using the hash and modulo of the key.
261250
262251
"""
263-
# Detailed keyspace debug output THIS IS SLOW
264-
all_keys = set()
265-
for inpath in sorted(input_dir.iterdir()):
266-
keys = keyspace(inpath)
267-
all_keys.update(keys)
268-
LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys))
269-
LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_keys))
270-
271252
# Compute output filenames
272253
outpaths = []
273254
for i in range(MAX_NUM_REDUCE):
274255
outpaths.append(output_dir/part_filename(i))
275256

276-
# Parition input, appending to output files
257+
# Track keyspace stats, map filename -> set of keys
258+
input_keys_stats = collections.defaultdict(set)
259+
output_keys_stats = collections.defaultdict(set)
260+
261+
# Partition input, appending to output files
277262
for inpath in sorted(input_dir.iterdir()):
278-
partition_keys(inpath, outpaths)
263+
partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats)
264+
265+
# Log input keyspace stats
266+
all_input_keys = set()
267+
for inpath, keys in sorted(input_keys_stats.items()):
268+
all_input_keys.update(keys)
269+
LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys))
270+
LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_input_keys))
271+
272+
# Log partition input and output filenames
273+
outnames = [i.name for i in outpaths]
274+
outparent = outpaths[0].parent
275+
for inpath in sorted(input_keys_stats.keys()):
276+
LOGGER.debug(
277+
"partition %s >> %s/{%s}",
278+
last_two(inpath), outparent.name, ",".join(outnames),
279+
)
279280

280281
# Remove empty output files. We won't always use the maximum number of
281282
# reducers because some MapReduce programs have fewer intermediate keys.
@@ -288,13 +289,13 @@ def group_stage(input_dir, output_dir):
288289
for path in sorted(output_dir.iterdir()):
289290
sort_file(path)
290291

291-
# Detailed keyspace debug output THIS IS SLOW
292-
all_keys = set()
293-
for outpath in sorted(output_dir.iterdir()):
294-
keys = keyspace(outpath)
295-
all_keys.update(keys)
292+
# Log output keyspace stats
293+
all_output_keys = set()
294+
for outpath, keys in sorted(output_keys_stats.items()):
295+
all_output_keys.update(keys)
296296
LOGGER.debug("%s unique_keys=%s", last_two(outpath), len(keys))
297-
LOGGER.debug("%s all_unique_keys=%s", output_dir.name, len(all_keys))
297+
LOGGER.debug("%s all_unique_keys=%s", output_dir.name,
298+
len(all_output_keys))
298299

299300

300301
def reduce_stage(exe, input_dir, output_dir):

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
description="A light weight MapReduce framework for education.",
1515
long_description=LONG_DESCRIPTION,
1616
long_description_content_type="text/markdown",
17-
version="0.2.0",
17+
version="0.3.0",
1818
author="Andrew DeOrio",
1919
author_email="[email protected]",
2020
url="https://github.com/eecs485staff/madoop/",

tests/test_cli.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,22 @@ def test_hadoop_arguments(tmpdir):
9191
def test_example(tmpdir):
9292
"""Example option should copy files."""
9393
with tmpdir.as_cwd():
94-
subprocess.run(["madoop", "--example"], check=True)
94+
subprocess.run(
95+
["madoop", "--example"],
96+
check=True,
97+
stderr=subprocess.PIPE,
98+
stdout=subprocess.PIPE,
99+
)
95100
assert (tmpdir/"example/input/input01.txt").exists()
96101
assert (tmpdir/"example/input/input02.txt").exists()
97102
assert (tmpdir/"example/map.py").exists()
98103
assert (tmpdir/"example/reduce.py").exists()
99104

100105
# Call it again and it should refuse to clobber
101106
with tmpdir.as_cwd(), pytest.raises(subprocess.CalledProcessError):
102-
subprocess.run(["madoop", "--example"], check=True)
107+
subprocess.run(
108+
["madoop", "--example"],
109+
check=True,
110+
stderr=subprocess.PIPE,
111+
stdout=subprocess.PIPE,
112+
)

tests/test_stages.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ def test_map_stage(tmpdir):
99
"""Test the map stage using word count example."""
1010
map_stage(
1111
exe=TESTDATA_DIR/"word_count/map.py",
12-
input_dir=TESTDATA_DIR/"word_count/correct/input",
12+
input_dir=TESTDATA_DIR/"word_count/input",
1313
output_dir=Path(tmpdir),
1414
)
1515
utils.assert_dirs_eq(
Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1 @@
1-
cool 1
2-
file 1
3-
file 1
4-
file 1
5-
file 1
6-
streaming 1
1+
Goodbye 1

tests/testdata/word_count/correct/grouper-output/part-00001

Lines changed: 0 additions & 6 deletions
This file was deleted.
Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
google 1
2-
map 1
3-
map 1
4-
map 1
5-
map 1
6-
system 1
7-
system 1
1+
Bye 1
2+
Hadoop 1
3+
Hadoop 1
4+
World 1
5+
World 1
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Hello 1
2+
Hello 1

tests/testdata/word_count/correct/input/part-00000

Lines changed: 0 additions & 2 deletions
This file was deleted.

0 commit comments

Comments
 (0)