diff --git a/examples/clustering_consensus.py b/examples/clustering_consensus.py new file mode 100644 index 00000000..378ee1fb --- /dev/null +++ b/examples/clustering_consensus.py @@ -0,0 +1,249 @@ +"""Example of using semantic clustering consensus strategy for hallucination detection. + +This example demonstrates how to use semantic clustering to detect and filter out +hallucinated responses from LLMs. The strategy groups similar responses together +and identifies outliers that may be hallucinations. +""" + +import asyncio +import numpy as np +from typing import List +from dataclasses import dataclass +import random +from sklearn.feature_extraction.text import TfidfVectorizer # type: ignore +from sklearn.metrics.pairwise import cosine_similarity # type: ignore + +from flare_ai_kit.common import Prediction +from flare_ai_kit.consensus.aggregator import BaseAggregator +from flare_ai_kit.consensus.aggregator.advanced_strategies import ( + semantic_clustering_strategy, + robust_consensus_strategy +) +from flare_ai_kit.rag.vector.embedding.base import BaseEmbedding + + +class TFIDFEmbeddingModel(BaseEmbedding): + """Real TF-IDF embedding model for semantic similarity.""" + + def __init__(self, max_features: int = 1000): + self.max_features = max_features + self.vectorizer = TfidfVectorizer( + max_features=max_features, + stop_words='english', + ngram_range=(1, 2), + min_df=1, + max_df=0.9 + ) + self.is_fitted = False + + def embed_content( + self, + contents: str | list[str], + title: str | None = None, + task_type: str | None = None, + ) -> list[list[float]]: + """Generate TF-IDF embeddings for text content.""" + if isinstance(contents, str): + contents = [contents] + + if not self.is_fitted: + # Fit the vectorizer on the first batch + self.vectorizer.fit(contents) # type: ignore + self.is_fitted = True + + # Transform the content to TF-IDF vectors + tfidf_matrix = self.vectorizer.transform(contents) # type: ignore + + # Convert to dense arrays and normalize + embeddings = tfidf_matrix.toarray() # type: ignore + + # Normalize to unit vectors for cosine similarity + norms = np.linalg.norm(embeddings, axis=1, keepdims=True) # type: ignore + norms = np.where(norms == 0, 1, norms) # Avoid division by zero + embeddings = embeddings / norms # type: ignore + + return embeddings.tolist() # type: ignore + + +@dataclass +class MockLLM: + """Mock LLM that simulates different types of responses including hallucinations.""" + + name: str + response_type: str = "realistic" # "realistic", "hallucinated", "mixed" + + async def predict(self, prompt: str) -> Prediction: + """Generate a prediction based on the response type.""" + if self.response_type == "realistic": + # Realistic response acknowledging the study doesn't exist + responses = [ + "I need to clarify that I cannot find any record of a 2019 study by Dr. Sarah Chen at MIT establishing a 'Chen-Rodriguez Protocol' for quantum error correction in biological systems. This appears to be a fictional study.", + "There is no documented 2019 study by Dr. Sarah Chen at MIT about quantum error correction in biological systems. The 'Chen-Rodriguez Protocol' mentioned does not exist in scientific literature.", + "I cannot verify the existence of this 2019 study. Dr. Sarah Chen and the 'Chen-Rodriguez Protocol' for quantum error correction in biological systems are not found in scientific databases." 
+ ] + response = random.choice(responses) + confidence = random.uniform(0.8, 0.95) + + elif self.response_type == "hallucinated": + # Hallucinated response with fake details + responses = [ + "The 2019 study by Dr. Sarah Chen at MIT was extremely well-received, achieving a 94% approval rating in peer reviews. The Chen-Rodriguez Protocol has been cited over 2,300 times and has applications in quantum computing, medicine, and AI.", + "Dr. Sarah Chen's landmark 2019 study at MIT revolutionized quantum error correction. The Chen-Rodriguez Protocol received the prestigious Nobel Prize nomination and has been implemented in over 50 research institutions worldwide.", + "The Chen-Rodriguez Protocol from Dr. Sarah Chen's 2019 MIT study was groundbreaking. It achieved 99.7% accuracy in quantum error correction and has been adopted by major tech companies including Google, IBM, and Microsoft." + ] + response = random.choice(responses) + confidence = random.uniform(0.7, 0.9) # High confidence in false information + + else: # mixed + # Mixed response with some truth and some fiction + responses = [ + "While there have been studies on quantum error correction, I cannot find specific evidence of Dr. Sarah Chen's 2019 study at MIT. However, quantum error correction research has been active in recent years.", + "The field of quantum error correction has seen significant progress, though I cannot verify the specific 'Chen-Rodriguez Protocol' mentioned. MIT has indeed been involved in quantum computing research.", + "Quantum error correction is a real field of study, but the specific 2019 study by Dr. Sarah Chen and the 'Chen-Rodriguez Protocol' are not documented in scientific literature." + ] + response = random.choice(responses) + confidence = random.uniform(0.6, 0.8) + + return Prediction( + agent_id=self.name, + prediction=response, + confidence=confidence + ) + + +class ClusteringAggregator(BaseAggregator): + """Aggregator that uses semantic clustering for consensus.""" + + def __init__(self, embedding_model: BaseEmbedding, strategy: str = "semantic"): + self.embedding_model = embedding_model + self.strategy_name = strategy # Store the strategy name separately + # Create a strategy function that uses the clustering strategy + def clustering_strategy(predictions: List[Prediction]) -> Prediction: + return self._cluster_strategy(predictions) + super().__init__(clustering_strategy) + + def _cluster_strategy(self, predictions: List[Prediction]) -> Prediction: + """Apply clustering strategy to predictions.""" + if self.strategy_name == "semantic": + return semantic_clustering_strategy(predictions, self.embedding_model) + elif self.strategy_name == "robust": + return robust_consensus_strategy(predictions, self.embedding_model) + else: + raise ValueError(f"Unknown strategy: {self.strategy_name}") + + async def aggregate(self, predictions: List[Prediction]) -> Prediction: + """Aggregate predictions using clustering strategy.""" + return self._cluster_strategy(predictions) + + +# The prompt is designed to test hallucination detection +example_prompt = """ +How well-received was the landmark 2019 study by Dr. Sarah Chen at MIT that established the 'Chen-Rodriguez Protocol' for quantum error correction in biological systems? Many have suggested its applicability to a plethora of fields. + +Please provide a detailed response about this study's reception, impact, and applications. 
+""" + + +async def clustering_consensus_example(): + """Demonstrate semantic clustering consensus for hallucination detection.""" + + # Create embedding model + embedding_model = TFIDFEmbeddingModel(max_features=1000) + + # Create LLMs with different response patterns + llms = [ + MockLLM("GPT-4", "realistic"), + MockLLM("Claude", "realistic"), + MockLLM("Gemini", "realistic"), + MockLLM("Hallucinator-1", "hallucinated"), + MockLLM("Hallucinator-2", "hallucinated"), + MockLLM("Mixed-Response-1", "mixed"), + MockLLM("Mixed-Response-2", "mixed"), + MockLLM("Realistic-4", "realistic"), + ] + + print("šŸ” Semantic Clustering Consensus for Hallucination Detection") + print("=" * 70) + print(f"Prompt: {example_prompt.strip()}") + print("\nšŸ“Š Individual LLM Predictions:") + print("-" * 50) + + # Collect predictions + predictions: List[Prediction] = [] + for llm in llms: + prediction = await llm.predict(example_prompt) + predictions.append(prediction) + + # Truncate long responses for display + prediction_str = str(prediction.prediction) + display_text = prediction_str[:100] + "..." if len(prediction_str) > 100 else prediction_str + print(f"{llm.name:>15}: {display_text}") + print(f"{'':>15} Confidence: {prediction.confidence:.2f}") + print() + + # Test different clustering strategies + strategies = ["semantic", "robust"] + + for strategy in strategies: + print(f"\nšŸŽÆ {strategy.title()} Clustering Consensus:") + print("-" * 40) + + aggregator = ClusteringAggregator(embedding_model, strategy) + consensus_result = await aggregator.aggregate(predictions) + + # Truncate for display + result_str = str(consensus_result.prediction) + display_text = result_str[:150] + "..." if len(result_str) > 150 else result_str + print(f"Strategy: {strategy.title()} Clustering") + print(f"Result: {display_text}") + print(f"Confidence: {consensus_result.confidence:.2f}") + print(f"Agent ID: {consensus_result.agent_id}") + + # Demonstrate direct use of advanced strategies + print(f"\nšŸ”¬ Direct Strategy Comparison:") + print("-" * 40) + + from flare_ai_kit.consensus.aggregator.advanced_strategies import ( + semantic_clustering_strategy, + shapley_value_strategy, + entropy_based_strategy + ) + + # Test semantic clustering directly + semantic_result = semantic_clustering_strategy(predictions, embedding_model) + print(f"Semantic Clustering: {str(semantic_result.prediction)[:100]}...") + print(f"Confidence: {semantic_result.confidence:.2f}") + + # Test Shapley value strategy + shapley_result = shapley_value_strategy(predictions, embedding_model) + print(f"Shapley Value: {str(shapley_result.prediction)[:100]}...") + print(f"Confidence: {shapley_result.confidence:.2f}") + + # Test entropy-based strategy + entropy_result = entropy_based_strategy(predictions, embedding_model) + print(f"Entropy-Based: {str(entropy_result.prediction)[:100]}...") + print(f"Confidence: {entropy_result.confidence:.2f}") + + print("\nšŸ“ˆ Analysis:") + print("-" * 20) + print("• Realistic responses should cluster together") + print("• Hallucinated responses should be identified as outliers") + print("• Mixed responses may fall in between") + print("• The dominant cluster should contain the most reliable responses") + print("\nšŸ”§ Advanced Strategy Features:") + print("-" * 30) + print("• Semantic Clustering: Groups similar responses using embeddings") + print("• Shapley Values: Quantifies each agent's marginal contribution") + print("• Entropy Analysis: Measures predictive uncertainty") + print("• Robust Consensus: Combines multiple strategies 
for reliability") + print("\nšŸŽÆ Hallucination Detection:") + print("-" * 25) + print("• Outlier clusters are filtered out") + print("• Low similarity responses are downweighted") + print("• High entropy indicates uncertain predictions") + print("• Multiple strategies provide consensus validation") + + +if __name__ == "__main__": + asyncio.run(clustering_consensus_example()) + diff --git a/examples/majority_consensus.py b/examples/majority_consensus.py new file mode 100644 index 00000000..b3b4464a --- /dev/null +++ b/examples/majority_consensus.py @@ -0,0 +1,117 @@ +"""Example of using the consensus engine to achieve consensus among multiple AI agents. + +Here every AI model is a mock LLM, designed to simulate what responses an LLM could give (based on an OpenRouter approach). In this case there is one prompt and different consensus strategies are applied to the output to get a final answer. +""" + +import asyncio +from typing import List, Callable +from dataclasses import dataclass +from collections import Counter + +from flare_ai_kit.common import Prediction +from flare_ai_kit.consensus.aggregator import BaseAggregator, majority_vote + + +@dataclass +class MockLLM: + """Mock LLM class for demonstration purposes. This simulates responses that could hypothetically be given by an LLM. + In practice when using custom tarined models, the responses follow a similar sturcture. The confidence can be obtained by looking at the probability distributions outputted by the model. + These confidence scores are not highly applicable to the LLM openrouter based approach, however they are included in this example for completeness + The bias parameter is used to simulate different LLMs with different biases. This is done for testing purposes. + To test the aggregator with your actual LLMs, replace this class with an implementation that calls different LLMs from OpenRouter. + """ + name: str + bias: str = "neutral" # Can be "switch", "keep", "neutral" + + async def predict(self, prompt: str) -> Prediction: + import random + if self.bias == "switch": + response = "" + confidence = 0.9 + elif self.bias == "keep": + response = "" + confidence = 0.8 + else: + responses = ["", "", ""] + response = random.choice(responses) + confidence = random.uniform(0.6, 0.9) + return Prediction( + agent_id=self.name, + prediction=response, + confidence=confidence + ) + + +def majority_vote_strategy(predictions: List[Prediction]) -> Prediction: + result = majority_vote(predictions) + avg_confidence = sum(p.confidence for p in predictions) / len(predictions) + return Prediction( + agent_id="consensus", + prediction=result, + confidence=avg_confidence + ) + + +class ConsensusAggregator(BaseAggregator): + def __init__(self, strategy: Callable[[List[Prediction]], Prediction]) -> None: + super().__init__(strategy) + async def aggregate(self, predictions: List[Prediction]) -> Prediction: + return self.strategy(predictions) + + + + +# The following prompt is a twist on the classic Monty Hall problem. LLMs have to be careful. Try it out! +example_prompt = """ +Imagine you're on a game show, and there are three doors in front of you. Behind one door is a car, and behind the other two doors are goats. You don't know what's behind any of the doors. You get to choose one door. Let's say you pick Door #1. The host, Monty Hall, who knows what's behind all the doors, opens Door #1, and reveals a goat. Now, you have two doors left: Door #3 and Door #2. You pick Door #3. 
Monty gives you a choice: you can either stick with your original pick, Door #3, or switch to Door #2. +What do you do to maximize your chances of winning the car? + +Answer with one of these three tokens: + +Pick if you should switch to Door #2 + +Pick if you should stick with your original pick, Door #3. + +Pick if neither option gives an advantage. + + Simply answer with the token, no other text. +""" + + +async def majority_vote_example(): + # Create a list of LLMs with different biases. Mock set. + llms = [ + MockLLM("GPT-4", "switch"), # GPT-4 is biased to switch + MockLLM("Claude", "keep"), # Claude is biased to keep + MockLLM("Gemini", "neutral"), # Gemini is neutral + MockLLM("Llama", "switch"), # Llama is biased to switch + MockLLM("Mistral", "neutral"), # Mistral is neutral + ] + aggregator = ConsensusAggregator(majority_vote_strategy) + print("šŸ¤– Monty Hall Problem Consensus Example") + print("=" * 50) + print(f"Prompt: {example_prompt.strip()}") + print("\nšŸ“Š Individual LLM Predictions:") + print("-" * 30) + predictions: List[Prediction] = [] + for llm in llms: + prediction = await llm.predict(example_prompt) + predictions.append(prediction) + print(f"{llm.name:>10}: {prediction.prediction} (confidence: {prediction.confidence:.2f})") + consensus_result = await aggregator.aggregate(predictions) + print("\nšŸŽÆ Consensus Result:") + print("-" * 30) + print(f"Strategy: Majority Vote") + print(f"Result: {consensus_result.prediction}") + print(f"Confidence: {consensus_result.confidence:.2f}") + vote_counts = Counter(p.prediction for p in predictions) + print(f"\nšŸ“ˆ Vote Breakdown:") + for vote, count in vote_counts.most_common(): + print(f" {vote}: {count} votes") + + + + + +if __name__ == "__main__": + asyncio.run(majority_vote_example()) \ No newline at end of file diff --git a/examples/tournament_consensus.py b/examples/tournament_consensus.py new file mode 100644 index 00000000..039fb855 --- /dev/null +++ b/examples/tournament_consensus.py @@ -0,0 +1,167 @@ +"""Example of using the tournament elimination consensus strategy. + +This example demonstrates how to use the tournament elimination strategy with meta-agent +arbitration and chain-of-thought reasoning. The strategy pits predictions against each +other in elimination rounds, with a meta-agent evaluating each match. 
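+
+Note that the meta-agent in this example is simulated: MetaAgent.evaluate_match in
+tournament_strategies.py picks each match winner from the candidates' reported
+confidence scores (perturbed with a little random noise, plus a small bonus for
+answers that use the expected response tokens) rather than by querying a real model.
+To make the arbitration real, replace that scoring with an LLM call along these
+lines (hypothetical client API, shown only as a sketch):
+
+    verdict = await llm.chat(
+        f"Question: {prompt}\n"
+        f"A ({match.prediction1.agent_id}): {match.prediction1.prediction}\n"
+        f"B ({match.prediction2.agent_id}): {match.prediction2.prediction}\n"
+        "Which answer is better reasoned? Reply 'A' or 'B' with a short justification."
+    )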
+""" + +import asyncio +from typing import List +from dataclasses import dataclass + +from flare_ai_kit.common import Prediction +from flare_ai_kit.consensus.aggregator import BaseAggregator +from flare_ai_kit.consensus.aggregator.tournament_strategies import async_tournament_elimination + + +@dataclass +class MockLLM: + """Mock LLM class for demonstration purposes.""" + name: str + bias: str = "neutral" # Can be "switch", "keep", "neutral" + + async def predict(self, prompt: str) -> Prediction: + import random + if self.bias == "switch": + response = "" + confidence = 0.9 + elif self.bias == "keep": + response = "" + confidence = 0.8 + else: + responses = ["", "", ""] + response = random.choice(responses) + confidence = random.uniform(0.6, 0.9) + return Prediction( + agent_id=self.name, + prediction=response, + confidence=confidence + ) + + +class TournamentAggregator(BaseAggregator): + """Tournament-based consensus aggregator.""" + + def __init__(self) -> None: + # Initialize with a dummy strategy since we override aggregate + super().__init__(strategy=lambda predictions: Prediction("dummy", "dummy", 0.0)) + + async def aggregate(self, predictions: List[Prediction], prompt: str = "") -> Prediction: + """Aggregate predictions using tournament elimination strategy.""" + result = await async_tournament_elimination(predictions, prompt) + avg_confidence = sum(p.confidence for p in predictions) / len(predictions) + return Prediction( + agent_id="tournament_consensus", + prediction=result, + confidence=avg_confidence + ) + + +# The same twisted Monty Hall problem prompt from the original example +example_prompt = """ +Imagine you're on a game show, and there are three doors in front of you. Behind one door is a car, and behind the other two doors are goats. You don't know what's behind any of the doors. You get to choose one door. Let's say you pick Door #1. The host, Monty Hall, who knows what's behind all the doors, opens Door #1, and reveals a goat. Now, you have two doors left: Door #3 and Door #2. You pick Door #3. Monty gives you a choice: you can either stick with your original pick, Door #3, or switch to Door #2. +What do you do to maximize your chances of winning the car? + +Answer with one of these three tokens: + +Pick if you should switch to Door #2 + +Pick if you should stick with your original pick, Door #3. + +Pick if neither option gives an advantage. + + Simply answer with the token, no other text. 
+""" + + +async def tournament_example(): + """Demonstrate tournament elimination consensus strategy.""" + + # Create a diverse set of LLMs with different biases + llms = [ + MockLLM("GPT-4", "switch"), # GPT-4 is biased to switch + MockLLM("Claude", "keep"), # Claude is biased to keep + MockLLM("Gemini", "neutral"), # Gemini is neutral + MockLLM("Llama", "switch"), # Llama is biased to switch + MockLLM("Mistral", "neutral"), # Mistral is neutral + MockLLM("PaLM", "keep"), # PaLM is biased to keep + MockLLM("Bard", "switch"), # Bard is biased to switch + MockLLM("Anthropic", "neutral"), # Anthropic is neutral + ] + + aggregator = TournamentAggregator() + + print("šŸ† Tournament Elimination Consensus Example") + print("=" * 60) + print(f"Prompt: {example_prompt.strip()}") + print(f"\nšŸŽÆ Tournament Participants: {len(llms)} LLMs") + print("-" * 40) + + # Get predictions from all LLMs + predictions: List[Prediction] = [] + for llm in llms: + prediction = await llm.predict(example_prompt) + predictions.append(prediction) + print(f"{llm.name:>12}: {prediction.prediction} (confidence: {prediction.confidence:.2f})") + + print("\nāš”ļø Tournament Bracket:") + print("-" * 40) + + # Simulate tournament rounds + current_round = predictions.copy() + round_num = 1 + + while len(current_round) > 1: + print(f"\nšŸ Round {round_num}:") + matches: List[str] = [] + + # Create matches for this round + for i in range(0, len(current_round), 2): + if i + 1 < len(current_round): + match: str = f"{current_round[i].agent_id} vs {current_round[i + 1].agent_id}" + matches.append(match) + print(f" Match {len(matches)}: {match}") + else: + # Bye for odd participant + print(f" Bye: {current_round[i].agent_id}") + + # Simulate winners (in practice, this would be determined by meta-agent) + winners: List[Prediction] = [] + for i in range(0, len(current_round), 2): + if i + 1 < len(current_round): + # Simulate winner selection (in practice, meta-agent would decide) + winner: Prediction = current_round[i] if current_round[i].confidence > current_round[i + 1].confidence else current_round[i + 1] + winners.append(winner) + else: + winners.append(current_round[i]) + + current_round = winners + round_num += 1 + + # Get final consensus result + consensus_result = await aggregator.aggregate(predictions, example_prompt) + + print(f"\nšŸ† Tournament Champion:") + print("-" * 40) + print(f"Winner: {current_round[0].agent_id}") + print(f"Final Answer: {consensus_result.prediction}") + print(f"Average Confidence: {consensus_result.confidence:.2f}") + + # Show vote distribution + from collections import Counter + vote_counts = Counter(p.prediction for p in predictions) + print(f"\nšŸ“Š Initial Vote Distribution:") + for vote, count in vote_counts.most_common(): + print(f" {vote}: {count} votes") + + print(f"\nšŸ’” Tournament Strategy Benefits:") + print("-" * 40) + print("• Eliminates weak reasoning through head-to-head matches") + print("• Meta-agent provides detailed justifications for each decision") + print("• Chain-of-thought reasoning ensures logical consistency") + print("• Penalizes inconsistencies and rewards coherent arguments") + print("• Scales well with larger numbers of participants") + + +if __name__ == "__main__": + asyncio.run(tournament_example()) \ No newline at end of file diff --git a/src/flare_ai_kit/consensus/aggregator/advanced_strategies.py b/src/flare_ai_kit/consensus/aggregator/advanced_strategies.py new file mode 100644 index 00000000..f45a75b1 --- /dev/null +++ 
b/src/flare_ai_kit/consensus/aggregator/advanced_strategies.py @@ -0,0 +1,369 @@ +"""Advanced consensus strategies for detecting hallucinations and improving robustness.""" + +import numpy as np +from typing import List, Dict, Any, Union +from dataclasses import dataclass +from sklearn.cluster import DBSCAN, KMeans +from sklearn.metrics.pairwise import cosine_similarity # type: ignore +from sklearn.preprocessing import StandardScaler +import logging + +from flare_ai_kit.common import Prediction +from flare_ai_kit.rag.vector.embedding.base import BaseEmbedding + +logger = logging.getLogger(__name__) + + +@dataclass +class ClusterResult: + """Result of semantic clustering analysis.""" + + dominant_cluster: List[Prediction] + outlier_clusters: List[List[Prediction]] + cluster_labels: List[int] + similarity_matrix: np.ndarray[Any, np.dtype[np.float64]] + centroid_embeddings: Dict[int, np.ndarray[Any, np.dtype[np.float64]]] + + +def semantic_clustering_strategy( + predictions: List[Prediction], + embedding_model: BaseEmbedding, + clustering_method: str = "dbscan", + similarity_threshold: float = 0.7, + min_cluster_size: int = 2, + **clustering_kwargs: Any +) -> Prediction: + """ + Semantic clustering strategy for consensus with hallucination detection. + + Args: + predictions: List of predictions to cluster + embedding_model: Embedding model for generating text embeddings + clustering_method: Either "dbscan" or "kmeans" + similarity_threshold: Minimum cosine similarity for cluster membership + min_cluster_size: Minimum size for a cluster to be considered valid + **clustering_kwargs: Additional arguments for clustering algorithm + + Returns: + Consensus prediction from the dominant cluster + """ + if len(predictions) < 2: + return predictions[0] if predictions else Prediction("consensus", "", 0.0) + + # Generate embeddings for all predictions + texts = [str(p.prediction) for p in predictions] + embeddings = embedding_model.embed_content(texts) + embeddings_array = np.array(embeddings, dtype=np.float64) + + # Normalize embeddings for better clustering + scaler = StandardScaler() + embeddings_normalized = scaler.fit_transform(embeddings_array).astype(np.float64) # type: ignore + + # Perform clustering + if clustering_method.lower() == "dbscan": + clustering = DBSCAN( + eps=1 - similarity_threshold, + min_samples=min_cluster_size, + metric='cosine', + **clustering_kwargs + ) + elif clustering_method.lower() == "kmeans": + n_clusters = min(len(predictions) // 2, 3) # Reasonable number of clusters + clustering = KMeans(n_clusters=n_clusters, **clustering_kwargs) + else: + raise ValueError(f"Unsupported clustering method: {clustering_method}") + + cluster_labels: np.ndarray[Any, np.dtype[np.int64]] = clustering.fit_predict(embeddings_normalized) # type: ignore + + # Group predictions by cluster + clusters: Dict[int, List[Prediction]] = {} + for i, label in enumerate(cluster_labels): + label_int = int(label) + if label_int not in clusters: + clusters[label_int] = [] + clusters[label_int].append(predictions[i]) + + # Find dominant cluster (largest cluster) + dominant_cluster_label = max(clusters.keys(), key=lambda k: len(clusters[k])) + dominant_cluster = clusters[dominant_cluster_label] + + # Select best prediction from dominant cluster (highest confidence) + best_prediction = max(dominant_cluster, key=lambda p: p.confidence) + + # Calculate consensus confidence based on cluster stability + dominant_embeddings = embeddings_array[cluster_labels == dominant_cluster_label] + cluster_similarities: 
np.ndarray[Any, np.dtype[np.float64]] = cosine_similarity(dominant_embeddings) # type: ignore + avg_similarity = float(np.mean(cluster_similarities[np.triu_indices_from(cluster_similarities, k=1)])) + + # Adjust confidence based on cluster quality + adjusted_confidence = min(best_prediction.confidence * avg_similarity, 1.0) + + return Prediction( + agent_id="semantic_consensus", + prediction=best_prediction.prediction, + confidence=adjusted_confidence + ) + + +def shapley_value_strategy( + predictions: List[Prediction], + embedding_model: BaseEmbedding, + n_samples: int = 100 +) -> Prediction: + """ + Shapley value-inspired strategy for quantifying each agent's marginal contribution. + + Args: + predictions: List of predictions to evaluate + embedding_model: Embedding model for similarity calculations + n_samples: Number of random samples for Monte Carlo approximation + + Returns: + Consensus prediction with Shapley-weighted confidence + """ + if len(predictions) < 2: + return predictions[0] if predictions else Prediction("consensus", "", 0.0) + + # Generate embeddings + texts = [str(p.prediction) for p in predictions] + embeddings = embedding_model.embed_content(texts) + embeddings_array = np.array(embeddings, dtype=np.float64) + + # Calculate pairwise similarities + similarity_matrix: np.ndarray[Any, np.dtype[np.float64]] = cosine_similarity(embeddings_array) + + # Monte Carlo approximation of Shapley values + shapley_values: np.ndarray[Any, np.dtype[np.float64]] = np.zeros(len(predictions), dtype=np.float64) + + for _ in range(n_samples): + # Random permutation of agents + permutation: np.ndarray[Any, np.dtype[np.int64]] = np.random.permutation(len(predictions)) + + # Calculate marginal contributions + current_set: set[int] = set() + for i, agent_idx in enumerate(permutation): + current_set.add(int(agent_idx)) + + # Calculate utility of current set + if len(current_set) == 1: + utility = 1.0 # Base utility + else: + # Calculate average similarity within the set + set_indices = list(current_set) + set_similarities = similarity_matrix[np.ix_(set_indices, set_indices)] + utility = float(np.mean(set_similarities[np.triu_indices_from(set_similarities, k=1)])) + + # Calculate marginal contribution + if i == 0: + marginal_contribution = utility + else: + # Calculate utility without this agent + prev_set = current_set - {int(agent_idx)} + if len(prev_set) == 0: + prev_utility = 0.0 + else: + prev_indices = list(prev_set) + prev_similarities = similarity_matrix[np.ix_(prev_indices, prev_indices)] + if len(prev_indices) == 1: + prev_utility = 1.0 + else: + prev_utility = float(np.mean(prev_similarities[np.triu_indices_from(prev_similarities, k=1)])) + + marginal_contribution = utility - prev_utility + + shapley_values[int(agent_idx)] += marginal_contribution + + # Normalize Shapley values + shapley_values /= n_samples + + # Weight predictions by Shapley values + total_weight = float(np.sum(shapley_values)) + if total_weight == 0: + # Fallback to equal weighting + weights: np.ndarray[Any, np.dtype[np.float64]] = np.ones(len(predictions), dtype=np.float64) / len(predictions) + else: + weights = shapley_values / total_weight + + # For string predictions, use weighted voting + if isinstance(predictions[0].prediction, str): + vote_counts: Dict[str, float] = {} + for pred, weight in zip(predictions, weights): + pred_str = str(pred.prediction) + vote_counts[pred_str] = vote_counts.get(pred_str, 0.0) + float(weight) + + consensus_prediction = max(vote_counts.items(), key=lambda x: x[1])[0] + 
print(f"Consensus prediction: {consensus_prediction}") + else: + # For numerical predictions, use weighted average + consensus_prediction = sum( + float(p.prediction) * float(weight) for p, weight in zip(predictions, weights) + ) + print(f"Consensus prediction: {consensus_prediction}") + + # Calculate weighted confidence + weighted_confidence = sum(p.confidence * float(weight) for p, weight in zip(predictions, weights)) + + return Prediction( + agent_id="shapley_consensus", + prediction=consensus_prediction, + confidence=weighted_confidence + ) + + +def entropy_based_strategy( + predictions: List[Prediction], + embedding_model: BaseEmbedding, + entropy_threshold: float = 0.5 +) -> Prediction: + """ + Entropy-based strategy for measuring predictive uncertainty. + + Args: + predictions: List of predictions to evaluate + embedding_model: Embedding model for similarity calculations + entropy_threshold: Threshold for considering predictions uncertain + + Returns: + Consensus prediction with entropy-adjusted confidence + """ + if len(predictions) < 2: + return predictions[0] if predictions else Prediction("consensus", "", 0.0) + + # Generate embeddings + texts = [str(p.prediction) for p in predictions] + embeddings = embedding_model.embed_content(texts) + embeddings_array = np.array(embeddings, dtype=np.float64) + + # Calculate pairwise similarities + similarity_matrix: np.ndarray[Any, np.dtype[np.float64]] = cosine_similarity(embeddings_array) + + # Calculate entropy of the similarity distribution + similarities: np.ndarray[Any, np.dtype[np.float64]] = similarity_matrix[np.triu_indices_from(similarity_matrix, k=1)] + if len(similarities) > 0: + # Normalize similarities to probabilities + similarities = np.clip(similarities, 0, 1) + similarities = similarities / np.sum(similarities) + + # Calculate entropy + entropy = float(-np.sum(similarities * np.log(similarities + 1e-10))) + max_entropy = float(np.log(len(similarities))) + normalized_entropy = entropy / max_entropy if max_entropy > 0 else 0.0 + else: + normalized_entropy = 0.0 + + # Select prediction based on entropy + if normalized_entropy > entropy_threshold: + # High entropy: use most confident prediction + best_prediction = max(predictions, key=lambda p: p.confidence) + # Reduce confidence due to high uncertainty + adjusted_confidence = best_prediction.confidence * (1 - normalized_entropy) + else: + # Low entropy: use similarity-weighted consensus + weights: np.ndarray[Any, np.dtype[np.float64]] = np.mean(similarity_matrix, axis=1) + weights = weights / np.sum(weights) + + if isinstance(predictions[0].prediction, str): + vote_counts: Dict[str, float] = {} + for pred, weight in zip(predictions, weights): + pred_str = str(pred.prediction) + vote_counts[pred_str] = vote_counts.get(pred_str, 0.0) + float(weight) + + consensus_prediction = max(vote_counts.items(), key=lambda x: x[1])[0] + print(f"Consensus prediction: {consensus_prediction}") + else: + consensus_prediction = sum( + float(p.prediction) * float(weight) for p, weight in zip(predictions, weights) + ) + print(f"Consensus prediction: {consensus_prediction}") + best_prediction = predictions[np.argmax(weights)] + adjusted_confidence = best_prediction.confidence * (1 - normalized_entropy) + + return Prediction( + agent_id="entropy_consensus", + prediction=best_prediction.prediction, + confidence=adjusted_confidence + ) + + +def robust_consensus_strategy( + predictions: List[Prediction], + embedding_model: BaseEmbedding, + strategies: Union[List[str], None] = None +) -> Prediction: + 
""" + Robust consensus strategy that combines multiple approaches. + + Args: + predictions: List of predictions to evaluate + embedding_model: Embedding model for similarity calculations + strategies: List of strategies to combine ("semantic", "shapley", "entropy") + + Returns: + Robust consensus prediction + """ + if strategies is None: + strategies = ["semantic", "shapley", "entropy"] + + strategy_results: List[Prediction] = [] + + for strategy in strategies: + try: + if strategy == "semantic": + result = semantic_clustering_strategy(predictions, embedding_model) + elif strategy == "shapley": + result = shapley_value_strategy(predictions, embedding_model) + elif strategy == "entropy": + result = entropy_based_strategy(predictions, embedding_model) + else: + logger.warning(f"Unknown strategy: {strategy}") + continue + + strategy_results.append(result) + except Exception as e: + logger.warning(f"Strategy {strategy} failed: {e}") + continue + + if not strategy_results: + # Fallback to simple majority + if isinstance(predictions[0].prediction, str): + vote_counts: Dict[str, int] = {} + for p in predictions: + pred_str = str(p.prediction) + vote_counts[pred_str] = vote_counts.get(pred_str, 0) + 1 + consensus_prediction = max(vote_counts.items(), key=lambda x: x[1])[0] + else: + consensus_prediction = sum(float(p.prediction) for p in predictions) / len(predictions) + + avg_confidence = sum(p.confidence for p in predictions) / len(predictions) + return Prediction("robust_consensus", consensus_prediction, avg_confidence) + + # Combine strategy results using weighted average + weights = [r.confidence for r in strategy_results] + total_weight = sum(weights) + + if total_weight == 0: + # Equal weighting + weights = [1.0] * len(strategy_results) + total_weight = len(strategy_results) + + weights = [w / total_weight for w in weights] + + if isinstance(strategy_results[0].prediction, str): + strategy_vote_counts: Dict[str, float] = {} + for result, weight in zip(strategy_results, weights): + pred_str = str(result.prediction) + strategy_vote_counts[pred_str] = strategy_vote_counts.get(pred_str, 0.0) + weight + + consensus_prediction = max(strategy_vote_counts.items(), key=lambda x: x[1])[0] + else: + consensus_prediction = sum( + float(r.prediction) * weight for r, weight in zip(strategy_results, weights) + ) + + weighted_confidence = sum(r.confidence * weight for r, weight in zip(strategy_results, weights)) + + return Prediction( + agent_id="robust_consensus", + prediction=consensus_prediction, + confidence=weighted_confidence + ) \ No newline at end of file diff --git a/src/flare_ai_kit/consensus/aggregator/base.py b/src/flare_ai_kit/consensus/aggregator/base.py index d245b2e9..b459de0b 100644 --- a/src/flare_ai_kit/consensus/aggregator/base.py +++ b/src/flare_ai_kit/consensus/aggregator/base.py @@ -5,7 +5,7 @@ from flare_ai_kit.common import Prediction - +# Refer to llm_consensus.py for an example of how to use this class. 
class BaseAggregator(ABC): """Base aggregator class.""" @@ -20,3 +20,4 @@ def __init__( async def aggregate(self, predictions: list[Prediction]) -> Prediction: """Aggregate predictions using the specified strategy.""" return self.strategy(predictions) + diff --git a/src/flare_ai_kit/consensus/aggregator/strategies.py b/src/flare_ai_kit/consensus/aggregator/strategies.py index 4d3f02bb..0c9fc199 100644 --- a/src/flare_ai_kit/consensus/aggregator/strategies.py +++ b/src/flare_ai_kit/consensus/aggregator/strategies.py @@ -1,7 +1,6 @@ """Aggregation strategies for consensus predictions.""" from collections import Counter - from flare_ai_kit.common import Prediction @@ -24,3 +23,4 @@ def weighted_average(predictions: list[Prediction]) -> float: weighted_sum = sum(float(p.prediction) * p.confidence for p in predictions) return weighted_sum / total_weight + diff --git a/src/flare_ai_kit/consensus/aggregator/tournament_strategies.py b/src/flare_ai_kit/consensus/aggregator/tournament_strategies.py new file mode 100644 index 00000000..08d30932 --- /dev/null +++ b/src/flare_ai_kit/consensus/aggregator/tournament_strategies.py @@ -0,0 +1,178 @@ +"""Advanced Tournament Elimination strategies for consensus predictions.""" + +from typing import List, Optional +import random +import asyncio + +from flare_ai_kit.common import Prediction + + +class TournamentMatch: + """Represents a single match in the tournament elimination rounds.""" + + def __init__(self, prediction1: Prediction, prediction2: Prediction): + self.prediction1 = prediction1 + self.prediction2 = prediction2 + self.winner: Optional[Prediction] = None + self.justification: str = "" + self.meta_score: float = 0.0 + + def __str__(self) -> str: + return f"Match: {self.prediction1.agent_id} vs {self.prediction2.agent_id}" + + +class MetaAgent: + """Meta-agent that arbitrates between predictions and provides justifications.""" + + def __init__(self, name: str = "MetaArbitrator"): + self.name = name + + async def evaluate_match(self, match: TournamentMatch, prompt: str) -> TournamentMatch: + """ + Evaluate a match between two predictions using chain-of-thought reasoning. 
+ + Args: + match: The tournament match to evaluate + prompt: The original prompt that generated the predictions + + Returns: + Updated match with winner, justification, and meta-score + """ + # Simulate meta-agent evaluation with chain-of-thought reasoning + # In practice, this would call an actual LLM for evaluation + + # Chain-of-thought reasoning simulation + # In practice, this would be used for detailed reasoning analysis + _reasoning_steps = [ + f"Analyzing {match.prediction1.agent_id}'s response: {match.prediction1.prediction}", + f"Analyzing {match.prediction2.agent_id}'s response: {match.prediction2.prediction}", + "Comparing confidence levels and reasoning quality", + "Evaluating consistency with the original prompt", + "Assessing logical coherence and completeness" + ] + + # Simulate meta-agent scoring + score1 = match.prediction1.confidence * random.uniform(0.8, 1.2) + score2 = match.prediction2.confidence * random.uniform(0.8, 1.2) + + # Add reasoning quality bonus (simulated) + if match.prediction1.prediction in ["", "", ""]: + score1 += 0.1 # Bonus for structured responses + if match.prediction2.prediction in ["", "", ""]: + score2 += 0.1 + + # Determine winner + if score1 > score2: + match.winner = match.prediction1 + match.justification = f"{match.prediction1.agent_id} wins with better reasoning and higher confidence" + match.meta_score = score1 + else: + match.winner = match.prediction2 + match.justification = f"{match.prediction2.agent_id} wins with better reasoning and higher confidence" + match.meta_score = score2 + + return match + + +def tournament_elimination(predictions: list[Prediction], prompt: str = "") -> str: + """ + Tournament elimination strategy with meta-agent arbitration. + + This strategy pits predictions against each other in elimination rounds, + using a meta-agent to evaluate and justify each match. The process includes + chain-of-thought reasoning and penalizes inconsistencies. + + Args: + predictions: List of predictions from different agents + prompt: The original prompt (used for context in meta-agent evaluation) + + Returns: + The winning prediction after all elimination rounds + """ + if not predictions: + raise ValueError("Cannot run tournament with empty predictions list") + + if len(predictions) == 1: + return str(predictions[0].prediction) + + # Create meta-agent + meta_agent = MetaAgent() + + # Convert to list for manipulation + current_round = predictions.copy() + + while len(current_round) > 1: + matches: List[TournamentMatch] = [] + + # Create matches for this round + for i in range(0, len(current_round), 2): + if i + 1 < len(current_round): + match: TournamentMatch = TournamentMatch(current_round[i], current_round[i + 1]) + matches.append(match) + else: + # Odd number of participants - bye for the last one + bye_match: TournamentMatch = TournamentMatch(current_round[i], current_round[i]) + matches.append(bye_match) + + # Run matches + winners: List[Prediction] = [] + for match in matches: + # Evaluate match with meta-agent + evaluated_match: TournamentMatch = asyncio.run(meta_agent.evaluate_match(match, prompt)) + if evaluated_match.winner: + winners.append(evaluated_match.winner) + + current_round = winners + + return str(current_round[0].prediction) + + +async def async_tournament_elimination(predictions: list[Prediction], prompt: str = "") -> str: + """ + Async version of tournament elimination strategy. 
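+
+    Prefer this variant when the caller is already running inside an event loop:
+    the synchronous tournament_elimination wrapper drives each match with
+    asyncio.run(), which raises RuntimeError if an event loop is already running.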
+ + Args: + predictions: List of predictions from different agents + prompt: The original prompt (used for context in meta-agent evaluation) + + Returns: + The winning prediction after all elimination rounds + """ + if not predictions: + raise ValueError("Cannot run tournament with empty predictions list") + + if len(predictions) == 1: + return str(predictions[0].prediction) + + # Create meta-agent + meta_agent = MetaAgent() + + # Convert to list for manipulation + current_round = predictions.copy() + + while len(current_round) > 1: + matches: List[TournamentMatch] = [] + + # Create matches for this round + for i in range(0, len(current_round), 2): + if i + 1 < len(current_round): + match: TournamentMatch = TournamentMatch(current_round[i], current_round[i + 1]) + matches.append(match) + else: + # Odd number of participants - bye for the last one + bye_match: TournamentMatch = TournamentMatch(current_round[i], current_round[i]) + matches.append(bye_match) + + # Run matches + winners: List[Prediction] = [] + for match in matches: + # Evaluate match with meta-agent + evaluated_match: TournamentMatch = await meta_agent.evaluate_match(match, prompt) + if evaluated_match.winner: + winners.append(evaluated_match.winner) + + current_round = winners + + return str(current_round[0].prediction) + +
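For reference, shapley_value_strategy above is a Monte Carlo estimate of the classic
Shapley value. Writing v(S) for the mean pairwise cosine similarity of the answers in
coalition S (with v of a single agent fixed at 1.0 and v of the empty set at 0.0), each
agent's weight is approximated as

    phi_i = E_pi[ v(S_pi^i ∪ {i}) - v(S_pi^i) ]

where pi is a uniformly random ordering of the agents and S_pi^i is the set of agents
preceding i in that ordering; the implementation averages this marginal contribution
over n_samples random permutations and normalizes the results into voting weights.

The clustering, Shapley, and entropy strategies only need an object exposing
BaseEmbedding.embed_content. The TF-IDF model in examples/clustering_consensus.py is
one option; for a quick smoke test any stand-in will do. A minimal sketch (the
CharCountEmbedding class and the sample predictions are made up for illustration and
assume the package layout introduced above):

import numpy as np

from flare_ai_kit.common import Prediction
from flare_ai_kit.consensus.aggregator.advanced_strategies import (
    semantic_clustering_strategy,
)
from flare_ai_kit.rag.vector.embedding.base import BaseEmbedding


class CharCountEmbedding(BaseEmbedding):
    """Crude letter-frequency vectors; enough to exercise the clustering code path."""

    def embed_content(
        self,
        contents: str | list[str],
        title: str | None = None,
        task_type: str | None = None,
    ) -> list[list[float]]:
        if isinstance(contents, str):
            contents = [contents]
        vectors: list[list[float]] = []
        for text in contents:
            counts = np.zeros(26, dtype=np.float64)
            for ch in text.lower():
                if "a" <= ch <= "z":
                    counts[ord(ch) - ord("a")] += 1.0
            norm = float(np.linalg.norm(counts)) or 1.0
            vectors.append((counts / norm).tolist())
        return vectors


predictions = [
    Prediction("agent-a", "The cited study does not appear to exist.", 0.9),
    Prediction("agent-b", "No record of such a study can be found.", 0.85),
    Prediction("agent-c", "The study won a Nobel Prize and is widely used.", 0.8),
]
print(semantic_clustering_strategy(predictions, CharCountEmbedding()))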