-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
130 lines (117 loc) · 3.97 KB
/
Copy pathscript.py
File metadata and controls
130 lines (117 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import sys
from mudata import read_h5ad
## VIASH START
# Development-time defaults for the component arguments.
# NOTE(review): presumably Viash replaces everything between the VIASH START
# and VIASH END markers with the real arguments at build time -- confirm.
#
# How the rest of this script uses the `par` keys:
#   input_source       file the source modality is read from
#   source_modality    name of the modality read from input_source
#   input_target       file the target modality is read from; also handed to
#                      the writer as the base .h5mu
#   target_modality    name of the modality read from input_target; a falsy
#                      value falls back to source_modality
#   obs/var/obsm/varm/obsp/varp/uns
#                      keys to copy from the matching source slot into the
#                      target slot; a falsy value skips that slot
#   allow_overwrite    when falsy, a key already present in the target slot
#                      raises a ValueError instead of being replaced
#   output             path passed to the writer as the output file
#   output_compression compression setting passed through to the writer
par = {
"input_source": "source.h5mu",
"source_modality": "rna",
"input_target": "target.h5mu",
"target_modality": None,
"obs": None,
"var": None,
"obsm": None,
"varm": None,
"obsp": None,
"varp": None,
"uns": None,
"allow_overwrite": False,
"output": "output.h5mu",
"output_compression": None,
}
# resources_dir is appended to sys.path before setup_logger and compress_h5mu
# are imported.
meta = {"resources_dir": "src/utils/"}
## VIASH END
sys.path.append(meta["resources_dir"])
from setup_logger import setup_logger
from compress_h5mu import write_h5ad_to_h5mu_with_compression
logger = setup_logger()

# When no target modality is given, write into the modality with the same
# name as the source modality.
target_modality = par["target_modality"] or par["source_modality"]


def _read_modality(path, modality, role):
    """Read a single modality from an .h5mu file.

    Args:
        path: Path of the .h5mu file to read from.
        modality: Name of the modality to load.
        role: Either "source" or "target"; only used to word the log and
            error messages.

    Returns:
        Whatever ``read_h5ad`` returns for the requested modality.

    Raises:
        ValueError: If ``read_h5ad`` raises ``KeyError``, which is reported
            as the modality not existing in the file. The original
            ``KeyError`` is attached as ``__cause__`` so that a KeyError with
            a different origin is still visible in the traceback.
    """
    logger.info("Reading modality '%s' from %s file '%s'", modality, role, path)
    try:
        return read_h5ad(path, mod=modality)
    except KeyError as err:
        raise ValueError(
            f"Modality '{modality}' does not exist in {role} file '{path}'."
        ) from err


source_mod = _read_modality(par["input_source"], par["source_modality"], "source")
target_mod = _read_modality(par["input_target"], target_modality, "target")
# Validate indices for the axes relevant to the requested slots.
def _index_relation(source_names, target_names):
    """Classify how the source index relates to the target index.

    Args:
        source_names: Index of names on one axis of the source modality.
        target_names: Index of names on the same axis of the target modality.

    Returns:
        One of:
        - "identical": same names in the same order, nothing to do.
        - "permuted": same unique names in a different order; the source can
          be reordered to match the target.
        - "ambiguous": same set of names and same length, but the names are
          duplicated, so a label-based reorder is not well defined.
        - "mismatch": the two indices do not hold the same names.
    """
    # Check the length first: element-wise comparison of indices with
    # different lengths raises instead of returning a result, and set
    # equality alone would hide a length difference caused by duplicates.
    if len(source_names) != len(target_names):
        return "mismatch"
    # Cheap positional comparison before building any sets.
    if (source_names == target_names).all():
        return "identical"
    if set(source_names) != set(target_names):
        return "mismatch"
    # Equal length and equal sets: the source is unique exactly when the
    # target is, so checking one side is enough.
    if not source_names.is_unique:
        return "ambiguous"
    return "permuted"


# Only axes that a requested slot depends on are checked; .uns is tied to
# neither axis.
needs_obs = any(par[s] for s in ("obs", "obsm", "obsp"))
needs_var = any(par[s] for s in ("var", "varm", "varp"))

relations = {}
if needs_obs:
    relations["obs"] = _index_relation(source_mod.obs_names, target_mod.obs_names)
if needs_var:
    relations["var"] = _index_relation(source_mod.var_names, target_mod.var_names)

# Report every mismatching axis in a single error.
mismatches = [axis for axis, rel in relations.items() if rel == "mismatch"]
if mismatches:
    raise ValueError(
        "Index mismatch between source and target modalities: "
        + " and ".join(mismatches)
        + " indices do not match."
    )

ambiguous = [axis for axis, rel in relations.items() if rel == "ambiguous"]
if ambiguous:
    raise ValueError(
        "Cannot reorder source to match target: "
        + " and ".join(ambiguous)
        + " indices contain duplicated names in a different order."
    )

# Reindex source to match target order if needed.
if relations.get("obs") == "permuted":
    logger.info("Reindexing source observations to match target order.")
    source_mod = source_mod[target_mod.obs_names, :]
if relations.get("var") == "permuted":
    logger.info("Reindexing source variables to match target order.")
    source_mod = source_mod[:, target_mod.var_names]
# Every slot handled here is fetched by attribute name and then used through
# the same three operations: membership test, item read and item assignment.
_slots = [
    (name, par[name])
    for name in ("obs", "var", "obsm", "varm", "obsp", "varp", "uns")
]
for attribute, requested in _slots:
    if requested:
        origin = getattr(source_mod, attribute)
        destination = getattr(target_mod, attribute)

        # Every requested key has to be present in the source slot.
        absent = [name for name in requested if name not in origin]
        if absent:
            raise ValueError(
                f"The following .{attribute} keys were not found in source "
                f"modality '{par['source_modality']}': {absent}"
            )

        # Keys already present in the target are replaced only on request.
        clashing = [name for name in requested if name in destination]
        if clashing:
            if not par["allow_overwrite"]:
                raise ValueError(
                    f"The following .{attribute} keys already exist in the target "
                    f"modality '{target_modality}': {clashing}. "
                    f"Use --allow_overwrite to overwrite them."
                )
            logger.warning(
                "Overwriting existing .%s keys: %s", attribute, clashing
            )

        logger.info("Moving .%s keys: %s", attribute, requested)
        for key in requested:
            destination[key] = origin[key]
# Hand the updated target modality to the writer, using the original target
# file as the base .h5mu.
output_path = par["output"]
compression = par["output_compression"]
logger.info(
    "Writing output to '%s' with compression '%s'", output_path, compression
)
write_h5ad_to_h5mu_with_compression(
    output_file=output_path,
    h5mu=par["input_target"],
    modality_name=target_modality,
    modality_data=target_mod,
    output_compression=compression,
)