|
1 | 1 | import random |
2 | 2 | import re |
3 | 3 | import typing |
4 | | -from typing import Tuple |
| 4 | + |
| 5 | +import pydantic |
| 6 | + |
| 7 | +import data_curation.schemas.augmentation as schemas |
| 8 | +from utils import normalize as normalize |
| 9 | +from utils import path as path |
| 10 | + |
| 11 | +enhancements = path.load_loinc_enhancements() |
| 12 | + |
| 13 | +LOINC_ENHANCEMENTS = normalize.merge_enhancements(enhancements) |
5 | 14 |
|
6 | 15 |
|
7 | 16 | def scramble_word_order( |
@@ -67,7 +76,7 @@ def _word_deletion( |
67 | 76 | return delete_indices |
68 | 77 |
|
69 | 78 |
|
70 | | -def _get_word_detail_by_char_range(word_details: dict, char_idx: int) -> Tuple[int, dict]: |
| 79 | +def _get_word_detail_by_char_range(word_details: dict, char_idx: int) -> typing.Tuple[int, dict]: |
71 | 80 | for key, word_deets in word_details.items(): |
72 | 81 | if char_idx in range(int(word_deets["start"]), int(word_deets["end"])): |
73 | 82 | return int(key), word_deets |
@@ -212,3 +221,130 @@ def insert_loinc_related_names( |
212 | 221 | words.insert(idx_to_insert, name_to_insert) |
213 | 222 |
|
214 | 223 | return " ".join(words) |
| 224 | + |
| 225 | + |
@pydantic.validate_call
def enhance_loinc_str(
    text: str,
    enhancement_type: schemas.EnhancementType,
    max_enhancements: int,
    min_enhancements: int = 1,
) -> str:
    """
    Enhances the input text by replacing words (or multi-word phrases) with
    entries from ``LOINC_ENHANCEMENTS``.

    Single words are tried first. Only when no single word can be enhanced are
    multi-word phrases (2+ consecutive words) considered; a matched phrase is
    replaced as a whole, so the rest of the text is kept intact.

    :param text: The input text to enhance.
    :param enhancement_type: The type of enhancement to apply. Options are:
        - "abbrv": Replace words with their abbreviations.
        - "synonyms": Replace words with semantically related terms.
        - "all": Pick randomly between the above techniques for each word.
    :param max_enhancements: The maximum number of enhancements to apply.
        Must be strictly greater than ``min_enhancements``.
    :param min_enhancements: The minimum number of enhancements to apply.
        If fewer candidates exist, all candidates are attempted instead.
    :return: The enhanced text, lower-cased and joined with single spaces, or
        the original ``text`` unchanged when nothing in it can be enhanced.
    :raises ValueError: If ``max_enhancements <= min_enhancements``.
    """
    if max_enhancements <= min_enhancements:
        raise ValueError("max_enhancements must be greater than min_enhancements")

    words = [[word.lower().strip(), [i]] for i, word in enumerate(text.split())]

    # Check for possible enhancements on individual words first.
    possible_words_to_enhance = _check_for_enhancements(words)

    # Fall back to multi-word phrases. ``words`` is deliberately NOT replaced
    # by the phrase list: the phrases overlap each other, so joining them would
    # not reproduce the text. Only their [start, end) spans are used later.
    if not possible_words_to_enhance:
        possible_words_to_enhance = _check_for_enhancements(_generate_substrings(words))

    if not possible_words_to_enhance:
        return text

    # Determine number of enhancements to apply.
    num_candidates = len(possible_words_to_enhance)
    if num_candidates < min_enhancements:
        num_enhancements = num_candidates
    else:
        num_enhancements = random.randint(
            min_enhancements, min(max_enhancements, num_candidates)
        )

    words = _apply_enhancements(
        words, possible_words_to_enhance, enhancement_type, num_enhancements
    )

    return " ".join(w[0] for w in words)


def _choose_enhancement_text(
    available: dict,
    enhancement_type: typing.Annotated[schemas.EnhancementType, pydantic.Field()],
) -> typing.Optional[str]:
    """
    Picks one replacement string of the requested type from ``available``.

    :param available: The ``LOINC_ENHANCEMENTS`` entry for a word or phrase.
    :param enhancement_type: "abbrv", "synonyms" or "all". With "all", one of
        the types that actually has options is chosen at random.
    :return: The replacement text, or None if no option of that type exists.
    """
    if enhancement_type == "all":
        usable_types = [kind for kind in ("abbrv", "synonyms") if available.get(kind)]
    elif available.get(enhancement_type):
        usable_types = [enhancement_type]
    else:
        usable_types = []

    if not usable_types:
        return None
    return random.choice(available[random.choice(usable_types)])


def _apply_enhancements(
    words: list[list],
    possible_words_to_enhance: dict[str, list[int]],
    enhancement_type: typing.Annotated[schemas.EnhancementType, pydantic.Field()],
    num_enhancements: int,
) -> list[list]:
    """
    Replaces up to ``num_enhancements`` randomly chosen candidates in ``words``.

    :param words: The ``[word, indices]`` pairs of the input text, one per word.
        The list is modified in place.
    :param possible_words_to_enhance: Maps each enhanceable word or phrase to
        its position in ``words``: ``[i]`` for a single word or
        ``[start, end]`` (end exclusive) for a phrase. Candidates that are
        drawn are removed from this dictionary.
    :param enhancement_type: The type of enhancement to apply.
    :param num_enhancements: The number of enhancements to apply. Fewer are
        applied when the candidates run out, lack the requested type, or
        overlap a span that was already chosen.
    :return: ``words`` with the chosen spans replaced by their enhancements.
    """
    replacements = []  # (start, end, replacement text, original span)
    claimed_positions = set()

    while possible_words_to_enhance and len(replacements) < num_enhancements:
        word_to_enhance = random.choice(list(possible_words_to_enhance))
        span = possible_words_to_enhance.pop(word_to_enhance)
        start = span[0]
        end = span[1] if len(span) > 1 else start + 1

        # Phrase candidates may overlap one another; never rewrite a word twice.
        if not claimed_positions.isdisjoint(range(start, end)):
            continue

        # The type is resolved per candidate without rebinding
        # ``enhancement_type``, so "all" stays random for every candidate.
        enhancement = _choose_enhancement_text(
            LOINC_ENHANCEMENTS[word_to_enhance], enhancement_type
        )
        if enhancement is None:
            continue

        claimed_positions.update(range(start, end))
        replacements.append((start, end, enhancement, span))

    # Replace right-to-left so that collapsing a phrase into a single entry
    # does not shift the positions of spans still waiting to be replaced.
    for start, end, enhancement, span in sorted(
        replacements, key=lambda item: item[0], reverse=True
    ):
        words[start:end] = [[enhancement, span]]

    return words
| 312 | + |
| 313 | + |
def _check_for_enhancements(words: list[list]) -> dict[str, list[int]]:
    """
    Checks the list of words for possible enhancements based on the LOINC_ENHANCEMENTS dictionary.

    :param words: List of ``[word, indices]`` pairs to check for enhancements.
    :return: A dictionary with the words that can be enhanced as keys and their
        index lists as values. If the same word appears more than once, only
        the index list of its last occurrence is kept.
    """
    possible_words_to_enhance = {}

    for word, idx in words:
        if word not in LOINC_ENHANCEMENTS:
            continue

        # Only add if there is at least one abbreviation or synonym available.
        available = LOINC_ENHANCEMENTS[word]
        if available.get("abbrv") or available.get("synonyms"):
            possible_words_to_enhance[word] = idx

    return possible_words_to_enhance
| 334 | + |
| 335 | + |
| 336 | +def _generate_substrings(words: list[str, list[int]]) -> list[str, list[int]]: |
| 337 | + """ |
| 338 | + Generates all possible substrings of the input list of words with at least 2 words |
| 339 | + per substring. |
| 340 | +
|
| 341 | + :param words: List of words, including their indices, to generate substrings from. |
| 342 | + :return: List of substrings, including their indices. |
| 343 | + """ |
| 344 | + substrings = [] |
| 345 | + for start_idx in range(len(words)): |
| 346 | + for end_idx in range(start_idx + 2, len(words) + 1): # ensures at least 2 words |
| 347 | + substring = " ".join(word for word, _ in words[start_idx:end_idx]) |
| 348 | + substrings.append([substring, [start_idx, end_idx]]) |
| 349 | + |
| 350 | + return substrings |
0 commit comments