Skip to content
This repository was archived by the owner on Jun 30, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5007b9c
CU-86999tnz7: Update usage of constants for train/test splitting
mart-r Jun 3, 2025
f5c8d02
CU-86999tnz7: Add alternative category names
mart-r Jun 3, 2025
1dc3f84
CU-86999tnz7: Fix hashes due to config changes
mart-r Jun 3, 2025
cb5a871
CU-86999tnz7: Allow (optionally) training addons (e.g MetaCAT) during…
mart-r Jun 3, 2025
710a9ae
CU-86999tnz7: Add optional change description when saving model
mart-r Jun 3, 2025
a2eba01
CU-86999tnz7: Add a test for description upon save
mart-r Jun 3, 2025
de22945
CU-86999tnz7: Fix typing during tests
mart-r Jun 3, 2025
3f02bda
CU-86999tnz7: Fix issues with extra labels, add relevant tests
mart-r Jun 4, 2025
6a5dc33
CU-86999tnz7: Ported DeID improvements
mart-r Jun 4, 2025
80a9d70
CU-86999tnz7: Add missing resource files
mart-r Jun 4, 2025
0d02fcc
CU-86999tnz7: Add preprocessors for UMLS and Snomed
mart-r Jun 4, 2025
44b806c
CU-86999tnz7: Add usage monitoring
mart-r Jun 4, 2025
1bab151
CU-86999tnz7: Use promise of a hash for usage monitoring
mart-r Jun 4, 2025
c62a978
CU-86999tnz7: Allow 15 minutes for tests within main workflow
mart-r Jun 4, 2025
3b758d9
CU-86999tnz7: Add README for release scripts
mart-r Jun 4, 2025
7007afe
CU-86999tnz7: Allowing conversion of beta namespaces to proper ones d…
mart-r Jun 5, 2025
d1cf03a
CU-86999tnz7: Allow clearing unpacked data when saving model pack
mart-r Jun 5, 2025
d47ef0a
CU-86999tnz7: Make sure model pack path refers to existing file/folder
mart-r Jun 5, 2025
052eb2b
CU-86999tnz7: Add base backwards compatibility stuff.
mart-r Jun 5, 2025
080eeec
CU-86999tnz7: Run model regression during workflow
mart-r Jun 5, 2025
ceb150f
CU-86999tnz7: Fix vocab data path during regression vocab test
mart-r Jun 5, 2025
3945ec0
CU-86999tnz7: Fix typo in script
mart-r Jun 5, 2025
b39fe5b
CU-86999tnz7: Add hash to custom names unless explicitly disabled
mart-r Jun 5, 2025
a561826
CU-86999tnz7: Add backwards compatibility script
mart-r Jun 5, 2025
364f1ef
CU-86999tnz7: Run backwards compatibility as part of workflow
mart-r Jun 5, 2025
d488ab0
CU-86999tnz7: Avoid runtime warnings due to config namespaces
mart-r Jun 6, 2025
7154432
CU-86999tnz7: Add initial multiprocessing option
mart-r Jun 6, 2025
8ff7664
CU-86999tnz7: Add minor tests for batching
mart-r Jun 6, 2025
7a9d2c1
CU-86999tnz7: Allow text index to be a string. Add doc string to mult…
mart-r Jun 6, 2025
7acd2b9
CU-86999tnz7: Allow batching on a per-character basis
mart-r Jun 6, 2025
fcd56f4
CU-86999tnz7: Add a few tests for a per-character batching
mart-r Jun 6, 2025
2590b0e
CU-86999tnz7: Fix issue with resulting text indices for multiprocessing
mart-r Jun 6, 2025
ee326b1
CU-86999tnz7: Add minor multiprocessing test
mart-r Jun 6, 2025
cf77a72
CU-86999tnz7: Allow an extra 5 minutes for workflow /tests
mart-r Jun 6, 2025
aab5ef5
Merge branch 'main' into CU-86999tnz7-resync-with-v1
mart-r Jun 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ jobs:
uv run ruff check medcat2 --preview
- name: Test
run: |
timeout 10m uv run python -m unittest discover
timeout 20m uv run python -m unittest discover
- name: Model regression
run: |
uv run bash tests/backwards_compatibility/run_current.sh
- name: Backwards compatibility
run: |
uv run bash tests/backwards_compatibility/check_backwards_compatibility.sh
21 changes: 21 additions & 0 deletions .release/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Releases

