
Patch 326: validate run summary channels #327

Open: wants to merge 17 commits into base: main
32 changes: 16 additions & 16 deletions .github/workflows/tests.yml
@@ -43,23 +43,23 @@ jobs:
pip install -e .
conda list

- name: Install Jupyter and dependencies
run: |
pip install jupyter
pip install ipykernel
python -m ipykernel install --user --name aurora-test
# Install any other dependencies you need
# - name: Install Jupyter and dependencies
# run: |
# pip install jupyter
# pip install ipykernel
# python -m ipykernel install --user --name aurora-test
# # Install any other dependencies you need

- name: Execute Jupyter Notebooks
run: |
jupyter nbconvert --to notebook --execute docs/examples/dataset_definition.ipynb
jupyter nbconvert --to notebook --execute docs/examples/make_cas04_single_station_h5.ipynb
jupyter nbconvert --to notebook --execute docs/examples/operate_aurora.ipynb
jupyter nbconvert --to notebook --execute tests/test_run_on_commit.ipynb
jupyter nbconvert --to notebook --execute docs/tutorials/pole_zero_fitting/lemi_pole_zero_fitting_example.ipynb
jupyter nbconvert --to notebook --execute docs/tutorials/processing_configuration.ipynb
jupyter nbconvert --to notebook --execute docs/tutorials/synthetic_data_processing.ipynb
# Replace "notebook.ipynb" with your notebook's filename
# - name: Execute Jupyter Notebooks
# run: |
# jupyter nbconvert --to notebook --execute docs/examples/dataset_definition.ipynb
# jupyter nbconvert --to notebook --execute docs/examples/make_cas04_single_station_h5.ipynb
# jupyter nbconvert --to notebook --execute docs/examples/operate_aurora.ipynb
# jupyter nbconvert --to notebook --execute tests/test_run_on_commit.ipynb
# jupyter nbconvert --to notebook --execute docs/tutorials/pole_zero_fitting/lemi_pole_zero_fitting_example.ipynb
# jupyter nbconvert --to notebook --execute docs/tutorials/processing_configuration.ipynb
# jupyter nbconvert --to notebook --execute docs/tutorials/synthetic_data_processing.ipynb
# # Replace "notebook.ipynb" with your notebook's filename

# - name: Commit changes (if any)
# run: |
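Note: with the notebook-execution steps disabled above, the same checks can still be run locally. Below is a minimal sketch; it assumes a working jupyter/nbconvert installation and the repository root as the working directory, with the paths copied from the disabled workflow steps.

```python
# Local sketch of the disabled CI step: execute each notebook with nbconvert.
# Assumes jupyter/nbconvert is installed and the repo root is the working directory.
import subprocess

notebooks = [
    "docs/examples/dataset_definition.ipynb",
    "docs/examples/make_cas04_single_station_h5.ipynb",
    "docs/examples/operate_aurora.ipynb",
    "tests/test_run_on_commit.ipynb",
    "docs/tutorials/pole_zero_fitting/lemi_pole_zero_fitting_example.ipynb",
    "docs/tutorials/processing_configuration.ipynb",
    "docs/tutorials/synthetic_data_processing.ipynb",
]

for nb in notebooks:
    # Writes an executed copy (*.nbconvert.ipynb) next to each notebook and
    # fails fast if any cell raises.
    subprocess.run(
        ["jupyter", "nbconvert", "--to", "notebook", "--execute", nb],
        check=True,
    )
```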
86 changes: 79 additions & 7 deletions aurora/pipelines/run_summary.py
@@ -73,7 +73,13 @@ def __init__(self, **kwargs):
self.column_dtypes = [str, str, pd.Timestamp, pd.Timestamp]
self._input_dict = kwargs.get("input_dict", None)
self.df = kwargs.get("df", None)
self._mini_summary_columns = ["survey", "station_id", "run_id", "start", "end"]
self._mini_summary_columns = [
"survey",
"station_id",
"run_id",
"start",
"end",
]

def clone(self):
"""
@@ -120,7 +126,9 @@ def check_runs_are_valid(self, drop=False, **kwargs):
run_obj = m.get_run(row.station_id, row.run_id, row.survey)
runts = run_obj.to_runts()
if runts.dataset.to_array().data.__abs__().sum() == 0:
logger.critical("CRITICAL: Detected a run with all zero values")
logger.critical(
"CRITICAL: Detected a run with all zero values"
)
self.df["valid"].at[i_row] = False
# load each run, and take the median of the sum of the absolute values
if drop:
@@ -131,6 +139,60 @@ def drop_invalid_rows(self):
self.df = self.df[self.df.valid]
self.df.reset_index(drop=True, inplace=True)

def validate_channels(self, drop=False):
"""
Check to make sure each run has the same input and output channels

optional: drop runs that do not have the same number of channels.


"""

if len(self.df) <= 1:
return
if len(self.df) == 2:
if (
self.df.iloc[0].input_channels
!= self.df.iloc[1].input_channels
):
logger.warning(
"Input channels are not the same: "
f"row[0]: {self.df.iloc[0].input_channels} != "
f"row[1]: {self.df.iloc[1].input_channels}"
)
if (
self.df.iloc[0].output_channels
!= self.df.iloc[1].output_channels
):
logger.warning(
"Output channels are not the same: "
f"row[0]: {self.df.iloc[0].output_channels} != "
f"row[1]: {self.df.iloc[1].output_channels}"
)
return
else:
common_input_channels = self.df.input_channels.mode()[0]
common_output_channels = self.df.output_channels.mode()[0]
for row in self.df.itertuples():
if row.input_channels != common_input_channels:
self.df["valid"].at[row.Index] = False
logger.warning(
"Input channels are not the same: "
f"run {row.run_id} row {row.Index}: {row.input_channels} != "
f"{common_input_channels}"
)
if row.output_channels != common_output_channels:
self.df["valid"].at[row.Index] = False
logger.warning(
"output channels are not the same: "
f"run {row.run_id} row {row.Index}: {row.output_channels} != "
f"{common_output_channels}"
)

if drop:
self.drop_invalid_rows()
return

# BELOW FUNCTION CAN BE COPIED FROM METHOD IN KernelDataset()
# def drop_runs_shorter_than(self, duration, units="s"):
# if units != "s":
@@ -221,7 +283,9 @@ def channel_summary_to_run_summary(
channel_scale_factors = n_station_runs * [None]
i = 0
for group_values, group in grouper:
group_info = dict(zip(group_by_columns, group_values)) # handy for debug
group_info = dict(
zip(group_by_columns, group_values)
) # handy for debug
# for k, v in group_info.items():
# print(f"{k} = {v}")
survey_ids[i] = group_info["survey"]
@@ -232,9 +296,15 @@
sample_rates[i] = group.sample_rate.iloc[0]
channels_list = group.component.to_list()
num_channels = len(channels_list)
input_channels[i] = [x for x in channels_list if x in allowed_input_channels]
output_channels[i] = [x for x in channels_list if x in allowed_output_channels]
channel_scale_factors[i] = dict(zip(channels_list, num_channels * [1.0]))
input_channels[i] = [
x for x in channels_list if x in allowed_input_channels
]
output_channels[i] = [
x for x in channels_list if x in allowed_output_channels
]
channel_scale_factors[i] = dict(
zip(channels_list, num_channels * [1.0])
)
i += 1

data_dict = {}
@@ -286,7 +356,9 @@ def extract_run_summary_from_mth5(mth5_obj, summary_type="run"):
return out_df


def extract_run_summaries_from_mth5s(mth5_list, summary_type="run", deduplicate=True):
def extract_run_summaries_from_mth5s(
mth5_list, summary_type="run", deduplicate=True
):
"""
ToDo: Move this method into mth5? or mth5_helpers?
ToDo: Make this a class so that the __repr__ is a nice visual representation of the
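As a reading aid for the new validate_channels method, a minimal usage sketch follows. It assumes the enclosing class is RunSummary (as the module name suggests), that extract_run_summaries_from_mth5s returns a DataFrame accepted by the df kwarg, and that the file names are placeholders; none of this is part of the patch itself.

```python
# Usage sketch only; file names are placeholders and the df-to-RunSummary
# hand-off is an assumption, not part of this patch.
from aurora.pipelines.run_summary import (
    RunSummary,
    extract_run_summaries_from_mth5s,
)

mth5_list = ["station_01.h5", "station_02.h5"]  # hypothetical MTH5 sources
df = extract_run_summaries_from_mth5s(mth5_list, summary_type="run")

run_summary = RunSummary(df=df)
# Flag runs whose input/output channels differ from the most common channel
# sets across all runs; drop=True also removes the flagged rows from the df.
run_summary.validate_channels(drop=True)
```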
62 changes: 51 additions & 11 deletions aurora/pipelines/time_series_helpers.py
@@ -61,7 +61,9 @@ def apply_prewhitening(decimation_obj, run_xrds_input):
run_xrds = run_xrds_input.differentiate("time")

else:
msg = f"{decimation_obj.prewhitening_type} pre-whitening not implemented"
msg = (
f"{decimation_obj.prewhitening_type} pre-whitening not implemented"
)
logger.exception(msg)
raise NotImplementedError(msg)
return run_xrds
@@ -194,7 +196,9 @@ def truncate_to_clock_zero(decimation_obj, run_xrds):
pass # time series start is already clock zero
else:
windowing_scheme = window_scheme_from_decimation(decimation_obj)
number_of_steps = delta_t_seconds / windowing_scheme.duration_advance
number_of_steps = (
delta_t_seconds / windowing_scheme.duration_advance
)
n_partial_steps = number_of_steps - np.floor(number_of_steps)
n_clip = n_partial_steps * windowing_scheme.num_samples_advance
n_clip = int(np.round(n_clip))
@@ -222,8 +226,10 @@ def nan_to_mean(xrds):
for ch in xrds.keys():
null_values_present = xrds[ch].isnull().any()
if null_values_present:
nan_count = np.count_nonzero(np.isnan(xrds[ch]))
logger.info(
"Null values detected in xrds -- this is not expected and should be examined"
f"{nan_count} Null values detected in xrds channel {ch}. "
"Check if this is unexpected."
)
value = np.nan_to_num(np.nanmean(xrds[ch].data))
xrds[ch] = xrds[ch].fillna(value)
@@ -259,7 +265,9 @@ def run_ts_to_stft(decimation_obj, run_xrds_orig):
if not np.prod(windowed_obj.to_array().data.shape):
raise ValueError

windowed_obj = WindowedTimeSeries.detrend(data=windowed_obj, detrend_type="linear")
windowed_obj = WindowedTimeSeries.detrend(
data=windowed_obj, detrend_type="linear"
)
tapered_obj = windowed_obj * windowing_scheme.taper
stft_obj = windowing_scheme.apply_fft(
tapered_obj, detrend_type=decimation_obj.extra_pre_fft_detrend_type
@@ -269,7 +277,9 @@ def run_ts_to_stft(decimation_obj, run_xrds_orig):
return stft_obj


def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None):
def calibrate_stft_obj(
stft_obj, run_obj, units="MT", channel_scale_factors=None
):
"""

Parameters
@@ -291,15 +301,16 @@ def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None
Time series of calibrated Fourier coefficients
"""
for channel_id in stft_obj.keys():

channel = run_obj.get_channel(channel_id)
channel_response = channel.channel_response
if not channel_response.filters_list:
msg = f"Channel {channel_id} with empty filters list detected"
logger.warning(msg)
if channel_id == "hy":
msg = "Channel hy has no filters, try using filters from hx"
logger.warning("Channel HY has no filters, try using filters from HX")
logger.warning(
"Channel HY has no filters, try using filters from HX"
)
channel_response = run_obj.get_channel("hx").channel_response

indices_to_flip = channel_response.get_indices_of_filters_to_remove(
@@ -308,7 +319,9 @@ def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None
indices_to_flip = [
i for i in indices_to_flip if channel.metadata.filter.applied[i]
]
filters_to_remove = [channel_response.filters_list[i] for i in indices_to_flip]
filters_to_remove = [
channel_response.filters_list[i] for i in indices_to_flip
]
if not filters_to_remove:
logger.warning("No filters to remove")
calibration_response = channel_response.complex_response(
@@ -321,7 +334,9 @@ def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None
channel_scale_factor = 1.0
calibration_response /= channel_scale_factor
if units == "SI":
logger.warning("Warning: SI Units are not robustly supported issue #36")
logger.warning(
"Warning: SI Units are not robustly supported issue #36"
)
stft_obj[channel_id].data /= calibration_response
return stft_obj

@@ -353,7 +368,9 @@ def prototype_decimate(config, run_xrds):
num_channels = len(channel_labels)
new_data = np.full((num_observations, num_channels), np.nan)
for i_ch, ch_label in enumerate(channel_labels):
new_data[:, i_ch] = ssig.decimate(run_xrds[ch_label], int(config.factor))
new_data[:, i_ch] = ssig.decimate(
run_xrds[ch_label], int(config.factor)
)

xr_da = xr.DataArray(
new_data,
@@ -387,7 +404,9 @@ def prototype_decimate_2(config, run_xrds):
xr_ds: xr.Dataset
Decimated version of the input run_xrds
"""
new_xr_ds = run_xrds.coarsen(time=int(config.factor), boundary="trim").mean()
new_xr_ds = run_xrds.coarsen(
time=int(config.factor), boundary="trim"
).mean()
attr_dict = run_xrds.attrs
attr_dict["sample_rate"] = config.sample_rate
new_xr_ds.attrs = attr_dict
@@ -422,3 +441,24 @@ def prototype_decimate_3(config, run_xrds):
attr_dict["sample_rate"] = config.sample_rate
new_xr_ds.attrs = attr_dict
return new_xr_ds


def prototype_decimate_4(config, run_xrds):
"""
Decimate using scipy's polyphase resampler (resample_poly) via the
sps_filters accessor.

:param config: Decimation configuration; provides the target sample_rate.
:type config: decimation level configuration object
:param run_xrds: Run time series to decimate.
:type run_xrds: xr.Dataset
:return: Decimated version of the input run_xrds.
:rtype: xr.Dataset

"""
new_ds = run_xrds.fillna(0)
new_ds = new_ds.sps_filters.resample_poly(
config.sample_rate, pad_type="mean"
)

new_ds.attrs["sample_rate"] = config.sample_rate
return new_ds
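For reviewers unfamiliar with polyphase resampling, a standalone sketch of what prototype_decimate_4 does is shown below. The patch itself goes through the sps_filters xarray accessor; this illustration calls scipy.signal.resample_poly directly and assumes an integer decimation factor, a "time" coordinate, and 1-D channels, as in the other prototype_decimate variants.

```python
# Illustration only (not the accessor used above): equivalent polyphase
# decimation with scipy directly, assuming an integer factor and 1-D channels.
import scipy.signal as ssig
import xarray as xr


def resample_poly_sketch(run_xrds: xr.Dataset, factor: int, new_sample_rate: float) -> xr.Dataset:
    filled = run_xrds.fillna(0)  # resample_poly does not tolerate NaNs
    decimated = {}
    for ch in filled.data_vars:
        # up=1, down=factor applies an anti-alias FIR filter and downsamples in one pass
        decimated[ch] = ssig.resample_poly(
            filled[ch].data, up=1, down=factor, padtype="mean"
        )
    n_out = len(next(iter(decimated.values())))
    new_time = filled.time.data[::factor][:n_out]
    out = xr.Dataset(
        {ch: ("time", data) for ch, data in decimated.items()},
        coords={"time": new_time},
        attrs={**run_xrds.attrs, "sample_rate": new_sample_rate},
    )
    return out
```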
30 changes: 21 additions & 9 deletions aurora/pipelines/transfer_function_helpers.py
@@ -19,7 +19,11 @@
from loguru import logger


ESTIMATOR_LIBRARY = {"OLS": RegressionEstimator, "RME": TRME, "RME_RR": TRME_RR}
ESTIMATOR_LIBRARY = {
"OLS": RegressionEstimator,
"RME": TRME,
"RME_RR": TRME_RR,
}


def get_estimator_class(estimation_engine):
@@ -201,7 +205,6 @@ def process_transfer_functions(
estimator_class = get_estimator_class(dec_level_config.estimator.engine)
iter_control = set_up_iter_control(dec_level_config)
for band in transfer_function_obj.frequency_bands.bands():

X, Y, RR = get_band_for_tf_estimate(
band, dec_level_config, local_stft_obj, remote_stft_obj
)
@@ -213,7 +216,9 @@
coherence_weights_jj84,
)

Wjj84 = coherence_weights_jj84(band, local_stft_obj, remote_stft_obj)
Wjj84 = coherence_weights_jj84(
band, local_stft_obj, remote_stft_obj
)
apply_weights(X, Y, RR, Wjj84, segment=True, dropna=False)
if "simple_coherence" in segment_weights:
from aurora.transfer_function.weights.coherence_weights import (
@@ -228,7 +233,9 @@
multiple_coherence_weights,
)

W = multiple_coherence_weights(band, local_stft_obj, remote_stft_obj)
W = multiple_coherence_weights(
band, local_stft_obj, remote_stft_obj
)
apply_weights(X, Y, RR, W, segment=True, dropna=False)

# if there are channel weights apply them here
@@ -237,33 +244,38 @@
X, Y, RR = stack_fcs(X, Y, RR)

# Should only be needed if weights were applied
X, Y, RR = drop_nans(X, Y, RR)
# X, Y, RR = drop_nans(X, Y, RR)

W = effective_degrees_of_freedom_weights(X, RR, edf_obj=None)
X, Y, RR = apply_weights(X, Y, RR, W, segment=False, dropna=True)

if dec_level_config.estimator.estimate_per_channel:
for ch in dec_level_config.output_channels:
Y_ch = Y[ch].to_dataset() # keep as a dataset, maybe not needed
Y_ch = Y[
ch
].to_dataset() # keep as a dataset, maybe not needed

X_, Y_, RR_ = handle_nan(X, Y_ch, RR, drop_dim="observation")

# see note #1
# if RR is not None:
# W = effective_degrees_of_freedom_weights(X_, RR_, edf_obj=None)
# X_, Y_, RR_ = apply_weights(X_, Y_, RR_, W, segment=False)

regression_estimator = estimator_class(
X=X_, Y=Y_, Z=RR_, iter_control=iter_control
)
regression_estimator.estimate()
transfer_function_obj.set_tf(regression_estimator, band.center_period)
transfer_function_obj.set_tf(
regression_estimator, band.center_period
)
else:
X, Y, RR = handle_nan(X, Y, RR, drop_dim="observation")
regression_estimator = estimator_class(
X=X, Y=Y, Z=RR, iter_control=iter_control
)
regression_estimator.estimate()
transfer_function_obj.set_tf(regression_estimator, band.center_period)
transfer_function_obj.set_tf(
regression_estimator, band.center_period
)

return transfer_function_obj
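Since the estimate_per_channel branch above is easy to lose in the re-wrapping, here is a small numpy-only illustration of the difference between per-channel and joint estimation. It uses plain least squares, not aurora's RME/RME_RR estimators, and all shapes and values are made up for the example.

```python
# Conceptual illustration with plain numpy OLS (not aurora's estimators):
# each output channel can be regressed on the inputs independently, or all
# outputs can be solved in one call. For OLS the answers coincide; robust
# estimators can weight observations differently per channel, which is one
# motivation for a per-channel branch.
import numpy as np

rng = np.random.default_rng(0)
n_obs = 128
X = rng.normal(size=(n_obs, 2)) + 1j * rng.normal(size=(n_obs, 2))  # inputs, e.g. hx, hy
T_true = np.array([[1.0 + 0.5j, -0.2j], [0.3 + 0.0j, 2.0 - 1.0j]])  # 2x2 transfer function
noise = 0.01 * (rng.normal(size=(n_obs, 2)) + 1j * rng.normal(size=(n_obs, 2)))
Y = X @ T_true.T + noise  # outputs, e.g. ex, ey

# Per-channel: one solve per output column.
per_channel = [np.linalg.lstsq(X, Y[:, j], rcond=None)[0] for j in range(Y.shape[1])]

# Joint: all output columns in a single solve.
joint = np.linalg.lstsq(X, Y, rcond=None)[0]
```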