Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pipeline/infrastructure/casa_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ def format_arg_value(arg_val):
tool_call = '{!s}.{!s}({!s})'.format(fn.__module__, fn.__name__, ', '.join(msg_args))
CASACALLS_LOG.log(level, tool_call)

start_time = datetime.datetime.utcnow()
start_time = datetime.datetime.now(datetime.timezone.utc)
try:
return fn(*args, **kwargs)
finally:
end_time = datetime.datetime.utcnow()
end_time = datetime.datetime.now(datetime.timezone.utc)
elapsed = end_time - start_time
LOG.log(level, '{} CASA tool call took {}s'.format(tool_call, elapsed.total_seconds()))

Expand Down
22 changes: 10 additions & 12 deletions pipeline/infrastructure/renderer/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# *****************************************************************************
import collections
import distutils.spawn as spawn
import shutil
import itertools
import os
import platform
Expand All @@ -33,31 +33,29 @@
LOG = logging.get_logger(__name__)

# Set the command used to shrink plots down to thumbnails. If set to None, no
# thumbnails will be generated
THUMBNAIL_CMD = None
# thumbnails will be generated
THUMBNAIL_CMD = None


# first try to find ImageMagick's 'mogrify' command. We assume that
# ImageMagick's 'convert' commnand can be found in the same directory. We
# do not search for 'convert' directly as some utilities also provide a
# 'convert' command which may come earlier on the PATH.
mogrify_path = spawn.find_executable('mogrify')
# 'convert' command which may come earlier on the PATH.
mogrify_path = shutil.which('mogrify')
if mogrify_path:
bin_dir = os.path.dirname(mogrify_path)
convert_path = os.path.join(bin_dir, 'convert')
if os.path.exists(convert_path):
LOG.trace('Using convert executable at \'%s\' to generate '
'thumbnails' % convert_path)
THUMBNAIL_CMD = lambda full, thumb : (convert_path, full, '-thumbnail', '250x188', thumb)
LOG.trace("Using convert executable at '%s' to generate thumbnails", convert_path)
THUMBNAIL_CMD = lambda full, thumb: (convert_path, full, '-thumbnail', '250x188', thumb)

if THUMBNAIL_CMD is None and platform.system() == 'Darwin':
# macOS only: fallback to sips if ImageMagick, e.g. from MacPorts or Homebrew is not found on macOS. sips is a system
# executable that should be available on all macOS systems.
sips_path = spawn.find_executable('sips')
sips_path = shutil.which('sips')
if sips_path:
LOG.trace('Using sips executable at \'%s\' to generate thumbnails'
% sips_path)
THUMBNAIL_CMD = lambda full, thumb : (sips_path, '-Z', '250', '--out', thumb, full)
LOG.trace("Using sips executable at '%s' to generate thumbnails", sips_path)
THUMBNAIL_CMD = lambda full, thumb: (sips_path, '-Z', '250', '--out', thumb, full)

