Skip to content

Commit 2f329b4

Browse files
authored
Reorganize workflow steps (#943)
* Reorganize workflow steps. - Move batch control script out of toast submodule - Fix det indexing in toast readout filter application - Extract noise model selection into helper functions - Extract common preprocess workflow setup / application into helper functions - If simulating signal from scanning from a map, cleanup any custom pointing model that is used. * Fix entry for so_batch_control in pyproject.toml * Fix typo
1 parent be5656a commit 2f329b4

14 files changed

+406
-221
lines changed

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ so_hardware_sim = "sotodlib.scripts.hardware_sim:main"
6262
so_hardware_plot = "sotodlib.scripts.hardware_plot:main"
6363
so_hardware_trim = "sotodlib.scripts.hardware_trim:main"
6464
so_hardware_info = "sotodlib.scripts.hardware_info:main"
65+
so_batch_control = "sotodlib.scripts.so_batch_control:main"
6566
so-metadata = "sotodlib.core.metadata.cli:main"
6667
so-site-pipeline = "sotodlib.site_pipeline.cli:main"
6768
so-data-package = "sotodlib.io.imprinter_cli:main"
6869
toast_so_sim = "sotodlib.toast.scripts.so_sim:cli"
6970
toast_so_map = "sotodlib.toast.scripts.so_map:cli"
7071
toast_so_transfer = "sotodlib.toast.scripts.so_transfer:cli"
7172
toast_so_convert = "sotodlib.toast.scripts.so_convert:cli"
72-
toast_so_batch_control = "sotodlib.toast.scripts.so_batch_control:cli"
7373
get_wafer_offset = "sotodlib.toast.scripts.get_wafer_offset:main"
7474

7575
[tool.versioneer]

sotodlib/toast/scripts/so_batch_control.py renamed to sotodlib/scripts/so_batch_control.py

+26-57
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,10 @@
1212
"""
1313

1414
import argparse
15-
import datetime
1615
import os
1716
import re
18-
import sys
1917
import time
2018

21-
import numpy as np
22-
23-
from astropy import units as u
24-
25-
# Import sotodlib.toast first, since that sets default object names
26-
# to use in toast.
27-
import sotodlib.toast as sotoast
28-
29-
import toast
30-
import toast.ops
31-
from toast.mpi import MPI, Comm
32-
from toast.observation import default_values as defaults
33-
34-
from .. import ops as so_ops
35-
from .. import workflows as wrk
36-
3719

3820
def load_obs_file(path):
3921
"""Load the observation file into a list.
@@ -152,9 +134,6 @@ def find_obs(all_obs, n_job_obs, out_root, ignore_running=False, timeout_hours=2
152134

153135

154136
def main():
155-
# Get optional MPI parameters
156-
comm, procs, rank = toast.get_world()
157-
158137
parser = argparse.ArgumentParser(description="Get observation IDs for a job")
159138
parser.add_argument(
160139
"--out_root",
@@ -220,42 +199,32 @@ def main():
220199

221200
args = parser.parse_args()
222201

223-
if rank == 0:
224-
if args.get_batch is not None:
225-
# We are getting the next batch of observations
226-
all_obs = load_obs_file(args.observations)
227-
batch_obs = find_obs(all_obs, args.get_batch, args.out_root)
228-
if args.batch_list:
229-
batch_str = ",".join(batch_obs)
230-
print(f"{batch_str}", flush=True)
231-
else:
232-
for obs in batch_obs:
233-
print(f"{obs}", flush=True)
234-
elif args.get_state is not None:
235-
state = get_obs_state(args.out_root, args.get_state)
236-
print(f"{state}", flush=True)
237-
elif args.clear_state is not None:
238-
clear_obs_state(args.out_root, args.clear_state)
239-
elif args.set_state_done is not None:
240-
set_obs_state(args.out_root, args.set_state_done, "done")
241-
elif args.set_state_running is not None:
242-
set_obs_state(args.out_root, args.set_state_running, "running")
243-
elif args.cleanup:
244-
all_obs = load_obs_file(args.observations)
245-
for obs in all_obs:
246-
state = get_obs_state(args.out_root, obs)
247-
if state is not None and state == "running":
248-
clear_obs_state(args.out_root, obs)
249-
250-
if comm is not None:
251-
comm.barrier()
252-
253-
254-
def cli():
255-
world, procs, rank = toast.mpi.get_world()
256-
with toast.mpi.exception_guard(comm=world):
257-
main()
202+
if args.get_batch is not None:
203+
# We are getting the next batch of observations
204+
all_obs = load_obs_file(args.observations)
205+
batch_obs = find_obs(all_obs, args.get_batch, args.out_root)
206+
if args.batch_list:
207+
batch_str = ",".join(batch_obs)
208+
print(f"{batch_str}", flush=True)
209+
else:
210+
for obs in batch_obs:
211+
print(f"{obs}", flush=True)
212+
elif args.get_state is not None:
213+
state = get_obs_state(args.out_root, args.get_state)
214+
print(f"{state}", flush=True)
215+
elif args.clear_state is not None:
216+
clear_obs_state(args.out_root, args.clear_state)
217+
elif args.set_state_done is not None:
218+
set_obs_state(args.out_root, args.set_state_done, "done")
219+
elif args.set_state_running is not None:
220+
set_obs_state(args.out_root, args.set_state_running, "running")
221+
elif args.cleanup:
222+
all_obs = load_obs_file(args.observations)
223+
for obs in all_obs:
224+
state = get_obs_state(args.out_root, obs)
225+
if state is not None and state == "running":
226+
clear_obs_state(args.out_root, obs)
258227

259228

260229
if __name__ == "__main__":
261-
cli()
230+
main()

sotodlib/toast/ops/readout_filter.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,16 @@ def _exec(self, data, detectors=None, **kwargs):
6565

6666
# Get the rows of the focalplane table containing these dets
6767
det_table = ob.telescope.focalplane.detector_data
68-
fp_rows = np.array(
69-
[x for x, y in enumerate(det_table["name"]) if y in local_set_dets]
70-
)
68+
det_to_row = {
69+
y: x for x, y in enumerate(det_table["name"]) if y in local_set_dets
70+
}
71+
fp_rows = np.array([det_to_row[x] for x in local_dets])
7172

7273
# Get the set of all stream IDs
7374
all_wafers = set(det_table[self.wafer_key][fp_rows])
7475

75-
# The IIR filter parameters will either be in a single, top-level dictionary
76-
# or they are organized per-UFM.
76+
# The IIR filter parameters will either be in a single,
77+
# top-level dictionary or they are organized per-UFM.
7778
if (
7879
"per_stream" in ob[self.iir_params] and
7980
ob[self.iir_params]["per_stream"]

sotodlib/toast/ops/splits.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -503,12 +503,12 @@ def _exec(self, data, detectors=None, **kwargs):
503503
"write_rcond",
504504
"write_solver_products",
505505
"save_cleaned",
506-
"reset_pix_dist",
507506
]
508507

509508
# Possible traits we want to enable
510509
mapmaker_enable_traits = [
511510
"keep_final_products",
511+
"reset_pix_dist",
512512
]
513513

514514
if hasattr(self.mapmaker, "map_binning"):

sotodlib/toast/scripts/so_map.py

+32-21
Original file line numberDiff line numberDiff line change
@@ -58,25 +58,28 @@
5858

5959
def reduce_data(job, otherargs, runargs, data):
6060
log = toast.utils.Logger.get()
61+
job_ops = job.operators
62+
job_tmpl = job.templates
6163

62-
wrk.simple_jumpcorrect(job, otherargs, runargs, data)
63-
wrk.simple_deglitch(job, otherargs, runargs, data)
64+
# Preprocess data to get flags / cuts
65+
wrk.preprocess(job, otherargs, runargs, data)
6466

65-
wrk.flag_diff_noise_outliers(job, otherargs, runargs, data)
66-
wrk.flag_noise_outliers(job, otherargs, runargs, data)
67-
wrk.deconvolve_detector_timeconstant(job, otherargs, runargs, data)
67+
# Now apply preprocessing to good detectors
6868
wrk.raw_statistics(job, otherargs, runargs, data)
6969

70-
wrk.filter_hwpss(job, otherargs, runargs, data)
71-
wrk.filter_common_mode(job, otherargs, runargs, data)
70+
wrk.diff_noise_estimation(job, otherargs, runargs, data)
71+
wrk.noise_estimation(job, otherargs, runargs, data)
72+
73+
data = wrk.demodulate(job, otherargs, runargs, data)
74+
7275
wrk.filter_ground(job, otherargs, runargs, data)
76+
wrk.filter_common_mode(job, otherargs, runargs, data)
7377
wrk.filter_poly1d(job, otherargs, runargs, data)
7478
wrk.filter_poly2d(job, otherargs, runargs, data)
79+
7580
wrk.diff_noise_estimation(job, otherargs, runargs, data)
7681
wrk.noise_estimation(job, otherargs, runargs, data)
7782

78-
data = wrk.demodulate(job, otherargs, runargs, data)
79-
8083
wrk.processing_mask(job, otherargs, runargs, data)
8184
wrk.flag_sso(job, otherargs, runargs, data)
8285
wrk.hn_map(job, otherargs, runargs, data)
@@ -152,6 +155,19 @@ def main():
152155
action="store_true",
153156
help="Map each interval separately.",
154157
)
158+
parser.add_argument(
159+
"--preprocess_copy",
160+
required=False,
161+
default=False,
162+
action="store_true",
163+
help="Perform preprocessing on a copy of the data.",
164+
)
165+
parser.add_argument(
166+
"--log_config",
167+
required=False,
168+
default=None,
169+
help="Dump out config log to this yaml file",
170+
)
155171

156172
# The operators and templates we want to configure from the command line
157173
# or a parameter file.
@@ -160,23 +176,18 @@ def main():
160176
templates = list()
161177

162178
wrk.setup_load_or_simulate_observing(parser, operators)
179+
wrk.setup_preprocess(parser, operators)
163180

164-
wrk.setup_simple_jumpcorrect(operators)
165-
wrk.setup_simple_deglitch(operators)
166-
167-
wrk.setup_flag_diff_noise_outliers(operators)
168-
wrk.setup_flag_noise_outliers(operators)
169-
wrk.setup_deconvolve_detector_timeconstant(operators)
170181
wrk.setup_raw_statistics(operators)
171182

172-
wrk.setup_filter_hwpss(operators)
173-
wrk.setup_filter_common_mode(operators)
183+
wrk.setup_demodulate(operators)
184+
174185
wrk.setup_filter_ground(operators)
186+
wrk.setup_filter_common_mode(operators)
175187
wrk.setup_filter_poly1d(operators)
176188
wrk.setup_filter_poly2d(operators)
177-
wrk.setup_noise_estimation(operators)
178189

179-
wrk.setup_demodulate(operators)
190+
wrk.setup_noise_estimation(operators)
180191

181192
wrk.setup_processing_mask(operators)
182193
wrk.setup_flag_sso(operators)
@@ -201,8 +212,8 @@ def main():
201212
os.makedirs(otherargs.out_dir, exist_ok=True)
202213

203214
# Log the config that was actually used at runtime.
204-
outlog = os.path.join(otherargs.out_dir, "config_log.toml")
205-
toast.config.dump_toml(outlog, config, comm=comm)
215+
if otherargs.log_config is not None:
216+
toast.config.dump_yaml(otherargs.log_config, config, comm=comm)
206217

207218
# If this is a dry run, exit
208219
if otherargs.dry_run:

sotodlib/toast/scripts/so_sim.py

+29-18
Original file line numberDiff line numberDiff line change
@@ -89,24 +89,23 @@ def simulate_data(job, otherargs, runargs, data):
8989
def reduce_data(job, otherargs, runargs, data):
9090
log = toast.utils.Logger.get()
9191

92-
wrk.simple_jumpcorrect(job, otherargs, runargs, data)
93-
wrk.simple_deglitch(job, otherargs, runargs, data)
92+
wrk.preprocess(job, otherargs, runargs, data)
9493

95-
wrk.flag_diff_noise_outliers(job, otherargs, runargs, data)
96-
wrk.flag_noise_outliers(job, otherargs, runargs, data)
97-
wrk.deconvolve_detector_timeconstant(job, otherargs, runargs, data)
9894
wrk.raw_statistics(job, otherargs, runargs, data)
9995

100-
wrk.filter_hwpss(job, otherargs, runargs, data)
101-
wrk.filter_common_mode(job, otherargs, runargs, data)
96+
wrk.diff_noise_estimation(job, otherargs, runargs, data)
97+
wrk.noise_estimation(job, otherargs, runargs, data)
98+
99+
data = wrk.demodulate(job, otherargs, runargs, data)
100+
102101
wrk.filter_ground(job, otherargs, runargs, data)
102+
wrk.filter_common_mode(job, otherargs, runargs, data)
103103
wrk.filter_poly1d(job, otherargs, runargs, data)
104104
wrk.filter_poly2d(job, otherargs, runargs, data)
105+
105106
wrk.diff_noise_estimation(job, otherargs, runargs, data)
106107
wrk.noise_estimation(job, otherargs, runargs, data)
107108

108-
data = wrk.demodulate(job, otherargs, runargs, data)
109-
110109
wrk.processing_mask(job, otherargs, runargs, data)
111110
wrk.flag_sso(job, otherargs, runargs, data)
112111
wrk.hn_map(job, otherargs, runargs, data)
@@ -193,6 +192,19 @@ def main():
193192
action="store_true",
194193
help="Zero out detector data loaded from disk",
195194
)
195+
parser.add_argument(
196+
"--preprocess_copy",
197+
required=False,
198+
default=False,
199+
action="store_true",
200+
help="Perform preprocessing on a copy of the data.",
201+
)
202+
parser.add_argument(
203+
"--log_config",
204+
required=False,
205+
default=None,
206+
help="Dump out config log to this yaml file",
207+
)
196208

197209
# The operators and templates we want to configure from the command line
198210
# or a parameter file.
@@ -201,6 +213,7 @@ def main():
201213
templates = list()
202214

203215
wrk.setup_load_or_simulate_observing(parser, operators)
216+
wrk.setup_preprocess(parser, operators)
204217

205218
wrk.setup_simulate_atmosphere_signal(operators)
206219
wrk.setup_simulate_sky_map_signal(operators)
@@ -220,15 +233,8 @@ def main():
220233
wrk.setup_simulate_readout_effects(operators)
221234
wrk.setup_save_data_hdf5(operators)
222235

223-
wrk.setup_simple_jumpcorrect(operators)
224-
wrk.setup_simple_deglitch(operators)
225-
226-
wrk.setup_flag_diff_noise_outliers(operators)
227-
wrk.setup_flag_noise_outliers(operators)
228-
wrk.setup_deconvolve_detector_timeconstant(operators)
229236
wrk.setup_raw_statistics(operators)
230237

231-
wrk.setup_filter_hwpss(operators)
232238
wrk.setup_filter_common_mode(operators)
233239
wrk.setup_filter_ground(operators)
234240
wrk.setup_filter_poly1d(operators)
@@ -260,8 +266,13 @@ def main():
260266
os.makedirs(otherargs.out_dir, exist_ok=True)
261267

262268
# Log the config that was actually used at runtime.
263-
outlog = os.path.join(otherargs.out_dir, "config_log.toml")
264-
toast.config.dump_toml(outlog, config, comm=comm)
269+
if otherargs.log_config is not None:
270+
# User wants to specify the location of the yaml config log
271+
toast.config.dump_yaml(otherargs.log_config, config, comm=comm)
272+
else:
273+
# Log a toml file in the output directory
274+
outlog = os.path.join(otherargs.out_dir, "config_log.toml")
275+
toast.config.dump_toml(outlog, config, comm=comm)
265276

266277
# If this is a dry run, exit
267278
if otherargs.dry_run:

0 commit comments

Comments
 (0)