# PIPELINE DEFINITION
# Name: data-processing-docling-vlm-pipeline
# Description: Docling VLM convert pipeline by the Data Processing Team
# Inputs:
#    docling_image_export_mode: str [Default: 'embedded']
#    docling_num_threads: int [Default: 4.0]
#    docling_remote_model_enabled: bool [Default: False]
#    docling_timeout_per_document: int [Default: 300.0]
#    num_splits: int [Default: 3.0]
#    pdf_base_url: str [Default: 'https://github.com/docling-project/docling/raw/v2.43.0/tests/data/pdf']
#    pdf_filenames: str [Default: '2203.01017v2.pdf']
#    pdf_from_s3: bool [Default: False]
#
# NOTE(review): this spec looks like KFP compiler output (see sdkVersion at the
# end of the first document). The embedded component code below was edited by
# hand; mirror those edits in the Python component sources so a recompile does
# not drop them - TODO confirm where those sources live.

# Component interfaces: inputs/outputs of every task used by the pipeline.
components:
  comp-create-pdf-splits:
    executorLabel: exec-create-pdf-splits
    inputDefinitions:
      artifacts:
        input_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
          description: Path to the input directory containing PDF files.
      parameters:
        num_splits:
          description: Number of splits to create.
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      parameters:
        Output:
          parameterType: LIST
  comp-docling-convert-vlm:
    executorLabel: exec-docling-convert-vlm
    inputDefinitions:
      artifacts:
        artifacts_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
          description: Path to the directory containing Docling models.
        input_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
          description: Path to the input directory containing PDF files.
      parameters:
        image_export_mode:
          defaultValue: embedded
          description: Mode to export images.
          isOptional: true
          parameterType: STRING
        num_threads:
          defaultValue: 4.0
          description: Number of threads to use per document processing.
          isOptional: true
          parameterType: NUMBER_INTEGER
        pdf_filenames:
          description: List of PDF file names to process.
          parameterType: LIST
        remote_model_enabled:
          defaultValue: false
          description: Whether or not to use a remote model.
          isOptional: true
          parameterType: BOOLEAN
        remote_model_secret_mount_path:
          defaultValue: /mnt/secrets
          description: Path to the remote model secret mount path.
          isOptional: true
          parameterType: STRING
        timeout_per_document:
          defaultValue: 300.0
          description: Timeout per document processing.
          isOptional: true
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      artifacts:
        output_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
  comp-download-docling-models:
    executorLabel: exec-download-docling-models
    inputDefinitions:
      parameters:
        pipeline_type:
          defaultValue: standard
          description: Type of pipeline (standard, vlm)
          isOptional: true
          parameterType: STRING
        remote_model_endpoint_enabled:
          defaultValue: false
          description: Whether to download remote model endpoint models (VLM only)
          isOptional: true
          parameterType: BOOLEAN
    outputDefinitions:
      artifacts:
        output_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
  # Loop body: one docling-convert-vlm task per item of create-pdf-splits' Output.
  comp-for-loop-1:
    dag:
      tasks:
        docling-convert-vlm:
          cachingOptions: {}
          componentRef:
            name: comp-docling-convert-vlm
          inputs:
            artifacts:
              artifacts_path:
                componentInputArtifact: pipelinechannel--download-docling-models-output_path
              input_path:
                componentInputArtifact: pipelinechannel--import-pdfs-output_path
            parameters:
              image_export_mode:
                componentInputParameter: pipelinechannel--docling_image_export_mode
              num_threads:
                componentInputParameter: pipelinechannel--docling_num_threads
              pdf_filenames:
                componentInputParameter: pipelinechannel--create-pdf-splits-Output-loop-item
              remote_model_enabled:
                componentInputParameter: pipelinechannel--docling_remote_model_enabled
              remote_model_secret_mount_path:
                runtimeValue:
                  constant: /mnt/secrets
              timeout_per_document:
                componentInputParameter: pipelinechannel--docling_timeout_per_document
          taskInfo:
            name: docling-convert-vlm
    inputDefinitions:
      artifacts:
        pipelinechannel--download-docling-models-output_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
        pipelinechannel--import-pdfs-output_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
      parameters:
        pipelinechannel--create-pdf-splits-Output:
          parameterType: LIST
        # Each loop item is itself a list: one split of PDF file names.
        pipelinechannel--create-pdf-splits-Output-loop-item:
          parameterType: LIST
        pipelinechannel--docling_image_export_mode:
          parameterType: STRING
        pipelinechannel--docling_num_threads:
          parameterType: NUMBER_INTEGER
        pipelinechannel--docling_remote_model_enabled:
          parameterType: BOOLEAN
        pipelinechannel--docling_timeout_per_document:
          parameterType: NUMBER_INTEGER
  comp-import-pdfs:
    executorLabel: exec-import-pdfs
    inputDefinitions:
      parameters:
        base_url:
          description: Base URL of the PDF files.
          parameterType: STRING
        filenames:
          description: Comma-separated list of PDF filenames to import.
          parameterType: STRING
        from_s3:
          defaultValue: false
          description: Whether or not to import from S3.
          isOptional: true
          parameterType: BOOLEAN
        s3_secret_mount_path:
          defaultValue: /mnt/secrets
          description: Path to the secret mount path for the S3 credentials.
          isOptional: true
          parameterType: STRING
    outputDefinitions:
      artifacts:
        output_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1

