Skip to content

Commit 5d44efd

Browse files
Update _compute_budgets_for_aggregation (OpenMined#567)
1 parent 9c27ba7 commit 5d44efd

File tree

6 files changed

+102
-45
lines changed

6 files changed

+102
-45
lines changed

pipeline_dp/aggregate_params.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,14 @@ def is_partition_selection(self) -> bool:
144144
MechanismType.GAUSSIAN_THRESHOLDING
145145
]
146146

147+
@property
148+
def uses_delta(self) -> bool:
149+
return self in [
150+
MechanismType.GAUSSIAN, MechanismType.TRUNCATED_GEOMETRIC,
151+
MechanismType.LAPLACE_THRESHOLDING,
152+
MechanismType.GAUSSIAN_THRESHOLDING
153+
]
154+
147155

148156
def noise_to_thresholding(noise_kind: NoiseKind) -> MechanismType:
149157
if noise_kind == NoiseKind.LAPLACE:

pipeline_dp/budget_accounting.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ class MechanismSpecInternal:
130130
mechanism_spec: MechanismSpec
131131

132132

133-
Budget = collections.namedtuple("Budget", ["epsilon", "delta"])
133+
Budget = collections.namedtuple("Budget",
134+
["epsilon", "delta", "mechanism_type"])
134135

135136

136137
class BudgetAccountant(abc.ABC):
@@ -197,10 +198,12 @@ def scope(self, weight: float):
197198
"""
198199
return BudgetAccountantScope(self, weight)
199200

200-
def _compute_budget_for_aggregation(self, weight: float) -> Budget:
201+
def _compute_budgets_for_aggregation(self, weight: float) -> Budget:
201202
"""Computes budget per aggregation.
202203
203-
It splits the budget using the naive composition.
204+
It splits the budget using the naive composition. If neither
205+
'num_aggregations' nor 'aggregation_weights' is set in the constructor,
206+
it returns an empty list.
204207
205208
Warning: This function changes the 'self' internal state. It can be
206209
called only from the API function of DPEngine, like aggregate() or
@@ -210,18 +213,48 @@ def _compute_budget_for_aggregation(self, weight: float) -> Budget:
210213
weight: the budget weight of the aggregation.
211214
212215
Returns:
213-
the budget.
216+
budgets for each mechanism used in the aggregation.
214217
"""
218+
budgets = []
215219
self._actual_aggregation_weights.append(weight)
216220
if self._expected_num_aggregations:
217-
return Budget(self._total_epsilon / self._expected_num_aggregations,
218-
self._total_delta / self._expected_num_aggregations)
219-
if self._expected_aggregation_weights:
221+
epsilon = self._total_epsilon / self._expected_num_aggregations
222+
delta = self._total_delta / self._expected_num_aggregations
223+
elif self._expected_aggregation_weights:
220224
budget_ratio = weight / sum(self._expected_aggregation_weights)
221-
return Budget(self._total_epsilon * budget_ratio,
222-
self._total_delta * budget_ratio)
223-
# No expectations on aggregations, no way to compute budget.
224-
return None
225+
epsilon = self._total_epsilon * budget_ratio
226+
delta = self._total_delta * budget_ratio
227+
else:
228+
# No expectations on aggregations, no way to compute budget.
229+
return budgets
230+
231+
if not self._scopes_stack:
232+
raise ValueError(
233+
"No scope is active. Did you forget to use 'with'?")
234+
current_scope = self._scopes_stack[-1]
235+
current_scope._normalise_mechanism_weights()
236+
eps_weights = []
237+
del_weights = []
238+
for mechanism in current_scope.mechanisms:
239+
if mechanism.mechanism_spec.mechanism_type.uses_delta:
240+
eps_weights.append(mechanism.weight)
241+
del_weights.append(mechanism.weight)
242+
else:
243+
eps_weights.append(mechanism.weight)
244+
del_weights.append(0)
245+
sum_eps_weights = sum(eps_weights)
246+
sum_del_weights = sum(del_weights)
247+
if sum_del_weights == 0:
248+
sum_del_weights = 1
249+
250+
for i, mechanism in enumerate(current_scope.mechanisms):
251+
budgets.append(
252+
Budget(
253+
epsilon=epsilon * eps_weights[i] / sum_eps_weights,
254+
delta=delta * del_weights[i] / sum_del_weights,
255+
mechanism_type=mechanism.mechanism_spec.mechanism_type,
256+
))
257+
return budgets
225258

226259
def _check_aggregation_restrictions(self):
227260
if self._expected_num_aggregations:

pipeline_dp/dp_engine.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,9 @@ def aggregate(self,
106106
self._current_report_generator)
107107
col = self._aggregate(col, params, data_extractors,
108108
public_partitions)
109-
budget = self._budget_accountant._compute_budget_for_aggregation(
109+
budgets = self._budget_accountant._compute_budgets_for_aggregation(
110110
params.budget_weight)
111-
return self._annotate(col, params=params, budget=budget)
111+
return self._annotate(col, params=params, budget=budgets)
112112

113113
def _aggregate(self, col, params: pipeline_dp.AggregateParams,
114114
data_extractors: pipeline_dp.DataExtractors,
@@ -233,9 +233,9 @@ def select_partitions(self, col, params: pipeline_dp.SelectPartitionsParams,
233233
with self._budget_accountant.scope(weight=params.budget_weight):
234234
self._add_report_generator(params, "select_partitions")
235235
col = self._select_partitions(col, params, data_extractors)
236-
budget = self._budget_accountant._compute_budget_for_aggregation(
236+
budgets = self._budget_accountant._compute_budgets_for_aggregation(
237237
params.budget_weight)
238-
return self._annotate(col, params=params, budget=budget)
238+
return self._annotate(col, params=params, budget=budgets)
239239

240240
def _select_partitions(self, col,
241241
params: pipeline_dp.SelectPartitionsParams,
@@ -597,6 +597,20 @@ def add_dp_noise(self,
597597
collection of (partition_key, value + noise).
598598
Output partition keys are the same as in the input collection.
599599
"""
600+
with self._budget_accountant.scope(weight=params.budget_weight):
601+
self._add_report_generator(params,
602+
"add_dp_noise",
603+
is_public_partition=True)
604+
if out_explain_computation_report is not None:
605+
out_explain_computation_report._set_report_generator(
606+
self._current_report_generator)
607+
anonymized_col = self._add_dp_noise(col, params)
608+
budgets = self._budget_accountant._compute_budgets_for_aggregation(
609+
params.budget_weight)
610+
return self._annotate(anonymized_col, params=params, budget=budgets)
611+
612+
def _add_dp_noise(self, col,
613+
params: pipeline_dp.aggregate_params.AddDPNoiseParams):
600614
# Request budget and create Sensitivities object
601615
mechanism_type = params.noise_kind.convert_to_mechanism_type()
602616
mechanism_spec = self._budget_accountant.request_budget(mechanism_type)
@@ -606,15 +620,6 @@ def add_dp_noise(self,
606620
l1=params.l1_sensitivity,
607621
l2=params.l2_sensitivity)
608622

