diff --git a/examples/advanced/feature_election/README.md b/examples/advanced/feature_election/README.md new file mode 100644 index 0000000000..d0ee6f5403 --- /dev/null +++ b/examples/advanced/feature_election/README.md @@ -0,0 +1,336 @@ +# Feature Election for NVIDIA FLARE + +A plug-and-play horizontal federated feature selection framework for tabular datasets in NVIDIA FLARE. + +This work originates from FLASH: A Framework for Federated Learning with Attribute Selection and Hyperparameter Optimization, presented at [FLTA IEEE 2025](https://flta-conference.org/flta-2025/) achieving the **Best Student Paper Award**. + +Feature Election enables multiple clients with tabular datasets to collaboratively identify the most relevant features without sharing raw data. It works by using conventional feature selection algorithms on the client side and performing a weighted aggregation of their results. + +FLASH is available on [GitHub](https://github.com/parasecurity/FLASH) + +## Citation + +If you use Feature Election in your research, please cite the FLASH framework paper: + +**IEEE Style:** +> I. Christofilogiannis, G. Valavanis, A. Shevtsov, I. Lamprou and S. Ioannidis, "FLASH: A Framework for Federated Learning with Attribute Selection and Hyperparameter Optimization," 2025 3rd International Conference on Federated Learning Technologies and Applications (FLTA), Dubrovnik, Croatia, 2025, pp. 93-100, doi: 10.1109/FLTA67013.2025.11336571. 
+ +**BibTeX:** +```bibtex +@INPROCEEDINGS{11336571, + author={Christofilogiannis, Ioannis and Valavanis, Georgios and Shevtsov, Alexander and Lamprou, Ioannis and Ioannidis, Sotiris}, + booktitle={2025 3rd International Conference on Federated Learning Technologies and Applications (FLTA)}, + title={FLASH: A Framework for Federated Learning with Attribute Selection and Hyperparameter Optimization}, + year={2025}, + pages={93-100}, + doi={10.1109/FLTA67013.2025.11336571} +} +``` + +## NVIDIA FLARE Installation + +For the complete installation instructions, see [Installation](https://nvflare.readthedocs.io/en/main/installation.html) + +```bash +pip install nvflare +``` + +Install optional dependencies: + +```bash +pip install PyImpetus # Optional: enables permutation importance methods +``` + +> **Note:** `scikit-learn ≥ 1.0` is required for most feature selection methods and is automatically installed with `nvflare`. + +## Code Structure + +``` +feature_election/ +| +|-- job.py # Main entry point - creates and runs FL job +|-- client.py # Client-side executor with data loading and local feature selection +|-- prepare_data.py # Synthetic dataset generation and client data splitting utilities +``` + +## Data + +Feature Election works with any tabular dataset represented as a pandas DataFrame. In a real FL experiment, each client would have their own local dataset — only feature selections and scores are shared, never raw data. + +For the quick-start example, synthetic data is generated automatically. To use your own data, modify `client.py` to load it: + +```python +class MyDataExecutor(FeatureElectionExecutor): + def _load_data_if_needed(self, fl_ctx): + if self._data_loaded: + return + + # Retrieve the site name assigned by the FL platform (e.g. "site-1"). + # FeatureElectionExecutor has no client_id attribute; use fl_ctx instead. 
+ site_name = fl_ctx.get_identity_name() + X_train, y_train = load_my_data(site_name) + self.set_data(X_train, y_train) + self._data_loaded = True +``` + +You can control the synthetic dataset configuration directly from the command line: + +```bash +python job.py \ + --n-samples 2000 \ + --n-features 200 \ + --n-informative 40 \ + --n-redundant 60 \ + --split-strategy non_iid +``` + +### Data Splitting Strategies + +| Strategy | Description | +|----------|-------------| +| `stratified` | Maintains class distribution (recommended for classification) | +| `random` | Random split | +| `non_iid` | Non-IID split with Dirichlet distribution (alpha=0.5) | + +## Model + +Feature Election follows a three-phase federated workflow: + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ PHASE 1: Local Feature Selection │ +│ Clients perform local FS using configured method (lasso, etc.) │ +│ → Each client sends: selected_features, feature_scores │ +└─────────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────────┐ +│ PHASE 2: Tuning & Global Mask Generation │ +│ If auto_tune=True: Hill-climbing to find optimal freedom_degree│ +│ → Aggregates selections using weighted voting │ +│ → Distributes global feature mask to all clients │ +└─────────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────────┐ +│ PHASE 3: FL Aggregation (Training) │ +│ Standard FedAvg training on reduced feature set │ +│ → num_rounds of federated training │ +└─────────────────────────────────────────────────────────────────┘ +``` + +The `freedom_degree` parameter controls how features are selected across clients: + +- `0` = intersection (only features selected by all clients) +- `1` = union (any feature selected by at least one client) +- `0–1` = weighted voting threshold + +### Feature Selection Methods + +| Method | Description | Best 
For | +|--------|-------------|----------| +| `lasso` | L1 regularization | High-dimensional sparse data | +| `elastic_net` | L1+L2 regularization | Correlated features | +| `random_forest` | Tree-based importance | Non-linear relationships | +| `mutual_info` | Information gain | Any data type | +| `pyimpetus` | Permutation importance | Robust feature selection | + +## Client + +The client code (`client.py`) is responsible for local feature selection. It loads local data, runs the configured feature selection method, and sends the resulting feature mask and scores to the server — **no raw data is ever shared**. + +```python +from nvflare.app_opt.feature_election import FeatureElectionExecutor + +executor = FeatureElectionExecutor( + fs_method='lasso', + eval_metric='f1' +) + +# Load and set client data +X_train, y_train = load_client_data() # Your data loading logic +executor.set_data(X_train, y_train, feature_names=feature_names) +``` + +The client workflow: +1. Receive the global task from the FL server. +2. Perform local feature selection using the configured method. +3. Send feature votes and scores back to the server. +4. Receive the global feature mask and train on the reduced feature set. + +### FeatureElectionExecutor Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `fs_method` | str | `"lasso"` | Feature selection method | +| `fs_params` | dict | `None` | Additional method-specific parameters | +| `eval_metric` | str | `"f1"` | Metric used to evaluate the reduced feature set | +| `task_name` | str | `"feature_election"` | Must match the server controller | + +## Server + +The server-side controller (`FeatureElectionController`) aggregates feature votes from all clients, optionally tunes the `freedom_degree` via hill-climbing, and broadcasts the final global feature mask. + +With the Recipe API, **there is no need to write custom server code** for the aggregation logic. 
The controller handles everything automatically: + +1. Collect feature selections and scores from all clients. +2. Run auto-tuning (if enabled) to find the optimal `freedom_degree`. +3. Compute the global feature mask using weighted voting. +4. Distribute the mask and coordinate FedAvg training on the reduced feature set. + +```python +from nvflare.app_opt.feature_election import FeatureElectionController + +controller = FeatureElectionController( + freedom_degree=0.5, + aggregation_mode='weighted', + min_clients=2, + num_rounds=5, + auto_tune=True, + tuning_rounds=4, # must be >= 2 when auto_tune=True + wait_time_after_min_received=10, # seconds; use 0 only for local simulation +) +``` + +### FeatureElectionController Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `freedom_degree` | float | `0.5` | Initial freedom degree | +| `aggregation_mode` | str | `"weighted"` | Client vote weighting (`'weighted'` or `'uniform'`) | +| `min_clients` | int | `2` | Minimum clients required per phase | +| `num_rounds` | int | `5` | FL training rounds after feature selection | +| `auto_tune` | bool | `False` | Enable hill-climbing optimization of `freedom_degree` | +| `tuning_rounds` | int | `0` | Rounds of hill-climbing; **must be ≥ 2** when `auto_tune=True` (0 or 1 = tuning disabled, with a warning when `auto_tune=True`) | +| `train_timeout` | int | `300` | Per-phase timeout in seconds | +| `wait_time_after_min_received` | int | `10` | Seconds to wait for stragglers after `min_clients` have responded; set to `0` only for local simulation | + +> **Auto-tune note:** `auto_tune=True` has no effect when `tuning_rounds=0` (the default). The controller emits a warning and skips tuning in that case. Use `tuning_rounds >= 2` to activate hill-climbing. +> +> **Production note:** `wait_time_after_min_received=10` gives slower clients a window to participate in every phase. 
Setting it to `0` causes the controller to close each phase the instant `min_clients` responses arrive, silently excluding any later responders. + +## Job + +The job recipe (`job.py`) combines the client and server into a runnable FLARE job. It generates all necessary configuration files and submits them to the simulator or a production FLARE deployment. + +```python +from nvflare.app_opt.feature_election import FeatureElection + +fe = FeatureElection( + freedom_degree=0.5, + fs_method='lasso', + aggregation_mode='weighted', + auto_tune=True, + tuning_rounds=4 +) + +# Generate FLARE job configuration +config_paths = fe.create_flare_job( + job_name="feature_selection_job", + output_dir="./jobs/feature_selection", + min_clients=2, + num_rounds=5, + client_sites=['hospital_1', 'hospital_2', 'hospital_3'] +) +``` + +To export job configs for production deployment: + +```bash +python job.py --export-dir ./exported_jobs +``` + +Then submit the exported job to a running FLARE deployment: + +```bash +nvflare job submit -j ./exported_jobs/feature_election_synthetic +``` + +### Job Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `--num-clients` | `3` | Number of federated clients | +| `--num-rounds` | `5` | FL training rounds | +| `--freedom-degree` | `0.5` | Feature inclusion threshold (0–1) | +| `--auto-tune` | `False` | Enable freedom degree optimization | +| `--tuning-rounds` | `4` | Rounds for auto-tuning | +| `--fs-method` | `lasso` | Feature selection method | +| `--split-strategy` | `stratified` | Data splitting strategy | +| `--n-samples` | `1000` | Total synthetic samples | +| `--n-features` | `100` | Number of features | +| `--workspace` | `/tmp/nvflare/feature_election` | Simulator workspace | + +## Run Job + +From the terminal, run with default settings: + +```bash +python job.py --num-clients 3 --num-rounds 5 +``` + +With auto-tuning enabled: + +```bash +python job.py --num-clients 3 --auto-tune --tuning-rounds 4 +``` + +With a specific feature 
selection method: + +```bash +# Mutual Information +python job.py --fs-method mutual_info + +# Random Forest +python job.py --fs-method random_forest + +# Elastic Net +python job.py --fs-method elastic_net +``` + +For quick simulation using the Python API: + +```python +from nvflare.app_opt.feature_election import quick_election +import pandas as pd + +df = pd.read_csv("your_data.csv") + +selected_mask, stats = quick_election( + df=df, + target_col='target', + num_clients=4, + fs_method='lasso', +) + +selected_features = df.columns[:-1][selected_mask] +print(f"Selected {len(selected_features)} features: {list(selected_features)}") +print(f"Freedom degree: {stats['freedom_degree']}") +``` + +## Troubleshooting + +**"No features selected"** — Increase `freedom_degree`, try a different `fs_method`, or check feature scaling. + +**"No feature votes received"** — Ensure client data is loaded before execution and that `task_name` matches between controller and executor. + +**"Poor performance after selection"** — Enable `auto_tune` with `tuning_rounds >= 2` to find the optimal `freedom_degree`, or switch to `weighted` aggregation mode. + +**"Auto-tune has no effect"** — `auto_tune=True` requires `tuning_rounds >= 2`. The default `tuning_rounds=0` is intentional for users who set `freedom_degree` manually; the controller logs a warning if `auto_tune=True` is combined with `tuning_rounds=0` or `tuning_rounds=1`. + +**"Slower clients are not participating"** — The default `wait_time_after_min_received=10` gives stragglers a 10-second window after the minimum quorum is reached. If clients are still being excluded, increase this value. Set to `0` only for local simulation where all clients run in the same process. + +**"Client excluded after mask distribution failure"** — If fewer than `min_clients` clients acknowledge the global mask, the entire workflow is aborted (not just Phase 3). Check network connectivity and client logs for the root cause. 
+ +**"PyImpetus not available"** — Install with `pip install PyImpetus`. The framework falls back to mutual information if unavailable. + +Enable detailed logging for debugging: + +```python +import logging +logging.basicConfig(level=logging.DEBUG) +``` + +## Running Tests + +```bash +pytest tests/unit_test/app_opt/feature_election/test.py -v +``` \ No newline at end of file diff --git a/examples/advanced/feature_election/client.py b/examples/advanced/feature_election/client.py new file mode 100644 index 0000000000..7a93d522ba --- /dev/null +++ b/examples/advanced/feature_election/client.py @@ -0,0 +1,204 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Client-side script for Feature Election example. + +This script demonstrates how to set up client data for the +FeatureElectionExecutor from nvflare.app_opt.feature_election. 
+""" + +import logging +import re +from typing import Optional + +from prepare_data import load_client_data + +from nvflare.apis.fl_context import FLContext +from nvflare.app_opt.feature_election.executor import FeatureElectionExecutor + +logger = logging.getLogger(__name__) + + +def get_executor( + client_id: int, + num_clients: int, + fs_method: str = "lasso", + eval_metric: str = "f1", + data_root: Optional[str] = None, + split_strategy: str = "stratified", + n_samples: int = 1000, + n_features: int = 100, + n_informative: int = 20, + n_redundant: int = 30, + n_repeated: int = 10, +) -> FeatureElectionExecutor: + """ + Create and configure a FeatureElectionExecutor with data. + + Args: + client_id: Client identifier (0 to num_clients-1) + num_clients: Total number of clients + fs_method: Feature selection method + eval_metric: Evaluation metric ('f1' or 'accuracy') + data_root: Optional path to pre-generated data + split_strategy: Data splitting strategy + n_samples: Samples per client for synthetic data + n_features: Number of features + n_informative: Number of informative features + n_redundant: Number of redundant features + n_repeated: Number of repeated features (must match SyntheticDataExecutor) + + Returns: + Configured FeatureElectionExecutor + """ + # Create executor + executor = FeatureElectionExecutor( + fs_method=fs_method, + eval_metric=eval_metric, + task_name="feature_election", + ) + + # Load data for this client + X_train, y_train, X_val, y_val, feature_names = load_client_data( + client_id=client_id, + num_clients=num_clients, + data_root=data_root, + split_strategy=split_strategy, + n_samples=n_samples, + n_features=n_features, + n_informative=n_informative, + n_redundant=n_redundant, + n_repeated=n_repeated, + ) + + # Set data on executor + executor.set_data( + X_train=X_train, + y_train=y_train, + X_val=X_val, + y_val=y_val, + feature_names=feature_names, + ) + + logger.info( + f"Client {client_id} executor configured: " + 
f"{X_train.shape[0]} train, {X_val.shape[0]} val, " + f"{X_train.shape[1]} features, method={fs_method}" + ) + + return executor + + +class SyntheticDataExecutor(FeatureElectionExecutor): + """ + FeatureElectionExecutor with built-in synthetic data loading. + + This executor automatically loads synthetic data based on + client_id extracted from the FL context. + + Args: + fs_method: Feature selection method + eval_metric: Evaluation metric + num_clients: Total number of clients in federation + split_strategy: Data splitting strategy + n_samples: Samples per client + n_features: Number of features + n_informative: Number of informative features + n_redundant: Number of redundant features + n_repeated: Number of repeated features + """ + + def __init__( + self, + fs_method: str = "lasso", + eval_metric: str = "f1", + num_clients: int = 3, + split_strategy: str = "stratified", + n_samples: int = 1000, + n_features: int = 100, + n_informative: int = 20, + n_redundant: int = 30, + n_repeated: int = 10, + task_name: str = "feature_election", + ): + super().__init__( + fs_method=fs_method, + eval_metric=eval_metric, + task_name=task_name, + ) + self.num_clients = num_clients + self.split_strategy = split_strategy + self.n_samples = n_samples + self.n_features = n_features + self.n_informative = n_informative + self.n_redundant = n_redundant + self.n_repeated = n_repeated + self._data_loaded = False + + def _load_data_if_needed(self, fl_ctx: FLContext) -> None: + """Load data based on client identity from FL context.""" + if self._data_loaded: + return + + # Extract client ID from site name + site_name = fl_ctx.get_identity_name() + + # Site names are assumed to use 1-based trailing numbers (e.g. "site-1", + # "hospital-2"), so we subtract 1 to get a 0-based client_id. Zero-based + # names (e.g. "worker-0") would yield client_id = -1, which the range check + # below will catch and raise as an explicit error rather than silently loading + # the wrong client's data. 
+ try: + if site_name.startswith("site-"): + client_id = int(site_name.split("-")[1]) - 1 + else: + match = re.search(r"(\d+)$", site_name) + if match: + client_id = int(match.group()) - 1 + else: + client_id = 0 + except (ValueError, IndexError) as e: + logger.error(f"Failed to parse client_id from '{site_name}': {e}. Defaulting to client_id=0") + client_id = 0 + + # Range validation is deliberately outside the try/except so an out-of-range + # result raises unconditionally rather than being caught and silently falling + # back to client_id=0, which would load the wrong data without any hard error. + if not (0 <= client_id < self.num_clients): + raise ValueError( + f"Extracted client_id {client_id} from '{site_name}' is out of range " f"[0, {self.num_clients - 1}]" + ) + + # Load data using the parsed ID + X_train, y_train, X_val, y_val, feature_names = load_client_data( + client_id=client_id, + num_clients=self.num_clients, + split_strategy=self.split_strategy, + n_samples=self.n_samples, + n_features=self.n_features, + n_informative=self.n_informative, + n_redundant=self.n_redundant, + n_repeated=self.n_repeated, + ) + + self.set_data(X_train, y_train, X_val, y_val, feature_names) + self._data_loaded = True + + logger.info(f"Successfully loaded synthetic data for {site_name} (client_id={client_id})") + + def execute(self, task_name, shareable, fl_ctx, abort_signal): + """Override execute to ensure data is loaded before processing.""" + self._load_data_if_needed(fl_ctx) + return super().execute(task_name, shareable, fl_ctx, abort_signal) diff --git a/examples/advanced/feature_election/job.py b/examples/advanced/feature_election/job.py new file mode 100644 index 0000000000..5ba5e1f563 --- /dev/null +++ b/examples/advanced/feature_election/job.py @@ -0,0 +1,136 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging +from typing import Optional + +from client import SyntheticDataExecutor + +from nvflare.app_common.widgets.validation_json_generator import ValidationJsonGenerator +from nvflare.app_opt.feature_election.controller import FeatureElectionController +from nvflare.job_config.api import FedJob + +logger = logging.getLogger(__name__) + + +def create_feature_election_job( + job_name: str = "feature_election_synthetic", + num_clients: int = 3, + freedom_degree: float = 0.5, + aggregation_mode: str = "weighted", + num_rounds: int = 5, + auto_tune: bool = False, + tuning_rounds: int = 4, + fs_method: str = "lasso", + eval_metric: str = "f1", + split_strategy: str = "stratified", + n_samples: int = 1000, + n_features: int = 100, + n_informative: int = 20, + n_redundant: int = 30, + n_repeated: int = 10, + export_dir: Optional[str] = None, +) -> FedJob: + job = FedJob(name=job_name) + + controller = FeatureElectionController( + freedom_degree=freedom_degree, + aggregation_mode=aggregation_mode, + min_clients=num_clients, + num_rounds=num_rounds, + task_name="feature_election", + auto_tune=auto_tune, + tuning_rounds=tuning_rounds, + # All clients run locally in the simulator, so there are no stragglers; + # skip the straggler wait window to avoid ~10 s overhead per phase. 
+ wait_time_after_min_received=0, + ) + job.to_server(controller) + job.to_server(ValidationJsonGenerator()) + + executor = SyntheticDataExecutor( + fs_method=fs_method, + eval_metric=eval_metric, + num_clients=num_clients, + split_strategy=split_strategy, + n_samples=n_samples, + n_features=n_features, + n_informative=n_informative, + n_redundant=n_redundant, + n_repeated=n_repeated, + task_name="feature_election", + ) + + # FIXED: Uses to_clients instead of to_client + job.to_clients(executor) + + if export_dir: + job.export_job(export_dir) + + return job + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--job-name", default="feature_election_synthetic") + parser.add_argument("--num-clients", type=int, default=3) + parser.add_argument("--num-rounds", type=int, default=5) + parser.add_argument("--export-dir", default=None) + parser.add_argument("--freedom-degree", type=float, default=0.5) + parser.add_argument("--aggregation-mode", default="weighted") + parser.add_argument("--auto-tune", action="store_true") + parser.add_argument("--tuning-rounds", type=int, default=4) + parser.add_argument("--fs-method", default="lasso") + parser.add_argument("--eval-metric", default="f1") + parser.add_argument("--split-strategy", default="stratified") + parser.add_argument("--n-samples", type=int, default=1000) + parser.add_argument("--n-features", type=int, default=100) + parser.add_argument("--n-informative", type=int, default=20) + parser.add_argument("--n-redundant", type=int, default=30) + parser.add_argument("--n-repeated", type=int, default=10) + parser.add_argument("--workspace", default="/tmp/nvflare/feature_election") + parser.add_argument("--threads", type=int, default=1) + + args = parser.parse_args() + logging.basicConfig(level=logging.INFO) + + job = create_feature_election_job( + job_name=args.job_name, + num_clients=args.num_clients, + freedom_degree=args.freedom_degree, + aggregation_mode=args.aggregation_mode, + 
num_rounds=args.num_rounds, + auto_tune=args.auto_tune, + tuning_rounds=args.tuning_rounds, + fs_method=args.fs_method, + eval_metric=args.eval_metric, + split_strategy=args.split_strategy, + n_samples=args.n_samples, + n_features=args.n_features, + n_informative=args.n_informative, + n_redundant=args.n_redundant, + n_repeated=args.n_repeated, + export_dir=args.export_dir, + ) + + job.simulator_run( + workspace=args.workspace, + n_clients=args.num_clients, + threads=args.threads, + ) + + +if __name__ == "__main__": + main() diff --git a/examples/advanced/feature_election/prepare_data.py b/examples/advanced/feature_election/prepare_data.py new file mode 100644 index 0000000000..f10b27a005 --- /dev/null +++ b/examples/advanced/feature_election/prepare_data.py @@ -0,0 +1,394 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Data preparation utilities for Feature Election example. + +Creates synthetic high-dimensional datasets for testing federated +feature selection across multiple clients. 
+""" + +import json +import logging +from pathlib import Path +from typing import List, Optional, Tuple + +import numpy as np +import pandas as pd +from sklearn.datasets import make_classification +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import LabelEncoder + +logger = logging.getLogger(__name__) + + +def create_synthetic_dataset( + n_samples: int = 1000, + n_features: int = 100, + n_informative: int = 20, + n_redundant: int = 30, + n_repeated: int = 10, + random_state: int = 42, +) -> Tuple[pd.DataFrame, List[str]]: + """ + Create a synthetic high-dimensional dataset for testing. + + Args: + n_samples: Number of samples + n_features: Total number of features + n_informative: Number of informative features + n_redundant: Number of redundant features + n_repeated: Number of repeated features + random_state: Random seed + + Returns: + Tuple of (DataFrame with features and target, feature names) + """ + X, y = make_classification( + n_samples=n_samples, + n_features=n_features, + n_informative=n_informative, + n_redundant=n_redundant, + n_repeated=n_repeated, + n_clusters_per_class=2, + flip_y=0.01, + random_state=random_state, + ) + + feature_names = [f"feature_{i:03d}" for i in range(n_features)] + df = pd.DataFrame(X, columns=feature_names) + df["target"] = y + + logger.info( + f"Created synthetic dataset: {n_samples} samples, {n_features} features " + f"({n_informative} informative, {n_redundant} redundant, {n_repeated} repeated)" + ) + return df, feature_names + + +def split_data_for_clients( + df: pd.DataFrame, + num_clients: int, + strategy: str = "stratified", + random_state: int = 42, +) -> List[pd.DataFrame]: + """ + Split dataset across multiple clients. 
+ + Args: + df: Full dataset with 'target' column + num_clients: Number of clients to split data for + strategy: Splitting strategy ('stratified', 'random', 'non_iid') + random_state: Random seed + + Returns: + List of DataFrames, one per client + """ + if strategy == "stratified": + return _split_stratified(df, num_clients, random_state) + elif strategy == "random": + return _split_random(df, num_clients, random_state) + elif strategy == "non_iid": + return _split_non_iid(df, num_clients, random_state) + else: + raise ValueError(f"Unknown strategy: {strategy}") + + +def _split_stratified(df: pd.DataFrame, num_clients: int, random_state: int) -> List[pd.DataFrame]: + """Stratified split maintaining class distribution across clients.""" + np.random.seed(random_state) + + if len(df) < num_clients: + logger.warning(f"Not enough samples ({len(df)}) for {num_clients} clients. Using simple split.") + return _split_random(df, num_clients, random_state) + + client_indices = [[] for _ in range(num_clients)] + + for class_label in df["target"].unique(): + class_indices = df.index[df["target"] == class_label].tolist() + np.random.shuffle(class_indices) + + if len(class_indices) >= num_clients: + # Enough samples: distribute round-robin + for i, idx in enumerate(class_indices): + client_indices[i % num_clients].append(idx) + else: + # Fewer samples than clients: randomly assign each sample to a distinct client + chosen_clients = np.random.choice(num_clients, size=len(class_indices), replace=False) + for j, idx in enumerate(class_indices): + client_indices[chosen_clients[j]].append(idx) + client_dfs = [] + for i, indices in enumerate(client_indices): + if len(indices) == 0: + raise ValueError( + f"Client {i} received 0 samples in stratified split. " + "Increase the dataset size or reduce the number of clients." 
+ ) + np.random.shuffle(indices) + client_dfs.append(df.loc[indices].copy()) + + return client_dfs + + +def _split_random(df: pd.DataFrame, num_clients: int, random_state: int) -> List[pd.DataFrame]: + """Random split without stratification.""" + np.random.seed(random_state) + indices = np.arange(len(df)) + np.random.shuffle(indices) + + client_dfs = [] + samples_per_client = len(df) // num_clients + + for i in range(num_clients): + start = i * samples_per_client + end = start + samples_per_client if i < num_clients - 1 else len(df) + if end <= start: + raise ValueError( + f"Client {i} received 0 samples in random split " + f"({len(df)} samples, {num_clients} clients). " + "Increase the dataset size or reduce the number of clients." + ) + client_dfs.append(df.iloc[indices[start:end]].copy()) + + return client_dfs + + +def _split_non_iid( + df: pd.DataFrame, + num_clients: int, + random_state: int, + alpha: float = 0.5, +) -> List[pd.DataFrame]: + """ + Non-IID split using Dirichlet distribution. + + Creates heterogeneous data distributions across clients, + simulating real-world federated scenarios. 
+ """ + y = df["target"].values + + if y.dtype == object: + le = LabelEncoder() + y = le.fit_transform(y) + + num_classes = len(np.unique(y)) + np.random.seed(random_state) + + # Dirichlet distribution for label assignment + label_distribution = np.random.dirichlet([alpha] * num_clients, num_classes) + + client_indices = [[] for _ in range(num_clients)] + + for k in range(num_classes): + idx_k = np.where(y == k)[0] + np.random.shuffle(idx_k) + + proportions = (label_distribution[k] * len(idx_k)).astype(int) + # Ensure proportions sum correctly and no negative values + total_assigned = proportions[:-1].sum() + proportions[-1] = max(0, len(idx_k) - total_assigned) + + start = 0 + for i, prop in enumerate(proportions): + client_indices[i].extend(idx_k[start : start + prop]) + start += prop + # Check that no client ended up with an empty dataset + for i, indices in enumerate(client_indices): + if len(indices) == 0: + raise ValueError( + f"Client {i} received 0 samples due to extreme Dirichlet split (alpha={alpha}). " + "Increase alpha or the total sample count." + ) + return [df.iloc[indices].copy() for indices in client_indices] + + +def _safe_train_test_split( + X: np.ndarray, + y: np.ndarray, + test_size: float, + random_state: int, + client_id: int, +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + """ + Perform train/test split with stratification when possible. + + Falls back to random split if any class has fewer than 2 samples. + + Args: + X: Feature array + y: Target array + test_size: Fraction of data for validation + random_state: Random seed + client_id: Client identifier for logging + + Returns: + Tuple of (X_train, X_val, y_train, y_val) + """ + unique, counts = np.unique(y, return_counts=True) + can_stratify = np.all(counts >= 2) + + if can_stratify: + return train_test_split(X, y, test_size=test_size, random_state=random_state, stratify=y) + else: + logger.warning( + f"Client {client_id}: Cannot stratify (some classes have <2 samples). 
" + f"Using random split instead. Class distribution: {dict(zip(unique, counts))}" + ) + return train_test_split(X, y, test_size=test_size, random_state=random_state) + + +def load_client_data( + client_id: int, + num_clients: int, + data_root: Optional[str] = None, + split_strategy: str = "stratified", + test_size: float = 0.2, + random_state: int = 42, + n_samples: int = 1000, + n_features: int = 100, + n_informative: int = 20, + n_redundant: int = 30, + n_repeated: int = 10, +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, List[str]]: + """ + Load or generate data for a specific client. + + Args: + client_id: Client identifier (0 to num_clients-1) + num_clients: Total number of clients + data_root: Optional path to pre-generated data + split_strategy: Data splitting strategy + test_size: Fraction of data for validation + random_state: Random seed + n_samples: Samples per client (total dataset will be n_samples × num_clients) + n_features: Number of features + n_informative: Number of informative features + n_redundant: Number of redundant features + n_repeated: Number of repeated features + + Returns: + Tuple of (X_train, y_train, X_val, y_val, feature_names) + """ + # Check for pre-generated data + if data_root: + data_path = Path(data_root) / f"client_{client_id}.csv" + if not data_path.exists(): + logger.warning( + f"data_root was provided but expected file not found: {data_path}. " + "Falling back to synthetic data generation." 
+ ) + else: + logger.info(f"Loading pre-generated data from {data_path}") + df = pd.read_csv(data_path) + feature_names = [c for c in df.columns if c != "target"] + X = df.drop(columns=["target"]).values + y = df["target"].values + + X_train, X_val, y_train, y_val = _safe_train_test_split( + X, y, test_size, random_state + client_id, client_id + ) + return X_train, y_train, X_val, y_val, feature_names + + # Generate synthetic data + total_samples = n_samples * num_clients + df, feature_names = create_synthetic_dataset( + n_samples=total_samples, + n_features=n_features, + n_informative=n_informative, + n_redundant=n_redundant, + n_repeated=n_repeated, + random_state=random_state, + ) + + # Split among clients + client_dfs = split_data_for_clients(df, num_clients, split_strategy, random_state) + client_df = client_dfs[client_id] + + # Separate features and target + X = client_df.drop(columns=["target"]).values + y = client_df["target"].values + + # Train/validation split + X_train, X_val, y_train, y_val = _safe_train_test_split(X, y, test_size, random_state + client_id, client_id) + + logger.info(f"Client {client_id}: {len(X_train)} train samples, {len(X_val)} val samples") + + return X_train, y_train, X_val, y_val, feature_names + + +def prepare_data_for_all_clients( + output_dir: str, + num_clients: int = 3, + split_strategy: str = "stratified", + random_state: int = 42, + **dataset_kwargs, +) -> None: + """ + Pre-generate and save data for all clients. 
+ + Args: + output_dir: Directory to save client data files + num_clients: Number of clients + split_strategy: Data splitting strategy + random_state: Random seed + **dataset_kwargs: Additional arguments for create_synthetic_dataset + """ + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + # Generate full dataset + total_samples = dataset_kwargs.pop("n_samples", 1000) * num_clients + df, feature_names = create_synthetic_dataset( + n_samples=total_samples, + random_state=random_state, + **dataset_kwargs, + ) + + # Split and save + client_dfs = split_data_for_clients(df, num_clients, split_strategy, random_state) + + for i, client_df in enumerate(client_dfs): + filepath = output_path / f"client_{i}.csv" + client_df.to_csv(filepath, index=False) + logger.info(f"Saved {len(client_df)} samples to {filepath}") + + # Save metadata + metadata = { + "num_clients": num_clients, + "split_strategy": split_strategy, + "random_state": random_state, + "feature_names": feature_names, + "total_samples": total_samples, + } + + with open(output_path / "metadata.json", "w") as f: + json.dump(metadata, f, indent=2) + + logger.info(f"Data preparation complete. 
Files saved to {output_path}") + + +if __name__ == "__main__": + # Example: Generate data for 3 clients + logging.basicConfig(level=logging.INFO) + + prepare_data_for_all_clients( + output_dir="./data", + num_clients=3, + split_strategy="stratified", + n_samples=1000, + n_features=100, + n_informative=20, + n_redundant=30, + n_repeated=10, + ) diff --git a/examples/advanced/feature_election/requirements.txt b/examples/advanced/feature_election/requirements.txt new file mode 100644 index 0000000000..9adaf4f476 --- /dev/null +++ b/examples/advanced/feature_election/requirements.txt @@ -0,0 +1,7 @@ +nvflare>=2.5.0 +numpy>=1.21.0 +pandas>=1.3.0 +scikit-learn>=1.0.0 + +# Optional: PyImpetus for advanced feature selection +# pyimpetus>=0.0.6 diff --git a/examples/advanced/feature_election/server.py b/examples/advanced/feature_election/server.py new file mode 100644 index 0000000000..e69b940e59 --- /dev/null +++ b/examples/advanced/feature_election/server.py @@ -0,0 +1,120 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Server-side configuration for Feature Election example. + +This script provides factory functions for creating the +FeatureElectionController from nvflare.app_opt.feature_election. 
+""" + +import logging + +from nvflare.app_opt.feature_election.controller import FeatureElectionController + +logger = logging.getLogger(__name__) + + +def get_controller( + freedom_degree: float = 0.5, + aggregation_mode: str = "weighted", + min_clients: int = 2, + num_rounds: int = 5, + auto_tune: bool = False, + tuning_rounds: int = 4, +) -> FeatureElectionController: + """ + Create and configure a FeatureElectionController. + + Args: + freedom_degree: Controls feature inclusion (0=intersection, 1=union) + aggregation_mode: How to weight client votes ('weighted' or 'uniform') + min_clients: Minimum clients required to proceed + num_rounds: Number of FL training rounds after feature selection + auto_tune: Whether to automatically tune freedom_degree + tuning_rounds: Number of rounds for auto-tuning + + Returns: + Configured FeatureElectionController + """ + controller = FeatureElectionController( + freedom_degree=freedom_degree, + aggregation_mode=aggregation_mode, + min_clients=min_clients, + num_rounds=num_rounds, + task_name="feature_election", + auto_tune=auto_tune, + tuning_rounds=tuning_rounds, + ) + + logger.info( + f"Controller configured: FD={freedom_degree}, " + f"mode={aggregation_mode}, rounds={num_rounds}, " + f"auto_tune={auto_tune}" + ) + + return controller + + +# Default configurations for common scenarios +CONFIGS = { + "basic": { + "freedom_degree": 0.5, + "aggregation_mode": "weighted", + "min_clients": 2, + "num_rounds": 5, + "auto_tune": False, + "tuning_rounds": 0, + }, + "auto_tune": { + "freedom_degree": 0.6, + "aggregation_mode": "weighted", + "min_clients": 2, + "num_rounds": 10, + "auto_tune": True, + "tuning_rounds": 4, + }, + "conservative": { + "freedom_degree": 0.2, + "aggregation_mode": "weighted", + "min_clients": 2, + "num_rounds": 5, + "auto_tune": False, + "tuning_rounds": 0, + }, + "aggressive": { + "freedom_degree": 0.8, + "aggregation_mode": "weighted", + "min_clients": 2, + "num_rounds": 5, + "auto_tune": False, + 
"tuning_rounds": 0, + }, +} + + +def get_controller_by_name(config_name: str = "basic") -> FeatureElectionController: + """ + Get a controller with a predefined configuration. + + Args: + config_name: One of 'basic', 'auto_tune', 'conservative', 'aggressive' + + Returns: + Configured FeatureElectionController + """ + if config_name not in CONFIGS: + raise ValueError(f"Unknown config: {config_name}. Available: {list(CONFIGS.keys())}") + + return get_controller(**CONFIGS[config_name]) diff --git a/nvflare/app_opt/feature_election/__init__.py b/nvflare/app_opt/feature_election/__init__.py new file mode 100644 index 0000000000..585603a479 --- /dev/null +++ b/nvflare/app_opt/feature_election/__init__.py @@ -0,0 +1,64 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +Feature Election for NVIDIA FLARE + +A plug-and-play horizontal federated feature selection framework for tabular datasets. 
+ +This module provides: +- FeatureElection: High-level API for feature election +- FeatureElectionController: Server-side FLARE controller +- FeatureElectionExecutor: Client-side FLARE executor +- Helper functions for quick deployment + +Example: + Basic usage:: + + from nvflare.app_opt.feature_election import quick_election + import pandas as pd + + df = pd.read_csv("data.csv") + selected_mask, stats = quick_election( + df=df, + target_col='target', + num_clients=4, + fs_method='lasso', + freedom_degree=0.3 + ) + + FLARE deployment:: + + from nvflare.app_opt.feature_election import FeatureElection + + fe = FeatureElection(freedom_degree=0.5, fs_method='lasso') + config_paths = fe.create_flare_job( + job_name="feature_selection", + output_dir="./jobs" + ) +""" + +from .controller import FeatureElectionController +from .executor import FeatureElectionExecutor +from .feature_election import FeatureElection, load_election_results, quick_election + +__version__ = "1.0.0" +__all__ = [ + "FeatureElection", + "FeatureElectionController", + "FeatureElectionExecutor", + "quick_election", + "load_election_results", +] diff --git a/nvflare/app_opt/feature_election/controller.py b/nvflare/app_opt/feature_election/controller.py new file mode 100644 index 0000000000..f7137802ed --- /dev/null +++ b/nvflare/app_opt/feature_election/controller.py @@ -0,0 +1,571 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import logging +import os +from typing import Dict + +import numpy as np + +from nvflare.apis.client import Client +from nvflare.apis.controller_spec import ClientTask +from nvflare.apis.fl_constant import ReturnCode +from nvflare.apis.fl_context import FLContext +from nvflare.apis.impl.controller import Controller, Task +from nvflare.apis.shareable import Shareable +from nvflare.apis.signal import Signal + +logger = logging.getLogger(__name__) + + +class FeatureElectionController(Controller): + """ + Three-phase FL controller for federated feature selection and FedAvg training. + + Phase 1 — Local Feature Selection: each client runs its configured FS method and + returns a feature mask and per-feature scores. + + Phase 2 — Tuning & Global Mask Distribution: the server optionally runs hill-climbing + to find the optimal ``freedom_degree``, then aggregates client masks via weighted voting + and distributes the global feature mask to all clients. If fewer than ``min_clients`` + clients acknowledge the mask, the entire workflow is aborted. + + Phase 3 — FedAvg Training: standard federated averaging on the reduced feature set + for ``num_rounds`` rounds. + + Args: + freedom_degree: Threshold in [0, 1] controlling which features survive the vote. + 0 = intersection (all clients must select), 1 = union (any client suffices). + aggregation_mode: ``'weighted'`` weights each client by sample count; + ``'uniform'`` treats all clients equally. + min_clients: Minimum number of clients that must respond in each phase. + num_rounds: Number of FedAvg training rounds in Phase 3. + task_name: Must match the ``task_name`` configured on ``FeatureElectionExecutor``. + train_timeout: Per-phase timeout in seconds. + auto_tune: If ``True``, Phase 2 runs hill-climbing to optimise ``freedom_degree``. + Has no effect when ``tuning_rounds=0`` (a warning is logged in that case). + tuning_rounds: Number of hill-climbing iterations. 
Must be >= 2 for meaningful + tuning; ``tuning_rounds=0`` disables tuning (with a warning if ``auto_tune=True``); + ``tuning_rounds=1`` is also disabled (same warning). + wait_time_after_min_received: Seconds to wait for additional client responses after + ``min_clients`` have already replied. Set to ``0`` only for local simulation; + a non-zero value (default 10 s) prevents slower clients from being silently + excluded in heterogeneous production networks. + """ + + def __init__( + self, + freedom_degree: float = 0.5, + aggregation_mode: str = "weighted", + min_clients: int = 2, + num_rounds: int = 5, + task_name: str = "feature_election", + train_timeout: int = 300, + auto_tune: bool = False, + tuning_rounds: int = 0, + wait_time_after_min_received: int = 10, + ): + super().__init__() + + # Configuration + self.freedom_degree = freedom_degree + self.aggregation_mode = aggregation_mode + self.custom_task_name = task_name + self.min_clients = min_clients + self.fl_rounds = num_rounds + self.train_timeout = train_timeout + self.wait_time_after_min_received = wait_time_after_min_received + self.auto_tune = auto_tune + self.tuning_rounds = tuning_rounds if auto_tune else 0 + if auto_tune and self.tuning_rounds == 0: + logger.warning( + "auto_tune=True has no effect when tuning_rounds=0 (the default). " + "Set tuning_rounds >= 2 to enable hill-climbing optimisation of freedom_degree." + ) + elif auto_tune and self.tuning_rounds == 1: + logger.warning( + "auto_tune requires tuning_rounds >= 2 to explore alternative freedom degrees " + "(one baseline evaluation plus at least one neighbour to compare). " + "Got tuning_rounds=1; auto-tuning will be disabled." 
+ ) + self.tuning_rounds = 0 + + # State + self.global_feature_mask = None + self.global_weights = None + self.cached_client_selections = {} + self.phase_results = {} + + # Hill Climbing for auto-tuning + self.tuning_history = [] + self.search_step = 0.1 + self.current_direction = 1 + + self.n_features = None + + def advance_tuning(self, score: float, first_step: bool = False) -> None: + """Record a tuning-round score and update freedom_degree for the next round. + + This is the public interface for the simulation path in + :meth:`FeatureElection.simulate_election` so that the simulation does not + need to mutate private controller state directly. The real FL path in + ``control_flow`` uses the same internal helpers. + + Args: + score: Weighted evaluation score for the current ``freedom_degree``. + first_step: ``True`` only on the very first tuning round; passed + through to ``_calculate_next_fd`` to seed the initial direction. + """ + self.tuning_history.append((self.freedom_degree, score)) + self.freedom_degree = self._calculate_next_fd(first_step=first_step) + + def start_controller(self, fl_ctx: FLContext) -> None: + logger.info("Initializing FeatureElectionController (Base Controller Mode)") + + def stop_controller(self, fl_ctx: FLContext): + # Save results + workspace = fl_ctx.get_engine().get_workspace() + run_dir = workspace.get_run_dir(fl_ctx.get_job_id()) + results = { + "global_mask": self.global_feature_mask.tolist() if self.global_feature_mask is not None else None, + "freedom_degree": float(self.freedom_degree), + "num_features_selected": ( + int(np.sum(self.global_feature_mask)) if self.global_feature_mask is not None else 0 + ), + } + try: + with open(os.path.join(run_dir, "feature_election_results.json"), "w") as f: + json.dump(results, f, indent=2) + except OSError as e: + logger.error(f"Failed to write feature election results to {run_dir}: {e}") + logger.info("Stopping Feature Election Controller") + + def process_result_of_unknown_task( + self, 
client: Client, task_name: str, client_task_id: str, result: Shareable, fl_ctx: FLContext + ): + """ + Called when a result is received for an unknown task. + This is a fallback - normally results come through task_done_cb. + """ + logger.warning(f"Received result for unknown task '{task_name}' from {client.name}") + + def control_flow(self, abort_signal: Signal, fl_ctx: FLContext) -> None: + """Main Orchestration Loop""" + try: + # --- PHASE 1: LOCAL FEATURE SELECTION (ELECTION) --- + if not self._phase_one_election(abort_signal, fl_ctx): + return + + # --- PHASE 2: TUNING & GLOBAL MASKING --- + if not self._phase_two_tuning_and_masking(abort_signal, fl_ctx): + return + + # --- PHASE 3: AGGREGATION ROUNDS (FL TRAINING) --- + self._phase_three_aggregation(abort_signal, fl_ctx) + + logger.info("Feature Election Workflow Completed Successfully.") + + except Exception as e: + logger.exception(f"Workflow failed: {e}") + abort_signal.trigger() + raise + + # ============================================================================== + # PHASE IMPLEMENTATIONS + # ============================================================================== + + def _result_received_cb(self, client_task: ClientTask, fl_ctx: FLContext): + """ + Callback called when a result is received from a client. + This is the proper way to collect results in NVFLARE. + """ + client_name = client_task.client.name + result = client_task.result + + if result is None: + logger.warning(f"No result from client {client_name}") + return + + rc = result.get_return_code() + if rc != ReturnCode.OK: + logger.warning(f"Client {client_name} returned error: {rc}") + return + + # Store the result + self.phase_results[client_name] = result + logger.debug(f"Received result from {client_name}") + + def _broadcast_and_gather( + self, task_data: Shareable, abort_signal: Signal, fl_ctx: FLContext, timeout: int = 0 + ) -> Dict[str, Shareable]: + """ + Helper to send tasks and collect results safely. 
+ Uses result_received_cb to properly collect results. + """ + # Clear buffer + self.phase_results = {} + + # Create Task with callback + task = Task( + name=self.custom_task_name, + data=task_data, + timeout=timeout, + result_received_cb=self._result_received_cb, + ) + + # Broadcast and wait for results. + # wait_time_after_min_received > 0 gives slower clients a window to respond + # after min_clients have already replied, preventing silent exclusion. + self.broadcast_and_wait( + task=task, + min_responses=self.min_clients, + wait_time_after_min_received=self.wait_time_after_min_received, + fl_ctx=fl_ctx, + abort_signal=abort_signal, + ) + + # Also collect any results from client_tasks (backup method) + for client_task in task.client_tasks: + client_name = client_task.client.name + if client_name not in self.phase_results and client_task.result is not None: + rc = client_task.result.get_return_code() + if rc == ReturnCode.OK: + self.phase_results[client_name] = client_task.result + logger.debug(f"Collected result from task.client_tasks: {client_name}") + + logger.info(f"Collected {len(self.phase_results)} results") + return self.phase_results + + def _phase_one_election(self, abort_signal: Signal, fl_ctx: FLContext) -> bool: + logger.info("=== PHASE 1: Local Feature Selection & Election ===") + + task_data = Shareable() + task_data["request_type"] = "feature_selection" + + # Broadcast and collect results + results = self._broadcast_and_gather(task_data, abort_signal, fl_ctx) + + if not results: + logger.error("No feature votes received. Aborting.") + return False + + # Extract client data + self.cached_client_selections = self._extract_client_data(results) + + if not self.cached_client_selections: + logger.error("Received responses, but failed to extract selection data. Aborting.") + return False + + logger.info(f"Phase 1 Complete. 
Processed votes from {len(self.cached_client_selections)} clients.") + return True + + def _phase_two_tuning_and_masking(self, abort_signal: Signal, fl_ctx: FLContext): + logger.info("=== PHASE 2: Tuning & Global Mask Generation ===") + + # 1. Run Tuning Loop (if enabled) + if self.auto_tune and self.tuning_rounds > 0: + logger.info(f"Starting Auto-tuning ({self.tuning_rounds} rounds)...") + + for i in range(self.tuning_rounds): + if abort_signal.triggered: + logger.warning("Abort signal received during tuning") + break + + # Evaluate current freedom_degree + mask = self.aggregate_selections(self.cached_client_selections) + + task_data = Shareable() + task_data["request_type"] = "tuning_eval" + task_data["tuning_mask"] = mask.tolist() + + results = self._broadcast_and_gather(task_data, abort_signal, fl_ctx) + + # Aggregate scores using the same weighting as mask aggregation so + # the tuning objective is consistent with the actual aggregation_mode. + weighted_score, total_weight = 0.0, 0.0 + for v in results.values(): + if "tuning_score" not in v: + continue + n = v.get("num_samples", 1) if self.aggregation_mode == "weighted" else 1 + weighted_score += v["tuning_score"] * n + total_weight += n + if total_weight == 0.0: + logger.warning( + f"Tuning round {i + 1}: no clients returned a valid score; " + "skipping history entry to avoid corrupting hill-climbing signal." + ) + continue + score = weighted_score / total_weight + + logger.info( + f"Tuning Round {i + 1}/{self.tuning_rounds}: FD={self.freedom_degree:.4f} -> Score={score:.4f}" + ) + self.tuning_history.append((self.freedom_degree, score)) + + # Calculate next FD for next iteration (if not last round) + if i < self.tuning_rounds - 1: + self.freedom_degree = self._calculate_next_fd(first_step=(i == 0)) + + # Select best FD from evaluated options + if self.tuning_history: + best_fd, best_score = max(self.tuning_history, key=lambda x: x[1]) + self.freedom_degree = best_fd + logger.info(f"Tuning Complete. 
Optimal Freedom Degree: {best_fd:.4f} (Score: {best_score:.4f})") + else: + logger.warning("No tuning results, keeping initial freedom_degree") + + # 2. Generate Final Mask + final_mask = self.aggregate_selections(self.cached_client_selections) + self.global_feature_mask = final_mask + n_sel = np.sum(final_mask) + logger.info( + f"Final Global Mask: {n_sel} features selected " + f"(FD={self.freedom_degree:.4f}, aggregation_mode={self.aggregation_mode})" + ) + + # 3. Distribute mask to clients + task_data = Shareable() + task_data["request_type"] = "apply_mask" + task_data["global_feature_mask"] = final_mask.tolist() + + mask_results = self._broadcast_and_gather(task_data, abort_signal, fl_ctx) + if len(mask_results) < self.min_clients: + logger.warning( + f"Global mask distribution incomplete: only {len(mask_results)}/{self.min_clients} " + "clients acknowledged. The entire FL workflow is being aborted — " + "Phase 3 aggregation will not run." + ) + return False + logger.info(f"Global mask distributed to {len(mask_results)} clients") + return True + + def _phase_three_aggregation(self, abort_signal: Signal, fl_ctx: FLContext): + logger.info(f"=== PHASE 3: Aggregation Rounds (FL Training - {self.fl_rounds} Rounds) ===") + + completed_rounds = 0 + for i in range(1, self.fl_rounds + 1): + if abort_signal.triggered: + logger.warning( + f"Abort signal received during FL training after {completed_rounds}/{self.fl_rounds} rounds" + ) + break + + logger.info(f"--- FL Round {i}/{self.fl_rounds} ---") + + task_data = Shareable() + task_data["request_type"] = "train" + if self.global_weights is not None: + task_data["params"] = { + k: v.tolist() if isinstance(v, np.ndarray) else v for k, v in self.global_weights.items() + } + results = self._broadcast_and_gather(task_data, abort_signal, fl_ctx, timeout=self.train_timeout) + if len(results) < self.min_clients: + logger.warning( + f"FL Round {i}: only {len(results)}/{self.min_clients} clients responded; " + "proceeding with 
partial results." + ) + + # Aggregate Weights (FedAvg) + self._aggregate_weights(results) + completed_rounds += 1 + + if completed_rounds == self.fl_rounds: + logger.info(f"FL Training phase complete ({completed_rounds}/{self.fl_rounds} rounds)") + else: + logger.warning(f"FL Training phase ended early: {completed_rounds}/{self.fl_rounds} rounds completed") + + # ============================================================================== + # HELPER METHODS + # ============================================================================== + + def _aggregate_weights(self, results: Dict[str, Shareable]): + """FedAvg-style weight aggregation""" + total_samples = 0 + weighted_weights = {} + + for shareable in results.values(): + if "params" not in shareable: + continue + n = shareable.get("num_samples", 1) + weights = shareable.get("params") + if weights is not None: + # Initialize weighted_weights from first valid weights + if not weighted_weights: + weighted_weights = {k: np.zeros_like(np.array(v)) for k, v in weights.items()} + # Validate all keys before accumulating — a partial update would corrupt FedAvg. + # Check both directions: unexpected client keys and missing client keys. 
+ client_valid = True + missing_keys = [k for k in weighted_weights if k not in weights] + if missing_keys: + logger.warning(f"Client weights are missing expected keys {missing_keys}; skipping client") + client_valid = False + for k, v in weights.items(): + if not client_valid: + break + v_array = np.array(v) + if k not in weighted_weights: + logger.warning(f"Unexpected weight key '{k}' from client, skipping client") + client_valid = False + break + if weighted_weights[k].shape != v_array.shape: + logger.error( + f"Weight shape mismatch for key '{k}': expected {weighted_weights[k].shape}, got {v_array.shape}" + ) + client_valid = False + break + if client_valid: + for k, v in weights.items(): + weighted_weights[k] += np.array(v) * n + total_samples += n + + if total_samples > 0: + self.global_weights = {k: v / total_samples for k, v in weighted_weights.items()} + logger.info(f"Aggregated weights from {len(results)} clients ({total_samples} samples)") + else: + logger.warning("Weight aggregation skipped: no clients returned valid parameters; global weights unchanged") + + def _extract_client_data(self, results: Dict[str, Shareable]) -> Dict[str, Dict]: + """Extract feature selection data from client results""" + client_data = {} + for key, contrib in results.items(): + if "selected_features" in contrib: + selected = np.array(contrib["selected_features"]) + + # Get n_features from first client response + if self.n_features is None: + self.n_features = len(selected) + logger.debug(f"Inferred n_features={self.n_features} from {key}") + + client_data[key] = { + "selected_features": selected, + "feature_scores": np.array(contrib["feature_scores"]), + "num_samples": contrib.get("num_samples", 1), + } + logger.debug(f"Extracted {np.sum(contrib['selected_features'])} features from {key}") + return client_data + + def aggregate_selections(self, client_selections: Dict[str, Dict]) -> np.ndarray: + """ + Aggregate feature selections from all clients. 
+ + Freedom degree controls the blend between intersection and union: + - FD=0: Intersection (only features selected by ALL clients) + - FD=1: Union (features selected by ANY client) + - 0 0 else np.ones(len(weights)) / len(weights) + + intersection = np.all(masks, axis=0) + union = np.any(masks, axis=0) + + # Handle edge cases + if self.freedom_degree <= 0.01: + return intersection + if self.freedom_degree >= 0.99: + return union + + return self._weighted_election(masks, scores, weights, intersection, union) + + def _weighted_election( + self, masks: np.ndarray, scores: np.ndarray, weights: np.ndarray, intersection: np.ndarray, union: np.ndarray + ) -> np.ndarray: + """ + Perform weighted voting for features in the difference set. + Uses aggregation_mode to determine weighting strategy. + """ + diff_mask = union & ~intersection + if not np.any(diff_mask): + return intersection + + # Compute aggregated scores based on aggregation_mode + agg_scores = np.zeros(len(intersection)) + + # Determine weights based on aggregation mode + if self.aggregation_mode == "uniform": + # Equal weight for all clients + effective_weights = np.ones(len(weights)) / len(weights) + else: # "weighted" mode (default) + # Use sample-size-based weights + effective_weights = weights + + for i, (m, s) in enumerate(zip(masks, scores)): + valid = m.astype(bool) + if not np.any(valid): + logger.warning(f"Client {i} has no selected features, skipping") + continue + min_s, max_s = np.min(s[valid]), np.max(s[valid]) + if max_s > min_s: + norm_s = np.where(valid, (s - min_s) / (max_s - min_s), 0.0) + else: + norm_s = np.where(valid, 0.5, 0.0) + agg_scores += norm_s * effective_weights[i] + + # Select top features from (Union - Intersection) based on freedom_degree + n_add = int(np.ceil(np.sum(diff_mask) * self.freedom_degree)) + if n_add > 0: + diff_indices = np.where(diff_mask)[0] + diff_scores = agg_scores[diff_indices] + top_indices = diff_indices[np.argsort(diff_scores, kind="stable")[-n_add:]] + 
selected_diff = np.zeros_like(diff_mask) + selected_diff[top_indices] = True + return intersection | selected_diff + # No features to add + else: + return intersection + + def _calculate_next_fd(self, first_step: bool) -> float: + """Hill-climbing to find optimal freedom degree""" + MIN_FD, MAX_FD = 0.05, 1.0 + + if first_step: + # Choose the initial direction so the first step stays within [MIN_FD, MAX_FD]. + # Starting near 1.0 in the positive direction would clip immediately and waste + # a tuning round; prefer going negative when headroom above is smaller. + if self.freedom_degree + self.search_step > MAX_FD: + self.current_direction = -1 + return np.clip(self.freedom_degree + (self.current_direction * self.search_step), MIN_FD, MAX_FD) + + if len(self.tuning_history) < 2: + return self.freedom_degree + + curr_fd, curr_score = self.tuning_history[-1] + prev_fd, prev_score = self.tuning_history[-2] + + if curr_score > prev_score: + new_fd = curr_fd + (self.current_direction * self.search_step) + else: + self.current_direction *= -1 + self.search_step = max(self.search_step * 0.5, 1e-3) + # Step from curr_fd (not prev_fd) so the explorer backtracks from its + # current position rather than skipping the region between the last two + # evaluated points. + new_fd = curr_fd + (self.current_direction * self.search_step) + + return np.clip(new_fd, MIN_FD, MAX_FD) diff --git a/nvflare/app_opt/feature_election/executor.py b/nvflare/app_opt/feature_election/executor.py new file mode 100644 index 0000000000..88b5bd096d --- /dev/null +++ b/nvflare/app_opt/feature_election/executor.py @@ -0,0 +1,416 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Dict, Optional, Tuple + +import numpy as np +import pandas as pd +from sklearn.ensemble import RandomForestClassifier +from sklearn.feature_selection import mutual_info_classif +from sklearn.linear_model import ElasticNet, Lasso, LogisticRegression +from sklearn.metrics import accuracy_score, f1_score +from sklearn.preprocessing import StandardScaler + +from nvflare.apis.executor import Executor +from nvflare.apis.fl_constant import ReturnCode +from nvflare.apis.fl_context import FLContext +from nvflare.apis.shareable import Shareable, make_reply +from nvflare.apis.signal import Signal + +try: + from PyImpetus import PPIMBC + + PYIMPETUS_AVAILABLE = True +except ImportError: + PYIMPETUS_AVAILABLE = False + +logger = logging.getLogger(__name__) + +LASSO_ELASTIC_NET_ZERO_THRESHOLD: float = 1e-6 + + +class FeatureElectionExecutor(Executor): + """ + Client-side executor for the Feature Election federated workflow. + + Handles four request types dispatched by ``FeatureElectionController``: + + * ``feature_selection`` — runs the configured FS method on local data and returns + a boolean feature mask and per-feature scores. + * ``tuning_eval`` — evaluates a candidate mask proposed by the controller during + the hill-climbing phase and returns the local score. + * ``apply_mask`` — permanently slices ``X_train`` / ``X_val`` to the selected + features. **Idempotent**: if the same mask is received a second time (e.g. due + to task retransmission) the call returns ``OK`` immediately without modifying data. 
+ * ``train`` — performs one FedAvg round on the masked feature set and returns the + updated model weights. + + Args: + fs_method: Feature selection algorithm. One of ``'lasso'``, ``'elastic_net'``, + ``'mutual_info'``, ``'random_forest'``, ``'pyimpetus'``. + fs_params: Extra keyword arguments forwarded to the FS algorithm. + eval_metric: ``'f1'`` (weighted) or ``'accuracy'``, used for tuning eval and + local scoring. + task_name: Must match the ``task_name`` on ``FeatureElectionController``. + + Note: + Call :meth:`set_data` before the executor is registered with the FL runtime. + ``FeatureElectionExecutor`` has no ``client_id`` attribute; use + ``fl_ctx.get_identity_name()`` inside ``_load_data_if_needed`` to retrieve the + site name assigned by the FL platform. + """ + + def __init__( + self, + fs_method: str = "lasso", + fs_params: Optional[Dict] = None, + eval_metric: str = "f1", + task_name: str = "feature_election", + ): + super().__init__() + self.fs_method = fs_method.lower() + self.fs_params = fs_params or {} + self.eval_metric = eval_metric + self.task_name = task_name + + # Data + self.X_train = None + self.y_train = None + self.X_val = None + self.y_val = None + + # Scaler fitted on X_train; stored so _handle_train and _handle_tuning_eval + # use the same parameters rather than each reconstructing an identical instance. + # Reset to None whenever X_train changes (set_data, apply_mask). + self.scaler = None + + # Use LogisticRegression with LBFGS solver - much faster convergence than SGDClassifier + # for small-to-medium datasets. warm_start=True allows incremental training across rounds. 
+ self.global_feature_mask = None + self.model = LogisticRegression(max_iter=1000, solver="lbfgs", warm_start=True, random_state=42) + self._model_initialized = False # Track if model has been fit + + self._set_default_params() + + def _set_default_params(self): + defaults = { + "lasso": {"alpha": 0.01}, + "elastic_net": {"alpha": 0.01, "l1_ratio": 0.5}, + "mutual_info": {}, + "random_forest": {"n_estimators": 100, "random_state": 42}, + "pyimpetus": {"p_val_thresh": 0.05}, + } + if self.fs_method in defaults: + self.fs_params = {**defaults[self.fs_method], **self.fs_params} + + def set_data(self, X_train, y_train, X_val=None, y_val=None, feature_names=None): + """ + Set data for the executor. + X_val and y_val are optional; if not provided, training data is used for evaluation. + """ + # Validate that feature_names matches X_train dimensions to prevent misalignment + if feature_names is not None: + if len(feature_names) != X_train.shape[1]: + raise ValueError( + f"Length of feature_names ({len(feature_names)}) must match " + f"number of features in X_train ({X_train.shape[1]})." + ) + + # Coerce pandas inputs to numpy so positional indexing inside _handle_train + # and elsewhere is always consistent regardless of the DataFrame/Series index. + self.X_train = X_train.values if isinstance(X_train, pd.DataFrame) else X_train + self.y_train = y_train.values if isinstance(y_train, pd.Series) else y_train + self.scaler = None # invalidate cached scaler whenever X_train changes + + # If X_val is provided, ensure it has the same feature count as X_train + if X_val is not None: + X_val = X_val.values if isinstance(X_val, pd.DataFrame) else X_val + y_val = y_val.values if isinstance(y_val, pd.Series) else y_val + if X_val.shape[1] != self.X_train.shape[1]: + raise ValueError( + f"X_val feature count ({X_val.shape[1]}) does not match " + f"X_train feature count ({self.X_train.shape[1]})." 
+ ) + self.X_val = X_val + self.y_val = y_val + else: + self.X_val = self.X_train + self.y_val = self.y_train + + self.feature_names = feature_names + + def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable: + if task_name != self.task_name: + return make_reply(ReturnCode.TASK_UNKNOWN) + + request_type = shareable.get("request_type") + + if request_type == "feature_selection": + return self._handle_feature_selection() + elif request_type == "tuning_eval": + return self._handle_tuning_eval(shareable) + elif request_type == "apply_mask": + return self._handle_apply_mask(shareable) + elif request_type == "train": + return self._handle_train(shareable) + else: + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + def evaluate_model(self, X_train, y_train, X_val, y_val) -> float: + """ + Helper method to train and evaluate a model locally. + Required for the 'simulate_election' functionality and tests. + """ + if len(y_train) == 0 or len(y_val) == 0: + return 0.0 + + try: + # Scale + scaler = StandardScaler() + X_train_scaled = scaler.fit_transform(X_train) + X_val_scaled = scaler.transform(X_val) + + # Quick train + model = LogisticRegression(max_iter=200, random_state=42) + model.fit(X_train_scaled, y_train) + y_pred = model.predict(X_val_scaled) + + if self.eval_metric == "accuracy": + return accuracy_score(y_val, y_pred) + return f1_score(y_val, y_pred, average="weighted") + except Exception as e: + logger.warning(f"Local evaluation failed: {e}") + return 0.0 + + def _handle_feature_selection(self) -> Shareable: + if self.X_train is None: + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + try: + mask, scores = self.perform_feature_selection() + resp = make_reply(ReturnCode.OK) + resp["selected_features"] = mask.tolist() + resp["feature_scores"] = scores.tolist() + resp["num_samples"] = len(self.X_train) + return resp + except Exception as e: + logger.error(f"FS failed: {e}") + return 
make_reply(ReturnCode.EXECUTION_EXCEPTION) + + def _handle_tuning_eval(self, shareable: Shareable) -> Shareable: + try: + mask = np.array(shareable.get("tuning_mask"), dtype=bool) + if self.X_train is None or np.sum(mask) == 0: + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + if len(mask) != self.X_train.shape[1]: + logger.error(f"Tuning mask length ({len(mask)}) doesn't match feature count ({self.X_train.shape[1]})") + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + X_tr = self.X_train[:, mask] + X_v = self.X_val[:, mask] + + # Use helper + score = self.evaluate_model(X_tr, self.y_train, X_v, self.y_val) + + resp = make_reply(ReturnCode.OK) + resp["tuning_score"] = float(score) + resp["num_samples"] = len(self.y_train) + return resp + except Exception as e: + logger.error(f"Tuning eval failed: {e}") + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + def _handle_apply_mask(self, shareable: Shareable) -> Shareable: + try: + if self.X_train is None: + logger.error("apply_mask received before set_data was called") + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + mask = np.array(shareable.get("global_feature_mask"), dtype=bool) + + # Idempotency guard: if this exact mask was already applied (e.g. task + # retransmission), X_train is already sliced down to the selected features + # so re-applying would raise an IndexError. Return OK immediately. + if self.global_feature_mask is not None: + if np.array_equal(mask, self.global_feature_mask): + logger.info("Mask already applied (duplicate task delivery); returning OK") + return make_reply(ReturnCode.OK) + # A different mask arrived after the first was already applied — the + # executor's feature space has already been permanently reduced, so + # this is an unrecoverable state. Log clearly and fail fast. + logger.error( + f"Received a different mask after mask was already applied. 
" + f"Expected mask length {len(self.global_feature_mask)} " + f"(checksum {self.global_feature_mask.sum()}), " + f"got length {len(mask)} (checksum {mask.sum()}). " + "This is unrecoverable — the executor feature space has already been reduced." + ) + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + # Validate mask length against the *current* (pre-mask) feature count + if len(mask) != self.X_train.shape[1]: + logger.error(f"Mask length ({len(mask)}) doesn't match number of features ({self.X_train.shape[1]})") + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + logger.info(f"Permanently applying mask: {np.sum(mask)} features selected") + + self.global_feature_mask = mask + self.X_train = self.X_train[:, mask] + self.X_val = self.X_val[:, mask] + if self.feature_names is not None: + self.feature_names = [self.feature_names[i] for i in np.where(mask)[0]] + self.scaler = None # feature count changed; cached scaler is invalid + return make_reply(ReturnCode.OK) + except Exception as e: + logger.error(f"Mask application failed: {e}") + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + def _handle_train(self, shareable: Shareable) -> Shareable: + try: + # Fit the scaler once per feature set; reuse across rounds so training + # and evaluation always use identical normalisation parameters. + if self.scaler is None: + self.scaler = StandardScaler() + self.scaler.fit(self.X_train) + X_tr = self.scaler.transform(self.X_train) + + # Parse global parameters from the server if present. We extract them + # here but assign them immediately before the warm-start fit below, so + # the weight assignment is always the last write to coef_/intercept_ + # before model.fit() — regardless of whether the model needed an init fit. 
+ global_coef = None + global_intercept = None + if "params" in shareable: + p = shareable["params"] + if "weight_0" in p and "weight_1" in p: + coef = np.array(p["weight_0"]) + if coef.ndim == 1: + coef = coef.reshape(1, -1) # Binary: (n_features,) -> (1, n_features) + global_coef = coef + global_intercept = np.array(p["weight_1"]) + + # Ensure model structure (coef_ shape) is established before any weight + # assignment. Guarantee at least one sample per class so LogisticRegression + # does not raise "only one class in data" on sorted or tiny splits. + if not self._model_initialized: + unique_classes = np.unique(self.y_train) + init_idx = [int(np.where(self.y_train == c)[0][0]) for c in unique_classes] + n_extra = max(0, min(10, len(self.y_train)) - len(init_idx)) + # np.setdiff1d avoids building a full O(n) intermediate list before + # slicing; assume_unique=True skips the dedup sort since init_idx + # already contains one distinct index per class. + remaining = np.setdiff1d(np.arange(len(self.y_train)), init_idx, assume_unique=True) + init_idx += remaining[:n_extra].tolist() + self.model.fit(X_tr[init_idx], self.y_train[init_idx]) + self._model_initialized = True + + # Assign aggregated weights immediately before the warm-start fit so that + # model.fit() always starts from the global model — never from the init fit. + # Handles both binary (1, n_features) and multi-class (n_classes, n_features). 
+ if global_coef is not None: + self.model.coef_ = global_coef + self.model.intercept_ = global_intercept + + # Train with warm_start=True continues from current weights + self.model.fit(X_tr, self.y_train) + self._model_initialized = True + + resp = make_reply(ReturnCode.OK) + # Send full coef_ to support both binary and multi-class classification + resp["params"] = {"weight_0": self.model.coef_.tolist(), "weight_1": self.model.intercept_.tolist()} + resp["num_samples"] = len(self.X_train) + return resp + except Exception as e: + logger.error(f"Training failed: {e}") + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + def perform_feature_selection(self) -> Tuple[np.ndarray, np.ndarray]: + n_features = self.X_train.shape[1] + + scaler = StandardScaler() + X_scaled = scaler.fit_transform(self.X_train) + + if self.fs_method == "lasso": + # Intentional: Lasso regression is used on classification labels as a + # continuous proxy target. This follows the FLASH paper methodology + # (Christofilogiannis et al., FLTA 2025): L1 regularisation drives + # irrelevant feature coefficients to exactly zero, giving a sparse + # boolean mask directly from the non-zero entries. Using the regression + # form rather than LogisticRegression avoids multi-class coefficient + # expansion and keeps the output a single coefficient vector. + s = Lasso(**self.fs_params).fit(X_scaled, self.y_train) + scores = np.abs(s.coef_) + return scores > LASSO_ELASTIC_NET_ZERO_THRESHOLD, scores + + elif self.fs_method == "elastic_net": + # Intentional: ElasticNet regression on classification labels (same + # rationale as Lasso above). The L1+L2 mix handles correlated features + # better than pure Lasso while still producing exact zeros for selection. 
+ s = ElasticNet(**self.fs_params).fit(X_scaled, self.y_train) + scores = np.abs(s.coef_) + return scores > LASSO_ELASTIC_NET_ZERO_THRESHOLD, scores + + elif self.fs_method == "mutual_info": + # "k" controls how many top features to select; defaults to top 50%. + # Pop it before forwarding the rest to mutual_info_classif so sklearn + # does not receive an unexpected keyword argument. + mi_params = dict(self.fs_params) + k_raw = mi_params.pop("k", None) + k = max(1, int(k_raw)) if k_raw is not None else max(1, n_features // 2) + scores = mutual_info_classif(self.X_train, self.y_train, random_state=42, **mi_params) + mask = np.zeros(n_features, dtype=bool) + mask[np.argsort(scores)[-k:]] = True + return mask, scores + + elif self.fs_method == "random_forest": + # "k" controls how many top features to select; defaults to top 50%. + # Pop it before forwarding the rest to RandomForestClassifier so sklearn + # does not receive an unexpected keyword argument. + rf_params = dict(self.fs_params) + k_raw = rf_params.pop("k", None) + k = max(1, int(k_raw)) if k_raw is not None else max(1, n_features // 2) + rf = RandomForestClassifier(**rf_params) + rf.fit(self.X_train, self.y_train) + scores = rf.feature_importances_ + mask = np.zeros(n_features, dtype=bool) + mask[np.argsort(scores)[-k:]] = True + return mask, scores + + elif self.fs_method == "pyimpetus": + if not PYIMPETUS_AVAILABLE: + # This fallback uses X_scaled (StandardScaler-transformed) while the + # real PyImpetus path below uses raw self.X_train. Feature scores will + # differ across environments where PyImpetus is or is not installed. + # PyImpetus works best without data scaling. 
                logger.warning("PyImpetus not available, falling back to mutual_info")
                scores = mutual_info_classif(X_scaled, self.y_train, random_state=42)
                mask = np.zeros(n_features, dtype=bool)
                k = max(1, n_features // 2)
                mask[np.argsort(scores)[-k:]] = True
                return mask, scores

            # Extract base model separately, then forward remaining fs_params as kwargs
            base_model = self.fs_params.get("model", LogisticRegression(max_iter=1000, random_state=42))
            ppimbc_kwargs = {k: v for k, v in self.fs_params.items() if k != "model"}
            model = PPIMBC(base_model, **ppimbc_kwargs)
            # NOTE(review): this assumes PPIMBC.fit returns positional indices of
            # the selected features (they are used below to index a boolean mask).
            # PyImpetus also exposes the selected subset via ``model.MB`` after
            # fitting — confirm the return value against the installed PyImpetus
            # version, especially for ndarray (non-DataFrame) inputs.
            selected_features = model.fit(self.X_train, self.y_train)
            mask = np.zeros(n_features, dtype=bool)
            mask[selected_features] = True
            # PyImpetus yields no per-feature importance here, so selected
            # features get a uniform score of 1.0 and the rest 0.0.
            scores = np.zeros(n_features)
            scores[selected_features] = 1.0
            return mask, scores

        else:
            # Unknown fs_method: degrade gracefully by keeping every feature
            # with uniform scores rather than raising.
            return np.ones(n_features, dtype=bool), np.ones(n_features)
diff --git a/nvflare/app_opt/feature_election/feature_election.py b/nvflare/app_opt/feature_election/feature_election.py
new file mode 100644
index 0000000000..ace5ac168d
--- /dev/null
+++ b/nvflare/app_opt/feature_election/feature_election.py
@@ -0,0 +1,621 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Feature Election Library for NVIDIA FLARE
High-level API for federated feature selection and training workflow.
+""" + +import json +import logging +from pathlib import Path +from typing import Dict, List, Optional, Tuple, Union + +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import LabelEncoder + +logger = logging.getLogger(__name__) + + +class FeatureElection: + """ + High-level interface for Feature Election in NVIDIA FLARE. + Simplifies integration with tabular datasets for federated feature selection. + + This class provides: + - Easy data preparation and splitting + - Local simulation for testing + - Result management and persistence + """ + + def __init__( + self, + freedom_degree: float = 0.5, + fs_method: str = "lasso", + aggregation_mode: str = "weighted", + auto_tune: bool = False, + tuning_rounds: int = 5, + eval_metric: str = "f1", + wait_time_after_min_received: int = 10, + fs_params: Optional[Dict] = None, + ): + if not 0 <= freedom_degree <= 1: + raise ValueError("freedom_degree must be between 0 and 1") + if aggregation_mode not in ["weighted", "uniform"]: + raise ValueError("aggregation_mode must be 'weighted' or 'uniform'") + if eval_metric not in ["f1", "accuracy"]: + raise ValueError("eval_metric must be 'f1' or 'accuracy'") + + self.freedom_degree = freedom_degree + self.fs_method = fs_method + self.aggregation_mode = aggregation_mode + self.auto_tune = auto_tune + self.tuning_rounds = tuning_rounds + self.eval_metric = eval_metric + self.wait_time_after_min_received = wait_time_after_min_received + # FS hyperparameters (e.g. {"alpha": 0.1} for Lasso) forwarded to the + # executor; None means the executor uses its own defaults. 
+ self.fs_params = fs_params or {} + + # Storage for results + self.global_mask = None + self.selected_feature_names = None + self.election_stats = {} + + def create_flare_job( + self, + job_name: str = "feature_election", + output_dir: str = "jobs/feature_election", + min_clients: int = 2, + num_rounds: int = 5, + client_sites: Optional[List[str]] = None, + ) -> Dict[str, str]: + """ + Generate FLARE job configuration. + """ + job_path = Path(output_dir) / job_name + job_path.mkdir(parents=True, exist_ok=True) + (job_path / "app" / "config").mkdir(parents=True, exist_ok=True) + (job_path / "app" / "custom").mkdir(parents=True, exist_ok=True) + + # Server config + server_config = { + "format_version": 2, + "workflows": [ + { + "id": "feature_election_workflow", + "path": "nvflare.app_opt.feature_election.controller.FeatureElectionController", + "args": { + "freedom_degree": float(self.freedom_degree), + "aggregation_mode": self.aggregation_mode, + "min_clients": min_clients, + "num_rounds": num_rounds, + "task_name": "feature_election", + "auto_tune": self.auto_tune, + "tuning_rounds": self.tuning_rounds, + "wait_time_after_min_received": self.wait_time_after_min_received, + }, + } + ], + "components": [], + } + + # Client config + client_config = { + "format_version": 2, + "executors": [ + { + "tasks": ["feature_election"], + "executor": { + "path": "nvflare.app_opt.feature_election.executor.FeatureElectionExecutor", + "args": { + "fs_method": self.fs_method, + "eval_metric": self.eval_metric, + "task_name": "feature_election", + "fs_params": self.fs_params if self.fs_params else None, + }, + }, + } + ], + "task_result_filters": [], + "task_data_filters": [], + } + + if client_sites is None: + client_sites = [f"site-{i + 1}" for i in range(min_clients)] + + meta_config = { + "name": job_name, + "resource_spec": {site: {"num_of_gpus": 0, "mem_per_gpu_in_GiB": 0} for site in client_sites}, + "min_clients": min_clients, + "mandatory_clients": [], + "deploy_map": 
{"app": ["@ALL"]}, + "task_data_filters": [], + "task_result_filters": [], + } + + # Write files + paths = { + "server_config": job_path / "app" / "config" / "config_fed_server.json", + "client_config": job_path / "app" / "config" / "config_fed_client.json", + "meta": job_path / "meta.json", + "readme": job_path / "README.md", + } + + with open(paths["server_config"], "w") as f: + json.dump(server_config, f, indent=2) + with open(paths["client_config"], "w") as f: + json.dump(client_config, f, indent=2) + with open(paths["meta"], "w") as f: + json.dump(meta_config, f, indent=2) + + # Create README + with open(paths["readme"], "w") as f: + f.write(f"# {job_name}\n\nFeature Election job (Auto-tune: {self.auto_tune})") + + logger.info(f"FLARE job configuration created in {job_path}") + return {k: str(v) for k, v in paths.items()} + + def prepare_data_splits( + self, + df: pd.DataFrame, + target_col: str, + num_clients: int = 3, + split_strategy: str = "stratified", + split_ratios: Optional[List[float]] = None, + random_state: int = 42, + dirichlet_alpha: float = 0.5, + ) -> List[Tuple[pd.DataFrame, pd.Series]]: + """Prepare data splits for federated clients.""" + X = df.drop(columns=[target_col]) + y = df[target_col] + + # Track before auto-assignment so we can warn if the user supplied + # split_ratios for a strategy that does not use them. + user_provided_split_ratios = split_ratios is not None + + if split_ratios is None: + if num_clients == 2: + split_ratios = [0.6, 0.4] + elif num_clients == 3: + split_ratios = [0.5, 0.3, 0.2] + else: + split_ratios = [1.0 / num_clients] * num_clients + + if abs(sum(split_ratios) - 1.0) > 0.001: + raise ValueError(f"Split ratios must sum to 1.0, got {sum(split_ratios)}") + if len(split_ratios) != num_clients: + raise ValueError(f"len(split_ratios) ({len(split_ratios)}) must equal num_clients ({num_clients})") + + # "non_iid" is the canonical name (matches README / prepare_data.py / CLI). 
+ # Accept "dirichlet" as a legacy alias so existing callers are not broken. + if split_strategy == "dirichlet": + split_strategy = "non_iid" + + if split_strategy == "non_iid" and user_provided_split_ratios: + logger.warning( + "split_ratios is ignored by the 'non_iid' strategy; " + "class proportions are drawn from a Dirichlet distribution controlled " + "by dirichlet_alpha. The provided split_ratios will not be used." + ) + + client_data = [] + indices = np.arange(len(df)) + + if split_strategy == "stratified": + remaining_y, remaining_indices = y, indices + for i in range(num_clients - 1): + size = split_ratios[i] / sum(split_ratios[i:]) + try: + c_idx, r_idx = train_test_split( + remaining_indices, test_size=1 - size, stratify=remaining_y, random_state=random_state + i + ) + except ValueError as e: + # Only fall back to non-stratified splitting for sklearn's own + # stratification errors (e.g. a class with fewer than 2 samples). + # Any other ValueError — such as test_size=0 from a zero ratio — + # should propagate so the caller gets a meaningful error message. + if "stratif" not in str(e).lower() and "least populated" not in str(e).lower(): + raise + c_idx, r_idx = train_test_split( + remaining_indices, test_size=1 - size, random_state=random_state + i + ) + client_data.append((X.iloc[c_idx], y.iloc[c_idx])) + remaining_indices = r_idx + remaining_y = y.iloc[remaining_indices] + client_data.append((X.iloc[remaining_indices], y.iloc[remaining_indices])) + + elif split_strategy == "random": + np.random.seed(random_state) + np.random.shuffle(indices) + start = 0 + for i, ratio in enumerate(split_ratios): + if i == len(split_ratios) - 1: + c_idx = indices[start:] # last client gets all remaining + else: + end = start + int(len(indices) * ratio) + c_idx = indices[start:end] + start = end + if len(c_idx) == 0: + raise ValueError( + f"Client {i} received 0 samples from random split. " + "Increase the dataset size or adjust split_ratios." 
+ ) + client_data.append((X.iloc[c_idx], y.iloc[c_idx])) + + elif split_strategy == "non_iid": + # Non-IID split logic + le = LabelEncoder() + y_encoded = le.fit_transform(y) + n_classes = len(le.classes_) + np.random.seed(random_state) + label_distribution = np.random.dirichlet([dirichlet_alpha] * num_clients, n_classes) + + client_indices = [[] for _ in range(num_clients)] + for k in range(n_classes): + idx_k = np.where(y_encoded == k)[0] + np.random.shuffle(idx_k) + proportions = (label_distribution[k] * len(idx_k)).astype(int) + # Assign any rounding remainder to the last client so that every + # sample in this class is distributed exactly once. This matches + # the convention used in prepare_data.py and the random/sequential + # split strategies above (last client takes indices[start:]). + total_assigned = proportions[:-1].sum() + proportions[-1] = max(0, len(idx_k) - total_assigned) + + start = 0 + for i, prop in enumerate(proportions): + client_indices[i].extend(idx_k[start : start + prop]) + start += prop + + for i, indices_i in enumerate(client_indices): + if len(indices_i) == 0: + raise ValueError( + f"Client {i} received 0 samples from Dirichlet split (alpha={dirichlet_alpha}). " + "Increase the dataset size or reduce the number of clients." + ) + client_data.append((X.iloc[indices_i], y.iloc[indices_i])) + + else: + # Fallback for sequential or other + start = 0 + for i, ratio in enumerate(split_ratios): + if i == len(split_ratios) - 1: + c_idx = indices[start:] # last client gets all remaining + else: + end = start + int(len(indices) * ratio) + c_idx = indices[start:end] + start = end + if len(c_idx) == 0: + raise ValueError( + f"Client {i} received 0 samples from sequential split. " + "Increase the dataset size or adjust split_ratios." 
+ ) + client_data.append((X.iloc[c_idx], y.iloc[c_idx])) + + return client_data + + def simulate_election( + self, + client_data: List[Tuple[Union[pd.DataFrame, np.ndarray], Union[pd.Series, np.ndarray]]], + feature_names: Optional[List[str]] = None, + ) -> Dict: + """Simulate election locally.""" + # Local import to avoid circular dependency + from .controller import FeatureElectionController + from .executor import FeatureElectionExecutor + + controller = FeatureElectionController( + freedom_degree=self.freedom_degree, + aggregation_mode=self.aggregation_mode, + min_clients=len(client_data), + auto_tune=self.auto_tune, + tuning_rounds=self.tuning_rounds, + wait_time_after_min_received=self.wait_time_after_min_received, + ) + + # Accumulate feature names from the first DataFrame client encountered; + # validate all subsequent DataFrame clients against that reference. + # Using a separate local variable avoids in-loop reassignment of the + # parameter, making the capture-once intent explicit. + resolved_feature_names = feature_names + n_features_ref = None # established from the first client; all others must match + client_selections = {} + executors = [] + for i, (X, y) in enumerate(client_data): + X_np = X.values if isinstance(X, pd.DataFrame) else X + y_np = y.values if isinstance(y, pd.Series) else y + + # Validate feature count consistency across all clients (DataFrame and ndarray). + if n_features_ref is None: + n_features_ref = X_np.shape[1] + elif X_np.shape[1] != n_features_ref: + raise ValueError( + f"Client {i} has {X_np.shape[1]} features but client 0 has {n_features_ref}. " + "All clients must have the same number of features." + ) + + if isinstance(X, pd.DataFrame): + client_cols = X.columns.tolist() + if resolved_feature_names is None: + resolved_feature_names = client_cols + elif client_cols != resolved_feature_names: + raise ValueError( + f"Client {i} has different column labels than client 0. 
" + f"Expected {len(resolved_feature_names)} columns ({resolved_feature_names[:3]}...), " + f"got {len(client_cols)} ({client_cols[:3]}...). " + "All DataFrame clients must have identical feature columns." + ) + + # Split into train/val so tuning scores are not evaluated on training data. + # Attempt stratified split so minority classes appear in both halves (mirrors + # _safe_train_test_split in prepare_data.py); fall back to random if any class + # has fewer than 2 samples (e.g. after a Dirichlet split). + try: + X_train_sim, X_val_sim, y_train_sim, y_val_sim = train_test_split( + X_np, y_np, test_size=0.2, random_state=42 + i, stratify=y_np + ) + except ValueError: + X_train_sim, X_val_sim, y_train_sim, y_val_sim = train_test_split( + X_np, y_np, test_size=0.2, random_state=42 + i + ) + + executor = FeatureElectionExecutor( + fs_method=self.fs_method, eval_metric=self.eval_metric, fs_params=self.fs_params + ) + executor.set_data( + X_train_sim, y_train_sim, X_val=X_val_sim, y_val=y_val_sim, feature_names=resolved_feature_names + ) + executors.append(executor) + + # Local Selection + + try: + selected_mask, feature_scores = executor.perform_feature_selection() + except (TypeError, ValueError) as e: + raise RuntimeError(f"Feature selection returned unexpected format: {e}") + + if not np.any(selected_mask): + raise ValueError( + f"Client {i}: feature selection rejected all features " + f"(fs_method='{self.fs_method}', fs_params={self.fs_params}). " + "An all-False client mask would bias the global mask toward the intersection " + "of other clients' masks without any signal from this client. " + "Consider lowering the regularisation strength " + "(e.g. reduce 'alpha' for Lasso/ElasticNet)." 
+ ) + + initial_score = executor.evaluate_model(X_train_sim, y_train_sim, X_val_sim, y_val_sim) + + # Apply mask to evaluate on held-out val set + X_sel_tr = X_train_sim[:, selected_mask] + X_sel_val = X_val_sim[:, selected_mask] + fs_score = executor.evaluate_model(X_sel_tr, y_train_sim, X_sel_val, y_val_sim) + + client_selections[f"client_{i}"] = { + "selected_features": selected_mask.tolist(), + "feature_scores": feature_scores.tolist(), + # Use the train-split size so this matches what the real executor + # reports via _handle_feature_selection (len(self.X_train)), keeping + # mask-aggregation weights and tuning-score weights consistent. + "num_samples": len(X_train_sim), + "initial_score": float(initial_score), + "fs_score": float(fs_score), + } + + # Simulate Controller Aggregation with optional auto-tuning. + # Use controller.tuning_rounds (not self.tuning_rounds) so that edge-case + # normalisation performed by FeatureElectionController.__init__ (e.g. the + # tuning_rounds=1 no-op guard) is respected and simulation stays in sync + # with what the real FL deployment would do. + if self.auto_tune and controller.tuning_rounds > 0: + logger.info(f"Starting local auto-tuning ({controller.tuning_rounds} rounds)...") + + for t in range(controller.tuning_rounds): + # Generate mask at current freedom_degree + candidate_mask = controller.aggregate_selections(client_selections) + + # Evaluate across all clients + if np.sum(candidate_mask) == 0: + score = 0.0 + else: + # Use the same weighting as the real controller so the tuning + # objective is consistent with the actual aggregation_mode. 
            # NOTE(review): this fragment is the interior of simulate_election's
            # auto-tuning loop (over t in range(controller.tuning_rounds)); the
            # enclosing def is outside this chunk. Indentation reconstructed —
            # TODO confirm against the full file.
            # Score the candidate mask on every simulated client, then combine:
            # "weighted" mode weights each client's validation score by its
            # training-set size; any other mode is a plain (unweighted) mean.
            weighted_score, total_weight = 0.0, 0.0
            for i_exec, exec_i in enumerate(executors):
                # Guard: clients must keep full-width X_train so the candidate
                # mask (one entry per original feature) can be applied each round.
                if exec_i.X_train.shape[1] != len(candidate_mask):
                    raise ValueError(
                        f"Executor {i_exec} X_train has {exec_i.X_train.shape[1]} features "
                        f"but candidate_mask has {len(candidate_mask)} entries — "
                        "X_train must remain full-width throughout the tuning loop"
                    )
                X_masked = exec_i.X_train[:, candidate_mask]
                X_val_masked = exec_i.X_val[:, candidate_mask]
                s = exec_i.evaluate_model(X_masked, exec_i.y_train, X_val_masked, exec_i.y_val)
                n = len(exec_i.X_train) if self.aggregation_mode == "weighted" else 1
                weighted_score += s * n
                total_weight += n
            # total_weight == 0 can only happen with zero executors; fall back to 0.0.
            score = weighted_score / total_weight if total_weight > 0 else 0.0

            logger.info(
                f"Tuning Round {t + 1}/{controller.tuning_rounds}: "
                f"FD={controller.freedom_degree:.4f} -> Score={score:.4f}"
            )
            if t < controller.tuning_rounds - 1:
                # Hill-climb: the controller records (fd, score) and proposes the next FD.
                controller.advance_tuning(score, first_step=(t == 0))
            else:
                # Final round: record the score but do not advance to a new FD.
                controller.tuning_history.append((controller.freedom_degree, score))

        # Select best FD: pick the freedom degree with the highest recorded score
        # and sync it onto both the controller and this object.
        if controller.tuning_history:
            best_fd, best_score = max(controller.tuning_history, key=lambda x: x[1])
            controller.freedom_degree = best_fd
            self.freedom_degree = best_fd
            logger.info(f"Tuning Complete. Optimal Freedom Degree: {best_fd:.4f} (Score: {best_score:.4f})")

        # Final aggregation of per-client boolean selections into the global mask.
        self.global_mask = controller.aggregate_selections(client_selections)

        # Build Stats: one row per client; masks has shape (num_clients, num_features).
        masks = np.array([sel["selected_features"] for sel in client_selections.values()])
        self.election_stats = {
            "num_clients": len(client_data),
            "num_features_original": len(self.global_mask),
            "num_features_selected": int(np.sum(self.global_mask)),
            # Fraction of features dropped by the election.
            "reduction_ratio": float(1 - (np.sum(self.global_mask) / len(self.global_mask))),
            "freedom_degree": float(self.freedom_degree),
            "fs_method": self.fs_method,
            "auto_tune": self.auto_tune,
            # (fd, score) pairs from the tuning loop; empty when tuning did not run.
            "tuning_history": (
                [(float(fd), float(s)) for fd, s in controller.tuning_history]
                if self.auto_tune and controller.tuning_rounds > 0
                else []
            ),
            # Features chosen by ALL clients vs. by AT LEAST ONE client.
            "intersection_features": int(np.sum(np.all(masks, axis=0))),
            "union_features": int(np.sum(np.any(masks, axis=0))),
            "client_stats": client_selections,
        }

        # Map the boolean mask back to human-readable names when available.
        if resolved_feature_names is not None:
            if len(resolved_feature_names) != len(self.global_mask):
                raise ValueError(
                    f"Feature names length ({len(resolved_feature_names)}) doesn't match global mask length ({len(self.global_mask)})"
                )
            self.selected_feature_names = [name for i, name in enumerate(resolved_feature_names) if self.global_mask[i]]

        return self.election_stats

    def apply_mask(self, X: Union[pd.DataFrame, np.ndarray]) -> Union[pd.DataFrame, np.ndarray]:
        """Apply global feature mask to new data.

        Args:
            X: Feature matrix (DataFrame or 2-D ndarray) with the same
                feature columns/order as the data used for the election.

        Returns:
            X restricted to the elected features; the input type is preserved.

        Raises:
            ValueError: if simulate_election() has not produced a mask yet.
        """
        if self.global_mask is None:
            raise ValueError("No global mask available. Run simulate_election() first.")

        if isinstance(X, pd.DataFrame):
            # Prefer name-based selection so column order in X doesn't matter.
            if self.selected_feature_names:
                return X[self.selected_feature_names]
            # Convert boolean mask to integer indices for iloc
            selected_indices = np.where(self.global_mask)[0]
            return X.iloc[:, selected_indices]
        # ndarray path: boolean-mask the columns directly.
        return X[:, self.global_mask]

    def save_results(self, filepath: str):
        """Save results to JSON.

        Persists configuration, the global mask, selected feature names, and a
        JSON-safe copy of election_stats (numpy scalars/arrays converted to
        native Python types).
        """
        results = {
            "freedom_degree": float(self.freedom_degree),
            "fs_method": self.fs_method,
            "aggregation_mode": self.aggregation_mode,
            "auto_tune": self.auto_tune,
            "eval_metric": self.eval_metric,
            "global_mask": self.global_mask.tolist() if self.global_mask is not None else None,
            "selected_feature_names": self.selected_feature_names,
            "election_stats": {
                # Coerce numpy types (ndarray/integer/floating/bool_/generic)
                # into JSON-serializable builtins; pass everything else through.
                k: (
                    v.tolist()
                    if isinstance(v, np.ndarray)
                    else (
                        int(v)
                        if isinstance(v, np.integer)
                        else (
                            float(v)
                            if isinstance(v, np.floating)
                            else bool(v) if isinstance(v, np.bool_) else v.item() if isinstance(v, np.generic) else v
                        )
                    )
                )
                for k, v in self.election_stats.items()
                if k != "client_stats"  # client_stats is a nested dict excluded from top-level persistence by design
            },
        }
        with open(filepath, "w") as f:
            json.dump(results, f, indent=2)

    def load_results(self, filepath: str):
        """Load results from JSON.

        Restores the state written by save_results(); missing keys fall back
        to the constructor defaults (e.g. freedom_degree=0.5, fs_method="lasso").
        """
        with open(filepath, "r") as f:
            results = json.load(f)

        self.freedom_degree = results.get("freedom_degree", 0.5)
        self.fs_method = results.get("fs_method", "lasso")
        self.aggregation_mode = results.get("aggregation_mode", "weighted")
        self.auto_tune = results.get("auto_tune", False)
        self.eval_metric = results.get("eval_metric", "f1")

        if results.get("global_mask") is not None:
            self.global_mask = np.array(results["global_mask"])

        self.selected_feature_names = results.get("selected_feature_names")
        self.election_stats = results.get("election_stats", {})


