|
| 1 | +import html |
| 2 | +from typing import Any, Dict, Literal |
| 3 | + |
| 4 | +from langchain.pydantic_v1 import BaseModel, Field, root_validator |
| 5 | + |
| 6 | + |
class StackExchangeAPIWrapper(BaseModel):
    """Wrapper for Stack Exchange API.

    Queries the ``search/excerpts`` endpoint through a ``StackAPI`` client
    bound to the ``stackoverflow`` site and renders the matching questions
    (each with at most one answer) as plain text.
    """

    client: Any  #: :meta private:
    max_results: int = 3
    """Max number of results to include in output."""
    query_type: Literal["all", "title", "body"] = "all"
    """Which part of StackOverflows items to match against. One of 'all', 'title',
        'body'. Defaults to 'all'.
    """
    fetch_params: Dict[str, Any] = Field(default_factory=dict)
    """Additional params to pass to StackApi.fetch."""
    result_separator: str = "\n\n"
    """Separator between question,answer pairs."""

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the required Python package exists.

        Args:
            values: Field values collected by pydantic for this model.

        Returns:
            The same mapping with ``client`` set to a ``StackAPI`` instance
            for the ``stackoverflow`` site.

        Raises:
            ImportError: If the ``stackapi`` package cannot be imported.
        """
        # Keep the try body limited to the import so that only a missing
        # package is reported as an installation problem.
        try:
            from stackapi import StackAPI
        except ImportError as err:
            raise ImportError(
                "The 'stackapi' Python package is not installed. "
                "Please install it with `pip install stackapi`."
            ) from err

        values["client"] = StackAPI("stackoverflow")
        return values

    def run(self, query: str) -> str:
        """Run query through StackExchange API and parse results.

        Args:
            query: Text to search for. It is sent as ``q`` when
                ``query_type`` is ``"all"``, otherwise under the
                ``title`` or ``body`` parameter.

        Returns:
            Up to ``max_results`` questions joined by ``result_separator``.
            Each entry has the question title and excerpt, followed by the
            accepted answer's excerpt (or the first returned answer when none
            is accepted). If the search yields no question items, a
            "No relevant results" message is returned instead.
        """
        query_key = "q" if self.query_type == "all" else self.query_type
        output = self.client.fetch(
            "search/excerpts", **{query_key: query}, **self.fetch_params
        )
        items = output["items"]

        all_questions = [item for item in items if item["item_type"] == "question"]
        # Answer-only (or empty) result sets have nothing to render; report
        # that explicitly rather than returning an empty string.
        if not all_questions:
            return f"No relevant results found for '{query}' on Stack Overflow."
        questions = all_questions[: self.max_results]

        # Group answers by their question once, preserving API order, so each
        # question does not need to rescan the full answer list.
        answers_by_question: Dict[Any, Any] = {}
        for item in items:
            if item["item_type"] == "answer":
                answers_by_question.setdefault(item["question_id"], []).append(item)

        results = []
        for question in questions:
            # Unescape question text the same way as the answer excerpt so
            # HTML entities (e.g. &#39;) do not leak into the output.
            title = html.unescape(question["title"])
            question_excerpt = html.unescape(question["excerpt"])
            res_text = f"Question: {title}\n{question_excerpt}"

            relevant_answers = answers_by_question.get(question["question_id"], [])
            if relevant_answers:
                # Prefer the accepted answer; otherwise fall back to the first
                # answer returned for this question.
                top_answer = next(
                    (answer for answer in relevant_answers if answer["is_accepted"]),
                    relevant_answers[0],
                )
                answer_excerpt = html.unescape(top_answer["excerpt"])
                res_text += f"\nAnswer: {answer_excerpt}"
            results.append(res_text)

        return self.result_separator.join(results)
0 commit comments