Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions marker/builders/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,73 @@ class DocumentBuilder(BaseBuilder):
bool,
"Disable OCR processing.",
] = False
# Batch size, in pages, for document building. `__call__` switches to the
# batched build path when this is > 0 and to the all-at-once path otherwise.
# The second Annotated element is the human-readable description of the option.
page_batch_size: Annotated[
int,
"Number of pages to process at once. 0 = all pages (default, fastest but most memory). "
">0 = process in batches, compressing images after each batch to dramatically reduce "
"RAM usage on large documents (97-page doc: ~13 GB -> ~200 MB).",
] = 0

def __call__(self, provider: PdfProvider, layout_builder: LayoutBuilder, line_builder: LineBuilder, ocr_builder: OcrBuilder):
    """Build a document from ``provider`` using the configured strategy.

    A positive ``page_batch_size`` selects the batched path
    (``_build_streaming``); any other value selects ``_build_all_at_once``.

    Args:
        provider: Source of page data, forwarded to the selected strategy.
        layout_builder: Callable invoked as ``layout_builder(document, provider)``.
        line_builder: Callable invoked as ``line_builder(document, provider)``.
        ocr_builder: Callable invoked as ``ocr_builder(document, provider)``
            by the strategy unless OCR is disabled.

    Returns:
        The document produced by the selected strategy.
    """
    # Each strategy creates its own document structure, so no document is
    # built here: an up-front build would be thrown away and, in batch mode,
    # would undo the memory savings the batching is meant to provide.
    if self.page_batch_size > 0:
        return self._build_streaming(provider, layout_builder, line_builder, ocr_builder)
    return self._build_all_at_once(provider, layout_builder, line_builder, ocr_builder)

def _build_all_at_once(self, provider, layout_builder, line_builder, ocr_builder):
    """Build the whole document in one pass with every page image loaded.

    Creates the document structure with ``load_images=True``, runs the layout
    and line builders over it, then runs the OCR builder unless
    ``disable_ocr`` is set.

    Returns:
        The document after all enabled builders have run.
    """
    doc = self._create_document_structure(provider, load_images=True)

    # Layout must run before line building; both always run.
    for stage in (layout_builder, line_builder):
        stage(doc, provider)

    if self.disable_ocr:
        return doc

    ocr_builder(doc, provider)
    return doc

def build_document(self, provider: PdfProvider):
def _build_streaming(self, provider, layout_builder, line_builder, ocr_builder):
    """Build the document in batches of ``page_batch_size`` pages.

    The document structure is created with ``load_images=False``. For each
    batch, low- and high-resolution images are fetched from ``provider`` and
    attached to the batch's pages, the builders are run with
    ``document.pages`` temporarily narrowed to that batch, and finally
    ``compress_images()`` is called on every page of the batch.

    Returns:
        The document, with ``document.pages`` restored to the full page list.
    """
    document = self._create_document_structure(provider, load_images=False)
    step = self.page_batch_size
    page_count = len(document.pages)

    for offset in range(0, page_count, step):
        # Slicing clamps at the end of the list, so the last batch may be short.
        chunk = document.pages[offset:offset + step]

        # Fetch images only for the pages in this batch.
        page_ids = [page.page_id for page in chunk]
        lowres_images = provider.get_images(page_ids, self.lowres_image_dpi)
        highres_images = provider.get_images(page_ids, self.highres_image_dpi)
        for idx, page in enumerate(chunk):
            page.lowres_image = lowres_images[idx]
            page.highres_image = highres_images[idx]

        # The builders work on document.pages, so expose just this batch to
        # them and always put the full list back, even if a builder raises.
        full_page_list = document.pages
        document.pages = chunk
        try:
            for stage in (layout_builder, line_builder):
                stage(document, provider)
            if not self.disable_ocr:
                ocr_builder(document, provider)
        finally:
            document.pages = full_page_list

        # Shrink this batch's images before the next batch is loaded.
        for page in chunk:
            page.compress_images()

    return document

