diff --git a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/BeamApiTest.kt b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/BeamApiTest.kt index 5dce3724..50b36a7b 100644 --- a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/BeamApiTest.kt +++ b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/BeamApiTest.kt @@ -843,11 +843,11 @@ class BeamApiTest { "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5), "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5), // (1^2+(0.5)^2+1^2)/3-((1.0+0.5+1.0)/3)^2 = 0.0(5) - "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.05), + "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1), "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5), "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.05), + "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1), "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), ), mapOf( @@ -928,11 +928,11 @@ class BeamApiTest { "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5), "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5), - "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.05), + "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1), "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5), "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.05), + "varianceAnotherValue" to 
DoubleWithTolerance(value = 1.16, tolerance = 0.1), "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), ), mapOf( diff --git a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/LocalApiTest.kt b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/LocalApiTest.kt index 9a1ddc3a..c14e4ec0 100644 --- a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/LocalApiTest.kt +++ b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/LocalApiTest.kt @@ -765,129 +765,13 @@ class LocalApiTest { } @Test - fun run_publicGroups_allPossibleValueAggregations_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0), - TestDataRow("group1", "pid1", 1.5), - TestDataRow("group1", "pid2", 2.0), - TestDataRow("nonPublicGroup", "pid2", 2.0), - ) - ) - val publicGroups = createPublicGroups(listOf("group1")) - val query = - LocalQueryBuilder.from( - data, - { it.privacyUnit }, - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 1, - maxContributionsPerGroup = 2, - ), - ) - .groupBy({ it.groupKey }, GroupsType.PublicGroups.create(publicGroups)) - .countDistinctPrivacyUnits("pidCnt") - .count("cnt") - .aggregateValue( - { it.value }, - ValueAggregationsBuilder() - .sum("sumResult") - .mean("meanResult") - .variance("varianceResult") - .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"), - ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)), - ) - .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE) - - val result: Sequence> = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), - "meanResult" to DoubleWithTolerance(value = 
1.5, tolerance = 0.5), - // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6) - "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05), - "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - ), - vectorAggregationResults = mapOf(), - ) - ) - assertEquals(result, expected) - } - - @Test - fun run_publicGroups_allPossibleVectorAggregations_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0, 2.0), - TestDataRow("group1", "pid1", 0.5, 2.5), - TestDataRow("group1", "pid2", 1.0, 0.0), - TestDataRow("nonPublicGroup", "pid2", 3.0), - ) - ) - val publicGroups = createPublicGroups(listOf("group1")) - val query = - LocalQueryBuilder.from( - data, - { it.privacyUnit }, - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 1, - maxContributionsPerGroup = 2, - ), - ) - .groupBy({ it.groupKey }, GroupsType.PublicGroups.create(publicGroups)) - .countDistinctPrivacyUnits("pidCnt") - .count("cnt") - .aggregateVector( - { listOf(it.value, it.anotherValue) }, - vectorSize = 2, - VectorAggregationsBuilder().vectorSum("vectorSumResult"), - VectorContributionBounds( - maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0) - ), - ) - .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE) - - val result: Sequence> = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - ), - mapOf( - "vectorSumResult" to - // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L_INF norm is 4.5 => - // clip it (1.5, 2.0). - // pid2: (1.0, 0.0), L_INF norm is 1.0 => no clipping. 
- // result: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0) - listOf( - DoubleWithTolerance(value = 2.5, tolerance = 0.5), - DoubleWithTolerance(value = 2.0, tolerance = 0.5), - ) - ), - ) - ) - assertEquals(result, expected) - } - - @Test - fun run_publicGroups_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() { + fun run_publicGroups_allPossibleAggregationsOverMultipleValues_calculatesStatisticsCorrectly() { val data = createInputData( listOf( TestDataRow("group1", "pid1", 1.0, 2.0), TestDataRow("group1", "pid1", 0.5, 2.5), - TestDataRow("group1", "pid2", 1.0, 0.0), + TestDataRow("group1", "pid2", 1.5, 0.0), TestDataRow("nonPublicGroup", "pid2", 3.0), ) ) @@ -906,13 +790,21 @@ class LocalApiTest { .count("cnt") .aggregateValue( { it.value }, - ValueAggregationsBuilder().sum("sumValue"), - ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)), + ValueAggregationsBuilder() + .sum("sumValue") + .mean("meanValue") + .variance("varianceValue") + .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"), + ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)), ) .aggregateValue( { it.anotherValue }, - ValueAggregationsBuilder().sum("sumAnotherValue"), - ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)), + ValueAggregationsBuilder() + .sum("sumAnotherValue") + .mean("meanAnotherValue") + .variance("varianceAnotherValue") + .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"), + ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 3.0)), ) .aggregateVector( { listOf(it.value, it.anotherValue) }, @@ -926,12 +818,6 @@ class LocalApiTest { val result: Sequence> = query.run() - // sumValue: pid1 contributes 1.5, pid2 contributes 1.0. Total 2.5 - // sumAnotherValue: - // pid1 contributes 2.0 + 2.5 = 4.5. Bounded by [0.0, 3.0], so clipped to 3.0 - // pid2 contributes 0.0. 
Bounded by [0.0, 3.0], so it is 0.0 - // Total sumAnotherValue = 3.0 + 0.0 = 3.0 - val expected = listOf( QueryPerGroupResultWithTolerance( @@ -940,12 +826,23 @@ class LocalApiTest { "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5), - "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), + "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5), + // (1^2+(0.5)^2+1^2)/3-((1.0+0.5+1.0)/3)^2 = 0.0(5) + "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1), + "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5), + "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), + "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), + "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1), + "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), ), mapOf( + // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L_INF norm is 4.5 => + // clip it to (1.5, 2.0). + // pid2: (1.5, 0.0), L_INF norm is 1.5 => no clipping. 
+ // result: (1.5, 2.0) + (1.5, 0.0) = (3.0, 2.0) "vectorSumResult" to listOf( - DoubleWithTolerance(value = 2.5, tolerance = 0.5), + DoubleWithTolerance(value = 3.0, tolerance = 0.5), DoubleWithTolerance(value = 2.0, tolerance = 0.5), ) ), @@ -955,7 +852,7 @@ class LocalApiTest { } @Test - fun run_privateGroups_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() { + fun run_privateGroups_allPossibleAggregationsOverMultipleValues_calculatesStatisticsCorrectly() { val data = createInputData( listOf( @@ -979,39 +876,34 @@ class LocalApiTest { .count("cnt") .aggregateValue( { it.value }, - ValueAggregationsBuilder().sum("sumValue"), - ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)), + ValueAggregationsBuilder() + .sum("sumValue") + .mean("meanValue") + .variance("varianceValue") + .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"), + ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)), ) .aggregateValue( { it.anotherValue }, - ValueAggregationsBuilder().sum("sumAnotherValue"), - ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)), + ValueAggregationsBuilder() + .sum("sumAnotherValue") + .mean("meanAnotherValue") + .variance("varianceAnotherValue") + .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"), + ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 2.5)), ) .aggregateVector( { listOf(it.value, it.anotherValue) }, vectorSize = 2, VectorAggregationsBuilder().vectorSum("vectorSumResult"), VectorContributionBounds( - maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0) + maxVectorTotalNorm = VectorNorm(normKind = NormKind.L1, value = 2.0) ), ) .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE) val result: Sequence> = query.run() - // sumValue: - // group1: pid1 contributes 1.5 (in [1,2]), pid2 contributes 1.0 (in [1,2]). Total 2.5. - // nonPublicGroup: pid2 contributes 3.0, clipped to 2.0 by [1,2]. Total 2.0. 
- // sumAnotherValue: - // group1: pid1 contributes 4.5, clipped to 3.0 by [0,3]. pid2 contributes 0.0 (in [0,3]). Total - // 3.0. - // nonPublicGroup: pid2 contributes 0.0 (in [0,3]). Total 0.0. - // vectorSumResult: - // group1: pid1 contributes (1.5, 4.5), L_INF-clipped to (1.5, 2.0). pid2 contributes (1.0, - // 0.0), - // not clipped. - // Total for group1: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0). - // nonPublicGroup: pid2 contributes (3.0, 0.0), L_INF-clipped to (2.0, 0.0). val expected = listOf( QueryPerGroupResultWithTolerance( @@ -1020,13 +912,24 @@ class LocalApiTest { "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5), - "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), + "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5), + "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1), + "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5), + "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), + "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), + "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1), + "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), ), mapOf( + // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L1 norm is 6 => + // clip it to (1.5, 4.5) * 2.0 / 6 = (0.5, 1.5) + // pid2: (1.0, 0.0), L1 norm is 1.0 => no clipping. + // result: (0.5, 1.5) + (1.0, 0.0) = (1.5, 1.5) + // nonPublicGroup: pid2 contributes (3.0, 0.0), L1-clipped to (2.0, 0.0). 
"vectorSumResult" to listOf( - DoubleWithTolerance(value = 2.5, tolerance = 0.5), - DoubleWithTolerance(value = 2.0, tolerance = 0.5), + DoubleWithTolerance(value = 1.5, tolerance = 0.5), + DoubleWithTolerance(value = 1.5, tolerance = 0.5), ) ), ) @@ -1034,8 +937,9 @@ class LocalApiTest { assertEquals(result, expected) } + // When counting distinct privacy units different group selection mechanism is used. @Test - fun run_privateGroups_noPidCount_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() { + fun run_privateGroups_noCountDistinctPrivacyUnits_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() { val data = createInputData( listOf( @@ -1112,174 +1016,6 @@ class LocalApiTest { assertEquals(result, expected) } - @Test - fun run_privateGroups_allPossibleValueAggregations_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0), - TestDataRow("group1", "pid1", 1.5), - TestDataRow("group1", "pid2", 2.0), - TestDataRow("group2", "pid1", 1.0), - ) - ) - val query = - LocalQueryBuilder.from( - data, - { it.privacyUnit }, - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 2, - maxContributionsPerGroup = 2, - ), - ) - .groupBy({ it.groupKey }, GroupsType.PrivateGroups()) - .countDistinctPrivacyUnits("pidCnt") - .count("cnt") - .aggregateValue( - { it.value }, - ValueAggregationsBuilder() - .sum("sumResult") - .mean("meanResult") - .variance("varianceResult") - .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"), - ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)), - ) - .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE) - - val result: Sequence> = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - "sumResult" to 
DoubleWithTolerance(value = 4.5, tolerance = 0.5), - "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6) - "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05), - "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - ), - vectorAggregationResults = mapOf(), - ) - ) - assertEquals(result, expected) - } - - @Test - fun run_privateGroups_allPossibleVectorAggregations_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0, 2.0), - TestDataRow("group1", "pid1", 1.5, 2.5), - TestDataRow("group1", "pid2", 3.0, -1.0), - TestDataRow("group2", "pid1", -1.0, -3.0), - ) - ) - val query = - LocalQueryBuilder.from( - data, - { it.privacyUnit }, - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 2, - maxContributionsPerGroup = 2, - ), - ) - .groupBy({ it.groupKey }, GroupsType.PrivateGroups()) - .countDistinctPrivacyUnits("pidCnt") - .count("cnt") - .aggregateVector( - { listOf(it.value, it.anotherValue) }, - vectorSize = 2, - VectorAggregationsBuilder().vectorSum("vectorSumResult"), - VectorContributionBounds( - maxVectorTotalNorm = VectorNorm(normKind = NormKind.L1, value = 5.0) - ), - ) - .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE) - - val result: Sequence> = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - ), - mapOf( - "vectorSumResult" to - // pid1: (1.0, 2.0) + (1.5, 2.5) = (2.5, 4.5), L1 norm is 7 => - // clip it to (2.5, 4.5) * 5.0 / 7.0 = (1.8, 3.2) - // pid2: (3.0, -1.0), L1 norm is 4.0 => no clipping. 
- // result: (1.8, 3.2) + (3.0, -1.0) = (4.8, 2.2) - listOf( - DoubleWithTolerance(value = 4.8, tolerance = 0.5), - DoubleWithTolerance(value = 2.2, tolerance = 0.5), - ) - ), - ) - ) - assertEquals(result, expected) - } - - // When counting distinct privacy units different group selection mechanism is used. - @Test - fun run_privateGroups_noCountDistinctPrivacyUnits_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0), - TestDataRow("group1", "pid1", 1.5), - TestDataRow("group1", "pid2", 2.0), - TestDataRow("group2", "pid1", 1.0), - ) - ) - val query = - LocalQueryBuilder.from( - data, - { it.privacyUnit }, - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 2, - maxContributionsPerGroup = 2, - ), - ) - .groupBy({ it.groupKey }, GroupsType.PrivateGroups()) - .count("cnt") - .aggregateValue( - { it.value }, - ValueAggregationsBuilder() - .sum("sumResult") - .mean("meanResult") - .variance("varianceResult") - .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"), - ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)), - ) - .build(TotalBudget(epsilon = 3000.0, delta = 0.001), NoiseKind.GAUSSIAN) - - val result: Sequence> = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), - "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6) - "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05), - "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - ), - vectorAggregationResults = mapOf(), - ) - ) - assertEquals(result, expected) - } - @Test fun run_vectorSumOnly_calculatesStatisticsCorrectly() { val data = diff --git 
a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkApiTest.kt b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkApiTest.kt index d46ebd6b..31e4b78d 100644 --- a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkApiTest.kt +++ b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkApiTest.kt @@ -833,11 +833,11 @@ class SparkApiTest { "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5), "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5), // (1^2+(0.5)^2+1^2)/3-((1.0+0.5+1.0)/3)^2 = 0.0(5) - "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.05), + "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1), "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5), "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.05), + "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1), "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), ), mapOf( @@ -918,11 +918,11 @@ class SparkApiTest { "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5), "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5), - "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.05), + "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1), "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5), "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.05), + "varianceAnotherValue" to DoubleWithTolerance(value = 
1.16, tolerance = 0.1), "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), ), mapOf( diff --git a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkDataFrameApiTest.kt b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkDataFrameApiTest.kt index 06fb7070..e4d895c6 100644 --- a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkDataFrameApiTest.kt +++ b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkDataFrameApiTest.kt @@ -847,129 +847,13 @@ class SparkDataFrameApiTest { } @Test - fun run_publicGroups_allPossibleValueAggregations_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0), - TestDataRow("group1", "pid1", 1.5), - TestDataRow("group1", "pid2", 2.0), - TestDataRow("nonPublicGroup", "pid2", 2.0), - ) - ) - val publicGroups = createPublicGroups(listOf("group1")) - val query = - SparkDataFrameQueryBuilder.from( - data, - ColumnNames("privacyUnit"), - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 1, - maxContributionsPerGroup = 2, - ), - ) - .groupBy(ColumnNames("groupKey"), GroupsType.PublicGroups.createForDataFrame(publicGroups)) - .countDistinctPrivacyUnits("pidCnt") - .count("cnt") - .aggregateValue( - "value", - ValueAggregationsBuilder() - .sum("sumResult") - .mean("meanResult") - .variance("varianceResult") - .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"), - ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)), - ) - .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE) - - val result: SparkDataFrame = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - "sumResult" to DoubleWithTolerance(value = 4.5, tolerance 
= 0.5), - "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6) - "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05), - "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - ), - vectorAggregationResults = mapOf(), - ) - ) - assertEquals(result, expected) - } - - @Test - fun run_publicGroups_allPossibleVectorAggregations_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0, 2.0), - TestDataRow("group1", "pid1", 0.5, 2.5), - TestDataRow("group1", "pid2", 1.0, 0.0), - TestDataRow("nonPublicGroup", "pid2", 3.0), - ) - ) - val publicGroups = createPublicGroups(listOf("group1")) - val query = - SparkDataFrameQueryBuilder.from( - data, - ColumnNames("privacyUnit"), - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 1, - maxContributionsPerGroup = 2, - ), - ) - .groupBy(ColumnNames("groupKey"), GroupsType.PublicGroups.createForDataFrame(publicGroups)) - .countDistinctPrivacyUnits("pidCnt") - .count("cnt") - .aggregateVector( - ColumnNames("value", "anotherValue"), - vectorSize = 2, - VectorAggregationsBuilder().vectorSum("vectorSumResult"), - VectorContributionBounds( - maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0) - ), - ) - .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE) - - val result: SparkDataFrame = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - ), - mapOf( - // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L_INF norm is 4.5 => - // clip it (1.5, 2.0). - // pid2: (1.0, 0.0), L_INF norm is 1.0 => no clipping. 
- // result: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0) - "vectorSumResult" to - listOf( - DoubleWithTolerance(value = 2.5, tolerance = 0.5), - DoubleWithTolerance(value = 2.0, tolerance = 0.5), - ) - ), - ) - ) - assertEquals(result, expected) - } - - @Test - fun run_publicGroups_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() { + fun run_publicGroups_allPossibleAggregationsOverMultipleValues_calculatesStatisticsCorrectly() { val data = createInputData( listOf( TestDataRow("group1", "pid1", 1.0, 2.0), TestDataRow("group1", "pid1", 0.5, 2.5), - TestDataRow("group1", "pid2", 1.0, 0.0), + TestDataRow("group1", "pid2", 1.5, 0.0), TestDataRow("nonPublicGroup", "pid2", 3.0), ) ) @@ -988,13 +872,21 @@ class SparkDataFrameApiTest { .count("cnt") .aggregateValue( "value", - ValueAggregationsBuilder().sum("sumValue"), - ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)), + ValueAggregationsBuilder() + .sum("sumValue") + .mean("meanValue") + .variance("varianceValue") + .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"), + ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)), ) .aggregateValue( "anotherValue", - ValueAggregationsBuilder().sum("sumAnotherValue"), - ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)), + ValueAggregationsBuilder() + .sum("sumAnotherValue") + .mean("meanAnotherValue") + .variance("varianceAnotherValue") + .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"), + ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 3.0)), ) .aggregateVector( ColumnNames("value", "anotherValue"), @@ -1008,12 +900,6 @@ class SparkDataFrameApiTest { val result: SparkDataFrame = query.run() - // sumValue: pid1 contributes 1.5, pid2 contributes 1.0. Total 2.5 - // sumAnotherValue: - // pid1 contributes 2.0 + 2.5 = 4.5. Bounded by [0.0, 3.0], so clipped to 3.0 - // pid2 contributes 0.0. 
Bounded by [0.0, 3.0], so it is 0.0 - // Total sumAnotherValue = 3.0 + 0.0 = 3.0 - val expected = listOf( QueryPerGroupResultWithTolerance( @@ -1022,12 +908,23 @@ class SparkDataFrameApiTest { "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5), - "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), + "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5), + // (1^2+(0.5)^2+1^2)/3-((1.0+0.5+1.0)/3)^2 = 0.0(5) + "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1), + "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5), + "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), + "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), + "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1), + "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), ), mapOf( + // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L_INF norm is 4.5 => + // clip it to (1.5, 2.0). + // pid2: (1.5, 0.0), L_INF norm is 1.5 => no clipping. 
+ // result: (1.5, 2.0) + (1.5, 0.0) = (3.0, 2.0) "vectorSumResult" to listOf( - DoubleWithTolerance(value = 2.5, tolerance = 0.5), + DoubleWithTolerance(value = 3.0, tolerance = 0.5), DoubleWithTolerance(value = 2.0, tolerance = 0.5), ) ), @@ -1037,7 +934,7 @@ class SparkDataFrameApiTest { } @Test - fun run_privateGroups_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() { + fun run_privateGroups_allPossibleAggregationsOverMultipleValues_calculatesStatisticsCorrectly() { val data = createInputData( listOf( @@ -1061,39 +958,34 @@ class SparkDataFrameApiTest { .count("cnt") .aggregateValue( "value", - ValueAggregationsBuilder().sum("sumValue"), - ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)), + ValueAggregationsBuilder() + .sum("sumValue") + .mean("meanValue") + .variance("varianceValue") + .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"), + ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)), ) .aggregateValue( "anotherValue", - ValueAggregationsBuilder().sum("sumAnotherValue"), - ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)), + ValueAggregationsBuilder() + .sum("sumAnotherValue") + .mean("meanAnotherValue") + .variance("varianceAnotherValue") + .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"), + ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 2.5)), ) .aggregateVector( ColumnNames("value", "anotherValue"), vectorSize = 2, VectorAggregationsBuilder().vectorSum("vectorSumResult"), VectorContributionBounds( - maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0) + maxVectorTotalNorm = VectorNorm(normKind = NormKind.L1, value = 2.0) ), ) .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE) val result: SparkDataFrame = query.run() - // sumValue: - // group1: pid1 contributes 1.5 (in [1,2]), pid2 contributes 1.0 (in [1,2]). Total 2.5. - // nonPublicGroup: pid2 contributes 3.0, clipped to 2.0 by [1,2]. Total 2.0. 
- // sumAnotherValue: - // group1: pid1 contributes 4.5, clipped to 3.0 by [0,3]. pid2 contributes 0.0 (in [0,3]). Total - // 3.0. - // nonPublicGroup: pid2 contributes 0.0 (in [0,3]). Total 0.0. - // vectorSumResult: - // group1: pid1 contributes (1.5, 4.5), L_INF-clipped to (1.5, 2.0). pid2 contributes (1.0, - // 0.0), - // not clipped. - // Total for group1: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0). - // nonPublicGroup: pid2 contributes (3.0, 0.0), L_INF-clipped to (2.0, 0.0). val expected = listOf( QueryPerGroupResultWithTolerance( @@ -1102,13 +994,24 @@ class SparkDataFrameApiTest { "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5), - "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), + "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5), + "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1), + "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5), + "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), + "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), + "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1), + "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), ), mapOf( + // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L1 norm is 6 => + // clip it to (1.5, 4.5) * 2.0 / 6 = (0.5, 1.5) + // pid2: (1.0, 0.0), L1 norm is 1.0 => no clipping. + // result: (0.5, 1.5) + (1.0, 0.0) = (1.5, 1.5) + // nonPublicGroup: pid2 contributes (3.0, 0.0), L1-clipped to (2.0, 0.0). 
"vectorSumResult" to listOf( - DoubleWithTolerance(value = 2.5, tolerance = 0.5), - DoubleWithTolerance(value = 2.0, tolerance = 0.5), + DoubleWithTolerance(value = 1.5, tolerance = 0.5), + DoubleWithTolerance(value = 1.5, tolerance = 0.5), ) ), ) @@ -1116,8 +1019,9 @@ class SparkDataFrameApiTest { assertEquals(result, expected) } + // When counting distinct privacy units different group selection mechanism is used. @Test - fun run_privateGroups_noPidCount_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() { + fun run_privateGroups_noCountDistinctPrivacyUnits_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() { val data = createInputData( listOf( @@ -1194,174 +1098,6 @@ class SparkDataFrameApiTest { assertEquals(result, expected) } - @Test - fun run_privateGroups_allPossibleValueAggregations_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0), - TestDataRow("group1", "pid1", 1.5), - TestDataRow("group1", "pid2", 2.0), - TestDataRow("group2", "pid1", 1.0), - ) - ) - val query = - SparkDataFrameQueryBuilder.from( - data, - ColumnNames("privacyUnit"), - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 2, - maxContributionsPerGroup = 2, - ), - ) - .groupBy(ColumnNames("groupKey"), GroupsType.PrivateGroups()) - .countDistinctPrivacyUnits("pidCnt") - .count("cnt") - .aggregateValue( - "value", - ValueAggregationsBuilder() - .sum("sumResult") - .mean("meanResult") - .variance("varianceResult") - .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"), - ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)), - ) - .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE) - - val result: SparkDataFrame = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 
0.5), - "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), - "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6) - "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05), - "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - ), - vectorAggregationResults = mapOf(), - ) - ) - assertEquals(result, expected) - } - - @Test - fun run_privateGroups_allPossibleVectorAggregations_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0, 2.0), - TestDataRow("group1", "pid1", 1.5, 2.5), - TestDataRow("group1", "pid2", 3.0, -1.0), - TestDataRow("group2", "pid1", -1.0, -3.0), - ) - ) - val query = - SparkDataFrameQueryBuilder.from( - data, - ColumnNames("privacyUnit"), - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 2, - maxContributionsPerGroup = 2, - ), - ) - .groupBy(ColumnNames("groupKey"), GroupsType.PrivateGroups()) - .countDistinctPrivacyUnits("pidCnt") - .count("cnt") - .aggregateVector( - ColumnNames("value", "anotherValue"), - vectorSize = 2, - VectorAggregationsBuilder().vectorSum("vectorSumResult"), - VectorContributionBounds( - maxVectorTotalNorm = VectorNorm(normKind = NormKind.L1, value = 5.0) - ), - ) - .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE) - - val result: SparkDataFrame = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5), - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - ), - mapOf( - "vectorSumResult" to - // pid1: (1.0, 2.0) + (1.5, 2.5) = (2.5, 4.5), L1 norm is 7 => - // clip it to (2.5, 4.5) * 5.0 / 7.0 = (1.8, 3.2) - // pid2: (3.0, -1.0), L1 norm is 4.0 => no clipping. 
- // result: (1.8, 3.2) + (3.0, -1.0) = (4.8, 2.2) - listOf( - DoubleWithTolerance(value = 4.8, tolerance = 0.5), - DoubleWithTolerance(value = 2.2, tolerance = 0.5), - ) - ), - ) - ) - assertEquals(result, expected) - } - - // When counting distinct privacy units different group selection mechanism is used. - @Test - fun run_privateGroups_noCountDistinctPrivacyUnits_calculatesStatisticsCorrectly() { - val data = - createInputData( - listOf( - TestDataRow("group1", "pid1", 1.0), - TestDataRow("group1", "pid1", 1.5), - TestDataRow("group1", "pid2", 2.0), - TestDataRow("group2", "pid1", 1.0), - ) - ) - val query = - SparkDataFrameQueryBuilder.from( - data, - ColumnNames("privacyUnit"), - ContributionBoundingLevel.DATASET_LEVEL( - maxGroupsContributed = 2, - maxContributionsPerGroup = 2, - ), - ) - .groupBy(ColumnNames("groupKey"), GroupsType.PrivateGroups()) - .count("cnt") - .aggregateValue( - "value", - ValueAggregationsBuilder() - .sum("sumResult") - .mean("meanResult") - .variance("varianceResult") - .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"), - ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)), - ) - .build(TotalBudget(epsilon = 3000.0, delta = 0.001), NoiseKind.GAUSSIAN) - - val result: SparkDataFrame = query.run() - - val expected = - listOf( - QueryPerGroupResultWithTolerance( - "group1", - mapOf( - "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5), - "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5), - "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6) - "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05), - "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5), - ), - vectorAggregationResults = mapOf(), - ) - ) - assertEquals(result, expected) - } - @Test fun run_vectorSumOnly_calculatesStatisticsCorrectly() { val data =