Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8c33370
make 'dask[distributed]' optoinal via 'covalent[dask]'
araghukas Apr 12, 2026
e9c06db
add unit tests
araghukas Apr 13, 2026
a900c9a
ensure executor module load fails silently
araghukas Apr 13, 2026
060e181
move changelog line to unreleased section
araghukas Apr 13, 2026
e37e5c5
add setuptools requirement
araghukas Apr 13, 2026
be222f9
move setuptools installation out of requirements
araghukas Apr 13, 2026
ab92272
update for 'links' -> 'edges' renaming in new networkx
araghukas Apr 13, 2026
21d96d9
set lower bound for networkx
araghukas Apr 13, 2026
ac626f7
fix tests
araghukas Apr 13, 2026
8e6da8a
fix typo for test
araghukas Apr 13, 2026
7c43d94
rename key in test from 'status' to 'job_status'
araghukas Apr 13, 2026
68af396
move locust requirement to load test workflow to avoid gevent depende…
araghukas Apr 13, 2026
d21d918
include content-type header to resolve dispatch bug
araghukas Apr 13, 2026
0fc7bf6
add another ignore_files entry in docs how to test
araghukas Apr 14, 2026
d02c46f
stop python 3.10 support, start python 3.13 support
araghukas Apr 14, 2026
c0efdbf
fix container in test matrix
araghukas Apr 14, 2026
7f0b875
fix names
araghukas Apr 14, 2026
656e992
more links -> edges renaming
araghukas Apr 14, 2026
87483b4
fix path in mock
araghukas Apr 14, 2026
8c02257
add content-type header for draw_request post
araghukas Apr 14, 2026
b02cb59
restore 'links' key in extract_graph for 'json_graph.node_link_data'
araghukas Apr 14, 2026
d2aa087
support 'links' as fallback key after 'edges' in extract_graph
araghukas Apr 14, 2026
6ae9bae
update changelog
araghukas Apr 14, 2026
994d000
keep DaskCluster process alive by doing join on admin thread
araghukas Apr 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
run: python setup.py sdist && pip install dist/*.tar.gz

- name: Install test requirements
run: pip install --no-cache-dir -r ./tests/requirements.txt
run: pip install --no-cache-dir -r ./tests/requirements.txt "locust>=2.11.0"

- name: Start Covalent dispatcher server
run: covalent start -d --ignore-migrations
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: "3.10"
python-version: "3.13"

- name: Install Python dependencies
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/requirements.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: "3.10"
python-version: "3.13"

- name: Install covalent
run: python -m pip install .
Expand Down
76 changes: 38 additions & 38 deletions .github/workflows/test_matrix.json
Original file line number Diff line number Diff line change
@@ -1,20 +1,4 @@
[
{
"name": "Debian 12 / Python 3.10 / Dask",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian12-py310:latest",
"backend": "dask",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 12 / Python 3.10 / Local",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian12-py310:latest",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 12 / Python 3.11 / Dask",
"os": "ubuntu-latest",
Expand Down Expand Up @@ -48,20 +32,20 @@
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "MacOS 13 / Python 3.10 / Dask",
"os": "macos-13",
"python-version": "3.10",
"name": "Debian 13 / Python 3.13 / Dask",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py313:latest",
"backend": "dask",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
"trigger": ["push", "schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "MacOS 13 / Python 3.10 / Local",
"os": "macos-13",
"python-version": "3.10",
"name": "Debian 13 / Python 3.13 / Local",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py313:latest",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "MacOS 13 / Python 3.11 / Dask",
Expand All @@ -88,25 +72,25 @@
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 13 / Python 3.11 / Local",
"name": "MacOS 13 / Python 3.12 / Local",
"os": "macos-13",
"python-version": "3.11",
"python-version": "3.12",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 14 / Python 3.10 / Dask",
"os": "macos-14",
"python-version": "3.10",
"name": "MacOS 13 / Python 3.13 / Dask",
"os": "macos-13",
"python-version": "3.13",
"backend": "dask",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 14 / Python 3.10 / Local",
"os": "macos-14",
"python-version": "3.10",
"name": "MacOS 13 / Python 3.13 / Local",
"os": "macos-13",
"python-version": "3.13",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
Expand Down Expand Up @@ -144,17 +128,17 @@
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 15 / Python 3.10 / Dask",
"os": "macos-15",
"python-version": "3.10",
"name": "MacOS 14 / Python 3.13 / Dask",
"os": "macos-14",
"python-version": "3.13",
"backend": "dask",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 15 / Python 3.10 / Local",
"os": "macos-15",
"python-version": "3.10",
"name": "MacOS 14 / Python 3.13 / Local",
"os": "macos-14",
"python-version": "3.13",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
Expand Down Expand Up @@ -190,5 +174,21 @@
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 15 / Python 3.13 / Dask",
"os": "macos-15",
"python-version": "3.13",
"backend": "dask",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 15 / Python 3.13 / Local",
"os": "macos-15",
"python-version": "3.13",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
}
]
4 changes: 3 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ jobs:
- name: Build distribution
id: build-dist
if: env.NEED_PYTHON || env.BUILD_AND_RUN_ALL
run: python setup.py sdist
run: |
pip install --no-cache-dir setuptools
python setup.py sdist

- name: Validate distribution
if: env.BUILD_AND_RUN_ALL
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [UNRELEASED]

### Changed

- Make `dask[distributed]` and optional dependency via `covalent[dask]` extra
- Default executor changed from `dask` to `local` in `defaults.py`
- Dropped Python 3.10 support, added Python 3.13; updated `test_matrix.json` and CI workflows accordingly
- Fixed `extract_graph` to handle both `"edges"` (networkx ≥3.x) and `"links"` (older networkx) as the edge key; added lower bound on networkx version
- Fixed various 422 errors by setting additional `Content-Type: application/json` headers as necessary
- Moved `locust` out of `tests/requirements.txt` into the benchmark workflow only, to avoid a `gevent` conflict
- Various test fixes: renamed `status` → `job_status`, corrected mock paths, updated graph key references

## [0.240.0-rc.0] - 2025-05-14

### Authors
Expand Down
11 changes: 9 additions & 2 deletions covalent/_dispatcher_plugins/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,11 @@ def register_manifest(
stripped = strip_local_uris(manifest) if push_assets else manifest
endpoint = BASE_ENDPOINT

r = APIClient(dispatcher_addr).post(endpoint, data=stripped.model_dump_json())
r = APIClient(dispatcher_addr).post(
endpoint,
data=stripped.model_dump_json(),
headers={"Content-Type": "application/json"},
)
r.raise_for_status()

parsed_resp = ResultSchema.model_validate(r.json())
Expand Down Expand Up @@ -513,7 +517,10 @@ def register_derived_manifest(

params = {"reuse_previous_results": reuse_previous_results}
r = APIClient(dispatcher_addr).post(
endpoint, data=stripped.model_dump_json(), params=params
endpoint,
data=stripped.model_dump_json(),
headers={"Content-Type": "application/json"},
params=params,
)
r.raise_for_status()

Expand Down
4 changes: 2 additions & 2 deletions covalent/_serialize/transport_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def serialize_transport_graph(tg, storage_path: str) -> TransportGraphSchema:
g = tg.get_internal_graph_copy()
return TransportGraphSchema(
nodes=_serialize_nodes(g, storage_path),
links=_serialize_edges(g),
edges=_serialize_edges(g),
)


Expand All @@ -151,7 +151,7 @@ def deserialize_transport_graph(t: TransportGraphSchema) -> _TransportGraph:
tg = _TransportGraph()
g = tg._graph
nodes = [deserialize_node(n) for n in t.nodes]
edges = [_deserialize_edge(e) for e in t.links]
edges = [_deserialize_edge(e) for e in t.edges]
for node in nodes:
node_id = node["id"]
attrs = node["attrs"]
Expand Down
15 changes: 12 additions & 3 deletions covalent/_shared_files/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@
from dataclasses import dataclass, field
from typing import Dict

import dask.system
try:
import dask.system

_dask_cpu_count = dask.system.CPU_COUNT
_DASK_AVAILABLE = True
except ImportError:
_dask_cpu_count = os.cpu_count() or 1
_DASK_AVAILABLE = False

prefix_separator = ":"

Expand Down Expand Up @@ -129,7 +136,7 @@ def get_default_dask_config():
),
"mem_per_worker": "auto",
"threads_per_worker": 1,
"num_workers": dask.system.CPU_COUNT,
"num_workers": _dask_cpu_count,
}


Expand Down Expand Up @@ -172,7 +179,9 @@ def get_default_executor() -> dict:

return (
"local"
if os.environ.get("COVALENT_DISABLE_DASK") == "1" or get_config("sdk.no_cluster") == "true"
if not _DASK_AVAILABLE
or os.environ.get("COVALENT_DISABLE_DASK") == "1"
or get_config("sdk.no_cluster") == "true"
else "dask"
)

Expand Down
2 changes: 1 addition & 1 deletion covalent/_shared_files/schemas/transport_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

class TransportGraphSchema(BaseModel):
nodes: List[ElectronSchema]
links: List[EdgeSchema]
edges: List[EdgeSchema]

# For use by redispatch
def reset(self):
Expand Down
12 changes: 6 additions & 6 deletions covalent/_workflow/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,10 @@ def serialize(self, metadata_only: bool = False) -> bytes:
data["nodes"][idx].pop(field, None)

# Remove the non-source-target fields from the scheduler workflow input data.
for idx, node in enumerate(data["links"]):
for name in data["links"][idx].copy():
for idx, node in enumerate(data["edges"]):
for name in data["edges"][idx].copy():
if name not in ["source", "target"]:
data["links"][idx].pop("edge_name", None)
data["edges"][idx].pop("edge_name", None)

data["lattice_metadata"] = self.lattice_metadata
return cloudpickle.dumps(data)
Expand Down Expand Up @@ -475,10 +475,10 @@ def serialize_to_json(self, metadata_only: bool = False) -> str:
data["nodes"][idx].pop(field, None)

# Remove the non-source-target fields from the scheduler workflow input data.
for idx, node in enumerate(data["links"]):
for name in data["links"][idx].copy():
for idx, node in enumerate(data["edges"]):
for name in data["edges"][idx].copy():
if name not in ["source", "target"]:
data["links"][idx].pop("edge_name", None)
data["edges"][idx].pop("edge_name", None)

data["lattice_metadata"] = encode_metadata(self.lattice_metadata)
return json.dumps(data)
Expand Down
5 changes: 4 additions & 1 deletion covalent/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ def _load_executors(self, executor_dir: str) -> None:
# Import the module that contains the plugin
module_spec = importlib.util.spec_from_file_location(module_name, module_file)
the_module = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(the_module)
try:
module_spec.loader.exec_module(the_module)
except ImportError:
continue

self._populate_executor_map_from_module(the_module)

Expand Down
5 changes: 4 additions & 1 deletion covalent/executor/executor_plugins/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
from enum import Enum
from typing import Any, Callable, Dict, List, Literal, Optional

from dask.distributed import CancelledError, Client, Future
try:
from dask.distributed import CancelledError, Client, Future
except ImportError as e:
raise ImportError("'covalent[dask]' is required for DaskExecutor.") from e
from pydantic import BaseModel

from covalent._shared_files import TaskRuntimeError, logger
Expand Down
24 changes: 17 additions & 7 deletions covalent_dispatcher/_cli/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,9 @@
from typing import Optional

import click
import dask.system
import psutil
import requests
import sqlalchemy
from dask.distributed import Client
from distributed.comm import unparse_address
from distributed.comm.core import CommClosedError
from distributed.core import connect, rpc
from furl import furl
from natsort import natsorted
from rich.box import ROUNDED
Expand All @@ -57,6 +52,19 @@

from .._db.datastore import DataStore

_DASK_AVAILABLE = False
try:
import dask.system as _dask_system
from dask.distributed import Client
from distributed.comm import unparse_address
from distributed.comm.core import CommClosedError
from distributed.core import connect, rpc

_DASK_AVAILABLE = True
except ImportError:
CommClosedError = OSError # safe stand-in; these code paths are never reached without dask


UI_PIDFILE = get_config("dispatcher.cache_dir") + "/ui.pid"
UI_LOGFILE = get_config("user_interface.log_dir") + "/covalent_ui.log"
UI_SRVDIR = f"{os.path.dirname(os.path.abspath(__file__))}/../../covalent_ui"
Expand Down Expand Up @@ -855,13 +863,15 @@ async def _get_cluster_logs(uri):


def _get_cluster_admin_address():
if not _DASK_AVAILABLE:
return None
try:
admin_host = get_config("dask.admin_host")
admin_port = get_config("dask.admin_port")
admin_server_addr = unparse_address("tcp", f"{admin_host}:{admin_port}")
return admin_server_addr
except KeyError:
return
return None


@click.command()
Expand All @@ -877,7 +887,7 @@ def _get_cluster_admin_address():
is_flag=False,
nargs=1,
type=int,
default=dask.system.CPU_COUNT,
default=_dask_system.CPU_COUNT if _DASK_AVAILABLE else (os.cpu_count() or 1),
show_default=True,
help="Scale cluster by adding/removing workers to match `nworkers`",
)
Expand Down
Loading
Loading