Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# openpipeline_composed x.x.x

## NEW FUNCTIONALITY

* `workflows/single_cell/parallel_integration`: Add a workflow that runs multiple integration methods (harmony, scvi, scanorama, bbknn) in parallel on a preprocessed h5mu and merges each method's annotations into a single output (PR #15).

* `dataflow/move_anndata_slots`: Add a component that moves selected slots (`.obs`, `.var`, `.obsm`, `.varm`, `.obsp`, `.varp`, `.uns`) from a modality in a source MuData file into a modality in a target MuData file (PR #15).

## MINOR CHANGES

* `workflows/single_cell/process_integrate_annotate`: Set scope to `private` (PR #6).
Expand Down
4 changes: 2 additions & 2 deletions _viash.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ repositories:
- name: openpipeline
repo: openpipeline
type: vsh
tag: v4.0.4
tag: v4.1.0
- name: openpipeline_qc
repo: openpipeline_qc
type: vsh
tag: v0.2.2
tag: v0.3.0
- name: biobox
repo: biobox
type: vsh
Expand Down
9 changes: 9 additions & 0 deletions src/base/h5_compression_argument.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
arguments:
- name: "--output_compression"
description: |
Compression format to use for the output AnnData and/or Mudata H5 files.
By default no compression is applied.
type: string
choices: ["gzip", "lzf"]
required: false
example: "gzip"
4 changes: 4 additions & 0 deletions src/base/requirements/anndata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
packages:
- anndata~=0.12.16
- awkward #Required for reading VDJ data stored in AIRR format
- scipy~=1.17.1 # Exclude scipy 1.17.0 because https://github.com/scverse/anndata/issues/339
5 changes: 5 additions & 0 deletions src/base/requirements/anndata_mudata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
__merge__: [/src/base/requirements/anndata.yaml, .]
packages:
- mudata~=0.3.8
script: |
exec("try:\n import zarr; from importlib.metadata import version\nexcept ModuleNotFoundError:\n exit(0)\nelse: assert int(version(\"zarr\").partition(\".\")[0]) > 2")
2 changes: 2 additions & 0 deletions src/base/requirements/openpipeline_testutils.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github:
- openpipelines-bio/core#subdirectory=packages/python/openpipeline_testutils
8 changes: 8 additions & 0 deletions src/base/requirements/python_test_setup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
test_setup:
- type: apt
packages:
- git
- type: python
__merge__:
- /src/base/requirements/viashpy.yaml
- /src/base/requirements/openpipeline_testutils.yaml
2 changes: 2 additions & 0 deletions src/base/requirements/viashpy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
packages:
- viashpy==0.10.0
142 changes: 142 additions & 0 deletions src/dataflow/move_anndata_slots/config.vsh.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
name: move_anndata_slots
namespace: "dataflow"
scope: "public"
description: |
Move slots (.obs, .var, .obsm, .varm, .obsp, .varp, .uns) from a modality
in a source MuData file into a modality in a target MuData file.
The specified slots are copied from the source modality into the target
modality. By default, copying to a key that already exists in the target
raises an error; use --allow_overwrite to overwrite it instead.

authors:
- __merge__: /src/authors/jakub_majercik.yaml
roles: [ author ]

argument_groups:
- name: "Source"
arguments:
- name: "--input_source"
type: file
description: Source h5mu file to read slots from.
direction: input
required: true
example: source.h5mu
- name: "--source_modality"
type: string
description: Modality in the source h5mu file to read slots from.
default: "rna"
required: false

- name: "Target"
arguments:
- name: "--input_target"
type: file
description: Target h5mu file to write slots into.
direction: input
required: true
example: target.h5mu
- name: "--target_modality"
type: string
description: |
Modality in the target h5mu file to write slots into.
Defaults to the value of --source_modality.
required: false

- name: "Slots to move"
arguments:
- name: "--obs"
type: string
description: |
Column names from .obs to move from the source modality to the
target modality. If not provided, .obs is not moved.
multiple: true
required: false
- name: "--var"
type: string
description: |
Column names from .var to move from the source modality to the
target modality. If not provided, .var is not moved.
multiple: true
required: false
- name: "--obsm"
type: string
description: |
Keys from .obsm to move from the source modality to the target
modality. If not provided, .obsm is not moved.
multiple: true
required: false
- name: "--varm"
type: string
description: |
Keys from .varm to move from the source modality to the target
modality. If not provided, .varm is not moved.
multiple: true
required: false
- name: "--obsp"
type: string
description: |
Keys from .obsp to move from the source modality to the target
modality. If not provided, .obsp is not moved.
multiple: true
required: false
- name: "--varp"
type: string
description: |
Keys from .varp to move from the source modality to the target
modality. If not provided, .varp is not moved.
multiple: true
required: false
- name: "--uns"
type: string
description: |
Keys from .uns to move from the source modality to the target
modality. If not provided, .uns is not moved.
multiple: true
required: false

- name: "Options"
arguments:
- name: "--allow_overwrite"
type: boolean_true
description: |
Allow overwriting keys that already exist in the target modality.
By default, the component raises an error if a key already exists.
When enabled, existing keys are overwritten with a warning.

- name: "Output"
arguments:
- name: "--output"
alternatives: ["-o"]
type: file
description: Output h5mu file (the target with slots added from the source).
direction: output
required: true
example: output.h5mu
__merge__: [., /src/base/h5_compression_argument.yaml]

resources:
- type: python_script
path: script.py
- path: /src/utils/setup_logger.py
- path: /src/utils/compress_h5mu.py

test_resources:
- type: python_script
path: test.py

engines:
- type: docker
image: python:3.13-slim
setup:
- type: apt
packages:
- procps
- type: python
__merge__: /src/base/requirements/anndata_mudata.yaml
__merge__: [/src/base/requirements/python_test_setup.yaml, .]

runners:
- type: executable
- type: nextflow
directives:
label: [ singlecpu, lowmem ]
130 changes: 130 additions & 0 deletions src/dataflow/move_anndata_slots/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import sys
from mudata import read_h5ad

## VIASH START
par = {
"input_source": "source.h5mu",
"source_modality": "rna",
"input_target": "target.h5mu",
"target_modality": None,
"obs": None,
"var": None,
"obsm": None,
"varm": None,
"obsp": None,
"varp": None,
"uns": None,
"allow_overwrite": False,
"output": "output.h5mu",
"output_compression": None,
}
meta = {"resources_dir": "src/utils/"}
## VIASH END

sys.path.append(meta["resources_dir"])
from setup_logger import setup_logger
from compress_h5mu import write_h5ad_to_h5mu_with_compression

logger = setup_logger()

target_modality = par["target_modality"] or par["source_modality"]

logger.info(
"Reading modality '%s' from source file '%s'",
par["source_modality"],
par["input_source"],
)
try:
source_mod = read_h5ad(par["input_source"], mod=par["source_modality"])
except KeyError:
raise ValueError(
f"Modality '{par['source_modality']}' does not exist in source file "
f"'{par['input_source']}'."
)

logger.info(
"Reading modality '%s' from target file '%s'",
target_modality,
par["input_target"],
)
try:
target_mod = read_h5ad(par["input_target"], mod=target_modality)
except KeyError:
raise ValueError(
f"Modality '{target_modality}' does not exist in target file "
f"'{par['input_target']}'."
)

# Validate indices for the axes relevant to the requested slots.
needs_obs = any(par[s] for s in ("obs", "obsm", "obsp"))
needs_var = any(par[s] for s in ("var", "varm", "varp"))

mismatches = []
if needs_obs and set(source_mod.obs_names) != set(target_mod.obs_names):
mismatches.append("obs")
if needs_var and set(source_mod.var_names) != set(target_mod.var_names):
mismatches.append("var")
if mismatches:
raise ValueError(
"Index mismatch between source and target modalities: "
Comment thread
dorien-er marked this conversation as resolved.
+ " and ".join(mismatches)
+ " indices do not match."
)

# Reindex source to match target order if needed.
if needs_obs and not (source_mod.obs_names == target_mod.obs_names).all():
logger.info("Reindexing source observations to match target order.")
source_mod = source_mod[target_mod.obs_names, :]
if needs_var and not (source_mod.var_names == target_mod.var_names).all():
logger.info("Reindexing source variables to match target order.")
source_mod = source_mod[:, target_mod.var_names]

# .obs/.var are DataFrames (column access), .obsm/.varm/.obsp/.varp are array
# containers, and .uns is a dict -- all support key-based get/set via getattr.
_slots = [
("obs", par["obs"]),
("var", par["var"]),
("obsm", par["obsm"]),
("varm", par["varm"]),
("obsp", par["obsp"]),
("varp", par["varp"]),
("uns", par["uns"]),
]

for slot_name, keys in _slots:
if not keys:
continue
source_slot = getattr(source_mod, slot_name)
target_slot = getattr(target_mod, slot_name)
missing = [k for k in keys if k not in source_slot]
if missing:
raise ValueError(
f"The following .{slot_name} keys were not found in source "
f"modality '{par['source_modality']}': {missing}"
)
existing = [k for k in keys if k in target_slot]
if existing and not par["allow_overwrite"]:
raise ValueError(
f"The following .{slot_name} keys already exist in the target "
f"modality '{target_modality}': {existing}. "
f"Use --allow_overwrite to overwrite them."
)
if existing:
logger.warning("Overwriting existing .%s keys: %s", slot_name, existing)

logger.info("Moving .%s keys: %s", slot_name, keys)
for key in keys:
target_slot[key] = source_slot[key]

logger.info(
"Writing output to '%s' with compression '%s'",
par["output"],
par["output_compression"],
)
write_h5ad_to_h5mu_with_compression(
output_file=par["output"],
h5mu=par["input_target"],
modality_name=target_modality,
modality_data=target_mod,
output_compression=par["output_compression"],
)
Loading
Loading