The scripts within here are designed to help prepare for and deal with releases.

The main idea is to use the `prepare_release.sh` script from within the root of the project and it will delegate either to `prepare_minor_release.sh` or `prepare_patch_release.sh` as necessary.
The workflow within the scripts is as follows:
- Create or check out release branch (`release/v<major>.<minor>`)
- Update version in `pyproject.toml`
- Create a tag based on the version
- Push both the branch as well as the tag to `origin`

The general usage for a minor release based on the `main` branch from within the **root of the project** is simply:
```
bash .release/prepare_release.sh <major>.<minor>.0
```
and the usage for a patch release (from within the **root of the project**) is in the format
```
bash .release/prepare_release.sh <major>.<minor>.<patch> <hash 1> <hash 2> ...
```
where `hash 1` and `hash 2` (and so on) refer to the commit hashes that need to be included / cherry-picked in the patch release.

228 changes: 223 additions & 5 deletions medcat2/cat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Optional, Union, Any, overload, Literal
from typing import Optional, Union, Any, overload, Literal, Iterable, Iterator
from typing import cast
import os
import json
from datetime import date
from concurrent.futures import ProcessPoolExecutor, as_completed, Future
import itertools

import shutil
import logging
Expand All @@ -23,6 +27,7 @@
from medcat2.components.addons.addons import AddonComponent
from medcat2.utils.legacy.identifier import is_legacy_model_pack
from medcat2.utils.defaults import AVOID_LEGACY_CONVERSION_ENVIRON
from medcat2.utils.usage_monitoring import UsageMonitor


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -51,6 +56,8 @@ def __init__(self,

self._trainer: Optional[Trainer] = None
self._pipeline = self._recreate_pipe(model_load_path)
self.usage_monitor = UsageMonitor(
self._get_hash, self.config.general.usage_monitor)

def _recreate_pipe(self, model_load_path: Optional[str] = None
) -> Pipeline:
Expand All @@ -75,7 +82,10 @@ def ignore_attrs(cls) -> list[str]:
]

def __call__(self, text: str) -> Optional[MutableDocument]:
    """Annotate a single text with the pipeline.

    Args:
        text (str): The raw input text.

    Returns:
        Optional[MutableDocument]: The annotated document, or None if
            the pipeline could not produce one.
    """
    doc = self._pipeline.get_doc(text)
    # NOTE: the pipeline may return None (hence the Optional return
    # type, and the empty-dict fallback in `get_entities`); guard so
    # usage monitoring does not raise AttributeError on `doc.final_ents`.
    if doc is not None and self.usage_monitor.should_monitor:
        self.usage_monitor.log_inference(len(text), len(doc.final_ents))
    return doc

