Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions connector_builder_mcp/validation_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,43 @@ class MultiStreamSmokeTest(BaseModel):
stream_results: dict[str, StreamSmokeTest]


def _get_raw_responses_for_failure(
include_raw_responses_data: bool,
slices: list[dict[str, Any]],
stream_data: dict[str, Any],
error_msgs: list[str],
) -> list[dict[str, Any]] | None:
"""Return raw responses or simple debug info for failed stream reads."""
if not include_raw_responses_data:
return None
if slices:
return slices
return [
{
"error": error_msgs[0] if error_msgs else "Unknown error",
"logs": stream_data.get("logs", []),
}
]


def _get_raw_responses_for_success(
include_raw_responses_data: bool,
slices: list[dict[str, Any]],
stream_data: dict[str, Any],
) -> list[dict[str, Any]] | None:
"""Return raw responses or simple debug info for successful stream reads."""
if not include_raw_responses_data:
return None
if slices:
return slices
return [
{
"debug": "No raw HTTP data captured - check dpath extractor",
"logs": stream_data.get("logs", []),
}
]


def _calculate_record_stats(
records_data: list[dict[str, Any]],
) -> dict[str, Any]:
Expand Down Expand Up @@ -373,8 +410,8 @@ def execute_stream_test_read( # noqa: PLR0914
# We actually don't want to limit pages or slices.
# But if we don't provide a value they default
# to very low limits, which is not what we want.
"max_pages_per_slice": max_records,
"max_slices": max_records,
"max_pages_per_slice": max(1, max_records),
"max_slices": max(1, max_records),
},
}

Expand All @@ -401,11 +438,18 @@ def execute_stream_test_read( # noqa: PLR0914
stream_data: dict[str, Any] = {}
if result.record and result.record.data:
stream_data = result.record.data
slices_from_stream = stream_data.get("slices", [])
# auxiliary_requests may contain HTTP request/response data when slices is empty
if (
include_raw_responses_data
and not slices_from_stream
and "auxiliary_requests" in stream_data
):
slices_from_stream = stream_data.get("auxiliary_requests", [])

slices = cast(
list[dict[str, Any]],
filter_config_secrets(
stream_data.get("slices", []),
),
filter_config_secrets(slices_from_stream),
)
else:
success = False
Expand All @@ -419,11 +463,16 @@ def execute_stream_test_read( # noqa: PLR0914
execution_logs.extend(log for log in logs if "ERROR" not in str(log))

if success is False:
raw_responses_data = _get_raw_responses_for_failure(
include_raw_responses_data, slices, stream_data, error_msgs
)

return StreamTestResult(
success=success,
message=error_msgs[0] if error_msgs else "Unknown error occurred.",
errors=error_msgs,
logs=execution_logs,
raw_api_responses=raw_responses_data,
)

records_data: list[dict[str, Any]] = []
Expand All @@ -433,9 +482,9 @@ def execute_stream_test_read( # noqa: PLR0914
if isinstance(page, dict) and "records" in page:
records_data.extend(page.pop("records"))

raw_responses_data = None
if include_raw_responses_data is True and slices:
raw_responses_data = slices
raw_responses_data = _get_raw_responses_for_success(
include_raw_responses_data, slices, stream_data
)

record_stats = None
if include_record_stats and records_data:
Expand Down
Loading