-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathvertical_size_partitioner.py
More file actions
313 lines (281 loc) · 12.9 KB
/
vertical_size_partitioner.py
File metadata and controls
313 lines (281 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""VerticalSizePartitioner class."""
# flake8: noqa: E501
# pylint: disable=C0301, R0902, R0913
from math import floor
from typing import Literal, Optional, Union, cast
import numpy as np
import datasets
from flwr_datasets.partitioner.partitioner import Partitioner
from flwr_datasets.partitioner.vertical_partitioner_utils import (
_add_active_party_columns,
_init_optional_str_or_list_str,
)
class VerticalSizePartitioner(Partitioner):
    """Create vertical partitions by splitting features (columns) based on sizes.

    The sizes refer to the number of columns that remain after the
    `drop_columns` are dropped. `shared_columns` and `active_party_columns`
    are excluded from the size-based division and are added only afterwards.

    Enables selection of "active party" column(s) and placement into
    a specific partition or creation of a new partition just for it.
    Also enables dropping columns and sharing specified columns across
    all partitions.

    Parameters
    ----------
    partition_sizes : Union[list[int], list[float]]
        A list where each value represents the size of a partition.
        list[int] -> each value represents an absolute number of columns. Size zero is
        allowed and will result in an empty partition if no shared columns are present.
        list[float] -> each value represents a fraction of the total number of columns.
        Note that these values apply to columns without `active_party_columns` and
        `shared_columns`; those are added to the partition(s) on top of the given
        sizes. `drop_columns` are also not counted toward the partition sizes.
        In case of list[int]: sum(partition_sizes) == len(columns) - len(drop_columns) -
        len(shared_columns) - len(active_party_columns)
    active_party_columns : Optional[Union[str, list[str]]]
        Column(s) (typically representing labels) associated with the
        "active party" (which can be the server).
    active_party_columns_mode : Union[Literal["add_to_first", "add_to_last", "create_as_first", "create_as_last", "add_to_all"], int]
        Determines how to assign the active party columns:

        - `"add_to_first"`: Append active party columns to the first partition.
        - `"add_to_last"`: Append active party columns to the last partition.
        - `"create_as_first"`: Create a new partition at the start containing only these columns.
        - `"create_as_last"`: Create a new partition at the end containing only these columns.
        - `"add_to_all"`: Append active party columns to all partitions.
        - int: Append active party columns to the specified partition index.
    drop_columns : Optional[Union[str, list[str]]]
        Columns to remove entirely from the dataset before partitioning.
    shared_columns : Optional[Union[str, list[str]]]
        Columns to duplicate into every partition after initial partitioning.
    shuffle : bool
        Whether to shuffle the order of columns before partitioning.
    seed : Optional[int]
        Random seed for shuffling columns. Has no effect if `shuffle=False`.

    Examples
    --------
    >>> from flwr_datasets import FederatedDataset
    >>> from flwr_datasets.partitioner import VerticalSizePartitioner
    >>>
    >>> partitioner = VerticalSizePartitioner(
    ...     partition_sizes=[8, 4, 2],
    ...     active_party_columns="income",
    ...     active_party_columns_mode="create_as_last"
    ... )
    >>> fds = FederatedDataset(
    ...     dataset="scikit-learn/adult-census-income",
    ...     partitioners={"train": partitioner}
    ... )
    >>> partitions = [fds.load_partition(i) for i in range(fds.partitioners["train"].num_partitions)]
    >>> print([partition.column_names for partition in partitions])
    """

    def __init__(  # pylint: disable=R0917
        self,
        partition_sizes: Union[list[int], list[float]],
        active_party_columns: Optional[Union[str, list[str]]] = None,
        active_party_columns_mode: Union[
            Literal[
                "add_to_first",
                "add_to_last",
                "create_as_first",
                "create_as_last",
                "add_to_all",
            ],
            int,
        ] = "add_to_last",
        drop_columns: Optional[Union[str, list[str]]] = None,
        shared_columns: Optional[Union[str, list[str]]] = None,
        shuffle: bool = True,
        seed: Optional[int] = 42,
    ) -> None:
        super().__init__()

        self._partition_sizes = partition_sizes
        # The helper normalizes the optional str / list[str] arguments; the rest of
        # the class iterates over the results, so they are presumably always lists.
        self._active_party_columns = _init_optional_str_or_list_str(
            active_party_columns
        )
        self._active_party_columns_mode = active_party_columns_mode
        self._drop_columns = _init_optional_str_or_list_str(drop_columns)
        self._shared_columns = _init_optional_str_or_list_str(shared_columns)
        self._shuffle = shuffle
        self._seed = seed
        self._rng = np.random.default_rng(seed=self._seed)

        # Filled lazily on first access, once the dataset is available.
        self._partition_columns: Optional[list[list[str]]] = None
        self._partitions_determined = False

        self._validate_parameters_in_init()

    def _core_columns(self, all_columns: list[str]) -> list[str]:
        """Return the columns that take part in the size-based division.

        These are the dataset columns, in their original order, that are neither
        dropped, shared, nor assigned to the active party.
        """
        excluded = {
            *(self._drop_columns or []),
            *(self._shared_columns or []),
            *(self._active_party_columns or []),
        }
        return [column for column in all_columns if column not in excluded]

    def _determine_partitions_if_needed(self) -> None:
        """Compute and cache the column names of every partition (runs once).

        Raises
        ------
        ValueError
            If no dataset is assigned or the parameters do not fit the dataset.
        """
        if self._partitions_determined:
            return

        if self.dataset is None:
            raise ValueError("No dataset is set for this partitioner.")

        all_columns = list(self.dataset.column_names)
        self._validate_parameters_while_partitioning(
            all_columns, self._shared_columns, self._active_party_columns
        )

        columns = self._core_columns(all_columns)

        if self._shuffle:
            self._rng.shuffle(columns)

        if all(isinstance(fraction, float) for fraction in self._partition_sizes):
            partition_columns = _fraction_split(
                columns, cast(list[float], self._partition_sizes)
            )
        else:
            partition_columns = _count_split(
                columns, cast(list[int], self._partition_sizes)
            )

        partition_columns = _add_active_party_columns(
            self._active_party_columns,
            self._active_party_columns_mode,
            partition_columns,
        )

        # Add shared columns to all partitions
        for partition in partition_columns:
            partition.extend(self._shared_columns)

        self._partition_columns = partition_columns
        self._partitions_determined = True

    def load_partition(self, partition_id: int) -> datasets.Dataset:
        """Load a partition based on the partition index.

        Parameters
        ----------
        partition_id : int
            The index that corresponds to the requested partition.

        Returns
        -------
        dataset_partition : Dataset
            Single partition of a dataset.

        Raises
        ------
        IndexError
            If `partition_id` is negative or not smaller than the number of partitions.
        """
        self._determine_partitions_if_needed()
        assert self._partition_columns is not None
        if partition_id < 0 or partition_id >= len(self._partition_columns):
            raise IndexError(
                f"partition_id: {partition_id} out of range <0, {self.num_partitions - 1}>."
            )
        columns = self._partition_columns[partition_id]
        return self.dataset.select_columns(columns)

    @property
    def num_partitions(self) -> int:
        """Number of partitions."""
        self._determine_partitions_if_needed()
        assert self._partition_columns is not None
        return len(self._partition_columns)

    def _validate_parameters_in_init(self) -> None:
        """Validate the constructor arguments that do not require the dataset.

        Raises
        ------
        ValueError
            If `partition_sizes` is not a list of all floats or all ints, if the
            values are out of range, if a column list contains a non-string entry,
            or if `active_party_columns_mode` is not a supported value.
        """
        if not isinstance(self._partition_sizes, list):
            raise ValueError("partition_sizes must be a list.")

        if all(isinstance(fraction, float) for fraction in self._partition_sizes):
            fraction_sum = sum(self._partition_sizes)
            # Tolerance 0.01 for the floating point numerical problems
            if not 0.99 < fraction_sum < 1.01:
                raise ValueError(
                    "Float ratios in `partition_sizes` must sum to 1.0. "
                    f"Instead got: {fraction_sum}."
                )
            if any(
                fraction < 0.0 or fraction > 1.0 for fraction in self._partition_sizes
            ):
                raise ValueError(
                    "All floats in `partition_sizes` must be >= 0.0 and <= 1.0."
                )
            # Make the fractions sum to 1.0 by letting the last one absorb the
            # rounding error: it becomes whatever the preceding fractions leave
            # over (never negative).
            leading = self._partition_sizes[:-1]
            self._partition_sizes = leading + [max(0.0, 1.0 - sum(leading))]
        elif all(
            isinstance(column_count, int) for column_count in self._partition_sizes
        ):
            if any(column_count < 0 for column_count in self._partition_sizes):
                raise ValueError("All integers in `partition_sizes` must be >= 0.")
        else:
            raise ValueError("`partition_sizes` list must be all floats or all ints.")

        # Validate columns lists
        for parameter_name, parameter_list in [
            ("drop_columns", self._drop_columns),
            ("shared_columns", self._shared_columns),
            ("active_party_columns", self._active_party_columns),
        ]:
            if not all(isinstance(column, str) for column in parameter_list):
                raise ValueError(f"All entries in {parameter_name} must be strings.")

        valid_modes = {
            "add_to_first",
            "add_to_last",
            "create_as_first",
            "create_as_last",
            "add_to_all",
        }
        if not (
            isinstance(self._active_party_columns_mode, int)
            or self._active_party_columns_mode in valid_modes
        ):
            raise ValueError(
                "active_party_columns_mode must be an int or one of "
                "'add_to_first', 'add_to_last', 'create_as_first', 'create_as_last', "
                "'add_to_all'."
            )

    def _validate_parameters_while_partitioning(
        self,
        all_columns: list[str],
        shared_columns: list[str],
        active_party_columns: list[str],
    ) -> None:
        """Validate the parameters against the columns of the assigned dataset.

        Parameters
        ----------
        all_columns : list[str]
            Names of all columns present in the dataset.
        shared_columns : list[str]
            Columns that must exist because they are copied into every partition.
        active_party_columns : list[str]
            Columns that must exist because they are assigned to the active party.

        Raises
        ------
        ValueError
            If a shared or active party column is missing from the dataset, or if
            integer `partition_sizes` do not sum to the number of columns that
            take part in the division.
        """
        # Shared columns existence check
        for column in shared_columns:
            if column not in all_columns:
                raise ValueError(f"Shared column '{column}' not found in the dataset.")
        # Active party columns existence check
        for column in active_party_columns:
            if column not in all_columns:
                raise ValueError(
                    f"Active party column '{column}' not found in the dataset."
                )

        if all(isinstance(size, int) for size in self._partition_sizes):
            # Count the columns that are really left for the division instead of
            # subtracting list lengths: a drop column missing from the dataset or a
            # column listed in two groups must not skew the expected total.
            num_core_div_columns = len(self._core_columns(all_columns))
            if sum(self._partition_sizes) != num_core_div_columns:
                raise ValueError(
                    "Sum of partition sizes cannot differ from the total number of columns "
                    "used in the division. Note that shared_columns, drop_columns and "
                    "active_party_columns are not included in the division."
                )
def _count_split(columns: list[str], counts: list[int]) -> list[list[str]]:
partition_columns = []
start = 0
for count in counts:
end = start + count
partition_columns.append(columns[start:end])
start = end
return partition_columns
def _fraction_split(columns: list[str], fractions: list[float]) -> list[list[str]]:
num_columns = len(columns)
partitions = []
cumulative = 0
for index, fraction in enumerate(fractions):
count = int(floor(fraction * num_columns))
if index == len(fractions) - 1:
# Last partition takes the remainder
count = num_columns - cumulative
partitions.append(columns[cumulative : cumulative + count])
cumulative += count
return partitions