609-
# Initialize ReportGenerator.
610-
self._report_generators.append(
611-
report_generator.ReportGenerator(params,
612-
"add_dp_noise",
613-
is_public_partition=True))
614-
if out_explain_computation_report is not None:
615-
out_explain_computation_report._set_report_generator(
616-
self._current_report_generator)
617-
618623
# Add noise to values.
619624
def create_mechanism() -> dp_computations.AdditiveMechanism:
620625
return dp_computations.create_additive_mechanism(
@@ -635,11 +640,7 @@ def add_noise(value: Union[int, float]) -> DpNoiseAdditionResult:
635640
def add_noise(value: Union[int, float]) -> float:
636641
return create_mechanism().add_noise(value)
637642

638-
anonymized_col = self._backend.map_values(col, add_noise, "Add noise")
639-
640-
budget = self._budget_accountant._compute_budget_for_aggregation(
641-
params.budget_weight)
642-
return self._annotate(anonymized_col, params=params, budget=budget)
643+
return self._backend.map_values(col, add_noise, "Add noise")
643644

644645
def _annotate(self, col, params: Union[pipeline_dp.AggregateParams,
645646
pipeline_dp.SelectPartitionsParams,

tests/budget_accounting_test.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,14 @@ def test_num_aggregations(self, num_aggregations):
153153
total_delta=total_delta,
154154
num_aggregations=num_aggregations)
155155
for _ in range(num_aggregations):
156-
budget = budget_accountant._compute_budget_for_aggregation(1)
157-
expected_epsilon = total_epsilon / num_aggregations
158-
expected_delta = total_delta / num_aggregations
159-
self.assertAlmostEqual(expected_epsilon, budget.epsilon)
160-
self.assertAlmostEqual(expected_delta, budget.delta)
156+
with budget_accountant.scope(weight=1):
157+
budget_accountant.request_budget(
158+
mechanism_type=MechanismType.GAUSSIAN)
159+
budgets = budget_accountant._compute_budgets_for_aggregation(1)
160+
expected_epsilon = total_epsilon / num_aggregations
161+
expected_delta = total_delta / num_aggregations
162+
self.assertAlmostEqual(expected_epsilon, budgets[0].epsilon)
163+
self.assertAlmostEqual(expected_delta, budgets[0].delta)
161164

162165
budget_accountant.compute_budgets()
163166

@@ -169,11 +172,15 @@ def test_aggregation_weights(self):
169172
total_delta=total_delta,
170173
aggregation_weights=weights)
171174
for weight in weights:
172-
budget = budget_accountant._compute_budget_for_aggregation(weight)
173-
expected_epsilon = total_epsilon * weight / sum(weights)
174-
expected_delta = total_delta * weight / sum(weights)
175-
self.assertAlmostEqual(expected_epsilon, budget.epsilon)
176-
self.assertAlmostEqual(expected_delta, budget.delta)
175+
with budget_accountant.scope(weight=weight):
176+
budget_accountant.request_budget(
177+
mechanism_type=MechanismType.GAUSSIAN)
178+
budgets = budget_accountant._compute_budgets_for_aggregation(
179+
weight)
180+
expected_epsilon = total_epsilon * weight / sum(weights)
181+
expected_delta = total_delta * weight / sum(weights)
182+
self.assertAlmostEqual(expected_epsilon, budgets[0].epsilon)
183+
self.assertAlmostEqual(expected_delta, budgets[0].delta)
177184

178185
budget_accountant.compute_budgets()
179186

@@ -190,7 +197,8 @@ def test_not_enough_aggregations(self, use_num_aggregations):
190197
num_aggregations=num_aggregations,
191198
aggregation_weights=weights)
192199

