sequential row noising #509

Merged
hussain-jafari merged 13 commits into epic/full_scale_testing from hjafari/feature/MIC-5885_sequential_row_noising
May 7, 2025

Conversation

@hussain-jafari
Contributor

sequential row noising

Description

  • Category: feature
  • JIRA issue: MIC-5982

Add tests for sequential row noising.
Read in data using load_standard_dataset and noise it by looping through the noise types, processing shard-wise except for RI with ACS, CPS, and WIC.

Testing

Ran the new tests with ACS, CPS, and WIC on USA and RI, and with SSA on RI.

Collaborator

@zmbc zmbc left a comment

This is really cool stuff! I had a few things I wanted to check about target_proportions, and then a few more nitpicky comments.


from typing import TYPE_CHECKING

import dask.dataframe as dd
Collaborator

We don't want to import this here without a try/except or an if TYPE_CHECKING, as it will make dask a required dependency.
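
For reference, a minimal sketch of the guarded-import pattern being asked for here, assuming dask is only needed for type annotations in this module:

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by static type checkers, so dask stays an optional runtime dependency.
    import dask.dataframe as dd


def load_shards(ddf: dd.DataFrame) -> None:  # hypothetical function, for illustration only
    ...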

Comment on lines 95 to 96
if str(source) == RI_FILEPATH and has_small_shards:
    dataset_data = [pd.concat(dataset_data).reset_index()]
Collaborator

Curious about this!

Collaborator

Are we doing this when actually noising using generate_X? If not, I'm concerned that we are no longer testing the right thing.

assert set(noised_data.columns) == set(original_data.columns)
assert (noised_data.dtypes == original_data.dtypes).all()
for noise_type in NOISE_TYPES:
    prenoised_dataframes = [dataset.data.copy() for dataset in datasets]
Collaborator

Suggested change
prenoised_dataframes = [dataset.data.copy() for dataset in datasets]
pre_noise_dataframes = [dataset.data.copy() for dataset in datasets]

Perhaps?

Contributor

Interesting... my gut reaction was that I definitely want there to be a "d" at the end, but the more I sit with it, the less I care.

prenoised_dataframes = [dataset.data.copy() for dataset in datasets]
if isinstance(noise_type, RowNoiseType):
    if config.has_noise_type(dataset_schema.name, noise_type.name):
        [noise_type(dataset, config) for dataset in datasets]
Collaborator

It's a matter of taste, but I'd prefer a one-line for loop to this unassigned list comprehension expression.

Collaborator

+1

Contributor

I agree; I got really confused as to why this list wasn't being assigned to anything...

name="test_do_not_respond",
observed_numerator=numerator,
observed_denominator=denominator,
# 3% uncertainty on either side
Collaborator

I can't find where this was written right now, but I have a hunch this was meant to be percentage points, i.e. expected_noise - 0.03 and expected_noise + 0.03.
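
A small sketch of the distinction, reusing the names from the quoted test rather than the PR's actual code:

# Relative tolerance: a 3% multiplicative band around the expected value
target_proportion = (expected_noise * 0.97, expected_noise * 1.03)

# Absolute tolerance: 3 percentage points on either side, as suggested above
target_proportion = (expected_noise - 0.03, expected_noise + 0.03)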

name="test_omit_row",
observed_numerator=numerator,
observed_denominator=denominator,
# 3% uncertainty on either side
Collaborator

I wouldn't expect this to have uncertainty

observed_numerator=numerators[probability_name],
observed_denominator=denominators[probability_name],
target_proportion=expected_noise,
name_additional=f"noised_data",
Contributor

Nit: Don't need the f-string.

assert np.isclose(sum(worker["memory_limit"] / 1024**3 for worker in workers.values()), available_memory, rtol=0.01)
available_memory = psutil.virtual_memory().total / (1024**3)
assert np.isclose(
sum(worker["memory_limit"] / 1024**3 for worker in workers.values()),
Contributor

Is this checking we were close to an expected memory that was profiled or something?

Contributor

It's checking that the memory of the dask cluster is close to what we expected when setting up the dask cluster.
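
For context, a minimal sketch of the kind of setup this assertion guards; the cluster arguments below are illustrative assumptions, not the PR's actual values:

import psutil
from dask.distributed import Client, LocalCluster

# By default LocalCluster splits the machine's total memory across its workers,
# so the summed memory_limit over all workers should roughly equal system memory.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

workers = client.scheduler_info()["workers"]
total_worker_gb = sum(worker["memory_limit"] / 1024**3 for worker in workers.values())
available_gb = psutil.virtual_memory().total / (1024**3)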

return self.column_name, self.operator, self.value


def get_generate_data_filters(
Collaborator

Nit: this name sounds awkward. Maybe get_data_filters()?

filters = []

# add year filter for SSA
if dataset_schema.name == DatasetNames.SSA:
Collaborator

This is much cleaner than what we had before and removes a lot of duplicated code, but I think we can do even better!

If we add these attributes to DatasetSchema, we can simplify this code:

  • has_state_filter
  • has_year_lower_filter
  • has_year_upper_filter
  • has_exact_year_filter

Then this could be

if dataset_schema.has_state_filter and state is not None:
    state_column = cast(str, dataset_schema.state_column_name)
    filters.append(DataFilter(state_column, "==", get_state_abbreviation(state)))

if year is not None:
    try:
        if dataset_schema.has_year_lower_filter:
            date_lower_filter = DataFilter(
                dataset_schema.date_column_name,
                ">=",
                pd.Timestamp(year=year, month=1, day=1),
            )
            filters.append(date_lower_filter)

        if dataset_schema.has_year_upper_filter:
            date_upper_filter = DataFilter(
                dataset_schema.date_column_name,
                "<=",
                pd.Timestamp(year=year, month=12, day=31),
            )
            filters.append(date_upper_filter)
    except (pd.errors.OutOfBoundsDatetime, ValueError):
        raise ValueError(f"Invalid year provided: '{year}'")

    if dataset_schema.has_exact_year_filter:
        filters.append(DataFilter(dataset_schema.date_column_name, "==", year))
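
As a rough illustration, the proposed attributes could live on DatasetSchema along these lines; the dataclass layout is an assumption for the sketch, not the library's actual definition:

from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetSchema:  # hypothetical, simplified layout
    name: str
    date_column_name: str
    state_column_name: str | None = None
    has_state_filter: bool = False
    has_year_lower_filter: bool = False
    has_year_upper_filter: bool = False
    has_exact_year_filter: bool = False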

return self.column_name, self.operator, self.value


def get_generate_data_filters(
Contributor

Not sure what conversation led to this encapsulation, but I really like it!

Should we add a unit test against this function? I believe we're testing all of this filtering at an integration level, so I'm not sure it's strictly necessary, but it would be nice. @rmudambi?
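
If we do want one, a minimal sketch of such a unit test; the expected filter contents and the "RI" argument are assumptions, not verified behavior:

from pseudopeople.schema_entities import DATASET_SCHEMAS


def test_get_generate_data_filters_builds_state_filter():
    # Hypothetical test sketch: a state argument should yield a filter on the state column.
    filters = get_generate_data_filters(DATASET_SCHEMAS.census, None, "RI")
    state_column = DATASET_SCHEMAS.census.state_column_name
    assert any(data_filter.column_name == state_column for data_filter in filters)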

if state is not None:
state_column_name = cast(str, DATASET_SCHEMAS.census.state_column_name)
filters.append(DataFilter(state_column_name, "==", get_state_abbreviation(state)))
filters: list[DataFilter] = get_generate_data_filters(DATASET_SCHEMAS.census, year, state)
Contributor

Now that there's a discrete filter-generator function, how reasonable would it be to just generate the filters inside of _generate_dataset? That would be slightly DRYer, and then the filter stuff could be even more encapsulated.
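
A rough sketch of the idea; the signature of _generate_dataset below is an assumption about the surrounding code, not the actual API:

def _generate_dataset(dataset_schema, source, seed, config, year=None, state=None):
    # Build the filters internally so callers no longer construct and pass them in.
    filters = get_generate_data_filters(dataset_schema, year, state)
    ...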

from pathlib import Path
from typing import Any, Literal

import dask.dataframe as dd
Contributor

Same here, as Zeb said elsewhere - let's not make dask a required dependency.

    source = paths.SAMPLE_DATA_ROOT
elif isinstance(source, str) or isinstance(source, Path):
    source = Path(source)
validate_source_compatibility(source, dataset_schema)
Contributor

Nit: Can you add a docstring to validate_source_compatibility? I know it's not a new thing in this PR, but it would have been helpful when reviewing.

dataset = Dataset(dataset_schema, original_data, SEED)
NOISE_TYPES.omit_row(dataset, config)
noised_data = dataset.data
has_small_shards = dataset_name == "acs" or dataset_name == "cps" or dataset_name == "wic"
Contributor

Add a comment explaining what you're doing here - I wasn't expecting this.

# 3% uncertainty on either side
target_proportion=(expected_noise * 0.97, expected_noise * 1.03),
name_additional=f"noised_data",
run_column_noising_tests(
Contributor

Why is there column noising stuff in this row-noising PR? Should I be reviewing all this?

Contributor Author

No, you can ignore this.

@@ -0,0 +1,222 @@
from __future__ import annotations

import dask.dataframe as dd
Contributor

same here - don't import dask globally

from pseudopeople.schema_entities import DATASET_SCHEMAS


def run_do_not_respond_tests(
Contributor

Are these all exact copy/pastes from the previous tests?

client = get_client()
client.shutdown()
client.shutdown() # type: ignore [no-untyped-call]
time.sleep(30)
Contributor

Remove this; it's not the source of the bug from before (and we'd never want to sleep 30 seconds in a unit test, anyway).

Contributor

Actually - this will all change when you rebase on the epic branch


datasets = [Dataset(dataset_schema, data, SEED) for data in dataset_data]
seed = SEED
if dataset_schema.name != DatasetNames.CENSUS and year is not None:
Contributor

@stevebachmeier stevebachmeier May 6, 2025

Why not do this for census?

Contributor Author

I don't know why, but the generate_decennial_census function doesn't update the seed (unlike every other generate_data function).

Collaborator

This seems like a bug to me!

datasets = [Dataset(dataset_schema, data, SEED) for data in dataset_data]
seed = SEED
if dataset_schema.name != DatasetNames.CENSUS and year is not None:
    seed = seed * 10_000 + year
Contributor

I wouldn't change it, but can't you also just do seed += year?

Contributor Author

That would work for this particular case, but I wanted to make it obvious that this code was taken from the generate_data functions.

Collaborator

I'm not sure it matters in these tests, but this 10_000 is actually important, quite hacky, and needs (at least!) a comment to explain it.

It's there because we don't want different years of the same dataset to get "the same" noise using the same (e.g. default) seed. What I mean by the same noise is that you'd get typos in the same place in the same column, even though the underlying data is different. This is similar to what we also do with different seeds per shard:

# Use a different seed for each data file/shard, otherwise the randomness will duplicate
# and the Nth row in each shard will get the same noise
data_path_seed = f"{seed}_{data_file_index}"
noised_data = _prep_and_noise_dataset(
    data, dataset, configuration_tree, data_path_seed
)

So how does 10_000 get involved? If we simply did seed += year, there could be collisions. For example, the user passing in seed=1234, year=2019 would get the same result as if they had passed seed=1233, year=2020, so we would no longer have the property that any two different seeds give totally different noise. With the * 10_000, as long as years are always less than 10,000, such collisions cannot occur, and the desired property is restored.
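
A small worked example of the collision described above, using just the numbers from this comment:

# With seed += year, two different (seed, year) pairs can collide:
assert 1234 + 2019 == 1233 + 2020  # both give 3253

# With seed * 10_000 + year they stay distinct as long as year < 10_000:
assert 1234 * 10_000 + 2019 != 1233 * 10_000 + 2020  # 12342019 vs 12332020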

Collaborator

seed = hash(f"{seed}_{year}") might be a clearer way to achieve the same.

[noise_type(dataset, config) for dataset in datasets]
for dataset in datasets:
    # noise datasets in place
    noise_type(dataset, config)
Contributor

😌

@hussain-jafari hussain-jafari merged commit 71cab55 into epic/full_scale_testing May 7, 2025
8 checks passed
@hussain-jafari hussain-jafari deleted the hjafari/feature/MIC-5885_sequential_row_noising branch May 7, 2025 18:02
hussain-jafari added a commit that referenced this pull request May 7, 2025
hussain-jafari added a commit that referenced this pull request May 7, 2025
hussain-jafari added a commit that referenced this pull request Jul 24, 2025