
Commit bde69c0
fixes
1 parent a7cc678
File tree: 3 files changed (+13, -81 lines)

analysis/parameter_tuning.py

Lines changed: 13 additions & 8 deletions
@@ -162,6 +162,8 @@ def _find_candidate_parameters(
     if tune_count_linf:
         linf_count_bounds = _find_candidates_constant_relative_step(
             hist.linf_contributions_histogram, max_linf_candidates)
+    elif pipeline_dp.Metrics.COUNT in metrics:
+        linf_count_bounds = [aggregate_params.max_contributions_per_partition]
 
     linf_sum_bounds = None
     if tune_sum_linf:
@@ -331,7 +333,8 @@ def tune(col,
                                 pipeline_dp.PreAggregateExtractors],
          public_partitions=None,
          strategy_selector_factory: Optional[
-             dp_strategy_selector.DPStrategySelectorFactory] = None):
+             dp_strategy_selector.DPStrategySelectorFactory] = None,
+         candidates: Optional[analysis.MultiParameterConfiguration] = None):
     """Tunes parameters.
 
     It works in the following way:
@@ -371,13 +374,14 @@ def tune(col,
         strategy_selector_factory = dp_strategy_selector.DPStrategySelectorFactory(
         )
 
-    candidates: analysis.MultiParameterConfiguration = (
-        _find_candidate_parameters(
-            hist=contribution_histograms,
-            parameters_to_tune=options.parameters_to_tune,
-            aggregate_params=options.aggregate_params,
-            max_candidates=options.number_of_parameter_candidates,
-        ))
+    if candidates is None:
+        candidates: analysis.MultiParameterConfiguration = (
+            _find_candidate_parameters(
+                hist=contribution_histograms,
+                parameters_to_tune=options.parameters_to_tune,
+                aggregate_params=options.aggregate_params,
+                max_candidates=options.number_of_parameter_candidates,
+            ))
 
     # Add DP strategy (noise_kind, partition_selection_strategy) to multi
     # parameter configuration.
@@ -429,6 +433,7 @@ def _convert_utility_analysis_to_tune_result(
     # TODO(dvadym): implement relative error.
     # TODO(dvadym): take into consideration partition selection from private
     # partition selection.
+    assert tune_options.function_to_minimize == MinimizingFunction.ABSOLUTE_ERROR
 
     # Sort utility reports by configuration index.
     sorted_utility_reports = sorted(utility_reports,
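
Note on the tune() change above: callers may now supply their own candidate grid, and tune() derives one from the contribution histograms only when candidates is None (the default, which preserves the previous behavior). Below is a minimal caller-side sketch; the MultiParameterConfiguration field names and the argument order ahead of data_extractors are assumptions inferred from the surrounding code, not part of this commit.

    # Sketch: passing an explicit candidate grid to tune().
    # col, backend, contribution_histograms, tune_options and data_extractors
    # are assumed to be prepared exactly as for any other tune() call.
    explicit_candidates = analysis.MultiParameterConfiguration(
        max_partitions_contributed=[1, 2, 4],        # assumed field name
        max_contributions_per_partition=[1, 5, 10],  # assumed field name
    )

    result = parameter_tuning.tune(col,
                                   backend,
                                   contribution_histograms,
                                   tune_options,
                                   data_extractors,
                                   candidates=explicit_candidates)
    # With candidates=None, tune() falls back to _find_candidate_parameters()
    # as before this commit.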

tests/dp_engine_test.py

Lines changed: 0 additions & 46 deletions
@@ -288,38 +288,6 @@ def test_calculate_private_contribution_works_on_beam(self):
             max_partitions_contributed=2)
         ]))
 
-    @unittest.skipIf(
-        sys.version_info.minor <= 7 and sys.version_info.major == 3,
-        "There are some problems with PySpark setup on older python")
-    def test_calculate_private_contribution_does_not_work_on_spark_due_to_unsupported_operations(
-            self):
-        # Arrange
-        import pyspark
-        engine = pipeline_dp.DPEngine(budget_accountant=None,
-                                      backend=pipeline_dp.SparkRDDBackend(
-                                          pyspark.SparkContext.getOrCreate(
-                                              pyspark.SparkConf())))
-        params = pipeline_dp.CalculatePrivateContributionBoundsParams(
-            aggregation_eps=0.9,
-            aggregation_delta=0.001,
-            calculation_eps=0.1,
-            aggregation_noise_kind=pipeline_dp.NoiseKind.LAPLACE,
-            max_partitions_contributed_upper_bound=2)
-        # user 0 contributes only 1 partitions, others contribute to both
-        data = [("pk0", 0)]
-        for i in range(10000):
-            data += [("pk0", i + 1), ("pk1", i + 1)]
-        data_extractors = pipeline_dp.DataExtractors(
-            partition_extractor=lambda x: x[0],
-            privacy_id_extractor=lambda x: x[1],
-            value_extractor=lambda _: 1,
-        )
-        partitions = ["pk0", "pk1"]
-
-        with self.assertRaises(NotImplementedError):
-            engine.calculate_private_contribution_bounds(
-                data, params, data_extractors, partitions)
-
     def _create_params_default(
             self) -> Tuple[pipeline_dp.AggregateParams, list]:
         """Returns default params and default public partitions."""
@@ -1216,20 +1184,6 @@ def test_run_e2e_partition_selection_local(self):
 
         self.assertLen(list(output), 5)
 
-    @unittest.skip("There are some problems with serialization in this test. "
-                   "Tests in private_spark_test.py work normaly so probably it"
-                   " is because of some missing setup.")
-    def test_run_e2e_partition_selection_spark(self):
-        import pyspark
-        conf = pyspark.SparkConf()
-        sc = pyspark.SparkContext.getOrCreate(conf=conf)
-        input = sc.parallelize(list(range(10)))
-
-        output = self.run_e2e_private_partition_selection_large_budget(
-            input, pipeline_dp.SparkRDDBackend(sc))
-
-        self.assertLen(collect_to_container(), 5)
-
     def test_run_e2e_partition_selection_beam(self):
         with test_pipeline.TestPipeline() as p:
             input = p | "Create input" >> beam.Create(list(range(10)))

tests/pipeline_functions_test.py

Lines changed: 0 additions & 27 deletions
@@ -120,33 +120,6 @@ def test_min_max_per_key(self, col, expected_min_max):
 
         beam_util.assert_that(result, beam_util.equal_to(expected_min_max))
 
-
-@unittest.skipIf(sys.version_info.minor <= 7 and sys.version_info.major == 3,
-                 "There are some problems with PySpark setup on older python.")
-class SparkRDDBackendTest(parameterized.TestCase):
-
-    @classmethod
-    def setUpClass(cls):
-        import pyspark
-        conf = pyspark.SparkConf()
-        cls.sc = pyspark.SparkContext.getOrCreate(conf=conf)
-        cls.backend = pipeline_dp.SparkRDDBackend(cls.sc)
-
-    def test_key_by_extracts_keys_and_keeps_values_untouched(self):
-        col = self.sc.parallelize(["key1_value1", "key1_value2", "key2_value1"])
-
-        def underscore_separated_key_extractor(el):
-            return el.split("_")[0]
-
-        result = composite_funcs.key_by(self.backend,
-                                        col,
-                                        underscore_separated_key_extractor,
-                                        stage_name="Key by").collect()
-
-        self.assertSetEqual(
-            {("key1", "key1_value1"), ("key1", "key1_value2"),
-             ("key2", "key2_value1")}, set(result))
-
     def test_size_accounts_for_duplicates(self):
         col = self.sc.parallelize([3, 2, 1, 1])
