Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def _create_model(
if model_name in self.properties_cache:
return self.properties_cache[model_name]

fields = self._extract_fields(schema, defs)
fields = self._extract_fields(schema, defs, model_name)
model = create_model(model_name, **fields)
self.properties_cache[model_name] = model
return model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@ def _resolve_field_type(
self: "McpToolSpec",
field_schema: dict,
defs: dict,
field_name: str | None = None,
parent_model_name: str | None = None,
) -> Any:
"""Resolve the Python type from a field schema."""
if "$ref" in field_schema:
return self._resolve_reference(field_schema, defs)
if "enum" in field_schema:
return Literal[tuple(field_schema["enum"])]
if "anyOf" in field_schema:
return self._resolve_union_type(field_schema, defs)
return self._resolve_basic_type(field_schema, defs)
return self._resolve_union_type(
field_schema, defs, field_name, parent_model_name
)
return self._resolve_basic_type(
field_schema, defs, field_name, parent_model_name
)

def _resolve_reference(
self: "McpToolSpec",
Expand Down Expand Up @@ -59,17 +65,22 @@ def _resolve_union_type(
self: "McpToolSpec",
schema: dict,
defs: dict,
field_name: str | None = None,
parent_model_name: str | None = None,
) -> Any:
"""Resolve a Union type (anyOf)."""
union_types = [
self._resolve_union_option(option, defs) for option in schema["anyOf"]
self._resolve_union_option(option, defs, field_name, parent_model_name)
for option in schema["anyOf"]
]
return Union[tuple(union_types)] if len(union_types) > 1 else union_types[0]

def _resolve_union_option(
self: "McpToolSpec",
option: dict,
defs: dict,
field_name: str | None = None,
parent_model_name: str | None = None,
) -> Any:
"""Resolve a single option in a union type."""
if "$ref" in option:
Expand All @@ -78,28 +89,42 @@ def _resolve_union_option(
return Literal[tuple(option["enum"])]
if option.get("type") == "null":
return type(None)
return self._resolve_basic_type(option, defs)
return self._resolve_basic_type(option, defs, field_name, parent_model_name)

def _resolve_basic_type(
self: "McpToolSpec",
schema: dict,
defs: dict,
field_name: str | None = None,
parent_model_name: str | None = None,
) -> Any:
"""Resolve a basic JSON Schema type."""
json_type = schema.get("type", "string")
json_type = json_type[0] if isinstance(json_type, list) else json_type

if self._is_simple_array(schema):
return self._create_list_type(schema, defs)
return self._create_list_type(schema, defs, field_name, parent_model_name)
if self._is_object_with_properties(schema):
return self._create_inline_object_model(
schema, defs, field_name, parent_model_name
)
if self._is_simple_object(schema):
return self._create_dict_type(schema, defs)
return json_type_mapping.get(json_type, str)


class TypeCreationMixin:
def _create_list_type(self: "McpToolSpec", schema: dict, defs: dict) -> type:
def _create_list_type(
self: "McpToolSpec",
schema: dict,
defs: dict,
field_name: str | None = None,
parent_model_name: str | None = None,
) -> type:
"""Create a List type from schema."""
item_type = self._resolve_field_type(schema["items"], defs)
item_type = self._resolve_field_type(
schema["items"], defs, field_name, parent_model_name
)
return List[item_type]

def _create_dict_type(self: "McpToolSpec", schema: dict, defs: dict) -> type:
Expand All @@ -115,6 +140,17 @@ def _create_dict_type(self: "McpToolSpec", schema: dict, defs: dict) -> type:

return Dict[str, Any]

def _create_inline_object_model(
self: "McpToolSpec",
schema: dict,
defs: dict,
field_name: str | None = None,
parent_model_name: str | None = None,
) -> type:
"""Create a Pydantic model for an inline object schema."""
model_name = self._get_inline_model_name(schema, field_name, parent_model_name)
return self._create_model(schema, model_name, defs)

def _is_simple_array(self: "McpToolSpec", schema: dict) -> bool:
"""Check if schema is a simple array type."""
return schema.get("type") == "array" and "items" in schema
Expand All @@ -129,13 +165,41 @@ def _is_simple_object(self: "McpToolSpec", schema: dict) -> bool:
and isinstance(additional_props, dict)
)

def _is_object_with_properties(self: "McpToolSpec", schema: dict) -> bool:
"""Check if schema is an object type with declared properties."""
return schema.get("type") == "object" and "properties" in schema

def _get_inline_model_name(
self: "McpToolSpec",
schema: dict,
field_name: str | None = None,
parent_model_name: str | None = None,
) -> str:
"""Build a stable model name for inline object schemas."""
if schema.get("title"):
return schema["title"]

parts = [parent_model_name, field_name]
raw_name = "".join(part or "" for part in parts) or "InlineObject"
return "".join(
segment.capitalize()
for segment in "".join(
char if char.isalnum() else " " for char in raw_name
).split()
)

def _extract_ref_name(self: "McpToolSpec", ref_path: str) -> str:
"""Extract reference name from $ref path."""
return ref_path.split("#/$defs/")[-1]


class FieldExtractionMixin:
def _extract_fields(self: "McpToolSpec", schema: dict, defs: dict) -> dict:
def _extract_fields(
self: "McpToolSpec",
schema: dict,
defs: dict,
model_name: str | None = None,
) -> dict:
"""Extract Pydantic fields from schema."""
properties = self._get_properties(schema)
required_fields = set(schema.get("required", []))
Expand All @@ -146,7 +210,9 @@ def _extract_fields(self: "McpToolSpec", schema: dict, defs: dict) -> dict:

fields = {}
for field_name, field_schema in properties.items():
field_type = self._resolve_field_type(field_schema, defs)
field_type = self._resolve_field_type(
field_schema, defs, field_name, schema.get("title") or model_name
)
default_value, final_type = self._set_field_default(
field_name,
required_fields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,41 @@ def test_schema_structure_exact_match(client: BasicMCPClient):
assert set(json_schema["required"]) == {"name", "method", "lst"}


def test_create_model_from_json_schema_preserves_nested_inline_object():
"""
Regression test for https://github.com/run-llama/llama_index/issues/22141

Inline object properties should be converted to nested Pydantic models instead
of falling back to a bare Dict that drops the object's declared properties.
"""
schema = {
"properties": {
"nested": {
"properties": {"value": {"title": "Value", "type": "string"}},
"required": ["value"],
"type": "object",
},
},
"required": ["nested"],
"type": "object",
}

model = McpToolSpec(None, allowed_tools=[]).create_model_from_json_schema(schema)
json_schema = model.model_json_schema()
nested_schema = json_schema["properties"]["nested"]

if "$ref" in nested_schema:
nested_schema = json_schema["$defs"][nested_schema["$ref"].split("/")[-1]]

assert nested_schema["type"] == "object"
assert nested_schema["properties"]["value"]["type"] == "string"
assert nested_schema["required"] == ["value"]

model(nested={"value": "ok"})
with pytest.raises(ValueError):
model(nested={})


def test_resolve_union_option_with_enum(client: BasicMCPClient):
"""
Regression test for https://github.com/run-llama/llama_index/issues/20109
Expand Down