Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
description: 'Connector Builder MCP Test Mode'
tools: ['editFiles', 'execute_record_counts_smoke_test', 'execute_stream_test_read', 'get_connector_builder_docs', 'get_connector_manifest', 'list_dotenv_secrets', 'populate_dotenv_missing_secrets_stubs', 'validate_manifest']
tools: ['editFiles', 'run_connector_readiness_test_report', 'execute_stream_test_read', 'get_connector_builder_docs', 'get_connector_manifest', 'list_dotenv_secrets', 'populate_dotenv_missing_secrets_stubs', 'validate_manifest']
---
You are helping us test the connector builder MCP mode. This mode is designed to assist in building and testing Airbyte connectors within the MCP framework.

Expand Down
4 changes: 2 additions & 2 deletions connector_builder_mcp/_connector_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from connector_builder_mcp._secrets import register_secrets_tools
from connector_builder_mcp._validation_testing import (
execute_dynamic_manifest_resolution_test,
execute_record_counts_smoke_test,
execute_stream_test_read,
run_connector_readiness_test_report,
validate_manifest,
)

Expand Down Expand Up @@ -312,7 +312,7 @@ def register_connector_builder_tools(app: FastMCP) -> None:
"""
app.tool(validate_manifest)
app.tool(execute_stream_test_read)
app.tool(execute_record_counts_smoke_test)
app.tool(run_connector_readiness_test_report)
app.tool(execute_dynamic_manifest_resolution_test)
app.tool(get_manifest_yaml_json_schema)
app.tool(get_connector_builder_checklist)
Expand Down
164 changes: 142 additions & 22 deletions connector_builder_mcp/_validation_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class StreamTestResult(BaseModel):
records: list[dict[str, Any]] | None = None
errors: list[str] = []
raw_api_responses: list[dict[str, Any]] | None = None
record_stats: dict[str, Any] | None = None


class StreamSmokeTest(BaseModel):
Expand All @@ -61,6 +62,7 @@ class StreamSmokeTest(BaseModel):
records_read: int = 0
duration_seconds: float = 0.0
error_message: str | None = None
field_count_warnings: list[str] = []


class MultiStreamSmokeTest(BaseModel):
Expand Down Expand Up @@ -248,6 +250,10 @@ def execute_stream_test_read(
bool,
Field(description="Whether to include actual record data in the response"),
] = False,
include_record_stats: Annotated[
bool,
Field(description="Include basic statistics on record properties"),
] = True,
include_raw_responses_data: Annotated[
bool | None,
Field(
Expand All @@ -270,6 +276,7 @@ def execute_stream_test_read(
stream_name: Name of the stream to test
max_records: Maximum number of records to read (default: 10000)
include_records_data: Whether to include actual record data in response
include_record_stats: Whether to include basic statistics on record properties
include_raw_responses_data: Whether to include raw API responses
dotenv_file_uris: Optional paths/URLs to .env files or privatebin URLs for secret hydration

Expand Down Expand Up @@ -315,6 +322,9 @@ def execute_stream_test_read(

records_data = []
slices = []
record_stats = None
property_stats: dict[str, dict[str, Any]] = {}
total_records = 0

for message in output: # type: ignore[attr-defined]
if isinstance(message, AirbyteMessage):
Expand All @@ -325,6 +335,23 @@ def execute_stream_test_read(
):
if include_records_data:
records_data.append(message.record.data)

if include_record_stats and message.record.data:
total_records += 1
for key, value in message.record.data.items():
if key not in property_stats:
property_stats[key] = {
"type": type(value).__name__,
"num_null": 0,
"num_non_null": 0,
}

if value is None:
property_stats[key]["num_null"] += 1
else:
property_stats[key]["num_non_null"] += 1
property_stats[key]["type"] = type(value).__name__

elif (
message.type == Type.TRACE
and message.trace
Expand All @@ -335,15 +362,28 @@ def execute_stream_test_read(
if slice_data:
slices.append(slice_data)

if include_record_stats:
record_stats = {
"num_properties": len(property_stats),
"properties": property_stats,
}

raw_responses_data = None
if include_raw_responses_data is True and slices and isinstance(slices, list):
raw_responses_data = slices

records_count = (
len(records_data)
if include_records_data
else (total_records if include_record_stats else 0)
)

return StreamTestResult(
success=True,
message=f"Successfully read {len(records_data)} records from stream {stream_name}",
records_read=len(records_data),
message=f"Successfully read {records_count} records from stream {stream_name}",
records_read=records_count,
records=records_data if include_records_data else None,
record_stats=record_stats,
raw_api_responses=raw_responses_data,
)

Expand All @@ -363,7 +403,7 @@ def execute_stream_test_read(
)


def execute_record_counts_smoke_test(
def run_connector_readiness_test_report(
manifest: Annotated[
str,
Field(description="The connector manifest. Can be raw a YAML string or path to YAML file"),
Expand All @@ -390,22 +430,26 @@ def execute_record_counts_smoke_test(
description="Optional paths/URLs to local .env files or Privatebin.net URLs for secret hydration. Can be a single string, comma-separated string, or list of strings. Privatebin secrets may be created at privatebin.net, and must: contain text formatted as a dotenv file, use a password sent via the `PRIVATEBIN_PASSWORD` env var, and not include password text in the URL."
),
] = None,
) -> MultiStreamSmokeTest:
"""Execute a smoke test to count records from all streams in the connector.
) -> str:
"""Execute a connector readiness test and generate a comprehensive markdown report.

This function is meant to be run after individual streams have been tested with the test read tool,
to validate things are working properly and generate a report that can be shared with the end user.

This function tests all available streams by reading records up to the specified limit
and returns comprehensive statistics including record counts, errors, and timing information.
It tests all available streams by reading records up to the specified limit and returns a
markdown-formatted readiness report with validation warnings and statistics.

Args:
manifest: The connector manifest (YAML string or file path)
config: Connector configuration
streams: Optional CSV-delimited list of streams to test
max_records: Maximum number of records to read per stream (default: 10000)
dotenv_file_uris: Optional paths/URLs to .env files or privatebin URLs for secret hydration

Returns:
MultiStreamSmokeTest result with per-stream statistics and overall summary
Markdown-formatted readiness report with per-stream statistics and validation warnings
"""
logger.info("Starting multi-stream smoke test")
logger.info("Starting connector readiness test")
start_time = time.time()
total_streams_tested = 0
total_streams_successful = 0
Expand All @@ -416,13 +460,13 @@ def execute_record_counts_smoke_test(

config = hydrate_config(config, dotenv_file_uris=dotenv_file_uris)

available_streams = manifest_dict.get("streams", [])
total_available_streams = len(available_streams)

stream_names: list[str]
if isinstance(streams, str):
stream_names = [s.strip() for s in streams.split(",") if s.strip()]
elif isinstance(streams, list):
stream_names = [s.strip() for s in streams]
else:
available_streams = manifest_dict.get("streams", [])
stream_names = [
stream.get("name", f"stream_{i}") for i, stream in enumerate(available_streams)
]
Expand All @@ -440,6 +484,7 @@ def execute_record_counts_smoke_test(
stream_name=stream_name,
max_records=max_records,
include_records_data=False,
include_record_stats=True,
include_raw_responses_data=False,
dotenv_file_uris=dotenv_file_uris,
)
Expand All @@ -450,12 +495,22 @@ def execute_record_counts_smoke_test(
if result.success:
total_streams_successful += 1
total_records_count += records_read
stream_results[stream_name] = StreamSmokeTest(

field_count_warnings = []

if result.record_stats and result.record_stats.get("num_properties", 0) < 2:
field_count_warnings.append(
f"Records have only {result.record_stats.get('num_properties', 0)} field(s), expected at least 2"
)

smoke_test_result = StreamSmokeTest(
stream_name=stream_name,
success=True,
records_read=records_read,
duration_seconds=stream_duration,
)
smoke_test_result.field_count_warnings = field_count_warnings
stream_results[stream_name] = smoke_test_result
logger.info(f"✓ {stream_name}: {records_read} records in {stream_duration:.2f}s")
else:
error_message = result.message
Expand Down Expand Up @@ -484,18 +539,83 @@ def execute_record_counts_smoke_test(
overall_success = total_streams_successful == total_streams_tested

logger.info(
f"Smoke test completed: {total_streams_successful}/{total_streams_tested} streams successful, "
f"Readiness test completed: {total_streams_successful}/{total_streams_tested} streams successful, "
f"{total_records_count} total records in {total_duration:.2f}s"
)

return MultiStreamSmokeTest(
success=overall_success,
total_streams_tested=total_streams_tested,
total_streams_successful=total_streams_successful,
total_records_count=total_records_count,
duration_seconds=total_duration,
stream_results=stream_results,
)
if not overall_success:
failed_streams = [name for name, result in stream_results.items() if not result.success]
error_details = []
for name, smoke_result in stream_results.items():
if not smoke_result.success:
error_msg = getattr(smoke_result, "error_message", "Unknown error")
error_details.append(f"- **{name}**: {error_msg}")

return f"""# Connector Readiness Test Report - FAILED

**Status**: {total_streams_successful}/{total_streams_tested} streams successful
**Failed streams**: {", ".join(failed_streams)}
**Total duration**: {total_duration:.2f}s

{chr(10).join(error_details)}
"""

report_lines = [
"# Connector Readiness Test Report",
"",
"## Summary",
f"- **Streams Tested**: {total_streams_tested} out of {total_available_streams} total streams",
f"- **Successful Streams**: {total_streams_successful}/{total_streams_tested}",
f"- **Total Records Extracted**: {total_records_count:,}",
f"- **Total Duration**: {total_duration:.2f}s",
"",
"## Stream Results",
"",
]

for stream_name, smoke_result in stream_results.items():
if smoke_result.success:
warnings = []
if smoke_result.records_read == 0:
warnings.append("⚠️ No records extracted")
elif smoke_result.records_read == 1:
warnings.append("⚠️ Only 1 record extracted - may indicate pagination issues")
elif smoke_result.records_read % 10 == 0:
warnings.append("⚠️ Record count is multiple of 10 - may indicate pagination limit")

# TODO: Add page size validation
# if page_size is specified in config, check if records_read is multiple of page_size (important-comment)

field_warnings = getattr(smoke_result, "field_count_warnings", [])
if field_warnings:
warnings.append(f"⚠️ Field count issues: {'; '.join(field_warnings[:2])}")

report_lines.extend(
[
f"### {stream_name} ✅",
f"- **Records Extracted**: {smoke_result.records_read:,}",
f"- **Duration**: {smoke_result.duration_seconds:.2f}s",
]
)

if warnings:
report_lines.append(f"- **Warnings**: {' | '.join(warnings)}")
else:
report_lines.append("- **Status**: No issues detected")

report_lines.append("")
else:
error_msg = getattr(smoke_result, "error_message", "Unknown error")
report_lines.extend(
[
f"### {stream_name} ❌",
"- **Status**: Failed",
f"- **Error**: {error_msg}",
"",
]
)

return "\n".join(report_lines)


def execute_dynamic_manifest_resolution_test(
Expand Down
39 changes: 38 additions & 1 deletion tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
StreamTestResult,
execute_dynamic_manifest_resolution_test,
execute_stream_test_read,
run_connector_readiness_test_report,
validate_manifest,
)

Expand Down Expand Up @@ -248,7 +249,7 @@ def test_performance_multiple_tool_calls(self, rick_and_morty_manifest, empty_co
end_time = time.time()
duration = end_time - start_time

assert duration < 20, f"Multiple tool calls took too long: {duration}s"
assert duration < 20.0, f"Multiple tool calls took too long: {duration}s"

def test_simple_api_manifest_workflow(self, simple_api_manifest, empty_config):
"""Test workflow with simple API manifest."""
Expand All @@ -261,6 +262,42 @@ def test_simple_api_manifest_workflow(self, simple_api_manifest, empty_config):
assert isinstance(resolved_manifest, dict)
assert "streams" in resolved_manifest

@pytest.mark.parametrize(
    "manifest_fixture,stream_name",
    [
        ("rick_and_morty_manifest", "characters"),
        ("simple_api_manifest", "users"),
    ],
)
def test_sample_manifests_with_both_tools(
    self, manifest_fixture, stream_name, request, empty_config
):
    """Run the stream test-read tool and the readiness report against a sample manifest.

    The manifest is resolved by fixture name through ``request`` so that a
    single parametrized test can cover several sample manifests.

    Args:
        manifest_fixture: Name of the fixture that provides the manifest.
        stream_name: Stream passed to the test-read tool and expected to be
            mentioned in the readiness report.
        request: pytest's built-in fixture-request object.
        empty_config: Config fixture passed through to both tools.
    """
    manifest = request.getfixturevalue(manifest_fixture)

    # Part 1: single-stream test read, capped at five records.
    read_outcome = execute_stream_test_read(manifest, empty_config, stream_name, max_records=5)
    assert isinstance(read_outcome, StreamTestResult)
    assert read_outcome.message is not None
    if read_outcome.success:
        # The message format is only checked when the read reports success.
        assert read_outcome.records_read >= 0
        assert "Successfully read" in read_outcome.message
        assert "records from stream" in read_outcome.message

    # Part 2: readiness report, which is returned as a markdown string.
    report = run_connector_readiness_test_report(manifest, empty_config, max_records=10)
    assert isinstance(report, str)
    assert "# Connector Readiness Test Report" in report
    assert stream_name in report

    if "FAILED" not in report:
        # Passing reports list a per-stream record count.
        assert "Records Extracted" in report
        return

    # Failing reports carry the failure summary lines instead.
    assert "Failed streams" in report
    assert "Total duration" in report


class TestMCPServerIntegration:
"""Integration tests for MCP server functionality."""
Expand Down
Loading