Skip to content

Commit 43b6a6e

Browse files
committed
cdk: Update x-infer-schema behavior
This uses the changes introduced in estuary/flow#2247 to allow connectors to adjust the inferred schema complexity limit for their collections. It also emits a key-only subschema into `x-infer-schema` which has the effect of always preserving key fields in collection read schemas, even if they end up getting simplified away in the inferred schema.
1 parent a34d226 commit 43b6a6e

File tree

1 file changed

+49
-1
lines changed

1 file changed

+49
-1
lines changed

estuary-cdk/estuary_cdk/capture/common.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import abc
22
import asyncio
3+
import copy
34
import functools
45
from enum import Enum, StrEnum
56
from dataclasses import dataclass
@@ -412,6 +413,7 @@ class FixedSchema:
412413
initial_state: _BaseResourceState
413414
initial_config: _BaseResourceConfig
414415
schema_inference: bool
416+
schema_inference_limit: int | None = None
415417
reduction_strategy: ReductionStrategy | None = None
416418
disable: bool = False
417419

@@ -428,7 +430,10 @@ def discovered(
428430
schema = resource.model.model_json_schema(mode="serialization")
429431

430432
if resource.schema_inference:
431-
schema["x-infer-schema"] = True
433+
key_only_schema = subset_schema(schema, resource.key)
434+
schema["x-infer-schema"] = key_only_schema
435+
if resource.schema_inference_limit is not None:
436+
schema["x-schema-inference-limit"] = resource.schema_inference_limit
432437

433438
if resource.reduction_strategy:
434439
schema["reduce"] = {"strategy": resource.reduction_strategy}
@@ -1056,3 +1061,46 @@ async def _binding_incremental_task(
10561061
"incremental task is idle",
10571062
{"sleep_for": sleep_for, "cursor": state.cursor, "subtask_id": subtask_id},
10581063
)
1064+
1065+
def subset_schema(schema: dict, pointers: list[str]) -> dict:
    """
    Extract a subset of a JSON Schema based on a list of JSON Pointers.

    Constructs a new schema containing only the elements addressed by the
    pointers, plus the structural parent keys needed to keep their paths
    intact. Addressed values are deep-copied, so mutating the returned
    subset never aliases back into ``schema``.

    Args:
        schema: The source schema (an arbitrarily nested dict).
        pointers: RFC 6901 JSON Pointers (e.g. ``"/properties/id"``).
            Empty pointers, the bare ``"/"`` pointer, and strings that do
            not start with ``'/'`` are silently skipped.

    Returns:
        A new dict mirroring only the pointed-to portions of ``schema``.

    Raises:
        KeyError: If a pointer addresses a path that does not exist in
            ``schema`` — better to fail fast than silently emit a schema
            that is missing an expected field.
    """
    subset: dict = {}

    for pointer in pointers:
        # "" addresses the document root and "/" addresses the "" key;
        # neither is a meaningful subset target, so skip both.
        if pointer in ("", "/"):
            continue

        # RFC 6901 pointers must start with '/'; anything else is a
        # malformed pointer, which we skip.
        if not pointer.startswith('/'):
            continue

        # Only the single leading '/' is a separator prefix. Splitting the
        # remainder preserves empty reference tokens, e.g. "//a" splits to
        # ["", "a"], which lstrip('/') would have incorrectly collapsed.
        parts = pointer[1:].split('/')

        # Decode escape sequences: '~1' -> '/' first, then '~0' -> '~'.
        # RFC 6901 mandates this order so that "~01" decodes to "~1".
        decoded_parts = [part.replace('~1', '/').replace('~0', '~') for part in parts]

        # Walk the source schema and the destination subset in lockstep.
        current_source_level = schema
        current_dest_level = subset

        for i, part in enumerate(decoded_parts):
            # Move down one level in the source schema (raises KeyError if
            # the pointer does not resolve).
            current_source_level = current_source_level[part]

            if i == len(decoded_parts) - 1:
                # Final token: copy the entire addressed value.
                current_dest_level[part] = copy.deepcopy(current_source_level)
            else:
                # Intermediate token: ensure the parent path exists in the
                # destination, reusing it if a prior pointer created it.
                current_dest_level = current_dest_level.setdefault(part, {})

    return subset

0 commit comments

Comments
 (0)