diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 4ab1de190b..d541d6a485 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -20,7 +20,7 @@ env: tests/test_asr_pipeline.py tests/test_threaded_pipeline.py PYTEST_TO_SKIP: |- - EXAMPLES_TO_SKIP: '^(batch_convert|compare_vlm_models|minimal|minimal_vlm_pipeline|minimal_asr_pipeline|export_multimodal|custom_convert|develop_picture_enrichment|rapidocr_with_custom_models|suryaocr_with_custom_models|offline_convert|pictures_description|pictures_description_api|vlm_pipeline_api_model|granitedocling_repetition_stopping|mlx_whisper_example|gpu_standard_pipeline|gpu_vlm_pipeline|demo_layout_vlm|post_process_ocr_with_vlm)\.py$|xbrl_conversion\.ipynb$' + EXAMPLES_TO_SKIP: '^(batch_convert|compare_vlm_models|minimal|minimal_vlm_pipeline|minimal_asr_pipeline|export_multimodal|custom_convert|develop_picture_enrichment|rapidocr_with_custom_models|suryaocr_with_custom_models|offline_convert|pictures_description|pictures_description_api|vlm_pipeline_api_model|granitedocling_repetition_stopping|mlx_whisper_example|gpu_standard_pipeline|gpu_vlm_pipeline|demo_layout_vlm|post_process_ocr_with_vlm|run_with_formats_html_rendered|run_with_formats_html_rendered_mp)\.py$|xbrl_conversion\.ipynb$' jobs: lint: diff --git a/docling/backend/html_backend.py b/docling/backend/html_backend.py index 7e7f014299..1bb51d4387 100644 --- a/docling/backend/html_backend.py +++ b/docling/backend/html_backend.py @@ -1,28 +1,39 @@ import base64 import logging +import math import os import re import warnings from contextlib import contextmanager from copy import deepcopy +from dataclasses import dataclass, field as dataclass_field from io import BytesIO from pathlib import Path -from typing import Final, Iterator, Optional, Union, cast +from typing import Any, Final, Iterator, Literal, Optional, Union, cast from urllib.parse import urljoin, urlparse import requests from bs4 import BeautifulSoup, NavigableString, PageElement, Tag from bs4.element import PreformattedString from docling_core.types.doc import ( + BoundingBox, + CoordOrigin, DocItem, DocItemLabel, DoclingDocument, DocumentOrigin, + GraphCell, + GraphCellLabel, + GraphData, + GraphLink, + GraphLinkLabel, GroupItem, GroupLabel, PictureItem, + ProvenanceItem, RefItem, RichTableCell, + Size, TableCell, TableData, TableItem, @@ -123,6 +134,104 @@ **{k: {} for k in _CODE_TAG_SET}, } +_DATA_DOCLING_ID_ATTR: Final = "data-docling-id" +_FORM_CONTAINER_CLASS: Final = "form_region" +_FORM_KEY_ID_RE: Final = re.compile(r"^key(?P[A-Za-z0-9]+)$") +_FORM_MARKER_ID_RE: Final = re.compile(r"^key(?P[A-Za-z0-9]+)_marker$") +_FORM_VALUE_ID_RE: Final = re.compile( + r"^key(?P[A-Za-z0-9]+)_value(?P[A-Za-z0-9]+)$" +) +_CUSTOM_CHECKBOX_CLASSES: Final = {"checkbox", "checkbox-box", "checkbox-input"} +_CHECKBOX_MARK_TEXTS: Final = {"x", "✓", "✔", "☑"} +_CHECKBOX_CONTAINER_CLASSES: Final = { + "checkbox-container", + "checkbox-item", + "checkbox-option", + "option", +} +_INLINE_HTML_TAGS: Final = { + "a", + "abbr", + "b", + "bdi", + "bdo", + "cite", + "code", + "data", + "dfn", + "em", + "i", + "kbd", + "label", + "mark", + "q", + "s", + "samp", + "small", + "span", + "strong", + "sub", + "sup", + "u", + "var", +} + + +@dataclass(frozen=True) +class _RenderedBBox: + page_no: int + bbox: BoundingBox + + +@dataclass +class _ExtractedFormValue: + tag: Tag + order: int + orig: str + text: str + prov: Optional[ProvenanceItem] + kind: Literal["read_only", "fillable"] = "read_only" + checkbox_label: Optional[DocItemLabel] = None + consumed_label_tag_obj_ids: set[int] = dataclass_field(default_factory=set) + checkbox_label_tags: list[Tag] = dataclass_field(default_factory=list) + + +@dataclass +class _ExtractedFormMarker: + tag: Tag + order: int + orig: str + text: str + prov: Optional[ProvenanceItem] + + +@dataclass +class _ExtractedFormText: + tag: Tag + order: int + orig: str + text: str + prov: Optional[ProvenanceItem] + label: DocItemLabel = DocItemLabel.TEXT + + +@dataclass +class _ExtractedFormField: + key_tag: Optional[Tag] + key_order: int + key_orig: str + key_text: str + key_prov: Optional[ProvenanceItem] + marker: Optional[_ExtractedFormMarker] + values: list[_ExtractedFormValue] + extra_texts: list[_ExtractedFormText] = dataclass_field(default_factory=list) + + +@dataclass +class _ExtractedFormRegion: + fields: list[_ExtractedFormField] + consumed_tag_ids: set[str] + class _Context(BaseModel): list_ordered_flag_by_ref: dict[str, bool] = {} @@ -134,6 +243,7 @@ class AnnotatedText(BaseModel): hyperlink: Union[AnyUrl, Path, None] = None formatting: Union[Formatting, None] = None code: bool = False + source_tag_id: Optional[str] = None class AnnotatedTextList(list): @@ -142,11 +252,13 @@ def to_single_text_element(self) -> AnnotatedText: current_text = "" current_f = None current_code = False + current_source_tag_id = None for at in self: t = at.text h = at.hyperlink f = at.formatting c = at.code + s = at.source_tag_id current_text += t.strip() + " " if f is not None and current_f is None: current_f = f @@ -160,6 +272,18 @@ def to_single_text_element(self) -> AnnotatedText: _log.warning( f"Clashing hyperlinks: '{h}' and '{current_h}'! Chose '{current_h}'" ) + if s is not None and current_source_tag_id is None: + current_source_tag_id = s + elif ( + s is not None + and current_source_tag_id is not None + and s != current_source_tag_id + ): + _log.warning( + "Clashing provenance tags: " + f"'{s}' and '{current_source_tag_id}'! " + f"Chose '{current_source_tag_id}'" + ) current_code = c if c else current_code return AnnotatedText( @@ -167,6 +291,7 @@ def to_single_text_element(self) -> AnnotatedText: hyperlink=current_h, formatting=current_f, code=current_code, + source_tag_id=current_source_tag_id, ) def simplify_text_elements(self) -> "AnnotatedTextList": @@ -177,12 +302,14 @@ def simplify_text_elements(self) -> "AnnotatedTextList": hyperlink = self[0].hyperlink formatting = self[0].formatting code = self[0].code + source_tag_id = self[0].source_tag_id last_elm = text for i in range(1, len(self)): if ( hyperlink == self[i].hyperlink and formatting == self[i].formatting and code == self[i].code + and source_tag_id == self[i].source_tag_id ): sep = " " if not self[i].text.strip() or not last_elm.strip(): @@ -192,7 +319,11 @@ def simplify_text_elements(self) -> "AnnotatedTextList": else: simplified.append( AnnotatedText( - text=text, hyperlink=hyperlink, formatting=formatting, code=code + text=text, + hyperlink=hyperlink, + formatting=formatting, + code=code, + source_tag_id=source_tag_id, ) ) text = self[i].text @@ -200,10 +331,15 @@ def simplify_text_elements(self) -> "AnnotatedTextList": hyperlink = self[i].hyperlink formatting = self[i].formatting code = self[i].code + source_tag_id = self[i].source_tag_id if text: simplified.append( AnnotatedText( - text=text, hyperlink=hyperlink, formatting=formatting, code=code + text=text, + hyperlink=hyperlink, + formatting=formatting, + code=code, + source_tag_id=source_tag_id, ) ) return simplified @@ -239,17 +375,32 @@ def __init__( self.options: HTMLBackendOptions self.soup: Optional[BeautifulSoup] = None self.path_or_stream: Union[BytesIO, Path] = path_or_stream - self.base_path: Optional[str] = str(options.source_uri) + self.base_path: Optional[str] = ( + str(options.source_uri) if options.source_uri is not None else None + ) # Initialize the parents for the hierarchy self.max_levels = 10 self.level = 0 self.parents: dict[int, Optional[Union[DocItem, GroupItem]]] = {} self.ctx = _Context() + self._disable_inline_group_depth: int = 0 for i in range(self.max_levels): self.parents[i] = None self.hyperlink: Union[AnyUrl, Path, None] = None self.format_tags: list[str] = [] + self._raw_html_bytes: Optional[bytes] = None + self._rendered_html: Optional[str] = None + self._rendered_bbox_by_id: dict[str, _RenderedBBox] = {} + self._rendered_text_bbox_by_id: dict[str, _RenderedBBox] = {} + self._rendered_page_images: list[Image.Image] = [] + self._rendered_page_size: Optional[Size] = None + self._suppressed_tag_ids_stack: list[set[str]] = [] + self._suppressed_tag_obj_ids_stack: list[set[int]] = [] + self._form_fields_by_key_id_stack: list[dict[str, _ExtractedFormField]] = [] + self._tag_name_by_docling_id_cache: dict[str, str] = {} + self._generated_html_id_counter: int = 0 + self._render_visibility_cache: dict[int, bool] = {} try: raw = ( @@ -257,6 +408,7 @@ def __init__( if isinstance(path_or_stream, BytesIO) else Path(path_or_stream).read_bytes() ) + self._raw_html_bytes = raw self.soup = BeautifulSoup(raw, "html.parser") except Exception as e: raise RuntimeError( @@ -297,6 +449,20 @@ def convert(self) -> DoclingDocument: ) doc = DoclingDocument(name=self.file.stem or "file", origin=origin) + if cast(HTMLBackendOptions, self.options).render_page: + self._render_with_browser() + if self._rendered_html: + self.soup = BeautifulSoup(self._rendered_html, "html.parser") + + if self._rendered_page_images and self._rendered_page_size: + render_dpi = cast(HTMLBackendOptions, self.options).render_dpi + for page_no, page_image in enumerate(self._rendered_page_images, start=1): + doc.add_page( + page_no=page_no, + size=self._rendered_page_size, + image=ImageRef.from_pil(image=page_image, dpi=render_dpi), + ) + assert self.soup is not None # set the title as furniture, since it is part of the document metadata title = self.soup.title @@ -340,9 +506,641 @@ def convert(self) -> DoclingDocument: ) # reset context self.ctx = _Context() + self._render_visibility_cache.clear() self._walk(content, doc) return doc + def _get_render_page_size(self) -> tuple[int, int]: + options = cast(HTMLBackendOptions, self.options) + width = options.render_page_width + height = options.render_page_height + if options.render_page_orientation == "landscape": + width, height = height, width + return width, height + + def _coerce_base_url(self, value: str) -> str: + if HTMLDocumentBackend._is_remote_url(value) or value.startswith("file://"): + return value + return Path(value).resolve().as_uri() + + def _get_render_html_text(self) -> str: + if self._raw_html_bytes is None: + return "" + return self._raw_html_bytes.decode("utf-8", errors="replace") + + def _inject_base_tag(self, html_text: str, base_url: Optional[str]) -> str: + if not base_url: + return html_text + soup = BeautifulSoup(html_text, "html.parser") + if soup.head is None: + return html_text + if soup.head.find("base") is not None: + return html_text + base_tag = soup.new_tag("base", href=base_url) + soup.head.insert(0, base_tag) + return str(soup) + + def _pad_image(self, image: Image.Image, width: int, height: int) -> Image.Image: + if image.width == width and image.height == height: + return image + canvas = Image.new("RGB", (width, height), color=(255, 255, 255)) + canvas.paste(image, (0, 0)) + return canvas + + def _render_with_browser(self) -> None: + options = cast(HTMLBackendOptions, self.options) + if not options.render_page: + return + + try: + from playwright.sync_api import sync_playwright + except ImportError as exc: + raise RuntimeError( + "Playwright is required for HTML rendering. " + "Install it with 'pip install \"docling[htmlrender]\"' and run " + "'playwright install'." + ) from exc + + width, height = self._get_render_page_size() + self._rendered_page_size = Size(width=width, height=height) + + render_url: Optional[str] = None + render_html = self._get_render_html_text() + + if isinstance(self.path_or_stream, Path): + render_url = self.path_or_stream.resolve().as_uri() + elif self.base_path: + render_html = self._inject_base_tag( + render_html, self._coerce_base_url(self.base_path) + ) + + with sync_playwright() as playwright: + browser = playwright.chromium.launch(headless=True) + context = browser.new_context( + viewport={"width": width, "height": height}, + device_scale_factor=options.render_device_scale, + ) + page = context.new_page() + if options.render_print_media: + page.emulate_media(media="print") + + if render_url: + page.goto(render_url, wait_until=options.render_wait_until) + else: + page.set_content(render_html, wait_until=options.render_wait_until) + + if options.page_padding > 0: + page.evaluate( + """ + (padding) => { + if (!document || !document.body) { + return; + } + document.body.style.padding = `${padding}px`; + document.body.style.boxSizing = "border-box"; + } + """, + options.page_padding, + ) + + if options.render_wait_ms: + page.wait_for_timeout(options.render_wait_ms) + + # Some pages settle to their final layout only after first full-page capture. + # Warm up with a throwaway screenshot so bbox extraction and saved image align. + page.screenshot(full_page=True) + + render_data = page.evaluate( + """ + () => { + const nodes = Array.from(document.querySelectorAll('*')); + const boxes = {}; + const textBoxes = {}; + let idx = 0; + for (const node of nodes) { + idx += 1; + const id = String(idx); + node.setAttribute('data-docling-id', id); + const rect = node.getBoundingClientRect(); + if (!rect) { + continue; + } + const width = rect.width || 0; + const height = rect.height || 0; + if (width <= 0 && height <= 0) { + continue; + } + const x = rect.left + window.scrollX; + const y = rect.top + window.scrollY; + boxes[id] = { x, y, width, height }; + + let textLeft = null; + let textTop = null; + let textRight = null; + let textBottom = null; + const textNodes = Array.from(node.childNodes).filter( + (child) => + child && + child.nodeType === Node.TEXT_NODE && + child.textContent && + child.textContent.trim() + ); + for (const textNode of textNodes) { + const range = document.createRange(); + range.selectNodeContents(textNode); + const rects = Array.from(range.getClientRects()); + for (const tRect of rects) { + const tWidth = tRect.width || 0; + const tHeight = tRect.height || 0; + if (tWidth <= 0 && tHeight <= 0) { + continue; + } + const tX = tRect.left + window.scrollX; + const tY = tRect.top + window.scrollY; + const tR = tX + tWidth; + const tB = tY + tHeight; + textLeft = textLeft === null ? tX : Math.min(textLeft, tX); + textTop = textTop === null ? tY : Math.min(textTop, tY); + textRight = textRight === null ? tR : Math.max(textRight, tR); + textBottom = textBottom === null ? tB : Math.max(textBottom, tB); + } + range.detach(); + } + if ( + textLeft !== null && + textTop !== null && + textRight !== null && + textBottom !== null + ) { + textBoxes[id] = { + x: textLeft, + y: textTop, + width: textRight - textLeft, + height: textBottom - textTop + }; + } + } + const doc = document.documentElement; + const body = document.body; + const scrollWidth = Math.max( + doc ? doc.scrollWidth : 0, + body ? body.scrollWidth : 0 + ); + const scrollHeight = Math.max( + doc ? doc.scrollHeight : 0, + body ? body.scrollHeight : 0 + ); + return { boxes, textBoxes, scrollWidth, scrollHeight }; + } + """ + ) + + self._rendered_html = page.content() + scroll_width = int(render_data.get("scrollWidth", width)) + scroll_height = int(render_data.get("scrollHeight", height)) + self._rendered_page_images = self._capture_page_images( + page=page, + render_data=render_data, + page_width=width, + page_height=height, + full_page=options.render_full_page, + ) + if self._rendered_page_images and self._rendered_page_size: + self._rendered_page_size = Size( + width=scroll_width, + height=scroll_height if options.render_full_page else height, + ) + + self._rendered_bbox_by_id = self._build_bbox_mapping( + render_data=render_data, + page_height=int(self._rendered_page_size.height) + if self._rendered_page_size + else height, + full_page=options.render_full_page, + ) + self._rendered_text_bbox_by_id = self._build_bbox_mapping( + render_data={ + "boxes": render_data.get("textBoxes", {}), + "scrollHeight": render_data.get("scrollHeight"), + }, + page_height=int(self._rendered_page_size.height) + if self._rendered_page_size + else height, + full_page=options.render_full_page, + ) + + context.close() + browser.close() + + def _capture_page_images( + self, + page, + render_data: dict, + page_width: int, + page_height: int, + full_page: bool, + ) -> list[Image.Image]: + scroll_height = int(render_data.get("scrollHeight", page_height)) + if scroll_height <= 0: + return [] + + screenshot_bytes = page.screenshot(full_page=True) + full_image = Image.open(BytesIO(screenshot_bytes)).convert("RGB") + + if full_page: + return [full_image] + + page_images: list[Image.Image] = [] + page_count = max(1, math.ceil(scroll_height / page_height)) + scale_y = full_image.height / float(scroll_height) + target_height = round(page_height * scale_y) + + for page_idx in range(page_count): + top_css = page_idx * page_height + bottom_css = min(top_css + page_height, scroll_height) + top_px = round(top_css * scale_y) + bottom_px = round(bottom_css * scale_y) + if bottom_px <= top_px: + continue + cropped = full_image.crop((0, top_px, full_image.width, bottom_px)) + cropped = self._pad_image( + image=cropped, width=full_image.width, height=target_height + ) + page_images.append(cropped) + + return page_images + + def _build_bbox_mapping( + self, render_data: dict, page_height: int, full_page: bool + ) -> dict[str, _RenderedBBox]: + boxes = render_data.get("boxes", {}) or {} + scroll_height = float(render_data.get("scrollHeight", page_height)) + + if full_page: + page_count = 1 + else: + page_count = max(1, math.ceil(scroll_height / page_height)) + + mapping: dict[str, _RenderedBBox] = {} + for tag_id, rect in boxes.items(): + left = float(rect.get("x", 0.0)) + top = float(rect.get("y", 0.0)) + width = float(rect.get("width", 0.0)) + height = float(rect.get("height", 0.0)) + if width <= 0 and height <= 0: + continue + right = left + width + bottom = top + height + if full_page: + page_no = 1 + offset = 0.0 + else: + page_no = int(top // page_height) + 1 + page_no = min(max(page_no, 1), page_count) + offset = (page_no - 1) * page_height + bbox = BoundingBox( + l=left, + t=top - offset, + r=right, + b=bottom - offset, + coord_origin=CoordOrigin.TOPLEFT, + ) + mapping[str(tag_id)] = _RenderedBBox(page_no=page_no, bbox=bbox) + + return mapping + + def _get_tag_id(self, tag: Optional[Tag]) -> Optional[str]: + if tag is None: + return None + tag_id = tag.get(_DATA_DOCLING_ID_ATTR) + if not tag_id: + return None + return str(tag_id) + + @staticmethod + def _get_html_id(tag: Optional[Tag]) -> Optional[str]: + if tag is None: + return None + tag_id = tag.get("id") + if not isinstance(tag_id, str) or not tag_id: + return None + return tag_id + + def _get_rendered_bbox_for_tag(self, tag: Optional[Tag]) -> Optional[_RenderedBBox]: + tag_id = self._get_tag_id(tag) + if tag_id is None: + return None + return self._rendered_bbox_by_id.get(tag_id) + + def _get_rendered_text_bbox_for_tag( + self, tag: Optional[Tag] + ) -> Optional[_RenderedBBox]: + tag_id = self._get_tag_id(tag) + if tag_id is None: + return None + return self._rendered_text_bbox_by_id.get(tag_id) + + @staticmethod + def _has_negative_bbox_coordinates(rendered_bbox: Optional[_RenderedBBox]) -> bool: + if rendered_bbox is None: + return False + bbox = rendered_bbox.bbox + return bbox.l < 0 or bbox.t < 0 or bbox.r < 0 or bbox.b < 0 + + def _is_tag_outside_capture_area(self, tag: Tag) -> bool: + rendered = self._get_rendered_text_bbox_for_tag( + tag + ) or self._get_rendered_bbox_for_tag(tag) + return self._has_negative_bbox_coordinates(rendered) + + @staticmethod + def _has_inline_hidden_style(tag: Tag) -> bool: + style = tag.get("style") + if not isinstance(style, str) or not style.strip(): + return False + normalized = re.sub(r"\s+", "", style.lower()) + if "display:none" in normalized: + return True + if "visibility:hidden" in normalized or "visibility:collapse" in normalized: + return True + if re.search(r"opacity:0(?:[;]|$)", normalized): + return True + return False + + def _has_rendered_presence(self, tag: Tag) -> bool: + if not self._rendered_bbox_by_id and not self._rendered_text_bbox_by_id: + return True + + cache_key = id(tag) + if cache_key in self._render_visibility_cache: + return self._render_visibility_cache[cache_key] + + if ( + self._get_rendered_text_bbox_for_tag(tag) is not None + or self._get_rendered_bbox_for_tag(tag) is not None + ): + self._render_visibility_cache[cache_key] = True + return True + + has_visible_descendant = False + for descendant in tag.find_all(True): + if not isinstance(descendant, Tag): + continue + if ( + self._get_rendered_text_bbox_for_tag(descendant) is not None + or self._get_rendered_bbox_for_tag(descendant) is not None + ): + has_visible_descendant = True + break + + self._render_visibility_cache[cache_key] = has_visible_descendant + return has_visible_descendant + + def _is_invisible_tag(self, tag: Tag) -> bool: + if tag.has_attr("hidden"): + return True + aria_hidden = tag.get("aria-hidden") + if isinstance(aria_hidden, str) and aria_hidden.strip().lower() in { + "true", + "1", + "yes", + }: + return True + if self._has_inline_hidden_style(tag): + return True + if not self._has_rendered_presence(tag): + return True + return False + + def _make_prov( + self, + text: str, + tag: Optional[Tag] = None, + source_tag_id: Optional[str] = None, + ) -> Optional[ProvenanceItem]: + if not self._rendered_bbox_by_id: + return None + + render_box: Optional[_RenderedBBox] = None + if source_tag_id: + render_box = self._rendered_bbox_by_id.get(source_tag_id) + if render_box is None: + render_box = self._get_rendered_bbox_for_tag(tag) + if render_box is None: + return None + + return ProvenanceItem( + page_no=render_box.page_no, + bbox=render_box.bbox, + charspan=(0, len(text)), + ) + + def _make_text_prov( + self, + text: str, + tag: Optional[Tag] = None, + source_tag_id: Optional[str] = None, + ) -> Optional[ProvenanceItem]: + if not self._rendered_text_bbox_by_id: + return self._make_prov(text=text, tag=tag, source_tag_id=source_tag_id) + + render_box: Optional[_RenderedBBox] = None + if source_tag_id: + render_box = self._rendered_text_bbox_by_id.get(source_tag_id) + if render_box is None: + render_box = self._get_rendered_text_bbox_for_tag(tag) + if render_box is None and isinstance(tag, Tag): + descendant_boxes: list[_RenderedBBox] = [] + for descendant in [tag, *tag.find_all(True)]: + if not isinstance(descendant, Tag): + continue + descendant_box = self._get_rendered_text_bbox_for_tag(descendant) + if descendant_box is not None: + descendant_boxes.append(descendant_box) + if descendant_boxes: + page_no = descendant_boxes[0].page_no + same_page_boxes = [ + rendered.bbox + for rendered in descendant_boxes + if rendered.page_no == page_no + ] + if same_page_boxes: + render_box = _RenderedBBox( + page_no=page_no, + bbox=( + same_page_boxes[0] + if len(same_page_boxes) == 1 + else BoundingBox.enclosing_bbox(same_page_boxes) + ), + ) + if render_box is None: + return self._make_prov(text=text, tag=tag, source_tag_id=source_tag_id) + + return ProvenanceItem( + page_no=render_box.page_no, + bbox=render_box.bbox, + charspan=(0, len(text)), + ) + + def _make_text_prov_for_source_tag_ids( + self, text: str, tag: Optional[Tag], source_tag_ids: list[str] + ) -> Optional[ProvenanceItem]: + unique_ids = list(dict.fromkeys(source_tag_ids)) + if not unique_ids: + return self._make_text_prov(text=text, tag=tag) + + boxes: list[BoundingBox] = [] + page_no: Optional[int] = None + for source_id in unique_ids: + rendered = None + if self._rendered_text_bbox_by_id: + rendered = self._rendered_text_bbox_by_id.get(source_id) + if rendered is None and self._rendered_bbox_by_id: + rendered = self._rendered_bbox_by_id.get(source_id) + if rendered is None: + continue + if page_no is None: + page_no = rendered.page_no + if rendered.page_no != page_no: + continue + boxes.append(rendered.bbox) + + if not boxes: + return self._make_text_prov(text=text, tag=tag, source_tag_id=unique_ids[0]) + + bbox = boxes[0] if len(boxes) == 1 else BoundingBox.enclosing_bbox(boxes) + return ProvenanceItem( + page_no=page_no if page_no is not None else 1, + bbox=bbox, + charspan=(0, len(text)), + ) + + def _get_rendered_bbox_for_source_tag_id( + self, source_tag_id: str + ) -> Optional[_RenderedBBox]: + rendered = None + if self._rendered_text_bbox_by_id: + rendered = self._rendered_text_bbox_by_id.get(source_tag_id) + if rendered is None and self._rendered_bbox_by_id: + rendered = self._rendered_bbox_by_id.get(source_tag_id) + return rendered + + def _are_source_tag_ids_inline_neighbors( + self, left_source_tag_id: str, right_source_tag_id: str + ) -> bool: + left = self._get_rendered_bbox_for_source_tag_id(left_source_tag_id) + right = self._get_rendered_bbox_for_source_tag_id(right_source_tag_id) + if left is None or right is None: + return False + if left.page_no != right.page_no: + return False + + left_box = left.bbox + right_box = right.bbox + min_height = max(1.0, min(left_box.height, right_box.height)) + max_height = max(1.0, max(left_box.height, right_box.height)) + vertical_overlap = min(left_box.b, right_box.b) - max(left_box.t, right_box.t) + if vertical_overlap <= 0 or (vertical_overlap / min_height) < 0.6: + return False + + horizontal_gap = right_box.l - left_box.r + max_gap = max(8.0, 1.5 * max_height) + min_gap = -0.5 * min(left_box.width, right_box.width) + return min_gap <= horizontal_gap <= max_gap + + def _compact_adjacent_single_char_parts( + self, parts: AnnotatedTextList + ) -> list[tuple[AnnotatedText, list[str]]]: + compacted: list[tuple[AnnotatedText, list[str]]] = [] + idx = 0 + while idx < len(parts): + current = parts[idx] + current_text = HTMLDocumentBackend._clean_unicode(current.text.strip()) + + if len(current_text) == 1 and current.source_tag_id is not None: + run_chars: list[str] = [] + run_source_ids: list[str] = [] + prev_source_id: Optional[str] = None + run_end = idx + while run_end < len(parts): + candidate = parts[run_end] + candidate_text = HTMLDocumentBackend._clean_unicode( + candidate.text.strip() + ) + if ( + len(candidate_text) != 1 + or candidate.hyperlink != current.hyperlink + or candidate.formatting != current.formatting + or candidate.code != current.code + ): + break + candidate_source_id = candidate.source_tag_id + if candidate_source_id is None: + break + if ( + prev_source_id is not None + and not self._are_source_tag_ids_inline_neighbors( + prev_source_id, candidate_source_id + ) + ): + break + run_chars.append(candidate_text) + run_source_ids.append(candidate_source_id) + prev_source_id = candidate_source_id + run_end += 1 + + if len(run_chars) > 1: + compacted.append( + ( + AnnotatedText( + text="".join(run_chars), + hyperlink=current.hyperlink, + formatting=current.formatting, + code=current.code, + source_tag_id=( + run_source_ids[0] if run_source_ids else None + ), + ), + run_source_ids, + ) + ) + idx = run_end + continue + + source_tag_ids = [current.source_tag_id] if current.source_tag_id else [] + compacted.append((current, source_tag_ids)) + idx += 1 + return compacted + + def _make_checkbox_with_label_prov( + self, text: str, checkbox_tag: Tag, label_tags: list[Tag] + ) -> Optional[ProvenanceItem]: + checkbox_rendered = self._get_rendered_bbox_for_tag(checkbox_tag) + if checkbox_rendered is None: + return self._make_prov(text=text, tag=checkbox_tag) + + boxes: list[BoundingBox] = [checkbox_rendered.bbox] + for label_tag in label_tags: + rendered = self._get_rendered_text_bbox_for_tag( + label_tag + ) or self._get_rendered_bbox_for_tag(label_tag) + if rendered is None: + continue + if rendered.page_no != checkbox_rendered.page_no: + continue + boxes.append(rendered.bbox) + + bbox = ( + checkbox_rendered.bbox + if len(boxes) == 1 + else BoundingBox.enclosing_bbox(boxes) + ) + return ProvenanceItem( + page_no=checkbox_rendered.page_no, + bbox=bbox, + charspan=(0, len(text)), + ) + @staticmethod def _fix_invalid_paragraph_structure(soup: BeautifulSoup) -> None: """Rewrite

elements that contain block-level breakers. @@ -361,6 +1159,8 @@ def _start_para(): nonlocal current_p if current_p is None: current_p = soup.new_tag("p") + if p.get(_DATA_DOCLING_ID_ATTR): + current_p[_DATA_DOCLING_ID_ATTR] = p.get(_DATA_DOCLING_ID_ATTR) new_nodes.append(current_p) def _flush_para_if_empty(): @@ -497,6 +1297,25 @@ def _is_rich_table_cell(self, table_cell: Tag) -> bool: is_rich: bool = True children = table_cell.find_all(recursive=True) # all descendants of type Tag + has_input = any(child.name == "input" for child in children) + has_custom_checkbox = any( + self._is_custom_checkbox_tag(child) for child in children + ) + has_line_break = any(child.name == "br" for child in children) + direct_block_text_children = [ + child + for child in table_cell.find_all(recursive=False) + if isinstance(child, Tag) and child.name in {"p", "div", "li"} + ] + has_nested_form_semantic_id = any( + self._is_form_semantic_tag(child) + for child in children + if isinstance(child, Tag) + ) + if has_nested_form_semantic_id: + return True + if has_line_break or len(direct_block_text_children) > 1: + return True if not children: content = [ item @@ -509,10 +1328,18 @@ def _is_rich_table_cell(self, table_cell: Tag) -> bool: table_cell, find_parent_annotation=True ) if not annotations: - is_rich = bool(item for item in children if item.name == "img") + is_rich = bool( + item for item in children if item.name in {"img", "input"} + ) elif len(annotations) == 1: anno: AnnotatedText = annotations[0] - is_rich = bool(anno.formatting) or bool(anno.hyperlink) or anno.code + is_rich = ( + bool(anno.formatting) + or bool(anno.hyperlink) + or anno.code + or has_input + or has_custom_checkbox + ) return is_rich @@ -591,6 +1418,10 @@ def parse_table_data( self.get_text(html_cell).strip() ) col_span, row_span = self._get_cell_spans(html_cell) + cell_bbox = None + rendered_cell = self._get_rendered_bbox_for_tag(html_cell) + if rendered_cell is not None: + cell_bbox = rendered_cell.bbox if row_header: row_span -= 1 while ( @@ -606,6 +1437,7 @@ def parse_table_data( if rich_table_cell: rich_cell = RichTableCell( text=text, + bbox=cell_bbox, row_span=row_span, col_span=col_span, start_row_offset_idx=start_row_span + row_idx, @@ -620,6 +1452,7 @@ def parse_table_data( else: simple_cell = TableCell( text=text, + bbox=cell_bbox, row_span=row_span, col_span=col_span, start_row_offset_idx=start_row_span + row_idx, @@ -632,7 +1465,7 @@ def parse_table_data( doc.add_table_cell(table_item=docling_table, cell=simple_cell) return data - def _walk(self, element: Tag, doc: DoclingDocument) -> list[RefItem]: + def _walk(self, element: Tag, doc: DoclingDocument) -> list[RefItem]: # noqa: C901 """Parse an XML tag by recursively walking its content. While walking, the method buffers inline text across tags like or , @@ -656,23 +1489,44 @@ def _flush_buffer() -> None: return for annotated_text_list in parts: - with self._use_inline_group(annotated_text_list, doc) as inline_ref: - for annotated_text in annotated_text_list: + compacted_parts = self._compact_adjacent_single_char_parts( + annotated_text_list + ) + force_inline_group = ( + len(annotated_text_list) == 1 + and bool(annotated_text_list[0].code) + and element.name not in {"p", "pre"} + ) + with self._use_inline_group( + annotated_text_list, doc, force=force_inline_group + ) as inline_ref: + for annotated_text, source_tag_ids in compacted_parts: if annotated_text.text.strip(): seg_clean = HTMLDocumentBackend._clean_unicode( annotated_text.text.strip() ) if annotated_text.code: + prov = self._make_text_prov_for_source_tag_ids( + text=seg_clean, + tag=element, + source_tag_ids=source_tag_ids, + ) docling_code2 = doc.add_code( parent=self.parents[self.level], text=seg_clean, content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) if inline_ref is None: added_refs.append(docling_code2.get_ref()) else: + prov = self._make_text_prov_for_source_tag_ids( + text=seg_clean, + tag=element, + source_tag_ids=source_tag_ids, + ) docling_text2 = doc.add_text( parent=self.parents[self.level], label=DocItemLabel.TEXT, @@ -680,6 +1534,7 @@ def _flush_buffer() -> None: content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) if inline_ref is None: added_refs.append(docling_text2.get_ref()) @@ -689,43 +1544,139 @@ def _flush_buffer() -> None: for node in element.contents: if isinstance(node, Tag): name = node.name.lower() + if form_field := self._consume_form_field_for_tag(node): + _flush_buffer() + added_refs.extend( + self._add_field_item_from_extracted( + field=form_field, + doc=doc, + parent=self.parents[self.level], + ) + ) + continue + if self._is_suppressed_tag(node): + if name == "br": + # Keep explicit line breaks as text boundaries even when + # the
tag itself has no rendered bbox. + _flush_buffer() + continue + has_block_descendants = bool( + node.find(_BLOCK_TAGS) + or node.find("input") + or node.find( + lambda item: isinstance(item, Tag) + and self._is_custom_checkbox_tag(item) + ) + ) + has_pending_form_fields = self._has_pending_form_field_in_subtree(node) + if self._is_form_container(node): + _flush_buffer() + form_refs = self._handle_form_container(node, doc) + added_refs.extend(form_refs) + continue + if self._should_flatten_info_text(node): + _flush_buffer() + flattened_ref = self._emit_flattened_text_tag(node, doc) + if flattened_ref is not None: + added_refs.append(flattened_ref) + continue + if self._is_custom_checkbox_tag(node): + _flush_buffer() + checkbox_ref = self._emit_custom_checkbox(node, doc) + if checkbox_ref is not None: + added_refs.append(checkbox_ref) + continue if name == "img": _flush_buffer() im_ref3 = self._emit_image(node, doc) if im_ref3: added_refs.append(im_ref3) - elif name in _FORMAT_TAG_MAP: + elif name == "input": _flush_buffer() - with self._use_format([name]): - wk = self._walk(node, doc) - added_refs.extend(wk) + input_ref = self._emit_input(node, doc) + if input_ref: + added_refs.append(input_ref) + elif name in _FORMAT_TAG_MAP: + if has_block_descendants or has_pending_form_fields: + _flush_buffer() + with self._use_format([name]): + wk = self._walk(node, doc) + added_refs.extend(wk) + else: + with self._use_format([name]): + buffer.extend( + self._extract_text_and_hyperlink_recursively( + node, + find_parent_annotation=True, + keep_newlines=False, + ) + ) elif name == "a": - with self._use_hyperlink(node): - wk2 = self._walk(node, doc) - added_refs.extend(wk2) + if has_block_descendants or has_pending_form_fields: + _flush_buffer() + with self._use_hyperlink(node): + wk2 = self._walk(node, doc) + added_refs.extend(wk2) + else: + with self._use_hyperlink(node): + buffer.extend( + self._extract_text_and_hyperlink_recursively( + node, + find_parent_annotation=True, + keep_newlines=False, + ) + ) elif name in _BLOCK_TAGS: + if name != "table": + for field in self._consume_form_fields_in_subtree(node): + _flush_buffer() + added_refs.extend( + self._add_field_item_from_extracted( + field=field, + doc=doc, + parent=self.parents[self.level], + ) + ) _flush_buffer() blk = self._handle_block(node, doc) added_refs.extend(blk) - elif node.find(_BLOCK_TAGS): + elif has_block_descendants: _flush_buffer() wk3 = self._walk(node, doc) added_refs.extend(wk3) - else: + elif has_pending_form_fields: + # Preserve DOM reading order: recurse into inline containers + # with pending form fields instead of bulk-emitting them. + _flush_buffer() + wk4 = self._walk(node, doc) + added_refs.extend(wk4) + elif self._should_buffer_tag_text_inline(node): buffer.extend( self._extract_text_and_hyperlink_recursively( - node, find_parent_annotation=True, keep_newlines=True + node, find_parent_annotation=True, keep_newlines=False ) ) + else: + _flush_buffer() + wk5 = self._walk(node, doc) + added_refs.extend(wk5) elif isinstance(node, NavigableString) and not isinstance( node, PreformattedString ): - if str(node).strip("\n\r") == "": - _flush_buffer() + node_text = str(node) + if node_text.strip("\n\r") == "": + parent_tag = node.parent if isinstance(node.parent, Tag) else None + if ( + parent_tag is not None + and parent_tag.name in {"td", "th"} + and "\n" in node_text + ): + _flush_buffer() + continue else: buffer.extend( self._extract_text_and_hyperlink_recursively( - node, find_parent_annotation=True, keep_newlines=True + node, find_parent_annotation=True, keep_newlines=False ) ) @@ -781,8 +1732,16 @@ def _extract_text_and_hyperlink_recursively( return AnnotatedTextList() if isinstance(item, NavigableString): + if isinstance(item.parent, Tag): + if self._is_suppressed_tag(item.parent): + return AnnotatedTextList() + if self._is_checkbox_label_container(item.parent): + return AnnotatedTextList() text = item.strip() code = any(code_tag in self.format_tags for code_tag in _CODE_TAG_SET) + source_tag_id = ( + self._get_tag_id(item.parent) if isinstance(item.parent, Tag) else None + ) if text: return AnnotatedTextList( [ @@ -791,6 +1750,7 @@ def _extract_text_and_hyperlink_recursively( hyperlink=self.hyperlink, formatting=self._formatting, code=code, + source_tag_id=source_tag_id, ) ] ) @@ -802,12 +1762,19 @@ def _extract_text_and_hyperlink_recursively( hyperlink=self.hyperlink, formatting=self._formatting, code=code, + source_tag_id=source_tag_id, ) ] ) return AnnotatedTextList() tag = cast(Tag, item) + if self._is_suppressed_tag(tag): + return AnnotatedTextList() + if self._is_checkbox_like_tag(tag): + return AnnotatedTextList() + if self._is_checkbox_label_tag(tag): + return AnnotatedTextList() if not ignore_list or (tag.name not in ["ul", "ol"]): for child in tag: if isinstance(child, Tag) and child.name in _FORMAT_TAG_MAP: @@ -867,9 +1834,52 @@ def _use_format(self, tags: list[str]): finally: self.format_tags = self.format_tags[: -len(tags)] + def _get_tag_name_for_docling_id(self, source_tag_id: str) -> Optional[str]: + if source_tag_id in self._tag_name_by_docling_id_cache: + tag_name = self._tag_name_by_docling_id_cache[source_tag_id] + return tag_name or None + if self.soup is None: + return None + tag = self.soup.find(attrs={_DATA_DOCLING_ID_ATTR: source_tag_id}) + tag_name = tag.name if isinstance(tag, Tag) else "" + self._tag_name_by_docling_id_cache[source_tag_id] = tag_name + return tag_name or None + + def _should_create_inline_group( + self, annotated_text_list: AnnotatedTextList + ) -> bool: + if len(annotated_text_list) <= 1: + return False + # In non-render mode there are no source tag ids. Still keep mixed + # inline formatting (e.g.

......) as one flow. + if all( + annotated_text.source_tag_id is None + for annotated_text in annotated_text_list + ): + return True + # Allow paragraph-like block containers to contribute inline segments + # when mixed with formatting tags (e.g.,