# Executors: container image, bootstrap commands and embedded Python source for
# each component. Every executor runs three chained scripts: a pip bootstrap
# that installs kfp, a launcher that writes the Python source to a temp file and
# starts kfp.dsl.executor_main, and the Python source itself.
deploymentSpec:
  executors:
    exec-create-pdf-splits:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - create_pdf_splits
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def create_pdf_splits(
              input_path: dsl.Input[dsl.Artifact],
              num_splits: int,
          ) -> List[List[str]]:
              """
              Create a list of PDF splits.

              Args:
                  input_path: Path to the input directory containing PDF files.
                  num_splits: Number of splits to create. Must be at least 1.

              Returns:
                  The non-empty splits, each a list of PDF file names.

              Raises:
                  ValueError: If num_splits is smaller than 1.
              """
              from pathlib import Path  # pylint: disable=import-outside-toplevel

              if num_splits < 1:
                  raise ValueError(f"num_splits must be at least 1, got {num_splits}")

              input_path_p = Path(input_path.path)

              # glob() order depends on the filesystem; sort so the same input
              # directory always produces the same splits.
              all_pdfs = sorted(path.name for path in input_path_p.glob("*.pdf"))
              # Round-robin assignment: split i takes every num_splits-th file.
              all_splits = [all_pdfs[i::num_splits] for i in range(num_splits)]
              # Drop empty splits (fewer files than splits).
              filled_splits = list(filter(None, all_splits))
              return filled_splits
        image: registry.access.redhat.com/ubi9/python-311:9.6-1755074620
    exec-docling-convert-vlm:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - docling_convert_vlm
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def docling_convert_vlm(
              input_path: dsl.Input[dsl.Artifact],
              artifacts_path: dsl.Input[dsl.Artifact],
              output_path: dsl.Output[dsl.Artifact],
              pdf_filenames: List[str],
              num_threads: int = 4,
              image_export_mode: str = "embedded",
              timeout_per_document: int = 300,
              remote_model_enabled: bool = False,
              remote_model_secret_mount_path: str = "/mnt/secrets",
          ):
              """
              Convert a list of PDF files to JSON and Markdown using Docling (VLM Pipeline).

              Args:
                  input_path: Path to the input directory containing PDF files.
                  artifacts_path: Path to the directory containing Docling models.
                  output_path: Path to the output directory for converted JSON and Markdown files.
                  pdf_filenames: List of PDF file names to process.
                  num_threads: Number of threads to use per document processing.
                  timeout_per_document: Timeout per document processing.
                  image_export_mode: Mode to export images.
                  remote_model_enabled: Whether or not to use a remote model.
                  remote_model_secret_mount_path: Path to the remote model secret mount path.

              Raises:
                  ValueError: If pdf_filenames is empty, image_export_mode is not a valid
                      ImageRefMode value, or (remote mode) the secret mount or one of its
                      keys is missing or the endpoint URL is empty.
              """
              import os
              from pathlib import Path

              from docling.datamodel.accelerator_options import (  # pylint: disable=import-outside-toplevel  # noqa: PLC0415, E402
                  AcceleratorDevice,
                  AcceleratorOptions,
              )
              from docling.datamodel.base_models import (
                  InputFormat,
              )  # pylint: disable=import-outside-toplevel  # noqa: PLC0415, E402
              from docling.datamodel.pipeline_options import (  # pylint: disable=import-outside-toplevel  # noqa: PLC0415, E402
                  VlmPipelineOptions,
                  smoldocling_vlm_conversion_options,
              )
              from docling.datamodel.pipeline_options_vlm_model import (  # pylint: disable=import-outside-toplevel  # noqa: PLC0415, E402
                  ApiVlmOptions,
                  ResponseFormat,
              )
              from docling.document_converter import (  # pylint: disable=import-outside-toplevel  # noqa: PLC0415, E402
                  DocumentConverter,
                  PdfFormatOption,
              )
              from docling.pipeline.vlm_pipeline import (
                  VlmPipeline,
              )  # pylint: disable=import-outside-toplevel  # noqa: PLC0415, E402
              from docling_core.types.doc.base import (
                  ImageRefMode,
              )  # pylint: disable=import-outside-toplevel  # noqa: PLC0415, E402

              def _read_secret(key: str) -> str:
                  """Return the whitespace-stripped content of secret file `key`."""
                  secret_file = os.path.join(remote_model_secret_mount_path, key)
                  if not os.path.isfile(secret_file):
                      raise ValueError(
                          f"Key {key} not defined in secret {remote_model_secret_mount_path}"
                      )
                  with open(secret_file) as f:
                      # A trailing newline in the file would otherwise end up in the
                      # endpoint URL, the model id and the Authorization header.
                      return f.read().strip()

              # Validate before iterating pdf_filenames below.
              if not pdf_filenames:
                  raise ValueError(
                      "pdf_filenames must be provided with the list of file names to process"
                  )

              input_path_p = Path(input_path.path)
              artifacts_path_p = Path(artifacts_path.path)
              output_path_p = Path(output_path.path)
              output_path_p.mkdir(parents=True, exist_ok=True)

              input_pdfs = [input_path_p / name for name in pdf_filenames]

              print(
                  f"docling-vlm-convert: starting with backend='vlm', files={len(input_pdfs)}",
                  flush=True,
              )

              allowed_image_export_modes = {e.value for e in ImageRefMode}
              if image_export_mode not in allowed_image_export_modes:
                  raise ValueError(
                      f"Invalid image_export_mode: {image_export_mode}. Must be one of {sorted(allowed_image_export_modes)}"
                  )

              if remote_model_enabled:
                  if not os.path.exists(remote_model_secret_mount_path):
                      raise ValueError(
                          f"Secret for remote model should be mounted in {remote_model_secret_mount_path}"
                      )

                  remote_model_endpoint_url = _read_secret("REMOTE_MODEL_ENDPOINT_URL")
                  remote_model_name = _read_secret("REMOTE_MODEL_NAME")
                  remote_model_api_key = _read_secret("REMOTE_MODEL_API_KEY")

                  if not remote_model_endpoint_url:
                      raise ValueError(
                          "remote_model_endpoint_url must be provided when remote_model_enabled is True"
                      )

                  pipeline_options = VlmPipelineOptions(
                      enable_remote_services=True,
                  )
                  pipeline_options.vlm_options = ApiVlmOptions(
                      url=remote_model_endpoint_url,  # type: ignore[arg-type]
                      params=dict(
                          model_id=remote_model_name,
                          parameters=dict(
                              max_new_tokens=400,
                          ),
                      ),
                      prompt="OCR the full page to markdown.",
                      timeout=600,
                      response_format=ResponseFormat.MARKDOWN,
                      headers={
                          "Authorization": f"Bearer {remote_model_api_key}",
                      },
                  )
              else:
                  pipeline_options = VlmPipelineOptions(
                      vlm_options=smoldocling_vlm_conversion_options
                  )

              pipeline_cls = VlmPipeline
              pipeline_options.artifacts_path = artifacts_path_p
              pipeline_options.document_timeout = float(timeout_per_document)
              pipeline_options.accelerator_options = AcceleratorOptions(
                  num_threads=num_threads, device=AcceleratorDevice.AUTO
              )

              doc_converter = DocumentConverter(
                  format_options={
                      InputFormat.PDF: PdfFormatOption(
                          pipeline_cls=pipeline_cls,
                          pipeline_options=pipeline_options,
                      )
                  }
              )

              results = doc_converter.convert_all(input_pdfs, raises_on_error=True)

              # Each converted document is written twice: <stem>.json and <stem>.md.
              for result in results:
                  doc_filename = result.input.file.stem

                  output_json_path = output_path_p / f"{doc_filename}.json"
                  print(f"docling-vlm-convert: saving {output_json_path}", flush=True)
                  result.document.save_as_json(
                      output_json_path, image_mode=ImageRefMode(image_export_mode)
                  )

                  output_md_path = output_path_p / f"{doc_filename}.md"
                  print(f"docling-vlm-convert: saving {output_md_path}", flush=True)
                  result.document.save_as_markdown(
                      output_md_path, image_mode=ImageRefMode(image_export_mode)
                  )

              print("docling-vlm-convert: done", flush=True)
        image: quay.io/fabianofranz/docling-ubi9:2.54.0
        resources:
          cpuLimit: 4.0
          cpuRequest: 0.5
          memoryLimit: 6.0
          memoryRequest: 1.0
          resourceCpuLimit: '4'
          resourceCpuRequest: 500m
          resourceMemoryLimit: 6G
          resourceMemoryRequest: 1G
    exec-download-docling-models:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - download_docling_models
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def download_docling_models(
              output_path: dsl.Output[dsl.Artifact],
              pipeline_type: str = "standard",
              remote_model_endpoint_enabled: bool = False,
          ):
              """
              Download Docling models based on pipeline type and configuration.

              This unified component handles model downloading for different pipeline types:
              - standard : Download traditional Docling models (layout, tableformer)
              - vlm : Download Docling VLM models (smolvlm, smoldocling) for local inference
                When remote_model_endpoint_enabled=True, downloads minimal models for remote inference

              Args:
                  output_path: Path to the output directory for Docling models
                  pipeline_type: Type of pipeline (standard, vlm)
                  remote_model_endpoint_enabled: Whether to download remote model endpoint models (VLM only)

              Raises:
                  ValueError: If pipeline_type is neither 'standard' nor 'vlm'.
              """
              from pathlib import Path  # pylint: disable=import-outside-toplevel

              from docling.utils.model_downloader import (
                  download_models,
              )  # pylint: disable=import-outside-toplevel

              output_path_p = Path(output_path.path)
              output_path_p.mkdir(parents=True, exist_ok=True)

              if pipeline_type == "standard":
                  # Standard pipeline: download traditional models
                  download_models(
                      output_dir=output_path_p,
                      progress=True,
                      with_layout=True,
                      with_tableformer=True,
                      with_easyocr=False,
                  )
              elif pipeline_type == "vlm" and remote_model_endpoint_enabled:
                  # VLM pipeline with remote model endpoint: Download minimal required models
                  # Only models set are what lives in fabianofranz repo
                  # TODO: figure out what needs to be downloaded or removed
                  download_models(
                      output_dir=output_path_p,
                      progress=False,
                      force=False,
                      with_layout=True,
                      with_tableformer=True,
                      with_code_formula=False,
                      with_picture_classifier=False,
                      with_smolvlm=False,
                      with_smoldocling=False,
                      with_smoldocling_mlx=False,
                      with_granite_vision=False,
                      with_easyocr=False,
                  )
              elif pipeline_type == "vlm":
                  # VLM pipeline with local models: Download VLM models for local inference
                  # TODO: set models downloaded by model name passed into KFP pipeline ex: smoldocling OR granite-vision
                  download_models(
                      output_dir=output_path_p,
                      with_smolvlm=True,
                      with_smoldocling=True,
                      progress=False,
                      force=False,
                      with_layout=False,
                      with_tableformer=False,
                      with_code_formula=False,
                      with_picture_classifier=False,
                      with_smoldocling_mlx=False,
                      with_granite_vision=False,
                      with_easyocr=False,
                  )
              else:
                  raise ValueError(
                      f"Invalid pipeline_type: {pipeline_type}. Must be 'standard' or 'vlm'"
                  )
        image: quay.io/fabianofranz/docling-ubi9:2.54.0
    exec-import-pdfs:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - import_pdfs
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def import_pdfs(
              output_path: dsl.Output[dsl.Artifact],
              filenames: str,
              base_url: str,
              from_s3: bool = False,
              s3_secret_mount_path: str = "/mnt/secrets",
          ):
              """
              Import PDF filenames (comma-separated) from specified URL or S3 bucket.

              Args:
                  filenames: Comma-separated list of PDF filenames to import.
                  base_url: Base URL of the PDF files.
                  output_path: Path to the output directory for the PDF files.
                  from_s3: Whether or not to import from S3.
                  s3_secret_mount_path: Path to the secret mount path for the S3 credentials.

              Raises:
                  ValueError: If no filename is given, base_url is empty (URL mode), or
                      (S3 mode) the secret mount or one of its keys is missing or the
                      endpoint URL / bucket is empty.
              """
              import os  # pylint: disable=import-outside-toplevel
              from pathlib import Path  # pylint: disable=import-outside-toplevel

              import boto3  # pylint: disable=import-outside-toplevel
              import requests  # pylint: disable=import-outside-toplevel

              def _read_secret(key: str) -> str:
                  """Return the whitespace-stripped content of secret file `key`."""
                  secret_file = os.path.join(s3_secret_mount_path, key)
                  if not os.path.isfile(secret_file):
                      raise ValueError(
                          f"Key {key} not defined in secret {s3_secret_mount_path}"
                      )
                  with open(secret_file) as f:
                      # A trailing newline in the file would otherwise end up in the
                      # endpoint URL, the credentials and the bucket name.
                      return f.read().strip()

              filenames_list = [name.strip() for name in filenames.split(",") if name.strip()]
              if not filenames_list:
                  raise ValueError(
                      "filenames must contain at least one filename (comma-separated)"
                  )

              output_path_p = Path(output_path.path)
              output_path_p.mkdir(parents=True, exist_ok=True)

              if from_s3:
                  if not os.path.exists(s3_secret_mount_path):
                      raise ValueError(
                          f"Secret for S3 should be mounted in {s3_secret_mount_path}"
                      )

                  s3_endpoint_url = _read_secret("S3_ENDPOINT_URL")
                  s3_access_key = _read_secret("S3_ACCESS_KEY")
                  s3_secret_key = _read_secret("S3_SECRET_KEY")
                  s3_bucket = _read_secret("S3_BUCKET")
                  s3_prefix = _read_secret("S3_PREFIX").rstrip("/")

                  if not s3_endpoint_url:
                      raise ValueError("S3_ENDPOINT_URL must be provided")

                  if not s3_bucket:
                      raise ValueError("S3_BUCKET must be provided")

                  s3_client = boto3.client(
                      "s3",
                      endpoint_url=s3_endpoint_url,
                      aws_access_key_id=s3_access_key,
                      aws_secret_access_key=s3_secret_key,
                  )

                  for filename in filenames_list:
                      object_name = filename.lstrip("/")
                      # An empty prefix must not produce a key with a leading "/".
                      orig = f"{s3_prefix}/{object_name}" if s3_prefix else object_name
                      # Store under the base name so the file stays inside output_path.
                      dest = output_path_p / Path(object_name).name
                      print(f"import-test-pdfs: downloading {orig} -> {dest} from s3", flush=True)
                      # download_file documents Filename as str, so convert the Path.
                      s3_client.download_file(s3_bucket, orig, str(dest))
              else:
                  if not base_url:
                      raise ValueError("base_url must be provided")

                  for filename in filenames_list:
                      object_name = filename.lstrip("/")
                      url = f"{base_url.rstrip('/')}/{object_name}"
                      # Store under the base name so the file stays inside output_path.
                      dest = output_path_p / Path(object_name).name
                      print(f"import-test-pdfs: downloading {url} -> {dest}", flush=True)
                      with requests.get(url, stream=True, timeout=30) as resp:
                          resp.raise_for_status()
                          with dest.open("wb") as f:
                              for chunk in resp.iter_content(chunk_size=8192):
                                  if chunk:
                                      f.write(chunk)

              print("import-test-pdfs: done", flush=True)
        image: registry.access.redhat.com/ubi9/python-311:9.6-1755074620
