|
5 | 5 | import re |
6 | 6 |
|
7 | 7 | from connexion.context import context |
| 8 | +from psycopg2 import errors |
8 | 9 |
|
9 | 10 | from .. import models |
10 | 11 | from .. import schemas |
@@ -44,3 +45,183 @@ class ClassView(cls): |
44 | 45 | ClassView.__name__ = cls.__name__ |
45 | 46 |
|
46 | 47 | return ClassView |
| 48 | + |
| 49 | + |
def validate_search_query(query: str) -> bool:
    """
    Validate a search query string.

    Args:
        query (str): The query string to validate.

    Returns:
        bool: True if the query passes every check.

    Raises:
        errors.SyntaxError: If parentheses are unbalanced, or if the
            query ends with a boolean operator.
    """
    # Each check pairs a validator with the error message raised on failure.
    checks = (
        (validate_parentheses, "Unmatched parentheses"),
        (validate_query_end, "Query cannot end with an operator"),
    )
    for check, message in checks:
        if not check(query):
            raise errors.SyntaxError(message)
    return True
| 69 | + |
| 70 | + |
def validate_parentheses(query: str) -> bool:
    """
    Validate the parentheses in a query string.

    Args:
        query (str): The query string to validate.

    Returns:
        bool: True if every "(" has a matching ")" in proper order,
        False otherwise.
    """
    depth = 0
    for ch in query:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            # A close with no pending open is immediately invalid.
            if depth < 0:
                return False
    # Valid only when every opening parenthesis was closed.
    return depth == 0
| 90 | + |
| 91 | + |
def validate_query_end(query: str) -> bool:
    """
    Check that a query does not end with a boolean operator.

    Args:
        query (str): The query string to validate.

    Returns:
        bool: False if the final whitespace-separated token is AND, OR,
        or NOT; True otherwise (including empty/whitespace-only queries).
    """
    operators = ("AND", "OR", "NOT")

    # str.split() with no argument splits on runs of ANY whitespace, so a
    # trailing operator preceded by a tab/newline (e.g. "foo\tAND") is
    # caught too -- the previous split(" ") missed those cases.
    tokens = query.split()
    return not tokens or tokens[-1] not in operators
| 99 | + |
| 100 | + |
def count_chars(target: str, query: str) -> int:
    """Count the number of chars in a query string.
    Excluding those in quoted phrases."""
    total = 0
    inside_quotes = False
    for ch in query:
        # Double quotes toggle the "inside a quoted phrase" state.
        if ch == '"':
            inside_quotes = not inside_quotes
        # Only characters outside quoted phrases contribute to the count.
        if ch == target and not inside_quotes:
            total += 1
    return total
| 112 | + |
| 113 | + |
def pubmed_to_tsquery(query: str) -> str:
    """
    Convert a PubMed-like search query to PostgreSQL tsquery format,
    grouping both single-quoted and double-quoted text with the <-> operator
    for proximity search.

    Additionally, automatically adds & between non-explicitly connected terms
    and handles NOT terms.

    Args:
        query (str): The search query.

    Returns:
        str: The PostgreSQL tsquery equivalent.
    """
    query = query.upper()  # Ensure uniformity

    # Step 1: Split into tokens (preserving quoted phrases)
    # Regex pattern: match quoted phrases or non-space sequences
    tokens = re.findall(r'"[^"]*"|\'[^\']*\'|\S+', query)

    # Step 2: Combine tokens in parentheses into single tokens
    def combine_parentheses(tokens: list) -> list:
        """
        Combine tokens within parentheses into a single token.

        Args:
            tokens (list): List of tokens to process.

        Returns:
            list: Processed list with tokens inside parentheses combined.
        """
        combined_tokens = []
        buffer = []
        paren_count = 0
        for token in tokens:
            # If buffer is not empty, we are inside parentheses
            if len(buffer) > 0:
                buffer.append(token)

                # Adjust the count of parentheses
                paren_count += count_chars("(", token) - count_chars(")", token)

                if paren_count < 1:
                    # Combine all tokens in parentheses
                    combined_tokens.append(" ".join(buffer))
                    buffer = []  # Clear the buffer
                    paren_count = 0

            else:
                n_paren = count_chars("(", token) - count_chars(")", token)
                # If not in parentheses, but token contains opening parentheses
                # Start capturing tokens inside parentheses
                if token[0] == "(" and n_paren > 0:
                    paren_count += n_paren
                    buffer.append(token)  # Start capturing tokens in parens
                else:
                    combined_tokens.append(token)

        # If the list ends without a closing parenthesis (invalid input)
        # append buffer contents (fallback)
        if buffer:
            combined_tokens.append(" ".join(buffer))

        return combined_tokens

    # NOTE: removed leftover debug print() calls that wrote every token
    # buffer to stdout on each invocation (and on each recursive call).
    tokens = combine_parentheses(tokens)
    for i, token in enumerate(tokens):
        if token[0] == "(" and token[-1] == ")":
            # RECURSIVE: Process the contents of the parentheses
            token_res = pubmed_to_tsquery(token[1:-1])
            token = "(" + token_res + ")"
            tokens[i] = token

        # Step 4: Handle both single-quoted and double-quoted phrases,
        # grouping them with <-> (proximity operator)
        elif token[0] in ('"', "'"):
            # Split quoted text into individual words and join with <-> for
            # proximity search
            words = re.findall(r"\w+", token)
            tokens[i] = "<->".join(words)

        # Step 3: Replace logical operators AND, OR, NOT
        else:
            if token == "AND":
                tokens[i] = "&"
            elif token == "OR":
                tokens[i] = "|"
            elif token == "NOT":
                tokens[i] = "&!"

    processed_tokens = []
    last_token = None
    for token in tokens:
        # Step 5: Add & between consecutive terms that aren't already
        # connected by an operator
        stripped_token = token.strip()
        if stripped_token not in ("&", "|", "!", "&!"):
            # Strip punctuation tsquery cannot parse.
            stripped_token = re.sub(r"[\[\],;:!?@#]", "", stripped_token)
            if stripped_token == "":
                continue  # Ignore empty tokens from splitting

        if last_token and last_token not in ("&", "|", "!", "&!"):
            if stripped_token not in ("&", "|", "!", "&!"):
                # Insert an implicit AND (&) between two non-operator tokens
                processed_tokens.append("&")

        processed_tokens.append(stripped_token)
        last_token = stripped_token

    return " ".join(processed_tokens)
0 commit comments