-
Notifications
You must be signed in to change notification settings - Fork 127
Enable Token probability based Semantic Entropy #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
7b97a0f
9450011
faffe98
f8ca19c
377b587
3fceae3
1f8d81f
d2c5029
4a43347
09a2499
12f897d
59e940f
5be1154
8378596
e87630d
8cf698b
a6c088b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -83,7 +83,7 @@ def predict(self, response1: str, response2: str) -> Any: | |
| probabilites = np.exp(np_logits) / np.exp(np_logits).sum(axis=-1, keepdims=True) | ||
| return probabilites | ||
|
|
||
| def evaluate(self, responses: List[str], sampled_responses: List[List[str]], use_best: bool, compute_entropy: bool = False) -> Dict[str, Any]: | ||
| def evaluate(self, responses: List[str], sampled_responses: List[List[str]], responses_logprobs: List[List[Dict[str, Any]]] = None, sampled_responses_logprobs: List[List[List[Dict[str, Any]]]] = None, use_best: bool = False, compute_entropy: bool = False) -> Dict[str, Any]: | ||
| """ | ||
| Evaluate confidence scores on LLM responses. | ||
|
|
||
|
|
@@ -109,9 +109,10 @@ def evaluate(self, responses: List[str], sampled_responses: List[List[str]], use | |
| The dictionary will also contain original and multiple responses, updated if `use_best` is True | ||
| """ | ||
| self.num_responses = len(sampled_responses[0]) | ||
| self.logprobs, self.multiple_logprobs = responses_logprobs, sampled_responses_logprobs | ||
| observed_consistency_data = {"noncontradiction": [], "semantic_negentropy": [], "responses": responses, "sampled_responses": sampled_responses} | ||
| for i, response in enumerate(responses): | ||
| oc_result_i = self._observed_consistency_i(original=response, candidates=sampled_responses[i], use_best=use_best, compute_entropy=compute_entropy) | ||
| oc_result_i = self._observed_consistency_i(original=response, candidates=sampled_responses[i], i=i, use_best=use_best, compute_entropy=compute_entropy) | ||
| observed_consistency_data["noncontradiction"].append(oc_result_i["nli_score_i"]) | ||
| observed_consistency_data["semantic_negentropy"].append(oc_result_i["semantic_negentropy"]) | ||
| responses[i] = oc_result_i["response"] # Replace with optimized response if use_best | ||
|
|
@@ -122,7 +123,7 @@ def evaluate(self, responses: List[str], sampled_responses: List[List[str]], use | |
| observed_consistency_data["sampled_responses"] = sampled_responses | ||
| return observed_consistency_data | ||
|
|
||
| def _observed_consistency_i(self, original: str, candidates: List[str], use_best: bool = False, compute_entropy: bool = False) -> Dict[str, Any]: | ||
| def _observed_consistency_i(self, original: str, candidates: List[str], i: int = None, use_best: bool = False, compute_entropy: bool = False) -> Dict[str, Any]: | ||
| """ | ||
| Compute observed consistency score on the provided original response and multiple candidates. | ||
| """ | ||
|
|
@@ -132,7 +133,8 @@ def _observed_consistency_i(self, original: str, candidates: List[str], use_best | |
| semantic_negentropy = None | ||
| if compute_entropy or use_best: | ||
| all_responses = [original] + candidates | ||
| tmp = self._semantic_entropy_process(candidates=all_responses) | ||
| all_logprobs = [self.logprobs[i]] + self.multiple_logprobs[i] if (self.logprobs and self.multiple_logprobs) else None | ||
| tmp = self._semantic_entropy_process(candidates=all_responses, i=i, logprobs_results=all_logprobs) | ||
| best_response, semantic_negentropy, scores = tmp | ||
| if use_best: | ||
| all_responses.remove(best_response) | ||
|
|
@@ -147,24 +149,25 @@ def _observed_consistency_i(self, original: str, candidates: List[str], use_best | |
|
|
||
| return {"nli_score_i": np.mean(nli_scores), "candidates": candidates, "response": best_response, "semantic_negentropy": semantic_negentropy} | ||
|
|
||
| def _semantic_entropy_process(self, candidates: List[str], i: int = None, discrete=True) -> Any: | ||
| def _semantic_entropy_process(self, candidates: List[str], i: int = None, logprobs_results: List[List[Dict[str, Any]]] = None) -> Any: | ||
| """ | ||
| Executes complete process for semantic entropy and returns best response, SE score, and dictionary | ||
| of NLI scores for response pairs | ||
| """ | ||
| if self.verbose: | ||
| if self.verbose and i is not None: | ||
| print("Question No. - ", i + 1) | ||
| clustered_responses, nli_scores = self._cluster_responses(responses=candidates) | ||
| if discrete: | ||
| response_probabilities = [[1] * len(cluster_i) for cluster_i in clustered_responses] | ||
| cluster_probabilities = self._compute_cluster_probability(response_probabilities=response_probabilities) | ||
| best_response = clustered_responses[cluster_probabilities.index(max(cluster_probabilities))][0] | ||
| semantic_negentropy = self._compute_semantic_entropy(cluster_probabilities=cluster_probabilities) | ||
| else: | ||
| # TODO: enable continuous semantic entropy | ||
| raise ValueError("SemanticEntropy currently only supports discrete evaluations") | ||
| response_probabilities = self._compute_response_probabilities(logprobs_results=logprobs_results, num_responses=len(candidates)) | ||
| clustered_responses, cluster_probabilities, nli_scores = self._cluster_responses(responses=candidates, response_probabilities=response_probabilities) | ||
| best_response = clustered_responses[cluster_probabilities.index(max(cluster_probabilities))][0] | ||
| semantic_negentropy = self._compute_semantic_entropy(cluster_probabilities=cluster_probabilities) | ||
| return (best_response, semantic_negentropy, nli_scores) | ||
|
|
||
| def _compute_response_probabilities(self, logprobs_results: List[List[Dict[str, Any]]], num_responses: int = None) -> List[float]: | ||
| """Compute response probabilities""" | ||
| if logprobs_results: | ||
| return [self.avg_logprob(logprobs_i) if logprobs_i else np.nan for logprobs_i in logprobs_results] | ||
| return [1 / num_responses] * num_responses | ||
|
|
||
| def _get_nli_results(self, response1: str, response2: str) -> Dict[str, Any]: | ||
| """This method computes mean NLI score and determines whether entailment exists.""" | ||
| if response1 == response2: | ||
|
|
@@ -181,7 +184,7 @@ def _get_nli_results(self, response1: str, response2: str) -> Dict[str, Any]: | |
| avg_nli_score = ((s1 + s2) / 2)[0] | ||
| return {"score": avg_nli_score, "entailment": entailment} | ||
|
|
||
| def _cluster_responses(self, responses: List[str]) -> Any: | ||
| def _cluster_responses(self, responses: List[str], response_probabilities: List[List[float]]) -> Any: | ||
| """ | ||
| This method create clusters from a list of responses based on the semantic meaning of each response. | ||
|
|
||
|
|
@@ -194,12 +197,12 @@ def _cluster_responses(self, responses: List[str]) -> Any: | |
| ---------- | ||
| A list of lists, where each list represents a cluster. | ||
| """ | ||
| clusters = [deque([responses[0]])] | ||
| clusters, cluster_probabilities = [deque([responses[0]])], [response_probabilities[0]] | ||
| nli_scores = {} | ||
| entailments = {} | ||
| for i in range(1, len(responses)): | ||
| new_cluster_indicator = True | ||
| for cluster in clusters: | ||
| for j, cluster in enumerate(clusters): | ||
| key, rev_key = (cluster[0], responses[i]), (responses[i], cluster[0]) | ||
| if key in nli_scores: | ||
| # Do not recompute if pair already assessed | ||
|
|
@@ -213,29 +216,39 @@ def _cluster_responses(self, responses: List[str]) -> Any: | |
| if entailment: | ||
| new_cluster_indicator = False | ||
| cluster.append(responses[i]) | ||
| cluster_probabilities[j] += response_probabilities[i] | ||
|
|
||
| if new_cluster_indicator: | ||
| clusters.append(deque([responses[i]])) | ||
| cluster_probabilities.append(response_probabilities[i]) | ||
|
|
||
| # Arrange cluster so that first element is mode (if exists) else longest | ||
| clusters = [self._sort_responses(list(cluster)) for cluster in clusters] | ||
| return clusters, nli_scores | ||
|
|
||
| def _compute_semantic_entropy(self, cluster_probabilities: List[float]) -> float: | ||
| # Normalize cluster probabilities | ||
| cluster_probabilities = self._normalize_cluster_probabilities(cluster_probabilities=cluster_probabilities) | ||
| return clusters, cluster_probabilities, nli_scores | ||
|
|
||
| def _normalize_entropy(self, entropy_values): | ||
| return [e / math.log(self.num_responses + 1) for e in entropy_values] | ||
|
|
||
| @staticmethod | ||
| def _compute_semantic_entropy(cluster_probabilities: List[float]) -> float: | ||
| """ | ||
| Helper function to compute semantic entropy score from cluster probabilities | ||
| """ | ||
| return abs(sum([p * math.log(p) for p in cluster_probabilities])) | ||
| return abs(sum([p * math.log(p) if p > 0.0 else 0 for p in cluster_probabilities])) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that a cluster has a non-positive probability? I don't think that should be possible
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mohitcek what do you think about this? |
||
|
|
||
| def _normalize_entropy(self, entropy_values): | ||
| return [e / math.log(self.num_responses + 1) for e in entropy_values] | ||
| @staticmethod | ||
| def avg_logprob(logprobs: List[Dict[str, Any]]) -> float: | ||
| "Compute average logprob" | ||
| return np.mean([np.exp(d["logprob"]) for d in logprobs]) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's update this as discussed |
||
|
|
||
| @staticmethod | ||
| def _compute_cluster_probability(response_probabilities: List[float]) -> float: | ||
| """Computes cluster probabilities from response probabilities""" | ||
| total_probability = sum(map(sum, response_probabilities)) | ||
| cluster_probabilities = [sum(rp_i) / total_probability for rp_i in response_probabilities] | ||
| return cluster_probabilities | ||
| def _normalize_cluster_probabilities(cluster_probabilities: List[float]) -> float: | ||
| """Normalize cluster probabilities""" | ||
| total_probability = sum(cluster_probabilities) | ||
| return [cp_i / total_probability for cp_i in cluster_probabilities] | ||
|
|
||
| @staticmethod | ||
| def _sort_responses(responses: List[str]) -> List[str]: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -121,6 +121,9 @@ async def generate_and_score(self, prompts: List[str], num_responses: int = 5) - | |
| self.prompts = prompts | ||
| self.num_responses = num_responses | ||
|
|
||
| if hasattr(self.llm, "logprobs") and "semantic_negentropy" in self.scorers: | ||
| self.llm.logprobs = True | ||
|
|
||
| responses = await self.generate_original_responses(prompts) | ||
| sampled_responses = await self.generate_candidate_responses(prompts) | ||
| return self.score(responses=responses, sampled_responses=sampled_responses) | ||
|
|
@@ -152,7 +155,9 @@ def score(self, responses: List[str], sampled_responses: List[List[str]]) -> UQR | |
| self.scores_dict = {k: [] for k in self.scorer_objects} | ||
| if self.use_nli: | ||
| compute_entropy = "semantic_negentropy" in self.scorers | ||
| nli_scores = self.nli_scorer.evaluate(responses=self.responses, sampled_responses=self.sampled_responses, use_best=self.use_best, compute_entropy=compute_entropy) | ||
| responses_logprobs = self.logprobs if hasattr(self.llm, "logprobs") else None | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My preference is for black box we avoid using token probabilities altogether. Let's just stick with discrete entropy here. |
||
| sampled_responses_logprobs = self.multiple_logprobs if hasattr(self.llm, "logprobs") else None | ||
| nli_scores = self.nli_scorer.evaluate(responses=self.responses, sampled_responses=self.sampled_responses, responses_logprobs=responses_logprobs, sampled_responses_logprobs=sampled_responses_logprobs, use_best=self.use_best, compute_entropy=compute_entropy) | ||
| if self.use_best: | ||
| self.original_responses = self.responses.copy() | ||
| self.responses = nli_scores["responses"] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -98,6 +98,10 @@ async def generate_and_score(self, prompts: List[str], num_responses: int = 5) - | |
| self.num_responses = num_responses | ||
| self.nli_scorer.num_responses = num_responses | ||
|
|
||
| if hasattr(self.llm, "logprobs"): | ||
| print("UQLM: Using logprobs to compute response probabilities for semantic entropy score") | ||
| self.llm.logprobs = True | ||
|
|
||
|
Comment on lines
+124
to
+110
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about we instead check if logprobs is not available and warn that only Discrete Semantic Entropy will be used. Maybe something like this: if not hasattr(self.llm, "logprobs"):
warnings.warn("The provided LLM does not support logprobs access. Only discrete semantic entropy will be computed.")
else:
self.llm.logprobs = True
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| responses = await self.generate_original_responses(prompts) | ||
| sampled_responses = await self.generate_candidate_responses(prompts) | ||
| return self.score(responses=responses, sampled_responses=sampled_responses) | ||
|
|
@@ -132,8 +136,10 @@ def score(self, responses: List[str] = None, sampled_responses: List[List[str]] | |
| print("Computing confidence scores...") | ||
| for i in range(n_prompts): | ||
| candidates = [self.responses[i]] + self.sampled_responses[i] | ||
| tmp = self.nli_scorer._semantic_entropy_process(candidates=candidates, i=i) | ||
| best_responses[i], semantic_entropy[i], scores = tmp | ||
|
|
||
| candidate_logprobs = [self.logprobs[i]] + self.multiple_logprobs[i] if (self.logprobs and self.multiple_logprobs) else None | ||
| tmp = self.nli_scorer._semantic_entropy_process(candidates=candidates, i=i, logprobs_results=candidate_logprobs) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps we enable computation of both simultaneously? Let me know what you think. It's barely any extra time/effort to compute both after NLI clustering is done |
||
| best_responses[i], semantic_entropy[i], _ = tmp | ||
|
|
||
| confidence_scores = [1 - ne for ne in self.nli_scorer._normalize_entropy(semantic_entropy)] | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's have this be the default calculation for best response