Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ and this project adheres to [Semantic Versioning][].
## 2.1.3

### Changes
- `pp.adjmat` now returns the same features as used as input instead of the subset of `net`.

- `pp.adjmat` now returns the same features as used as input instead of the subset of `net`
- `pp.pseudobulk` now returns features in the same order as the input instead of shuffling them
- Added a dedicated `User-Agent` header and up to 5 attempts to `_download` to mitigate 429 Client Error responses from Zenodo downloads

## 2.1.2

Expand Down
37 changes: 33 additions & 4 deletions src/decoupler/_download.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import io
import time
from importlib.metadata import version

import pandas as pd
import requests
Expand All @@ -10,16 +12,16 @@
URL_INT = "https://omnipathdb.org/interactions/?genesymbols=1&"


def _download(
def _download_chunks(
url: str,
verbose: bool = False,
) -> io.BytesIO:
assert isinstance(url, str), "url must be str"
# Download with progress bar
m = f"Downloading {url}"
_log(m, level="info", verbose=verbose)
chunks = []
with requests.get(url, stream=True) as r:
__version__ = version("decoupler")
headers = {"User-Agent": f"decoupler/{__version__} (https://github.com/scverse/decoupler)"}
with requests.get(url, stream=True, headers=headers) as r:
r.raise_for_status()
with tqdm(unit="B", unit_scale=True, desc="Progress", disable=not verbose) as pbar:
for chunk in r.iter_content(chunk_size=8192):
Expand All @@ -28,6 +30,33 @@ def _download(
pbar.update(len(chunk))
# Read into bytes
data = io.BytesIO(b"".join(chunks))
return data


def _download(
    url: str,
    verbose: bool = False,
    retries: int = 5,
    wait_time: int = 20,
) -> io.BytesIO:
    """
    Download ``url`` into memory, retrying when the server answers with HTTP 429.

    Parameters
    ----------
    url
        Address of the resource to download.
    verbose
        Whether to emit the log messages of this function. The chunked
        download itself is always invoked with ``verbose=False``.
    retries
        Maximum number of download attempts, must be at least 1.
    wait_time
        Seconds to sleep before the next attempt after a 429 response.

    Returns
    -------
    The buffer returned by ``_download_chunks`` for ``url``.

    Raises
    ------
    requests.exceptions.HTTPError
        If the error is not a 429, or if the last attempt also fails with 429.
    """
    # With retries < 1 the loop below would never run and the function would
    # report success while returning None instead of a buffer.
    assert retries >= 1, "retries must be >= 1"
    m = f"Downloading {url}"
    _log(m, level="info", verbose=verbose)
    for attempt in range(1, retries + 1):
        try:
            data = _download_chunks(url, verbose=False)
        except requests.exceptions.HTTPError as e:
            status_code = e.response.status_code if e.response is not None else None
            if status_code != 429 or attempt >= retries:
                raise  # Not a 429 or no retries left: re-raise
            _log(
                f"429 Too Many Requests for {url}. Retrying in {wait_time}s (attempt {attempt + 1}/{retries})",
                level="warn",
                verbose=verbose,
            )
            time.sleep(wait_time)
        else:
            # Successful download: stop retrying
            break
    m = "Download finished"
    _log(m, level="info", verbose=verbose)
    return data
Expand Down
2 changes: 1 addition & 1 deletion src/decoupler/mt/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _run(
) -> tuple[pd.DataFrame, pd.DataFrame] | AnnData | None:
_log(f"{name} - Running {name}", level="info", verbose=verbose)
# Process data
mat, obs, var = extract(data, layer=layer, raw=raw, empty=empty, verbose=verbose, bsize=bsize)
mat, obs, var = extract(data, layer=layer, raw=raw, empty=empty, shuffle=True, verbose=verbose, bsize=bsize)
issparse = sps.issparse(mat)
isbacked = isinstance(mat, tuple)
# Process net
Expand Down
4 changes: 2 additions & 2 deletions src/decoupler/pp/anndata.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ def pseudobulk(
assert isinstance(groups_col, str | list) or groups_col is None, "groups_col must be str or None"
assert isinstance(mode, str | dict) or callable(mode), "mode must be str, dict or callable"
# Extract data
X, obs, var = extract(adata, layer=layer, raw=raw, empty=empty, bsize=bsize, verbose=verbose)
X, obs, var = extract(adata, layer=layer, raw=raw, empty=empty, bsize=bsize, shuffle=False, verbose=verbose)
assert len(set(obs)) == len(obs), (
"Repeated elements in adata.obs_names, to make them unique run adata.obs_names_make_unique()"
)
Expand Down Expand Up @@ -597,7 +597,7 @@ def filter_by_expr(
assert isinstance(large_n, int | float) and large_n >= 0, "large_n must be numeric and > 0"
assert isinstance(min_prop, int | float) and 1 >= min_prop >= 0, "min_prop must be numeric and between 0 and 1"
# Extract inputs
X, _, var_names = extract(adata, empty=False)
X, _, var_names = extract(adata, empty=False, shuffle=False)
isbacked = isinstance(X, tuple)
assert not isbacked, "adata is in backed mode, reload adata without backed='r'"
obs = adata.obs
Expand Down
6 changes: 5 additions & 1 deletion src/decoupler/pp/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def extract(
layer: str | None = None,
raw: bool = False,
empty: bool = True,
shuffle: bool = True,
verbose: bool = False,
bsize: int = 250_000,
) -> tuple[np.ndarray, np.ndarray, np.ndarray] | tuple[tuple[np.ndarray, np.ndarray], np.ndarray, np.ndarray]:
Expand All @@ -144,6 +145,8 @@ def extract(
%(layer)s
%(raw)s
%(empty)s
shuffle
Whether to shuffle features to ensure ties are broken.
%(verbose)s

Returns
Expand All @@ -169,7 +172,8 @@ def extract(
if not isbacked:
mat, row, col = _validate_mat(mat=mat, row=row, col=col, empty=empty, verbose=verbose)
# Randomly sort features
mat, col = _break_ties(mat=mat, features=col)
if shuffle:
mat, col = _break_ties(mat=mat, features=col)
mat_tuple = (mat, row, col)
else:
msk_col = _validate_backed(mat=mat, row=row, col=col, empty=empty, verbose=verbose, bsize=bsize)
Expand Down
7 changes: 4 additions & 3 deletions tests/pp/test_anndata.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_pseudobulk(
empty,
rng,
):
adata, _ = dc.ds.toy(nobs=10000, nvar=500, bval=2, seed=42, verbose=False)
adata, _ = dc.ds.toy(nobs=10000, nvar=900, bval=2, seed=42, verbose=False)
adata.layers["counts"] = adata.X.round()
adata.obs["sample"] = adata.obs["sample"]
adata.obs["dose"] = rng.choice(["low", "medium", "high"], size=adata.n_obs, replace=True)
Expand Down Expand Up @@ -105,14 +105,15 @@ def _run_psbulk():
)
return pdata

l_mem_usage, pdata = memory_usage(_run_psbulk, retval=True, interval=0.01)
l_mem_usage, pdata = memory_usage(_run_psbulk, retval=True, interval=0.001)
l_mem_usage = max(l_mem_usage) - min(l_mem_usage)
assert isinstance(pdata, ad.AnnData)
assert pdata.shape[0] < adata.shape[0]
if empty:
assert pdata.shape[1] < adata.shape[1]
else:
assert pdata.shape[1] == adata.shape[1]
assert all(pdata.var_names == pdata.var_names)
assert not pdata.obs["sample"].str.contains("_").any()
obs_cols = {"psbulk_cells", "psbulk_counts"}
assert obs_cols.issubset(pdata.obs.columns)
Expand All @@ -139,7 +140,7 @@ def _run_psbulk_backed_data():
)
return pbdata

b_mem_usage, pbdata = memory_usage(_run_psbulk_backed_data, retval=True, interval=0.01)
b_mem_usage, pbdata = memory_usage(_run_psbulk_backed_data, retval=True, interval=0.001)
b_mem_usage = max(b_mem_usage) - min(b_mem_usage)
assert b_mem_usage < l_mem_usage
msk = pbdata.X.sum(1) != 0
Expand Down
Loading