# --- HELPER FUNCTIONS ---


# Keyword names routed to the FeatureElection constructor by quick_election().
_FEATURE_ELECTION_INIT_PARAMS = {
    "aggregation_mode",
    "auto_tune",
"tuning_rounds", + "eval_metric", + "wait_time_after_min_received", + "fs_params", +} + +_PREPARE_DATA_PARAMS = { + "split_ratios", + "random_state", + "dirichlet_alpha", +} + + +def quick_election( + df: pd.DataFrame, + target_col: str, + num_clients: int = 3, + freedom_degree: float = 0.5, + fs_method: str = "lasso", + split_strategy: str = "stratified", + **kwargs, +) -> Tuple[np.ndarray, Dict]: + """ + Quick Feature Election for tabular data (one-line solution). + + ``**kwargs`` are routed to either :class:`FeatureElection` or + :meth:`FeatureElection.prepare_data_splits` based on the parameter name. + Recognised split parameters: ``split_ratios``, ``random_state``, + ``dirichlet_alpha``. All other kwargs are forwarded to + :class:`FeatureElection` (e.g. ``aggregation_mode``, ``auto_tune``, + ``fs_params``). + """ + init_kwargs = {k: v for k, v in kwargs.items() if k not in _PREPARE_DATA_PARAMS} + split_kwargs = {k: v for k, v in kwargs.items() if k in _PREPARE_DATA_PARAMS} + + unknown = set(kwargs) - _FEATURE_ELECTION_INIT_PARAMS - _PREPARE_DATA_PARAMS + if unknown: + raise TypeError(f"quick_election() got unexpected keyword argument(s): {sorted(unknown)}") + + fe = FeatureElection(freedom_degree=freedom_degree, fs_method=fs_method, **init_kwargs) + client_data = fe.prepare_data_splits(df, target_col, num_clients, split_strategy=split_strategy, **split_kwargs) + stats = fe.simulate_election(client_data) + return fe.global_mask, stats + + +def load_election_results(filepath: str) -> Dict: + """ + Load election results from a JSON file. + """ + with open(filepath, "r") as f: + results = json.load(f) + return results diff --git a/tests/unit_test/app_opt/feature_election/__init__.py b/tests/unit_test/app_opt/feature_election/__init__.py new file mode 100644 index 0000000000..4fc25d0d3c --- /dev/null +++ b/tests/unit_test/app_opt/feature_election/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. 
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/unit_test/app_opt/feature_election/test.py b/tests/unit_test/app_opt/feature_election/test.py
new file mode 100644
index 0000000000..99286be4e2
--- /dev/null
+++ b/tests/unit_test/app_opt/feature_election/test.py
@@ -0,0 +1,246 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Enhanced Unit Tests for Feature Election
Covers:
1. Initialization & Validation
2. Data Splitting Strategies
3. Job Configuration (FLARE)
4. Simulation (Election, Auto-tuning, Mask Application)
"""

import json
import sys
from importlib.util import find_spec
from pathlib import Path

import numpy as np
import pandas as pd
import pytest
from sklearn.datasets import make_classification

from nvflare.app_opt.feature_election import FeatureElection, quick_election

# Optional dependency: permutation-importance tests are skipped when absent.
PYIMPETUS_AVAILABLE = find_spec("PyImpetus") is not None


@pytest.fixture
def sample_data():
    """Create a consistent sample dataset for testing."""
    # 200 rows x 20 features; random_state pins the data across runs so the
    # assertions below (e.g. total_len == 200, mask length <= 20) stay stable.
    X, y = make_classification(n_samples=200, n_features=20, n_informative=10, n_redundant=5, random_state=42)
    feature_names = [f"feature_{i}" for i in range(20)]
    df = pd.DataFrame(X, columns=feature_names)
    df["target"] = y
    return df


class TestConfigurationAndValidation:
    """Tests for class initialization, parameter validation, and Job Config generation."""

    def test_initialization_defaults(self):
        """Test default values."""
        fe = FeatureElection()
        assert fe.freedom_degree == 0.5
        assert fe.auto_tune is False
        assert fe.tuning_rounds == 5
        assert fe.fs_method == "lasso"

    def test_initialization_custom(self):
        """Test custom parameters including new auto-tune args."""
        fe = FeatureElection(freedom_degree=0.8, fs_method="random_forest", auto_tune=True, tuning_rounds=10)
        assert fe.freedom_degree == 0.8
        assert fe.auto_tune is True
        assert fe.tuning_rounds == 10

    def test_invalid_parameters(self):
        """Test parameter bounds."""
        # freedom_degree must lie in [0, 1].
        with pytest.raises(ValueError, match="freedom_degree"):
            FeatureElection(freedom_degree=1.1)

        with pytest.raises(ValueError, match="aggregation_mode"):
            FeatureElection(aggregation_mode="invalid_mode")

    def test_create_flare_job_structure(self, tmp_path):
        """Test that the generated FL job contains all new fields (auto_tune, phases)."""
        fe = FeatureElection(freedom_degree=0.5, auto_tune=True, tuning_rounds=3)

        output_dir = tmp_path / "jobs"
        paths = fe.create_flare_job(
            job_name="autotune_job", output_dir=str(output_dir), min_clients=2, num_rounds=10  # Total FL rounds
        )

        # 1. Check file existence
        assert Path(paths["server_config"]).exists()
        assert Path(paths["client_config"]).exists()

        # 2. Validate Server Config
        with open(paths["server_config"]) as f:
            server_config = json.load(f)

        workflow_args = server_config["workflows"][0]["args"]

        # Check standard args
        assert workflow_args["freedom_degree"] == 0.5
        assert workflow_args["min_clients"] == 2

        # Check NEW auto-tune args
        assert workflow_args["auto_tune"] is True
        assert workflow_args["tuning_rounds"] == 3
        assert workflow_args["num_rounds"] == 10  # Should be passed to controller for FL phase

        # 3. Validate Client Config
        with open(paths["client_config"]) as f:
            client_config = json.load(f)

        exec_args = client_config["executors"][0]["executor"]["args"]
        assert exec_args["task_name"] == "feature_election"


class TestDataPreparation:
    """Tests for data splitting logic."""

    def test_split_stratified_counts(self, sample_data):
        fe = FeatureElection()
        splits = fe.prepare_data_splits(sample_data, "target", num_clients=3, split_strategy="stratified")

        assert len(splits) == 3
        # Check that we haven't lost data
        total_len = sum(len(split_data) for split_data, _ in splits)
        assert total_len == 200

    def test_split_invalid_ratios(self, sample_data):
        fe = FeatureElection()
        # Ratios summing above 1.0 must be rejected.
        with pytest.raises(ValueError):
            fe.prepare_data_splits(sample_data, "target", split_ratios=[0.8, 0.8])  # > 1.0


class TestSimulationLogic:
    """
    Tests the in-memory simulation of the Feature Election process.
    This simulates what happens inside the FLARE Controller/Executor interaction.
    """

    def test_simulate_election_basic(self, sample_data):
        """Test standard one-shot election."""
        fe = FeatureElection(freedom_degree=0.5, fs_method="lasso", auto_tune=False)
        client_data = fe.prepare_data_splits(sample_data, "target", num_clients=2)

        stats = fe.simulate_election(client_data)

        assert fe.global_mask is not None
        # At least one and at most all 20 original features survive.
        assert 0 < np.sum(fe.global_mask) <= 20
        assert stats["freedom_degree"] == 0.5

    def test_simulate_election_with_autotune(self, sample_data):
        """
        Test that simulation runs with auto_tune=True.

        Note: In a pure simulation (without full FL communication overhead),
        we want to ensure the logic flows through the tuning steps.
        """
        # Start with a low freedom degree that likely needs adjustment
        initial_fd = 0.1
        fe = FeatureElection(freedom_degree=initial_fd, fs_method="lasso", auto_tune=True, tuning_rounds=3)

        client_data = fe.prepare_data_splits(sample_data, "target", num_clients=2)

        stats = fe.simulate_election(client_data)

        # The simulation should have updated the freedom_degree in the stats
        # It might be the same if 0.1 was optimal, but the object state should be consistent
        assert fe.global_mask is not None
        assert "freedom_degree" in stats

        # Ensure stats structure contains expected keys
        assert "num_features_selected" in stats
        assert "reduction_ratio" in stats

        # Verify that tuning actually ran: tuning_history must contain exactly
        # tuning_rounds entries (one per hill-climbing iteration). An empty list
        # here means the tuning code path was silently skipped.
        assert "tuning_history" in stats
        assert len(stats["tuning_history"]) == 3, (
            f"Expected 3 tuning history entries (one per tuning_round), " f"got {len(stats['tuning_history'])}"
        )
        # Each entry must be a (freedom_degree, score) pair with valid types
        for fd, score in stats["tuning_history"]:
            assert isinstance(fd, float), f"FD entry {fd!r} is not a float"
            assert isinstance(score, float), f"Score entry {score!r} is not a float"
            assert 0.0 <= fd <= 1.0, f"FD {fd} is outside [0, 1]"
            assert 0.0 <= score <= 1.0, f"Tuning score {score} is outside [0, 1]"

    def test_boundary_conditions(self, sample_data):
        """Test Intersection (FD=0) and Union (FD=1)."""
        client_data = FeatureElection().prepare_data_splits(sample_data, "target", num_clients=2)

        # Intersection
        fe_int = FeatureElection(freedom_degree=0.0)
        stats_int = fe_int.simulate_election(client_data)
        n_int = stats_int["num_features_selected"]

        # Union
        fe_union = FeatureElection(freedom_degree=1.0)
        stats_union = fe_union.simulate_election(client_data)
        n_union = stats_union["num_features_selected"]

        # Intersection is by definition a subset of the union.
        assert n_int <= n_union
        # Intersection should match intersection_features stat
        assert n_int == stats_int["intersection_features"]

    def test_apply_mask_consistency(self, sample_data):
        """Ensure applying the mask returns the correct dataframe shape."""
        fe = FeatureElection(freedom_degree=0.5)
        client_data = fe.prepare_data_splits(sample_data, "target", num_clients=2)
        fe.simulate_election(client_data)

        num_selected = np.sum(fe.global_mask)

        # Apply to new data
        X_new = sample_data.drop(columns=["target"])
        X_filtered = fe.apply_mask(X_new)

        # Column count shrinks to the elected features; row count is untouched.
        assert X_filtered.shape[1] == num_selected
        assert X_filtered.shape[0] == 200


class TestQuickElectionHelper:
    """Test the 'one-line' helper function."""

    def test_quick_election_workflow(self, sample_data):
        """Test the end-to-end quick helper."""
        mask, stats = quick_election(
            sample_data, target_col="target", num_clients=2, fs_method="lasso", freedom_degree=0.6
        )

        assert isinstance(mask, np.ndarray)
        assert mask.dtype == bool
        assert stats["num_clients"] == 2


@pytest.mark.skipif(not PYIMPETUS_AVAILABLE, reason="PyImpetus not installed")
class TestAdvancedFeatures:
    """Tests requiring optional dependencies."""

    def test_pyimpetus_method(self, sample_data):
        fe = FeatureElection(fs_method="pyimpetus")
        client_data = fe.prepare_data_splits(sample_data, "target", num_clients=2)
        stats = fe.simulate_election(client_data)

        assert stats["fs_method"] == "pyimpetus"
        assert fe.global_mask is not None


if __name__ == "__main__":
    # Allow running this file directly: delegate to pytest in verbose mode.
    sys.exit(pytest.main(["-v", __file__]))