Skip to content

Commit df8f72a

Browse files
committed
Add full-resolve-manifest command to connector builder
1 parent 56b88af commit df8f72a

File tree

4 files changed

+743
-7
lines changed

4 files changed

+743
-7
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

+29-6
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class TestReadLimits:
4343
def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
4444
command_config = config.get("__test_read_config", {})
4545
max_pages_per_slice = (
46-
command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
46+
command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
4747
)
4848
max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
4949
max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
@@ -67,11 +67,11 @@ def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> Manifest
6767

6868

6969
def read_stream(
70-
source: DeclarativeSource,
71-
config: Mapping[str, Any],
72-
configured_catalog: ConfiguredAirbyteCatalog,
73-
state: List[AirbyteStateMessage],
74-
limits: TestReadLimits,
70+
source: DeclarativeSource,
71+
config: Mapping[str, Any],
72+
configured_catalog: ConfiguredAirbyteCatalog,
73+
state: List[AirbyteStateMessage],
74+
limits: TestReadLimits,
7575
) -> AirbyteMessage:
7676
try:
7777
test_read_handler = TestReader(
@@ -117,5 +117,28 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
117117
return error.as_airbyte_message()
118118

119119

120+
def full_resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
121+
try:
122+
manifest = source.resolved_manifest
123+
streams = manifest.get("streams", [])
124+
for stream in streams:
125+
stream["dynamic_stream_name"] = None
126+
streams.extend(source.dynamic_streams)
127+
128+
return AirbyteMessage(
129+
type=Type.RECORD,
130+
record=AirbyteRecordMessage(
131+
data={"manifest": manifest},
132+
emitted_at=_emitted_at(),
133+
stream="full_resolve_manifest",
134+
),
135+
)
136+
except Exception as exc:
137+
error = AirbyteTracedException.from_exception(
138+
exc, message=f"Error full resolving manifest: {str(exc)}"
139+
)
140+
return error.as_airbyte_message()
141+
142+
120143
def _emitted_at() -> int:
121144
return ab_datetime_now().to_epoch_millis()

airbyte_cdk/connector_builder/main.py

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
get_limits,
1616
read_stream,
1717
resolve_manifest,
18+
full_resolve_manifest,
1819
)
1920
from airbyte_cdk.entrypoint import AirbyteEntrypoint
2021
from airbyte_cdk.models import (
@@ -81,6 +82,8 @@ def handle_connector_builder_request(
8182
catalog is not None
8283
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
8384
return read_stream(source, config, catalog, state, limits)
85+
elif command == "full_resolve_manifest":
86+
return full_resolve_manifest(source)
8487
else:
8588
raise ValueError(f"Unrecognized command {command}.")
8689

airbyte_cdk/sources/declarative/manifest_declarative_source.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ def __init__(
106106
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
107107
)
108108

109+
self._config = config
109110
self._validate_source()
110111

111112
@property
@@ -116,6 +117,10 @@ def resolved_manifest(self) -> Mapping[str, Any]:
116117
def message_repository(self) -> MessageRepository:
117118
return self._message_repository
118119

120+
@property
121+
def dynamic_streams(self) -> List[Dict[str, Any]]:
122+
return self._dynamic_stream_configs(manifest=self._source_config, config=self._config)
123+
119124
@property
120125
def connection_checker(self) -> ConnectionChecker:
121126
check = self._source_config["check"]
@@ -343,7 +348,6 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
343348
# This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
344349
stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
345350
for s in stream_configs:
346-
s["dynamic_stream_name"] = None
347351
if "type" not in s:
348352
s["type"] = "DeclarativeStream"
349353
return stream_configs

0 commit comments

Comments
 (0)