193-
budget_accountant._compute_budget_for_aggregation(1)
200+
with budget_accountant.scope(weight=1):
201+
budget_accountant._compute_budgets_for_aggregation(1)
194202
with self.assertRaises(ValueError):
195203
# num_aggregations = 2, but only 1 aggregation_scope was created
196204
budget_accountant.compute_budgets()

tests/combiners_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,9 @@ def test_create_compound_combiner_with_post_aggregation(self):
213213
# Act.
214214
compound_combiner = dp_combiners.create_compound_combiner(
215215
params, budget_accountant)
216-
budget_accountant._compute_budget_for_aggregation(params.budget_weight)
216+
with budget_accountant.scope(weight=params.budget_weight):
217+
budget_accountant._compute_budgets_for_aggregation(
218+
params.budget_weight)
217219
budget_accountant.compute_budgets()
218220

219221
# Assert

tests/dp_engine_test.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,7 +1304,12 @@ def test_annotate_call(self, mock_annotate_fn):
13041304
total_delta,
13051305
num_aggregations=3)
13061306
dp_engine = self._create_dp_engine_default(budget_accountant)
1307-
aggregate_params, public_partitions = self._create_params_default()
1307+
aggregate_params = pipeline_dp.AggregateParams(
1308+
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
1309+
metrics=[agg.Metrics.COUNT],
1310+
max_partitions_contributed=1,
1311+
max_contributions_per_partition=1)
1312+
public_partitions = ["pk0", "pk10", "pk11"]
13081313
select_partition_params = pipeline_dp.SelectPartitionsParams(2)
13091314
extractors = self._get_default_extractors()
13101315
input = [1, 2, 3]
@@ -1320,9 +1325,9 @@ def test_annotate_call(self, mock_annotate_fn):
13201325
# Assert
13211326
self.assertEqual(3, mock_annotate_fn.call_count)
13221327
for i_call in range(3):
1323-
budget = mock_annotate_fn.call_args_list[i_call][1]['budget']
1324-
self.assertEqual(total_epsilon / 3, budget.epsilon)
1325-
self.assertEqual(total_delta / 3, budget.delta)
1328+
budgets = mock_annotate_fn.call_args_list[i_call][1]['budget']
1329+
self.assertEqual(total_epsilon / 3, budgets[0].epsilon)
1330+
self.assertEqual(total_delta / 3, budgets[0].delta)
13261331

13271332
def test_min_max_sum_per_partition(self):
13281333
dp_engine, budget_accountant = self._create_dp_engine_default(

0 commit comments

Comments
 (0)