pipelineInfo:
  description: Docling VLM convert pipeline by the Data Processing Team
  name: data-processing-docling-vlm-pipeline

# Root DAG: import-pdfs -> create-pdf-splits -> for-loop-1; download-docling-models
# has no upstream task and for-loop-1 also waits for it.
root:
  dag:
    tasks:
      create-pdf-splits:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-create-pdf-splits
        dependentTasks:
        - import-pdfs
        inputs:
          artifacts:
            input_path:
              taskOutputArtifact:
                outputArtifactKey: output_path
                producerTask: import-pdfs
          parameters:
            num_splits:
              componentInputParameter: num_splits
        taskInfo:
          name: create-pdf-splits
      download-docling-models:
        cachingOptions: {}
        componentRef:
          name: comp-download-docling-models
        inputs:
          parameters:
            pipeline_type:
              runtimeValue:
                constant: vlm
            remote_model_endpoint_enabled:
              componentInputParameter: docling_remote_model_enabled
        taskInfo:
          name: download-docling-models
      for-loop-1:
        componentRef:
          name: comp-for-loop-1
        dependentTasks:
        - create-pdf-splits
        - download-docling-models
        - import-pdfs
        inputs:
          artifacts:
            pipelinechannel--download-docling-models-output_path:
              taskOutputArtifact:
                outputArtifactKey: output_path
                producerTask: download-docling-models
            pipelinechannel--import-pdfs-output_path:
              taskOutputArtifact:
                outputArtifactKey: output_path
                producerTask: import-pdfs
          parameters:
            pipelinechannel--create-pdf-splits-Output:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: create-pdf-splits
            pipelinechannel--docling_image_export_mode:
              componentInputParameter: docling_image_export_mode
            pipelinechannel--docling_num_threads:
              componentInputParameter: docling_num_threads
            pipelinechannel--docling_remote_model_enabled:
              componentInputParameter: docling_remote_model_enabled
            pipelinechannel--docling_timeout_per_document:
              componentInputParameter: docling_timeout_per_document
        parameterIterator:
          itemInput: pipelinechannel--create-pdf-splits-Output-loop-item
          items:
            inputParameter: pipelinechannel--create-pdf-splits-Output
        taskInfo:
          name: for-loop-1
      import-pdfs:
        cachingOptions: {}
        componentRef:
          name: comp-import-pdfs
        inputs:
          parameters:
            base_url:
              componentInputParameter: pdf_base_url
            filenames:
              componentInputParameter: pdf_filenames
            from_s3:
              componentInputParameter: pdf_from_s3
            s3_secret_mount_path:
              runtimeValue:
                constant: /mnt/secrets
        taskInfo:
          name: import-pdfs
  inputDefinitions:
    parameters:
      docling_image_export_mode:
        defaultValue: embedded
        isOptional: true
        parameterType: STRING
      docling_num_threads:
        defaultValue: 4.0
        isOptional: true
        parameterType: NUMBER_INTEGER
      docling_remote_model_enabled:
        defaultValue: false
        isOptional: true
        parameterType: BOOLEAN
      docling_timeout_per_document:
        defaultValue: 300.0
        isOptional: true
        parameterType: NUMBER_INTEGER
      num_splits:
        defaultValue: 3.0
        isOptional: true
        parameterType: NUMBER_INTEGER
      pdf_base_url:
        defaultValue: https://github.com/docling-project/docling/raw/v2.43.0/tests/data/pdf
        isOptional: true
        parameterType: STRING
      pdf_filenames:
        defaultValue: 2203.01017v2.pdf
        isOptional: true
        parameterType: STRING
      pdf_from_s3:
        defaultValue: false
        isOptional: true
        parameterType: BOOLEAN
schemaVersion: 2.1.0
sdkVersion: kfp-2.14.6
---
# Kubernetes platform config: mounts the secret data-processing-docling-pipeline
# at /mnt/secrets for the two executors that read credentials from files there.
# The mount is optional, so the components check for the directory themselves.
platforms:
  kubernetes:
    deploymentSpec:
      executors:
        exec-docling-convert-vlm:
          secretAsVolume:
          - mountPath: /mnt/secrets
            optional: true
            secretName: data-processing-docling-pipeline
            secretNameParameter:
              runtimeValue:
                constant: data-processing-docling-pipeline
        exec-import-pdfs:
          secretAsVolume:
          - mountPath: /mnt/secrets
            optional: true
            secretName: data-processing-docling-pipeline
            secretNameParameter:
              runtimeValue:
                constant: data-processing-docling-pipeline