Skip to content
69 changes: 55 additions & 14 deletions agent/skyhook-agent/src/skyhook_agent/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# limitations under the License.


import contextlib
from multiprocessing import context
import sys
import os
import shutil
Expand Down Expand Up @@ -69,7 +71,9 @@ def _get_env_config() -> tuple[str]:

SKYHOOK_LOG_DIR = os.getenv("SKYHOOK_LOG_DIR", "/var/log/skyhook")

return SKYHOOK_RESOURCE_ID, SKYHOOK_DATA_DIR, SKYHOOK_ROOT_DIR, SKYHOOK_LOG_DIR
SKYHOOK_AGENT_WRITE_LOGS = os.getenv("SKYHOOK_AGENT_WRITE_LOGS", "true").lower() == 'true'

return SKYHOOK_RESOURCE_ID, SKYHOOK_DATA_DIR, SKYHOOK_ROOT_DIR, SKYHOOK_LOG_DIR, SKYHOOK_AGENT_WRITE_LOGS

def _get_package_information(config_data: dict) -> tuple[str, str]:
return config_data["package_name"], config_data["package_version"]
Expand Down Expand Up @@ -129,15 +133,43 @@ async def _stream_process(
sink.flush()
break

# A file-like context manager that discards every write made to it. It does not need to implement read().
class NullWriter:
    """A file-like context manager that discards all writes.

    It stands in for a real log file when nothing should be written to
    disk. Only the write side of the file protocol is provided (``write``,
    ``flush``, ``close``) plus the context-manager methods, so it can be
    used in a ``with`` statement in place of ``open(path, "w")``.
    """

    def write(self, *args, **kwargs):
        """Discard the data and report it as fully written.

        Returns:
            The length of the first positional argument, mimicking the
            return value of a real file's ``write``; ``0`` when called
            without positional arguments.
        """
        if args:
            return len(args[0])
        return 0

    def flush(self):
        """Do nothing; there is no buffer to flush."""
        pass

    def close(self):
        """Do nothing; there is no underlying resource to release."""
        pass

    def __enter__(self):
        """Return the writer itself so ``with NullWriter() as f`` works."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context without suppressing any exception."""
        # Nothing to clean up; returning False lets exceptions propagate.
        return False


async def tee(chroot_dir: str, cmd: List[str], stdout_sink_path: str, stderr_sink_path: str, write_cmds=False, no_chmod=False, env: dict[str, str] = {}, write_logs: bool=True, **kwargs):
"""
Run the cmd in a subprocess and keep the stream of stdout/stderr and merge both into
the sink_path as a log.
"""
# get the directory of the script
script_dir = os.path.dirname(os.path.abspath(__file__))
with open(stdout_sink_path, "w") as stdout_sink_f, open(stderr_sink_path, "w") as stderr_sink_f:
# Switch out the opens with nulls in the event of not wanting to write files
if write_logs:
files = (lambda : open(stdout_sink_path, 'w'), lambda: open(stderr_sink_path, 'w'))
else:
files = (lambda: NullWriter(), lambda: NullWriter())
with files[0]() as stdout_sink_f, files[1]() as stderr_sink_f:
if write_cmds:
sys.stdout.write(" ".join(cmd) + "\n")
stdout_sink_f.write(" ".join(cmd) + "\n")
Expand Down Expand Up @@ -172,7 +204,7 @@ def get_host_path_for_steps(copy_dir: str):
return f"{copy_dir}/skyhook_dir"

def get_skyhook_directory(root_mount: str) -> str:
    """Return the Skyhook root directory located under *root_mount*.

    Args:
        root_mount: Path prefix that the configured root directory is
            appended to.

    Returns:
        ``root_mount`` immediately followed by the ``SKYHOOK_ROOT_DIR``
        value from the environment configuration (no separator is added).
    """
    # SKYHOOK_ROOT_DIR is the third value returned by _get_env_config().
    # Indexing, rather than unpacking a fixed number of names, keeps this
    # working when further values are appended to that tuple.
    skyhook_root_dir = _get_env_config()[2]
    return f"{root_mount}{skyhook_root_dir}"

def get_flag_dir(root_mount: str) -> str:
Expand All @@ -182,7 +214,7 @@ def get_history_dir(root_mount: str) -> str:
return f"{get_skyhook_directory(root_mount)}/history"

def get_log_dir(root_mount: str) -> str:
    """Return the Skyhook log directory located under *root_mount*.

    Args:
        root_mount: Path prefix that the configured log directory is
            appended to.

    Returns:
        ``root_mount`` immediately followed by the ``SKYHOOK_LOG_DIR``
        value from the environment configuration (no separator is added).
    """
    # SKYHOOK_LOG_DIR is the fourth value returned by _get_env_config().
    # Indexing, rather than unpacking a fixed number of names, keeps this
    # working when further values are appended to that tuple.
    skyhook_log_dir = _get_env_config()[3]
    return f"{root_mount}{skyhook_log_dir}"

def get_log_file(step_path: str, copy_dir: str, config_data: dict, root_mount: str, timestamp: str=None) -> str:
Expand Down Expand Up @@ -220,21 +252,23 @@ def set_flag(flag_file: str, msg: str = "") -> None:
f.write(msg)


def _run(chroot_dir: str, cmds: list[str], log_path: str, write_cmds=False, no_chmod=False, env: dict[str, str] = {}, **kwargs) -> int:
def _run(chroot_dir: str, cmds: list[str], log_path: str|None, write_cmds=False, no_chmod=False, env: dict[str, str] = {}, write_logs: bool=True, **kwargs) -> int:
"""
Synchronous wrapper around the tee command to have logs written to disk
"""
# "tee" the stdout and stderr to a file to log the step results
stderr_path = f"{log_path}.err" if log_path else None

result = asyncio.run(
tee(
chroot_dir,
cmds,
log_path,
f"{log_path}.err",
stderr_path,
write_cmds=write_cmds,
no_chmod=no_chmod,
env=env,
write_logs=write_logs,
**kwargs
)
)
Expand Down Expand Up @@ -283,7 +317,11 @@ def run_step(
return True

time.sleep(1)
log_file = get_log_file(step_path, copy_dir, config_data, chroot_dir)
_, _, _, _, SKYHOOK_AGENT_WRITE_LOGS = _get_env_config()
if SKYHOOK_AGENT_WRITE_LOGS:
log_file = get_log_file(step_path, copy_dir, config_data, chroot_dir)
else:
log_file = None

# Compile additional environment variables
env = {}
Expand All @@ -294,9 +332,11 @@ def run_step(
chroot_dir,
[step_path, *step.arguments],
log_file,
env=env)
env=env,
write_logs=SKYHOOK_AGENT_WRITE_LOGS)

cleanup_old_logs(get_log_file(step_path, copy_dir, config_data, "*"))
if SKYHOOK_AGENT_WRITE_LOGS:
cleanup_old_logs(get_log_file(step_path, copy_dir, config_data, chroot_dir, "*"))
if return_code not in step.returncodes:
print(f"FAILED: {step.path} {' '.join(step.arguments)} {return_code}")
return True
Expand Down Expand Up @@ -421,7 +461,7 @@ def summarize_check_results(results: list[bool], step_data: dict[Mode, list[Step
return False

def make_config_data_from_resource_id() -> dict:
SKYHOOK_RESOURCE_ID, _, _, _ = _get_env_config()
SKYHOOK_RESOURCE_ID, _, _, _, _ = _get_env_config()

# Interrupts don't really have config data we can read from the Package as it is run standalone.
# So read it off of SKYHOOK_RESOURCE_ID instead
Expand All @@ -441,7 +481,7 @@ def do_interrupt(interrupt_data: str, root_mount: str, copy_dir: str) -> bool:
def _make_interrupt_flag(interrupt_dir: str, interrupt_id: int) -> str:
return f"{interrupt_dir}/{interrupt_id}.complete"

SKYHOOK_RESOURCE_ID, _, _, _ = _get_env_config()
SKYHOOK_RESOURCE_ID, _, _, _, _ = _get_env_config()
config_data = make_config_data_from_resource_id()

interrupt = interrupts.inflate(interrupt_data)
Expand Down Expand Up @@ -509,7 +549,7 @@ def main(mode: Mode, root_mount: str, copy_dir: str, interrupt_data: None|str, a
if mode == Mode.INTERRUPT:
return do_interrupt(interrupt_data, root_mount, copy_dir)

_, SKYHOOK_DATA_DIR, _, _ = _get_env_config()
_, SKYHOOK_DATA_DIR, _, _, _ = _get_env_config()

# Check to see if the directory has already been copied down. If it hasn't assume that we
# are running in legacy mode and copy the directory down.
Expand Down Expand Up @@ -651,12 +691,13 @@ def cli(sys_argv: list[str]=sys.argv):
print(str.center("ENV CONFIGURATION", 20, "-"))
print(f"COPY_RESOLV: {copy_resolv}")
print(f"OVERLAY_ALWAYS_RUN_STEP: {always_run_step}")
SKYHOOK_RESOURCE_ID, SKYHOOK_DATA_DIR, SKYHOOK_ROOT_DIR, SKYHOOK_LOG_DIR = _get_env_config()
SKYHOOK_RESOURCE_ID, SKYHOOK_DATA_DIR, SKYHOOK_ROOT_DIR, SKYHOOK_LOG_DIR, SKYHOOK_AGENT_WRITE_LOGS = _get_env_config()
print(f"SKYHOOK_RESOURCE_ID: {SKYHOOK_RESOURCE_ID}")
print(f"SKYHOOK_DATA_DIR: {SKYHOOK_DATA_DIR}")
print(f"SKYHOOK_ROOT_DIR: {SKYHOOK_ROOT_DIR}")
print(f"SKYHOOK_LOG_DIR: {SKYHOOK_LOG_DIR}")
print(f"SKYHOOK_AGENT_BUFFER_LIMIT: {buff_size}")
print(f"SKYHOOK_AGENT_WRITE_LOGS: {SKYHOOK_AGENT_WRITE_LOGS}")
print(str.center("Directory CONFIGURATION", 20, "-"))
# print flag dir and log dir
config_data = make_config_data_from_resource_id()
Expand Down
Loading