|
1 | 1 | # Standard |
2 | 2 | from multiprocessing import set_start_method |
3 | 3 | from pathlib import Path |
| 4 | +from unittest.mock import patch |
4 | 5 | import json |
5 | 6 | import logging |
6 | 7 | import os |
|
9 | 10 |
|
10 | 11 | # Third Party |
11 | 12 | from datasets import Dataset |
| 13 | +import h5py |
12 | 14 | import pytest |
13 | 15 | import torch |
14 | 16 |
|
@@ -51,68 +53,168 @@ def create_test_data(num_samples=50): |
51 | 53 | return test_data |
52 | 54 |
|
53 | 55 |
|
# Module-level stand-in for process_folds_with_gpu so Pool workers can pickle it
def mock_process_folds_with_gpu(args):
    """Return a canned single-fold result instead of doing real GPU work.

    `args` is the worker tuple normally handed to process_folds_with_gpu; the
    requested subset sizes live at index 3.  The result mimics the real shape:
    a list of (fold_index, {size: {"indices": ..., "gains": ...}}) tuples with
    deterministic indices and gains for every requested size.
    """
    subset_sizes = args[3]
    per_size_results = {}
    for size in subset_sizes:
        per_size_results[size] = {"indices": list(range(10)), "gains": [0.5] * 10}
    return [(0, per_size_results)]

| 71 | + |
# Serial stand-in for multiprocessing.Pool used by the tests below
class MockPool:
    """Drop-in Pool replacement that executes work inline, in-process.

    Supports the context-manager protocol and a map() that runs serially.
    The mapped function is deliberately ignored: the only call site in these
    tests maps process_folds_with_gpu, which we route to the module-level
    mock_process_folds_with_gpu stub instead.
    """

    def __init__(self, processes=None):
        # Worker count is irrelevant for serial, in-process execution.
        pass

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        pass

    def map(self, func, iterable):
        results = []
        for work_item in iterable:
            results.append(mock_process_folds_with_gpu(work_item))
        return results

| 87 | + |
def test_subset_datasets_functional():
    """Functional test for subset_datasets with heavy dependencies mocked out.

    GPU discovery, embedding generation, the submodular optimizer, and the
    fold-processing worker are all patched so the real orchestration code can
    run quickly on CPU-only machines while output layout is still verified.
    """
    logger = logging.getLogger(__name__)

    # Encoder stand-in: emits random 768-dim embeddings for any input.
    class MockEncoder:
        def __init__(self, model_name=None, device=None, testing_mode=False, **kwargs):
            self.model_name = model_name
            self.device = device
            self.testing_mode = testing_mode

        def encode(self, inputs, instruction=None, **kwargs):
            # A lone string is treated as a batch of one.
            batch = [inputs] if isinstance(inputs, str) else inputs
            return torch.randn(len(batch), 768)

    # Writes a real (but random) embeddings.h5 so downstream path checks pass.
    def create_mock_embeddings(dataset, output_dir):
        os.makedirs(output_dir, exist_ok=True)
        merged_path = os.path.join(output_dir, "embeddings.h5")
        with h5py.File(merged_path, "w") as handle:
            handle.create_dataset(
                "embeddings", data=torch.randn(len(dataset), 768).numpy()
            )
        return merged_path

    # submodlib.FacilityLocationFunction stand-in with a canned maximize().
    class MockFacilityLocationFunction:
        def __init__(self, n, sijs, mode, separate_rep):
            self.n = n
            self.sijs = sijs
            self.mode = mode
            self.separate_rep = separate_rep

        def maximize(
            self,
            budget,
            optimizer,
            epsilon,
            stopIfZeroGain,
            stopIfNegativeGain,
            verbose,
        ):
            # Deterministic (index, gain) pairs sized to the requested budget.
            return [(i, 0.5) for i in range(budget)]

    with (
        patch(
            "instructlab.sdg.subset_selection.get_encoder_class",
            return_value=MockEncoder,
        ),
        patch("torch.cuda.device_count", return_value=1),
        patch("torch.cuda.is_available", return_value=True),
        patch(
            "instructlab.sdg.subset_selection.DataProcessor.generate_embeddings",
            side_effect=create_mock_embeddings,
        ),
        patch("submodlib.FacilityLocationFunction", MockFacilityLocationFunction),
        patch(
            "instructlab.sdg.subset_selection.compute_pairwise_dense",
            return_value=torch.randn(50, 50),
        ),
        patch(
            "instructlab.sdg.subset_selection.process_folds_with_gpu",
            mock_process_folds_with_gpu,
        ),
        # NOTE(review): this module imports set_start_method via
        # `from multiprocessing import ...`, so patching the multiprocessing
        # attribute only shields code that looks the name up dynamically —
        # confirm that is the intent.
        patch("multiprocessing.set_start_method"),
    ):
        try:
            with tempfile.TemporaryDirectory() as workspace:
                # Synthesize a small JSONL dataset on disk.
                samples = create_test_data(num_samples=50)
                jsonl_path = Path(workspace) / "test_data.jsonl"
                with open(jsonl_path, "w") as sink:
                    for record in samples:
                        sink.write(json.dumps(record) + "\n")

                # Exercise the pipeline with both an absolute sample count
                # and a percentage subset size.
                subset_datasets(
                    input_files=[str(jsonl_path)],
                    output_dir=os.path.join(workspace, "output"),
                    batch_size=10,
                    num_folds=2,
                    subset_sizes=[10, 0.2],
                    num_gpus=1,
                    testing_mode=True,
                )

                # Expected on-disk layout produced by the run.
                dataset_name = "test_data"
                output_dir = os.path.join(workspace, "output")
                dataset_output_dir = os.path.join(output_dir, dataset_name)

                expected_paths = [
                    os.path.join(dataset_output_dir, "embeddings", "embeddings.h5"),
                    os.path.join(
                        dataset_output_dir, f"{dataset_name}_samples_10_subset.jsonl"
                    ),
                    os.path.join(
                        dataset_output_dir, f"{dataset_name}_percent_0.2_subset.jsonl"
                    ),
                    os.path.join(
                        output_dir,
                        f"{dataset_name}_fl_2_partitions_samples_10_metadata.npz",
                    ),
                    os.path.join(
                        output_dir,
                        f"{dataset_name}_fl_2_partitions_percent_0.2_metadata.npz",
                    ),
                ]
                for expected in expected_paths:
                    assert os.path.exists(expected)

        finally:
            # Release any GPU memory grabbed during the run.
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
0 commit comments