Merged
48 commits
56e925c
Draft : implementation OpenAI support
legstar67 Mar 11, 2026
f99a690
continuation and correction of the OpenAI API support
legstar67 Mar 16, 2026
7189b3e
cleaning
legstar67 Mar 16, 2026
141d214
small modifications and cleaning
legstar67 Mar 22, 2026
7811496
reset from the main
legstar67 Mar 22, 2026
8e085a3
implementation of config classes and agnostic-provider classes
legstar67 Mar 22, 2026
f26f804
pytests for previous implementations
legstar67 Mar 23, 2026
2003d34
OpenAI API implementation
legstar67 Mar 23, 2026
95ae28c
unit tests for OpenAI implementation
legstar67 Mar 23, 2026
4dd199f
chunking by byte implementation + its unit tests
legstar67 Mar 25, 2026
6964db8
implementation of the provider-agnostic batch orchestrator and metada…
legstar67 Mar 25, 2026
77e39e4
unit tests + corrections
legstar67 Mar 26, 2026
a5f9b62
implementation of the status checker , and the collector + unit tests…
legstar67 Mar 26, 2026
b32b4ae
correction on the image handling + tests
legstar67 Mar 26, 2026
c55efaf
correct the problem Copilot spotted about a global index which is not…
legstar67 Mar 26, 2026
fead12e
correction of problem of sample id spotted by copilot
legstar67 Mar 26, 2026
cead89b
correction of the file naming, spotted by copilot, to allow sharding
legstar67 Mar 26, 2026
03eed23
add an import
legstar67 Mar 26, 2026
d60e337
Update src/mmirage/core/process/batch/collector.py
legstar67 Mar 26, 2026
46f5e65
Merge branch 'feature/llm-api-support' of github.com:EPFLiGHT/MMIRAGE…
legstar67 Mar 26, 2026
c642744
copilot suggestion
legstar67 Mar 26, 2026
c7d4bac
template application changed + small correction
legstar67 Mar 27, 2026
c5e75a2
suggestion by Copilot, mostly changing the CLI argument to accept li…
legstar67 Mar 27, 2026
1ce4e2c
small correction, to respect backward compatibility
legstar67 Mar 27, 2026
30b52b8
small correction on the config for vision
legstar67 Mar 27, 2026
71f9d6f
big improvement : making the 3 steps of the pipeline really provider …
legstar67 May 1, 2026
7ff6e10
abstract method added in BaseProcessor
legstar67 May 2, 2026
c9c43e9
small update on the completion_window of openai
legstar67 May 2, 2026
69beb2f
adjust adapter.py thanks to the comments of @fabnemEPFL
legstar67 May 2, 2026
ddca511
make --metadata_path optional as already in the config file + tests
legstar67 May 5, 2026
473c906
use of a batch receipt base path
legstar67 May 5, 2026
92219ac
solve problem of parsing the openai output
legstar67 May 6, 2026
16b270a
upgrade of status checker according to the comments
legstar67 May 11, 2026
af013e5
modif according to comments : optimization + deduplication
legstar67 May 11, 2026
b9ff47c
new class BatchMetadataRecord instead of using dict
legstar67 May 11, 2026
bfad50e
log, tracking and syntax modif following the comments
legstar67 May 12, 2026
41d59f2
small modif following copilot's comments
legstar67 May 12, 2026
eaf089c
syntax improved according to comments
legstar67 May 12, 2026
91be3de
correction of use of a dict instead of the config class
legstar67 May 12, 2026
08a1dfa
cleaning
legstar67 May 12, 2026
69d01f1
improve of openai adapter following the comments
legstar67 May 12, 2026
e81d1a3
improvement of openai adapter following the comments
legstar67 May 12, 2026
c32d633
improvement of openai adapter following the comments
legstar67 May 12, 2026
0298f76
syntax improved following the comments + test recommended by a copilo…
legstar67 May 12, 2026
4d4f8f1
working version of llm api provider feature
legstar67 May 12, 2026
f23bc70
Merge branch 'main' into feature/llm-api-support
legstar67 May 12, 2026
645110a
small correction
legstar67 May 12, 2026
a8e5680
Merge branch 'main' into feature/llm-api-support
fabnemEPFL May 12, 2026
57 changes: 57 additions & 0 deletions configs/config_mock_openai_batch.yaml
@@ -0,0 +1,57 @@
processors:
  - type: llm
    server_args:
      model_path: Qwen/Qwen3-4B-Instruct-2507
      tp_size: 1
      disable_custom_all_reduce: true
    default_sampling_params:
      temperature: 0.1
      top_p: 0.9
      max_new_tokens: 1024
      custom_params:
        chat_template_kwargs:
          enable_thinking: false
    batch_provider:
      enabled: true
      provider: openai
      model: gpt-4o-mini
      max_chunk_bytes: 52428800
      metadata_output_path: tests/output/batch_metadata.jsonl
      credentials:
        api_key:

loading_params:
  datasets:
    - path: tests/mock_data/data.jsonl
      type: JSONL
      output_dir: tests/output/data_openai_batch

  num_shards: 1
  shard_id: 0
  batch_size: 64

processing_params:
  inputs:
    - name: text
      key: text

  outputs:
    - name: formatted_answer
      type: llm
      output_type: JSON
      output_schema:
        - question
        - answer
      prompt: |
        Generate one question and its corresponding answer using the following text:
        ```
        {{ text }}
        ```

  remove_columns: true
  output_schema:
    conversations:
      - role: "user"
        content: "{{ formatted_answer.question }}"
      - role: "assistant"
        content: "{{ formatted_answer.answer }}"
47 changes: 47 additions & 0 deletions configs/config_mock_openai_batch_vision.yaml
@@ -0,0 +1,47 @@
processors:
  - type: llm
    server_args:
      model_path: Qwen/Qwen3-VL-8B-Instruct
      tp_size: 1
      trust_remote_code: true
      chat_template: qwen2-vl
    default_sampling_params:
      temperature: 0.1
      top_p: 0.9
      max_new_tokens: 512
    batch_provider:
      enabled: true
      provider: openai
      model: gpt-4o-mini
      metadata_output_path: tests/output/batch_metadata.jsonl
      credentials:
        api_key: ""

loading_params:
  datasets:
    - path: tests/mock_data_vision/data.jsonl
      type: JSONL
      output_dir: tests/output/data_openai_batch_vision
      image_base_path: tests/mock_data_vision

  num_shards: 1
  shard_id: 0
  batch_size: 1

processing_params:
  inputs:
    - name: image_input
      key: image
      type: image

  outputs:
    - name: caption
      type: llm
      output_type: plain
      prompt: |
        Describe what you see in this image in one concise sentence.

  remove_columns: false
  output_schema:
    image: "{{ image_input }}"
    caption: "{{ caption }}"
95 changes: 95 additions & 0 deletions src/mmirage/config/batch_provider.py
@@ -0,0 +1,95 @@
"""Provider-agnostic batch configuration contracts.

This module defines the shared configuration shape used by any future batch
submission provider (OpenAI, Anthropic, etc.).
"""

from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Dict, Literal, Optional


class OversizedRequestPolicy(str, Enum):
    """Policy for handling single requests that exceed the chunk byte limit."""

    ISOLATE = "isolate"
    REJECT = "reject"


@dataclass
class BatchRetryPolicy:
    """Retry behavior used by provider-neutral batch submission orchestration.

    Attributes:
        max_attempts: Maximum number of submission attempts for retryable errors.
        initial_backoff_seconds: Delay before the first retry attempt.
        backoff_multiplier: Multiplicative factor for subsequent retry delays.
    """

    max_attempts: int = 3
    initial_backoff_seconds: float = 2.0
    backoff_multiplier: float = 2.0

    def __post_init__(self) -> None:
        if self.max_attempts < 1:
            raise ValueError("max_attempts must be >= 1")
        if self.initial_backoff_seconds < 0:
            raise ValueError("initial_backoff_seconds must be >= 0")
        if self.backoff_multiplier < 1:
            raise ValueError("backoff_multiplier must be >= 1")


@dataclass
class BatchProviderConfig:
    """Shared contract for provider-specific batch configuration.

    Concrete provider configs should inherit from this dataclass and extend it
    with provider-specific settings. The fields here are intentionally provider
    neutral so chunking/submission orchestration can run through one typed path.

    Attributes:
        provider: Provider identifier (for example, "openai" or "anthropic").
        enabled: Whether batch submission mode is enabled.
        max_chunk_bytes: Maximum serialized request bytes per chunk.
            Defaults to 50 MB.
        max_requests_per_chunk: Optional hard cap on number of requests in a
            chunk. If None, no request-count cap is enforced.
        metadata_output_path: Base path where submission metadata receipts are saved.
            Submission writes suffixed files like ``.text.<run>.jsonl`` and
            ``.multimodal.<run>.jsonl`` from this base path.
        retry_policy: Retry policy used by the shared batch layer.
        oversized_request_policy: Handling policy when a single request exceeds
            ``max_chunk_bytes``. ``isolate`` creates a dedicated oversized
            chunk, while ``reject`` fails fast.
        extras: Provider-specific knobs that do not belong in the shared fields.
        credentials: Provider credentials required to submit chunks.
    """

    provider: str
    enabled: bool = True
    max_chunk_bytes: int = 50 * 1024 * 1024
    max_requests_per_chunk: Optional[int] = None
    metadata_output_path: str = ""
    retry_policy: BatchRetryPolicy = field(default_factory=BatchRetryPolicy)
    oversized_request_policy: OversizedRequestPolicy | str = OversizedRequestPolicy.ISOLATE
    extras: Dict[str, Any] = field(default_factory=dict)
    credentials: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self) -> None:
        self.provider = self.provider.strip().lower()

        if not self.provider:
            raise ValueError("provider must be a non-empty string")
        if self.max_chunk_bytes < 1:
            raise ValueError("max_chunk_bytes must be >= 1")
        if self.max_requests_per_chunk is not None and self.max_requests_per_chunk < 1:
            raise ValueError("max_requests_per_chunk must be >= 1 when provided")
        if isinstance(self.oversized_request_policy, str):
            try:
                self.oversized_request_policy = OversizedRequestPolicy(
                    self.oversized_request_policy.strip().lower()
                )
            except ValueError as exc:
                raise ValueError(
                    "oversized_request_policy must be either 'isolate' or 'reject'"
                ) from exc
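Reviewer note: the retry loop that consumes `BatchRetryPolicy` is not part of this file, so the following is only a sketch of the delay schedule its docstring describes (a first delay of `initial_backoff_seconds`, then multiplied by `backoff_multiplier` for each further retry). `RetryPolicySketch` and `backoff_delays` are hypothetical names used for illustration, not code from the PR.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RetryPolicySketch:
    """Stand-in that mirrors the three BatchRetryPolicy fields in the diff."""

    max_attempts: int = 3
    initial_backoff_seconds: float = 2.0
    backoff_multiplier: float = 2.0


def backoff_delays(policy: RetryPolicySketch) -> List[float]:
    """Return the wait before each retry (assumed geometric schedule).

    With max_attempts total attempts there are max_attempts - 1 retries,
    so the list has one entry fewer than max_attempts.
    """
    delays: List[float] = []
    delay = policy.initial_backoff_seconds
    for _ in range(policy.max_attempts - 1):
        delays.append(delay)
        delay *= policy.backoff_multiplier
    return delays


print(backoff_delays(RetryPolicySketch()))  # [2.0, 4.0]
```

Under this reading, the defaults give at most three submission attempts with waits of 2 s and 4 s between them.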
47 changes: 47 additions & 0 deletions src/mmirage/config/openai_batch.py
@@ -0,0 +1,47 @@
"""OpenAI-specific batch configuration."""

from dataclasses import dataclass, field
from typing import Any, Dict, Literal, Optional

from mmirage.config.batch_provider import BatchProviderConfig


@dataclass
class OpenAIBatchConfig(BatchProviderConfig):
    """OpenAI Batch API configuration.

    Attributes:
        provider: Fixed provider identifier for OpenAI.
        model: Model name used in each chat completion request body.
        batch_endpoint: Target endpoint used by OpenAI batch jobs.
        completion_window: OpenAI completion window value.
        base_url: Optional base URL, useful for API-compatible gateways.
        metadata: Metadata sent on batch creation.
    """

    provider: str = "openai"
    model: str = "gpt-4.1-mini"
    batch_endpoint: str = "/v1/chat/completions"
    completion_window: str = "24h"
    base_url: Optional[str] = None
    metadata: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self) -> None:
        super().__post_init__()
        allowed_windows = {"24h"}
        if self.completion_window not in allowed_windows:
            raise ValueError(f"completion_window must be one of {allowed_windows}")

        if not self.model.strip():
            raise ValueError("model must be a non-empty string")
        if not self.batch_endpoint.startswith("/"):
            raise ValueError("batch_endpoint must start with '/'")

        # Mirror OpenAI-specific fields into generic extras for provider-neutral consumers.
        self.extras.setdefault("model", self.model)
        self.extras.setdefault("batch_endpoint", self.batch_endpoint)
        self.extras.setdefault("completion_window", self.completion_window)
        if self.base_url:
            self.extras.setdefault("base_url", self.base_url)
        if self.metadata:
            self.extras.setdefault("metadata", dict(self.metadata))
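Reviewer note: for context on how `model` and `batch_endpoint` are typically used, the OpenAI Batch API takes a JSONL input file in which each line carries a `custom_id`, an HTTP `method`, a `url`, and a request `body`. The sketch below builds one such line. `build_batch_line` and the `sample-0` identifier are hypothetical, and how `OpenAIBatchAdapter` actually serializes requests is not shown in this part of the diff.

```python
import json
from typing import Any, Dict, List


def build_batch_line(
    custom_id: str,
    model: str,
    endpoint: str,
    messages: List[Dict[str, Any]],
) -> str:
    """Serialize one chat completion request as a single JSONL line."""
    request = {
        "custom_id": custom_id,  # used to match results back to inputs
        "method": "POST",
        "url": endpoint,  # e.g. the config's batch_endpoint
        "body": {"model": model, "messages": messages},
    }
    # json.dumps escapes newlines inside strings, so the result is one line.
    return json.dumps(request, ensure_ascii=False)


line = build_batch_line(
    "sample-0",
    "gpt-4.1-mini",
    "/v1/chat/completions",
    [{"role": "user", "content": "Describe this text.\nSecond line."}],
)
print(line)
```

The `custom_id` is what lets a collector step match each output row to the sample that produced it, which is why it needs to be unique within a batch file.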
6 changes: 6 additions & 0 deletions src/mmirage/config/utils.py
@@ -5,8 +5,10 @@
import yaml
import os

from mmirage.config.batch_provider import BatchProviderConfig
from mmirage.config.config import MMirageConfig
from mmirage.core.process.base import BaseProcessorConfig, ProcessorRegistry, OutputVar
from mmirage.core.process.batch.provider_resolution import resolve_single_provider_config
from mmirage.core.loader.base import BaseDataLoaderConfig, DataLoaderRegistry

# Register built-in processors/loaders.
@@ -111,12 +113,16 @@ def output_var_hook(data: Dict[str, Any]) -> OutputVar:
        clz = ProcessorRegistry.get_output_var_cls(data["type"])
        return from_dict(clz, data, config=config)

    def batch_provider_hook(data: Dict[str, Any]) -> BatchProviderConfig:
        return resolve_single_provider_config(data)

    cfg = expand_env_vars(cfg)
    config = Config(
        type_hooks={
            BaseProcessorConfig: processor_config_hook,
            BaseDataLoaderConfig: loader_config_hook,
            OutputVar: output_var_hook,
            BatchProviderConfig: batch_provider_hook,
        }
    )
    cfg_obj = from_dict(MMirageConfig, cast(dict, cfg), config=config)
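Reviewer note: `resolve_single_provider_config` lives in `provider_resolution.py`, which is not shown in this part of the diff. The sketch below is one plausible shape for such a hook, assuming it dispatches on the `provider` key to a concrete config dataclass. Every name in it (`ProviderConfigSketch`, `OpenAIConfigSketch`, `PROVIDER_CONFIGS`, `resolve_provider_config`) is hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


@dataclass
class ProviderConfigSketch:
    """Stand-in for the shared base config."""

    provider: str
    enabled: bool = True


@dataclass
class OpenAIConfigSketch(ProviderConfigSketch):
    """Stand-in for a provider-specific subclass."""

    provider: str = "openai"
    model: str = "gpt-4.1-mini"


# Hypothetical lookup table from provider name to config class.
PROVIDER_CONFIGS: Dict[str, Type[ProviderConfigSketch]] = {
    "openai": OpenAIConfigSketch,
}


def resolve_provider_config(data: Dict[str, Any]) -> ProviderConfigSketch:
    """Pick the config class from the 'provider' key and instantiate it."""
    name = str(data.get("provider", "")).strip().lower()
    if name not in PROVIDER_CONFIGS:
        raise ValueError(f"unknown batch provider: {name!r}")
    # Pass the normalized name so the instance stores the canonical form.
    return PROVIDER_CONFIGS[name](**{**data, "provider": name})


cfg = resolve_provider_config({"provider": " OpenAI ", "model": "gpt-4o-mini"})
print(type(cfg).__name__, cfg.provider, cfg.model)
```

Registering the hook against the base type, as the diff does with `BatchProviderConfig`, keeps the YAML schema flat: the `provider` value alone decides which subclass validates the remaining keys.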
4 changes: 4 additions & 0 deletions src/mmirage/core/process/base.py
@@ -71,6 +71,10 @@ def batch_process_sample(
            NotImplementedError: If not implemented by subclass.
        """
        raise NotImplementedError()

    def finalize(self) -> None:
        """Optional lifecycle hook; override when a processor buffers state."""
        pass

    @abc.abstractmethod
    def get_token_counts(self) -> TokenCounts:
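Reviewer note: a minimal sketch of why a `finalize()` hook is useful for a processor that buffers requests. Without it, a partially filled buffer would never be submitted once the input ends. `ProcessorSketch` and `BufferingProcessorSketch` are hypothetical stand-ins and do not reproduce the real `BaseProcessor` interface.

```python
from typing import List


class ProcessorSketch:
    """Stand-in base class exposing only the lifecycle hook."""

    def finalize(self) -> None:
        """Optional lifecycle hook; a no-op unless overridden."""
        pass


class BufferingProcessorSketch(ProcessorSketch):
    """Collects requests and 'submits' them in groups of flush_size."""

    def __init__(self, flush_size: int) -> None:
        self.flush_size = flush_size
        self.pending: List[str] = []
        self.submitted: List[List[str]] = []

    def add(self, request: str) -> None:
        self.pending.append(request)
        if len(self.pending) >= self.flush_size:
            self._flush()

    def _flush(self) -> None:
        # Move any buffered requests into a submitted group.
        if self.pending:
            self.submitted.append(self.pending)
            self.pending = []

    def finalize(self) -> None:
        # Submit whatever is left after the last sample was processed.
        self._flush()


proc = BufferingProcessorSketch(flush_size=2)
for req in ["a", "b", "c"]:
    proc.add(req)
proc.finalize()
print(proc.submitted)  # [['a', 'b'], ['c']]
```

Keeping the base implementation a no-op means existing processors that hold no state need no changes.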
28 changes: 28 additions & 0 deletions src/mmirage/core/process/batch/__init__.py
@@ -0,0 +1,28 @@
"""Provider-agnostic batch processing contracts and registry."""

from mmirage.core.process.batch.adapter import BatchSubmissionAdapter, BatchSubmissionResult
from mmirage.core.process.batch.collector import collect_and_merge
from mmirage.core.process.batch.chunking import BatchRequestChunker, RequestChunk
from mmirage.core.process.batch.openai_adapter import OpenAIBatchAdapter
from mmirage.core.process.batch.orchestrator import BatchSubmissionOrchestrator
from mmirage.core.process.batch.registry import BatchAdapterFactory, BatchAdapterRegistry
from mmirage.core.process.batch.status_checker import (
    extract_unique_provider_batches,
    run_status_checker,
)
from mmirage.config.openai_batch import OpenAIBatchConfig

__all__ = [
    "BatchSubmissionAdapter",
    "BatchSubmissionResult",
    "collect_and_merge",
    "BatchRequestChunker",
    "RequestChunk",
    "BatchSubmissionOrchestrator",
    "OpenAIBatchAdapter",
    "OpenAIBatchConfig",
    "BatchAdapterFactory",
    "BatchAdapterRegistry",
    "extract_unique_provider_batches",
    "run_status_checker",
]
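Reviewer note: `BatchRequestChunker` is exported here but `chunking.py` is not shown in this part of the diff. As a rough illustration of the behavior documented on `BatchProviderConfig` (`max_chunk_bytes` plus the `isolate` / `reject` policies), here is a simplified sketch. `chunk_by_bytes` is a hypothetical function; it measures each request as its JSON line plus a newline, which is an assumption, and it ignores `max_requests_per_chunk`.

```python
import json
from typing import Any, Dict, List


def chunk_by_bytes(
    requests: List[Dict[str, Any]],
    max_chunk_bytes: int,
    oversized_policy: str = "isolate",
) -> List[List[Dict[str, Any]]]:
    """Greedily group requests so each chunk stays within max_chunk_bytes."""
    chunks: List[List[Dict[str, Any]]] = []
    current: List[Dict[str, Any]] = []
    current_bytes = 0

    for request in requests:
        # Size of the request as one JSONL line, including the newline.
        size = len(json.dumps(request, ensure_ascii=False).encode("utf-8")) + 1

        if size > max_chunk_bytes:
            if oversized_policy == "reject":
                raise ValueError(f"request of {size} bytes exceeds {max_chunk_bytes}")
            # "isolate": close the open chunk, then give the request its own.
            if current:
                chunks.append(current)
                current, current_bytes = [], 0
            chunks.append([request])
            continue

        # Start a new chunk when adding this request would exceed the limit.
        if current and current_bytes + size > max_chunk_bytes:
            chunks.append(current)
            current, current_bytes = [], 0

        current.append(request)
        current_bytes += size

    if current:
        chunks.append(current)
    return chunks


rows = [{"id": i, "text": "x" * 10} for i in range(5)]
for chunk in chunk_by_bytes(rows, max_chunk_bytes=70):
    print([row["id"] for row in chunk])
```

Closing the open chunk before isolating an oversized request preserves input order across chunks, which keeps any later index-based merge simpler.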