Commit a04bf42

Author: Hussain Jafari
Commit message: noise datasets shardwise
Parent: 529d1bd

File tree

5 files changed: +282, -162 lines


src/pseudopeople/filter.py

Lines changed: 54 additions & 0 deletions
```diff
@@ -1,9 +1,15 @@
 from __future__ import annotations
 
+from collections.abc import Sequence
 from dataclasses import dataclass
+from typing import cast
 
 import pandas as pd
 
+from pseudopeople.constants.metadata import DatasetNames
+from pseudopeople.schema_entities import DatasetSchema
+from pseudopeople.utilities import get_state_abbreviation
+
 
 @dataclass
 class DataFilter:
@@ -13,3 +19,51 @@ class DataFilter:
 
     def to_tuple(self) -> tuple[str, str, str | int | pd.Timestamp]:
         return self.column_name, self.operator, self.value
+
+
+def get_generate_data_filters(
+    dataset_schema: DatasetSchema, year: int | None, state: str | None
+) -> Sequence[DataFilter]:
+    filters = []
+
+    # add year filter for SSA
+    if dataset_schema.name == DatasetNames.SSA:
+        if year is not None:
+            try:
+                filters.append(
+                    DataFilter(
+                        dataset_schema.date_column_name,
+                        "<=",
+                        pd.Timestamp(year=year, month=12, day=31),
+                    )
+                )
+            except (pd.errors.OutOfBoundsDatetime, ValueError):
+                raise ValueError(f"Invalid year provided: '{year}'")
+    # add state filters except for SSA which does not have a state column
+    else:
+        if state is not None:
+            state_column = cast(str, dataset_schema.state_column_name)
+            filters.append(DataFilter(state_column, "==", get_state_abbreviation(state)))
+
+    # add non-SSA year filters
+    if dataset_schema.name == DatasetNames.ACS or dataset_schema.name == DatasetNames.CPS:
+        if year is not None:
+            try:
+                date_lower_filter = DataFilter(
+                    dataset_schema.date_column_name,
+                    ">=",
+                    pd.Timestamp(year=year, month=1, day=1),
+                )
+                date_upper_filter = DataFilter(
+                    dataset_schema.date_column_name,
+                    "<=",
+                    pd.Timestamp(year=year, month=12, day=31),
+                )
+                filters.extend([date_lower_filter, date_upper_filter])
+            except (pd.errors.OutOfBoundsDatetime, ValueError):
+                raise ValueError(f"Invalid year provided: '{year}'")
+    else:
+        if year is not None and dataset_schema.name != DatasetNames.SSA:
+            filters.append(DataFilter(dataset_schema.date_column_name, "==", year))
+
+    return filters
```
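
A rough usage sketch (not part of this commit) of how the new helper combines with the loaders exercised in the release test further down: the year/state values are illustrative, and it is assumed that `DatasetNames.ACS` is the full dataset name accepted by the schema lookup.

```python
# Hypothetical usage sketch only: build pushdown filters with the new helper
# and load each shard separately, mirroring tests/integration/release/test_release.py.
from pathlib import Path

from pseudopeople.constants import paths
from pseudopeople.constants.metadata import DatasetNames
from pseudopeople.filter import get_generate_data_filters
from pseudopeople.interface import get_dataset_filepaths
from pseudopeople.loader import load_standard_dataset
from pseudopeople.schema_entities import DATASET_SCHEMAS
from pseudopeople.utilities import get_engine_from_string

# Assumes DatasetNames.ACS matches the full dataset name used by the schema lookup.
dataset_schema = DATASET_SCHEMAS.get_dataset_schema(DatasetNames.ACS)

# Year and state are illustrative; a full state name is assumed because the
# helper converts it with get_state_abbreviation.
filters = get_generate_data_filters(dataset_schema, year=2020, state="Rhode Island")

# Load every shard of the bundled sample data with the filters pushed down,
# rather than concatenating all shards into one frame up front.
engine = get_engine_from_string("pandas")
shard_paths = get_dataset_filepaths(Path(paths.SAMPLE_DATA_ROOT), dataset_schema.name)
shards = [load_standard_dataset(path, filters, engine) for path in shard_paths]
non_empty_shards = [shard for shard in shards if len(shard) != 0]
```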

src/pseudopeople/noise_functions.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -2,6 +2,7 @@
 
 from typing import TYPE_CHECKING
 
+import dask.dataframe as dd
 import numpy as np
 import numpy.typing as npt
 import pandas as pd
@@ -95,7 +96,7 @@ def apply_do_not_respond(
 
 # Helper function to format group dataframe and merging with their dependents
 def merge_dependents_and_guardians(
-    dependents_df: pd.DataFrame, full_data: pd.DataFrame
+    dependents_df: pd.DataFrame | dd.DataFrame, full_data: pd.DataFrame | dd.DataFrame
 ) -> pd.DataFrame:
     # Merge dependents with their guardians. We have to merge twice to check
     # if either guardian is living at a separate location from the dependent.
```
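
For context, a minimal self-contained sketch (not pseudopeople code) of the pandas-or-dask input pattern that the widened annotation above supports; the function and column names here are illustrative only.

```python
# Illustrative only: the same merge code path can serve pandas or dask inputs,
# because both expose a compatible DataFrame.merge API.
from __future__ import annotations

import dask.dataframe as dd
import pandas as pd


def merge_left(
    left: pd.DataFrame | dd.DataFrame, right: pd.DataFrame | dd.DataFrame
) -> pd.DataFrame | dd.DataFrame:
    # Works unchanged whether the inputs are pandas or dask DataFrames.
    return left.merge(right, how="left", on="key")


left = pd.DataFrame({"key": [1, 2], "value": ["a", "b"]})
right = pd.DataFrame({"key": [1, 2], "other": [10, 20]})

# pandas in, pandas out
merged_pd = merge_left(left, right)

# dask in, dask out; call .compute() to materialize a pandas DataFrame
merged_dd = merge_left(
    dd.from_pandas(left, npartitions=1), dd.from_pandas(right, npartitions=1)
).compute()
```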

tests/integration/release/test_release.py

Lines changed: 52 additions & 50 deletions
```diff
@@ -1,11 +1,12 @@
 from __future__ import annotations
 
 from collections.abc import Callable
+from pathlib import Path
 from typing import Any, Literal
 
+import dask.dataframe as dd
 import numpy as np
 import pandas as pd
-import pytest
 from _pytest.fixtures import FixtureRequest
 from layered_config_tree import LayeredConfigTree
 from pytest_mock import MockerFixture
@@ -14,24 +15,39 @@
 from pseudopeople.configuration import Keys, get_configuration
 from pseudopeople.configuration.entities import NO_NOISE
 from pseudopeople.configuration.noise_configuration import NoiseConfiguration
+from pseudopeople.constants import paths
 from pseudopeople.constants.metadata import DatasetNames
 from pseudopeople.constants.noise_type_metadata import (
     GUARDIAN_DUPLICATION_ADDRESS_COLUMNS,
 )
 from pseudopeople.dataset import Dataset
 from pseudopeople.entity_types import ColumnNoiseType, RowNoiseType
-from pseudopeople.interface import generate_decennial_census, generate_social_security
+from pseudopeople.filter import get_generate_data_filters
+from pseudopeople.interface import (
+    generate_social_security,
+    get_dataset_filepaths,
+    validate_source_compatibility,
+)
+from pseudopeople.loader import load_standard_dataset
 from pseudopeople.noise_entities import NOISE_TYPES
 from pseudopeople.noise_functions import merge_dependents_and_guardians
 from pseudopeople.schema_entities import COLUMNS, DATASET_SCHEMAS
+from pseudopeople.utilities import DASK_ENGINE, get_engine_from_string
 from tests.integration.conftest import SEED, _get_common_datasets
-from tests.integration.release.conftest import DATASET_ARG_TO_FULL_NAME_MAPPER
+from tests.integration.release.conftest import (
+    DATASET_ARG_TO_FULL_NAME_MAPPER,
+    RI_FILEPATH,
+)
 from tests.integration.release.utilities import (
     run_do_not_respond_tests,
     run_guardian_duplication_tests,
     run_omit_row_tests,
 )
-from tests.utilities import initialize_dataset_with_sample, run_column_noising_tests
+from tests.utilities import (
+    get_single_noise_type_config,
+    initialize_dataset_with_sample,
+    run_column_noising_tests,
+)
 
 ROW_TEST_FUNCTIONS = {
     "omit_row": run_omit_row_tests,
@@ -40,69 +56,55 @@
 }
 
 
-def test_release_runs(
+def test_release_row_noising(
     dataset_params: tuple[
         str,
         Callable[..., pd.DataFrame],
-        str | None,
+        Path | str | None,
         int | None,
         str | None,
         Literal["pandas", "dask"],
     ],
     fuzzy_checker: FuzzyChecker,
-    mocker: MockerFixture,
 ) -> None:
-    # keep all columns when generating unnoised data because some of them are used in testing
-    mocker.patch(
-        "pseudopeople.dataset.Dataset.drop_non_schema_columns", side_effect=lambda df, _: df
-    )
-
-    # create unnoised dataset
-    dataset_name, dataset_func, source, year, state, engine = dataset_params
-    unnoised_data_kwargs = {
-        "source": source,
-        "config": NO_NOISE,
-        "year": year,
-        "engine": engine,
-    }
-    if dataset_func != generate_social_security:
-        unnoised_data_kwargs["state"] = state
-    unnoised_data = dataset_func(**unnoised_data_kwargs)
-
-    # In our standard noising process, i.e. when noising a shard of data, we
-    # 1) clean and reformat the data, 2) noise the data, and 3) do some post-processing.
-    # We're replicating steps 1 and 2 in this test and skipping 3.
+    dataset_name, _, source, year, state, engine_name = dataset_params
     full_dataset_name = DATASET_ARG_TO_FULL_NAME_MAPPER[dataset_name]
     dataset_schema = DATASET_SCHEMAS.get_dataset_schema(full_dataset_name)
-    dataset = Dataset(dataset_schema, unnoised_data, SEED)
-    # don't unnecessarily keep in memory
-    del unnoised_data
-    dataset._clean_input_data()
-    # convert datetime columns to datetime types for _reformat_dates_for_noising
-    # because the post-processing that occured in generating the unnoised data
-    # in step 3 mentioned above converts these columns to object dtypes
-    for col in [COLUMNS.dob.name, COLUMNS.ssa_event_date.name]:
-        if col in dataset.data:
-            dataset.data[col] = pd.to_datetime(
-                dataset.data[col], format=dataset_schema.date_format
-            )
-        copy_col = "copy_" + col
-        if copy_col in dataset.data:
-            dataset.data[copy_col] = pd.to_datetime(
-                dataset.data[copy_col], format=dataset_schema.date_format
-            )
-    dataset._reformat_dates_for_noising()
-
     config = get_configuration()
+    # config = NoiseConfiguration(LayeredConfigTree(config_dict))
+
+    # update parameters
+    if source is None:
+        source = paths.SAMPLE_DATA_ROOT
+    elif isinstance(source, str) or isinstance(source, Path):
+        source = Path(source)
+        validate_source_compatibility(source, dataset_schema)
+
+    engine = get_engine_from_string(engine_name)
+
+    data_file_paths = get_dataset_filepaths(Path(source), dataset_schema.name)
+    filters = get_generate_data_filters(dataset_schema, year, state)
+    unnoised_data = [load_standard_dataset(path, filters, engine) for path in data_file_paths]
+
+    if engine == DASK_ENGINE:
+        # TODO: [MIC-5960] move this compute to later in the code
+        dataset_data: list[pd.DataFrame] = [data.compute() for data in unnoised_data if len(data) != 0]  # type: ignore [operator]
+    else:
+        dataset_data = [data for data in unnoised_data if len(data) != 0]  # type: ignore [misc]
+
+    if str(source) == RI_FILEPATH and (dataset_name == "acs" or dataset_name == "cps"):
+        dataset_data = [pd.concat(dataset_data).reset_index()]
+
+    datasets = [Dataset(dataset_schema, data, SEED) for data in dataset_data]
 
     for noise_type in NOISE_TYPES:
-        original_data = dataset.data.copy()
+        prenoised_dataframes = [dataset.data.copy() for dataset in datasets]
        if isinstance(noise_type, RowNoiseType):
-            if config.has_noise_type(dataset.dataset_schema.name, noise_type.name):
-                noise_type(dataset, config)
+            if config.has_noise_type(dataset_schema.name, noise_type.name):
+                [noise_type(dataset, config) for dataset in datasets]
             test_function = ROW_TEST_FUNCTIONS[noise_type.name]
             test_function(
-                original_data, dataset.data, config, full_dataset_name, fuzzy_checker
+                prenoised_dataframes, datasets, config, full_dataset_name, fuzzy_checker
             )
```