
Add Ensemble Information Filter runmodel #10466


Open
yngve-sk wants to merge 5 commits into main from enif-in-ert
Conversation

@yngve-sk (Contributor) commented Mar 26, 2025

TODO:

  • Make existing tests pass
  • Add performance tests for enif
  • Add integration test for enif poly
  • Add snapshot test for enif

Issue
Resolves #10455, #10456, #10457, #10458, #10459

[Screenshots attached: 2025-03-25 at 12:43:40 and 12:49:47]

@yngve-sk yngve-sk force-pushed the enif-in-ert branch 8 times, most recently from ef5affc to 1d672f6 Compare March 26, 2025 14:18
@yngve-sk yngve-sk self-assigned this Mar 27, 2025
@codspeed-hq bot commented Mar 27, 2025

CodSpeed Performance Report

Merging #10466 will not alter performance

Comparing yngve-sk:enif-in-ert (1874eb6) with main (9097b8b)

Summary

✅ 25 untouched benchmarks
🆕 1 new benchmark

Benchmarks breakdown

| Benchmark | BASE | HEAD | Change |
| --- | --- | --- | --- |
| 🆕 test_speed_performance_of_doing_enif_update[setup_es_benchmark0] | N/A | 127.1 ms | N/A |

@yngve-sk yngve-sk force-pushed the enif-in-ert branch 4 times, most recently from 3dd0800 to 219cd74 Compare April 4, 2025 11:52
obs_keys = (
observations_and_responses.select("observation_key").to_numpy().reshape((-1,))
)
indexes = observations_and_responses.select("index").to_numpy().reshape((-1,))
Contributor commented:
Should we use the call below instead? In new notebooks I have used:

def fetch_responses_and_parameters_from_storage(prior):
    # Get responses
    iens_active_index = np.flatnonzero(prior.get_realization_mask_with_parameters())
    selected_observations = prior.experiment.observation_keys
    observations_and_responses = prior.get_observations_and_responses(
        selected_observations,
        iens_active_index,
    )
    S = observations_and_responses.select(
        observations_and_responses.columns[5:]
    ).to_numpy()
    observation_values = (
        observations_and_responses.select("observations").to_numpy().reshape((-1,))
    )
    observation_errors = observations_and_responses.select("std").to_numpy().reshape((-1,))
    observation_names = (
        observations_and_responses.select("observation_key").to_numpy().reshape((-1,))
    )
    indexes = observations_and_responses.select("index").to_numpy().reshape((-1,))
    observation_names_unique = [f"{k}: {i}" for k, i in zip(observation_names, indexes)]
    Y = S.T.copy()
    d = observation_values
    d_sd = observation_errors
    
    # Get prior and standardize it
    parameter_groups = list(prior.experiment.parameter_info.keys())
    X_full, param_group_bounds, parameter_names = _all_parameters(
        ensemble=prior,
        iens_active_index=iens_active_index,
        param_groups=parameter_groups,
    )
    scaler_x = StandardScaler()
    U_prior = scaler_x.fit_transform(X_full.T)

    return U_prior, scaler_x, Y, d, d_sd, parameter_groups, iens_active_index, parameter_names, observation_names_unique, param_group_bounds

This also loads information that is used for plotting.

Call to the regression plots:

observation_index = 1
plot_centered_S_vs_features(
    observation_index, 
    Y.T, 
    U_prior.T, 
    gtmap.H, 
    d, 
    d_sd, 
    param_group_bounds,
    num_cols=3,
    X_post = X_full_post,
    observation_name = observation_names_unique[observation_index],
    parameter_names = parameter_names
)

Call to the waterfall plot:

create_waterfall_chart(
    K[parameter_index,:],
    innovation_mean,
    param_prior,
    param_posterior,
    parameter_names[parameter_index],
    observation_names_unique,
    nobservations = 2
)

The K is created from a special version of the Kalman gain:

K = np.linalg.inv(gtmap.Prec_u.toarray()) @ gtmap.H.T @ gtmap.Prec_residual_noisy()
innovation_mean = d - np.mean(Y, axis=0)

  • gtmap.Prec_u is the posterior precision (sparse)
  • gtmap.H is the sparse linear map
  • gtmap.Prec_residual_noisy() returns another sparse precision

We need to store all of these, or the gtmap object itself.

Also innovation_mean = d - np.mean(Y, axis=0), but perhaps it is not necessary to store it? It can be re-created after loading the data.

Do we want to store e.g. observation_names_unique, param_group_bounds and parameter_names? Perhaps not, as they may be loaded again from storage...
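On the storage question: if Prec_u, H, and the residual precision are kept as scipy sparse matrices, K could be recomputed with a sparse factorization instead of forming a dense inverse of Prec_u. A minimal sketch (the function name and the assumption that all three pieces are sparse matrices are mine, not from the PR):

```python
import numpy as np
import scipy.sparse as sp
from scipy.sparse.linalg import splu

def recompute_kalman_gain(Prec_u, H, Prec_residual):
    """Recompute K = Prec_u^{-1} @ H.T @ Prec_residual.

    Uses a sparse LU factorization of Prec_u rather than a dense inverse,
    so only the sparse pieces need to be stored.
    """
    lu = splu(sp.csc_matrix(Prec_u))          # factorize the (p, p) precision
    rhs = (H.T @ Prec_residual).toarray()     # (p, m) dense right-hand side
    return lu.solve(rhs)                      # solve Prec_u @ K = rhs
```

For large p the factorization is done once and reused across all m columns, which is typically much cheaper than `np.linalg.inv` on a dense copy.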

@Blunde1 (Contributor) commented Apr 4, 2025:

Standard code has the snippet

ens_mean_mask = abs(observations - ens_mean) <= alpha * (ens_std + scaled_errors)
obs_mask = np.logical_and(ens_mean_mask, ens_std_mask)

which also catches NaN in the responses
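A minimal numpy illustration (with made-up values) of why this mask also catches NaN: any comparison against NaN evaluates to False, so entries with a NaN ensemble mean are deactivated automatically:

```python
import numpy as np

# Hypothetical values, only to show the NaN behaviour of the comparison.
observations = np.array([1.0, 2.0, 3.0])
ens_mean = np.array([1.1, np.nan, 2.9])   # NaN response mean in position 1
ens_std = np.array([0.2, 0.2, 0.2])
scaled_errors = np.array([0.1, 0.1, 0.1])
alpha = 3.0

# Comparisons involving NaN are always False, so the NaN entry drops out.
ens_mean_mask = abs(observations - ens_mean) <= alpha * (ens_std + scaled_errors)
```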

Contributor commented:

This modification might work?

import logging

import numpy as np
from sklearn.preprocessing import StandardScaler

# _all_parameters is assumed to be defined elsewhere in the notebook.
logger = logging.getLogger(__name__)


def fetch_responses_and_parameters_from_storage(prior):
    # Get responses
    iens_active_index = np.flatnonzero(prior.get_realization_mask_with_parameters())
    selected_observations = prior.experiment.observation_keys
    observations_and_responses = prior.get_observations_and_responses(
        selected_observations,
        iens_active_index,
    )
    S = observations_and_responses.select(
        observations_and_responses.columns[5:]
    ).to_numpy()
    observation_values = (
        observations_and_responses.select("observations").to_numpy().reshape((-1,))
    )
    observation_errors = observations_and_responses.select("std").to_numpy().reshape((-1,))
    observation_names = (
        observations_and_responses.select("observation_key").to_numpy().reshape((-1,))
    )
    indexes = observations_and_responses.select("index").to_numpy().reshape((-1,))
    observation_names_unique = np.array([
        f"{k}: {i}" for k, i in zip(observation_names, indexes)
    ])
    Y = S.T.copy()
    d = observation_values
    d_sd = observation_errors

    # NaN detection and removal
    nan_mask = np.any(np.isnan(S), axis=1)
    if np.any(nan_mask):
        obs_with_nan = np.array([
            f"{key}: {idx}" for key, idx in zip(observation_names[nan_mask], indexes[nan_mask])
        ])
        logger.warning("Deactivating observations due to NaNs in responses: %s", obs_with_nan.tolist())
        S = S[~nan_mask]
        d = d[~nan_mask]
        d_sd = d_sd[~nan_mask]
        observation_names_unique = observation_names_unique[~nan_mask]

    # Get prior and standardize it
    parameter_groups = list(prior.experiment.parameter_info.keys())
    X_full, param_group_bounds, parameter_names = _all_parameters(
        ensemble=prior,
        iens_active_index=iens_active_index,
        param_groups=parameter_groups,
    )
    scaler_x = StandardScaler()
    U_prior = scaler_x.fit_transform(X_full.T)

    return U_prior, scaler_x, S.T, d, d_sd, parameter_groups, iens_active_index, parameter_names, observation_names_unique, param_group_bounds

@yngve-sk (Contributor, Author) commented Apr 7, 2025:

Hmm, I have some questions wrt this:

On dropping NaNs
We have these options

  1. Drop all rows where the response is NaN for at least one realization:

source_ensemble.get_observations_and_responses(
    observations,
    iens_active_index,
).filter(~pl.any_horizontal([pl.col(str(c)).is_nan() for c in list(iens_active_index)]))

  2. Ignore NaNs by setting them to the mean response value (or something like that). This loses less data; an ensemble may have responses for 99 realizations and NaNs for 1 of them.

    observations_and_responses = source_ensemble.get_observations_and_responses(
        observations,
        iens_active_index,
    )

    observations_and_responses = (
        observations_and_responses.with_columns(
            observations_and_responses[list(map(str, iens_active_index))]
            .mean_horizontal()
            .alias("mean")
        )
        .with_columns(
            pl.col(str(c)).fill_nan(pl.col("mean")).alias(str(c))
            for c in iens_active_index
        )
        .drop("mean")
    )

  3. Drop all realizations where there is at least one NaN (guessing this is not ideal)

I think your suggestion here is option 1, which I will add, but I'm just wondering whether that is the best way to go about it here? Going with option 2 might require a tad more logic / give misleading results if we have a majority of NaNs, but otherwise maybe not?

Storing K:
K = np.linalg.inv(gtmap.Prec_u.toarray()) @ gtmap.H.T @ gtmap.Prec_residual_noisy()
We will store Prec_u and H as far as I know; is it slow/expensive if we just store the residual and recompute K from that?

Outlier detection/mean masking
We only want to filter NaNs and drop the rest of the stuff there, I guess?

@yngve-sk (Contributor, Author) commented:

Wrt code organization/structuring, I think the function you pasted does mostly the same thing. It would be good not to keep two parallel almost-identical versions of _load_observations_and_responses, though, so I would lean towards keeping the logic inlined in enif_update, and instead splitting up _load_observations_and_responses so that some of its logic can be reused within enif_update.

@yngve-sk (Contributor, Author) commented:

observation_names_unique, param_group_bounds and parameter_names are already in storage :)

Contributor commented:

Drop observations if there exist NaN responses for them

Contributor commented:

And write an issue that this should be tested

@Blunde1 (Contributor) commented Apr 4, 2025

Have looked through _es_update and left some comments. The main part is good.

Loading of data: We must also filter out responses/observations with NaN. I am fairly sure this is done as preprocessing of the data for the other update algorithms.


graph_u_sub = config_node.load_parameter_graph()

# This will work for dim(X_scaled) on order O(n^5)
Contributor commented:

Will likely add fit_precision_cholesky_approximate() to graphite-maps. This should scale better, to dims on the order of O(n^6) or O(n^7).

@yngve-sk yngve-sk force-pushed the enif-in-ert branch 2 times, most recently from 8b942bb to da40329 Compare April 11, 2025 08:42
@yngve-sk yngve-sk force-pushed the enif-in-ert branch 6 times, most recently from a59a7c5 to 635c577 Compare April 24, 2025 07:57
@yngve-sk yngve-sk force-pushed the enif-in-ert branch 2 times, most recently from 5d14373 to 37559fd Compare April 24, 2025 08:11
@yngve-sk yngve-sk added this to SCOUT Apr 24, 2025
@yngve-sk yngve-sk moved this to Ready for Review in SCOUT Apr 24, 2025
Successfully merging this pull request may close these issues.

EnIF available in ERT GUI
2 participants