def picoschema_to_json_schema_sync(schema: Any) -> JsonSchema | None:
    """Parses a Picoschema definition into a JSON Schema (synchronous).

    This sync version works for schemas using only built-in scalar types
    (string, number, integer, boolean, null, any). Use the async version
    if you need to resolve named schema references.

    Args:
        schema: The Picoschema definition (can be a dict or string).

    Returns:
        The equivalent JSON Schema, or None if the input schema is None or
        otherwise falsy (for example an empty dict or empty string).

    Raises:
        ValueError: If the schema references a named type that requires resolution.
    """
    # No resolver is supplied: the sync code path never resolves named schemas.
    return PicoschemaParser(schema_resolver=None).parse_sync(schema)


class PicoschemaParser:
    """Parses Picoschema definitions into JSON Schema."""

    # NOTE(review): only the synchronous methods are reproduced here. The
    # constructor and the async `parse` / `parse_pico` methods belong to this
    # same class and are left untouched.

    def parse_sync(self, schema: Any) -> JsonSchema | None:
        """Parses a schema synchronously (no schema resolution support).

        Args:
            schema: The schema definition to parse. May be a scalar type
                string, a JSON Schema dict, or a Picoschema dict.

        Returns:
            The resulting JSON Schema, or None if the input is None or
            otherwise falsy.

        Raises:
            ValueError: If the schema references a named type that requires resolution.
        """
        if not schema:
            return None

        if isinstance(schema, str):
            type_name, description = extract_description(schema)
            if type_name in JSON_SCHEMA_SCALAR_TYPES:
                # NOTE(review): unlike parse_pico_sync, a top-level 'any' is
                # emitted here as {'type': 'any'} rather than as an
                # unconstrained schema -- confirm this is intended.
                out: JsonSchema = {'type': type_name}
                if description:
                    out['description'] = description
                return out
            raise ValueError(
                f"Picoschema: unsupported scalar type '{type_name}'. Use async version for schema resolution."
            )

        if isinstance(schema, dict):
            # Recognised by _is_json_schema: hand back the very same object,
            # not a copy.
            if _is_json_schema(schema):
                return cast(JsonSchema, schema)

            # A dict that carries a `properties` mapping is treated as an
            # object schema; a shallow copy with an explicit type is returned.
            if isinstance(schema.get('properties'), dict):
                return {**cast(JsonSchema, schema), 'type': 'object'}

        return self.parse_pico_sync(schema)

    def parse_pico_sync(self, obj: Any, path: list[str] | None = None) -> JsonSchema:
        """Recursively parses a Picoschema object synchronously.

        Args:
            obj: The Picoschema fragment (dict or string).
            path: The current path within the schema structure. It is extended
                with each key and passed to recursive calls; this method does
                not currently include it in error messages.

        Returns:
            The JSON Schema representation of the fragment.

        Raises:
            ValueError: If the schema references a named type or is invalid.
        """
        if path is None:
            path = []

        if isinstance(obj, str):
            type_name, description = extract_description(obj)
            if type_name not in JSON_SCHEMA_SCALAR_TYPES:
                raise ValueError(
                    f"Picoschema: unsupported scalar type '{type_name}'. Use async version for schema resolution."
                )

            # 'any' maps to a schema without a `type` constraint.
            if type_name == 'any':
                return {'description': description} if description else {}

            return {'type': type_name, 'description': description} if description else {'type': type_name}
        elif not isinstance(obj, dict):
            raise ValueError(f'Picoschema: only consists of objects and strings. Got: {obj}')

        schema: dict[str, Any] = {
            'type': 'object',
            'properties': {},
            'required': [],
            'additionalProperties': False,
        }

        for key, value in obj.items():
            if key == WILDCARD_PROPERTY_NAME:
                schema['additionalProperties'] = self.parse_pico_sync(value, [*path, key])
                continue

            # Split on the FIRST '(' only, so that a description which itself
            # contains parentheses is kept intact instead of being truncated.
            name, paren, rest = key.partition('(')
            # Drop the trailing ')' of the parenthetical, when there is one.
            type_info = rest[:-1] if paren else None
            is_optional = name.endswith('?')
            property_name = name[:-1] if is_optional else name

            if not is_optional:
                schema['required'].append(property_name)

            if not type_info:
                prop = self.parse_pico_sync(value, [*path, key])
                # Optional properties with a single string type also accept null.
                if is_optional and isinstance(prop.get('type'), str):
                    prop['type'] = [prop['type'], 'null']
                schema['properties'][property_name] = prop
                continue

            type_name, description = extract_description(type_info)
            if type_name == 'array':
                prop = self.parse_pico_sync(value, [*path, key])
                schema['properties'][property_name] = {
                    'type': ['array', 'null'] if is_optional else 'array',
                    'items': prop,
                }
            elif type_name == 'object':
                prop = self.parse_pico_sync(value, [*path, key])
                if is_optional:
                    prop['type'] = [prop['type'], 'null']
                schema['properties'][property_name] = prop
            elif type_name == 'enum':
                enum_values = value
                if is_optional and None not in enum_values:
                    # Build a new list rather than appending in place, so the
                    # caller's enum definition is never mutated (and non-list
                    # sequences such as tuples are accepted).
                    enum_values = [*enum_values, None]
                schema['properties'][property_name] = {'enum': enum_values}
            else:
                raise ValueError(f"Picoschema: parenthetical types must be 'object' or 'array', got: {type_name}")

            if description:
                schema['properties'][property_name]['description'] = description

        # Omit `required` entirely when every property is optional.
        if not schema['required']:
            del schema['required']
        return schema
