Conversation
…p deprecation warnings.
…sts for `CASA6` executable search and tclean/parallel execution in an independent subprocess.
Pull request overview
This pull request introduces a new parallel image cleaning utility (pclean) for CASA's tclean function, along with comprehensive tests and several improvements to modernize time handling and executable discovery across the pipeline codebase.
- Adds `pclean` module providing flexible serial/parallel execution of CASA's `tclean` with MPI support
- Modernizes datetime handling to use timezone-aware UTC datetimes
- Replaces deprecated `distutils.spawn.find_executable` with `shutil.which`
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| `pipeline/infrastructure/utils/pclean.py` | New module implementing `pclean()` for serial/parallel `tclean` execution and `find_executable()` for locating MPI launchers |
| `pipeline/infrastructure/utils/pclean_test.py` | Comprehensive test suite covering unit tests for executable discovery and integration tests for serial/parallel `tclean` execution |
| `pipeline/infrastructure/utils/conversion.py` | Updated three datetime conversion functions to use timezone-aware UTC datetimes |
| `pipeline/infrastructure/renderer/logger.py` | Migrated from deprecated `distutils` to `shutil.which` for executable discovery and updated logging statements |
| `pipeline/infrastructure/casa_tools.py` | Updated timing code to use timezone-aware UTC datetimes |
```python
LOG.debug('Executing command: %s', ' '.join(call_args))

try:
    completed = subprocess.run(call_args, check=True, shell=False, capture_output=False, text=True)
```
Setting `capture_output=False` means stdout and stderr won't be captured, so lines 163-164 and 167-168 will always log `None`. Either set `capture_output=True` to capture the output for logging, or remove the logging statements for stdout/stderr since they will be empty.
Suggested change:

```diff
-completed = subprocess.run(call_args, check=True, shell=False, capture_output=False, text=True)
+completed = subprocess.run(call_args, check=True, shell=False, capture_output=True, text=True)
```
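As a hedged sketch of how the suggested fix makes the later logging meaningful (function name and logger setup are assumptions for illustration; the surrounding `pclean` code may differ):

```python
import logging
import subprocess

LOG = logging.getLogger(__name__)


def run_and_log(call_args):
    """Run a command and log its captured output.

    With capture_output=True and text=True, completed.stdout and
    completed.stderr are decoded strings instead of None, so the
    debug-log statements actually record something useful.
    """
    completed = subprocess.run(call_args, check=True, shell=False,
                               capture_output=True, text=True)
    LOG.debug('stdout: %s', completed.stdout)
    LOG.debug('stderr: %s', completed.stderr)
    return completed
```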
```python
# read result
with open(output_file, 'rb') as f:
    output = pickle.load(f)
```
If the subprocess crashes or is killed before it can write to the output file, attempting to open and read the output file will raise a FileNotFoundError or EOFError (for an empty pickle file). Consider checking if the output file exists and has content before attempting to unpickle it, or wrap this in a try-except to provide a clearer error message.
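One possible guard along those lines (a sketch only; the `output_file` name follows the diff above, and the error message wording is an assumption):

```python
import os
import pickle


def load_subprocess_result(output_file):
    """Unpickle the subprocess result, failing with a clear message.

    If the subprocess crashed or was killed before writing its output,
    the file is missing or empty; raise RuntimeError instead of letting
    a bare FileNotFoundError or EOFError surface from pickle.load.
    """
    if not os.path.isfile(output_file) or os.path.getsize(output_file) == 0:
        raise RuntimeError(
            f'No result found in {output_file!r}; the tclean subprocess '
            'may have crashed before writing its output.')
    with open(output_file, 'rb') as f:
        return pickle.load(f)
```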
* Fix typos Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…nix_seconds_to_datetime` and the associated test.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
```python
def find_executable(start_dir: Optional[str] = None) -> Dict[str, Optional[str]]:
    """Search upward from start_dir for MPI-related executables.

    The function looks for 'bin/mpirun', 'bin/mpicasa' and 'bin/casa' in the
    current directory and each parent until the filesystem root is reached.

    Args:
        start_dir: Directory to start searching from. If None, use cwd.

    Returns:
        Mapping from executable name to absolute path or None if not found.
    """
```
The docstring states the function looks for executables in "the current directory and each parent" but the implementation actually starts from 'start_dir' (not necessarily the current directory) if provided. Consider clarifying the docstring to say "Searches upward from start_dir (or current directory if None) for MPI-related executables in bin/ subdirectories of each parent directory until the filesystem root is reached."
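For illustration, the upward search that the docstring describes could look roughly like this; this is a hypothetical sketch, not the module's actual implementation:

```python
import os
from typing import Dict, Optional


def find_executables_upward(start_dir: Optional[str] = None) -> Dict[str, Optional[str]]:
    """Search start_dir (or cwd if None) and each parent directory for
    bin/mpirun, bin/mpicasa and bin/casa, stopping at the filesystem root."""
    results: Dict[str, Optional[str]] = {'mpirun': None, 'mpicasa': None, 'casa': None}
    current = os.path.abspath(start_dir or os.getcwd())
    while True:
        for name, found in results.items():
            if found is None:
                candidate = os.path.join(current, 'bin', name)
                # keep the first (closest) executable match for each name
                if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
                    results[name] = candidate
        parent = os.path.dirname(current)
        if parent == current:  # reached the filesystem root
            return results
        current = parent
```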
```python
def test_serial_dirty_image(self, valid_ms, image_output):
    """Test serial tclean execution produces expected output."""
    pclean.pclean(vis=[valid_ms],
                  field='helms30', spw=['0'], antenna=['0,1,2,3,4,5,6,7,8,9,10&'],
                  scan=['10'], intent='OBSERVE_TARGET#ON_SOURCE', datacolumn='data',
                  imagename=image_output,
                  imsize=[90, 90], cell=['0.91arcsec'], phasecenter='ICRS 01:03:01.3200 -000.32.59.640',
                  stokes='I', specmode='cube', nchan=117,
                  start='214.4501854310GHz', width='15.6245970MHz', outframe='LSRK',
                  perchanweightdensity=True, gridder='standard', mosweight=False,
                  usepointing=False, pblimit=0.2, deconvolver='hogbom', restoration=True,
                  restoringbeam='common', pbcor=True, weighting='briggsbwtaper',
                  robust=0.5, npixels=0, niter=0, threshold='0.0mJy', nsigma=0.0,
                  interactive=False, fullsummary=False, usemask='auto-multithresh',
                  sidelobethreshold=1.25, noisethreshold=5.0, lownoisethreshold=2.0,
                  negativethreshold=0.0, minbeamfrac=0.1, growiterations=75,
                  dogrowprune=True, minpercentchange=1.0, fastnoise=False,
                  savemodel='none', parallel=False)

    # Verify output was created
    assert self._image_exists(image_output), 'Output image was not created'

def test_parallel_dirty_image(self, valid_ms, image_output):
    """Test parallel tclean execution produces expected output."""
    pclean.pclean(vis=[valid_ms],
                  field='helms30', spw=['0'], antenna=['0,1,2,3,4,5,6,7,8,9,10&'],
                  scan=['10'], intent='OBSERVE_TARGET#ON_SOURCE', datacolumn='data',
                  imagename=image_output,
                  imsize=[90, 90], cell=['0.91arcsec'], phasecenter='ICRS 01:03:01.3200 -000.32.59.640',
                  stokes='I', specmode='cube', nchan=117,
                  start='214.4501854310GHz', width='15.6245970MHz', outframe='LSRK',
                  perchanweightdensity=True, gridder='standard', mosweight=False,
                  usepointing=False, pblimit=0.2, deconvolver='hogbom', restoration=True,
                  restoringbeam='common', pbcor=True, weighting='briggsbwtaper',
                  robust=0.5, npixels=0, niter=0, threshold='0.0mJy', nsigma=0.0,
                  interactive=False, fullsummary=False, usemask='auto-multithresh',
                  sidelobethreshold=1.25, noisethreshold=5.0, lownoisethreshold=2.0,
                  negativethreshold=0.0, minbeamfrac=0.1, growiterations=75,
                  dogrowprune=True, minpercentchange=1.0, fastnoise=False,
                  savemodel='none', parallel=True)

    # Verify output was created
    assert self._image_exists(image_output), 'Output image was not created'

def test_parallel_with_custom_nproc(self, valid_ms, image_output):
    """Test parallel execution with custom number of processes."""
    pclean.pclean(vis=[valid_ms],
                  field='helms30', spw=['0'], antenna=['0,1,2,3,4,5,6,7,8,9,10&'],
                  scan=['10'], intent='OBSERVE_TARGET#ON_SOURCE', datacolumn='data',
                  imagename=image_output,
                  imsize=[90, 90], cell=['0.91arcsec'], phasecenter='ICRS 01:03:01.3200 -000.32.59.640',
                  stokes='I', specmode='cube', nchan=117,
                  start='214.4501854310GHz', width='15.6245970MHz', outframe='LSRK',
                  perchanweightdensity=True, gridder='standard', mosweight=False,
                  usepointing=False, pblimit=0.2, deconvolver='hogbom', restoration=True,
                  restoringbeam='common', pbcor=True, weighting='briggsbwtaper',
                  robust=0.5, npixels=0, niter=0, threshold='0.0mJy', nsigma=0.0,
                  interactive=False, fullsummary=False, usemask='auto-multithresh',
                  sidelobethreshold=1.25, noisethreshold=5.0, lownoisethreshold=2.0,
                  negativethreshold=0.0, minbeamfrac=0.1, growiterations=75,
                  dogrowprune=True, minpercentchange=1.0, fastnoise=False,
                  savemodel='none', parallel={'nproc': 3})

    # Verify output was created
    assert self._image_exists(image_output), 'Output image was not created'

def test_serial_with_cleaning(self, valid_ms, image_output):
    """Test serial tclean with actual deconvolution iterations."""
    pclean.pclean(vis=[valid_ms],
                  field='helms30', spw=['0'], antenna=['0,1,2,3,4,5,6,7,8,9,10&'],
                  scan=['10'], intent='OBSERVE_TARGET#ON_SOURCE', datacolumn='data',
                  imagename=image_output,
                  imsize=[90, 90], cell=['0.91arcsec'], phasecenter='ICRS 01:03:01.3200 -000.32.59.640',
                  stokes='I', specmode='cube', nchan=117,
                  start='214.4501854310GHz', width='15.6245970MHz', outframe='LSRK',
                  perchanweightdensity=True, gridder='standard', mosweight=False,
                  usepointing=False, pblimit=0.2, deconvolver='hogbom', restoration=True,
                  restoringbeam='common', pbcor=True, weighting='briggsbwtaper',
                  robust=0.5, npixels=0, niter=10, threshold='0.0mJy', nsigma=0.0,
                  interactive=False, fullsummary=False, usemask='auto-multithresh',
                  sidelobethreshold=1.25, noisethreshold=5.0, lownoisethreshold=2.0,
                  negativethreshold=0.0, minbeamfrac=0.1, growiterations=75,
                  dogrowprune=True, minpercentchange=1.0, fastnoise=False,
                  savemodel='none', parallel=False)

    # Verify output and residual images exist
    assert self._image_exists(image_output), 'Output image was not created'
    assert self._residual_exists(image_output), 'Residual image was not created'

def test_parallel_with_cleaning(self, valid_ms, image_output):
    """Test parallel tclean with actual deconvolution iterations."""
    pclean.pclean(vis=[valid_ms],
                  field='helms30', spw=['0'], antenna=['0,1,2,3,4,5,6,7,8,9,10&'],
                  scan=['10'], intent='OBSERVE_TARGET#ON_SOURCE', datacolumn='data',
                  imagename=image_output,
                  imsize=[90, 90], cell=['0.91arcsec'], phasecenter='ICRS 01:03:01.3200 -000.32.59.640',
                  stokes='I', specmode='cube', nchan=117,
                  start='214.4501854310GHz', width='15.6245970MHz', outframe='LSRK',
                  perchanweightdensity=True, gridder='standard', mosweight=False,
                  usepointing=False, pblimit=0.2, deconvolver='hogbom', restoration=True,
                  restoringbeam='common', pbcor=True, weighting='briggsbwtaper',
                  robust=0.5, npixels=0, niter=10, threshold='0.0mJy', nsigma=0.0,
                  interactive=False, fullsummary=False, usemask='auto-multithresh',
                  sidelobethreshold=1.25, noisethreshold=5.0, lownoisethreshold=2.0,
                  negativethreshold=0.0, minbeamfrac=0.1, growiterations=75,
                  dogrowprune=True, minpercentchange=1.0, fastnoise=False,
                  savemodel='none', parallel=True)

    # Verify output and residual images exist
    assert self._image_exists(image_output), 'Output image was not created'
    assert self._residual_exists(image_output), 'Residual image was not created'
```
There is significant code duplication across test methods. The same tclean parameter dictionary (fields like field, spw, antenna, scan, etc.) is repeated in multiple tests with only minor variations (parallel parameter, niter value). Consider extracting common parameters into a fixture or class-level constant dictionary, then using dictionary unpacking to pass them to pclean. This would make the tests more maintainable and make it clearer which parameters differ between tests.
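A possible refactoring along those lines, using a trimmed-down and hypothetical subset of the parameters for illustration (the real tests would carry the full `tclean` argument list shown above):

```python
# Shared baseline parameters; individual tests override only what differs.
COMMON_TCLEAN_KWARGS = {
    'field': 'helms30',
    'spw': ['0'],
    'scan': ['10'],
    'specmode': 'cube',
    'deconvolver': 'hogbom',
    'niter': 0,
    'parallel': False,
}


def tclean_kwargs(**overrides):
    """Return a fresh copy of the common kwargs with overrides applied."""
    kwargs = dict(COMMON_TCLEAN_KWARGS)
    kwargs.update(overrides)
    return kwargs
```

A test could then call `pclean.pclean(vis=[valid_ms], imagename=image_output, **tclean_kwargs(parallel=True, niter=10))`, making the per-test differences explicit.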
```python
# prepare temporary files
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as f_in:
    input_file = f_in.name
    pickle.dump({'args': args, 'parallel': True, 'kwargs': kwargs}, f_in)

with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as f_out:
    output_file = f_out.name
```
The temporary files containing pickled data (input_file and output_file) are created with default permissions that may be world-readable on some systems. Consider using more restrictive permissions (e.g., mode=0o600) or tempfile.TemporaryDirectory with appropriate permissions to prevent other users from potentially reading sensitive tclean parameters or results. While pickle is used consistently throughout the codebase, temporary files with pickled data should have restricted access.
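A sketch of one way to restrict access, assuming the pickled payload shape from the diff above (`tempfile.mkstemp` already creates files as 0o600 on POSIX; the explicit `chmod` documents the intent):

```python
import os
import pickle
import tempfile


def write_private_pickle(payload):
    """Pickle payload into a temp file readable only by the current user."""
    fd, path = tempfile.mkstemp(suffix='.pkl')
    os.chmod(path, 0o600)  # owner read/write only
    with os.fdopen(fd, 'wb') as f:
        pickle.dump(payload, f)
    return path
```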
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Codecov Report

❌ Patch coverage is …

Additional details and impacted files

```
@@ Coverage Diff @@
##             main      #14      +/-   ##
==========================================
+ Coverage   16.67%   16.77%   +0.09%
==========================================
  Files         835      837       +2
  Lines      108091   108227     +136
  Branches    17101    17114     +13
==========================================
+ Hits        18024    18152     +128
- Misses      89686    89690       +4
- Partials      381      385       +4
```
Flags with carried forward coverage won't be shown.
…mps across multiple modules.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 1 comment.
```python
# from casampi.MPIEnvironment import MPIEnvironment
# from casampi.MPICommandClient import MPICommandClient
# __client = MPICommandClient()
# __client.set_log_mode('redirect')
# __client.start_services()
```
These commented-out lines of code appear to be dead code that was left in during development. If these lines are not needed, they should be removed to keep the codebase clean. If they might be needed in the future, consider adding a comment explaining why they're kept and under what circumstances they would be uncommented.
Suggested change:

```diff
-# from casampi.MPIEnvironment import MPIEnvironment
-# from casampi.MPICommandClient import MPICommandClient
-# __client = MPICommandClient()
-# __client.set_log_mode('redirect')
-# __client.start_services()
```
Resolve conflicts:
- `pipeline/domain/state.py`
- `pipeline/infrastructure/utils/conversion_test.py`

Resolved conflicts:
- `pipeline/infrastructure/renderer/htmlrenderer.py`
- `pipeline/infrastructure/renderer/logger.py`
- Target macOS 15+ in pixi system-requirements: set `macos = "15.0"` so pixi prefers the latest `casatools` wheels on `osx-arm64`; `uv` may still fall back to a 14.0 wheel when no 15.0 build is available.
- Update the Pixi setup action to version 0.9.4 and adjust the Pixi version to 0.62.2.
- Disable `pytest-xdist` parallel workers with `-n 0` to avoid resource contention on the GitHub macOS runners.
This pull request introduces a new utility module for parallelized image cleaning (`pclean`) in CASA, along with comprehensive tests and several improvements to time handling and executable discovery. The main changes include adding the new `pclean` function for flexible serial/parallel execution, a utility for discovering MPI-related executables, and a full test suite. It also modernizes time zone handling in several utility functions and updates how executables are found in the renderer logger.

New parallel cleaning utility and tests

- `pipeline/infrastructure/utils/pclean.py` providing the `pclean` function, which allows running CASA's `tclean` either serially or in parallel (using MPI if available), and includes a helper to search for MPI-related executables in parent directories.
- `pipeline/infrastructure/utils/pclean_test.py` with thorough unit and integration tests for both the `pclean` function and the executable finder, covering error handling, parallel execution, and output verification.

Improvements to time zone handling

- Updated `pipeline/infrastructure/utils/conversion.py` (`unix_seconds_to_datetime`, `mjd_seconds_to_datetime`, `get_epoch_as_datetime`) to use timezone-aware UTC datetimes, replacing legacy UTC conversion calls for consistency and correctness. [1] [2] [3]

Executable discovery modernization

- Updated `pipeline/infrastructure/renderer/logger.py` to use `shutil.which` instead of the deprecated `distutils.spawn.find_executable` for locating external executables (`mogrify`, `sips`), and updated logging calls for clarity. [1] [2]

Minor logging and timing improvements

- Updated `pipeline/infrastructure/casa_tools.py` to use timezone-aware UTC datetimes for tool call timing.
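The timezone-aware pattern behind these updates is roughly the following; this is a generic sketch of the deprecated-to-aware migration, and the signature is assumed rather than copied from the pipeline's `conversion.py`:

```python
from datetime import datetime, timezone


def unix_seconds_to_datetime(seconds: float) -> datetime:
    """Convert Unix epoch seconds to a timezone-aware UTC datetime.

    datetime.utcfromtimestamp() is deprecated since Python 3.12 and
    returns a naive datetime; passing tz=timezone.utc to fromtimestamp()
    yields an aware datetime with an explicit UTC offset.
    """
    return datetime.fromtimestamp(seconds, tz=timezone.utc)
```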