Skip to content

Commit ef30f5f

Browse files
authored
feat: allow inline pipeline in dryrun tool (#34)
* feat: allow inline pipeline in dryrun tool * ci: pin python to 3.11 * test: remove legal pipeline tests * chore: pin mysql connector to 9.5.0
1 parent 4924607 commit ef30f5f

5 files changed

Lines changed: 108 additions & 47 deletions

File tree

.github/workflows/python-app.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ jobs:
2424
- uses: actions/checkout@v4
2525
- name: Install the latest version of uv
2626
uses: astral-sh/setup-uv@v5
27+
- run: uv python pin cp311
2728
- name: Install dependencies
2829
run: uv sync
2930
- name: Format

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ license = {text = "MIT"}
1111
requires-python = ">=3.11"
1212
dependencies = [
1313
"mcp>=1.8.0",
14-
"mysql-connector-python>=9.1.0",
14+
"mysql-connector-python==9.5.0",
1515
"pyyaml>=6.0.2",
1616
"aiohttp>=3.9.0",
1717
]

src/greptimedb_mcp_server/server.py

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ async def execute_tql(
374374
],
375375
end: Annotated[
376376
str,
377-
"End time: SQL expression (e.g., 'now()'), " "RFC3339, or Unix timestamp",
377+
"End time: SQL expression (e.g., 'now()'), RFC3339, or Unix timestamp",
378378
],
379379
step: Annotated[str, "Query resolution step, e.g., '1m', '5m', '1h'"],
380380
lookback: Annotated[str | None, "Lookback delta for range queries"] = None,
@@ -722,16 +722,14 @@ async def create_pipeline(
722722
pipelines = result.get("pipelines", [])
723723
version = pipelines[0]["version"] if pipelines else "unknown"
724724
return (
725-
f"Pipeline '{name}' created successfully.\n"
726-
f"Version: {version}"
725+
f"Pipeline '{name}' created successfully.\nVersion: {version}"
727726
)
728727
except (json.JSONDecodeError, KeyError, IndexError):
729728
return f"Pipeline '{name}' created successfully."
730729
else:
731730
error_detail = response_text if response_text else "No details"
732731
return (
733-
f"Error creating pipeline (HTTP {response.status}): "
734-
f"{error_detail}"
732+
f"Error creating pipeline (HTTP {response.status}): {error_detail}"
735733
)
736734

737735
except aiohttp.ClientError as e:
@@ -741,24 +739,59 @@ async def create_pipeline(
741739

742740
@mcp.tool()
743741
async def dryrun_pipeline(
744-
pipeline_name: Annotated[str, "Name of the pipeline to test"],
745-
data: Annotated[str, "Test data in JSON format (single object or array)"],
742+
pipeline: Annotated[
743+
str | None,
744+
"Pipeline configuration in YAML format (inline). Provide this to test a pipeline without saving it.",
745+
] = None,
746+
pipeline_name: Annotated[
747+
str | None,
748+
"Name of the saved pipeline to test. Provide either 'pipeline' or 'pipeline_name', not both.",
749+
] = None,
750+
data: Annotated[
751+
str, "Test data in JSON or NDJSON format (single object or array)"
752+
] = "",
753+
data_type: Annotated[
754+
str | None,
755+
"Content type of the data (e.g., 'application/x-ndjson'). If omitted, GreptimeDB will use default.",
756+
] = None,
746757
) -> str:
747-
"""Test a pipeline with sample data without writing to the database."""
758+
"""Test a pipeline with sample data without writing to the database.
759+
760+
You can test a pipeline in two ways:
761+
- Provide 'pipeline' with inline YAML configuration
762+
- Provide 'pipeline_name' to test a previously saved pipeline
763+
764+
Args:
765+
pipeline: Pipeline YAML configuration (inline)
766+
pipeline_name: Name of saved pipeline (mutually exclusive with pipeline)
767+
data: Test data in JSON/NDJSON format
768+
data_type: Optional content type (e.g., 'application/x-ndjson')
769+
"""
748770
state = get_state()
749-
pipeline_name = _validate_pipeline_name(pipeline_name)
750771

751-
try:
752-
parsed = json.loads(data)
753-
normalized_data = json.dumps(parsed, ensure_ascii=False)
754-
except json.JSONDecodeError as e:
755-
return f"Error: Invalid JSON data: {str(e)}"
772+
if not data or not data.strip():
773+
return "Error: data parameter is required"
774+
775+
if pipeline is not None and pipeline_name is not None:
776+
return "Error: Provide either 'pipeline' or 'pipeline_name', not both"
777+
778+
if pipeline is None and pipeline_name is None:
779+
return "Error: Provide either 'pipeline' or 'pipeline_name'"
780+
781+
if pipeline_name is not None:
782+
pipeline_name = _validate_pipeline_name(pipeline_name)
756783

757784
url = f"{state.http_base_url}/v1/pipelines/_dryrun"
758-
request_body = {
759-
"pipeline_name": pipeline_name,
760-
"data": normalized_data,
761-
}
785+
request_body = {"data": data}
786+
787+
if data_type:
788+
request_body["data_type"] = data_type
789+
790+
if pipeline is not None:
791+
request_body["pipeline"] = pipeline
792+
elif pipeline_name is not None:
793+
request_body["pipeline_name"] = pipeline_name
794+
762795
auth = state.get_http_auth()
763796
logger.debug(f"Dryrun request URL: {url}")
764797
logger.debug(f"Dryrun request body: {request_body}")
@@ -780,12 +813,11 @@ async def dryrun_pipeline(
780813
else:
781814
error_detail = response_text if response_text else "No details"
782815
return (
783-
f"Error testing pipeline (HTTP {response.status}): "
784-
f"{error_detail}"
816+
f"Error testing pipeline (HTTP {response.status}): {error_detail}"
785817
)
786818

787819
except aiohttp.ClientError as e:
788-
logger.error(f"HTTP error testing pipeline '{pipeline_name}': {e}")
820+
logger.error(f"HTTP error testing pipeline: {e}")
789821
return f"Error testing pipeline: {str(e)}"
790822

791823

@@ -813,8 +845,7 @@ async def delete_pipeline(
813845
else:
814846
error_detail = response_text if response_text else "No details"
815847
return (
816-
f"Error deleting pipeline (HTTP {response.status}): "
817-
f"{error_detail}"
848+
f"Error deleting pipeline (HTTP {response.status}): {error_detail}"
818849
)
819850

820851
except aiohttp.ClientError as e:

src/greptimedb_mcp_server/templates/pipeline_creator/template.md

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -132,31 +132,42 @@ transform:
132132

133133
Generate a complete, valid YAML pipeline configuration. After generation:
134134
1. Use `create_pipeline` tool to create the pipeline
135-
2. Use `dryrun_pipeline` tool to verify with sample data
135+
2. Use `dryrun_pipeline` tool with inline pipeline YAML to verify with sample data
136136

137137
**Note**: You can update an existing pipeline by calling `create_pipeline` with the same name. Each call creates a new version. Use `list_pipelines` to view all versions, and `delete_pipeline` to remove specific versions.
138138

139139
## Testing with dryrun_pipeline
140140

141-
The `dryrun_pipeline` tool accepts JSON data in the following formats:
141+
Use `dryrun_pipeline` with separated parameters:
142142

143-
**Single log entry (JSON object with "message" field for plain text logs):**
144-
```json
145-
{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000] \"GET /index.html HTTP/1.1\" 200 612"}
143+
**Example with inline pipeline YAML:**
144+
```python
145+
dryrun_pipeline(
146+
pipeline='''version: 2
147+
processors:
148+
- date:
149+
fields:
150+
- timestamp
151+
formats:
152+
- '%Y-%m-%dT%H:%M:%SZ' ''',
153+
data='{"timestamp": "2024-05-25T20:16:37Z", "level": "INFO"}',
154+
data_type='application/json'
155+
)
146156
```
147157

148-
**Multiple log entries (JSON array):**
149-
```json
150-
[
151-
{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000] \"GET /index.html HTTP/1.1\" 200 612"},
152-
{"message": "192.168.1.1 - - [25/May/2024:20:17:37 +0000] \"POST /api/login HTTP/1.1\" 200 1784"}
153-
]
158+
**Example with saved pipeline:**
159+
```python
160+
dryrun_pipeline(
161+
pipeline_name='my_log_pipeline',
162+
data='{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000]"}',
163+
data_type='application/x-ndjson'
164+
)
154165
```
155166

156-
**Structured JSON logs (fields map directly to pipeline input):**
157-
```json
158-
{"timestamp": "2024-05-25 20:16:37", "level": "INFO", "service": "api", "message": "Request processed"}
159-
```
167+
**Data Formats:**
168+
- **Single log entry:** `{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000]"}`
169+
- **Multiple entries (JSON array):** `[{"message": "log1"}, {"message": "log2"}]`
170+
- **NDJSON (newline-delimited):** Use `data_type='application/x-ndjson'` with data like `"{"msg":"line1"}\n{"msg":"line2"}"`
160171

161172
## Common Log Format Examples
162173

@@ -213,6 +224,8 @@ Pattern: `%{timestamp} %{hostname} %{app}[%{pid}]: %{message}`
213224
## Troubleshooting
214225

215226
If `dryrun_pipeline` fails:
227+
- **Missing required parameters**: Ensure you provide `data` and exactly one of `pipeline` or `pipeline_name`
228+
- **Both pipeline and pipeline_name provided**: Only provide one of them
216229
- **Pattern mismatch**: Check if dissect/regex pattern matches the log format exactly
217230
- **Date format error**: Verify the date format string matches the timestamp in logs
218231
- **Missing fields**: Use `ignore_missing: true` in processors to handle optional fields
@@ -230,11 +243,11 @@ curl -X POST "http://localhost:4000/v1/pipelines/my_pipeline" \
230243
-H "Content-Type: application/x-yaml" \
231244
-d @pipeline.yaml
232245

233-
# Dryrun pipeline
246+
# Dryrun pipeline (constructs JSON request internally)
234247
curl -X POST "http://localhost:4000/v1/pipelines/_dryrun" \
235248
-u "<username>:<password>" \
236249
-H "Content-Type: application/json" \
237-
-d '{"pipeline_name": "my_pipeline", "data": "{\"message\": \"test log entry\"}"}'
250+
-d '{"pipeline": "version: 2", "data": "{\"timestamp\": \"2024-05-25T20:16:37Z\"}", "data_type": "application/json"}'
238251

239252
# Delete pipeline
240253
curl -X DELETE "http://localhost:4000/v1/pipelines/my_pipeline?version=<version>" \

tests/test_server.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -592,18 +592,34 @@ async def test_create_pipeline_invalid_name():
592592

593593

594594
@pytest.mark.asyncio
595-
async def test_dryrun_pipeline_invalid_name():
596-
"""Test dryrun_pipeline with invalid name"""
595+
async def test_dryrun_pipeline_invalid_pipeline_name():
596+
"""Test dryrun_pipeline with invalid pipeline name"""
597597
with pytest.raises(ValueError) as excinfo:
598598
await dryrun_pipeline(pipeline_name="123-invalid", data='{"message": "test"}')
599599
assert "Invalid pipeline name" in str(excinfo.value)
600600

601601

602602
@pytest.mark.asyncio
603-
async def test_dryrun_pipeline_invalid_json():
604-
"""Test dryrun_pipeline with invalid JSON data"""
605-
result = await dryrun_pipeline(pipeline_name="test_pipeline", data="invalid json")
606-
assert "Error: Invalid JSON data" in result
603+
async def test_dryrun_pipeline_missing_data():
604+
"""Test dryrun_pipeline without required data parameter"""
605+
result = await dryrun_pipeline(pipeline="version: 2", data="")
606+
assert "Error: data parameter is required" in result
607+
608+
609+
@pytest.mark.asyncio
610+
async def test_dryrun_pipeline_both_pipeline_and_name():
611+
"""Test dryrun_pipeline with both pipeline and pipeline_name (should error)"""
612+
result = await dryrun_pipeline(
613+
pipeline="version: 2", pipeline_name="test_pipeline", data='{"message": "test"}'
614+
)
615+
assert "Error: Provide either 'pipeline' or 'pipeline_name', not both" in result
616+
617+
618+
@pytest.mark.asyncio
619+
async def test_dryrun_pipeline_neither_pipeline_nor_name():
620+
"""Test dryrun_pipeline without pipeline or pipeline_name (should error)"""
621+
result = await dryrun_pipeline(data='{"message": "test"}')
622+
assert "Error: Provide either 'pipeline' or 'pipeline_name'" in result
607623

608624

609625
@pytest.mark.asyncio

0 commit comments

Comments
 (0)