load_book: support for fixed tones, smurfgaps flags, no_headers #1219

Merged · 6 commits · Jun 4, 2025
15 changes: 14 additions & 1 deletion sotodlib/core/context.py
@@ -165,6 +165,8 @@ def get_obs(self,
on_missing=None,
free_tags=None,
no_signal=None,
no_headers=None,
special_channels=None,
loader_type=None,
):
"""Load TOD and supporting metadata for some observation.
@@ -204,6 +206,10 @@ def get_obs(self,
no_signal (bool): If True, the .signal will be set to None.
This is a way to get the axes and pointing info without
the (large) TOD blob. Not all loaders may support this.
no_headers (bool): If True, avoid loading "header"
information that tags along with the signal.
special_channels (bool): If True, load "special" readout
channels that are normally skipped (e.g. fixed tones).
loader_type (str): Name of the registered TOD loader
function to use (this will override whatever is specified
in context.yaml).
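
For illustration, a sketch of how the new flags combine at the get_obs call site (the context file name and obs_id below are hypothetical):

from sotodlib.core import Context

ctx = Context('context.yaml')
# Axes, pointing and metadata only; skip the (large) signal blob:
meta_tod = ctx.get_obs('obs_1700000000_sat1_1111111', no_signal=True)
# Signal plus fixed-tone channels, with the header blocks suppressed:
tod = ctx.get_obs('obs_1700000000_sat1_1111111',
                  no_headers=True, special_channels=True)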
@@ -286,7 +292,9 @@ def get_obs(self,
loader_type = self.get('obs_loader_type', 'default')
loader_func = OBSLOADER_REGISTRY[loader_type] # Register your loader?
aman = loader_func(self.obsfiledb, obs_id, dets=dets,
samples=samples, no_signal=no_signal)
samples=samples, no_signal=no_signal,
no_headers=no_headers,
special_channels=special_channels)

if aman is None:
return meta
@@ -567,6 +575,11 @@ def obsloader_template(db, obs_id, dets=None, prefix=None, samples=None,
no_signal (bool): If True, loader should avoid reading signal
data (if possible) and should set .signal=None in the output.
Passing None is equivalent to passing False.
no_headers (bool): If True, loader should avoid loading "header"
information that tags along with the signal (such as frame
counters, biases, etc.)
special_channels (bool): If True, load additional special /
diagnostic readout channels that are normally skipped.

Notes:
This interface is subject to further extension. When possible
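For reference, a minimal sketch of a loader conforming to this extended interface; the registry key 'my_loader' and the stub body are illustrative assumptions, not part of this PR:

from sotodlib import core
from sotodlib.core.context import OBSLOADER_REGISTRY

def my_obsloader(db, obs_id, dets=None, prefix=None, samples=None,
                 no_signal=None, no_headers=None, special_channels=None,
                 **kwargs):
    # Per the template docstring, passing None is equivalent to
    # passing False for each of the three boolean flags.
    no_signal = bool(no_signal)
    no_headers = bool(no_headers)
    special_channels = bool(special_channels)
    aman = core.AxisManager()
    # ... populate aman from db and the obs files, honoring dets/samples ...
    return aman

OBSLOADER_REGISTRY['my_loader'] = my_obsloader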
130 changes: 104 additions & 26 deletions sotodlib/io/load_book.py
@@ -56,7 +56,7 @@


def load_obs_book(db, obs_id, dets=None, prefix=None, samples=None,
no_signal=None,
no_signal=None, no_headers=None, special_channels=None,
**kwargs):
"""Obsloader function for SO "Level 3" obs/oper Books.

@@ -82,6 +82,9 @@ def load_obs_book(db, obs_id, dets=None, prefix=None, samples=None,

if no_signal is None:
no_signal = False # from here, assume no_signal in [True, False]
if no_headers is None:
# By default, suppress headers for "obs" book loads.
no_headers = obs_id.startswith('obs_')
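# E.g. headers are kept by default for "oper" books, while for an
# obs_* book the caller must pass no_headers=False to load them.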

# Regardless of what dets have been asked for (maybe none), get
# the list of detsets implicated in this observation. Make sure
@@ -150,6 +153,7 @@ def load_obs_book(db, obs_id, dets=None, prefix=None, samples=None,
results[detset] = _load_book_detset(
files, prefix=prefix, load_ancil=(ancil is None),
samples=samples, dets=dets_req, no_signal=no_signal,
no_headers=no_headers, special_channels=special_channels,
signal_buffer=signal_buffer)
if ancil is None:
ancil = results[detset]['ancil']
@@ -181,7 +185,8 @@ def load_obs_book(db, obs_id, dets=None, prefix=None, samples=None,
get_frame_det_info=False)
return obs

def load_book_file(filename, dets=None, samples=None, no_signal=False):
def load_book_file(filename, dets=None, samples=None, no_signal=False,
no_headers=False, special_channels=False):
"""Load one or more g3 files (from an obs/oper book) and return the
contents as an AxisManager.

@@ -193,6 +198,10 @@ def load_book_file(filename, dets=None, samples=None, no_signal=False):
samples (tuple or None): Sample range to load.
no_signal (bool): If True, the signal data are not read and
.signal is set to None.
no_headers (bool): If True, the smurf header fields are not read,
and .primary will not be present in the output.
special_channels (bool): If True, fixed-tone (untracked)
channels will be loaded and exposed at .tones.

Notes:
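
As a hedged usage illustration (the book file name is hypothetical):

from sotodlib.io.load_book import load_book_file

tod = load_book_file('D_obs_1700000000_000.g3',
                     no_headers=True, special_channels=True)
print(tod.tones.signal.shape)  # (n_tones, n_samps); .primary is absent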

@@ -228,13 +237,13 @@
files = [(f, None, None) for f in filename]
this_detset = _load_book_detset(
files, load_ancil=True,
samples=samples, dets=dets, no_signal=no_signal)
samples=samples, dets=dets, no_signal=no_signal,
no_headers=no_headers, special_channels=special_channels)

return _concat_filesets({'?': this_detset},
this_detset['ancil'],
this_detset['timestamps'],
sample0=samples[0],
no_signal=no_signal)
sample0=samples[0])


def load_smurf_npy_data(ctx, obs_id, substr):
@@ -263,6 +272,7 @@ def load_smurf_npy_data(ctx, obs_id, substr):

def _load_book_detset(files, prefix='', load_ancil=True,
dets=None, samples=None, no_signal=False,
no_headers=False, special_channels=False,
signal_buffer=None):
"""Read data from a single detset.

@@ -276,14 +286,21 @@ def _load_book_detset(files, prefix='', load_ancil=True,
stream_id = None
ancil_acc = None
times_acc = None

flag_accs = {}
this_stream_dets = None

if load_ancil:
times_acc = Accumulator1d(samples=samples)
ancil_acc = AccumulatorTimesampleMap(samples=samples)
primary_acc = AccumulatorNamed(samples=samples)
bias_names = []
bias_acc = Accumulator2d(samples=samples)
signal_acc = None
this_stream_dets = None
if no_headers:
primary_acc = None
bias_names = None
bias_acc = None
else:
primary_acc = AccumulatorNamed(samples=samples)
bias_names = []
bias_acc = Accumulator2d(samples=samples)

if no_signal:
signal_acc = None
@@ -299,28 +316,43 @@
keys_to_keep=dets,
calibrate=SIGNAL_RESCALE)

if special_channels:
tones_acc = Accumulator2d(
samples=samples,
calibrate=SIGNAL_RESCALE)
else:
tones_acc = None

if (not no_signal) or (not no_headers) or (special_channels):
# Only collect the smurf flags if we're already scanning every
# frame for some other reason.
flag_accs['smurfgaps'] = Accumulator1d(samples=samples)

# Sniff out a smurf status frame.
smurf_proc = load_smurf.SmurfStatus._get_frame_processor()

for frame, frame_offset in _frames_iterator(files, prefix, samples,
smurf_proc=smurf_proc):
more_data = True
# This is to escape once requested number of samples (and a
# smurf dump frame) are read. Loop can exit early if no data
# are actually required from frame.
more_data = (not smurf_proc.get('dump_frame', False))

# Anything in ancil should be identical across
# filesets, so only process it once.
if load_ancil:
more_data &= times_acc.append(frame['ancil'].times, frame_offset)
more_data &= ancil_acc.append(frame['ancil'], frame_offset)
more_data |= times_acc.append(frame['ancil'].times, frame_offset)
more_data |= ancil_acc.append(frame['ancil'], frame_offset)

if 'stream_id' in frame:
if stream_id is None:
stream_id = frame['stream_id']
assert (stream_id == frame['stream_id']) # check your data files

if 'primary' in frame:
more_data &= primary_acc.append(frame['primary'], frame_offset)
if 'primary' in frame and primary_acc is not None:
more_data |= primary_acc.append(frame['primary'], frame_offset)
bias_names = _check_bias_names(frame)[:_TES_BIAS_COUNT]
more_data &= bias_acc.append(frame['tes_biases'], frame_offset)
more_data |= bias_acc.append(frame['tes_biases'], frame_offset)

if 'signal' in frame:
# Even if no_signal, we need the det list.
@@ -329,7 +361,13 @@ def _load_book_detset(files, prefix='', load_ancil=True,

# Extract the main signal
if not no_signal:
more_data &= signal_acc.append(frame['signal'], frame_offset)
more_data |= signal_acc.append(frame['signal'], frame_offset)

if 'untracked' in frame and special_channels:
more_data |= tones_acc.append(frame['untracked'], frame_offset)

if 'flag_smurfgaps' in frame and 'smurfgaps' in flag_accs:
more_data |= flag_accs['smurfgaps'].append(frame['flag_smurfgaps'])

if not more_data:
break
@@ -386,14 +424,16 @@ def _load_book_detset(files, prefix='', load_ancil=True,
'bias_names': bias_names,
'smurf_ch_info': ch_info,
'iir_params': iir_params,
'tones': tones_acc,
'flags': flag_accs,
'ancil': ancil_acc,
'timestamps': times_acc,
}


def _concat_filesets(results, ancil=None, timestamps=None,
sample0=0, obs_id=None, dets=None,
no_signal=False, signal_buffer=None,
signal_buffer=None,
get_frame_det_info=True):
"""Assemble multiple detset results (as returned by _load_book_detset)
into a full AxisManager.
@@ -462,8 +502,34 @@ def _concat_filesets(results, ancil=None, timestamps=None,
aman['signal'][dets_ofs:dets_ofs + len(d)] = d
dets_ofs += len(d)

# In sims, the whole primary block may be unpopulated.
if any([v['primary'].data is not None for v in results.values()]):
if one_result['tones'] is not None:
# Expose the "untracked" channels, i.e. fixed tones.
n_tones = 0
tone_info = []
for v in results.values():
d = v['tones'].finalize()
n_tones += d.shape[0]
for k in v['tones'].keys:
# Should look like this: sch_NONE_2_326
b, c = map(int, k.split('_')[2:])
tone_info.append((v['stream_id'], v['stream_id'] + f'_{b}_{c}', b, c))
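# E.g. k = 'sch_NONE_2_326' gives band b=2, channel c=326, and the
# tone is then labeled '<stream_id>_2_326'.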
ts, tk, tb, tc = map(np.array, zip(*tone_info))
tman = core.AxisManager(core.LabelAxis('tdets', tk),
aman.samps)
aman.wrap('tones', tman)
aman.tones.wrap('stream_id', ts, axis_map=[(0, 'tdets')])
aman.tones.wrap('band', tb, axis_map=[(0, 'tdets')])
aman.tones.wrap('channel', tc, axis_map=[(0, 'tdets')])
aman.tones.wrap_new('signal', shape=('tdets', 'samps'), dtype='float32')
dets_ofs = 0
for v in results.values():
d = v['tones'].data
aman.tones['signal'][dets_ofs:dets_ofs + len(d)] = d
dets_ofs += len(d)

# In sims, or if no_headers, the primary block may be unpopulated.
if any([(v['primary'] is not None and v['primary'].data is not None)
for v in results.values()]):
# Biases
all_bias_names = []
for v in results.values():
@@ -474,17 +540,22 @@ def _concat_filesets(results, ancil=None, timestamps=None,
aman['biases'][i * _TES_BIAS_COUNT:(i + 1) * _TES_BIAS_COUNT, :] = \
v['biases'].finalize()[:_TES_BIAS_COUNT, :]

# Primary (and other stuff to group per-stream)
# Other header data, per-stream
aman.wrap('primary', core.AxisManager(aman.samps))
aman.wrap('iir_params', core.AxisManager())
aman['iir_params'].wrap('per_stream', True)
for r in results.values():
# Primary.
_prim = core.AxisManager(aman.samps)
for k, v in r['primary'].finalize().items():
_prim.wrap(k, v, [(0, 'samps')])
aman['primary'].wrap(r['stream_id'], _prim)
# Filter parameters


if any([v['iir_params'] is not None
for v in results.values()]):
# Filter parameters, per-stream
aman.wrap('iir_params', core.AxisManager())
aman['iir_params'].wrap('per_stream', True)
for r in results.values():
_iir = None
if r.get('iir_params') is not None:
_iir = core.AxisManager()
@@ -495,6 +566,12 @@ def _concat_filesets(results, ancil=None, timestamps=None,
# flags place
aman.wrap("flags", core.FlagManager.for_tod(aman, "dets", "samps"))

# Assemble per-wafer flags by suffixing the stream_id.
for flag_key in one_result['flags'].keys():
for v in results.values():
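# from_mask compresses the per-sample boolean gap mask into ranges
# of consecutive True samples (e.g. [0, 1, 1, 0] -> interval [1, 3)).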
fasr = so3g.RangesInt32.from_mask(v['flags'][flag_key].finalize())
aman.flags.wrap(f'{flag_key}_{v["stream_id"]}', fasr, axis_map=[(0, 'samps')])

if not get_frame_det_info:
return aman

@@ -790,8 +867,9 @@ def _extract(self, data, src_slice, dest_slice):
self.extract_at_idx,
src_slice.start, src_slice.stop)
elif self.shape is not None:
data.extract(self.data[:, dest_slice], None, self.extract_at_idx,
src_slice.start, src_slice.stop)
if len(data.names) > 0:
data.extract(self.data[:, dest_slice], None, self.extract_at_idx,
src_slice.start, src_slice.stop)
else:
_sh = [len(data.names), len(data.times)]
if self.extract_at_idx is not None: