Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
eecfdf6
feat: add summary_only parameter to execute_stream_test_read
devin-ai-integration[bot] Aug 2, 2025
421b506
fix: change summary_only default to False and update tests
devin-ai-integration[bot] Aug 2, 2025
2d575db
feat: replace summary_only with include_records, include_raw_requests…
devin-ai-integration[bot] Aug 2, 2025
273c60b
fix: resolve MyPy import and type annotation errors
devin-ai-integration[bot] Aug 2, 2025
b5b7661
feat: simplify to 2 parameters - include_records and include_raw_resp…
devin-ai-integration[bot] Aug 4, 2025
1fde72a
feat: simplify filter_config_secrets application to entire slices str…
devin-ai-integration[bot] Aug 4, 2025
22e1062
Update connector_builder_mcp/_connector_builder.py
aaronsteers Aug 4, 2025
75c67d5
Update connector_builder_mcp/_connector_builder.py
aaronsteers Aug 4, 2025
c8365d3
fix: update remaining conditional checks to use nullable boolean logic
devin-ai-integration[bot] Aug 4, 2025
da05947
docs: clarify conditional logic for record data extraction and nullab…
devin-ai-integration[bot] Aug 4, 2025
3100d33
Update connector_builder_mcp/_connector_builder.py
aaronsteers Aug 4, 2025
6c7149e
docs: format slices field description for proper line length
devin-ai-integration[bot] Aug 4, 2025
c1a57e4
docs: remove redundant Args block from execute_stream_test_read docst…
devin-ai-integration[bot] Aug 4, 2025
d1e7766
fix: remove trailing whitespace to resolve ruff formatting issues
devin-ai-integration[bot] Aug 4, 2025
ed1eb0e
docs: update include_raw_response_data parameter description per GitH…
devin-ai-integration[bot] Aug 4, 2025
e12055d
docs: add note about using max_records to prevent overflow per GitHub…
devin-ai-integration[bot] Aug 4, 2025
cb5e6d5
feat: implement helper functions to traverse slices/pages and return …
devin-ai-integration[bot] Aug 4, 2025
e505d4f
Merge branch 'main' into devin/1754099079-add-summary-only-option
aaronsteers Aug 6, 2025
ecf9476
Update connector_builder_mcp/_connector_builder.py
aaronsteers Aug 6, 2025
b0b1426
feat: simplify execute_stream_test_read to minimal implementation
devin-ai-integration[bot] Aug 6, 2025
3931c14
Merge branch 'main' into devin/1754099079-add-summary-only-option
aaronsteers Aug 6, 2025
4116e40
add args to control output
aaronsteers Aug 6, 2025
35483a0
fix imports
aaronsteers Aug 6, 2025
32a4d27
fully tested
aaronsteers Aug 6, 2025
083a63c
lint fixes
aaronsteers Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 84 additions & 5 deletions connector_builder_mcp/_connector_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
read_stream,
resolve_manifest,
)
from airbyte_cdk.models.airbyte_protocol import (
from airbyte_cdk.models import (
AirbyteStream,
ConfiguredAirbyteCatalog,
DestinationSyncMode,
Expand All @@ -25,7 +25,7 @@
from fastmcp import FastMCP
from pydantic import BaseModel, Field

from connector_builder_mcp._util import validate_manifest_structure
from connector_builder_mcp._util import filter_config_secrets, validate_manifest_structure

logger = logging.getLogger(__name__)

Expand All @@ -46,6 +46,12 @@ class StreamTestResult(BaseModel):
message: str
records_read: int = 0
errors: list[str] = []
records: list[dict[str, Any]] | None = Field(
default=None, description="Actual record data when include_records=True"
)
slices: list[dict[str, Any]] | None = Field(
default=None, description="Raw slices data when include_raw_response_data=True"
)


def _get_dummy_catalog(stream_name: str) -> ConfiguredAirbyteCatalog:
Expand Down Expand Up @@ -146,6 +152,16 @@ def execute_stream_test_read(
int,
Field(description="Maximum number of records to read", ge=1, le=1000),
] = 10,
include_records: Annotated[
bool,
Field(description="If True, include actual record data in the response"),
] = False,
include_raw_response_data: Annotated[
bool,
Field(
description="If True, include raw HTTP request and response data in slices structure"
),
] = False,
) -> StreamTestResult:
"""Execute reading from a connector stream.

Expand All @@ -154,9 +170,15 @@ def execute_stream_test_read(
config: Connector configuration
stream_name: Name of the stream to test
max_records: Maximum number of records to read
include_records: If True, include actual record data in the response
include_raw_response_data: If True, include raw HTTP request and response data in slices structure

Returns:
Test result with success status and details
Test result with success status and details, optionally including record and HTTP data

Note:
Raw request and response data in slices is automatically sanitized using filter_config_secrets()
to prevent accidental exposure of API keys and other sensitive information.
"""
logger.info(f"Testing stream read for stream: {stream_name}")

Expand Down Expand Up @@ -185,22 +207,79 @@ def execute_stream_test_read(
)

if result.type.value == "RECORD":
records_data = None
slices_data: list[dict[str, Any]] | None = None

if result.record and result.record.data:
try:
stream_data = result.record.data

if include_records:
if isinstance(stream_data, dict) and "records" in stream_data:
records_data = stream_data["records"]
elif isinstance(stream_data, dict):
records_data = [stream_data]

if include_raw_response_data:
slices = (
stream_data.get("slices", []) if isinstance(stream_data, dict) else []
)
slices_data = filter_config_secrets(slices) # type: ignore[assignment]

except Exception as e:
logger.warning(f"Failed to extract data: {e}")

return StreamTestResult(
success=True,
message=f"Successfully read from stream {stream_name}",
records_read=max_records,
records=records_data,
slices=slices_data,
)

error_msg = "Failed to read from stream"
if hasattr(result, "trace") and result.trace:
error_msg = result.trace.error.message

return StreamTestResult(success=False, message=error_msg, errors=[error_msg])
slices_data = None

if include_raw_response_data:
if result.record and result.record.data:
try:
stream_data = result.record.data
slices = stream_data.get("slices", []) if isinstance(stream_data, dict) else []
slices_data = filter_config_secrets(slices) # type: ignore[assignment]
except Exception as e:
logger.warning(f"Failed to extract debug data: {e}")

return StreamTestResult(
success=False,
message=error_msg,
errors=[error_msg],
slices=slices_data,
)

except Exception as e:
logger.error(f"Error testing stream read: {e}")
error_msg = f"Stream test error: {str(e)}"
return StreamTestResult(success=False, message=error_msg, errors=[error_msg])

slices_data = None

if include_raw_response_data:
try:
if "result" in locals() and result.record and result.record.data:
stream_data = result.record.data
slices = stream_data.get("slices", []) if isinstance(stream_data, dict) else []
slices_data = filter_config_secrets(slices) # type: ignore[assignment]
except Exception:
pass

return StreamTestResult(
success=False,
message=error_msg,
errors=[error_msg],
slices=slices_data,
)


def get_resolved_manifest(
Expand Down
53 changes: 30 additions & 23 deletions connector_builder_mcp/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,42 @@ def initialize_logging() -> None:
)


def filter_config_secrets(config: dict[str, Any]) -> dict[str, Any]:
def filter_config_secrets(
config: dict[str, Any] | list[Any] | Any,
) -> dict[str, Any] | list[Any] | Any:
"""Filter sensitive information from configuration for logging.

Args:
config: Configuration dictionary that may contain secrets
config: Configuration dictionary, list, or other value that may contain secrets

Returns:
Configuration dictionary with sensitive values masked
Configuration with sensitive values masked
"""
filtered = config.copy()
sensitive_keys = {
"password",
"token",
"key",
"secret",
"credential",
"api_key",
"access_token",
"refresh_token",
"client_secret",
}

for key, value in filtered.items():
if isinstance(value, dict):
filtered[key] = filter_config_secrets(value)
elif any(sensitive in key.lower() for sensitive in sensitive_keys):
filtered[key] = "***REDACTED***"

return filtered
if isinstance(config, dict):
filtered = config.copy()
sensitive_keys = {
"password",
"token",
"key",
"secret",
"credential",
"api_key",
"access_token",
"refresh_token",
"client_secret",
}

for key, value in filtered.items():
if isinstance(value, dict | list):
filtered[key] = filter_config_secrets(value)
elif any(sensitive in key.lower() for sensitive in sensitive_keys):
filtered[key] = "***REDACTED***"

return filtered
elif isinstance(config, list):
return [filter_config_secrets(item) for item in config]
else:
return config


def validate_manifest_structure(manifest: dict[str, Any]) -> bool:
Expand Down
Loading
Loading