def _create_document_structure(self, provider: PdfProvider, load_images: bool):
"""Create Document with page structure. If load_images=False, pages start with no images."""
PageGroupClass: PageGroup = get_block_class(BlockTypes.Page)
lowres_images = provider.get_images(provider.page_range, self.lowres_image_dpi)
highres_images = provider.get_images(provider.page_range, self.highres_image_dpi)

if load_images:
lowres_images = provider.get_images(provider.page_range, self.lowres_image_dpi)
highres_images = provider.get_images(provider.page_range, self.highres_image_dpi)
else:
lowres_images = [None] * len(provider.page_range)
highres_images = [None] * len(provider.page_range)

initial_pages = [
PageGroupClass(
page_id=p,
Expand Down
46 changes: 46 additions & 0 deletions marker/schema/groups/page.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import defaultdict
from io import BytesIO
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np

Expand Down Expand Up @@ -57,6 +58,12 @@ def get_image(
):
image = self.highres_image if highres else self.lowres_image

# Auto-decompress if stored as bytes (memory-saving mode)
if isinstance(image, bytes):
image = Image.open(BytesIO(image))
# Do NOT cache — let downstream processors re-decompress on demand.
# This keeps peak memory at O(batch_size) instead of O(total_pages).

# Check if RGB, convert if needed
if isinstance(image, Image.Image) and image.mode != "RGB":
image = image.convert("RGB")
Expand All @@ -77,6 +84,45 @@ def get_image(

return image

def compress_images(self, quality: int = 85, fmt: str = "JPEG") -> None:
    """Replace in-memory PIL images on this page with encoded bytes.

    Each of ``lowres_image`` and ``highres_image`` that currently holds a
    ``PIL.Image.Image`` is encoded via ``Image.save`` and the attribute is
    overwritten with the resulting ``bytes``. Attributes that are ``None`` or
    already ``bytes`` are left untouched, so repeated calls are harmless.
    ``get_image()`` reopens stored bytes when the image is next requested.

    Args:
        quality: Encoder quality forwarded to ``Image.save``.
        fmt: Pillow format name forwarded to ``Image.save``. It is compared
            case-insensitively when deciding whether JPEG mode conversion
            is needed.

    Note:
        JPEG is a lossy format, so a page compressed with the default
        settings does not decode to pixel-identical data.
    """
    # Modes assumed to be writable as JPEG without conversion; anything else
    # (RGBA, P, LA, ...) is converted to RGB first so the save does not fail.
    # NOTE(review): list taken from Pillow's JPEG writer -- confirm against
    # the Pillow version in use.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    is_jpeg = fmt.upper() == "JPEG"

    for attr in ("lowres_image", "highres_image"):
        img = getattr(self, attr)
        if img is None or isinstance(img, bytes):
            # Nothing loaded, or already compressed.
            continue
        if not isinstance(img, Image.Image):
            continue
        if is_jpeg and img.mode not in jpeg_modes:
            img = img.convert("RGB")
        buf = BytesIO()
        img.save(buf, format=fmt, quality=quality)
        setattr(self, attr, buf.getvalue())

def clear_images(self) -> None:
    """Drop both stored page images by setting them to ``None``.

    Intended to be called once no later processing stage needs the images.
    """
    for attr in ("lowres_image", "highres_image"):
        setattr(self, attr, None)

def images_size_bytes(self) -> int:
    """Return the approximate number of bytes held by this page's images.

    Images stored as ``bytes`` count by their exact length. Images stored as
    ``PIL.Image.Image`` are estimated as ``width * height * 3``. Any other
    value contributes nothing.
    """
    size = 0
    for image in (self.lowres_image, self.highres_image):
        if isinstance(image, bytes):
            size += len(image)
            continue
        if isinstance(image, Image.Image):
            # Rough estimate that assumes three channels per pixel.
            width, height = image.size[0], image.size[1]
            size += width * height * 3
    return size

@computed_field
@property
def current_children(self) -> List[Block]:
Expand Down
Loading