Skip to content

Commit 1fb3c2c

Browse files
committed
Merge remote-tracking branch 'origin/develop'
2 parents fec0a45 + b3353e7 commit 1fb3c2c

File tree

7 files changed

+133
-11
lines changed

7 files changed

+133
-11
lines changed

README_Hadoop_Streaming.md

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,14 @@ if __name__ == "__main__":
289289
## Tips and tricks
290290
These are some pro-tips for working with MapReduce programs written in Python for the Hadoop Streaming interface.
291291

292-
### Debugging
292+
### Print debug messages to `stderr`
293+
To avoid interfering with pipeline output in `stdout`, direct debugging print messages to `stderr`:
294+
295+
```python
296+
print("DEBUG finding bugs... ~(^._.)", file=sys.stderr)
297+
```
298+
299+
### Debugging with PDB
293300
We encounter a problem if we add a `breakpoint()` in `map.py`.
294301
```python
295302
for line in sys.stdin:
@@ -299,13 +306,13 @@ for line in sys.stdin:
299306
print(f"{word}\t1")
300307
```
301308

302-
PDB/PDB++ confuses the stdin being piped in from the input file for user input, so we get these errors:
309+
PDB confuses the stdin being piped in from the input file for user input, so we get these errors:
303310
```console
304311
$ cat input/input* | ./map.py
305312
...
306-
(Pdb++) *** SyntaxError: invalid syntax
307-
(Pdb++) *** SyntaxError: invalid syntax
308-
(Pdb++) *** SyntaxError: invalid syntax
313+
(Pdb) *** SyntaxError: invalid syntax
314+
(Pdb) *** SyntaxError: invalid syntax
315+
(Pdb) *** SyntaxError: invalid syntax
309316
...
310317
```
311318

@@ -325,7 +332,7 @@ Now our debugger works correctly.
325332
```console
326333
$ cat input/input* | ./map.py
327334
...
328-
(Pdb++)
335+
(Pdb)
329336
```
330337

331338
Don't forget to remove your temporary changes!

madoop/mapreduce.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def mapreduce(
4343
# Executable scripts must have valid shebangs
4444
is_executable(map_exe)
4545
is_executable(reduce_exe)
46+
if partitioner:
47+
is_executable(partitioner)
4648

4749
# Create a tmp directory which will be automatically cleaned up
4850
with tempfile.TemporaryDirectory(prefix="madoop-") as tmpdir:
@@ -173,7 +175,13 @@ def is_executable(exe):
173175
stderr=subprocess.PIPE,
174176
check=True,
175177
)
176-
except (subprocess.CalledProcessError, OSError) as err:
178+
except subprocess.CalledProcessError as err:
179+
raise MadoopError(
180+
f"Failed executable test: {err}"
181+
f"\n{err.stdout.decode()}" if err.stdout else ""
182+
f"{err.stderr.decode()}" if err.stderr else ""
183+
) from err
184+
except OSError as err:
177185
raise MadoopError(f"Failed executable test: {err}") from err
178186

179187

@@ -198,12 +206,17 @@ def map_single_chunk(exe, input_path, output_path, chunk):
198206
check=True,
199207
input=chunk,
200208
stdout=outfile,
209+
stderr=subprocess.PIPE
201210
)
202211
except subprocess.CalledProcessError as err:
203212
raise MadoopError(
204213
f"Command returned non-zero: "
205-
f"{exe} < {input_path} > {output_path}"
214+
f"{exe} < {input_path} > {output_path}\n"
215+
f"{err}"
216+
f"\n{err.stderr.decode()}" if err.stderr else ""
206217
) from err
218+
except OSError as err:
219+
raise MadoopError(f"Command returned non-zero: {err}") from err
207220

208221

