183 add dask parallelism for cube single field imaging#237

Open
Jan-Willem wants to merge 92 commits into main from 183-add-dask-parallelism-for-cube-single-field-imaging

@Jan-Willem

Member

No description provided.

Jan-Willem and others added 30 commits February 25, 2026 09:17
…-add-dask-parallelism-for-cube-single-field-imaging
…-add-dask-parallelism-for-cube-single-field-imaging
@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ Jan-Willem
❌ Jan-Willem Steeb


Jan-Willem Steeb does not appear to be a GitHub user. You need a GitHub account to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
Already signed the CLA but the status is still pending? Let us recheck it.

@codecov

codecov Bot commented Jun 12, 2026

❌ 8 Tests Failed:

Tests completed: 865 | Failed: 8 | Passed: 857 | Skipped: 4
View the top 3 failed test(s) by shortest run time
tests/stakeholder/test_single_field_imaging.py::test_single_field_imaging_multi_cycle[False]
Stack Traces | 0.384s run time
plot_saver = <function make_plot_saver.<locals>._save at 0x7efc884abba0>
skunk_works = False

    @pytest.mark.parametrize("skunk_works", [False, True])
    def test_single_field_imaging_multi_cycle(plot_saver, skunk_works):
        dask.config.set(scheduler="synchronous")
    
        _ensure_ps_store()
        _ensure_truth_images()
        ps_xdt = open_processing_set(PS_STORE)
        img_xds = xr.open_zarr("twhya_selfcal_5chans_lsrk_multi_cycle_truth.img.zarr")
    
        image_params = _image_params(ps_xdt)
        _tag = "_skunk" if skunk_works else ""
        image_store = f"twhya_selfcal_5chans_lsrk_multi_cycle_astroviper{_tag}.img.zarr"
    
>       return_dict = image_cube_single_field(
            ps_store=PS_STORE,
            image_store=image_store,
            image_params=image_params,
            imaging_weights_params={
                "weighting": "briggs",
                "robust": 0.5,
                "casa_weighting_implementation": True,
            },
            iteration_control_params={
                "niter": 10000,
                "nmajor": 4,
                "threshold": 0.0,
                "gain": 0.1,
                "cyclefactor": 1.5,
                "cycleniter": 1,
                "minpsffraction": 0.05,
                "maxpsffraction": 0.8,
            },
            gridder="prolate_spheroidal",
            deconvolver="hogbom_many_threads",
            # deconvolver="asp",
            scan_intents="OBSERVE_TARGET#ON_SOURCE",
            image_data_variables_keep=[
                "sky_residual",
                "point_spread_function",
                "primary_beam",
                "beam_fit_params_point_spread_function",
                "sky_model",
                "mask",
            ],
            processing_set_data_group_name="base",
            single_precision_image=False,
            thread_info=None,
            processing_function_threads=1,
            n_chunks=1,
            overwrite=True,
            disk_chunk_sizes={"frequency": 5},
            vizualize_graph=False,
            write_visibility_model_to_ps=True,
            skunk_works=skunk_works,
            fft_backend="scipy",
        )

tests/stakeholder/test_single_field_imaging.py:1181: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../toolviper/utils/parameter.py:61: in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../distributed_graphs/imaging/image_cube_single_field.py:404: in image_cube_single_field
    return_dict = dask.compute(dask_graph)[0]
                  ^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/base.py:685: in compute
    results = schedule(expr, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task 'image_cube_single_field-5a4b0c64-5afa-4996-aed8-483a8f8043b0' image_cube_single_field(...)>
values = {'dict-b971a192-29d6-41f1-b91e-e2b218601080': {'image_params': {'image_size': [250, 250], 'cell_size': array([-4.84813... {'weighting': 'briggs', 'robust': 0.5, 'casa_weighting_implementation': True}, 'zarr_meta': {}, 'to_disk': True, ...}}

    def __call__(self, values=()):
        self._verify_values(values)
    
        def _eval(a):
            if isinstance(a, GraphNode):
                return a({k: values[k] for k in a.dependencies})
            elif isinstance(a, TaskRef):
                return values[a.key]
            else:
                return a
    
        new_argspec = tuple(map(_eval, self.args))
        if self.kwargs:
            kwargs = {k: _eval(kw) for k, kw in self.kwargs.items()}
            return self.func(*new_argspec, **kwargs)
>       return self.func(*new_argspec)
               ^^^^^^^^^^^^^^^^^^^^^^^
E       TypeError: image_cube_single_field() missing 6 required positional arguments: 'imaging_weights_params', 'iteration_control_params', 'task_coords', 'data_selection', 'image_store', and 'input_data_store'

.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/_task_spec.py:768: TypeError
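
For anyone debugging this class of failure: the trace shows the task calling `self.func(*new_argspec)` with fewer positional arguments than the function requires. One way such a mismatch can arise is when parameters are packed into a single dict and passed as one positional argument. That would be consistent with the `values` dict above, but it is an assumption and not confirmed by this report. The sketch below is pure Python with no dask dependency; `node_task`, `call_like_task`, and `check_binding` are hypothetical names, and the parameter list is illustrative, not the project's real signature. It reproduces the error shape and shows how `inspect.signature(...).bind` can flag the mismatch before anything is computed.

```python
import inspect


def node_task(input_params, imaging_weights_params, iteration_control_params):
    # Hypothetical stand-in for a graph node function with several
    # required positional parameters (not the project's real signature).
    return sorted(input_params)


def call_like_task(func, args):
    # Mirrors the `self.func(*new_argspec)` call shown in the trace.
    return func(*args)


def check_binding(func, args, kwargs=None):
    # Return None if the arguments fit the signature, else the reason.
    try:
        inspect.signature(func).bind(*args, **(kwargs or {}))
    except TypeError as exc:
        return str(exc)
    return None


# Everything packed into one dict and passed as a single positional argument.
packed = {
    "input_params": {"image_size": [250, 250]},
    "imaging_weights_params": {"weighting": "briggs", "robust": 0.5},
    "iteration_control_params": {"niter": 0},
}

try:
    call_like_task(node_task, (packed,))
    error_message = None
except TypeError as exc:
    # Same shape as the failure above: "... missing N required positional arguments ..."
    error_message = str(exc)

# The same mismatch is detectable before any graph is computed.
binding_problem = check_binding(node_task, (packed,))

# Unpacking the dict into keyword arguments satisfies the signature.
unpacked_problem = check_binding(node_task, (), packed)
result = node_task(**packed)
```

A check like `check_binding` could be run at graph-construction time, so a signature mismatch surfaces at the call site instead of inside the scheduler.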
tests/stakeholder/test_single_field_imaging.py::test_single_field_imaging_multi_cycle[True]
Stack Traces | 0.391s run time
plot_saver = <function make_plot_saver.<locals>._save at 0x7efc882009a0>
skunk_works = True

    @pytest.mark.parametrize("skunk_works", [False, True])
    def test_single_field_imaging_multi_cycle(plot_saver, skunk_works):
        dask.config.set(scheduler="synchronous")
    
        _ensure_ps_store()
        _ensure_truth_images()
        ps_xdt = open_processing_set(PS_STORE)
        img_xds = xr.open_zarr("twhya_selfcal_5chans_lsrk_multi_cycle_truth.img.zarr")
    
        image_params = _image_params(ps_xdt)
        _tag = "_skunk" if skunk_works else ""
        image_store = f"twhya_selfcal_5chans_lsrk_multi_cycle_astroviper{_tag}.img.zarr"
    
>       return_dict = image_cube_single_field(
            ps_store=PS_STORE,
            image_store=image_store,
            image_params=image_params,
            imaging_weights_params={
                "weighting": "briggs",
                "robust": 0.5,
                "casa_weighting_implementation": True,
            },
            iteration_control_params={
                "niter": 10000,
                "nmajor": 4,
                "threshold": 0.0,
                "gain": 0.1,
                "cyclefactor": 1.5,
                "cycleniter": 1,
                "minpsffraction": 0.05,
                "maxpsffraction": 0.8,
            },
            gridder="prolate_spheroidal",
            deconvolver="hogbom_many_threads",
            # deconvolver="asp",
            scan_intents="OBSERVE_TARGET#ON_SOURCE",
            image_data_variables_keep=[
                "sky_residual",
                "point_spread_function",
                "primary_beam",
                "beam_fit_params_point_spread_function",
                "sky_model",
                "mask",
            ],
            processing_set_data_group_name="base",
            single_precision_image=False,
            thread_info=None,
            processing_function_threads=1,
            n_chunks=1,
            overwrite=True,
            disk_chunk_sizes={"frequency": 5},
            vizualize_graph=False,
            write_visibility_model_to_ps=True,
            skunk_works=skunk_works,
            fft_backend="scipy",
        )

tests/stakeholder/test_single_field_imaging.py:1181: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../toolviper/utils/parameter.py:61: in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../distributed_graphs/imaging/image_cube_single_field.py:404: in image_cube_single_field
    return_dict = dask.compute(dask_graph)[0]
                  ^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/base.py:685: in compute
    results = schedule(expr, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task 'image_cube_single_field-1219c1b4-d74d-40a6-8292-3304017e37f9' image_cube_single_field(...)>
values = {'dict-597eafd8-3292-49c3-a57a-3c0f7494ee9b': {'image_params': {'image_size': [250, 250], 'cell_size': array([-4.84813... {'weighting': 'briggs', 'robust': 0.5, 'casa_weighting_implementation': True}, 'zarr_meta': {}, 'to_disk': True, ...}}

    def __call__(self, values=()):
        self._verify_values(values)
    
        def _eval(a):
            if isinstance(a, GraphNode):
                return a({k: values[k] for k in a.dependencies})
            elif isinstance(a, TaskRef):
                return values[a.key]
            else:
                return a
    
        new_argspec = tuple(map(_eval, self.args))
        if self.kwargs:
            kwargs = {k: _eval(kw) for k, kw in self.kwargs.items()}
            return self.func(*new_argspec, **kwargs)
>       return self.func(*new_argspec)
               ^^^^^^^^^^^^^^^^^^^^^^^
E       TypeError: image_cube_single_field() missing 6 required positional arguments: 'imaging_weights_params', 'iteration_control_params', 'task_coords', 'data_selection', 'image_store', and 'input_data_store'

.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/_task_spec.py:768: TypeError
tests/stakeholder/test_single_field_imaging.py::test_single_field_imaging_niter0[True]
Stack Traces | 0.399s run time
plot_saver = <function make_plot_saver.<locals>._save at 0x7efc884a93a0>
skunk_works = True

    @pytest.mark.parametrize("skunk_works", [False, True])
    def test_single_field_imaging_niter0(plot_saver, skunk_works):
        dask.config.set(scheduler="synchronous")
    
        _ensure_ps_store()
        _ensure_truth_images()
        ps_xdt = open_processing_set(PS_STORE)
        img_xds = xr.open_zarr("twhya_selfcal_5chans_lsrk_niter_0_nmajor_1_briggs.img.zarr")
    
        image_params = _image_params(ps_xdt)
        _tag = "_skunk" if skunk_works else ""
        image_store = f"twhya_selfcal_lsrk_5chans_astroviper{_tag}.img.zarr"
    
>       return_dict = image_cube_single_field(
            ps_store=PS_STORE,
            image_store=image_store,
            image_params=image_params,
            imaging_weights_params={
                "weighting": "briggs",
                "robust": 0.5,
                "casa_weighting_implementation": True,
            },
            iteration_control_params={
                "niter": 0,
                "nmajor": 0,
                "threshold": 0.0,
                "gain": 0.1,
                "cyclefactor": 1.5,
                "cycleniter": 1,
                "minpsffraction": 0.05,
                "maxpsffraction": 0.8,
            },
            gridder="prolate_spheroidal",
            deconvolver="hogbom_many_threads",
            scan_intents="OBSERVE_TARGET#ON_SOURCE",
            image_data_variables_keep=[
                "sky_residual",
                "point_spread_function",
                "primary_beam",
                "beam_fit_params_point_spread_function",
                "visibility_normalization",
                "uv_sampling_normalization",
            ],
            processing_set_data_group_name="base",
            single_precision_image=False,
            thread_info=None,
            processing_function_threads=12,
            n_chunks=None,
            overwrite=True,
            disk_chunk_sizes={"frequency": 2},
            vizualize_graph=False,
            skunk_works=skunk_works,
        )

tests/stakeholder/test_single_field_imaging.py:240: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../toolviper/utils/parameter.py:61: in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../distributed_graphs/imaging/image_cube_single_field.py:404: in image_cube_single_field
    return_dict = dask.compute(dask_graph)[0]
                  ^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/base.py:685: in compute
    results = schedule(expr, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task 'image_cube_single_field-f9c39897-1053-4314-89b8-f0d1c9ac63a0' image_cube_single_field(...)>
values = {'dict-10b27198-1575-452b-bfaa-2d8a07826916': {'image_params': {'image_size': [250, 250], 'cell_size': array([-4.84813... {'weighting': 'briggs', 'robust': 0.5, 'casa_weighting_implementation': True}, 'zarr_meta': {}, 'to_disk': True, ...}}

    def __call__(self, values=()):
        self._verify_values(values)
    
        def _eval(a):
            if isinstance(a, GraphNode):
                return a({k: values[k] for k in a.dependencies})
            elif isinstance(a, TaskRef):
                return values[a.key]
            else:
                return a
    
        new_argspec = tuple(map(_eval, self.args))
        if self.kwargs:
            kwargs = {k: _eval(kw) for k, kw in self.kwargs.items()}
            return self.func(*new_argspec, **kwargs)
>       return self.func(*new_argspec)
               ^^^^^^^^^^^^^^^^^^^^^^^
E       TypeError: image_cube_single_field() missing 6 required positional arguments: 'imaging_weights_params', 'iteration_control_params', 'task_coords', 'data_selection', 'image_store', and 'input_data_store'

.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/_task_spec.py:768: TypeError
tests/stakeholder/test_single_field_imaging.py::test_single_field_imaging_nchunks1[True]
Stack Traces | 0.427s run time
plot_saver = <function make_plot_saver.<locals>._save at 0x7efc884ab100>
skunk_works = True

    @pytest.mark.parametrize("skunk_works", [False, True])
    def test_single_field_imaging_nchunks1(plot_saver, skunk_works):
        dask.config.set(scheduler="synchronous")
    
        # from toolviper.dask.client import local_client
    
        # viper_client = local_client(cores=4, memory_limit="4GB")
    
        # from dask.distributed import Client
    
        # viper_client = Client(n_workers=2, threads_per_worker=2, memory_limit="4GB")
        # input("Press enter")
    
        _ensure_ps_store()
        _ensure_truth_images()
        ps_xdt = open_processing_set(PS_STORE)
        img_xds = xr.open_zarr(
            "twhya_selfcal_5chans_lsrk_niter_99_nmajor_1_briggs.img.zarr"
        )
    
        image_params = _image_params(ps_xdt)
        _tag = "_skunk" if skunk_works else ""
        image_store = f"twhya_selfcal_5chans_lsrk_niter_99_astroviper{_tag}.img.zarr"
        print(ps_xdt.xr_ps.summary())
>       return_dict = image_cube_single_field(
            ps_store=PS_STORE,
            image_store=image_store,
            image_params=image_params,
            imaging_weights_params={
                "weighting": "briggs",
                "robust": 0.5,
                "casa_weighting_implementation": True,
            },
            iteration_control_params={
                "niter": 100,
                "nmajor": 0,
                "threshold": 0.0,
                "gain": 0.1,
                "cyclefactor": 1.5,
                "cycleniter": 1,
                "minpsffraction": 0.05,
                "maxpsffraction": 0.2,
            },
            gridder="prolate_spheroidal",
            deconvolver="hogbom_many_threads",
            # deconvolver="asp",
            scan_intents="OBSERVE_TARGET#ON_SOURCE",
            image_data_variables_keep=[
                "sky_residual",
                "point_spread_function",
                "primary_beam",
                "beam_fit_params_point_spread_function",
                "sky_model",
                "mask",
            ],
            processing_set_data_group_name="base",
            single_precision_image=True,
            thread_info=None,
            processing_function_threads=12,
            n_chunks=1,
            overwrite=True,
            disk_chunk_sizes={"frequency": 1},
            vizualize_graph=False,
            write_visibility_model_to_ps=True,
            skunk_works=skunk_works,
            fft_backend="scipy",
        )

tests/stakeholder/test_single_field_imaging.py:800: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../toolviper/utils/parameter.py:61: in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../distributed_graphs/imaging/image_cube_single_field.py:404: in image_cube_single_field
    return_dict = dask.compute(dask_graph)[0]
                  ^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/base.py:685: in compute
    results = schedule(expr, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task 'image_cube_single_field-dfe5b7b7-f67f-43de-9c61-905d9c6aaa51' image_cube_single_field(...)>
values = {'dict-718eada3-f904-4797-a9cf-e3807547034f': {'image_params': {'image_size': [250, 250], 'cell_size': array([-4.84813... {'weighting': 'briggs', 'robust': 0.5, 'casa_weighting_implementation': True}, 'zarr_meta': {}, 'to_disk': True, ...}}

    def __call__(self, values=()):
        self._verify_values(values)
    
        def _eval(a):
            if isinstance(a, GraphNode):
                return a({k: values[k] for k in a.dependencies})
            elif isinstance(a, TaskRef):
                return values[a.key]
            else:
                return a
    
        new_argspec = tuple(map(_eval, self.args))
        if self.kwargs:
            kwargs = {k: _eval(kw) for k, kw in self.kwargs.items()}
            return self.func(*new_argspec, **kwargs)
>       return self.func(*new_argspec)
               ^^^^^^^^^^^^^^^^^^^^^^^
E       TypeError: image_cube_single_field() missing 6 required positional arguments: 'imaging_weights_params', 'iteration_control_params', 'task_coords', 'data_selection', 'image_store', and 'input_data_store'

.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/_task_spec.py:768: TypeError
tests/stakeholder/test_single_field_imaging.py::test_single_field_imaging_nchunks1[False]
Stack Traces | 0.433s run time
plot_saver = <function make_plot_saver.<locals>._save at 0x7efc884aa5c0>
skunk_works = False

    @pytest.mark.parametrize("skunk_works", [False, True])
    def test_single_field_imaging_nchunks1(plot_saver, skunk_works):
        dask.config.set(scheduler="synchronous")
    
        # from toolviper.dask.client import local_client
    
        # viper_client = local_client(cores=4, memory_limit="4GB")
    
        # from dask.distributed import Client
    
        # viper_client = Client(n_workers=2, threads_per_worker=2, memory_limit="4GB")
        # input("Press enter")
    
        _ensure_ps_store()
        _ensure_truth_images()
        ps_xdt = open_processing_set(PS_STORE)
        img_xds = xr.open_zarr(
            "twhya_selfcal_5chans_lsrk_niter_99_nmajor_1_briggs.img.zarr"
        )
    
        image_params = _image_params(ps_xdt)
        _tag = "_skunk" if skunk_works else ""
        image_store = f"twhya_selfcal_5chans_lsrk_niter_99_astroviper{_tag}.img.zarr"
        print(ps_xdt.xr_ps.summary())
>       return_dict = image_cube_single_field(
            ps_store=PS_STORE,
            image_store=image_store,
            image_params=image_params,
            imaging_weights_params={
                "weighting": "briggs",
                "robust": 0.5,
                "casa_weighting_implementation": True,
            },
            iteration_control_params={
                "niter": 100,
                "nmajor": 0,
                "threshold": 0.0,
                "gain": 0.1,
                "cyclefactor": 1.5,
                "cycleniter": 1,
                "minpsffraction": 0.05,
                "maxpsffraction": 0.2,
            },
            gridder="prolate_spheroidal",
            deconvolver="hogbom_many_threads",
            # deconvolver="asp",
            scan_intents="OBSERVE_TARGET#ON_SOURCE",
            image_data_variables_keep=[
                "sky_residual",
                "point_spread_function",
                "primary_beam",
                "beam_fit_params_point_spread_function",
                "sky_model",
                "mask",
            ],
            processing_set_data_group_name="base",
            single_precision_image=True,
            thread_info=None,
            processing_function_threads=12,
            n_chunks=1,
            overwrite=True,
            disk_chunk_sizes={"frequency": 1},
            vizualize_graph=False,
            write_visibility_model_to_ps=True,
            skunk_works=skunk_works,
            fft_backend="scipy",
        )

tests/stakeholder/test_single_field_imaging.py:800: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../toolviper/utils/parameter.py:61: in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../distributed_graphs/imaging/image_cube_single_field.py:404: in image_cube_single_field
    return_dict = dask.compute(dask_graph)[0]
                  ^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/base.py:685: in compute
    results = schedule(expr, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task 'image_cube_single_field-5a2009f7-2356-4b08-bc37-e0fd674b3637' image_cube_single_field(...)>
values = {'dict-df8e4dba-5678-4cc7-b038-c2c9741037f9': {'image_params': {'image_size': [250, 250], 'cell_size': array([-4.84813... {'weighting': 'briggs', 'robust': 0.5, 'casa_weighting_implementation': True}, 'zarr_meta': {}, 'to_disk': True, ...}}

    def __call__(self, values=()):
        self._verify_values(values)
    
        def _eval(a):
            if isinstance(a, GraphNode):
                return a({k: values[k] for k in a.dependencies})
            elif isinstance(a, TaskRef):
                return values[a.key]
            else:
                return a
    
        new_argspec = tuple(map(_eval, self.args))
        if self.kwargs:
            kwargs = {k: _eval(kw) for k, kw in self.kwargs.items()}
            return self.func(*new_argspec, **kwargs)
>       return self.func(*new_argspec)
               ^^^^^^^^^^^^^^^^^^^^^^^
E       TypeError: image_cube_single_field() missing 6 required positional arguments: 'imaging_weights_params', 'iteration_control_params', 'task_coords', 'data_selection', 'image_store', and 'input_data_store'

.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/_task_spec.py:768: TypeError
tests/stakeholder/test_single_field_imaging.py::test_single_field_imaging_nchunks5[False]
Stack Traces | 0.435s run time
plot_saver = <function make_plot_saver.<locals>._save at 0x7efc884a99e0>
skunk_works = False

    @pytest.mark.parametrize("skunk_works", [False, True])
    def test_single_field_imaging_nchunks5(plot_saver, skunk_works):
        dask.config.set(scheduler="synchronous")
    
        # from toolviper.dask.client import local_client
    
        # viper_client = local_client(cores=4, memory_limit="4GB")
    
        # from dask.distributed import Client
    
        # viper_client = Client(n_workers=2, threads_per_worker=2, memory_limit="4GB")
        # input("Press enter")
    
        _ensure_ps_store()
        _ensure_truth_images()
        ps_xdt = open_processing_set(PS_STORE)
        img_xds = xr.open_zarr(
            "twhya_selfcal_5chans_lsrk_niter_99_nmajor_1_briggs.img.zarr"
        )
    
        image_params = _image_params(ps_xdt)
        _tag = "_skunk" if skunk_works else ""
        image_store = f"twhya_selfcal_5chans_lsrk_niter_99_astroviper{_tag}.img.zarr"
        print(ps_xdt.xr_ps.summary())
>       return_dict = image_cube_single_field(
            ps_store=PS_STORE,
            image_store=image_store,
            image_params=image_params,
            imaging_weights_params={
                "weighting": "briggs",
                "robust": 0.5,
                "casa_weighting_implementation": True,
            },
            iteration_control_params={
                "niter": 100,
                "nmajor": 0,
                "threshold": 0.0,
                "gain": 0.1,
                "cyclefactor": 1.5,
                "cycleniter": 1,
                "minpsffraction": 0.05,
                "maxpsffraction": 0.2,
            },
            gridder="prolate_spheroidal",
            deconvolver="hogbom_many_threads",
            # deconvolver="asp",
            scan_intents="OBSERVE_TARGET#ON_SOURCE",
            image_data_variables_keep=[
                "sky_residual",
                "point_spread_function",
                "primary_beam",
                "beam_fit_params_point_spread_function",
                "sky_model",
                "mask",
            ],
            processing_set_data_group_name="base",
            single_precision_image=True,
            thread_info=None,
            processing_function_threads=12,
            n_chunks=5,
            overwrite=True,
            disk_chunk_sizes={"frequency": 1},
            vizualize_graph=False,
            write_visibility_model_to_ps=True,
            skunk_works=skunk_works,
            fft_backend="scipy",
        )

tests/stakeholder/test_single_field_imaging.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../toolviper/utils/parameter.py:61: in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../distributed_graphs/imaging/image_cube_single_field.py:404: in image_cube_single_field
    return_dict = dask.compute(dask_graph)[0]
                  ^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/base.py:685: in compute
    results = schedule(expr, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task 'image_cube_single_field-e512610c-638c-4eb9-848c-78b1bbae880e' image_cube_single_field(...)>
values = {'dict-c0ddcc08-972d-478a-ac8c-0c544051ab76': {'image_params': {'image_size': [250, 250], 'cell_size': array([-4.84813... {'weighting': 'briggs', 'robust': 0.5, 'casa_weighting_implementation': True}, 'zarr_meta': {}, 'to_disk': True, ...}}

    def __call__(self, values=()):
        self._verify_values(values)
    
        def _eval(a):
            if isinstance(a, GraphNode):
                return a({k: values[k] for k in a.dependencies})
            elif isinstance(a, TaskRef):
                return values[a.key]
            else:
                return a
    
        new_argspec = tuple(map(_eval, self.args))
        if self.kwargs:
            kwargs = {k: _eval(kw) for k, kw in self.kwargs.items()}
            return self.func(*new_argspec, **kwargs)
>       return self.func(*new_argspec)
               ^^^^^^^^^^^^^^^^^^^^^^^
E       TypeError: image_cube_single_field() missing 6 required positional arguments: 'imaging_weights_params', 'iteration_control_params', 'task_coords', 'data_selection', 'image_store', and 'input_data_store'

.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/_task_spec.py:768: TypeError
tests/stakeholder/test_single_field_imaging.py::test_single_field_imaging_nchunks5[True]
Stack Traces | 0.435s run time
plot_saver = <function make_plot_saver.<locals>._save at 0x7efc885c1bc0>
skunk_works = True

    @pytest.mark.parametrize("skunk_works", [False, True])
    def test_single_field_imaging_nchunks5(plot_saver, skunk_works):
        dask.config.set(scheduler="synchronous")
    
        # from toolviper.dask.client import local_client
    
        # viper_client = local_client(cores=4, memory_limit="4GB")
    
        # from dask.distributed import Client
    
        # viper_client = Client(n_workers=2, threads_per_worker=2, memory_limit="4GB")
        # input("Press enter")
    
        _ensure_ps_store()
        _ensure_truth_images()
        ps_xdt = open_processing_set(PS_STORE)
        img_xds = xr.open_zarr(
            "twhya_selfcal_5chans_lsrk_niter_99_nmajor_1_briggs.img.zarr"
        )
    
        image_params = _image_params(ps_xdt)
        _tag = "_skunk" if skunk_works else ""
        image_store = f"twhya_selfcal_5chans_lsrk_niter_99_astroviper{_tag}.img.zarr"
        print(ps_xdt.xr_ps.summary())
>       return_dict = image_cube_single_field(
            ps_store=PS_STORE,
            image_store=image_store,
            image_params=image_params,
            imaging_weights_params={
                "weighting": "briggs",
                "robust": 0.5,
                "casa_weighting_implementation": True,
            },
            iteration_control_params={
                "niter": 100,
                "nmajor": 0,
                "threshold": 0.0,
                "gain": 0.1,
                "cyclefactor": 1.5,
                "cycleniter": 1,
                "minpsffraction": 0.05,
                "maxpsffraction": 0.2,
            },
            gridder="prolate_spheroidal",
            deconvolver="hogbom_many_threads",
            # deconvolver="asp",
            scan_intents="OBSERVE_TARGET#ON_SOURCE",
            image_data_variables_keep=[
                "sky_residual",
                "point_spread_function",
                "primary_beam",
                "beam_fit_params_point_spread_function",
                "sky_model",
                "mask",
            ],
            processing_set_data_group_name="base",
            single_precision_image=True,
            thread_info=None,
            processing_function_threads=12,
            n_chunks=5,
            overwrite=True,
            disk_chunk_sizes={"frequency": 1},
            vizualize_graph=False,
            write_visibility_model_to_ps=True,
            skunk_works=skunk_works,
            fft_backend="scipy",
        )

tests/stakeholder/test_single_field_imaging.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../toolviper/utils/parameter.py:61: in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../distributed_graphs/imaging/image_cube_single_field.py:404: in image_cube_single_field
    return_dict = dask.compute(dask_graph)[0]
                  ^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/base.py:685: in compute
    results = schedule(expr, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task 'image_cube_single_field-d93f0a1b-58b0-4c4d-841b-38123cf26e38' image_cube_single_field(...)>
values = {'dict-17126754-2fc2-4fbf-b971-9c8a797f1a4e': {'image_params': {'image_size': [250, 250], 'cell_size': array([-4.84813... {'weighting': 'briggs', 'robust': 0.5, 'casa_weighting_implementation': True}, 'zarr_meta': {}, 'to_disk': True, ...}}

    def __call__(self, values=()):
        self._verify_values(values)
    
        def _eval(a):
            if isinstance(a, GraphNode):
                return a({k: values[k] for k in a.dependencies})
            elif isinstance(a, TaskRef):
                return values[a.key]
            else:
                return a
    
        new_argspec = tuple(map(_eval, self.args))
        if self.kwargs:
            kwargs = {k: _eval(kw) for k, kw in self.kwargs.items()}
            return self.func(*new_argspec, **kwargs)
>       return self.func(*new_argspec)
               ^^^^^^^^^^^^^^^^^^^^^^^
E       TypeError: image_cube_single_field() missing 6 required positional arguments: 'imaging_weights_params', 'iteration_control_params', 'task_coords', 'data_selection', 'image_store', and 'input_data_store'

.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/_task_spec.py:768: TypeError
tests/stakeholder/test_single_field_imaging.py::test_single_field_imaging_niter0[False]
Stack Traces | 9.92s run time
plot_saver = <function make_plot_saver.<locals>._save at 0x7efc80d5f2e0>
skunk_works = False

    @pytest.mark.parametrize("skunk_works", [False, True])
    def test_single_field_imaging_niter0(plot_saver, skunk_works):
        dask.config.set(scheduler="synchronous")
    
        _ensure_ps_store()
        _ensure_truth_images()
        ps_xdt = open_processing_set(PS_STORE)
        img_xds = xr.open_zarr("twhya_selfcal_5chans_lsrk_niter_0_nmajor_1_briggs.img.zarr")
    
        image_params = _image_params(ps_xdt)
        _tag = "_skunk" if skunk_works else ""
        image_store = f"twhya_selfcal_lsrk_5chans_astroviper{_tag}.img.zarr"
    
>       return_dict = image_cube_single_field(
            ps_store=PS_STORE,
            image_store=image_store,
            image_params=image_params,
            imaging_weights_params={
                "weighting": "briggs",
                "robust": 0.5,
                "casa_weighting_implementation": True,
            },
            iteration_control_params={
                "niter": 0,
                "nmajor": 0,
                "threshold": 0.0,
                "gain": 0.1,
                "cyclefactor": 1.5,
                "cycleniter": 1,
                "minpsffraction": 0.05,
                "maxpsffraction": 0.8,
            },
            gridder="prolate_spheroidal",
            deconvolver="hogbom_many_threads",
            scan_intents="OBSERVE_TARGET#ON_SOURCE",
            image_data_variables_keep=[
                "sky_residual",
                "point_spread_function",
                "primary_beam",
                "beam_fit_params_point_spread_function",
                "visibility_normalization",
                "uv_sampling_normalization",
            ],
            processing_set_data_group_name="base",
            single_precision_image=False,
            thread_info=None,
            processing_function_threads=12,
            n_chunks=None,
            overwrite=True,
            disk_chunk_sizes={"frequency": 2},
            vizualize_graph=False,
            skunk_works=skunk_works,
        )

tests/stakeholder/test_single_field_imaging.py:240: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../toolviper/utils/parameter.py:61: in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../distributed_graphs/imaging/image_cube_single_field.py:404: in image_cube_single_field
    return_dict = dask.compute(dask_graph)[0]
                  ^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/base.py:685: in compute
    results = schedule(expr, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task 'image_cube_single_field-dd865935-4130-4f21-9064-26bd9842687c' image_cube_single_field(...)>
values = {'dict-97106baa-ad8a-48b9-96eb-9b227c2a20ce': {'image_params': {'image_size': [250, 250], 'cell_size': array([-4.84813... {'weighting': 'briggs', 'robust': 0.5, 'casa_weighting_implementation': True}, 'zarr_meta': {}, 'to_disk': True, ...}}

    def __call__(self, values=()):
        self._verify_values(values)
    
        def _eval(a):
            if isinstance(a, GraphNode):
                return a({k: values[k] for k in a.dependencies})
            elif isinstance(a, TaskRef):
                return values[a.key]
            else:
                return a
    
        new_argspec = tuple(map(_eval, self.args))
        if self.kwargs:
            kwargs = {k: _eval(kw) for k, kw in self.kwargs.items()}
            return self.func(*new_argspec, **kwargs)
>       return self.func(*new_argspec)
               ^^^^^^^^^^^^^^^^^^^^^^^
E       TypeError: image_cube_single_field() missing 6 required positional arguments: 'imaging_weights_params', 'iteration_control_params', 'task_coords', 'data_selection', 'image_store', and 'input_data_store'

.../hostedtoolcache/Python/3.11.15............/x64/lib/python3.11.../site-packages/dask/_task_spec.py:768: TypeError
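
Both traces above end in the same `TypeError`, and in both the `values` repr shows a single dict that already contains `image_params` and `imaging_weights_params`. That is consistent with the function receiving one bundled dict where it expects separate positional arguments, though the traces alone do not confirm the cause. The sketch below reproduces only that general failure mode in plain Python. `chunk_worker`, `input_params`, and `missing_arguments` are hypothetical names for illustration, not part of astroviper, graphviper, or Dask.

```python
import inspect


def chunk_worker(image_params, imaging_weights_params, iteration_control_params):
    """Hypothetical stand-in for a worker that takes separate arguments."""
    return sorted(image_params)


# Hypothetical bundled parameters, as a graph builder might pass them along.
input_params = {
    "image_params": {"image_size": [250, 250]},
    "imaging_weights_params": {"weighting": "briggs", "robust": 0.5},
    "iteration_control_params": {"niter": 0},
}


def missing_arguments(func, *args, **kwargs):
    """Return the required parameter names that a call would leave unfilled."""
    sig = inspect.signature(func)
    bound = sig.bind_partial(*args, **kwargs)
    return [
        name
        for name, param in sig.parameters.items()
        if name not in bound.arguments and param.default is inspect.Parameter.empty
    ]


# The whole bundle lands in the first parameter; the rest stay unfilled.
missing_positional = missing_arguments(chunk_worker, input_params)

# Unpacking the bundle fills every parameter by name.
missing_unpacked = missing_arguments(chunk_worker, **input_params)
```

A check like this at graph-construction time would report the mismatch before `dask.compute` runs, instead of deep inside the scheduler.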


@smcastro smcastro left a comment


@Jan-Willem, thanks for this major effort to refactor and provide consistency across the project. I have a few suggestions that could improve clarity, in case you are considering modifications at the naming level.

Naming

distributed_graphs
The suffix _graphs is a bit ambiguous, since graph construction belongs to graphviper. The main purpose of this directory is to construct parallel_coords, map node_tasks over chunks, and so on; it does not implement graphs itself. Wouldn't a different name be shorter and more consistent? Some suggestions:

  • distributed
  • parallel

node_tasks
At first I had to read the documentation to understand the purpose of this directory. The suffix _tasks is overloaded, and I was wondering whether a name that avoids ambiguity across the VIPER ecosystem would work better here. For example, task is currently used for:

  • Prefect @task
  • Dask delayed task
  • Graphviper node_task parameter

Since a node task is a chunk (slice) worker, chunk_workers or map_workers would describe the role directly and would not collide with the Prefect or Dask terminology.

processing_functions
The suffix _functions adds no information and makes the name very long. Some suggestions:

  • processing
  • domain

Symbols

The same symbol name is repeated across the layers. A per-layer suffix would make call sites easier to read and prevent confusion. For example, we currently have:

distributed_graphs.imaging.image_cube_single_field   # driver
node_tasks.imaging.image_cube_single_field           # chunk worker  
processing_functions.imaging.image_cube_single_field # science

The feather API already breaks this pattern by using feather_core for the science layer, and the other APIs could follow it. Some suggestions:

distributed.imaging.image_cube_single_field        # driver, or
parallel.imaging.image_cube_single_field           # driver
chunk_workers.imaging.image_cube_single_field      # chunk worker
processing.imaging.image_cube_single_field_core    # science

The _core suffix indicates that this is the core algorithm (domain layer), and it reads more clearly at the high level when a flowviper workflow calls it.
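
To make the layering concrete, here is a minimal single-file sketch of the three roles with distinct names. Everything in it is hypothetical: the function bodies are placeholders, the `_chunk` suffix is used only because one file cannot reuse a name across layers, and `ThreadPoolExecutor` stands in for whatever scheduler the driver actually uses.

```python
from concurrent.futures import ThreadPoolExecutor


def image_cube_single_field_core(channel_values):
    """Science layer (hypothetical): pure computation on in-memory data."""
    return sum(channel_values) / len(channel_values)


def image_cube_single_field_chunk(chunk):
    """Chunk worker (hypothetical): handle one chunk and call the core."""
    return {
        "chunk_id": chunk["chunk_id"],
        "mean": image_cube_single_field_core(chunk["data"]),
    }


def image_cube_single_field(chunks, max_workers=2):
    """Driver (hypothetical): map the chunk worker over all chunks."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the input chunks.
        return list(pool.map(image_cube_single_field_chunk, chunks))


# Three placeholder chunks, each holding two channel values.
chunks = [{"chunk_id": i, "data": [float(i), float(i + 2)]} for i in range(3)]
results = image_cube_single_field(chunks)
```

With distinct names, a traceback or a graph node label identifies the layer directly, which is harder when all three layers share `image_cube_single_field`.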

Tests

I would like to suggest renaming the stakeholder tests to component, which aligns with the naming convention of the Testing Strategy (unit, component, integration). Integration tests would live in testviper, and each VIPER component would contain its own unit and component tests.
