Skip to content

Commit 01fa881

Browse files
lazebnyioctavia-squidington-iii
and
octavia-squidington-iii
authored
feat(connector-builder): add full_resolve command to connector builder (#442)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 15182cf commit 01fa881

File tree

7 files changed

+697
-2
lines changed

7 files changed

+697
-2
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

+25
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,30 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
117117
return error.as_airbyte_message()
118118

119119

120+
def full_resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
121+
try:
122+
manifest = {**source.resolved_manifest}
123+
streams = manifest.get("streams", [])
124+
for stream in streams:
125+
stream["dynamic_stream_name"] = None
126+
streams.extend(source.dynamic_streams)
127+
manifest["streams"] = streams
128+
return AirbyteMessage(
129+
type=Type.RECORD,
130+
record=AirbyteRecordMessage(
131+
data={"manifest": manifest},
132+
emitted_at=_emitted_at(),
133+
stream="full_resolve_manifest",
134+
),
135+
)
136+
except AirbyteTracedException as exc:
137+
return exc.as_airbyte_message()
138+
except Exception as exc:
139+
error = AirbyteTracedException.from_exception(
140+
exc, message=f"Error full resolving manifest: {str(exc)}"
141+
)
142+
return error.as_airbyte_message()
143+
144+
120145
def _emitted_at() -> int:
121146
return ab_datetime_now().to_epoch_millis()

airbyte_cdk/connector_builder/main.py

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from airbyte_cdk.connector_builder.connector_builder_handler import (
1313
TestReadLimits,
1414
create_source,
15+
full_resolve_manifest,
1516
get_limits,
1617
read_stream,
1718
resolve_manifest,
@@ -81,6 +82,8 @@ def handle_connector_builder_request(
8182
catalog is not None
8283
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
8384
return read_stream(source, config, catalog, state, limits)
85+
elif command == "full_resolve_manifest":
86+
return full_resolve_manifest(source)
8487
else:
8588
raise ValueError(f"Unrecognized command {command}.")
8689

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+7
Original file line numberDiff line numberDiff line change
@@ -3766,6 +3766,13 @@ definitions:
37663766
type:
37673767
type: string
37683768
enum: [DynamicDeclarativeStream]
3769+
name:
3770+
title: Name
3771+
description: The dynamic stream name.
3772+
type: string
3773+
default: ""
3774+
example:
3775+
- "Tables"
37693776
stream_template:
37703777
title: Stream Template
37713778
description: Reference to the stream template.

airbyte_cdk/sources/declarative/manifest_declarative_source.py

+17-2
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ def __init__(
106106
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
107107
)
108108

109+
self._config = config or {}
109110
self._validate_source()
110111

111112
@property
@@ -116,6 +117,12 @@ def resolved_manifest(self) -> Mapping[str, Any]:
116117
def message_repository(self) -> MessageRepository:
117118
return self._message_repository
118119

120+
@property
121+
def dynamic_streams(self) -> List[Dict[str, Any]]:
122+
return self._dynamic_stream_configs(
123+
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
124+
)
125+
119126
@property
120127
def connection_checker(self) -> ConnectionChecker:
121128
check = self._source_config["check"]
@@ -348,13 +355,16 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
348355
return stream_configs
349356

350357
def _dynamic_stream_configs(
351-
self, manifest: Mapping[str, Any], config: Mapping[str, Any]
358+
self,
359+
manifest: Mapping[str, Any],
360+
config: Mapping[str, Any],
361+
with_dynamic_stream_name: Optional[bool] = None,
352362
) -> List[Dict[str, Any]]:
353363
dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
354364
dynamic_stream_configs: List[Dict[str, Any]] = []
355365
seen_dynamic_streams: Set[str] = set()
356366

357-
for dynamic_definition in dynamic_stream_definitions:
367+
for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
358368
components_resolver_config = dynamic_definition["components_resolver"]
359369

360370
if not components_resolver_config:
@@ -393,6 +403,11 @@ def _dynamic_stream_configs(
393403
# Ensure that each stream is created with a unique name
394404
name = dynamic_stream.get("name")
395405

406+
if with_dynamic_stream_name:
407+
dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
408+
"name", f"dynamic_stream_{dynamic_definition_index}"
409+
)
410+
396411
if not isinstance(name, str):
397412
raise ValueError(
398413
f"Expected stream name {name} to be a string, got {type(name)}."

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+3
Original file line numberDiff line numberDiff line change
@@ -2489,6 +2489,9 @@ class HttpComponentsResolver(BaseModel):
24892489

24902490
class DynamicDeclarativeStream(BaseModel):
24912491
type: Literal["DynamicDeclarativeStream"]
2492+
name: Optional[str] = Field(
2493+
"", description="The dynamic stream name.", example=["Tables"], title="Name"
2494+
)
24922495
stream_template: DeclarativeStream = Field(
24932496
..., description="Reference to the stream template.", title="Stream Template"
24942497
)

0 commit comments

Comments
 (0)