Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 72 additions & 15 deletions terrawrap/utils/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Module for containing CLI convenience functions"""

from __future__ import print_function

import errno
import logging
import subprocess
import tempfile
Expand Down Expand Up @@ -159,6 +161,67 @@ def execute_command(
return exit_code, stdout


def _decode_chunk_safely(chunk: bytes) -> str:
"""Safely decode a byte chunk to string with fallback handling."""
try:
return chunk.decode(errors="replace")
except UnicodeDecodeError:
return chunk.decode(errors="ignore")


def _read_live_output(stdout_read, process, print_output: bool) -> int:
"""Read live output from process with memory-efficient buffering."""
buffer_size = 8192 # 8KB buffer

while True:
try:
chunk = stdout_read.read(buffer_size)
if not chunk and process.poll() is not None:
break

if chunk and print_output:
decoded_chunk = _decode_chunk_safely(chunk)
print(decoded_chunk, end="", flush=True)

except OSError as os_error:
if os_error.errno == errno.ENOMEM: # Cannot allocate memory
logger.warning(
"Memory allocation issue while reading output, reducing buffer size"
)
buffer_size = max(1024, buffer_size // 2)
continue
raise

return buffer_size


def _collect_final_output(stdout_read, buffer_size: int) -> List[str]:
"""Collect final output from file handle with memory error handling."""
stdout_read.seek(0)
stdout = []

try:
while True:
chunk = stdout_read.read(buffer_size)
if not chunk:
break

decoded_chunk = _decode_chunk_safely(chunk)
decoded_lines = decoded_chunk.splitlines(keepends=True)
stdout.extend(decoded_lines)

except OSError as os_error:
if os_error.errno == errno.ENOMEM: # Cannot allocate memory
logger.warning(
"Memory allocation issue while reading final output, truncating"
)
stdout.append("...[Output truncated due to memory constraints]...\n")
else:
raise

return stdout


def _execute_command(
args: Union[List[str], str],
print_output: bool,
Expand Down Expand Up @@ -190,24 +253,18 @@ def _execute_command(
# pylint: disable=consider-using-with
process = subprocess.Popen(args, *pargs, **kwargs)

while True:
output = stdout_read.read(1).decode(errors="replace")

if output == "" and process.poll() is not None:
break

if print_output and output:
print(output, end="", flush=True)

# Read live output with memory-efficient buffering
buffer_size = _read_live_output(stdout_read, process, print_output)
exit_code = process.poll()

stdout_read.seek(0)
stdout = [line.decode(errors="replace") for line in stdout_read.readlines()]
# Ensure exit_code is not None (it shouldn't be after _read_live_output completes)
if exit_code is None:
exit_code = process.wait()

# Collect final output
stdout = _collect_final_output(stdout_read, buffer_size)

# ignoring mypy error below because it thinks exit_code can sometimes be None
# we know that will never be the case because the above While loop will keep looping forever
# until exit_code is not None
return exit_code, stdout # type: ignore
return exit_code, stdout


def _get_retriable_errors(out: List[str]) -> List[str]:
Expand Down
2 changes: 1 addition & 1 deletion terrawrap/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Place of record for the package version"""

__version__ = "0.10.8"
__version__ = "0.10.9"
__git_hash__ = "GIT_HASH"
119 changes: 113 additions & 6 deletions test/unit/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
"""Test git utilities"""
import errno
import os
from logging import Logger
from unittest import TestCase
from unittest.mock import patch, ANY, call
from unittest.mock import patch, ANY, call, mock_open, MagicMock
from requests.exceptions import HTTPError

from terrawrap.utils.cli import execute_command, MAX_RETRIES, Status, _post_audit_info
from terrawrap.utils.cli import (
execute_command,
MAX_RETRIES,
Status,
_post_audit_info,
_execute_command,
)


MOCK_ERROR = HTTPError()
Expand Down Expand Up @@ -38,11 +45,11 @@ def test_execute_command(self):

@patch("terrawrap.utils.cli._get_retriable_errors")
@patch("io.open")
def test_execute_command_retry(self, mock_open, mock_network_error):
def test_execute_command_retry(self, mock_io_open, mock_network_error):
"""Test retrying execution because of network errors"""
self.mock_process.poll.side_effect = [1, 1, 1, 0]
mock_network_error.side_effect = [["Throttling"], []]
mock_stdout_read = mock_open.return_value
mock_stdout_read = mock_io_open.return_value
mock_stdout_read.readline.return_value = b""

