"""
Module to generate diverse counterfactual explanations based on random sampling.
A simple implementation.
"""
import random
import timeit

import numpy as np
import pandas as pd

from dice_ml import diverse_counterfactuals as exp
from dice_ml.constants import ModelTypes
from dice_ml.explainer_interfaces.explainer_base import ExplainerBase
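
# Typical entry point (a hedged sketch: train_df, query_df, the column names, and
# the trained sklearn classifier clf are illustrative, not part of this module).
# The public dice_ml API routes method="random" to the DiceRandom explainer below:
#
#     import dice_ml
#     d = dice_ml.Data(dataframe=train_df,
#                      continuous_features=["age", "hours_per_week"],
#                      outcome_name="income")
#     m = dice_ml.Model(model=clf, backend="sklearn")
#     explainer = dice_ml.Dice(d, m, method="random")
#     cf = explainer.generate_counterfactuals(query_df, total_CFs=4,
#                                             desired_class="opposite")
#     cf.visualize_as_dataframe(show_only_changes=True)
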
class DiceRandom(ExplainerBase):
    def __init__(self, data_interface, model_interface):
        """Init method

        :param data_interface: an interface class to access data related params.
        :param model_interface: an interface class to access trained ML model.
        """
        super().__init__(data_interface)  # initiating data related parameters
        self.data_interface.create_ohe_params()
        self.model = model_interface
        self.model.load_model()  # loading pickled trained model if applicable
        self.model.transformer.feed_data_params(data_interface)
        self.model.transformer.initialize_transform_func()

        self.precisions = self.data_interface.get_decimal_precisions(output_type="dict")
        # Keep the outcome precision as a plain int: round() and np.round()
        # expect an integer number of decimals, not a one-element list.
        if self.data_interface.outcome_name in self.precisions:
            self.outcome_precision = self.precisions[self.data_interface.outcome_name]
        else:
            self.outcome_precision = 0
    def _generate_counterfactuals(self, query_instance, total_CFs, desired_range=None,
                                  desired_class="opposite", permitted_range=None,
                                  features_to_vary="all", stopping_threshold=0.5,
                                  posthoc_sparsity_param=0.1, posthoc_sparsity_algorithm="linear",
                                  sample_size=1000, random_seed=None, verbose=False):
        """Generate counterfactuals by randomly sampling features.

        :param query_instance: Test point of interest. A dictionary of feature names and values
                               or a single-row dataframe.
        :param total_CFs: Total number of counterfactuals required.
        :param desired_range: For regression problems. Contains the outcome range to generate
                              counterfactuals in.
        :param desired_class: Desired counterfactual class - can take 0 or 1. Default value is
                              "opposite" to the outcome class of query_instance for binary
                              classification.
        :param permitted_range: Dictionary with feature names as keys and permitted ranges in
                                lists as values. Defaults to the range inferred from training
                                data. If None, uses the parameters initialized in data_interface.
        :param features_to_vary: Either a string "all" or a list of feature names to vary.
        :param stopping_threshold: Minimum threshold for the probability of the counterfactual
                                   target class.
        :param posthoc_sparsity_param: Parameter for the post-hoc operation on continuous
                                       features to enhance sparsity.
        :param posthoc_sparsity_algorithm: Perform either linear or binary search. Takes "linear"
                                           or "binary". Prefer binary search when a feature range
                                           is large (for instance, income varying from 10k to
                                           1000k) and only if the features share a monotonic
                                           relationship with the predicted outcome in the model.
        :param sample_size: Number of random samples to draw per iteration.
        :param random_seed: Random seed for reproducibility.
        :returns: A CounterfactualExamples object that contains the dataframe of generated
                  counterfactuals as an attribute.
        """
        self.features_to_vary = self.setup(features_to_vary, permitted_range, query_instance,
                                           feature_weights=None)

        # Record the values of features that must stay fixed in every counterfactual.
        self.fixed_features_values = {}
        if features_to_vary != "all":
            for feature in self.data_interface.feature_names:
                if feature not in features_to_vary:
                    self.fixed_features_values[feature] = query_instance[feature].iat[0]
        # Do predictions once on the query_instance and reuse them below to reduce
        # the number of model inferences.
        model_predictions = self.predict_fn(query_instance)

        # number of output nodes of the ML model
        self.num_output_nodes = None
        if self.model.model_type == ModelTypes.Classifier:
            self.num_output_nodes = model_predictions.shape[1]

        # query_instance needs no transformation for generating CFs using random sampling.
        # find the predicted value of query_instance
        test_pred = model_predictions[0]
        if self.model.model_type == ModelTypes.Classifier:
            self.target_cf_class = self.infer_target_cfs_class(desired_class, test_pred,
                                                               self.num_output_nodes)
        elif self.model.model_type == ModelTypes.Regressor:
            self.target_cf_range = self.infer_target_cfs_range(desired_range)

        self.total_CFs = total_CFs
        self.stopping_threshold = stopping_threshold
        if self.model.model_type == ModelTypes.Classifier:
            # TODO: generalize this for multi-class classification
            if self.target_cf_class == 0 and self.stopping_threshold > 0.5:
                self.stopping_threshold = 0.25
            elif self.target_cf_class == 1 and self.stopping_threshold < 0.5:
                self.stopping_threshold = 0.75
        # get random samples for each feature independently
        start_time = timeit.default_timer()
        random_instances = self.get_samples(
            self.fixed_features_values,
            self.feature_range, sampling_random_seed=random_seed, sampling_size=sample_size)

        # Generate copies of the query instance that will be changed one feature
        # at a time to encourage sparsity.
        cfs_df = None
        candidate_cfs = pd.DataFrame(
            np.repeat(query_instance.values, sample_size, axis=0), columns=query_instance.columns)
        # Loop over passes. Each pass perturbs one randomly chosen feature per
        # candidate row, but candidate_cfs is not reset between passes, so after
        # pass n a row may differ from the query instance in up to n features.
        for num_features_to_vary in range(1, len(self.features_to_vary) + 1):
            # The commented-out line below is a fuller variant that perturbs
            # num_features_to_vary features at once (use .loc instead of .at when
            # assigning several columns per row). It is slower but more thorough,
            # and still faster than the genetic or KD-tree explainers.
            # selected_features = np.random.choice(self.features_to_vary,
            #                                      (sample_size, num_features_to_vary), replace=True)
            selected_features = np.random.choice(self.features_to_vary, (sample_size, 1), replace=True)
            for k in range(sample_size):
                # copy the sampled value of the chosen feature into candidate row k
                candidate_cfs.at[k, selected_features[k][0]] = \
                    random_instances.at[k, selected_features[k][0]]
            scores = self.predict_fn(candidate_cfs)
            validity = self.decide_cf_validity(scores)
            if sum(validity) > 0:
                rows_to_add = candidate_cfs[validity == 1]
                if cfs_df is None:
                    cfs_df = rows_to_add.copy()
                else:
                    # DataFrame.append was removed in pandas 2.0; use pd.concat instead
                    cfs_df = pd.concat([cfs_df, rows_to_add])
                cfs_df.drop_duplicates(inplace=True)
                # Require at least two passes before stopping, so counterfactuals
                # that differ in more than one feature get a chance to appear.
                if num_features_to_vary >= 2 and len(cfs_df) >= total_CFs:
                    break
        self.total_cfs_found = 0
        self.valid_cfs_found = False
        if cfs_df is not None and len(cfs_df) > 0:
            # keep at most total_CFs counterfactuals, sampled uniformly
            if len(cfs_df) > total_CFs:
                cfs_df = cfs_df.sample(total_CFs, random_state=random_seed)
            cfs_df.reset_index(inplace=True, drop=True)

            if len(cfs_df) > 0:
                self.cfs_pred_scores = self.predict_fn(cfs_df)
                cfs_df[self.data_interface.outcome_name] = \
                    self.get_model_output_from_scores(self.cfs_pred_scores)
            else:
                if self.model.model_type == ModelTypes.Classifier:
                    self.cfs_pred_scores = [0] * self.num_output_nodes
                else:
                    self.cfs_pred_scores = [0]

            self.total_cfs_found = len(cfs_df)
            self.valid_cfs_found = self.total_cfs_found >= self.total_CFs

            if len(cfs_df) > 0:
                # copy() avoids mutating a slice view of cfs_df below
                final_cfs_df = cfs_df[self.data_interface.feature_names +
                                      [self.data_interface.outcome_name]].copy()
                final_cfs_df[self.data_interface.outcome_name] = \
                    final_cfs_df[self.data_interface.outcome_name].round(self.outcome_precision)
                self.cfs_preds = final_cfs_df[[self.data_interface.outcome_name]].values
                self.final_cfs = final_cfs_df[self.data_interface.feature_names].values
            else:
                final_cfs_df = None
                self.cfs_preds = None
                self.cfs_pred_scores = None
                self.final_cfs = None
        else:
            final_cfs_df = None
            self.cfs_preds = None
            self.cfs_pred_scores = None
            self.final_cfs = None
        test_instance_df = self.data_interface.prepare_query_instance(query_instance)
        test_instance_df[self.data_interface.outcome_name] = \
            np.array(np.round(self.get_model_output_from_scores((test_pred,)), self.outcome_precision))

        # post-hoc operation on continuous features to enhance sparsity - only for public data
        if posthoc_sparsity_param is not None and posthoc_sparsity_param > 0 and \
                self.final_cfs is not None and 'data_df' in self.data_interface.__dict__:
            final_cfs_df_sparse = final_cfs_df.copy()
            final_cfs_df_sparse = self.do_posthoc_sparsity_enhancement(
                final_cfs_df_sparse, test_instance_df, posthoc_sparsity_param, posthoc_sparsity_algorithm)
        else:
            final_cfs_df_sparse = None
        self.elapsed = timeit.default_timer() - start_time
        m, s = divmod(self.elapsed, 60)
        if self.valid_cfs_found:
            if verbose:
                print('Diverse Counterfactuals found! total time taken: %02d min %02d sec' % (m, s))
        elif self.total_cfs_found == 0:
            print('No Counterfactuals found for the given configuration, perhaps try with '
                  'different parameters...; total time taken: %02d min %02d sec' % (m, s))
        else:
            print('Only %d (required %d) Diverse Counterfactuals found for the given '
                  'configuration, perhaps try with different parameters...; '
                  'total time taken: %02d min %02d sec'
                  % (self.total_cfs_found, self.total_CFs, m, s))
        return exp.CounterfactualExamples(data_interface=self.data_interface,
                                          final_cfs_df=final_cfs_df,
                                          test_instance_df=test_instance_df,
                                          final_cfs_df_sparse=final_cfs_df_sparse,
                                          posthoc_sparsity_param=posthoc_sparsity_param,
                                          desired_class=desired_class,
                                          desired_range=desired_range,
                                          model_type=self.model.model_type)
    def get_samples(self, fixed_features_values, feature_range, sampling_random_seed, sampling_size):
        """Draw sampling_size random values for every feature independently."""
        # decimal precisions for rounding continuous samples
        precisions = self.data_interface.get_decimal_precisions(output_type="dict")

        if sampling_random_seed is not None:
            random.seed(sampling_random_seed)

        samples = []
        for feature in self.data_interface.feature_names:
            if feature in fixed_features_values:
                sample = [fixed_features_values[feature]] * sampling_size
            elif feature in self.data_interface.continuous_feature_names:
                low = feature_range[feature][0]
                high = feature_range[feature][1]
                sample = self.get_continuous_samples(
                    low, high, precisions[feature], size=sampling_size,
                    seed=sampling_random_seed)
            else:
                # categorical feature: re-seed before each draw so every feature
                # is sampled reproducibly, independent of iteration order
                if sampling_random_seed is not None:
                    random.seed(sampling_random_seed)
                sample = random.choices(feature_range[feature], k=sampling_size)
            samples.append(sample)
        samples = pd.DataFrame(dict(zip(self.data_interface.feature_names, samples)))
        return samples
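
    # Illustrative get_samples() output (feature names and values hypothetical):
    # with fixed_features_values={"education": "Bachelors"} and sampling_size=3,
    # the result is a 3-row DataFrame such as
    #
    #        age  education  hours_per_week
    #     0   27  Bachelors            52.0
    #     1   61  Bachelors            19.0
    #     2   44  Bachelors            37.0
    #
    # where "education" stays constant and the other columns are drawn
    # independently from their entries in feature_range.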
    def get_continuous_samples(self, low, high, precision, size=1000, seed=None):
        """Draw size random values in [low, high], rounded to the given decimal precision."""
        if seed is not None:
            np.random.seed(seed)

        if precision == 0:
            # integer-valued feature: sample integers (high inclusive), cast to float
            result = np.random.randint(low, high + 1, size).tolist()
            result = [float(r) for r in result]
        else:
            # nudge the upper bound so that high itself can appear after rounding
            result = np.random.uniform(low, high + (10**-precision), size)
            result = [round(r, precision) for r in result]
        return result
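
    # Illustrative behaviour (outputs depend on the seed; the values shown are
    # examples only, not fixed results):
    #   self.get_continuous_samples(20, 60, precision=0, size=3)   -> e.g. [27.0, 58.0, 41.0]
    #   self.get_continuous_samples(0.0, 1.0, precision=2, size=3) -> e.g. [0.13, 0.87, 0.50]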