def _ensure_not_training(self) -> None:
"""Method to ensure config is not set to train.
Expand Down Expand Up @@ -139,6 +149,188 @@ def get_entities(self,
return {}
return self._doc_to_out(doc, only_cui=only_cui)

def _mp_worker_func(
        self,
        texts_and_indices: list[tuple[str, str, bool]]
) -> list[tuple[str, str, Union[dict, Entities, OnlyCUIEntities]]]:
    """Run entity extraction over one batch of inputs.

    Args:
        texts_and_indices: Batch of (text, text_index, only_cui) triples.

    Returns:
        One (text, text_index, entities) triple per input item, in the
        same order as the input batch.
    """
    results: list[
        tuple[str, str, Union[dict, Entities, OnlyCUIEntities]]] = []
    for raw_text, raw_index, cui_only in texts_and_indices:
        ents = self.get_entities(raw_text, only_cui=cui_only)
        results.append((raw_text, raw_index, ents))
    return results

def _generate_batches_by_char_length(
self,
text_iter: Union[Iterator[str], Iterator[tuple[str, str]]],
batch_size_chars: int,
only_cui: bool,
) -> Iterator[list[tuple[str, str, bool]]]:
docs: list[tuple[str, str, bool]] = []
char_count = 0
for i, _doc in enumerate(text_iter):
# NOTE: not sure why mypy is complaining here
doc = cast(
str, _doc[1] if isinstance(_doc, tuple) else _doc)
doc_index: str = _doc[0] if isinstance(_doc, tuple) else str(i)
clen = len(doc)
char_count += clen
if char_count > batch_size_chars:
yield docs
docs = []
char_count = clen
docs.append((doc_index, doc, only_cui))

if len(docs) > 0:
yield docs

def _generate_batches(
self,
text_iter: Union[Iterator[str], Iterator[tuple[str, str]]],
batch_size: int,
batch_size_chars: int,
only_cui: bool,
) -> Iterator[list[tuple[str, str, bool]]]:
if batch_size_chars < 1 and batch_size < 1:
raise ValueError("Either `batch_size` or `batch_size_chars` "
"must be greater than 0.")
if batch_size > 0 and batch_size_chars > 0:
raise ValueError(
"Cannot specify both `batch_size` and `batch_size_chars`. "
"Please use one of them.")
if batch_size_chars > 0:
return self._generate_batches_by_char_length(
text_iter, batch_size_chars, only_cui)
else:
return self._generate_simple_batches(
text_iter, batch_size, only_cui)

def _generate_simple_batches(
self,
text_iter: Union[Iterator[str], Iterator[tuple[str, str]]],
batch_size: int,
only_cui: bool,
) -> Iterator[list[tuple[str, str, bool]]]:
text_index = 0
while True:
# Take a small batch from the iterator
batch = list(itertools.islice(text_iter, batch_size))
if not batch:
break
# NOTE: typing is correct:
# - if str, then (str, int, bool)
# - if tuple, then (str, int, bool)
# but for some reason mypy complains
yield [
(text, str(text_index + i), only_cui) # type: ignore
if isinstance(text, str) else
(text[1], text[0], only_cui)
for i, text in enumerate(batch)
]
text_index += len(batch)

def _mp_one_batch_per_process(
        self,
        executor: ProcessPoolExecutor,
        batch_iter: Iterator[list[tuple[str, str, bool]]],
        external_processes: int
) -> Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]:
    """Process batches on worker processes as well as the main process.

    One batch is submitted per external process, one batch is handled
    on the main process while the workers are busy, and then each
    completed worker batch is yielded and replaced with a new
    submission until both the batch iterator and all outstanding
    futures are exhausted.

    Args:
        executor: The process pool to submit worker batches to.
        batch_iter: Iterator of batches of (text, text_index, only_cui).
        external_processes: Number of worker (non-main) processes.

    Yields:
        (text_index, entities) pairs, in completion order.
    """
    futures: list[Future] = []
    # submit batches, one for each external process
    for _ in range(external_processes):
        try:
            batch = next(batch_iter)
            futures.append(
                executor.submit(self._mp_worker_func, batch))
        except StopIteration:
            break
    # Main process works on next batch while workers are busy
    main_batch: Optional[list[tuple[str, str, bool]]]
    try:
        main_batch = next(batch_iter)
    except StopIteration:
        main_batch = None
    if main_batch is not None:
        # Yield main process results immediately
        # (worker results are (text, text_index, entities) triples)
        for result in self._mp_worker_func(main_batch):
            yield result[1], result[2]
    # Keep draining completed futures and topping the workers up with
    # fresh batches until the futures AND the batch iterator are both
    # exhausted.
    # NOTE: this was previously a `range(external_processes)` loop;
    # since each iteration also *submitted* a new batch, any batch
    # submitted during the drain phase was never awaited and its
    # results were silently dropped.
    while futures:
        # Wait for any future to complete
        done_future = next(as_completed(futures))
        futures.remove(done_future)

        # Yield all results from this batch
        for result in done_future.result():
            yield result[1], result[2]

        # Submit next batch to keep workers busy
        try:
            batch = next(batch_iter)
            futures.append(
                executor.submit(self._mp_worker_func, batch))
        except StopIteration:
            # nothing left to submit; any remaining futures are still
            # drained by the `while futures` loop
            pass

def get_entities_multi_texts(
        self,
        texts: Union[Iterable[str], Iterable[tuple[str, str]]],
        only_cui: bool = False,
        n_process: int = 1,
        batch_size: int = -1,
        batch_size_chars: int = 1_000_000,
) -> Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]:
    """Get entities from multiple texts (potentially in parallel).

    If `n_process` > 1, `n_process - 1` new processes will be created
    and data will be processed on those as well as the main process in
    parallel.

    Args:
        texts (Union[Iterable[str], Iterable[tuple[str, str]]]):
            The input text. Either an iterable of raw text or one
            with in the format of `(text_index, text)`.
        only_cui (bool):
            Whether to only return CUIs rather than other information
            like start/end and annotated value. Defaults to False.
        n_process (int):
            Number of processes to use. Defaults to 1.
        batch_size (int):
            The number of texts to batch at a time. A batch of the
            specified size will be given to each worker process.
            Defaults to -1 and in this case the character count will
            be used instead.
        batch_size_chars (int):
            The maximum number of characters to process in a batch.
            Each process will be given batch of texts with a total
            number of characters not exceeding this value. Defaults
            to 1,000,000 characters. Set to -1 to disable.

    Yields:
        Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]:
            The results in the format of (text_index, entities).
    """
    text_iter = cast(
        Union[Iterator[str], Iterator[tuple[str, str]]], iter(texts))
    batch_iter = self._generate_batches(
        text_iter, batch_size, batch_size_chars, only_cui)
    if n_process == 1:
        # just do in series
        for batch in batch_iter:
            # NOTE: the worker returns (text, text_index, entities)
            # triples, so the index is the *second* element; the
            # previous unpacking (`for text_index, _, result`) yielded
            # the raw text in place of the index, contradicting the
            # documented (text_index, entities) output format
            for _, text_index, result in self._mp_worker_func(batch):
                yield text_index, result
        return

    external_processes = n_process - 1
    with ProcessPoolExecutor(max_workers=external_processes) as executor:
        yield from self._mp_one_batch_per_process(
            executor, batch_iter, external_processes)

def _get_entity(self, ent: MutableEntity,
doc_tokens: list[str],
cui: str) -> Entity:
Expand Down Expand Up @@ -253,6 +445,9 @@ def save_model_pack(
self, target_folder: str, pack_name: str = DEFAULT_PACK_NAME,
serialiser_type: Union[str, AvailableSerialisers] = 'dill',
make_archive: bool = True,
only_archive: bool = False,
add_hash_to_pack_name: bool = True,
change_description: Optional[str] = None,
) -> str:
"""Save model pack.