exit_code, stdout = execute_command(["echo", "1"], retry=True)
Expand All @@ -53,7 +60,7 @@ def test_execute_command_retry(self, mock_open, mock_network_error):

@patch("terrawrap.utils.cli._get_retriable_errors")
@patch("io.open")
def test_execute_command_max_retry(self, mock_open, mock_network_error):
def test_execute_command_max_retry(self, mock_io_open_2, mock_network_error):
"""Test retrying execution because of network errors up to 5 times"""
self.mock_process.poll.return_value = 255
mock_network_error.side_effect = [
Expand All @@ -64,7 +71,7 @@ def test_execute_command_max_retry(self, mock_open, mock_network_error):
["Throttling"],
["unexpected EOF"],
]
mock_stdout_read = mock_open.return_value
mock_stdout_read = mock_io_open_2.return_value
mock_stdout_read.readline.return_value = b""

exit_code, stdout = execute_command(["echo", "1"], retry=True)
Expand Down Expand Up @@ -124,3 +131,103 @@ def test_post_audit_info_statuses(self, mock_post, _):
},
timeout=30,
)

@patch("tempfile.mkstemp")
@patch.object(Logger, "warning")
def test_execute_command_memory_error(self, mock_logger, mock_mkstemp):
"""Test handling of OSError errno 12 (Cannot allocate memory) during command execution"""
# Setup mock file objects
mock_stdout_fd = 3
mock_stdout_path = "/tmp/mock_stdout"
mock_mkstemp.return_value = (mock_stdout_fd, mock_stdout_path)

# Create mock file object that raises OSError ENOMEM on first read, then succeeds
mock_file = MagicMock()
memory_error = OSError()
memory_error.errno = errno.ENOMEM # Cannot allocate memory

# Configure read to fail first, then succeed
mock_file.read.side_effect = [
memory_error, # First read fails with memory error
b"test output", # Second read succeeds with reduced buffer
b"", # Third read returns empty (process finished)
b"", # Additional reads for final output collection
b"", # More reads to handle any additional calls
] + [
b""
] * 10 # Ensure we have enough empty responses

# Mock process
mock_process = MagicMock()
mock_process.poll.return_value = 0 # Process finished successfully

with patch("builtins.open", mock_open()) as mock_file_open:
mock_file_open.return_value.__enter__.return_value = mock_file

with patch("subprocess.Popen", return_value=mock_process):
# Test that the function handles memory error gracefully
exit_code, stdout = _execute_command(
["test", "command"],
print_output=False,
capture_stderr=True,
print_command=False,
)

# Verify the function completed successfully
self.assertEqual(exit_code, 0)
self.assertIsInstance(stdout, list)

# Verify that warning was logged about memory allocation issue
mock_logger.assert_called_with(
"Memory allocation issue while reading output, reducing buffer size"
)

# Verify that read was called multiple times (initial failure, then retry)
self.assertTrue(mock_file.read.call_count >= 2)

@patch("tempfile.mkstemp")
@patch.object(Logger, "warning")
def test_execute_command_memory_error_final(self, mock_logger, mock_mkstemp):
"""Test handling of OSError errno 12 during final output reading"""
# Setup mock file objects
mock_stdout_fd = 3
mock_stdout_path = "/tmp/mock_stdout"
mock_mkstemp.return_value = (mock_stdout_fd, mock_stdout_path)

# Create mock file object that works for live reading but fails on final read
mock_file = MagicMock()
memory_error = OSError()
memory_error.errno = errno.ENOMEM # Cannot allocate memory

# Setup side effects: normal read during live output, then memory error on seek+read
mock_file.read.side_effect = [
b"",
memory_error,
] # Empty for live, error for final
mock_process = MagicMock()
mock_process.poll.return_value = 0

with patch("builtins.open", mock_open()) as mock_file_open:
mock_file_open.return_value.__enter__.return_value = mock_file

with patch("subprocess.Popen", return_value=mock_process):
# Test that the function handles memory error gracefully during final read
exit_code, stdout = _execute_command(
["test", "command"],
print_output=False,
capture_stderr=True,
print_command=False,
)

# Verify the function completed successfully
self.assertEqual(exit_code, 0)

# Verify that warning was logged about memory allocation issue during final read
mock_logger.assert_called_with(
"Memory allocation issue while reading final output, truncating"
)

# Verify that stdout contains the truncation message
self.assertIn(
"...[Output truncated due to memory constraints]...\n", stdout
)