if THUMBNAIL_CMD is None:
LOG.warning('Could not find the ImageMagick \'convert\' or macOS \'sips\' command. '
Expand Down
6 changes: 3 additions & 3 deletions pipeline/infrastructure/utils/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def unix_seconds_to_datetime(unix_secs: list[int | float]) -> list[datetime.date
Returns:
List of equivalent Python datetime objects.
"""
return [datetime.datetime.utcfromtimestamp(s) for s in unix_secs]
return [datetime.datetime.fromtimestamp(s, datetime.timezone.utc) for s in unix_secs]


def mjd_seconds_to_datetime(mjd_secs: list[int | float]) -> list[datetime.datetime]:
Expand All @@ -200,7 +200,7 @@ def mjd_seconds_to_datetime(mjd_secs: list[int | float]) -> list[datetime.dateti
# 1970-01-01 is JD 40587. 86400 = seconds in a day
unix_offset = 40587 * 86400
mjd_secs_with_offsets = [s - unix_offset for s in mjd_secs]
return unix_seconds_to_datetime(mjd_secs_with_offsets)
return [datetime.datetime.fromtimestamp(s, datetime.timezone.utc) for s in mjd_secs_with_offsets]


def get_epoch_as_datetime(epoch: dict) -> datetime.datetime:
Expand All @@ -225,7 +225,7 @@ def get_epoch_as_datetime(epoch: dict) -> datetime.datetime:
t = mt.getvalue(epoch_utc)['m0']
t = qt.sub(t, base_time)
t = qt.convert(t, 's')
t = datetime.datetime.utcfromtimestamp(qt.getvalue(t)[0])
t = datetime.datetime.fromtimestamp(qt.getvalue(t)[0], datetime.timezone.utc)

return t

Expand Down
188 changes: 188 additions & 0 deletions pipeline/infrastructure/utils/pclean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import pickle
import shutil
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path
from typing import Any, Dict, Optional, Union

import casaconfig.config
import casatasks

from pipeline.infrastructure import logging

LOG = logging.get_logger(__name__)


def find_executable(start_dir: Optional[str] = None) -> Dict[str, Optional[str]]:
    """Search upward for MPI-related executables in ``bin/`` subdirectories.

    Starting from `start_dir` (or the current working directory if None),
    look for 'bin/mpirun', 'bin/mpicasa' and 'bin/casa' in that directory
    and in each of its parents until the filesystem root is reached. The
    nearest match wins.

    Args:
        start_dir: Directory to start searching from. If None, use the
            current working directory.

    Returns:
        Mapping from executable name ('mpirun', 'mpicasa', 'casa') to its
        absolute path, or None if that executable was not found.
    """
    search_patterns = ('mpirun', 'mpicasa', 'casa')
    start = Path(start_dir or Path.cwd()).resolve()

    # The start directory plus all ancestors up to the root, nearest first.
    candidates = [start, *start.parents]

    exe_dict: Dict[str, Optional[str]] = {}
    for pattern in search_patterns:
        exe_dict[pattern] = next(
            (str(d / 'bin' / pattern) for d in candidates if (d / 'bin' / pattern).is_file()),
            None,
        )

    return exe_dict


def pclean(
    *args: Any,
    parallel: Union[bool, Dict[str, int]] = False,
    **kwargs: Any,
) -> Any:
    """Execute tclean and return the result.

    When `parallel` is False, tclean is invoked directly in-process. When
    `parallel` is True or a dict, the function serializes arguments to a
    temporary file and runs a Python subprocess (optionally under MPI).
    The subprocess writes a pickled result to a second temporary file which
    is read back here. This allows tclean to run in parallel mode even when
    the main process is not running under MPI.

    Args:
        *args: Positional arguments forwarded to tclean.
        parallel: False to run serially; True or a dict (e.g. {'nproc': 8})
            to run via subprocess/MPI.
        **kwargs: Keyword arguments forwarded to tclean.

    Returns:
        The (picklable) return value from tclean.

    Raises:
        RuntimeError: If the subprocess fails, produces no readable result
            file, or reports an error from tclean.
    """
    if parallel is False:
        return casatasks.tclean(*args, parallel=parallel, **kwargs)

    # Serialize the tclean call into a temporary input file; the subprocess
    # writes its pickled outcome to a separate temporary output file.
    # NOTE(review): these files are created with default permissions; tighten
    # to 0o600 if tclean parameters are considered sensitive.
    with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as f_in:
        input_file = f_in.name
        pickle.dump({'args': args, 'parallel': True, 'kwargs': kwargs}, f_in)

    with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as f_out:
        output_file = f_out.name

    try:
        # Inline script to run inside the subprocess.
        script = textwrap.dedent(
            f"""\

            import pickle
            import sys

            # casaconfig should be imported/updated first, before importing
            # casatasks; the statement order matters here.
            import casaconfig.config
            casaconfig.config.logfile={casatasks.casalog.logfile()!r}
            casaconfig.config.log2term = {bool(getattr(casaconfig.config, 'log2term', False))}

            import casampi.private.start_mpi  # necessary if not executed via casashell
            import casatasks
            from casatasks import tclean
            casatasks.casalog.showconsole(onconsole={bool(getattr(casaconfig.config, 'log2term', False))})

            with open({input_file!r}, 'rb') as f:
                data = pickle.load(f)
            args = data['args']
            kwargs = data['kwargs']
            parallel = data['parallel']

            try:
                result = tclean(*args, parallel=parallel, **kwargs)
                with open({output_file!r}, 'wb') as f:
                    pickle.dump({{'success': True, 'result': result}}, f)
            except Exception as exc:
                with open({output_file!r}, 'wb') as f:
                    pickle.dump({{'success': False, 'error': str(exc), 'type': type(exc).__name__}}, f)
                sys.exit(1)
            """
        )

        # Base call: run the Python interpreter with the inline script.
        # We intentionally avoid `casa` or `python -m casashell` here to reduce
        # dependencies and differences between monolithic and modular CASA6
        # environments; casatasks should be importable directly from the
        # standard Python environment.
        call_args = [sys.executable, '-c', script]

        # If parallel is requested, prefix with an MPI launcher.
        if parallel:
            nproc = int(parallel.get('nproc', 4)) if isinstance(parallel, dict) else 4

            exe_dict = find_executable(Path(sys.executable).parent.as_posix())
            # mpicasa (monolithic env) takes precedence over mpirun (modular
            # env). Note that shutil.which('mpirun') may also find a
            # system-wide mpirun, which usually does not work.
            mpiexec = exe_dict.get('mpicasa') or exe_dict.get('mpirun')
            if not mpiexec:
                LOG.warning('No MPI launcher found; falling back to serial Python execution.')
            else:
                mpi_args = [mpiexec, '-n', str(nproc), '-display-allocation', '-display-map', '-oversubscribe']
                call_args = mpi_args + call_args

        # If an X virtual framebuffer wrapper exists, run with it.
        xvfb_run = shutil.which('xvfb-run')
        if xvfb_run:
            call_args = [xvfb_run, '-a'] + call_args

        LOG.debug('Executing command: %s', ' '.join(call_args))

        try:
            # capture_output=True so stdout/stderr are actually available for
            # the log statements below (they would be None otherwise).
            completed = subprocess.run(call_args, check=True, shell=False, capture_output=True, text=True)
            LOG.debug('Subprocess stdout: %s', completed.stdout)
            LOG.debug('Subprocess stderr: %s', completed.stderr)
        except subprocess.CalledProcessError as e:
            LOG.error('Subprocess failed; returncode=%s', e.returncode)
            LOG.error('stderr: %s', e.stderr)
            LOG.error('stdout: %s', e.stdout)
            raise RuntimeError('tclean subprocess execution failed') from e

        # Read back the pickled result. Guard against the subprocess dying
        # before it could write anything (empty or unreadable output file).
        try:
            with open(output_file, 'rb') as f:
                output = pickle.load(f)
        except (EOFError, pickle.UnpicklingError, OSError) as e:
            raise RuntimeError('tclean subprocess produced no readable result file') from e

        if not output.get('success', False):
            raise RuntimeError(f'{output.get("type")}: {output.get("error")}')

        return output.get('result')

    finally:
        # Clean up temporary files; failure to remove them is non-fatal.
        try:
            Path(input_file).unlink(missing_ok=True)
            Path(output_file).unlink(missing_ok=True)
        except Exception:
            LOG.debug('Failed to remove temporary files', exc_info=True)


Loading