Commit b2bc535

Author: Hussain Jafari
Commit message: intermediate changes trying sharded testing
Parent: 628a595

File tree: 6 files changed (+257 / -46 lines)


src/pseudopeople/dataset.py

Lines changed: 5 additions & 9 deletions

@@ -132,19 +132,17 @@ def _clean_input_data(self) -> None:
 
     def _reformat_dates_for_noising(self) -> None:
         """Formats date columns so they can be noised as strings."""
-        data = self.data.copy()
-
         for date_column in [COLUMNS.dob.name, COLUMNS.ssa_event_date.name]:
             # Format both the actual column, and the shadow version that will be used
             # to copy from a household member
             for column in [date_column, COPY_HOUSEHOLD_MEMBER_COLS.get(date_column)]:
-                if column in data.columns and isinstance(column, str):
+                if column in self.data.columns and isinstance(column, str):
                     # Avoid running strftime on large data, since that will
                     # re-parse the format string for each row
                     # https://github.com/pandas-dev/pandas/issues/44764
                     # Year is already guaranteed to be 4-digit: https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-timestamp-limits
-                    is_na = data[column].isna()
-                    data_column = data.loc[~is_na, column]
+                    is_na = self.data[column].isna()
+                    data_column = self.data.loc[~is_na, column]
                     year_string = data_column.dt.year.astype(str)
                     month_string = _zfill_fast(data_column.dt.month.astype(str), 2)
                     day_string = _zfill_fast(data_column.dt.day.astype(str), 2)
@@ -159,10 +157,8 @@ def _reformat_dates_for_noising(self) -> None:
                         f"Invalid date format in {self.dataset_schema.name}."
                     )
 
-                    data[column] = pd.Series(np.nan, dtype=str)
-                    data.loc[~is_na, column] = result
-
-        self.data = data
+                    self.data[column] = pd.Series(np.nan, dtype=str)
+                    self.data.loc[~is_na, column] = result
 
     @staticmethod
     def drop_non_schema_columns(
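
For reference, the in-place logic this hunk converges on can be read as the standalone sketch below. It is an approximation, not the library's implementation: a plain str.zfill stands in for pseudopeople's _zfill_fast helper, the YYYYMMDD branch is hard-coded, and the nullable "string" dtype replaces the np.nan/str pattern used in the diff.

import pandas as pd

def format_dates_as_strings(data: pd.DataFrame, column: str) -> None:
    """Turn a datetime column into zero-padded YYYYMMDD strings in place, skipping NaT rows."""
    is_na = data[column].isna()
    dates = data.loc[~is_na, column]
    # Build the string pieces from the .dt accessors instead of strftime, which
    # re-parses its format string per row (the pandas issue cited in the diff).
    year = dates.dt.year.astype(str)                  # pandas Timestamps always give 4-digit years
    month = dates.dt.month.astype(str).str.zfill(2)   # str.zfill stands in for _zfill_fast
    day = dates.dt.day.astype(str).str.zfill(2)
    data[column] = pd.Series(pd.NA, index=data.index, dtype="string")
    data.loc[~is_na, column] = year + month + day

# Example: ["1990-02-03", NaT] -> ["19900203", <NA>]
df = pd.DataFrame({"dob": pd.to_datetime(["1990-02-03", None])})
format_dates_as_strings(df, "dob")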

src/pseudopeople/entity_types.py

Lines changed: 0 additions & 1 deletion

@@ -149,7 +149,6 @@ def __call__(
         output_dtype = self.output_dtype_getter(input_dtype)
 
         dataset.data[column_name] = ensure_dtype(dataset.data[column_name], output_dtype)
-
         self.noise_function(
             dataset,
             configuration,

src/pseudopeople/interface.py

Lines changed: 28 additions & 8 deletions

@@ -38,32 +38,49 @@ def _generate_dataset(
     filters: Sequence[DataFilter],
     verbose: bool,
     engine_name: Literal["pandas"],
+    concat: Literal[True],
 ) -> pd.DataFrame:
     ...
 
 
 @overload
 def _generate_dataset(
     dataset_schema: DatasetSchema,
-    source: Path | str | None,
+    source: list[pd.DataFrame],
+    seed: int,
+    config: Path | str | dict[str, Any] | None,
+    filters: Sequence[DataFilter],
+    verbose: bool,
+    engine_name: Literal["pandas"],
+    concat_output: Literal[False],
+) -> list[pd.DataFrame]:
+    ...
+
+
+@overload
+def _generate_dataset(
+    dataset_schema: DatasetSchema,
+    source: Path | str | None | dd.DataFrame,
     seed: int,
     config: Path | str | dict[str, Any] | None,
     filters: Sequence[DataFilter],
     verbose: bool,
     engine_name: Literal["dask"],
+    concat_output: bool,
 ) -> dd.DataFrame:
     ...
 
 
 def _generate_dataset(
     dataset_schema: DatasetSchema,
-    source: Path | str | None,
+    source: Path | str | None | list[pd.DataFrame] | dd.DataFrame,
     seed: int,
     config: Path | str | dict[str, Any] | None,
     filters: Sequence[DataFilter],
     verbose: bool = False,
     engine_name: Literal["pandas", "dask"] = "pandas",
-) -> pd.DataFrame | dd.DataFrame:
+    concat_output = True,
+) -> pd.DataFrame | dd.DataFrame | list[pd.DataFrame]:
     """
     Helper for generating noised datasets.
 
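
The new signatures lean on typing.overload with Literal argument types so that callers get a precise return type from the value they pass for the concat flag. A minimal sketch of that pattern, using an illustrative noise_shards function rather than pseudopeople's actual API:

from typing import Literal, overload

import pandas as pd

@overload
def noise_shards(shards: list[pd.DataFrame], concat_output: Literal[True] = ...) -> pd.DataFrame: ...
@overload
def noise_shards(shards: list[pd.DataFrame], concat_output: Literal[False]) -> list[pd.DataFrame]: ...

def noise_shards(
    shards: list[pd.DataFrame], concat_output: bool = True
) -> pd.DataFrame | list[pd.DataFrame]:
    # Placeholder "noising": copy each shard; the real per-shard work would happen here.
    noised = [shard.copy() for shard in shards]
    return pd.concat(noised, ignore_index=True) if concat_output else noised

# reveal_type(noise_shards(shards))                       -> DataFrame
# reveal_type(noise_shards(shards, concat_output=False))  -> list[DataFrame]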
@@ -85,18 +102,19 @@ def _generate_dataset(
     :return:
         Noised dataset data in a dataframe
     """
+    breakpoint()
     configure_logging_to_terminal(verbose)
     noise_configuration = get_configuration(config, dataset_schema, filters)
 
     if source is None:
         source = paths.SAMPLE_DATA_ROOT
-    else:
+    elif isinstance(source, Path) or isinstance(source, str):
         source = Path(source)
         validate_source_compatibility(source, dataset_schema)
 
     engine = get_engine_from_string(engine_name)
 
-    noised_dataset: pd.DataFrame | dd.DataFrame
+    noised_dataset: pd.DataFrame | dd.DataFrame | list[pd.DataFrame]
     if engine == PANDAS_ENGINE:
         # We process shards serially
         data_file_paths = get_dataset_filepaths(source, dataset_schema.name)
@@ -137,9 +155,11 @@ def _generate_dataset(
                 "Invalid value provided for 'state' or 'year'. No data found with "
                 f"the user provided 'state' or 'year' filters at {source / dataset_schema.name}."
             )
-        noised_dataset = pd.concat(noised_datasets_list, ignore_index=True)
-
-        noised_dataset = coerce_dtypes(noised_dataset, dataset_schema)
+        if concat_output:
+            noised_dataset = pd.concat(noised_datasets_list, ignore_index=True)
+            noised_dataset = coerce_dtypes(noised_dataset, dataset_schema)
+        else:
+            noised_dataset = [coerce_dtypes(dataset, dataset_schema) for dataset in noised_dataset]
     else: # dask
         try:
            from distributed.client import default_client
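
The switch from else: to elif isinstance(source, Path) or isinstance(source, str): means only path-like sources get resolved and validated against the on-disk layout; already-loaded DataFrames (or a dask frame) now flow through untouched. A rough sketch of that dispatch, with placeholder names for the default root and the validation step:

from pathlib import Path

import pandas as pd

SAMPLE_DATA_ROOT = Path("/path/to/sample_data")  # placeholder for paths.SAMPLE_DATA_ROOT

def resolve_source(
    source: Path | str | None | list[pd.DataFrame],
) -> Path | list[pd.DataFrame]:
    if source is None:
        return SAMPLE_DATA_ROOT               # fall back to the packaged sample data
    if isinstance(source, (Path, str)):
        source = Path(source)
        # validate_source_compatibility(source, dataset_schema) would run here
        return source
    return source                              # in-memory shards: nothing on disk to validate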

src/pseudopeople/noise_functions.py

Lines changed: 0 additions & 1 deletion

@@ -305,7 +305,6 @@ def copy_from_household_member(
     :param to_noise_index: pd.Index of rows to be noised
     :param column_name: String for column that will be noised
     """
-
     copy_values = dataset.data.loc[to_noise_index, COPY_HOUSEHOLD_MEMBER_COLS[column_name]]
     dataset.data.loc[to_noise_index, column_name] = ensure_dtype(
         pd.Series(copy_values, name=column_name, index=to_noise_index),
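
The context lines here show essentially the whole body of copy_from_household_member: rows picked for noising take their value from the column's copy_* shadow column. Stripped of pseudopeople's ensure_dtype plumbing, the move is roughly:

import pandas as pd

COPY_HOUSEHOLD_MEMBER_COLS = {"age": "copy_age"}  # illustrative slice of the real mapping

def copy_from_household_member(data: pd.DataFrame, to_noise_index: pd.Index, column_name: str) -> None:
    # Pull the shadow column's values for the selected rows and write them over the target column.
    copy_values = data.loc[to_noise_index, COPY_HOUSEHOLD_MEMBER_COLS[column_name]]
    data.loc[to_noise_index, column_name] = copy_values.astype(data[column_name].dtype)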

tests/integration/release/test_release.py

Lines changed: 92 additions & 26 deletions

@@ -27,7 +27,7 @@
 from tests.constants import TOKENS_PER_STRING_MAPPER
 from tests.integration.conftest import SEED, IDX_COLS
 from tests.integration.release.conftest import DATASET_ARG_TO_FULL_NAME_MAPPER
-from tests.integration.release.utilities import run_omit_row_tests, run_do_not_respond_tests, run_guardian_duplication_tests
+from tests.integration.release.utilities import load_unnoised_data, run_omit_row_tests, run_do_not_respond_tests, run_guardian_duplication_tests
 from tests.unit.test_configuration import COLUMN_NOISE_TYPES
 from tests.utilities import (
     get_single_noise_type_config,
@@ -38,7 +38,7 @@
 ROW_TEST_FUNCTIONS = {'omit_row': run_omit_row_tests,
                       'do_not_respond': run_do_not_respond_tests,
                       'duplicate_with_guardian': run_guardian_duplication_tests}
-NEW_PROBABILITY = 0.03
+NEW_PROBABILITY = 0.2
 
 
 def get_high_noise_config(
@@ -49,7 +49,7 @@ def get_high_noise_config(
 
     for noise_type, probabilities in config_dict[dataset_name][Keys.ROW_NOISE].items():
         for probability_name, probability in probabilities.items():
-            config_dict[dataset_name][Keys.ROW_NOISE][noise_type][probability_name] = .03
+            config_dict[dataset_name][Keys.ROW_NOISE][noise_type][probability_name] = NEW_PROBABILITY
 
     for col, noise_types in config_dict[dataset_name][Keys.COLUMN_NOISE].items():
         for noise_type, probabilities in noise_types.items():
@@ -62,7 +62,7 @@
                 # NOTE: this will fail default config validations
                 new_probability = {1: 1.0}
             else:
-                new_probability = .03
+                new_probability = NEW_PROBABILITY
             config_dict[dataset_name][Keys.COLUMN_NOISE][col][noise_type][
                 probability_name
             ] = new_probability
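
In plainer terms, get_high_noise_config walks both noise subtrees of the default config and overwrites every scalar probability with the module-level NEW_PROBABILITY. A condensed sketch with plain string keys standing in for the Keys constants (the {1: 1.0} special case shown above is omitted):

NEW_PROBABILITY = 0.2

def raise_noise_levels(config_dict: dict, dataset_name: str) -> dict:
    # Row-noise probabilities: every entry gets the high value.
    for noise_type, probabilities in config_dict[dataset_name]["row_noise"].items():
        for probability_name in probabilities:
            probabilities[probability_name] = NEW_PROBABILITY
    # Column-noise probabilities: same idea, one level deeper (column -> noise type -> probability).
    for col, noise_types in config_dict[dataset_name]["column_noise"].items():
        for noise_type, probabilities in noise_types.items():
            for probability_name in probabilities:
                probabilities[probability_name] = NEW_PROBABILITY
    return config_dict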
@@ -98,6 +98,10 @@ def test_release_runs(
     if dataset_func != generate_social_security:
         unnoised_data_kwargs["state"] = state
     unnoised_data = dataset_func(**unnoised_data_kwargs)
+    full_dataset_name = DATASET_ARG_TO_FULL_NAME_MAPPER[dataset_name]
+    dataset_schema = DATASET_SCHEMAS.get_dataset_schema(full_dataset_name)
+    #new_unnoised_data = load_unnoised_data(dataset_schema, source, year, state, engine)
+    #breakpoint()
 
     if source is None:
         population = 'sample'
@@ -106,20 +110,17 @@
     else:
         population = 'usa'
     timestr = time.strftime("%Y%m%d-%H%M%S")
-    filename = f"/ihme/homes/hjafari/ppl_runs_new/{timestr}_{dataset_name}_{population}.o"
+    filename = f"/ihme/homes/hjafari/ppl_runs_new/{timestr}_{dataset_name}_{population}_{NEW_PROBABILITY*100}_percent.o"
 
     # In our standard noising process, i.e. when noising a shard of data, we
     # 1) clean and reformat the data, 2) noise the data, and 3) do some post-processing.
     # We're replicating steps 1 and 2 in this test and skipping 3.
-    full_dataset_name = DATASET_ARG_TO_FULL_NAME_MAPPER[dataset_name]
-    dataset_schema = DATASET_SCHEMAS.get_dataset_schema(full_dataset_name)
     dataset = Dataset(dataset_schema, unnoised_data, SEED)
-    # don't unnecessarily keep in memory
+    # don't unnecessarily keep in memory now that we have the data in dataset
     del unnoised_data
     dataset._clean_input_data()
     # convert datetime columns to datetime types for _reformat_dates_for_noising
-    # because the post-processing that occured in generating the unnoised data
-    # in step 3 mentioned above converts these columns to object dtypes
+    # because we coerce these types into object types when loading unnoised data
     for col in [COLUMNS.dob.name, COLUMNS.ssa_event_date.name]:
         if col in dataset.data:
             dataset.data[col] = pd.to_datetime(
@@ -133,7 +134,9 @@
     dataset._reformat_dates_for_noising()
 
     config = NoiseConfiguration(LayeredConfigTree(get_high_noise_config(full_dataset_name)))
-
+
+
+
     for noise_type in NOISE_TYPES:
         original_data = dataset.data.copy()
         # if isinstance(noise_type, RowNoiseType):
@@ -145,12 +148,15 @@
         for column in dataset.data.columns:
             if config.has_noise_type(
                 dataset.dataset_schema.name, noise_type.name, column
-            ):
-                noise_type(dataset, config, column)
-                run_column_noising_test(original_data, dataset.data, config, full_dataset_name, noise_type.name, column, fuzzy_checker, filename)
+            ):# and noise_type.name == 'copy_from_household_member' and column == 'age':
+                if column == COLUMNS.ssa_event_type.name:
+                    pass
+                else:
+                    noise_type(dataset, config, column)
+                    run_column_noising_test(original_data, dataset.data, config, full_dataset_name, noise_type.name, column, fuzzy_checker, filename)
         with check:
-            # TODO: possible to replace missingness with smaller data structure?
             try:
+                # TODO: possible to replace missingness with smaller data structure?
                 assert dataset.missingness.equals(dataset.is_missing(dataset.data))
             except:
                 breakpoint()
@@ -164,6 +170,72 @@
     # run_final_tests(unnoised_data, dataset.data)
 
 
+def test_release_runs(
+    dataset_params: tuple[
+        str,
+        Callable[..., pd.DataFrame],
+        str | None,
+        int | None,
+        str | None,
+        Literal["pandas", "dask"],
+    ],
+    fuzzy_checker: FuzzyChecker,
+    mocker: MockerFixture,
+) -> None:
+    # keep all columns when generating unnoised data because some of them are used in testing
+    mocker.patch(
+        "pseudopeople.dataset.Dataset.drop_non_schema_columns", side_effect=lambda df, _: df
+    )
+
+    # create unnoised dataset
+    dataset_name, dataset_func, source, year, state, engine = dataset_params
+    unnoised_data_kwargs = {
+        "source": source,
+        "config": NO_NOISE,
+        "year": year,
+        "engine": engine,
+        "concat_output": False,
+    }
+    if dataset_func != generate_social_security:
+        unnoised_data_kwargs["state"] = state
+    unnoised_data = dataset_func(**unnoised_data_kwargs)
+    #full_dataset_name = DATASET_ARG_TO_FULL_NAME_MAPPER[dataset_name]
+    #dataset_schema = DATASET_SCHEMAS.get_dataset_schema(full_dataset_name)
+    #new_unnoised_data = load_unnoised_data(dataset_schema, source, year, state, engine)
+
+    # In our standard noising process, i.e. when noising a shard of data, we
+    # 1) clean and reformat the data, 2) noise the data, and 3) do some post-processing.
+    # We're replicating steps 1 and 2 in this test and skipping 3.
+    full_dataset_name = DATASET_ARG_TO_FULL_NAME_MAPPER[dataset_name]
+    dataset_schema = DATASET_SCHEMAS.get_dataset_schema(full_dataset_name)
+    dataset = Dataset(dataset_schema, unnoised_data, SEED)
+    # don't unnecessarily keep in memory now that we have the data in dataset
+    del unnoised_data
+    dataset._clean_input_data()
+    # convert datetime columns to datetime types for _reformat_dates_for_noising
+    # because we coerce these types into object types when loading unnoised data
+    for col in [COLUMNS.dob.name, COLUMNS.ssa_event_date.name]:
+        if col in dataset.data:
+            dataset.data[col] = pd.to_datetime(
+                dataset.data[col], format=dataset_schema.date_format
+            )
+        copy_col = "copy_" + col
+        if copy_col in dataset.data:
+            dataset.data[copy_col] = pd.to_datetime(
+                dataset.data[copy_col], format=dataset_schema.date_format
+            )
+    # TODO: mock this to do nothing
+    dataset._reformat_dates_for_noising()
+
+    # TODO: generate dictionary of noise configs with key as noise type and value is config
+
+    # for noise_type, config in config_dict:
+    #     # TODO: case pandas vs dask
+    #     pre_noised = [x.copy() for x in unnoised_data]
+    #     noised_data = dataset_func(source=prenoised_data, config)
+    #     check_noise(pre_noised, noised_data)
+
+
 def _get_common_datasets(
     unnoised_dataset: Dataset, noised_dataset: pd.DataFrame
 ) -> tuple[pd.DataFrame, pd.DataFrame, pd.Index[int]]:
@@ -218,12 +290,9 @@ def run_column_noising_test(
 ) -> None:
     dataset_schema = DATASET_SCHEMAS.get_dataset_schema(dataset_name)
     original_dataset = Dataset(dataset_schema, original_data, SEED)
-    try:
-        check_noised, check_original, shared_idx = _get_common_datasets(
-            original_dataset, noised_data
-        )
-    except:
-        breakpoint()
+    check_noised, check_original, shared_idx = _get_common_datasets(
+        original_dataset, noised_data
+    )
     # TODO: remove population param which was just used in testing the tests
     check_column_noising(dataset_name, config, fuzzy_checker, check_noised, check_original, shared_idx, noise_type, column, filename)
 
@@ -305,10 +374,7 @@ def check_column_noising(
 
     # This is accumulating not_noised over all noise types
    expected_noise = avg_probability_any_token_noised * expected_noise
-    # if no_differences:
-    #     with open(filename, "a") as f:
-    #         info = f"no differences for NOISE_TYPE_{noise_type}_COL_{col.name} and expected noise level of {expected_noise} or {expected_noise * len(check_original.loc[to_compare_idx, col.name])} simulants\n"
-    #         f.write(info)
+    open(filename, 'a').close()
     try:
        fuzzy_checker.fuzzy_assert_proportion(
            name=noise_type,
@@ -332,7 +398,7 @@ def check_column_noising(
                 f.write(info)
         else:
             with open(filename, "a") as f:
-                info = f"NOISE_TYPE_{noise_type}_COL_{col.name} issue fuzzy checking: expected {expected_noise} but got {noise_level / len(check_original.loc[to_compare_idx, col.name])}\n"
+                info = f"NOISE_TYPE_{noise_type}_COL_{col.name} issue fuzzy checking: expected {expected_noise} but got {noise_level / len(check_original.loc[to_compare_idx, col.name])} from {noise_level} / {len(check_original.loc[to_compare_idx, col.name])}\n"
                 f.write(info)
 
 
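
For orientation, the quantity logged in those branches is the observed noise proportion over the rows the two frames share, compared against the expected level (cell probability times the average chance that any token is noised, per the accumulation above). The test delegates the statistical comparison to FuzzyChecker.fuzzy_assert_proportion; a crude normal-approximation check stands in for it in this sketch:

import pandas as pd

def observed_noise_level(check_original: pd.DataFrame, check_noised: pd.DataFrame, column: str) -> float:
    # Share of shared cells whose value changed after noising.
    return (check_original[column] != check_noised[column]).mean()

def crude_proportion_check(observed: float, expected: float, n: int, z: float = 3.0) -> bool:
    # Normal-approximation interval around the expected proportion; assumes n is large.
    half_width = z * (expected * (1.0 - expected) / n) ** 0.5
    return abs(observed - expected) <= half_width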
