Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions schema_registry/client/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
class BaseSchema(ABC):
"""Abstract class for schema wrapper"""

def __init__(self, schema: typing.Union[str, typing.Dict[str, typing.Any]]) -> None:
def __init__(self, schema: typing.Union[str, typing.Dict[str, typing.Any]], ignore_default_error: bool = False) -> None:
if isinstance(schema, str):
schema = json.loads(schema)
self.raw_schema = typing.cast(typing.Dict, schema)
self.ignore_default_error = ignore_default_error
self.schema = self.parse_schema(self.raw_schema)
self.generate_hash()

Expand Down Expand Up @@ -66,10 +67,10 @@ def __eq__(self, other: typing.Any) -> bool:
class AvroSchema(BaseSchema):
"""Integrate BaseSchema for Avro schema."""

def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
def __init__(self, *args: typing.Any, ignore_default_error: bool = False, **kwargs: typing.Any) -> None:
self._expanded_schema: typing.Optional[typing.Dict] = None
self._flat_schema: typing.Optional[typing.Dict] = None

self.ignore_default_error: typing.Optional[bool] = ignore_default_error
super().__init__(*args, **kwargs)

@property
Expand Down Expand Up @@ -103,28 +104,28 @@ def flat_schema(self) -> typing.Dict:
# NOTE: Dict expected when we pass a dict
self._flat_schema = typing.cast(
typing.Dict,
fastavro.parse_schema(self.raw_schema, _write_hint=False, _force=True),
fastavro.parse_schema(self.raw_schema, _write_hint=False, _force=True, _ignore_default_error=self.ignore_default_error),
)

return self._flat_schema

def parse_schema(self, schema: typing.Dict) -> typing.Dict:
# NOTE: Dict expected when we pass a dict
return typing.cast(typing.Dict, fastavro.parse_schema(schema, _force=True))
return typing.cast(typing.Dict, fastavro.parse_schema(schema, _force=True, _ignore_default_error=self.ignore_default_error))

@staticmethod
def load(fp: str) -> AvroSchema:
def load(fp: str, ignore_default_error=False) -> AvroSchema:
"""Parse an avro schema from a file path."""
with open(fp, mode="r") as f:
content = f.read()
return AvroSchema(content)
return AvroSchema(content, ignore_default_error)

@staticmethod
async def async_load(fp: str) -> AvroSchema:
async def async_load(fp: str, ignore_default_error=False) -> AvroSchema:
"""Parse an avro schema from a file path."""
async with aiofiles.open(fp, mode="r") as f:
content = await f.read()
return AvroSchema(content)
return AvroSchema(content, ignore_default_error)


class JsonSchema(BaseSchema):
Expand Down
24 changes: 24 additions & 0 deletions tests/avro_schemas/default_schema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "basic",
"type": "record",
"doc": "basic schema for tests",
"namespace": "python.test.basic",
"fields": [
{
"name": "number",
"doc": "age",
"type": [
"long",
"null"
],
"default": "null"
},
{
"name": "name",
"doc": "a name",
"type": [
"string"
]
}
]
}
10 changes: 10 additions & 0 deletions tests/client/async_client/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ async def test_avro_schema_load_parse_error():
with pytest.raises(fastavro.schema.UnknownType):
await schema.AvroSchema.async_load(data_gen.get_schema_path("invalid_schema.avsc"))

@pytest.mark.asyncio
async def test_avro_schema_load_parse_default_error():
with pytest.raises(fastavro.schema.SchemaParseException):
await schema.AvroSchema.async_load(data_gen.get_schema_path("default_schema.avsc"))

@pytest.mark.asyncio
async def test_avro_schema_load_parse_default_error_ignored():
parsed = await schema.AvroSchema.async_load(data_gen.get_schema_path("default_schema.avsc"), ignore_default_error=True)
assert isinstance(parsed, schema.AvroSchema)


def test_json_schema_from_string():
parsed = schema.JsonSchema(data_gen.JSON_BASIC_SCHEMA)
Expand Down
Loading