Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions python/dotpromptz/src/dotpromptz/picoschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ async def picoschema_to_json_schema(schema: Any, schema_resolver: SchemaResolver
return await PicoschemaParser(schema_resolver).parse(schema)


def picoschema_to_json_schema_sync(schema: Any) -> JsonSchema | None:
    """Converts a Picoschema definition to JSON Schema without awaiting.

    The parser is built with no schema resolver, so only the built-in
    scalar types (string, number, integer, boolean, null, any) can be
    used. Schemas that refer to named types must go through the async
    variant instead.

    Args:
        schema: The Picoschema definition (a dict or a string).

    Returns:
        The equivalent JSON Schema, or None when no schema was supplied.

    Raises:
        ValueError: If the schema references a named type that requires resolution.
    """
    # The synchronous path never resolves named schemas, hence no resolver.
    parser = PicoschemaParser(schema_resolver=None)
    return parser.parse_sync(schema)


class PicoschemaParser:
"""Parses Picoschema definitions into JSON Schema.

Expand Down Expand Up @@ -160,6 +179,38 @@ async def parse(self, schema: Any) -> JsonSchema | None:
# If the schema is not a JSON Schema, parse it as Picoschema.
return await self.parse_pico(schema)

def parse_sync(self, schema: Any) -> JsonSchema | None:
    """Parses a schema synchronously (no schema resolution support).

    Dispatch order:
      1. Falsy input (None, empty string, empty dict) yields None.
      2. A string is parsed as a scalar Picoschema fragment.
      3. A dict accepted by `_is_json_schema` is returned unchanged.
      4. A dict whose 'properties' entry is a dict is returned as a copy
         with 'type' forced to 'object'.
      5. Anything else is parsed as Picoschema.

    Args:
        schema: The schema definition to parse.

    Returns:
        The resulting JSON Schema, or None if the input is falsy.

    Raises:
        ValueError: If the schema references a named type that requires
            resolution, or is neither a string nor a dict.
    """
    if not schema:
        return None

    if isinstance(schema, str):
        # Route top-level strings through the same scalar handling used for
        # nested fragments. This keeps the two paths consistent: 'any' becomes
        # an unconstrained schema instead of {'type': 'any'} (which is not a
        # valid JSON Schema type), and named types raise the same ValueError.
        return self.parse_pico_sync(schema)

    if isinstance(schema, dict):
        if _is_json_schema(schema):
            # Already JSON Schema: hand it back as-is.
            return cast(JsonSchema, schema)
        if isinstance(schema.get('properties'), dict):
            # Has a 'properties' mapping: treat as JSON Schema and make the
            # object type explicit without mutating the caller's dict.
            return {**cast(JsonSchema, schema), 'type': 'object'}

    return self.parse_pico_sync(schema)

async def parse_pico(self, obj: Any, path: list[str] | None = None) -> JsonSchema:
"""Recursively parses a Picoschema object or string fragment.

Expand Down Expand Up @@ -244,6 +295,89 @@ async def parse_pico(self, obj: Any, path: list[str] | None = None) -> JsonSchem
del schema['required']
return schema

def parse_pico_sync(self, obj: Any, path: list[str] | None = None) -> JsonSchema:
    """Recursively parses a Picoschema object synchronously.

    Strings are treated as scalar type declarations with an optional
    description. Dicts are treated as object definitions whose keys follow
    the `name`, `name?`, or `name(type, description)` forms, where the
    parenthetical type is one of 'array', 'object' or 'enum'.

    The input is never mutated: enum value lists are copied before the
    implicit null option is added for optional properties.

    Args:
        obj: The Picoschema fragment (dict or string).
        path: Keys leading to this fragment. It is extended and passed down
            through the recursion; the error messages raised here do not
            currently include it.

    Returns:
        The JSON Schema representation of the fragment.

    Raises:
        ValueError: If the schema references a named type, uses an
            unsupported parenthetical type, or is neither a dict nor a string.
    """
    if path is None:
        path = []

    if isinstance(obj, str):
        type_name, description = extract_description(obj)
        if type_name not in JSON_SCHEMA_SCALAR_TYPES:
            raise ValueError(
                f"Picoschema: unsupported scalar type '{type_name}'. Use async version for schema resolution."
            )

        # 'any' is expressed as the absence of a type constraint.
        scalar: dict[str, Any] = {} if type_name == 'any' else {'type': type_name}
        if description:
            scalar['description'] = description
        return scalar

    if not isinstance(obj, dict):
        raise ValueError(f'Picoschema: only consists of objects and strings. Got: {obj}')

    schema: dict[str, Any] = {
        'type': 'object',
        'properties': {},
        'required': [],
        'additionalProperties': False,
    }

    for key, value in obj.items():
        child_path = [*path, key]

        if key == WILDCARD_PROPERTY_NAME:
            schema['additionalProperties'] = self.parse_pico_sync(value, child_path)
            continue

        # Split on the first '(' only, so parentheses that appear inside the
        # description do not truncate the parenthetical. The slice drops the
        # closing ')'.
        name, paren, remainder = key.partition('(')
        type_info = remainder[:-1] if paren else None
        is_optional = name.endswith('?')
        property_name = name[:-1] if is_optional else name

        if not is_optional:
            schema['required'].append(property_name)

        if not type_info:
            prop = self.parse_pico_sync(value, child_path)
            # Optional properties additionally accept null. Schemas without a
            # single string type (e.g. 'any') are left untouched.
            if is_optional and isinstance(prop.get('type'), str):
                prop['type'] = [prop['type'], 'null']
            schema['properties'][property_name] = prop
            continue

        type_name, description = extract_description(type_info)
        if type_name == 'array':
            prop = self.parse_pico_sync(value, child_path)
            schema['properties'][property_name] = {
                'type': ['array', 'null'] if is_optional else 'array',
                'items': prop,
            }
        elif type_name == 'object':
            prop = self.parse_pico_sync(value, child_path)
            # Guard on the type being present: a value such as 'any' parses to
            # a schema with no 'type' key, which already admits null.
            if is_optional and isinstance(prop.get('type'), str):
                prop['type'] = [prop['type'], 'null']
            schema['properties'][property_name] = prop
        elif type_name == 'enum':
            # Copy the list so appending None below (and any later edits to
            # the result) cannot modify the caller's schema definition.
            enum_values = list(value) if isinstance(value, list) else value
            if is_optional and None not in enum_values:
                enum_values.append(None)
            schema['properties'][property_name] = {'enum': enum_values}
        else:
            raise ValueError(f"Picoschema: parenthetical types must be 'object' or 'array', got: {type_name}")

        if description:
            schema['properties'][property_name]['description'] = description

    # Omit 'required' entirely when every property is optional.
    if not schema['required']:
        del schema['required']
    return schema


def extract_description(input_str: str) -> tuple[str, str | None]:
"""Extracts the type/name and optional description from a Picoschema string.
Expand Down
124 changes: 124 additions & 0 deletions python/dotpromptz/tests/dotpromptz/picoschema_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,130 @@ async def test_invalid_input_type(self) -> None:
await self.parser.parse_pico(123)


class TestPicoschemaParserSync(unittest.TestCase):
    """Exercises the synchronous parsing methods of PicoschemaParser."""

    def setUp(self) -> None:
        """Create a parser with default arguments for each test."""
        self.parser = picoschema.PicoschemaParser()

    def test_parse_sync_none(self) -> None:
        """parse_sync(None) yields None."""
        parsed = self.parser.parse_sync(None)
        self.assertIsNone(parsed)

    def test_parse_sync_scalar_type(self) -> None:
        """A bare scalar name becomes a schema with only a type."""
        self.assertEqual(self.parser.parse_sync('string'), {'type': 'string'})

    def test_parse_sync_scalar_with_description(self) -> None:
        """Text after the comma is carried over as the description."""
        self.assertEqual(
            self.parser.parse_sync('string, a string'),
            {'type': 'string', 'description': 'a string'},
        )

    def test_parse_sync_object_schema(self) -> None:
        """A standard JSON object schema is passed through unchanged."""
        json_schema = {'type': 'object', 'properties': {'name': {'type': 'string'}}}
        parsed = self.parser.parse_sync(json_schema)
        self.assertEqual(parsed, json_schema)

    def test_parse_sync_picoschema_object(self) -> None:
        """A picoschema dict expands to an object schema; optional fields accept null."""
        parsed = self.parser.parse_sync({'name': 'string', 'age?': 'integer'})
        self.assertEqual(
            parsed,
            {
                'type': 'object',
                'properties': {
                    'name': {'type': 'string'},
                    'age': {'type': ['integer', 'null']},
                },
                'required': ['name'],
                'additionalProperties': False,
            },
        )

    def test_parse_sync_named_type_raises(self) -> None:
        """A named (non-scalar) type is rejected with a hint to use the async API."""
        with self.assertRaises(ValueError) as raised:
            self.parser.parse_sync('CustomType')
        message = str(raised.exception)
        self.assertIn('unsupported scalar type', message)
        self.assertIn('Use async version', message)

    def test_parse_pico_sync_array(self) -> None:
        """An (array) parenthetical wraps the value schema in 'items'."""
        parsed = self.parser.parse_pico_sync({'items(array)': 'string'})
        self.assertEqual(
            parsed,
            {
                'type': 'object',
                'properties': {'items': {'type': 'array', 'items': {'type': 'string'}}},
                'required': ['items'],
                'additionalProperties': False,
            },
        )

    def test_parse_pico_sync_nested_object(self) -> None:
        """An (object) parenthetical is parsed recursively."""
        inner_expected = {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'email': {'type': ['string', 'null']},
            },
            'required': ['name'],
            'additionalProperties': False,
        }
        outer_expected = {
            'type': 'object',
            'properties': {'user': inner_expected},
            'required': ['user'],
            'additionalProperties': False,
        }
        parsed = self.parser.parse_pico_sync({'user(object)': {'name': 'string', 'email?': 'string'}})
        self.assertEqual(parsed, outer_expected)

    def test_parse_pico_sync_enum(self) -> None:
        """An (enum) parenthetical lists the allowed values."""
        parsed = self.parser.parse_pico_sync({'status(enum)': ['active', 'inactive']})
        self.assertEqual(
            parsed,
            {
                'type': 'object',
                'properties': {'status': {'enum': ['active', 'inactive']}},
                'required': ['status'],
                'additionalProperties': False,
            },
        )


class TestPicoschemaToJsonSchemaSync(unittest.TestCase):
    """Covers the module-level picoschema_to_json_schema_sync helper."""

    def test_basic_schema(self) -> None:
        """A flat picoschema with one optional field converts to an object schema."""
        converted = picoschema.picoschema_to_json_schema_sync({'diff': 'string', 'context?': 'string'})
        self.assertEqual(
            converted,
            {
                'type': 'object',
                'properties': {
                    'diff': {'type': 'string'},
                    'context': {'type': ['string', 'null']},
                },
                'required': ['diff'],
                'additionalProperties': False,
            },
        )

    def test_none_returns_none(self) -> None:
        """None in, None out."""
        converted = picoschema.picoschema_to_json_schema_sync(None)
        self.assertIsNone(converted)

    def test_named_type_raises(self) -> None:
        """A field that refers to a named type raises ValueError."""
        source = {'field': 'CustomType'}
        with self.assertRaises(ValueError):
            picoschema.picoschema_to_json_schema_sync(source)


class TestExtractDescription(unittest.TestCase):
"""Extract description tests."""

Expand Down
Loading