209222
def map_stage(exe, input_dir, output_dir):
@@ -300,6 +313,7 @@ def partition_keys_custom(
300313
[partitioner, str(num_reducers)],
301314
stdin=stack.enter_context(inpath.open()),
302315
stdout=subprocess.PIPE,
316+
stderr=subprocess.PIPE,
303317
text=True,
304318
))
305319
for line, partition in zip(
@@ -326,8 +340,10 @@ def partition_keys_custom(
326340

327341
return_code = process.wait()
328342
if return_code:
343+
stderr_output = process.stderr.read()
329344
raise MadoopError(
330-
f"Partition executable returned non-zero: {str(partitioner)}"
345+
f"Partition executable returned non-zero: {str(partitioner)}\n"
346+
f"{stderr_output}"
331347
)
332348

333349

@@ -419,12 +435,17 @@ def reduce_single_file(exe, input_path, output_path):
419435
check=True,
420436
stdin=infile,
421437
stdout=outfile,
438+
stderr=subprocess.PIPE
422439
)
423440
except subprocess.CalledProcessError as err:
424441
raise MadoopError(
425442
f"Command returned non-zero: "
426-
f"{exe} < {input_path} > {output_path}"
443+
f"{exe} < {input_path} > {output_path}\n"
444+
f"{err}"
445+
f"\n{err.stderr.decode()}" if err.stderr else ""
427446
) from err
447+
except OSError as err:
448+
raise MadoopError(f"Command returned non-zero: {err}") from err
428449

429450

430451
def reduce_stage(exe, input_dir, output_dir):

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "madoop"
7-
version = "1.3.0"
7+
version = "1.3.1"
88
description="A light weight MapReduce framework for education."
99
license = {file = "LICENSE"}
1010
authors = [

tests/test_api.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,64 @@ def test_input_path_spaces(tmpdir):
208208
TESTDATA_DIR/"word_count/correct/output",
209209
tmpdir/"output",
210210
)
211+
212+
213+
def test_map_exe_error_msg(tmpdir):
214+
"""Map exe returns non-zero with stderr output should produce an
215+
error message and forward the stderr output.
216+
"""
217+
with tmpdir.as_cwd(), pytest.raises(
218+
madoop.MadoopError,
219+
match="Map error message to stderr"
220+
):
221+
madoop.mapreduce(
222+
input_path=TESTDATA_DIR/"word_count/input",
223+
output_dir="output",
224+
map_exe=TESTDATA_DIR/"word_count/map_error_msg.py",
225+
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
226+
num_reducers=4,
227+
partitioner=None,
228+
)
229+
230+
with tmpdir.as_cwd(), pytest.raises(
231+
madoop.MadoopError,
232+
match="Map error message to stderr"
233+
):
234+
map_stage(
235+
exe=TESTDATA_DIR/"word_count/map_error_msg.py",
236+
input_dir=TESTDATA_DIR/"word_count/input",
237+
output_dir=Path(tmpdir),
238+
)
239+
240+
241+
def test_partition_exe_error_msg(tmpdir):
242+
"""Partition exe returns non-zero with stderr output should produce an
243+
error message and forward the stderr output.
244+
"""
245+
with tmpdir.as_cwd(), pytest.raises(
246+
madoop.MadoopError,
247+
match="Partition error message to stderr"
248+
):
249+
madoop.mapreduce(
250+
input_path=TESTDATA_DIR/"word_count/input",
251+
output_dir="output",
252+
map_exe=TESTDATA_DIR/"word_count/map.py",
253+
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
254+
num_reducers=4,
255+
partitioner=TESTDATA_DIR/"word_count/partition_error_msg.py",
256+
)
257+
258+
259+
def test_reduce_exe_error_msg(tmpdir):
260+
"""Reduce exe returns non-zero with stderr output should produce an
261+
error message and forward the stderr output.
262+
"""
263+
with tmpdir.as_cwd(), pytest.raises(
264+
madoop.MadoopError,
265+
match="Reduce error message to stderr"
266+
):
267+
reduce_stage(
268+
exe=TESTDATA_DIR/"word_count/reduce_error_msg.py",
269+
input_dir=TESTDATA_DIR/"word_count/input",
270+
output_dir=Path(tmpdir),
271+
)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#!/usr/bin/env python3
2+
"""Invalid map executable returns non-zero with an error message."""
3+
4+
import sys
5+
6+
7+
# Avoid error on executable check which has an empty string input
8+
input_lines_n = sum(1 for _ in sys.stdin)
9+
if input_lines_n > 1:
10+
sys.stderr.write("Map error message to stderr\n")
11+
sys.exit(1)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#!/usr/bin/env python3
2+
"""Invalid partition executable returns non-zero with an error message."""
3+
4+
import sys
5+
6+
# Avoid error on executable check which has an empty string input
7+
input_lines_n = sum(1 for _ in sys.stdin)
8+
if input_lines_n > 1:
9+
sys.stderr.write("Partition error message to stdout\n")
10+
sys.stderr.write("Partition error message to stderr\n")
11+
sys.exit(1)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#!/usr/bin/env python3
2+
"""Invalid reduce executable returns non-zero with an error message."""
3+
4+
import sys
5+
6+
7+
# Avoid error on executable check which has an empty string input
8+
input_lines_n = sum(1 for _ in sys.stdin)
9+
if input_lines_n > 1:
10+
sys.stderr.write("Reduce error message to stderr\n")
11+
sys.exit(1)

0 commit comments

Comments
 (0)