class TestPicoschemaParserSync(unittest.TestCase):
    """Covers the synchronous parsing methods of PicoschemaParser."""

    def setUp(self) -> None:
        """Build a parser with default constructor arguments for each test."""
        self.parser = picoschema.PicoschemaParser()

    def test_parse_sync_none(self) -> None:
        """parse_sync(None) yields None."""
        parsed = self.parser.parse_sync(None)
        self.assertIsNone(parsed)

    def test_parse_sync_scalar_type(self) -> None:
        """A bare scalar type name becomes a schema with only `type`."""
        self.assertEqual(self.parser.parse_sync('string'), {'type': 'string'})

    def test_parse_sync_scalar_with_description(self) -> None:
        """Text after the comma in a scalar string becomes the description."""
        self.assertEqual(
            self.parser.parse_sync('string, a string'),
            {'type': 'string', 'description': 'a string'},
        )

    def test_parse_sync_object_schema(self) -> None:
        """A standard JSON object schema comes back equal to the input."""
        json_schema = {'type': 'object', 'properties': {'name': {'type': 'string'}}}
        self.assertEqual(self.parser.parse_sync(json_schema), json_schema)

    def test_parse_sync_picoschema_object(self) -> None:
        """A Picoschema dict is expanded; a `?` suffix makes a field nullable."""
        parsed = self.parser.parse_sync({'name': 'string', 'age?': 'integer'})
        self.assertEqual(
            parsed,
            {
                'type': 'object',
                'properties': {
                    'name': {'type': 'string'},
                    'age': {'type': ['integer', 'null']},
                },
                'required': ['name'],
                'additionalProperties': False,
            },
        )

    def test_parse_sync_named_type_raises(self) -> None:
        """A named (non-scalar) type is rejected with a ValueError."""
        with self.assertRaises(ValueError) as ctx:
            self.parser.parse_sync('CustomType')
        message = str(ctx.exception)
        self.assertIn('unsupported scalar type', message)
        self.assertIn('Use async version', message)

    def test_parse_pico_sync_array(self) -> None:
        """An `(array)` parenthetical wraps the value schema in `items`."""
        parsed = self.parser.parse_pico_sync({'items(array)': 'string'})
        self.assertEqual(
            parsed,
            {
                'type': 'object',
                'properties': {'items': {'type': 'array', 'items': {'type': 'string'}}},
                'required': ['items'],
                'additionalProperties': False,
            },
        )

    def test_parse_pico_sync_nested_object(self) -> None:
        """An `(object)` parenthetical is parsed recursively."""
        inner_expected = {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'email': {'type': ['string', 'null']},
            },
            'required': ['name'],
            'additionalProperties': False,
        }
        outer_expected = {
            'type': 'object',
            'properties': {'user': inner_expected},
            'required': ['user'],
            'additionalProperties': False,
        }
        parsed = self.parser.parse_pico_sync({'user(object)': {'name': 'string', 'email?': 'string'}})
        self.assertEqual(parsed, outer_expected)

    def test_parse_pico_sync_enum(self) -> None:
        """An `(enum)` parenthetical passes the listed values through."""
        parsed = self.parser.parse_pico_sync({'status(enum)': ['active', 'inactive']})
        self.assertEqual(
            parsed,
            {
                'type': 'object',
                'properties': {'status': {'enum': ['active', 'inactive']}},
                'required': ['status'],
                'additionalProperties': False,
            },
        )


class TestPicoschemaToJsonSchemaSync(unittest.TestCase):
    """Covers the module-level picoschema_to_json_schema_sync helper."""

    def test_basic_schema(self) -> None:
        """A flat Picoschema dict converts to the expected object schema."""
        converted = picoschema.picoschema_to_json_schema_sync({'diff': 'string', 'context?': 'string'})
        self.assertEqual(
            converted,
            {
                'type': 'object',
                'properties': {
                    'diff': {'type': 'string'},
                    'context': {'type': ['string', 'null']},
                },
                'required': ['diff'],
                'additionalProperties': False,
            },
        )

    def test_none_returns_none(self) -> None:
        """None in, None out."""
        converted = picoschema.picoschema_to_json_schema_sync(None)
        self.assertIsNone(converted)

    def test_named_type_raises(self) -> None:
        """A named type nested inside a dict raises ValueError."""
        with self.assertRaises(ValueError):
            picoschema.picoschema_to_json_schema_sync({'field': 'CustomType'})