text bold). + inline_group_container_tags = {"p", "address", "summary", "td", "th"} + for annotated_text in annotated_text_list: + source_tag_id = annotated_text.source_tag_id + if source_tag_id is None: + return False + tag_name = self._get_tag_name_for_docling_id(source_tag_id) + if tag_name is None: + return False + if ( + tag_name not in _INLINE_HTML_TAGS + and tag_name not in inline_group_container_tags + ): + return False + return True + @contextmanager def _use_inline_group( - self, annotated_text_list: AnnotatedTextList, doc: DoclingDocument + self, + annotated_text_list: AnnotatedTextList, + doc: DoclingDocument, + force: bool = False, ) -> Iterator[RefItem | None]: """Create an inline group for annotated texts. @@ -885,21 +1895,24 @@ def _use_inline_group( The RefItem of the created InlineGroup, or None when the list has only one element and no group is created. """ - if len(annotated_text_list) > 1: - inline_fmt = doc.add_group( - label=GroupLabel.INLINE, - parent=self.parents[self.level], - content_layer=self.content_layer, - ) - self.parents[self.level + 1] = inline_fmt - self.level += 1 - try: - yield inline_fmt.get_ref() - finally: - self.parents[self.level] = None - self.level -= 1 - else: + if self._disable_inline_group_depth > 0: yield None + return + if not force and not self._should_create_inline_group(annotated_text_list): + yield None + return + inline_fmt = doc.add_group( + label=GroupLabel.INLINE, + parent=self.parents[self.level], + content_layer=self.content_layer, + ) + self.parents[self.level + 1] = inline_fmt + self.level += 1 + try: + yield inline_fmt.get_ref() + finally: + self.parents[self.level] = None + self.level -= 1 @contextmanager def _use_details(self, tag: Tag, doc: DoclingDocument): @@ -925,6 +1938,17 @@ def _use_details(self, tag: Tag, doc: DoclingDocument): self.parents[self.level + 1] = None self.level -= 1 + @contextmanager + def _use_form_container(self, form_item: DocItem): + """Create a form container group and set it as the current parent.""" + self.parents[self.level + 1] = form_item + self.level += 1 + try: + yield None + finally: + self.parents[self.level + 1] = None + self.level -= 1 + @contextmanager def _use_footer(self, tag: Tag, doc: DoclingDocument): """Create a group with a footer. @@ -978,6 +2002,11 @@ def _handle_heading(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: ) annotated_text = annotated_text_list.to_single_text_element() text_clean = HTMLDocumentBackend._clean_unicode(annotated_text.text) + prov = self._make_text_prov( + text=text_clean, + tag=tag, + source_tag_id=annotated_text.source_tag_id, + ) # the first level is for the title item if level == 1: for key in self.parents.keys(): @@ -988,6 +2017,7 @@ def _handle_heading(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) p1 = self.parents[self.level + 1] if p1 is not None: @@ -1021,6 +2051,7 @@ def _handle_heading(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) p2 = self.parents[self.level + 1] if p2 is not None: @@ -1033,7 +2064,7 @@ def _handle_heading(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: added_ref.append(im_ref) return added_ref - def _handle_list(self, tag: Tag, doc: DoclingDocument) -> RefItem: + def _handle_list(self, tag: Tag, doc: DoclingDocument) -> RefItem: # noqa: C901 tag_name = tag.name.lower() start: Optional[int] = None name: str = "" @@ -1082,10 +2113,24 @@ def _handle_list(self, tag: Tag, doc: DoclingDocument) -> RefItem: li_text = re.sub( r"\s+|\n+", " ", "".join([el.text for el in min_parts]) ).strip() + inputs_in_li = [ + input_tag + for input_tag in li.find_all("input") + if input_tag.find_parent("li") is li + ] + custom_checkboxes_in_li = [ + checkbox_tag + for checkbox_tag in li.find_all( + lambda item: isinstance(item, Tag) + and self._is_custom_checkbox_tag(item) + ) + if checkbox_tag.find_parent("li") is li + ] # 3) add the list item - if li_text: + if li_text or inputs_in_li or custom_checkboxes_in_li: if len(min_parts) > 1: + li_prov = self._make_text_prov(text=li_text, tag=li) # create an empty list element in order to hook the inline group onto that one self.parents[self.level + 1] = doc.add_list_item( text="", @@ -1093,23 +2138,38 @@ def _handle_list(self, tag: Tag, doc: DoclingDocument) -> RefItem: marker=marker, parent=list_group, content_layer=self.content_layer, + prov=li_prov, ) self.level += 1 with self._use_inline_group(min_parts, doc): - for annotated_text in min_parts: + compacted_parts = self._compact_adjacent_single_char_parts( + min_parts + ) + for annotated_text, source_tag_ids in compacted_parts: li_text = re.sub( r"\s+|\n+", " ", annotated_text.text ).strip() li_clean = HTMLDocumentBackend._clean_unicode(li_text) if annotated_text.code: + prov = self._make_text_prov_for_source_tag_ids( + text=li_clean, + tag=li, + source_tag_ids=source_tag_ids, + ) doc.add_code( parent=self.parents[self.level], text=li_clean, content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) else: + prov = self._make_text_prov_for_source_tag_ids( + text=li_clean, + tag=li, + source_tag_ids=source_tag_ids, + ) doc.add_text( parent=self.parents[self.level], label=DocItemLabel.TEXT, @@ -1117,8 +2177,16 @@ def _handle_list(self, tag: Tag, doc: DoclingDocument) -> RefItem: content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) + for input_tag in inputs_in_li: + if isinstance(input_tag, Tag): + self._emit_input(input_tag, doc) + for checkbox_tag in custom_checkboxes_in_li: + if isinstance(checkbox_tag, Tag): + self._emit_custom_checkbox(checkbox_tag, doc) + # 4) recurse into any nested lists, attaching them to this

  • item for sublist in li({"ul", "ol"}, recursive=False): if isinstance(sublist, Tag): @@ -1127,10 +2195,15 @@ def _handle_list(self, tag: Tag, doc: DoclingDocument) -> RefItem: # now the list element with inline group is not a parent anymore self.parents[self.level] = None self.level -= 1 - else: + elif li_text: annotated_text = min_parts[0] li_text = re.sub(r"\s+|\n+", " ", annotated_text.text).strip() li_clean = HTMLDocumentBackend._clean_unicode(li_text) + prov = self._make_text_prov( + text=li_clean, + tag=li, + source_tag_id=annotated_text.source_tag_id, + ) self.parents[self.level + 1] = doc.add_list_item( text=li_clean, enumerated=is_ordered, @@ -1140,8 +2213,19 @@ def _handle_list(self, tag: Tag, doc: DoclingDocument) -> RefItem: content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) + if inputs_in_li or custom_checkboxes_in_li: + self.level += 1 + for input_tag in inputs_in_li: + if isinstance(input_tag, Tag): + self._emit_input(input_tag, doc) + for checkbox_tag in custom_checkboxes_in_li: + if isinstance(checkbox_tag, Tag): + self._emit_custom_checkbox(checkbox_tag, doc) + self.level -= 1 + # 4) recurse into any nested lists, attaching them to this
  • item for sublist in li({"ul", "ol"}, recursive=False): if isinstance(sublist, Tag): @@ -1149,6 +2233,28 @@ def _handle_list(self, tag: Tag, doc: DoclingDocument) -> RefItem: self._handle_block(sublist, doc) self.parents[self.level + 1] = None self.level -= 1 + else: + li_prov = self._make_text_prov(text="", tag=li) + self.parents[self.level + 1] = doc.add_list_item( + text="", + enumerated=is_ordered, + marker=marker, + parent=list_group, + content_layer=self.content_layer, + prov=li_prov, + ) + self.level += 1 + for input_tag in inputs_in_li: + if isinstance(input_tag, Tag): + self._emit_input(input_tag, doc) + for checkbox_tag in custom_checkboxes_in_li: + if isinstance(checkbox_tag, Tag): + self._emit_custom_checkbox(checkbox_tag, doc) + for sublist in li({"ul", "ol"}, recursive=False): + if isinstance(sublist, Tag): + self._handle_block(sublist, doc) + self.parents[self.level] = None + self.level -= 1 else: for sublist in li({"ul", "ol"}, recursive=False): if isinstance(sublist, Tag): @@ -1213,21 +2319,33 @@ def _handle_block(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: ) annotated_texts: AnnotatedTextList = text_list.simplify_text_elements() for part in annotated_texts.split_by_newline(): + compacted_part = self._compact_adjacent_single_char_parts(part) with self._use_inline_group(part, doc) as inline_ref: - for annotated_text in part: + for annotated_text, source_tag_ids in compacted_part: if seg := annotated_text.text.strip(): seg_clean = HTMLDocumentBackend._clean_unicode(seg) if annotated_text.code: + prov = self._make_text_prov_for_source_tag_ids( + text=seg_clean, + tag=tag, + source_tag_ids=source_tag_ids, + ) docling_code = doc.add_code( parent=self.parents[self.level], text=seg_clean, content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) if inline_ref is None: added_refs.append(docling_code.get_ref()) else: + prov = self._make_text_prov_for_source_tag_ids( + text=seg_clean, + tag=tag, + source_tag_ids=source_tag_ids, + ) docling_text = doc.add_text( parent=self.parents[self.level], label=DocItemLabel.TEXT, @@ -1235,6 +2353,7 @@ def _handle_block(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) if inline_ref is None: added_refs.append(docling_text.get_ref()) @@ -1244,13 +2363,28 @@ def _handle_block(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: for img_tag in tag("img"): if isinstance(img_tag, Tag): self._emit_image(img_tag, doc) + for input_tag in tag("input"): + if isinstance(input_tag, Tag): + input_ref = self._emit_input(input_tag, doc) + if input_ref is not None: + added_refs.append(input_ref) + for checkbox_tag in tag.find_all( + lambda item: isinstance(item, Tag) + and self._is_custom_checkbox_tag(item) + ): + if isinstance(checkbox_tag, Tag): + checkbox_ref = self._emit_custom_checkbox(checkbox_tag, doc) + if checkbox_ref is not None: + added_refs.append(checkbox_ref) elif tag_name == "table": num_rows, num_cols = self.get_html_table_row_col(tag) data_e = TableData(num_rows=num_rows, num_cols=num_cols) + table_prov = self._make_prov(text="", tag=tag) docling_table = doc.add_table( data=data_e, parent=self.parents[self.level], + prov=table_prov, content_layer=self.content_layer, ) added_refs.append(docling_table.get_ref()) @@ -1267,12 +2401,18 @@ def _handle_block(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: text_clean = HTMLDocumentBackend._clean_unicode( annotated_text.text.strip() ) + prov = self._make_prov( + text=text_clean, + tag=tag, + source_tag_id=annotated_text.source_tag_id, + ) docling_code2 = doc.add_code( parent=self.parents[self.level], text=text_clean, content_layer=self.content_layer, formatting=annotated_text.formatting, hyperlink=annotated_text.hyperlink, + prov=prov, ) if inline_ref is None: added_refs.append(docling_code2.get_ref()) @@ -1288,57 +2428,1673 @@ def _handle_block(self, tag: Tag, doc: DoclingDocument) -> list[RefItem]: self._walk(tag, doc) return added_refs - def _emit_image(self, img_tag: Tag, doc: DoclingDocument) -> Optional[RefItem]: - figure = img_tag.find_parent("figure") - caption: AnnotatedTextList = AnnotatedTextList() + @staticmethod + def _is_form_container(tag: Tag) -> bool: + classes = tag.get("class") + if not classes: + return False + class_values = [classes] if isinstance(classes, str) else classes + return _FORM_CONTAINER_CLASS in class_values + + def _nearest_form_container_ancestor(self, tag: Tag) -> Optional[Tag]: + for parent in tag.parents: + if isinstance(parent, Tag) and self._is_form_container(parent): + return parent + return None - parent = self.parents[self.level] + def _is_tag_in_current_form_scope(self, tag: Tag, form_tag: Tag) -> bool: + return self._nearest_form_container_ancestor(tag) is form_tag - # check if the figure has a link - this is HACK: - def get_img_hyperlink(img_tag): - this_parent = img_tag.parent - while this_parent is not None: - if this_parent.name == "a" and this_parent.get("href"): - return this_parent.get("href") - this_parent = this_parent.parent + @staticmethod + def _is_form_semantic_id(tag_id: Optional[str]) -> bool: + if not tag_id: + return False + return bool( + _FORM_KEY_ID_RE.match(tag_id) + or _FORM_MARKER_ID_RE.match(tag_id) + or _FORM_VALUE_ID_RE.match(tag_id) + ) + + def _should_flatten_info_text(self, tag: Tag) -> bool: + if "info-text" not in self._get_tag_classes(tag): + return False + return not self._is_form_semantic_id(self._get_html_id(tag)) + + def _emit_flattened_text_tag( + self, tag: Tag, doc: DoclingDocument + ) -> Optional[RefItem]: + # Keep full textual payload of info-text blocks even when descendants + # share ids with key/value tags consumed elsewhere in the same form. + text_raw = self.get_text(tag) + _, text_clean = self._normalize_form_text(text_raw) + if not text_clean: return None + prov = self._make_text_prov( + text=text_clean, + tag=tag, + ) + text_item = doc.add_text( + parent=self.parents[self.level], + label=DocItemLabel.TEXT, + text=text_clean, + orig=text_raw, + content_layer=self.content_layer, + prov=prov, + ) + return text_item.get_ref() - if img_hyperlink := get_img_hyperlink(img_tag): - img_text = img_tag.get("alt") or "" - caption.append(AnnotatedText(text=img_text, hyperlink=img_hyperlink)) + def _ensure_tag_html_id(self, tag: Tag) -> str: + existing = self._get_html_id(tag) + if existing is not None: + return existing + self._generated_html_id_counter += 1 + generated = f"docling_auto_input_{self._generated_html_id_counter}" + tag["id"] = generated + return generated - if isinstance(figure, Tag): - caption_tag = figure.find("figcaption", recursive=False) - if isinstance(caption_tag, Tag): - caption = self._extract_text_and_hyperlink_recursively( - caption_tag, find_parent_annotation=True - ) - if not caption and img_tag.get("alt"): - caption = AnnotatedTextList([AnnotatedText(text=img_tag.get("alt"))]) + @staticmethod + def _is_value_in_key_scope(key_tag: Tag, value_tag: Tag) -> bool: + if key_tag is value_tag: + return True + if any(parent is key_tag for parent in value_tag.parents): + return True + key_parent = key_tag.parent + value_parent = value_tag.parent + if key_parent is not None and key_parent is value_parent: + return True + return False - caption_anno_text = caption.to_single_text_element() + @staticmethod + def _dom_distance_between_tags(left_tag: Tag, right_tag: Tag) -> int: + if left_tag is right_tag: + return 0 - caption_item: Optional[TextItem] = None - if caption_anno_text.text: - text_clean = HTMLDocumentBackend._clean_unicode( - caption_anno_text.text.strip() - ) - caption_item = doc.add_text( + left_chain: list[Tag] = [left_tag] + left_chain.extend( + parent for parent in left_tag.parents if isinstance(parent, Tag) + ) + right_chain: list[Tag] = [right_tag] + right_chain.extend( + parent for parent in right_tag.parents if isinstance(parent, Tag) + ) + + left_positions = {id(tag): idx for idx, tag in enumerate(left_chain)} + best_distance: Optional[int] = None + for right_idx, right_ancestor in enumerate(right_chain): + left_idx = left_positions.get(id(right_ancestor)) + if left_idx is None: + continue + distance = left_idx + right_idx + if best_distance is None or distance < best_distance: + best_distance = distance + + return best_distance if best_distance is not None else 10_000 + + def _select_form_value_entries( + self, + key_tag: Optional[Tag], + marker_entries: list[tuple[int, Tag]], + value_entries: list[tuple[Optional[int], int, Tag]], + ) -> list[tuple[Optional[int], int, Tag]]: + if not value_entries: + return [] + + anchor_tag: Optional[Tag] = None + if key_tag is not None: + anchor_tag = key_tag + elif marker_entries: + anchor_tag = marker_entries[0][1] + + grouped_entries: dict[ + tuple[str, int], list[tuple[Optional[int], int, Tag]] + ] = {} + for value_index, dom_order, value_tag in value_entries: + group_key = ( + ("idx", value_index) if value_index is not None else ("dom", dom_order) + ) + grouped_entries.setdefault(group_key, []).append( + (value_index, dom_order, value_tag) + ) + + selected_entries: list[tuple[Optional[int], int, Tag]] = [] + for entries in grouped_entries.values(): + ranked_entries = sorted( + entries, + key=lambda entry: ( + ( + 0 + if ( + key_tag is not None + and self._is_value_in_key_scope(key_tag, entry[2]) + ) + else 1 + ) + if key_tag is not None + else 0, + ( + self._dom_distance_between_tags(anchor_tag, entry[2]) + if anchor_tag is not None + else 0 + ), + ( + 0 + if ( + entry[2].name in {"input", "select", "textarea"} + or self._is_checkbox_like_tag(entry[2]) + ) + else 1 + ), + entry[1], + ), + ) + selected_entries.append(ranked_entries[0]) + + selected_entries.sort( + key=lambda entry: ( + entry[0] is None, + entry[0] if entry[0] is not None else entry[1], + entry[1], + ) + ) + return selected_entries + + @staticmethod + def _get_table_cell(tag: Tag) -> Optional[Tag]: + parent_cell = tag.find_parent(["td", "th"]) + return parent_cell if isinstance(parent_cell, Tag) else None + + @staticmethod + def _is_bbox_within_any_table( + value_bbox: BoundingBox, table_bboxes: list[BoundingBox], threshold: float = 0.9 + ) -> bool: + for table_bbox in table_bboxes: + if value_bbox.intersection_over_self(table_bbox) >= threshold: + return True + return False + + def _should_ignore_table_kv_link( + self, key_tag: Tag, value_tag: Tag, table_bboxes: list[BoundingBox] + ) -> bool: + key_table = key_tag.find_parent("table") + value_table = value_tag.find_parent("table") + key_cell = self._get_table_cell(key_tag) + value_cell = self._get_table_cell(value_tag) + if key_table is not None or value_table is not None: + if key_table is not value_table: + return True + if key_cell is None or value_cell is None: + return True + if key_cell is value_cell: + return False + key_row = key_cell.find_parent("tr") + value_row = value_cell.find_parent("tr") + if key_row is not None and key_row is value_row: + return False + if key_cell.parent is not None and key_cell.parent is value_cell.parent: + return False + if ( + self._dom_distance_between_tags(key_cell, value_cell) <= 4 + and key_table is value_table + ): + return False + if key_cell is not value_cell: + return True + + if key_table is None and value_table is None and table_bboxes: + value_rendered = self._get_rendered_bbox_for_tag(value_tag) + if value_rendered and self._is_bbox_within_any_table( + value_rendered.bbox, table_bboxes + ): + return True + + return False + + @staticmethod + def _extract_text_excluding_ids(tag: Tag, excluded_ids: set[str]) -> str: + def _extract(node: PageElement) -> list[str]: + if isinstance(node, NavigableString): + return [str(node)] + if isinstance(node, Tag): + node_id = node.get("id") + if node_id and node_id in excluded_ids: + return [] + parts: list[str] = [] + for child in node: + parts.extend(_extract(child)) + if node.name in {"p", "li"}: + parts.append(" ") + return parts + return [] + + return "".join(_extract(tag)) + + @staticmethod + def _extract_direct_text(tag: Tag) -> str: + parts: list[str] = [] + for child in tag.contents: + if isinstance(child, NavigableString): + parts.append(str(child)) + return "".join(parts) + + @staticmethod + def _normalize_form_text(text: str) -> tuple[str, str]: + raw = re.sub(r"\s+", " ", text).strip() + return raw, HTMLDocumentBackend._clean_unicode(raw) + + @staticmethod + def _infer_form_value_kind(value_tag: Tag) -> Literal["read_only", "fillable"]: + if HTMLDocumentBackend._is_checkbox_like_tag(value_tag): + return "fillable" + if ( + value_tag.find( + lambda item: isinstance(item, Tag) + and HTMLDocumentBackend._is_checkbox_like_tag(item) + ) + is not None + ): + return "fillable" + + classes = HTMLDocumentBackend._get_tag_classes(value_tag) + fillable_class_hints = { + "input", + "input-box", + "input-field", + "input_field", + "text-input", + "text-box", + "textbox", + "form-control", + } + if classes & fillable_class_hints: + return "fillable" + if any( + class_name.endswith(("-input", "_input", "-input-box")) + for class_name in classes + ): + return "fillable" + + if value_tag.name in {"input", "select", "textarea"}: + return "fillable" + if value_tag.find(["input", "select", "textarea"]) is not None: + return "fillable" + return "read_only" + + @staticmethod + def _get_tag_classes(tag: Tag) -> set[str]: + classes = tag.get("class") + if not classes: + return set() + if isinstance(classes, str): + return {classes} + return {str(value) for value in classes if isinstance(value, str)} + + @staticmethod + def _has_inline_display_style(tag: Tag) -> bool: + style_attr = tag.get("style") + if not isinstance(style_attr, str): + return False + display_match = re.search(r"display\s*:\s*([^;]+)", style_attr, flags=re.I) + if display_match is None: + return False + display_value = display_match.group(1).strip().lower() + return display_value.startswith("inline") or display_value == "contents" + + def _should_buffer_tag_text_inline(self, tag: Tag) -> bool: + tag_name = tag.name.lower() + if tag_name in _INLINE_HTML_TAGS: + return True + # Treat explicit inline-styled divs like inline wrappers. + if tag_name == "div" and self._has_inline_display_style(tag): + return True + return False + + @staticmethod + def _is_input_checkbox_or_radio_tag(tag: Tag) -> bool: + if tag.name != "input": + return False + input_type = str(tag.get("type", "")).strip().lower() + return input_type in {"checkbox", "radio"} + + @staticmethod + def _is_generic_text_input_candidate(tag: Tag) -> bool: + if tag.name != "input": + return False + input_type = str(tag.get("type", "")).strip().lower() + if input_type in { + "hidden", + "checkbox", + "radio", + "submit", + "button", + "reset", + "file", + "image", + "color", + "range", + "date", + "datetime-local", + "month", + "time", + "week", + }: + return False + return True + + @staticmethod + def _is_custom_checkbox_tag(tag: Tag) -> bool: + return bool( + HTMLDocumentBackend._get_tag_classes(tag) & _CUSTOM_CHECKBOX_CLASSES + ) + + @staticmethod + def _is_checkbox_like_tag(tag: Tag) -> bool: + return HTMLDocumentBackend._is_input_checkbox_or_radio_tag( + tag + ) or HTMLDocumentBackend._is_custom_checkbox_tag(tag) + + @staticmethod + def _extract_text_excluding_tag_obj_ids( + tag: Tag, excluded_obj_ids: set[int] + ) -> str: + def _extract(node: PageElement) -> list[str]: + if isinstance(node, NavigableString): + return [str(node)] + if isinstance(node, Tag): + if id(node) in excluded_obj_ids: + return [] + parts: list[str] = [] + for child in node.contents: + parts.extend(_extract(child)) + if node.name in {"p", "li", "div", "label", "span", "td", "th"}: + parts.append(" ") + return parts + return [] + + return "".join(_extract(tag)) + + @staticmethod + def _has_direct_checkbox_like_child(tag: Tag) -> bool: + for child in tag.find_all(recursive=False): + if isinstance(child, Tag) and HTMLDocumentBackend._is_checkbox_like_tag( + child + ): + return True + return False + + def _is_checkbox_label_container(self, tag: Tag) -> bool: + classes = self._get_tag_classes(tag) + if not (classes & _CHECKBOX_CONTAINER_CLASSES): + return False + return self._has_direct_checkbox_like_child(tag) + + def _is_checkbox_label_tag(self, tag: Tag) -> bool: + if self._is_checkbox_like_tag(tag): + return False + if "checkbox-label" in self._get_tag_classes(tag): + return True + parent = tag.parent + if isinstance(parent, Tag) and self._is_checkbox_label_container(parent): + return True + return False + + @staticmethod + def _normalize_checkbox_text(text: str) -> str: + compact = re.sub(r"\s+", " ", text).strip() + if not compact: + return "" + if compact.lower() in _CHECKBOX_MARK_TEXTS: + return "" + return HTMLDocumentBackend._clean_unicode(compact) + + @staticmethod + def _is_checkbox_checked(tag: Tag) -> bool: + if HTMLDocumentBackend._is_input_checkbox_or_radio_tag(tag): + if tag.has_attr("checked"): + return True + aria_checked = str(tag.get("aria-checked", "")).strip().lower() + return aria_checked in {"true", "1", "yes", "on"} + + classes = HTMLDocumentBackend._get_tag_classes(tag) + if "checked" in classes: + return True + + aria_checked = str(tag.get("aria-checked", "")).strip().lower() + if aria_checked in {"true", "1", "yes", "on"}: + return True + + data_checked = str(tag.get("data-checked", "")).strip().lower() + if data_checked in {"true", "1", "yes", "on"}: + return True + + text = re.sub(r"\s+", "", HTMLDocumentBackend.get_text(tag)) + return text.lower() in _CHECKBOX_MARK_TEXTS + + @staticmethod + def _get_checkbox_label_for_tag(tag: Tag) -> Optional[DocItemLabel]: + if not HTMLDocumentBackend._is_checkbox_like_tag(tag): + return None + return ( + DocItemLabel.CHECKBOX_SELECTED + if HTMLDocumentBackend._is_checkbox_checked(tag) + else DocItemLabel.CHECKBOX_UNSELECTED + ) + + def _extract_checkbox_text_and_consumed_label_obj_ids( # noqa: C901 + self, checkbox_tag: Tag + ) -> tuple[str, set[int], list[Tag]]: + consumed_tag_obj_ids: set[int] = set() + consumed_label_tags: list[Tag] = [] + parent = checkbox_tag.parent if isinstance(checkbox_tag.parent, Tag) else None + seen_label_obj_ids: set[int] = set() + + def _add_label_tag(label_tag: Tag) -> None: + label_obj_id = id(label_tag) + if label_obj_id in seen_label_obj_ids: + return + seen_label_obj_ids.add(label_obj_id) + consumed_tag_obj_ids.add(label_obj_id) + consumed_label_tags.append(label_tag) + + def _label_texts(tags: list[Tag]) -> list[str]: + texts: list[str] = [] + for label_tag in tags: + raw = self.get_text(label_tag) + normalized = self._normalize_checkbox_text(raw) + if normalized: + texts.append(normalized) + return texts + + # Native checkbox/radio with explicit