Skip to content

Commit e3acc76

Browse files
sequential row noising (#509)
Category: feature. JIRA issue: MIC-5982. Add tests for sequential row noising. Read in data using load_standard_dataset and noise by looping through noise types — all shard-wise, except RI with ACS, CPS, and WIC. Testing: ran the new tests with ACS, CPS, and WIC on USA and RI, and with SSA on RI.
1 parent f25b8f6 commit e3acc76

File tree

8 files changed

+404
-321
lines changed

8 files changed

+404
-321
lines changed

src/pseudopeople/filter.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
from __future__ import annotations
22

3+
from collections.abc import Sequence
34
from dataclasses import dataclass
5+
from typing import cast
46

57
import pandas as pd
68

9+
from pseudopeople.constants.metadata import DatasetNames
10+
from pseudopeople.schema_entities import DatasetSchema
11+
from pseudopeople.utilities import get_state_abbreviation
12+
713

814
@dataclass
915
class DataFilter:
@@ -13,3 +19,37 @@ class DataFilter:
1319

1420
def to_tuple(self) -> tuple[str, str, str | int | pd.Timestamp]:
1521
return self.column_name, self.operator, self.value
22+
23+
24+
def get_data_filters(
25+
dataset_schema: DatasetSchema, year: int | None = 2020, state: str | None = None
26+
) -> Sequence[DataFilter]:
27+
filters = []
28+
if dataset_schema.has_state_filter and state is not None:
29+
state_column = cast(str, dataset_schema.state_column_name)
30+
filters.append(DataFilter(state_column, "==", get_state_abbreviation(state)))
31+
32+
if year is not None:
33+
try:
34+
if dataset_schema.has_year_lower_filter:
35+
date_lower_filter = DataFilter(
36+
dataset_schema.date_column_name,
37+
">=",
38+
pd.Timestamp(year=year, month=1, day=1),
39+
)
40+
filters.append(date_lower_filter)
41+
42+
if dataset_schema.has_year_upper_filter:
43+
date_upper_filter = DataFilter(
44+
dataset_schema.date_column_name,
45+
"<=",
46+
pd.Timestamp(year=year, month=12, day=31),
47+
)
48+
filters.append(date_upper_filter)
49+
except (pd.errors.OutOfBoundsDatetime, ValueError):
50+
raise ValueError(f"Invalid year provided: '{year}'")
51+
52+
if dataset_schema.has_exact_year_filter:
53+
filters.append(DataFilter(dataset_schema.date_column_name, "==", year))
54+
55+
return filters

src/pseudopeople/interface.py

Lines changed: 11 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from pseudopeople.constants import paths
1515
from pseudopeople.dataset import noise_data
1616
from pseudopeople.exceptions import DataSourceError
17-
from pseudopeople.filter import DataFilter
17+
from pseudopeople.filter import DataFilter, get_data_filters
1818
from pseudopeople.loader import load_standard_dataset
1919
from pseudopeople.schema_entities import DATASET_SCHEMAS, DatasetSchema
2020
from pseudopeople.utilities import (
@@ -196,6 +196,9 @@ def _generate_dataset(
196196

197197

198198
def validate_source_compatibility(source: Path, dataset_schema: DatasetSchema) -> None:
199+
"""Validate that a given source is compatible with the provided dataset schema by checking that
200+
1) data exist for said schema in the provided source path and that 2) the data is the expected version
201+
as specified in its CHANGELOG."""
199202
# TODO [MIC-4546]: Clean this up w/ metadata and update test_interface.py tests to be generic
200203
directories = [x.name for x in source.iterdir() if x.is_dir()]
201204
if dataset_schema.name not in directories:
@@ -341,12 +344,7 @@ def generate_decennial_census(
341344
The simulated population has no data for this dataset in the
342345
specified year or state.
343346
"""
344-
filters: list[DataFilter] = []
345-
if year is not None:
346-
filters.append(DataFilter(DATASET_SCHEMAS.census.date_column_name, "==", year))
347-
if state is not None:
348-
state_column_name = cast(str, DATASET_SCHEMAS.census.state_column_name)
349-
filters.append(DataFilter(state_column_name, "==", get_state_abbreviation(state)))
347+
filters: Sequence[DataFilter] = get_data_filters(DATASET_SCHEMAS.census, year, state)
350348
return _generate_dataset(
351349
DATASET_SCHEMAS.census,
352350
source,
@@ -472,26 +470,9 @@ def generate_american_community_survey(
472470
The simulated population has no data for this dataset in the
473471
specified year or state.
474472
"""
475-
filters = []
473+
filters: Sequence[DataFilter] = get_data_filters(DATASET_SCHEMAS.acs, year, state)
476474
if year is not None:
477-
try:
478-
date_lower_filter = DataFilter(
479-
DATASET_SCHEMAS.acs.date_column_name,
480-
">=",
481-
pd.Timestamp(year=year, month=1, day=1),
482-
)
483-
date_upper_filter = DataFilter(
484-
DATASET_SCHEMAS.acs.date_column_name,
485-
"<=",
486-
pd.Timestamp(year=year, month=12, day=31),
487-
)
488-
filters.extend([date_lower_filter, date_upper_filter])
489-
except (pd.errors.OutOfBoundsDatetime, ValueError):
490-
raise ValueError(f"Invalid year provided: '{year}'")
491475
seed = seed * 10_000 + year
492-
if state is not None:
493-
state_column = cast(str, DATASET_SCHEMAS.acs.state_column_name)
494-
filters.append(DataFilter(state_column, "==", get_state_abbreviation(state)))
495476
return _generate_dataset(
496477
DATASET_SCHEMAS.acs, source, seed, config, filters, verbose, engine_name=engine
497478
)
@@ -612,26 +593,9 @@ def generate_current_population_survey(
612593
The simulated population has no data for this dataset in the
613594
specified year or state.
614595
"""
615-
filters = []
596+
filters: Sequence[DataFilter] = get_data_filters(DATASET_SCHEMAS.cps, year, state)
616597
if year is not None:
617-
try:
618-
date_lower_filter = DataFilter(
619-
DATASET_SCHEMAS.acs.date_column_name,
620-
">=",
621-
pd.Timestamp(year=year, month=1, day=1),
622-
)
623-
date_upper_filter = DataFilter(
624-
DATASET_SCHEMAS.acs.date_column_name,
625-
"<=",
626-
pd.Timestamp(year=year, month=12, day=31),
627-
)
628-
filters.extend([date_lower_filter, date_upper_filter])
629-
except (pd.errors.OutOfBoundsDatetime, ValueError):
630-
raise ValueError(f"Invalid year provided: '{year}'")
631598
seed = seed * 10_000 + year
632-
if state is not None:
633-
state_column = cast(str, DATASET_SCHEMAS.cps.state_column_name)
634-
filters.append(DataFilter(state_column, "==", get_state_abbreviation(state)))
635599
return _generate_dataset(
636600
DATASET_SCHEMAS.cps, source, seed, config, filters, verbose, engine_name=engine
637601
)
@@ -743,13 +707,9 @@ def generate_taxes_w2_and_1099(
743707
The simulated population has no data for this dataset in the
744708
specified year or state.
745709
"""
746-
filters = []
710+
filters: Sequence[DataFilter] = get_data_filters(DATASET_SCHEMAS.tax_w2_1099, year, state)
747711
if year is not None:
748-
filters.append(DataFilter(DATASET_SCHEMAS.tax_w2_1099.date_column_name, "==", year))
749712
seed = seed * 10_000 + year
750-
if state is not None:
751-
state_column = cast(str, DATASET_SCHEMAS.tax_w2_1099.state_column_name)
752-
filters.append(DataFilter(state_column, "==", get_state_abbreviation(state)))
753713
return _generate_dataset(
754714
DATASET_SCHEMAS.tax_w2_1099,
755715
source,
@@ -878,13 +838,9 @@ def generate_women_infants_and_children(
878838
The simulated population has no data for this dataset in the
879839
specified year or state.
880840
"""
881-
filters = []
841+
filters: Sequence[DataFilter] = get_data_filters(DATASET_SCHEMAS.wic, year, state)
882842
if year is not None:
883-
filters.append(DataFilter(DATASET_SCHEMAS.wic.date_column_name, "==", year))
884843
seed = seed * 10_000 + year
885-
if state is not None:
886-
state_column = cast(str, DATASET_SCHEMAS.wic.state_column_name)
887-
filters.append(DataFilter(state_column, "==", get_state_abbreviation(state)))
888844
return _generate_dataset(
889845
DATASET_SCHEMAS.wic, source, seed, config, filters, verbose, engine_name=engine
890846
)
@@ -984,18 +940,8 @@ def generate_social_security(
984940
The simulated population has no data for this dataset in the
985941
specified year or any prior years.
986942
"""
987-
filters = []
943+
filters: Sequence[DataFilter] = get_data_filters(DATASET_SCHEMAS.ssa, year)
988944
if year is not None:
989-
try:
990-
filters.append(
991-
DataFilter(
992-
DATASET_SCHEMAS.ssa.date_column_name,
993-
"<=",
994-
pd.Timestamp(year=year, month=12, day=31),
995-
)
996-
)
997-
except (pd.errors.OutOfBoundsDatetime, ValueError):
998-
raise ValueError(f"Invalid year provided: '{year}'")
999945
seed = seed * 10_000 + year
1000946
return _generate_dataset(
1001947
DATASET_SCHEMAS.ssa, source, seed, config, filters, verbose, engine_name=engine
@@ -1108,13 +1054,9 @@ def generate_taxes_1040(
11081054
The simulated population has no data for this dataset in the
11091055
specified year or state.
11101056
"""
1111-
filters = []
1057+
filters: Sequence[DataFilter] = get_data_filters(DATASET_SCHEMAS.tax_1040, year, state)
11121058
if year is not None:
1113-
filters.append(DataFilter(DATASET_SCHEMAS.tax_1040.date_column_name, "==", year))
11141059
seed = seed * 10_000 + year
1115-
if state is not None:
1116-
state_column = cast(str, DATASET_SCHEMAS.tax_1040.state_column_name)
1117-
filters.append(DataFilter(state_column, "==", get_state_abbreviation(state)))
11181060
return _generate_dataset(
11191061
DATASET_SCHEMAS.tax_1040,
11201062
source,

src/pseudopeople/noise_functions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
)
3030

3131
if TYPE_CHECKING:
32+
import dask.dataframe as dd
33+
3234
from pseudopeople.configuration.noise_configuration import NoiseConfiguration
3335
from pseudopeople.dataset import Dataset
3436

@@ -95,7 +97,7 @@ def apply_do_not_respond(
9597

9698
# Helper function to format group dataframe and merging with their dependents
9799
def merge_dependents_and_guardians(
98-
dependents_df: pd.DataFrame, full_data: pd.DataFrame
100+
dependents_df: pd.DataFrame | dd.DataFrame, full_data: pd.DataFrame | dd.DataFrame
99101
) -> pd.DataFrame:
100102
# Merge dependents with their guardians. We have to merge twice to check
101103
# if either guardian is living at a separate location from the dependent.

src/pseudopeople/schema_entities.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,10 @@ class DatasetSchema:
590590
date_format: str
591591
state_column_name: str | None
592592
row_noise_types: tuple[RowNoiseType, ...]
593+
has_state_filter: bool
594+
has_year_lower_filter: bool
595+
has_year_upper_filter: bool
596+
has_exact_year_filter: bool
593597

594598

595599
class __DatasetSchemas(NamedTuple):
@@ -626,6 +630,10 @@ class __DatasetSchemas(NamedTuple):
626630
# NOISE_TYPES.duplication,
627631
),
628632
date_format=DATEFORMATS.MM_DD_YYYY,
633+
has_state_filter=True,
634+
has_year_lower_filter=False,
635+
has_year_upper_filter=False,
636+
has_exact_year_filter=True,
629637
)
630638
acs: DatasetSchema = DatasetSchema(
631639
DatasetNames.ACS,
@@ -657,6 +665,10 @@ class __DatasetSchemas(NamedTuple):
657665
# NOISE_TYPES.duplication,
658666
),
659667
date_format=DATEFORMATS.MM_DD_YYYY,
668+
has_state_filter=True,
669+
has_year_lower_filter=True,
670+
has_year_upper_filter=True,
671+
has_exact_year_filter=False,
660672
)
661673
cps: DatasetSchema = DatasetSchema(
662674
DatasetNames.CPS,
@@ -686,6 +698,10 @@ class __DatasetSchemas(NamedTuple):
686698
# NOISE_TYPES.duplication,
687699
),
688700
date_format=DATEFORMATS.MM_DD_YYYY,
701+
has_state_filter=True,
702+
has_year_lower_filter=True,
703+
has_year_upper_filter=True,
704+
has_exact_year_filter=False,
689705
)
690706
wic: DatasetSchema = DatasetSchema(
691707
DatasetNames.WIC,
@@ -713,6 +729,10 @@ class __DatasetSchemas(NamedTuple):
713729
# NOISE_TYPES.duplication,
714730
),
715731
date_format=DATEFORMATS.MMDDYYYY,
732+
has_state_filter=True,
733+
has_year_lower_filter=False,
734+
has_year_upper_filter=False,
735+
has_exact_year_filter=True,
716736
)
717737
ssa: DatasetSchema = DatasetSchema(
718738
DatasetNames.SSA,
@@ -734,6 +754,10 @@ class __DatasetSchemas(NamedTuple):
734754
# NOISE_TYPES.duplication,
735755
),
736756
date_format=DATEFORMATS.YYYYMMDD,
757+
has_state_filter=False,
758+
has_year_lower_filter=False,
759+
has_year_upper_filter=True,
760+
has_exact_year_filter=False,
737761
)
738762
tax_w2_1099: DatasetSchema = DatasetSchema(
739763
DatasetNames.TAXES_W2_1099,
@@ -770,6 +794,10 @@ class __DatasetSchemas(NamedTuple):
770794
# NOISE_TYPES.duplication,
771795
),
772796
date_format=DATEFORMATS.MM_DD_YYYY,
797+
has_state_filter=True,
798+
has_year_lower_filter=False,
799+
has_year_upper_filter=False,
800+
has_exact_year_filter=True,
773801
)
774802
tax_1040: DatasetSchema = DatasetSchema(
775803
DatasetNames.TAXES_1040,
@@ -809,6 +837,10 @@ class __DatasetSchemas(NamedTuple):
809837
state_column_name=COLUMNS.mailing_state.name,
810838
row_noise_types=(NOISE_TYPES.omit_row,),
811839
date_format=DATEFORMATS.MM_DD_YYYY,
840+
has_state_filter=True,
841+
has_year_lower_filter=False,
842+
has_year_upper_filter=False,
843+
has_exact_year_filter=True,
812844
)
813845

814846
##################

0 commit comments

Comments (0)