Expand All @@ -268,14 +463,22 @@ def save_model_pack(
The serialiser type. Defaults to 'dill'.
make_archive (bool):
Whether to make the archive /.zip file. Defaults to True.
only_archive (bool):
Whether to clear the non-compressed folder. Defaults to False.
add_hash_to_pack_name (bool):
Whether to add the hash to the pack name. This is only relevant
if pack_name is specified. Defaults to True.
change_description (Optional[str]):
If provided, this description will be added to the
model description. Defaults to None.

Returns:
str: The final model pack path.
"""
self.config.meta.mark_saved_now()
# figure out the location/folder of the saved files
hex_hash = self._versioning()
if pack_name == DEFAULT_PACK_NAME:
hex_hash = self._versioning(change_description)
if pack_name == DEFAULT_PACK_NAME or add_hash_to_pack_name:
pack_name = f"{pack_name}_{hex_hash}"
model_pack_path = os.path.join(target_folder, pack_name)
# ensure target folder and model pack folder exist
Expand All @@ -294,9 +497,16 @@ def save_model_pack(
if make_archive:
shutil.make_archive(model_pack_path, 'zip',
root_dir=model_pack_path)
if only_archive:
logger.info("Removing the non-archived model pack folder: %s",
model_pack_path)
shutil.rmtree(model_pack_path, ignore_errors=True)
# change the model pack path to the zip file so that we
# refer to an existing file
model_pack_path += ".zip"
return model_pack_path

def _versioning(self) -> str:
def _get_hash(self) -> str:
hasher = Hasher()
logger.debug("Hashing the CDB")
hasher.update(self.cdb.get_hash())
Expand All @@ -306,6 +516,14 @@ def _versioning(self) -> str:
type(component).__name__)
hasher.update(component.get_hash())
hex_hash = self.config.meta.hash = hasher.hexdigest()
return hex_hash

def _versioning(self, change_description: Optional[str]) -> str:
date_today = date.today().strftime("%d %B %Y")
if change_description is not None:
self.config.meta.description += (
f"\n[{date_today}] {change_description}")
hex_hash = self._get_hash()
history = self.config.meta.history
if not history or history[-1] != hex_hash:
history.append(hex_hash)
Expand Down
Loading