Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add streams limit to full resolve manifest command #455

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


from dataclasses import asdict, dataclass, field
from typing import Any, List, Mapping
from typing import Any, List, Mapping, Dict

from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.models import (
Expand All @@ -27,30 +27,34 @@
# Default caps used when a Connector Builder request does not override them.
# They are the field defaults of the limits dataclass and the fallbacks in get_limits.
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
DEFAULT_MAXIMUM_STREAMS = 100

# Keys that get_limits looks up inside the "__test_read_config" mapping of the
# connector config to override the defaults above.
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"


@dataclass
class TestLimits:
    """Caps applied to Connector Builder commands.

    Every field falls back to its module-level ``DEFAULT_MAXIMUM_*`` constant
    when no explicit value is given.
    """

    # The class name starts with "Test", so pytest would try to collect it when
    # it is imported into a test module. Opt out explicitly. The attribute is
    # deliberately un-annotated so that @dataclass does not treat it as a field.
    __test__ = False

    # Maximum number of records to read.
    max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
    # Maximum number of pages to request per slice.
    max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
    # Maximum number of slices to read.
    max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
    # Maximum number of generated streams kept per dynamic stream definition.
    max_streams: int = field(default=DEFAULT_MAXIMUM_STREAMS)


# Backward-compatible alias: the class was previously published as
# ``TestReadLimits``; keep the old name importable for existing callers.
TestReadLimits = TestLimits


def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """Build the Connector Builder limits from a connector config.

    Overrides are read from the optional ``"__test_read_config"`` mapping in
    ``config``. Any limit that is missing or falsy (``None``, ``0``) falls back
    to its module-level default.

    Args:
        config: The connector configuration, optionally containing a
            ``"__test_read_config"`` mapping with ``max_*`` overrides.

    Returns:
        A ``TestLimits`` populated with the effective limits.
    """
    # `or {}` (rather than a `.get` default) also covers the key being present
    # with an explicit null value, which would otherwise break the lookups below.
    command_config = config.get("__test_read_config") or {}
    max_pages_per_slice = (
        command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
    )
    max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
    max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
    # Fall back to the streams default, not the records default: the two only
    # happen to share the same value today.
    max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS
    return TestLimits(
        max_records=max_records,
        max_pages_per_slice=max_pages_per_slice,
        max_slices=max_slices,
        max_streams=max_streams,
    )


def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource:
def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
manifest = config["__injected_declarative_manifest"]
return ManifestDeclarativeSource(
config=config,
Expand All @@ -71,7 +75,7 @@ def read_stream(
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
limits: TestReadLimits,
limits: TestLimits,
) -> AirbyteMessage:
try:
test_read_handler = TestReader(
Expand Down Expand Up @@ -117,13 +121,24 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
return error.as_airbyte_message()


def full_resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
try:

manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
for stream in streams:
stream["dynamic_stream_name"] = None
streams.extend(source.dynamic_streams)

mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
for stream in source.dynamic_streams:
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

if len(generated_streams) < limits.max_streams:
generated_streams += [stream]

for generated_streams_list in mapped_streams.values():
streams.extend(generated_streams_list)

manifest["streams"] = streams
return AirbyteMessage(
type=Type.RECORD,
Expand Down
6 changes: 3 additions & 3 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import (
TestReadLimits,
TestLimits,
create_source,
full_resolve_manifest,
get_limits,
Expand Down Expand Up @@ -73,7 +73,7 @@ def handle_connector_builder_request(
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
state: List[AirbyteStateMessage],
limits: TestReadLimits,
limits: TestLimits,
) -> AirbyteMessage:
if command == "resolve_manifest":
return resolve_manifest(source)
Expand All @@ -83,7 +83,7 @@ def handle_connector_builder_request(
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
return read_stream(source, config, catalog, state, limits)
elif command == "full_resolve_manifest":
return full_resolve_manifest(source)
return full_resolve_manifest(source, limits)
else:
raise ValueError(f"Unrecognized command {command}.")

Expand Down
93 changes: 14 additions & 79 deletions unit_tests/connector_builder/test_connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE,
DEFAULT_MAXIMUM_NUMBER_OF_SLICES,
DEFAULT_MAXIMUM_RECORDS,
TestReadLimits,
TestLimits,
create_source,
get_limits,
resolve_manifest,
Expand Down Expand Up @@ -384,6 +384,7 @@
RESOLVE_DYNAMIC_STREAM_MANIFEST_CONFIG = {
"__injected_declarative_manifest": DYNAMIC_STREAM_MANIFEST,
"__command": "full_resolve_manifest",
"__test_read_config": {"max_streams": 2},
}

TEST_READ_CONFIG = {
Expand Down Expand Up @@ -524,7 +525,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
command = "resolve_manifest"
config["__command"] = command
source = ManifestDeclarativeSource(source_config=MANIFEST)
limits = TestReadLimits()
limits = TestLimits()
resolved_manifest = handle_connector_builder_request(
source, command, config, create_configured_catalog("dummy_stream"), _A_STATE, limits
)
Expand Down Expand Up @@ -728,7 +729,7 @@ def test_read():
emitted_at=1,
),
)
limits = TestReadLimits()
limits = TestLimits()
with patch(
"airbyte_cdk.connector_builder.test_reader.TestReader.run_test_read",
return_value=stream_read,
Expand Down Expand Up @@ -789,7 +790,7 @@ def test_config_update() -> None:
config,
ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG),
_A_PER_PARTITION_STATE,
TestReadLimits(),
TestLimits(),
)
assert output.record.data["latest_config_update"]

Expand Down Expand Up @@ -825,7 +826,7 @@ def check_config_against_spec(self) -> Literal[False]:
mock_from_exception.return_value = stack_trace

source = MockManifestDeclarativeSource()
limits = TestReadLimits()
limits = TestLimits()
response = read_stream(
source,
TEST_READ_CONFIG,
Expand Down Expand Up @@ -865,7 +866,7 @@ def test_handle_429_response():
] = {"backoff_strategies": [{"type": "ConstantBackoffStrategy", "backoff_time_in_seconds": 5}]}

config = TEST_READ_CONFIG
limits = TestReadLimits()
limits = TestLimits()
source = create_source(config, limits)

with patch("requests.Session.send", return_value=response) as mock_send:
Expand Down Expand Up @@ -982,7 +983,7 @@ def test_create_source():
max_records = 3
max_pages_per_slice = 2
max_slices = 1
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)
limits = TestLimits(max_records, max_pages_per_slice, max_slices)

config = {"__injected_declarative_manifest": MANIFEST}

Expand Down Expand Up @@ -1064,7 +1065,7 @@ def test_read_source(mock_http_stream):
max_records = 100
max_pages_per_slice = 2
max_slices = 3
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)
limits = TestLimits(max_records, max_pages_per_slice, max_slices)

catalog = ConfiguredAirbyteCatalog(
streams=[
Expand Down Expand Up @@ -1111,7 +1112,7 @@ def test_read_source_single_page_single_slice(mock_http_stream):
max_records = 100
max_pages_per_slice = 1
max_slices = 1
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)
limits = TestLimits(max_records, max_pages_per_slice, max_slices)

catalog = ConfiguredAirbyteCatalog(
streams=[
Expand Down Expand Up @@ -1195,7 +1196,7 @@ def test_handle_read_external_requests(deployment_mode, url_base, expected_error
endpoints when running on Cloud or OSS deployments
"""

limits = TestReadLimits(max_records=100, max_pages_per_slice=1, max_slices=1)
limits = TestLimits(max_records=100, max_pages_per_slice=1, max_slices=1)

catalog = ConfiguredAirbyteCatalog(
streams=[
Expand Down Expand Up @@ -1281,7 +1282,7 @@ def test_handle_read_external_oauth_request(deployment_mode, token_url, expected
endpoints when running on Cloud or OSS deployments
"""

limits = TestReadLimits(max_records=100, max_pages_per_slice=1, max_slices=1)
limits = TestLimits(max_records=100, max_pages_per_slice=1, max_slices=1)

catalog = ConfiguredAirbyteCatalog(
streams=[
Expand Down Expand Up @@ -1339,7 +1340,7 @@ def test_read_stream_exception_with_secrets():
]
)
state = []
limits = TestReadLimits()
limits = TestLimits()

# Add the secret to be filtered
update_secrets([config["api_key"]])
Expand Down Expand Up @@ -1367,7 +1368,7 @@ def test_full_resolve_manifest(valid_resolve_manifest_config_file):
config = copy.deepcopy(RESOLVE_DYNAMIC_STREAM_MANIFEST_CONFIG)
command = config["__command"]
source = ManifestDeclarativeSource(source_config=DYNAMIC_STREAM_MANIFEST)
limits = TestReadLimits()
limits = TestLimits(max_streams=2)
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url="https://api.test.com/parents"),
Expand Down Expand Up @@ -1625,72 +1626,6 @@ def test_full_resolve_manifest(valid_resolve_manifest_config_file):
},
"dynamic_stream_name": "TestDynamicStream",
},
{
"type": "DeclarativeStream",
"name": "parent_2_item_1",
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {"ABC": {"type": "number"}, "AED": {"type": "number"}},
"type": "object",
},
},
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "2/1",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"paginator": {"type": "NoPagination"},
},
"dynamic_stream_name": "TestDynamicStream",
},
{
"type": "DeclarativeStream",
"name": "parent_2_item_2",
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {"ABC": {"type": "number"}, "AED": {"type": "number"}},
"type": "object",
},
},
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "2/2",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"paginator": {"type": "NoPagination"},
},
"dynamic_stream_name": "TestDynamicStream",
},
],
"check": {"type": "CheckStream", "stream_names": ["lists"]},
"spec": {
Expand Down
Loading