14 changes: 12 additions & 2 deletions docetl/operations/filter.py
@@ -5,18 +5,28 @@
from pydantic import model_validator

from docetl.operations.map import MapOperation
from docetl.operations.utils.validation import (
convert_schema_to_dict_format,
is_pydantic_model,
)


class FilterOperation(MapOperation):
class schema(MapOperation.schema):
type: str = "filter"
prompt: str
output: dict[str, Any]
output: dict[str, Any] | Any

@model_validator(mode="after")
def validate_filter_output_schema(self):
# Check that schema exists and has the right structure for filtering
schema_dict = self.output["schema"]
raw_schema = self.output["schema"]

# Convert Pydantic schema to dict format for validation
if is_pydantic_model(raw_schema):
schema_dict = convert_schema_to_dict_format(raw_schema)
else:
schema_dict = raw_schema

# Filter out _short_explanation for validation
schema = {k: v for k, v in schema_dict.items() if k != "_short_explanation"}
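For context: with this change a filter's output schema can be declared either in the existing dict form or as a Pydantic model. A minimal sketch of the two equivalent configurations, assuming only the type/prompt/output/schema keys that appear in this diff (the model name and the "bool" type spelling are illustrative):

from pydantic import BaseModel

# Hypothetical output model, for illustration only
class KeepDecision(BaseModel):
    keep: bool

# Existing dict-style schema
filter_config_dict = {
    "type": "filter",
    "prompt": "Should this document be kept? Answer with a boolean.",
    "output": {"schema": {"keep": "bool"}},
}

# Pydantic-style schema enabled by this PR
filter_config_pydantic = {
    "type": "filter",
    "prompt": "Should this document be kept? Answer with a boolean.",
    "output": {"schema": KeepDecision},
}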
16 changes: 14 additions & 2 deletions docetl/operations/map.py
@@ -17,12 +17,16 @@
from docetl.operations.base import BaseOperation
from docetl.operations.utils import RichLoopBar, strict_render, validate_output_types
from docetl.operations.utils.api import OutputMode
from docetl.operations.utils.validation import (
convert_schema_to_dict_format,
is_pydantic_model,
)


class MapOperation(BaseOperation):
class schema(BaseOperation.schema):
type: str = "map"
output: dict[str, Any] | None = None
output: dict[str, Any] | Any | None = None
prompt: str | None = None
model: str | None = None
optimize: bool | None = None
@@ -610,7 +614,15 @@ def execute(self, input_data: list[dict]) -> tuple[list[dict], float]:
"""
results = {}
total_cost = 0
output_schema = self.config.get("output", {}).get("schema", {})
# Handle both dict and Pydantic schemas
output_config = self.config.get("output", {})
raw_schema = output_config.get("schema", {})

# Convert Pydantic schema to dict format if needed
if is_pydantic_model(raw_schema):
output_schema = convert_schema_to_dict_format(raw_schema)
else:
output_schema = raw_schema

# Check if there's no prompt and only drop_keys
if "prompts" not in self.config and "drop_keys" in self.config:
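The helpers imported from docetl.operations.utils.validation are not shown in this diff. A rough sketch of what they presumably do, under the assumption that a "Pydantic schema" means a BaseModel subclass (the real implementations may differ; convert_schema_to_dict_format also appears to take a model parameter, per a review comment further down):

import inspect
from typing import Any
from pydantic import BaseModel

def is_pydantic_model(schema: Any) -> bool:
    # A schema counts as "Pydantic" when it is a BaseModel subclass rather than a plain dict
    return inspect.isclass(schema) and issubclass(schema, BaseModel)

def convert_schema_to_dict_format(schema: type[BaseModel]) -> dict[str, Any]:
    # Translate each field annotation into the string type names used by dict schemas
    type_names = {str: "string", int: "integer", float: "number", bool: "boolean"}
    return {
        name: type_names.get(field.annotation, "string")
        for name, field in schema.model_fields.items()
    }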
15 changes: 14 additions & 1 deletion docetl/operations/reduce.py
@@ -32,6 +32,10 @@

# Import OutputMode enum for structured output checks
from docetl.operations.utils.api import OutputMode
from docetl.operations.utils.validation import (
convert_schema_to_dict_format,
is_pydantic_model,
)
from docetl.utils import completion_cost


@@ -46,7 +50,7 @@ class ReduceOperation(BaseOperation):
class schema(BaseOperation.schema):
type: str = "reduce"
reduce_key: str | list[str]
output: dict[str, Any]
output: dict[str, Any] | Any
prompt: str
optimize: bool | None = None
synthesize_resolve: bool | None = None
@@ -200,6 +204,15 @@ def execute(self, input_data: list[dict]) -> tuple[list[dict], float]:
f"Using gleaning with validation prompt: {self.config.get('gleaning', {}).get('validation_prompt', '')}"
)

# Handle both dict and Pydantic schemas
if self.config.get("output") and "schema" in self.config["output"]:
raw_schema = self.config["output"]["schema"]
if is_pydantic_model(raw_schema):
# Convert Pydantic schema to dict format for internal processing
self.config["output"]["schema"] = convert_schema_to_dict_format(
raw_schema
)

Bug: Schema Mutation Causes Inconsistent Behavior

The ReduceOperation and ResolveOperation classes mutate self.config["output"]["schema"] in place. When a Pydantic model is provided for the output schema, it's converted to a dictionary and directly overwrites the original Pydantic model in the configuration. This can lead to inconsistent behavior or break functionality if the operation instance is reused, as subsequent executions will operate on the converted dictionary schema.
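A minimal sketch of one way to avoid the in-place mutation, mirroring how MapOperation above keeps the converted schema in a local variable instead of overwriting the config entry:

# Inside execute(): read the schema without mutating self.config
raw_schema = self.config.get("output", {}).get("schema", {})
if is_pydantic_model(raw_schema):
    # Convert for internal processing only; the original Pydantic model stays in the config
    output_schema = convert_schema_to_dict_format(raw_schema)
else:
    output_schema = raw_schema
# ...use output_schema for the rest of execute()...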


reduce_keys = self.config["reduce_key"]
if isinstance(reduce_keys, str):
reduce_keys = [reduce_keys]
24 changes: 20 additions & 4 deletions docetl/operations/resolve.py
@@ -14,6 +14,10 @@

from docetl.operations.base import BaseOperation
from docetl.operations.utils import RichLoopBar, rich_as_completed, strict_render
from docetl.operations.utils.validation import (
convert_schema_to_dict_format,
is_pydantic_model,
)
from docetl.utils import completion_cost, extract_jinja_variables


@@ -29,7 +33,7 @@ class schema(BaseOperation.schema):
type: str = "resolve"
comparison_prompt: str
resolution_prompt: str | None = None
output: dict[str, Any] | None = None
output: dict[str, Any] | Any | None = None
embedding_model: str | None = None
resolution_model: str | None = None
comparison_model: str | None = None
@@ -113,12 +117,15 @@ def validate_output_schema(self, info: ValidationInfo):
if "schema" not in self.output:
raise ValueError("Missing 'schema' in 'output' configuration")

if not isinstance(self.output["schema"], dict):
# Accept both dict schemas and Pydantic models
schema = self.output["schema"]
if not isinstance(schema, dict) and not is_pydantic_model(schema):
raise TypeError(
"'schema' in 'output' configuration must be a dictionary"
"'schema' in 'output' configuration must be a dictionary or Pydantic BaseModel"
)

if not self.output["schema"]:
# Check if schema is empty (only applies to dict schemas)
if isinstance(schema, dict) and not schema:
raise ValueError("'schema' in 'output' configuration cannot be empty")

return self
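For reference, a sketch of how the updated validator treats the main cases (the model and field names are made up; the error messages are the ones in the diff):

from pydantic import BaseModel

class Canonical(BaseModel):
    canonical_name: str

ok_dict = {"schema": {"canonical_name": "string"}}  # accepted: non-empty dict schema
ok_model = {"schema": Canonical}                    # accepted after this PR: a BaseModel subclass
# {"schema": 42} -> TypeError: "'schema' in 'output' configuration must be a dictionary or Pydantic BaseModel"
# {"schema": {}} -> ValueError: "'schema' in 'output' configuration cannot be empty" (dict schemas only)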
@@ -208,6 +215,15 @@ def execute(self, input_data: list[dict]) -> tuple[list[dict], float]:
if len(input_data) == 0:
return [], 0

# Handle both dict and Pydantic schemas
if self.config.get("output") and "schema" in self.config["output"]:
raw_schema = self.config["output"]["schema"]
if is_pydantic_model(raw_schema):
# Convert Pydantic schema to dict format for internal processing
self.config["output"]["schema"] = convert_schema_to_dict_format(
raw_schema
)

# Initialize observability data for all items at the start
if self.config.get("enable_observability", False):
observability_key = f"_observability_{self.config['name']}"
79 changes: 67 additions & 12 deletions docetl/operations/utils/api.py
@@ -34,8 +34,10 @@
)
from .validation import (
convert_dict_schema_to_list_schema,
convert_schema_to_dict_format,
convert_val,
get_user_input_for_schema,
is_pydantic_model,
safe_eval,
strict_render,
)
@@ -130,16 +132,32 @@ def call_llm_batch(
model: str,
op_type: str,
messages: list[dict[str, str]],
output_schema: dict[str, str],
output_schema: dict[str, str] | Any,
verbose: bool = False,
timeout_seconds: int = 120,
max_retries_per_timeout: int = 2,
bypass_cache: bool = False,
litellm_completion_kwargs: dict[str, Any] = {},
op_config: dict[str, Any] = {},
) -> LLMResult:
# Turn the output schema into a list of schemas
output_schema = convert_dict_schema_to_list_schema(output_schema)
# Handle Pydantic schemas
if is_pydantic_model(output_schema):
# Auto-enable structured output for Pydantic schemas
op_config = op_config.copy()
if "output" not in op_config:
op_config["output"] = {}
if "mode" not in op_config["output"]:
op_config["output"]["mode"] = OutputMode.STRUCTURED_OUTPUT.value

# For structured output mode, pass the Pydantic model directly
# The LLM API will handle the OpenAPI conversion internally
op_config["_pydantic_schema"] = output_schema
# Convert to dict format only for the list schema wrapper
dict_schema = convert_schema_to_dict_format(output_schema, model)
Bug: Schema Conversion Function Parameter Mismatch

The convert_schema_to_dict_format function is called with the LLM model name as its second argument. The function's model parameter, which defaults to 'gpt-4o-mini', seems intended for schema conversion logic rather than the LLM model name. This semantic mismatch could lead to incorrect schema conversion behavior.
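If the review comment is right that this model parameter is unrelated to the LLM model name, the likely fix is simply to drop the second argument here (and in the matching call in call_llm below), as the map and filter operations already do. A sketch of that suggestion, not a confirmed API:

# Convert the Pydantic model without passing the LLM model name through
dict_schema = convert_schema_to_dict_format(output_schema)
output_schema = convert_dict_schema_to_list_schema(dict_schema)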


output_schema = convert_dict_schema_to_list_schema(dict_schema)
else:
# Regular dict schema processing
output_schema = convert_dict_schema_to_list_schema(output_schema)

# Invoke the LLM call
return self.call_llm(
@@ -442,7 +460,7 @@ def call_llm(
model: str,
op_type: str,
messages: list[dict[str, str]],
output_schema: dict[str, str],
output_schema: dict[str, str] | Any,
tools: list[dict[str, str]] | None = None,
scratchpad: str | None = None,
timeout_seconds: int = 120,
@@ -479,6 +497,20 @@ def call_llm(
Raises:
TimeoutError: If the call times out after retrying.
"""
# Handle Pydantic schemas
if is_pydantic_model(output_schema):
# Auto-enable structured output for Pydantic schemas
op_config = op_config.copy()
if "output" not in op_config:
op_config["output"] = {}
if "mode" not in op_config["output"]:
op_config["output"]["mode"] = OutputMode.STRUCTURED_OUTPUT.value

# Store the Pydantic schema for structured output
op_config["_pydantic_schema"] = output_schema
# Convert to dict format for internal processing
output_schema = convert_schema_to_dict_format(output_schema, model)

# Determine output mode using central enum
output_mode_str = op_config.get("output", {}).get(
"mode", OutputMode.TOOLS.value
@@ -674,15 +706,38 @@ def _call_llm_with_cache(
# Prepare structured output schema if using structured output mode
response_format = None
if use_structured_output:
if scratchpad is not None:
props["updated_scratchpad"] = {"type": "string"}
# Check if we have a Pydantic schema to use directly
pydantic_schema = op_config.get("_pydantic_schema")
if pydantic_schema and is_pydantic_model(pydantic_schema):
# Use the OpenAPI schema from Pydantic directly
from .validation import pydantic_to_openapi_schema

openapi_schema = pydantic_to_openapi_schema(pydantic_schema)

if scratchpad is not None:
# Add scratchpad to the schema properties
openapi_schema = openapi_schema.copy()
openapi_schema["properties"] = openapi_schema["properties"].copy()
openapi_schema["properties"]["updated_scratchpad"] = {
"type": "string"
}
if "required" in openapi_schema:
openapi_schema["required"] = list(
openapi_schema["required"]
) + ["updated_scratchpad"]

schema = {
"type": "object",
"properties": props,
"required": list(props.keys()),
"additionalProperties": False,
}
schema = openapi_schema
else:
# Use the converted dict schema
if scratchpad is not None:
props["updated_scratchpad"] = {"type": "string"}

schema = {
"type": "object",
"properties": props,
"required": list(props.keys()),
"additionalProperties": False,
}

response_format = {
"type": "json_schema",
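To make the structured-output branch concrete, here is a rough sketch of the response_format this code ends up building for a small Pydantic model. Pydantic's built-in model_json_schema() stands in for the repo's pydantic_to_openapi_schema helper (not shown here and possibly different), and the final wrapper keys are the usual OpenAI-style json_schema format that the truncated lines above appear to construct:

from pydantic import BaseModel

class Extraction(BaseModel):
    name: str
    score: float

# Stand-in for pydantic_to_openapi_schema
openapi_schema = Extraction.model_json_schema()

# Mirror the scratchpad handling added in this hunk
schema = dict(openapi_schema)
schema["properties"] = dict(openapi_schema["properties"])
schema["properties"]["updated_scratchpad"] = {"type": "string"}
if "required" in schema:
    schema["required"] = list(schema["required"]) + ["updated_scratchpad"]

# Assumed shape of the final wrapper (the diff is truncated after "type": "json_schema")
response_format = {
    "type": "json_schema",
    "json_schema": {"name": "output", "strict": True, "schema": schema},
}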