diff --git a/cmd/mcp-grafana/main.go b/cmd/mcp-grafana/main.go index fe1ad9b6..13036cbc 100644 --- a/cmd/mcp-grafana/main.go +++ b/cmd/mcp-grafana/main.go @@ -45,7 +45,7 @@ type disabledTools struct { dashboard, folder, oncall, asserts, sift, admin, pyroscope, navigation, proxied, annotations, rendering, cloudwatch, write, examples, clickhouse, searchlogs, - runpanelquery bool + runpanelquery, influxdb bool } // Configuration for the Grafana client. @@ -89,6 +89,7 @@ func (dt *disabledTools) addFlags() { flag.BoolVar(&dt.clickhouse, "disable-clickhouse", false, "Disable ClickHouse tools") flag.BoolVar(&dt.searchlogs, "disable-searchlogs", false, "Disable search logs tools") flag.BoolVar(&dt.runpanelquery, "disable-runpanelquery", false, "Disable run panel query tools") + flag.BoolVar(&dt.influxdb, "disable-influxdb", false, "Disable InfluxDB tools") } func (gc *grafanaConfig) addFlags() { @@ -129,6 +130,7 @@ func (dt *disabledTools) addTools(s *server.MCPServer) { maybeAddTools(s, tools.AddClickHouseTools, enabledTools, dt.clickhouse, "clickhouse") maybeAddTools(s, tools.AddSearchLogsTools, enabledTools, dt.searchlogs, "searchlogs") maybeAddTools(s, tools.AddRunPanelQueryTools, enabledTools, dt.runpanelquery, "runpanelquery") + maybeAddTools(s, tools.AddInfluxTools, enabledTools, dt.influxdb, "influxdb") } func newServer(transport string, dt disabledTools, obs *observability.Observability) (*server.MCPServer, *mcpgrafana.ToolManager) { @@ -183,6 +185,7 @@ Available Capabilities: - Prometheus & Loki: Run PromQL and LogQL queries, retrieve metric/log metadata, and explore label names/values. - ClickHouse: Query ClickHouse datasources via Grafana with macro and variable substitution support. - Elasticsearch: Query Elasticsearch datasources using Lucene syntax or Query DSL for logs and metrics. +- InfluxDB: Query InfluxDB datasources with SQL, InfluxQL, and Flux query languages. - Incidents: Search, create, update, and resolve incidents in Grafana Incident. 
- Sift Investigations: Start and manage Sift investigations, analyze logs/traces, find error patterns, and detect slow requests. - Alerting: List and fetch alert rules and notification contact points. diff --git a/docker-compose.yaml b/docker-compose.yaml index 2955d61e..38063c6b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -134,3 +134,43 @@ services: interval: 10s timeout: 5s retries: 5 + influxdb3: + build: + dockerfile_inline: | + FROM influxdb:3-core@sha256:255268d2a5f42b8c38d373864a4ba72956d91e14a3361019706bfad2f7c039ab + COPY --chmod=777 ./testdata/tools/influxdb/influxdbv3-seed.sh /init.sh + ports: + - "8181:8181" + command: > + /bin/bash -c "/init.sh & influxdb3 serve --node-id=node0 --object-store=file --data-dir=/var/lib/influxdb3 --admin-token-file=/run/secrets/admin-token" + secrets: + - admin-token + influxdb2: + build: + dockerfile_inline: | + FROM influxdb:2 + COPY --chmod=777 ./testdata/tools/influxdb/influxdbv2-seed.sh /docker-entrypoint-initdb.d/init.sh + ports: + - "8086:8086" + environment: + DOCKER_INFLUXDB_INIT_MODE: setup + DOCKER_INFLUXDB_INIT_USERNAME_FILE: /run/secrets/influxdb2-admin-username + DOCKER_INFLUXDB_INIT_PASSWORD_FILE: /run/secrets/influxdb2-admin-password + DOCKER_INFLUXDB_INIT_ADMIN_TOKEN_FILE: /run/secrets/influxdb2-admin-token + DOCKER_INFLUXDB_INIT_ORG: system-logs + DOCKER_INFLUXDB_INIT_BUCKET: b-system-logs + secrets: + - influxdb2-admin-username + - influxdb2-admin-password + - influxdb2-admin-token + +secrets: + influxdb2-admin-username: + file: ./testdata/tools/influxdb/.env.influxdb2-admin-username + influxdb2-admin-password: + file: ./testdata/tools/influxdb/.env.influxdb2-admin-password + influxdb2-admin-token: + file: ./testdata/tools/influxdb/.env.influxdb2-admin-token + admin-token: + file: ./testdata/tools/influxdb/admin-token.json + diff --git a/pkg/grafana/datasource.go b/pkg/grafana/datasource.go new file mode 100644 index 00000000..187c71df --- /dev/null +++ b/pkg/grafana/datasource.go @@ 
-0,0 +1,47 @@ +package grafana + +import "errors" + +var ErrNoRows = errors.New("no rows in result set") + +type DSQueryPayload struct { + Queries []any `json:"queries"` + From string `json:"from"` + To string `json:"to"` +} + +type DSQueryFrameField struct { + Name string `json:"name"` + Type string `json:"type"` + TypeInfo struct { + Frame string `json:"frame,omitempty"` + } `json:"typeInfo,omitempty"` + Labels map[string]string `json:"labels"` + Config map[string]interface{} `json:"config,omitempty"` +} + +type DSQueryFrameSchema struct { + Name string `json:"name,omitempty"` + RefID string `json:"refId,omitempty"` + Fields []DSQueryFrameField `json:"fields"` +} + +type DSQueryFrameData struct { + Values [][]interface{} `json:"values"` +} + +type DSQueryFrame struct { + Schema DSQueryFrameSchema `json:"schema,omitempty"` + Data DSQueryFrameData `json:"data"` +} + +type DSQueryResult struct { + Status int `json:"status,omitempty"` + Frames []DSQueryFrame `json:"frames,omitempty"` + Error string `json:"error,omitempty"` +} + +// DSQueryResponse represents the raw API response from Grafana's /api/ds/query +type DSQueryResponse struct { + Results map[string]DSQueryResult `json:"results"` +} diff --git a/testdata/provisioning/datasources/datasources.yaml b/testdata/provisioning/datasources/datasources.yaml index fd714408..84eccfa4 100644 --- a/testdata/provisioning/datasources/datasources.yaml +++ b/testdata/provisioning/datasources/datasources.yaml @@ -87,3 +87,40 @@ datasources: accessKey: test secretKey: test isDefault: false + - name: InfluxDB_v2_Flux + id: 9 + uid: influxdb-flux + type: influxdb + access: proxy + url: http://influxdb2:8086 + jsonData: + version: Flux + organization: system-logs + defaultBucket: b-system-logs + tlsSkipVerify: true + secureJsonData: + token: admintoken + - name: InfluxDB_v2_InfluxQL + id: 10 + uid: influxdb-influxql + type: influxdb + access: proxy + url: http://influxdb2:8086 + jsonData: + dbName: b-system-logs + httpHeaderName1: 
'Authorization' + secureJsonData: + httpHeaderValue1: 'Token admintoken' + - name: InfluxDB_v3_SQL + id: 11 + uid: influxdb-sql + type: influxdb + access: proxy + url: http://influxdb3:8181 + jsonData: + version: SQL + dbName: system-logs + httpMode: POST + insecureGrpc: true + secureJsonData: + token: 'apiv3_OgXAgbMRgiGXcAQaFLJoaw==' \ No newline at end of file diff --git a/testdata/tools/influxdb/.env.influxdb2-admin-password b/testdata/tools/influxdb/.env.influxdb2-admin-password new file mode 100644 index 00000000..f3097ab1 --- /dev/null +++ b/testdata/tools/influxdb/.env.influxdb2-admin-password @@ -0,0 +1 @@ +password diff --git a/testdata/tools/influxdb/.env.influxdb2-admin-token b/testdata/tools/influxdb/.env.influxdb2-admin-token new file mode 100644 index 00000000..fae5b667 --- /dev/null +++ b/testdata/tools/influxdb/.env.influxdb2-admin-token @@ -0,0 +1 @@ +admintoken diff --git a/testdata/tools/influxdb/.env.influxdb2-admin-username b/testdata/tools/influxdb/.env.influxdb2-admin-username new file mode 100644 index 00000000..7fbe952b --- /dev/null +++ b/testdata/tools/influxdb/.env.influxdb2-admin-username @@ -0,0 +1 @@ +admin diff --git a/testdata/tools/influxdb/admin-token.json b/testdata/tools/influxdb/admin-token.json new file mode 100644 index 00000000..b6260b91 --- /dev/null +++ b/testdata/tools/influxdb/admin-token.json @@ -0,0 +1,4 @@ +{ + "token": "apiv3_OgXAgbMRgiGXcAQaFLJoaw==", + "name": "_admin" +} diff --git a/testdata/tools/influxdb/influxdbv2-seed.sh b/testdata/tools/influxdb/influxdbv2-seed.sh new file mode 100755 index 00000000..325dbe2f --- /dev/null +++ b/testdata/tools/influxdb/influxdbv2-seed.sh @@ -0,0 +1,83 @@ +#!/bin/bash +echo "Starting InfluxDB v2 data seeding..." 
+ +ADMIN_TOKEN="admintoken" +ORG_NAME="system-logs" +BUCKET_NAME="b-system-logs" + +# --- Generate Timestamps --- +NOW=$(date +%s%N) +M1=$((NOW - 7200000000000)) # 2 hours ago +M2=$((NOW - 3600000000000)) # 1 hour ago +M3=$((NOW - 1800000000000)) # 30 min ago +M4=$((NOW - 900000000000)) # 15 min ago +M5=$((NOW - 300000000000)) # 5 min ago + +# --- Seed b-system-logs bucket --- +echo "Seeding $BUCKET_NAME bucket..." +influx write \ + --token "$ADMIN_TOKEN" \ + --org "$ORG_NAME" \ + --bucket "$BUCKET_NAME" \ + --precision ns \ + <\s*limit\s*\(\s*n\s*:\s*\d+\s*\)\s*$`) +) + +type influxDBClient struct { + httpClient *http.Client + baseURL string +} + +// newInfluxDBClient creates a new InfluxDB client for the given datasource +// queryType: when non-nil used to restrict the datasource to have same queryType +// returns client along with query type of datasource +func newInfluxDBClient(ctx context.Context, uid string, queryType *string) (*influxDBClient, string, error) { + // Verify the datasource exists and is a InfluxDB datasource + ds, err := getDatasourceByUID(ctx, GetDatasourceByUIDParams{UID: uid}) + if err != nil { + return nil, "", err + } + + if ds.Type != InfluxDBDataSourceType { + return nil, "", fmt.Errorf("datasource %s is of type %s, not %s", uid, ds.Type, InfluxDBDataSourceType) + } + + // Verify the query lang specified is the one configured with datasource + dsQueryType := InfluxQLQueryType + + if jsonMap, ok := ds.JSONData.(map[string]interface{}); ok { + if dsQT, ok := jsonMap["version"].(string); ok && dsQT != "" { + dsQueryType = dsQT + } + } + + if queryType != nil { + if *queryType != dsQueryType { + return nil, dsQueryType, fmt.Errorf("datasource %s is configured with querytype %s, not %s", uid, dsQueryType, *queryType) + } + + } + + cfg := mcpgrafana.GrafanaConfigFromContext(ctx) + baseURL := strings.TrimRight(cfg.URL, "/") + + // Create custom transport with TLS configuration if available + var transport = http.DefaultTransport + if tlsConfig 
:= cfg.TLSConfig; tlsConfig != nil { + var err error + transport, err = tlsConfig.HTTPTransport(transport.(*http.Transport)) + if err != nil { + return nil, dsQueryType, fmt.Errorf("failed to create custom transport: %w", err) + } + } + + transport = NewAuthRoundTripper(transport, cfg.AccessToken, cfg.IDToken, cfg.APIKey, cfg.BasicAuth) + transport = mcpgrafana.NewOrgIDRoundTripper(transport, cfg.OrgID) + + client := &http.Client{ + Transport: mcpgrafana.NewUserAgentTransport(transport), + } + + return &influxDBClient{ + httpClient: client, + baseURL: baseURL, + }, dsQueryType, nil +} + +type InfluxQueryArgs struct { + DatasourceUID string `json:"datasourceUid" jsonschema:"required,description=The UID of the InfluxDB datasource to query. Use list_datasources to find available UIDs."` + Query string `json:"query" jsonschema:"required,description=SQL/Flux/InfluxQL query. Supports SQL macros: $__timeFilter for time filtering\\, $__timeFrom/$__timeTo for millisecond timestamps\\, $__interval for calculated intervals\\, $__dateBin()/$__dateBinAlias() to apply date_bin for timestamp columns. Supports Flux macros : v.timeRangeStart\\, v.timeRangeStop\\, v.windowPeriod (Grafana-calculated interval)\\, v.defaultBucket (configured default bucket)\\, v.organization (configured organization)\\."` + QueryType string `json:"query_type" jsonschema:"required,enum=SQL,enum=Flux,enum=InfluxQL,description=QueryType of Datasource. One of the specified options"` + Start string `json:"start,omitempty" jsonschema:"description=Start time. Formats: 'now-1h'\\, '2026-02-02T19:00:00Z'\\, '1738519200000' (Unix ms). Default: now-1h"` + End string `json:"end,omitempty" jsonschema:"description=End time. Formats: 'now'\\, '2026-02-02T20:00:00Z'\\, '1738522800000' (Unix ms). 
Default: now"` + IntervalMs uint `json:"interval_ms,omitempty" jsonschema:"description=Interval in milliseconds"` + Limit uint `json:"limit,omitempty" jsonschema:"description=Limit number of records per table (or group)"` +} + +// InfluxQueryResFrame represents a single frame of data in the query response. +type InfluxQueryResFrame struct { + Name string `json:"name"` + Columns []string `json:"columns"` + Rows []map[string]any `json:"rows"` + RowCount uint `json:"rowCount"` +} + +// InfluxQueryResult contains the parsed results of an InfluxDB query. +type InfluxQueryResult struct { + Frames []*InfluxQueryResFrame + FramesCount int + Hints *EmptyResultHints `json:"hints,omitempty"` +} + +type influxDBQueryPayload struct { + Datasource struct { + UID string `json:"uid"` + Type string `json:"type"` + } `json:"datasource"` + RefID string `json:"refId"` + Type string `json:"type"` + Format string `json:"format"` + IntervalMs uint `json:"intervalMs"` + Query string `json:"query"` + RawSQL string `json:"rawSql"` + RawQuery bool `json:"rawQuery"` + Limit string `json:"limit"` + ResultFormat string `json:"resultFormat"` +} + +func (ic *influxDBClient) Query(ctx context.Context, args InfluxQueryArgs, from, to time.Time) (*grafana.DSQueryResponse, error) { + format := "time_series" + + if args.QueryType == SQLQueryType { + format = "table" + } + + query := influxDBQueryPayload{ + Datasource: struct { + UID string `json:"uid"` + Type string `json:"type"` + }{ + UID: args.DatasourceUID, + Type: InfluxDBDataSourceType, + }, + RefID: "A", + Type: "timeSeriesQuery", + Format: format, + IntervalMs: args.IntervalMs, + RawQuery: true, + Limit: "", + ResultFormat: "time_series", + } + + // append query + if args.QueryType == SQLQueryType { + query.RawSQL = args.Query + } else { + query.Query = args.Query + } + + payload := grafana.DSQueryPayload{ + Queries: []any{ + query, + }, + From: strconv.FormatInt(from.UnixMilli(), 10), + To: strconv.FormatInt(to.UnixMilli(), 10), + } + + 
payloadBytes, err := json.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("marshaling query payload: %w", err) + } + + url := ic.baseURL + "/api/ds/query" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payloadBytes)) + if err != nil { + return nil, fmt.Errorf("creating request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := ic.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("executing request: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, influxDBErrorResponseLimitBytes)) + return nil, fmt.Errorf("InfluxDB query returned status %d: %s", resp.StatusCode, string(bodyBytes)) + } + + // Read and parse response + var queryResp grafana.DSQueryResponse + bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, influxDBResponseLimitBytes)) + if err != nil { + return nil, fmt.Errorf("reading response body: %w", err) + } + + if err := unmarshalJSONWithLimitMsg(bodyBytes, &queryResp, influxDBResponseLimitBytes); err != nil { + return nil, err + } + + return &queryResp, nil +} + +func findTopLevelSelectAfterCTE(query string) int { + loc := sqlCTEStartRegEx.FindStringIndex(query) + if loc == nil { + return -1 + } + i := loc[1] + + for i < len(query) { + parenIdx := strings.Index(query[i:], "(") + if parenIdx == -1 { + break + } + i += parenIdx + + depth := 0 + for i < len(query) { + switch query[i] { + case '(': + depth++ + case ')': + depth-- + } + i++ + if depth == 0 { + break + } + } + + // Skip whitespace + for i < len(query) && (query[i] == ' ' || query[i] == '\n' || query[i] == '\t' || query[i] == '\r') { + i++ + } + + if i >= len(query) { + return -1 // nothing after closing paren — malformed + } + + if query[i] == ',' { + i++ // another CTE follows + } else { + // Verify a valid SQL keyword exists here (SELECT, INSERT, UPDATE, DELETE, etc.) 
+ if !sqlKeywordRegEx.MatchString(query[i:]) { + return -1 + } + return i + } + } + return -1 +} + +func enforceQueryLimit(args *InfluxQueryArgs) { + // flux, influxql limits per measurement(influxql), table(flux) level so number of measurements * limit is final records + // sql limit applies on final records level + + limit := InfluxDBDefaultLimit + + if args.Limit >= InfluxDBMaxLimit { + limit = InfluxDBMaxLimit + } else if args.Limit > 0 { + limit = args.Limit + } + switch args.QueryType { + + case SQLQueryType: + query := strings.TrimSuffix(args.Query, ";") + + if sqlCTEStartRegEx.MatchString(query) { + // CTE query + pos := findTopLevelSelectAfterCTE(query) + if pos != -1 { + ctePrefix := query[:pos] // WITH a AS (...), b AS (...) + selectPart := query[pos:] // SELECT * FROM a JOIN b ON true + + if !sqlLimitRegEx.MatchString(selectPart) { + wrappedSelect := "(" + selectPart + ")" + fmt.Sprintf(" LIMIT %d", limit) + args.Query = ctePrefix + wrappedSelect + } + } + } else { + // window functions , generic queries + // wrap query and apply limit + args.Query = "(" + query + ")" + fmt.Sprintf(" LIMIT %d", limit) + } + case InfluxQLQueryType: + // override limits when query contains limit + if influxQLLimitRegEx.Match([]byte(args.Query)) { + replacement := fmt.Sprintf("${1}%d${2}${3}", limit) + args.Query = influxQLLimitRegEx.ReplaceAllString(args.Query, replacement) + } else { + // append limit in other cases + query := strings.TrimSuffix(args.Query, ";") + args.Query = query + fmt.Sprintf(" LIMIT %d", limit) + } + case FluxQueryType: + query := strings.TrimSpace(args.Query) + + if fluxLimitRegEx.MatchString(query) { + // Replace existing limit at end + args.Query = fluxLimitRegEx.ReplaceAllString(query, fmt.Sprintf("|> limit(n:%d)", limit)) + } else { + // Always append limit at end — goal is to always have limit as final operator + args.Query = query + fmt.Sprintf("\n|> limit(n:%d)", limit) + } + + } + +} + +func parseTimeRange(start string, end string) 
(*time.Time, *time.Time, error) { + // Parse time range + defaultPeriod := time.Hour + + now := time.Now() + fromTime := now.Add(-1 * defaultPeriod) // Default: 1 hour ago + toTime := now // Default: now + + if start != "" { + parsed, err := parseStartTime(start) + if err != nil { + return nil, nil, fmt.Errorf("parsing start time: %w", err) + } + if !parsed.IsZero() { + fromTime = parsed + } + + // set relative end time 1hour from start + if end == "" { + toTime = fromTime.Add(defaultPeriod) + } + } + + if end != "" { + parsed, err := parseEndTime(end) + if err != nil { + return nil, nil, fmt.Errorf("parsing end time: %w", err) + } + if !parsed.IsZero() { + toTime = parsed + } + + if start == "" { + fromTime = toTime.Add(-1 * defaultPeriod) + } + } + + return &fromTime, &toTime, nil + +} + +// parseQueryResponseFrames parses ds/query response in a json key-pair format +// returns list of frames combined of query results +// treats empty results as an error +func parseQueryResponseFrames(resp *grafana.DSQueryResponse) ([]*InfluxQueryResFrame, error) { + frames := make([]*InfluxQueryResFrame, 0) + hasResults := false + + // InfluxQL Query has a frame for each column selection, (different selection sets result in varying row count for each frame) + // SQL Query results in a single frame , selected columns are mapped in frame.columns + for refID, r := range resp.Results { + if r.Error != "" { + return nil, fmt.Errorf("query error (refId=%s): %s", refID, r.Error) + } + + // grow slice to accomadte atleast len(r.Frames) elements + frames = slices.Grow(frames, len(r.Frames)) + + for _, frame := range r.Frames { + + noOfCol := len(frame.Schema.Fields) + if noOfCol == 0 { + // columns not found for frame, skip frame + continue + } + + resFrame := InfluxQueryResFrame{} + resFrame.Columns = make([]string, 0, noOfCol) + + if len(frame.Data.Values) == 0 { + continue + } + + if len(frame.Data.Values) != noOfCol { + // return error when data values count mismatch schema fields + 
return nil, fmt.Errorf("frame data values count (%d) mismatch schema fields count (%d)", len(frame.Data.Values), noOfCol) + } + + // Number of rows count derived from count of values of first column + rowCount := (len(frame.Data.Values[0])) + resFrame.RowCount = uint(rowCount) + resFrame.Rows = make([]map[string]any, 0, rowCount) + resFrame.Name = frame.Schema.Name + + for colNo, field := range frame.Schema.Fields { + + fieldName := field.Name + + if field.Labels["_field"] != "" && field.Name == "_value" { + // use field name for column values of flux queries + fieldName = field.Labels["_field"] + } + // influxql query with 'time_series' format query + if field.Config != nil { + if displayName, ok := field.Config["displayNameFromDS"].(string); ok && displayName != "" { + fieldName = displayName + } + } + + resFrame.Columns = append(resFrame.Columns, fieldName) + + for rowId, colValue := range frame.Data.Values[colNo] { + if len(resFrame.Rows) < (rowId + 1) { + resFrame.Rows = append(resFrame.Rows, make(map[string]any)) + } + + resFrame.Rows[rowId][fieldName] = colValue + } + } + + frames = append(frames, &resFrame) + if rowCount > 0 && !hasResults { + hasResults = true + } + } + } + + var err error + if !hasResults { + err = grafana.ErrNoRows + } + frames = slices.Clip(frames) + + return frames, err +} +func queryInflux(ctx context.Context, args InfluxQueryArgs) (*InfluxQueryResult, error) { + client, _, err := newInfluxDBClient(ctx, args.DatasourceUID, &args.QueryType) + + if err != nil { + return nil, err + } + + originalQuery := args.Query + + enforceQueryLimit(&args) + from, to, err := parseTimeRange(args.Start, args.End) + if err != nil { + return nil, err + } + + resp, err := client.Query(ctx, args, *from, *to) + if err != nil { + return nil, err + } + + result := InfluxQueryResult{} + + frames, err := parseQueryResponseFrames(resp) + + if err != nil { + if err != grafana.ErrNoRows { + return nil, err + } + // query response returned no rows + // respond 
sucess with hints + result.Hints = GenerateEmptyResultHints(HintContext{ + DatasourceType: InfluxDBDataSourceType, + Query: originalQuery, + ProcessedQuery: args.Query, + StartTime: *from, + EndTime: *to, + }) + } + + result.Frames = frames + result.FramesCount = len(result.Frames) + + return &result, nil +} + +var QueryInfluxDB = mcpgrafana.MustTool( + "query_influxdb", + "Queries InfluxDB datasource, supports one of Flux, SQL, or InfluxQL query languages. Use in order: list_datasources -> get_datasource to determine query language configured for datasource.Use both list_influxdb_field_keys , list_influxdb_tag_keys to determine the available columns", + queryInflux, + mcp.WithTitleAnnotation("Query InfluxDB"), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithReadOnlyHintAnnotation(true), +) + +type ListBucketArgs struct { + DatasourceUID string `json:"datasourceUid" jsonschema:"required,description=The UID of the InfluxDB datasource. Use list_datasources to find available UIDs."` +} +type ListBucketResult struct { + Buckets *[]string `json:"buckets"` + BucketCount uint `json:"bucketCount"` + Hints *EmptyResultHints `json:"hints,omitempty"` +} + +// extractColValues extracts Values from response of string type columns +func extractColValues(resp *grafana.DSQueryResponse, colName string) (*[]string, error) { + fieldValues := make([]string, 0) + + for _, result := range resp.Results { + + if result.Error != "" { + return nil, errors.New(result.Error) + } + + for _, frame := range result.Frames { + fieldColIdx := -1 + + for idx, field := range frame.Schema.Fields { + if field.Name == colName { + fieldColIdx = idx + break + } + } + + if fieldColIdx == -1 { + // no bucket name col found + continue + } + + if len(frame.Data.Values) <= fieldColIdx { + continue + } + + fieldValues = slices.Grow(fieldValues, len(frame.Data.Values[fieldColIdx])) + + for _, name := range frame.Data.Values[fieldColIdx] { + if s, ok := name.(string); ok { + fieldValues = append(fieldValues, 
s) + } else { + return nil, fmt.Errorf("expected column %s to be string type, got %T", colName, name) + } + } + } + } + + return &fieldValues, nil +} + +func listBuckets(ctx context.Context, args ListBucketArgs) (*ListBucketResult, error) { + queryType := FluxQueryType + client, sourceQueryType, err := newInfluxDBClient(ctx, args.DatasourceUID, &queryType) + + if err != nil { + if sourceQueryType != "" && sourceQueryType != queryType { + return nil, fmt.Errorf("datasource is not configured with Flux, bucket listing is explicit to Flux linked datasources") + } + return nil, err + } + + query := "buckets()" + + refTime := time.Now() + + response, err := client.Query(ctx, InfluxQueryArgs{DatasourceUID: args.DatasourceUID, Query: query, QueryType: FluxQueryType, Start: "", End: ""}, refTime, refTime) + + if err != nil { + return nil, err + } + + buckets, err := extractColValues(response, "name") + + if err != nil { + return nil, err + } + + result := ListBucketResult{} + + if len(*buckets) == 0 { + // return empty result hints + result.Hints = GenerateEmptyResultHints(HintContext{ + DatasourceType: InfluxDBDataSourceType, + Query: query, + ProcessedQuery: query, + StartTime: refTime, + EndTime: refTime, + Error: fmt.Errorf("empty results, check if buckets exist for connected datasources"), + }) + } + + result.BucketCount = uint(len(*buckets)) + result.Buckets = buckets + return &result, nil +} + +var ListBucketsInflux = mcpgrafana.MustTool( + "list_influxdb_buckets", + "Lists buckets of an InfluxDB datasource identified by its UID. Requires the datasource to be configured with Flux. Use in order: list_datasources -> get_datasource -> list_influxdb_buckets", + listBuckets, + mcp.WithTitleAnnotation("List Buckets InfluxDB"), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithReadOnlyHintAnnotation(true), +) + +type ListMeasurementsArgs struct { + DatasourceUID string `json:"datasourceUid" jsonschema:"required,description=The UID of the InfluxDB datasource. 
Use list_datasources to find available UIDs."` + Bucket string `json:"bucket,omitempty" jsonschema:"optional,description=Bucket Name of target bucket to fetch from; required only for Flux linked datasources."` + Limit uint `json:"limit"` +} + +type ListMeasurementResult struct { + Measurements *[]string `json:"measurements"` + MeasurementCount uint `json:"measurementCount"` + Hints *EmptyResultHints `json:"hints,omitempty"` +} + +func enforceMeasurementsLimit(args *ListMeasurementsArgs) { + if args.Limit > InfluxDBMeasurementsMaxLimit { + args.Limit = InfluxDBMeasurementsMaxLimit + } + if args.Limit == 0 { + args.Limit = InfluxDBMeasurementsDefaultLimit + } +} +func listMeasurements(ctx context.Context, args ListMeasurementsArgs) (*ListMeasurementResult, error) { + client, queryType, err := newInfluxDBClient(ctx, args.DatasourceUID, nil) + if err != nil { + return nil, err + } + + enforceMeasurementsLimit(&args) + + if queryType == FluxQueryType && args.Bucket == "" { + return nil, fmt.Errorf("bucket is required for %s linked InfluxDB datasources", FluxQueryType) + } + var query string + // represents column key of measurement in response + var colKey string + switch queryType { + case SQLQueryType: + query = fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = 'iox' ORDER BY table_name LIMIT %d", args.Limit) + colKey = "table_name" + case FluxQueryType: + query = fmt.Sprintf( + `import "influxdata/influxdb/schema" + schema.measurements(bucket: %s)|> limit(n: %d)`, + quoteStringAsFluxLiteral(args.Bucket), args.Limit) + colKey = "_value" + case InfluxQLQueryType: + query = fmt.Sprintf("SHOW MEASUREMENTS LIMIT %d", args.Limit) + colKey = "Value" + } + + refTime := time.Now() + response, err := client.Query(ctx, InfluxQueryArgs{DatasourceUID: args.DatasourceUID, Query: query, QueryType: queryType, Start: "", End: ""}, refTime, refTime) + + if err != nil { + return nil, err + } + + measurements, err := extractColValues(response, colKey) + 
+ if err != nil { + return nil, err + } + + result := ListMeasurementResult{} + + if len(*measurements) == 0 { + // add empty results hints + result.Hints = GenerateEmptyResultHints(HintContext{ + DatasourceType: InfluxDBDataSourceType, + Query: query, + ProcessedQuery: query, + StartTime: refTime, + EndTime: refTime, + Error: fmt.Errorf("no measurements found, verify at datasource"), + }) + } + + result.MeasurementCount = uint(len(*measurements)) + result.Measurements = measurements + return &result, nil +} + +var ListMeasurements = mcpgrafana.MustTool( + "list_influxdb_measurements", + "Lists Measurements of an InfluxDB datasource identified by its UID. Use in order: list_datasources -> get_datasource -> list_influxdb_buckets (required only for Flux linked datasource) -> list_influxdb_measurements", + listMeasurements, + mcp.WithTitleAnnotation("List Measurements InfluxDB"), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithReadOnlyHintAnnotation(true), +) + +type ListTagKeysArgs struct { + DatasourceUID string `json:"datasourceUid" jsonschema:"required,description=The UID of the InfluxDB datasource. 
Use list_datasources to find available UIDs."` + Bucket string `json:"bucket,omitempty" jsonschema:"optional,description=Bucket Name of target bucket to fetch from\\,required only for Flux linked datasources."` + Measurement string `json:"measurement" jsonschema:"required,description=Filter by measurement"` + Limit uint `json:"limit"` +} +type ListTagKeysResult struct { + TagKeys *[]string `json:"tags"` + TagKeysCount uint `json:"tagCount"` + Hints *EmptyResultHints `json:"hints,omitempty"` +} + +func enforceTagKeysLimit(args *ListTagKeysArgs) { + if args.Limit > InfluxDBTagsMaxLimit { + args.Limit = InfluxDBTagsMaxLimit + } + if args.Limit == 0 { + args.Limit = InfluxDBTagsDefaultLimit + } +} + +func quoteStringAsLiteral(s string) string { + // SQL style: single quotes, escape internal single quotes by doubling + return "'" + strings.ReplaceAll(s, "'", "''") + "'" +} + +func quoteStringAsFluxLiteral(s string) string { + // Flux style: double quotes, escape backslash then double quotes + s = strings.ReplaceAll(s, `\`, `\\`) + s = strings.ReplaceAll(s, `"`, `\"`) + return `"` + s + `"` +} + +// quoteStringAsInfluxQLIdentifier quotes a string as an InfluxQL identifier using double quotes. 
+func quoteStringAsInfluxQLIdentifier(s string) string { + // Must escape backslashes FIRST, then double quotes + s = strings.ReplaceAll(s, `\`, `\\`) // \ → \\ + s = strings.ReplaceAll(s, `"`, `\"`) // " → \" + return `"` + s + `"` +} + +func listTagKeys(ctx context.Context, args ListTagKeysArgs) (*ListTagKeysResult, error) { + enforceTagKeysLimit(&args) + + client, queryType, err := newInfluxDBClient(ctx, args.DatasourceUID, nil) + + if err != nil { + return nil, err + } + + if queryType == FluxQueryType && args.Bucket == "" { + return nil, fmt.Errorf("bucket is required for %s linked InfluxDB datasources", FluxQueryType) + } + + var tagColumnKey string + var query string + + switch queryType { + case SQLQueryType: + // data_type 'Dictionary%%' distinguishes tags from fields for SQL QUERIES + query = fmt.Sprintf("SELECT column_name FROM information_schema.columns WHERE table_schema = 'iox' AND table_name = %s AND data_type LIKE 'Dictionary%%' ORDER BY column_name LIMIT %d", + quoteStringAsLiteral(args.Measurement), args.Limit) + tagColumnKey = "column_name" + case FluxQueryType: + query = fmt.Sprintf( + `import "influxdata/influxdb/schema" + schema.measurementTagKeys(bucket: %s, measurement: %s)|> limit(n: %d)`, + quoteStringAsFluxLiteral(args.Bucket), quoteStringAsFluxLiteral(args.Measurement), args.Limit) + tagColumnKey = "_value" + case InfluxQLQueryType: + query = fmt.Sprintf(`SHOW TAG KEYS FROM %s LIMIT %d`, + quoteStringAsInfluxQLIdentifier(args.Measurement), args.Limit) + tagColumnKey = "Value" + } + + refTime := time.Now() + response, err := client.Query(ctx, InfluxQueryArgs{DatasourceUID: args.DatasourceUID, Query: query, QueryType: queryType, Start: "", End: ""}, refTime, refTime) + + if err != nil { + return nil, err + } + + tags, err := extractColValues(response, tagColumnKey) + + if err != nil { + return nil, err + } + + result := ListTagKeysResult{} + + if len(*tags) == 0 { + // add empty results hints + result.Hints = 
GenerateEmptyResultHints(HintContext{ + DatasourceType: InfluxDBDataSourceType, + Query: query, + ProcessedQuery: query, + StartTime: refTime, + EndTime: refTime, + Error: fmt.Errorf("no tags found, verify at datasource"), + }) + } + + result.TagKeysCount = uint(len(*tags)) + result.TagKeys = tags + return &result, nil +} + +var ListTagKeys = mcpgrafana.MustTool( + "list_influxdb_tag_keys", + "Lists Tag Keys of an InfluxDB datasource identified by its UID. Use in order: list_datasources -> get_datasource -> list_influxdb_buckets (required only for Flux linked datasource) -> list_influxdb_measurements -> list_influxdb_tag_keys", + listTagKeys, + mcp.WithTitleAnnotation("List Tag Keys InfluxDB"), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithReadOnlyHintAnnotation(true), +) + +type ListFieldKeysArgs struct { + DatasourceUID string `json:"datasourceUid" jsonschema:"required,description=The UID of the InfluxDB datasource. Use list_datasources to find available UIDs."` + Bucket string `json:"bucket,omitempty" jsonschema:"optional,description=Bucket Name of target bucket to fetch from\\,required only for Flux linked datasources."` + Measurement string `json:"measurement" jsonschema:"required,description=Filter by measurement"` + Limit uint `json:"limit"` +} + +type ListFieldKeysResult struct { + FieldKeys *[]string `json:"fields"` + FieldKeysCount uint `json:"fieldCount"` + Hints *EmptyResultHints `json:"hints,omitempty"` +} + +// enforceFieldKeysLimit applies the default or maximum limits to the provided field keys arguments. 
+func enforceFieldKeysLimit(args *ListFieldKeysArgs) { + if args.Limit > InfluxDBTagsMaxLimit { + args.Limit = InfluxDBTagsMaxLimit + } + if args.Limit == 0 { + args.Limit = InfluxDBTagsDefaultLimit + } +} + +func listFieldKeys(ctx context.Context, args ListFieldKeysArgs) (*ListFieldKeysResult, error) { + enforceFieldKeysLimit(&args) + + client, queryType, err := newInfluxDBClient(ctx, args.DatasourceUID, nil) + + if err != nil { + return nil, err + } + + if queryType == FluxQueryType && args.Bucket == "" { + return nil, fmt.Errorf("bucket is required for %s linked InfluxDB datasources", FluxQueryType) + } + + var fieldColumnKey string + var query string + + switch queryType { + case SQLQueryType: + // data_type 'Dictionary%%' distinguishes tags from fields for SQL QUERIES + query = fmt.Sprintf("SELECT column_name FROM information_schema.columns WHERE table_schema = 'iox' AND table_name = %s AND data_type NOT LIKE 'Dictionary%%' ORDER BY column_name LIMIT %d", + quoteStringAsLiteral(args.Measurement), args.Limit) + fieldColumnKey = "column_name" + case FluxQueryType: + query = fmt.Sprintf( + `import "influxdata/influxdb/schema" + schema.measurementFieldKeys(bucket: %s, measurement: %s)|> limit(n: %d)`, + quoteStringAsFluxLiteral(args.Bucket), quoteStringAsFluxLiteral(args.Measurement), args.Limit) + fieldColumnKey = "_value" + case InfluxQLQueryType: + query = fmt.Sprintf(`SHOW FIELD KEYS FROM %s LIMIT %d`, + quoteStringAsInfluxQLIdentifier(args.Measurement), args.Limit) + fieldColumnKey = "Value" + } + + refTime := time.Now() + response, err := client.Query(ctx, InfluxQueryArgs{DatasourceUID: args.DatasourceUID, Query: query, QueryType: queryType, Start: "", End: ""}, refTime, refTime) + + if err != nil { + return nil, err + } + + fieldKeys, err := extractColValues(response, fieldColumnKey) + + if err != nil { + return nil, err + } + + result := ListFieldKeysResult{} + + if len(*fieldKeys) == 0 { + // add empty results hints + result.Hints = 
GenerateEmptyResultHints(HintContext{ + DatasourceType: InfluxDBDataSourceType, + Query: query, + ProcessedQuery: query, + StartTime: refTime, + EndTime: refTime, + Error: fmt.Errorf("no fields found, verify at datasource"), + }) + } + + result.FieldKeysCount = uint(len(*fieldKeys)) + result.FieldKeys = fieldKeys + return &result, nil +} + +var ListFieldKeys = mcpgrafana.MustTool( + "list_influxdb_field_keys", + "Lists Field Keys of an InfluxDB datasource identified by its UID. Use in order: list_datasources -> get_datasource -> list_influxdb_buckets (required only for Flux linked datasource) -> list_influxdb_measurements -> list_influxdb_field_keys", + listFieldKeys, + mcp.WithTitleAnnotation("List Field Keys InfluxDB"), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithReadOnlyHintAnnotation(true), +) + +func AddInfluxTools(server *server.MCPServer) { + QueryInfluxDB.Register(server) + ListBucketsInflux.Register(server) + ListMeasurements.Register(server) + ListTagKeys.Register(server) + ListFieldKeys.Register(server) +} diff --git a/tools/influxdb_integration_test.go b/tools/influxdb_integration_test.go new file mode 100644 index 00000000..cf97265a --- /dev/null +++ b/tools/influxdb_integration_test.go @@ -0,0 +1,425 @@ +//go:build integration + +package tools + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Test_ListBuckets verifies the listing of buckets for different InfluxDB datasource linked types. 
+func Test_ListBuckets(t *testing.T) { + t.Run("list buckets for Flux linked DataSource", func(t *testing.T) { + ctx := newTestContext() + + result, err := listBuckets(ctx, ListBucketArgs{ + DatasourceUID: "influxdb-flux", + }) + require.NoError(t, err) + + assert.Contains(t, *result.Buckets, "b-system-logs", "should list buckets for Flux DataSource") + }) + + t.Run("error for SQL linked Datasource", func(t *testing.T) { + ctx := newTestContext() + _, err := listBuckets(ctx, ListBucketArgs{ + DatasourceUID: "influxdb-sql", + }) + require.EqualError(t, err, "datasource is not configured with Flux, bucket listing is explicit to Flux linked datasources") + }) + + t.Run("error for InfluxQL linked Datasource", func(t *testing.T) { + ctx := newTestContext() + _, err := listBuckets(ctx, ListBucketArgs{ + DatasourceUID: "influxdb-influxql", + }) + require.EqualError(t, err, "datasource is not configured with Flux, bucket listing is explicit to Flux linked datasources") + }) +} +// Test_Query verifies querying InfluxDB with Flux, SQL and InfluxQL query languages. 
+func Test_Query(t *testing.T) { + + t.Run("Flux Query", func(t *testing.T) { + ctx := newTestContext() + + query := ` + from(bucket: "b-system-logs") + |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> filter(fn: (r) => r["_measurement"] == "auth_events") + |> filter(fn: (r) => r["_field"] == "severity") + |> aggregateWindow(every: v.windowPeriod, column: "_value", fn: mean, createEmpty: false) + ` + + result, err := queryInflux(ctx, InfluxQueryArgs{ + DatasourceUID: "influxdb-flux", + Query: query, + QueryType: FluxQueryType, + Start: "now-24h", + End: "now", + }) + require.NoError(t, err) + assert.NotEmpty(t, result.Frames) + + t.Log(result.Frames[0], result.Hints) + + assert.Equal(t, 10, len(result.Frames), "should contain frames of all groups") + + for _, frame := range result.Frames { + assert.Equal(t, 1, len(frame.Rows), "should contain non-empty results for a frame") + _, ok := frame.Rows[0]["severity"] + // should contain field 'severity' + assert.True(t, ok, "should contain queried fields") + } + }) + + t.Run("SQL Query", func(t *testing.T) { + ctx := newTestContext() + query := `SELECT MAX("attempt_count") AS count FROM "auth_events";` + + result, err := queryInflux(ctx, InfluxQueryArgs{ + DatasourceUID: "influxdb-sql", + Query: query, + QueryType: SQLQueryType, + Start: "now-24h", + End: "now", + }) + + require.NoError(t, err) + + assert.NotEmpty(t, result.Frames, "should contain a frame") + assert.Len(t, result.Frames, result.FramesCount, "should specify frame count equal to len(frames)") + + t.Log(result.Frames[0], result.Hints) + assert.NotEmpty(t, result.Frames[0].Rows, "should contain results") + + attemptCount, ok := result.Frames[0].Rows[0]["count"].(float64) + + require.True(t, ok, "should contain queried columns with expected type in a row") + assert.Equal(t, 20.0, attemptCount) + }) + + t.Run("InfluxQL Query", func(t *testing.T) { + ctx := newTestContext() + + query := `SELECT mean("severity") FROM "auth_events" GROUP BY 
time($__interval) fill(null)` + + result, err := queryInflux(ctx, InfluxQueryArgs{ + DatasourceUID: "influxdb-influxql", + Query: query, + QueryType: InfluxQLQueryType, + Start: "now-24h", + }) + require.NoError(t, err) + assert.NotEmpty(t, result.Frames) + + t.Log(result.Frames[0], result.Hints) + assert.GreaterOrEqual(t, len(result.Frames[0].Rows), 20, "should contain query results") + + t.Log(result.Frames[0].Rows[0], result.Frames[0].Columns) + _, ok := result.Frames[0].Rows[0][`auth_events.mean`].(float64) + + require.True(t, ok, "should contain queried columns with expected type in a row") + }) +} +// Test_ListMeasurements verifies the listing of measurements for different InfluxDB datasource linked types. +func Test_ListMeasurements(t *testing.T) { + t.Run("require bucket for Flux Datasource", func(t *testing.T) { + ctx := newTestContext() + _, err := listMeasurements(ctx, ListMeasurementsArgs{ + DatasourceUID: "influxdb-flux", + }) + require.EqualError(t, err, fmt.Sprintf("bucket is required for %s linked InfluxDB datasources", FluxQueryType)) + }) + + t.Run("bucket optional for SQL/InfluxQL Datasource", func(t *testing.T) { + dataSourceUIDs := []string{"influxdb-sql", "influxdb-influxql"} + for _, uid := range dataSourceUIDs { + ctx := newTestContext() + _, err := listMeasurements(ctx, ListMeasurementsArgs{ + DatasourceUID: uid, + }) + require.NoError(t, err) + } + }) + + t.Run("list measurements of a Datasource", func(t *testing.T) { + ctx := newTestContext() + + dataSourceUIDs := []string{"influxdb-flux", "influxdb-sql", "influxdb-influxql"} + + for _, uid := range dataSourceUIDs { + result, err := listMeasurements(ctx, ListMeasurementsArgs{ + DatasourceUID: uid, + Bucket: "b-system-logs", + }) + require.NoError(t, err) + + t.Log(result.Measurements, result.Hints, result.MeasurementCount) + assert.Subset(t, *result.Measurements, + []string{"auth_events", "db_queries", "http_requests", "queue_stats", "resource_usage", "syslog"}, + "should list 
measurements for %s linked Datasource", uid) + } + }) + +} +// Test_ListTagKeys verifies the listing of tag keys for different InfluxDB datasource linked types. +func Test_ListTagKeys(t *testing.T) { + + t.Run("require bucket for Flux Datasource", func(t *testing.T) { + ctx := newTestContext() + _, err := listTagKeys(ctx, ListTagKeysArgs{ + DatasourceUID: "influxdb-flux", + Measurement: "auth_events", + }) + require.EqualError(t, err, fmt.Sprintf("bucket is required for %s linked InfluxDB datasources", FluxQueryType)) + }) + + t.Run("list tags keys", func(t *testing.T) { + dataSourceUIDs := []string{"influxdb-flux", "influxdb-sql", "influxdb-influxql"} + + for _, uid := range dataSourceUIDs { + ctx := newTestContext() + + bucket := "" + + if uid == "influxdb-flux" { + bucket = "b-system-logs" + } + + result, err := listTagKeys(ctx, ListTagKeysArgs{ + DatasourceUID: uid, + Bucket: bucket, + Measurement: "auth_events", + }) + require.NoError(t, err) + + t.Log(result.TagKeys, uid, result.Hints) + + assert.Subset(t, *result.TagKeys, + []string{"ip", "status", "service"}, + "should list tag keys for %s linked Datasource", uid) + } + }) + + t.Run("hints for empty results", func(t *testing.T) { + dataSourceUIDs := []string{"influxdb-sql", "influxdb-influxql"} + + for _, uid := range dataSourceUIDs { + ctx := newTestContext() + + result, err := listTagKeys(ctx, ListTagKeysArgs{ + DatasourceUID: uid, + Measurement: "nonexistent", + }) + require.NoError(t, err) + + t.Log(result.TagKeys, uid, result.Hints) + + assert.NotNil(t, result.Hints, "should return hints") + + assert.Empty(t, *result.TagKeys, "should return empty list for non-existent measurement") + } + }) + +} +// Test_ListFieldKeys verifies the listing of field keys for different InfluxDB datasource linked types. 
+func Test_ListFieldKeys(t *testing.T) { + + t.Run("require bucket for Flux Datasource", func(t *testing.T) { + ctx := newTestContext() + _, err := listFieldKeys(ctx, ListFieldKeysArgs{ + DatasourceUID: "influxdb-flux", + Measurement: "auth_events", + }) + require.EqualError(t, err, fmt.Sprintf("bucket is required for %s linked InfluxDB datasources", FluxQueryType)) + }) + + t.Run("list field keys", func(t *testing.T) { + dataSourceUIDs := []string{"influxdb-flux", "influxdb-sql", "influxdb-influxql"} + + for _, uid := range dataSourceUIDs { + ctx := newTestContext() + + bucket := "" + + if uid == "influxdb-flux" { + bucket = "b-system-logs" + } + + result, err := listFieldKeys(ctx, ListFieldKeysArgs{ + DatasourceUID: uid, + Bucket: bucket, + Measurement: "auth_events", + }) + require.NoError(t, err) + + t.Log(result.FieldKeys, uid, result.Hints) + + assert.Subset(t, *result.FieldKeys, + []string{"attempt_count", "severity"}, + "should list field keys for %s linked Datasource", uid) + } + }) + + t.Run("hints for empty results", func(t *testing.T) { + dataSourceUIDs := []string{"influxdb-sql", "influxdb-influxql"} + + for _, uid := range dataSourceUIDs { + ctx := newTestContext() + + result, err := listFieldKeys(ctx, ListFieldKeysArgs{ + DatasourceUID: uid, + Measurement: "nonexistent", + }) + require.NoError(t, err) + + t.Log(result.FieldKeys, uid, result.Hints) + + assert.NotNil(t, result.Hints, "should return hints") + + assert.Empty(t, *result.FieldKeys, "should return empty list for non-existent measurement") + } + }) + +} +// Test_Limit verifies the correct application of rate limits on queries across datasource linked types. 
+func Test_Limit(t *testing.T) { + dataSourceUIDs := []string{"influxdb-flux", "influxdb-sql", "influxdb-influxql"} + + t.Run("list measurements with limits ", func(t *testing.T) { + + for _, uid := range dataSourceUIDs { + ctx := newTestContext() + + bucket := "" + if uid == "influxdb-flux" { + bucket = "b-system-logs" + } + + result, err := listMeasurements(ctx, ListMeasurementsArgs{ + DatasourceUID: uid, + Bucket: bucket, + Limit: 1, + }) + require.NoError(t, err) + + t.Log(result.Measurements, uid, result.Hints) + + assert.Len(t, *result.Measurements, 1) + } + }) + + t.Run("list tag keys with limit", func(t *testing.T) { + for _, uid := range dataSourceUIDs { + ctx := newTestContext() + + bucket := "" + + if uid == "influxdb-flux" { + bucket = "b-system-logs" + } + + result, err := listTagKeys(ctx, ListTagKeysArgs{ + DatasourceUID: uid, + Bucket: bucket, + Measurement: "auth_events", + Limit: 1, + }) + require.NoError(t, err) + + t.Log(result.TagKeys, uid, result.Hints) + + assert.Len(t, *result.TagKeys, 1) + } + }) + + t.Run("list field keys with limit", func(t *testing.T) { + for _, uid := range dataSourceUIDs { + ctx := newTestContext() + + bucket := "" + + if uid == "influxdb-flux" { + bucket = "b-system-logs" + } + + result, err := listFieldKeys(ctx, ListFieldKeysArgs{ + DatasourceUID: uid, + Bucket: bucket, + Measurement: "auth_events", + Limit: 1, + }) + require.NoError(t, err) + + t.Log(result.FieldKeys, uid, result.Hints) + + assert.Len(t, *result.FieldKeys, 1) + } + }) + + t.Run("execute query with limits", func(t *testing.T) { + + queries := []*InfluxQueryArgs{ + { + Query: ` + import "generate" + t1 = + generate.from( + count: 4, + fn: (n) => n + 1, + start: 2022-01-01T00:00:00Z, + stop: 2022-01-05T00:00:00Z, + ) + |> set(key: "tag", value: "foo") + |> group(columns: ["tag"]) + + t2 = + generate.from( + count: 4, + fn: (n) => n * (-1), + start: 2022-01-01T00:00:00Z, + stop: 2022-01-05T00:00:00Z, + ) + |> set(key: "tag", value: "bar") + |> 
group(columns: ["tag"]) + + union(tables: [t1, t2])// user comments should not bypass limits + `, + DatasourceUID: "influxdb-flux", + QueryType: FluxQueryType, + }, + { + Query: `SELECT "attempt_count" FROM "auth_events" LIMIT 3;`, + DatasourceUID: "influxdb-sql", + QueryType: SQLQueryType, + }, + { + // enforceQueryLimit rewrites the LIMIT clause to the enforced value while + // preserving the OFFSET, so `LIMIT 10 OFFSET 2` becomes `LIMIT 1 OFFSET 2`. + Query: `SELECT attempt_count FROM "auth_events" fill(null) LIMIT 10 OFFSET 2`, + DatasourceUID: "influxdb-influxql", + QueryType: InfluxQLQueryType, + }, + } + limit := 1 + for _, query := range queries { + ctx := newTestContext() + + query.Start = "now-4h" + query.End = "now" + query.Limit = uint(limit) + + result, err := queryInflux(ctx, *query) + + require.NoError(t, err) + t.Log(result.Frames, result.Hints) + assert.GreaterOrEqual(t, len(result.Frames), 1) + for _, frame := range result.Frames { + assert.Equal(t, limit, len(frame.Rows), "should limit number of rows of a frame") + } + } + }) +} diff --git a/tools/influxdb_unit_test.go b/tools/influxdb_unit_test.go new file mode 100644 index 00000000..80b4cf3c --- /dev/null +++ b/tools/influxdb_unit_test.go @@ -0,0 +1,487 @@ +//go:build unit + +package tools + +import ( + "errors" + "strings" + "testing" + "time" + + "github.com/grafana/mcp-grafana/pkg/grafana" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_enforceFieldKeysLimit(t *testing.T) { + t.Run("test_feature", func(t *testing.T) { + t.Run("should apply maximum limit when exceeded", func(t *testing.T) { + args := ListFieldKeysArgs{Limit: InfluxDBTagsMaxLimit + 10} + enforceFieldKeysLimit(&args) + assert.Equal(t, InfluxDBTagsMaxLimit, args.Limit, "limit should be maximum limit") + t.Log("applied max limit") + }) + + t.Run("should apply default limit when limit is 0", func(t *testing.T) { + args := ListFieldKeysArgs{Limit: 0} + enforceFieldKeysLimit(&args) + 
assert.Equal(t, InfluxDBTagsDefaultLimit, args.Limit, "limit should be default limit") + t.Log("applied default limit") + }) + + t.Run("should keep custom limit when within bounds", func(t *testing.T) { + args := ListFieldKeysArgs{Limit: 50} + enforceFieldKeysLimit(&args) + assert.Equal(t, uint(50), args.Limit, "limit should remain custom") + t.Log("kept custom limit") + }) + }) +} + +func Test_enforceTagKeysLimit(t *testing.T) { + t.Run("test_feature", func(t *testing.T) { + t.Run("should apply maximum limit when exceeded", func(t *testing.T) { + args := ListTagKeysArgs{Limit: InfluxDBTagsMaxLimit + 10} + enforceTagKeysLimit(&args) + assert.Equal(t, InfluxDBTagsMaxLimit, args.Limit, "limit should be maximum limit") + t.Log("applied max limit") + }) + + t.Run("should apply default limit when limit is 0", func(t *testing.T) { + args := ListTagKeysArgs{Limit: 0} + enforceTagKeysLimit(&args) + assert.Equal(t, InfluxDBTagsDefaultLimit, args.Limit, "limit should be default limit") + t.Log("applied default limit") + }) + + t.Run("should keep custom limit when within bounds", func(t *testing.T) { + args := ListTagKeysArgs{Limit: 80} + enforceTagKeysLimit(&args) + assert.Equal(t, uint(80), args.Limit, "limit should remain custom") + t.Log("kept custom limit") + }) + }) +} + +func Test_enforceMeasurementsLimit(t *testing.T) { + t.Run("test_feature", func(t *testing.T) { + t.Run("should apply maximum limit when exceeded", func(t *testing.T) { + args := ListMeasurementsArgs{Limit: InfluxDBMeasurementsMaxLimit + 100} + enforceMeasurementsLimit(&args) + assert.Equal(t, InfluxDBMeasurementsMaxLimit, args.Limit, "limit should be maximum limit") + t.Log("applied max limit") + }) + + t.Run("should apply default limit when limit is 0", func(t *testing.T) { + args := ListMeasurementsArgs{Limit: 0} + enforceMeasurementsLimit(&args) + assert.Equal(t, InfluxDBMeasurementsDefaultLimit, args.Limit, "limit should be default limit") + t.Log("applied default limit") + }) + + t.Run("should 
keep custom limit when within bounds", func(t *testing.T) { + args := ListMeasurementsArgs{Limit: 120} + enforceMeasurementsLimit(&args) + assert.Equal(t, uint(120), args.Limit, "limit should remain custom") + t.Log("kept custom limit") + }) + }) +} + +func Test_enforceQueryLimit(t *testing.T) { + t.Run("test_feature", func(t *testing.T) { + t.Run("should wrap sql query and apply limit", func(t *testing.T) { + args := InfluxQueryArgs{QueryType: SQLQueryType, Query: "SELECT * FROM my_table;", Limit: 10} + enforceQueryLimit(&args) + assert.Equal(t, "(SELECT * FROM my_table) LIMIT 10", args.Query, "sql query should be wrapped and limited") + t.Log("applied sql limit") + }) + + t.Run("should apply flux limit", func(t *testing.T) { + args := InfluxQueryArgs{QueryType: FluxQueryType, Query: "from(bucket: \"my-bucket\")", Limit: 20} + enforceQueryLimit(&args) + assert.Equal(t, "from(bucket: \"my-bucket\")\n|> limit(n:20)", args.Query, "flux query should be limited") + t.Log("applied flux limit") + }) + + t.Run("should replace flux limit at absolute end", func(t *testing.T) { + args := InfluxQueryArgs{QueryType: FluxQueryType, Query: "from(bucket: \"my-bucket\") |> limit(n:10)", Limit: 20} + enforceQueryLimit(&args) + assert.Equal(t, "from(bucket: \"my-bucket\") |> limit(n:20)", args.Query, "flux query should have limit replaced") + t.Log("replaced absolute end flux limit") + }) + + t.Run("should append flux limit when existing limit is not at end", func(t *testing.T) { + args := InfluxQueryArgs{QueryType: FluxQueryType, Query: "from(bucket: \"my-bucket\") |> limit(n:10) |> count()", Limit: 20} + enforceQueryLimit(&args) + assert.Equal(t, "from(bucket: \"my-bucket\") |> limit(n:10) |> count()\n|> limit(n:20)", args.Query, "flux query should have another limit appended at end") + t.Log("appended flux limit after transformation") + }) + + t.Run("should handle whitespace and case-insensitivity in flux limit replacement", func(t *testing.T) { + args := 
InfluxQueryArgs{QueryType: FluxQueryType, Query: "from(bucket: \"my-bucket\") |> LIMIT ( n : 5 ) ", Limit: 20} + enforceQueryLimit(&args) + assert.Equal(t, "from(bucket: \"my-bucket\") |> limit(n:20)", args.Query, "flux query should normalize and replace limit") + t.Log("normalized and replaced flux limit") + }) + + t.Run("should replace influxql limit if exists", func(t *testing.T) { + args := InfluxQueryArgs{QueryType: InfluxQLQueryType, Query: "SELECT * FROM my_table LIMIT 100", Limit: 50} + enforceQueryLimit(&args) + assert.Equal(t, "SELECT * FROM my_table LIMIT 50", args.Query, "influxql limit should be replaced") + t.Log("applied influxql replaced limit") + }) + + t.Run("should apply default limit when no limit passed", func(t *testing.T) { + args := InfluxQueryArgs{QueryType: SQLQueryType, Query: "SELECT * FROM table", Limit: 0} + enforceQueryLimit(&args) + assert.Equal(t, "(SELECT * FROM table) LIMIT 100", args.Query, "sql query should use default limit") + t.Log("applied default sql limit") + }) + }) +} + +func Test_parseTimeRange(t *testing.T) { + t.Run("test_feature", func(t *testing.T) { + t.Run("should parse start and infer default end", func(t *testing.T) { + from, to, err := parseTimeRange("2026-02-02T19:00:00Z", "") + require.NoError(t, err, "should not have error") + assert.NotNil(t, from, "from time should not be nil") + assert.NotNil(t, to, "to time should not be nil") + expectedFrom, _ := time.Parse(time.RFC3339, "2026-02-02T19:00:00Z") + assert.Equal(t, expectedFrom, *from, "from time should match parsed start") + assert.Equal(t, expectedFrom.Add(time.Hour), *to, "to time should be 1 hour after from") + t.Log("parsed time range with start only") + }) + + t.Run("should parse start and end", func(t *testing.T) { + from, to, err := parseTimeRange("2026-02-02T19:00:00Z", "2026-02-02T20:00:00Z") + require.NoError(t, err, "should not have error") + expectedFrom, _ := time.Parse(time.RFC3339, "2026-02-02T19:00:00Z") + expectedTo, _ := 
time.Parse(time.RFC3339, "2026-02-02T20:00:00Z") + assert.Equal(t, expectedFrom, *from, "from time should match parsed start") + assert.Equal(t, expectedTo, *to, "to time should match parsed end") + t.Log("parsed time range with both start and end") + }) + + t.Run("should handle relative start times", func(t *testing.T) { + from, to, err := parseTimeRange("now-2h", "") + require.NoError(t, err, "should not have error parsing relative time") + assert.NotNil(t, from, "from time should not be nil") + assert.NotNil(t, to, "to time should not be nil") + t.Log("parsed relative time range") + }) + }) +} + +func Test_extractColValues(t *testing.T) { + t.Run("test_response", func(t *testing.T) { + t.Run("should extract values from valid response", func(t *testing.T) { + resp := &grafana.DSQueryResponse{ + Results: map[string]grafana.DSQueryResult{ + "A": { + Frames: []grafana.DSQueryFrame{ + { + Schema: grafana.DSQueryFrameSchema{ + Fields: []grafana.DSQueryFrameField{ + {Name: "my_col"}, + }, + }, + Data: grafana.DSQueryFrameData{ + Values: [][]interface{}{ + {"val1", "val2"}, + }, + }, + }, + }, + }, + }, + } + values, err := extractColValues(resp, "my_col") + require.NoError(t, err, "should not have error") + assert.NotNil(t, values, "values should not be nil") + assert.Subset(t, *values, []string{"val1", "val2"}, "values should include extracted strings") + t.Log("extracted valid col values") + }) + + t.Run("should propagate error from result", func(t *testing.T) { + resp := &grafana.DSQueryResponse{ + Results: map[string]grafana.DSQueryResult{ + "A": { + Error: "some target error", + }, + }, + } + values, err := extractColValues(resp, "my_col") + assert.Error(t, err, "should have error") + assert.Equal(t, "some target error", err.Error(), "error message should match") + assert.Nil(t, values, "values should be nil") + t.Log("handled error property in result") + }) + }) +} + +func Test_parseQueryResponseFrames(t *testing.T) { + t.Run("test_response", func(t *testing.T) { 
+ t.Run("should parse frames successfully", func(t *testing.T) { + field1 := grafana.DSQueryFrameField{Name: "time"} + field2 := grafana.DSQueryFrameField{Name: "_value", Labels: make(map[string]string)} + field2.Labels["_field"] = "temp" + + resp := &grafana.DSQueryResponse{ + Results: map[string]grafana.DSQueryResult{ + "A": { + Frames: []grafana.DSQueryFrame{ + { + Schema: grafana.DSQueryFrameSchema{ + Name: "test_frame", + Fields: []grafana.DSQueryFrameField{ + field1, + field2, + }, + }, + Data: grafana.DSQueryFrameData{ + Values: [][]interface{}{ + {1000, 2000}, + {22.5, 23.0}, + }, + }, + }, + }, + }, + }, + } + frames, err := parseQueryResponseFrames(resp) + require.NoError(t, err, "should not have error") + require.Len(t, frames, 1, "should have 1 frame") + assert.Equal(t, "test_frame", frames[0].Name, "frame name should match") + assert.Subset(t, frames[0].Columns, []string{"time", "temp"}, "columns should be parsed and mapped") + assert.Equal(t, uint(2), frames[0].RowCount, "should have 2 rows") + t.Log("parsed frames successfully") + }) + + t.Run("should return error when results contain error", func(t *testing.T) { + resp := &grafana.DSQueryResponse{ + Results: map[string]grafana.DSQueryResult{ + "A": { + Error: "query failed", + }, + }, + } + frames, err := parseQueryResponseFrames(resp) + assert.Error(t, err, "should return error") + assert.Nil(t, frames, "frames should be nil") + t.Log("returned error correctly for failed query") + }) + + t.Run("should return error when no rows", func(t *testing.T) { + resp := &grafana.DSQueryResponse{ + Results: map[string]grafana.DSQueryResult{ + "A": { + Frames: []grafana.DSQueryFrame{ + { + Schema: grafana.DSQueryFrameSchema{ + Name: "test_frame", + Fields: []grafana.DSQueryFrameField{ + {Name: "time"}, + }, + }, + Data: grafana.DSQueryFrameData{ + Values: [][]interface{}{}, + }, + }, + }, + }, + }, + } + frames, err := parseQueryResponseFrames(resp) + assert.Error(t, err, "should return error when no rows 
exist") + assert.True(t, errors.Is(err, grafana.ErrNoRows), "error should be ErrNoRows") + assert.Len(t, frames, 0, "frames should be empty") + t.Log("returned no rows error correctly") + }) + }) +} + +func TestQuoting(t *testing.T) { + t.Run("quoteStringAsFluxLiteral", func(t *testing.T) { + assert.Equal(t, `"standard"`, quoteStringAsFluxLiteral("standard")) + assert.Equal(t, `"with \"quotes\""`, quoteStringAsFluxLiteral(`with "quotes"`)) + assert.Equal(t, `"with \\backslashes\\"`, quoteStringAsFluxLiteral(`with \backslashes\`)) + }) + + t.Run("quoteStringAsInfluxQLIdentifier", func(t *testing.T) { + tests := []struct { + input string + expected string + message string + }{ + { + input: "standard", + expected: `"standard"`, + message: "plain string with no special characters should just be wrapped in double quotes", + }, + { + input: `with "quotes"`, + expected: `"with \"quotes\""`, + message: "double quotes inside string should be escaped as \"", + }, + { + input: `with \backslashes`, + expected: `"with \\backslashes"`, + message: "backslashes should be escaped as \\\\ before quote escaping", + }, + { + input: `trailing\`, + expected: `"trailing\\"`, + message: "trailing backslash must be escaped to prevent unterminated identifier bug", + }, + { + input: `slash\"quote`, + expected: `"slash\\\"quote"`, + message: "backslash immediately before a double quote must both be escaped independently", + }, + { + input: "", + expected: `""`, + message: "empty string should produce a valid empty identifier", + }, + { + input: `"`, + expected: `"\""`, + message: "a lone double quote should be escaped as \"", + }, + { + input: `\`, + expected: `"\\"`, + message: "a lone backslash must be escaped to prevent unterminated identifier", + }, + } + + for _, tt := range tests { + t.Run(tt.message, func(t *testing.T) { + assert.Equal(t, tt.expected, quoteStringAsInfluxQLIdentifier(tt.input), tt.message) + }) + } + }) + + t.Run("quoteStringAsLiteral (SQL style)", func(t *testing.T) { 
+ assert.Equal(t, "'standard'", quoteStringAsLiteral("standard")) + assert.Equal(t, "'it''s a test'", quoteStringAsLiteral("it's a test")) + }) +} + +func TestFindTopLevelSelectAfterCTE(t *testing.T) { + tests := []struct { + name string + query string + wantPos int // -1 if not found, otherwise we just check query[pos:] starts with SELECT + wantSel string // expected string at pos (trimmed, lowercased prefix) + }{ + { + name: "single CTE", + query: `WITH a AS ( + SELECT * FROM orders + ) + SELECT * FROM a`, + wantSel: "SELECT * FROM a", + }, + { + name: "multiple CTEs", + query: `WITH a AS ( + SELECT * FROM orders + ), + b AS ( + SELECT COUNT(*) AS cnt FROM a + ) + SELECT * FROM a JOIN b ON true`, + wantSel: "SELECT * FROM a JOIN b ON true", + }, + { + name: "CTE with nested subquery inside", + query: `WITH a AS ( + SELECT * FROM (SELECT id FROM orders WHERE id IN (SELECT id FROM archive)) sub + ) + SELECT * FROM a`, + wantSel: "SELECT * FROM a", + }, + { + name: "CTE with window function in body", + query: `WITH ranked AS ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn FROM orders + ) + SELECT * FROM ranked WHERE rn = 1`, + wantSel: "SELECT * FROM ranked WHERE rn = 1", + }, + { + name: "not a CTE query", + query: `SELECT * FROM orders`, + wantPos: -1, + }, + { + name: "empty string", + query: ``, + wantPos: -1, + }, + { + name: "CTE with no final select (malformed)", + query: `WITH a AS (SELECT * FROM orders)`, + wantPos: -1, + }, + { + name: "lowercase with", + query: `with a as ( + select * from orders + ) + select * from a`, + wantSel: "select * from a", + }, + { + name: "three CTEs", + query: `WITH a AS (SELECT * FROM t1), + b AS (SELECT * FROM t2), + c AS (SELECT * FROM a JOIN b ON a.id = b.id) + SELECT * FROM c`, + wantSel: "SELECT * FROM c", + }, + { + name: "CTE with deeply nested parens", + query: `WITH a AS ( + SELECT * FROM (SELECT * FROM (SELECT * FROM orders) t1) t2 + ) + SELECT * FROM a`, + wantSel: "SELECT * FROM 
a", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pos := findTopLevelSelectAfterCTE(tt.query) + + if tt.wantPos == -1 { + if pos != -1 { + t.Log(tt) + t.Errorf("expected -1, got %d", pos) + } + return + } + + if pos == -1 { + t.Fatalf("expected a valid position, got -1") + } + + got := strings.TrimSpace(tt.query[pos:]) + if !strings.EqualFold(got[:6], "select") { + t.Errorf("expected SELECT at pos %d, got: %q", pos, got[:10]) + } + + if tt.wantSel != "" { + gotTrimmed := strings.TrimSpace(got) + wantTrimmed := strings.TrimSpace(tt.wantSel) + if !strings.EqualFold(gotTrimmed, wantTrimmed) { + t.Errorf("select part mismatch:\n got: %q\n want: %q", gotTrimmed, wantTrimmed) + } + } + }) + } +} diff --git a/tools/loki.go b/tools/loki.go index 6d1404c3..c7254bb1 100644 --- a/tools/loki.go +++ b/tools/loki.go @@ -138,7 +138,7 @@ func (c *Client) makeRequest(ctx context.Context, method, urlPath string, params } // Read the response body with a limit to prevent memory issues - body := io.LimitReader(resp.Body, 1024*1024*10) //10MB limit + body := io.LimitReader(resp.Body, 1024*1024*10) // 10MB limit bodyBytes, err := io.ReadAll(body) if err != nil { return nil, fmt.Errorf("reading response body: %w", err) diff --git a/tools/prom_backend_cloudmonitoring.go b/tools/prom_backend_cloudmonitoring.go index 7645a3b3..b9d758d8 100644 --- a/tools/prom_backend_cloudmonitoring.go +++ b/tools/prom_backend_cloudmonitoring.go @@ -11,8 +11,9 @@ import ( "strings" "time" - mcpgrafana "github.com/grafana/mcp-grafana" "github.com/grafana/grafana-openapi-client-go/models" + mcpgrafana "github.com/grafana/mcp-grafana" + "github.com/grafana/mcp-grafana/pkg/grafana" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" ) @@ -273,7 +274,7 @@ func (b *cloudMonitoringBackend) labelValuesViaQuery(ctx context.Context, labelN } // doDSQuery executes a request against Grafana's /api/ds/query endpoint. 
-func (b *cloudMonitoringBackend) doDSQuery(ctx context.Context, payload map[string]interface{}) (*dsQueryResponse, error) { +func (b *cloudMonitoringBackend) doDSQuery(ctx context.Context, payload map[string]interface{}) (*grafana.DSQueryResponse, error) { payloadBytes, err := json.Marshal(payload) if err != nil { return nil, fmt.Errorf("marshaling query payload: %w", err) @@ -300,7 +301,7 @@ func (b *cloudMonitoringBackend) doDSQuery(ctx context.Context, payload map[stri return nil, fmt.Errorf("query returned status %d: %s", resp.StatusCode, string(body[:min(len(body), 1024)])) } - var queryResp dsQueryResponse + var queryResp grafana.DSQueryResponse if err := json.Unmarshal(body, &queryResp); err != nil { return nil, fmt.Errorf("unmarshaling response: %w", err) } @@ -359,43 +360,10 @@ type gcpMetricDescriptor struct { ServiceShortName string `json:"serviceShortName,omitempty"` } -// --- /api/ds/query response types --- - -type dsQueryResponse struct { - Results map[string]dsQueryResult `json:"results"` -} - -type dsQueryResult struct { - Status int `json:"status,omitempty"` - Frames []dsQueryFrame `json:"frames,omitempty"` - Error string `json:"error,omitempty"` -} - -type dsQueryFrame struct { - Schema dsQueryFrameSchema `json:"schema"` - Data dsQueryFrameData `json:"data"` -} - -type dsQueryFrameSchema struct { - Name string `json:"name,omitempty"` - RefID string `json:"refId,omitempty"` - Fields []dsQueryFrameField `json:"fields"` -} - -type dsQueryFrameField struct { - Name string `json:"name"` - Type string `json:"type"` - Labels map[string]string `json:"labels,omitempty"` -} - -type dsQueryFrameData struct { - Values [][]interface{} `json:"values"` -} - // --- Frame conversion --- // framesToPrometheusValue converts /api/ds/query response frames to Prometheus model values. 
-func framesToPrometheusValue(resp *dsQueryResponse, queryType string) (model.Value, error) { +func framesToPrometheusValue(resp *grafana.DSQueryResponse, queryType string) (model.Value, error) { r, ok := resp.Results["A"] if !ok { if queryType == "instant" { @@ -414,7 +382,7 @@ func framesToPrometheusValue(resp *dsQueryResponse, queryType string) (model.Val return framesToMatrix(r.Frames) } -func framesToMatrix(frames []dsQueryFrame) (model.Matrix, error) { +func framesToMatrix(frames []grafana.DSQueryFrame) (model.Matrix, error) { var matrix model.Matrix for _, frame := range frames { timeIdx, valueIdx := findTimeAndValueFields(frame.Schema.Fields) @@ -457,7 +425,7 @@ func framesToMatrix(frames []dsQueryFrame) (model.Matrix, error) { return matrix, nil } -func framesToVector(frames []dsQueryFrame) (model.Vector, error) { +func framesToVector(frames []grafana.DSQueryFrame) (model.Vector, error) { var vector model.Vector for _, frame := range frames { timeIdx, valueIdx := findTimeAndValueFields(frame.Schema.Fields) @@ -497,7 +465,7 @@ func framesToVector(frames []dsQueryFrame) (model.Vector, error) { return vector, nil } -func findTimeAndValueFields(fields []dsQueryFrameField) (timeIdx, valueIdx int) { +func findTimeAndValueFields(fields []grafana.DSQueryFrameField) (timeIdx, valueIdx int) { timeIdx = -1 valueIdx = -1 for i, f := range fields { @@ -549,7 +517,7 @@ func toFloat(v interface{}) (float64, bool) { } // extractLabelNamesFromFrames extracts unique label keys from HEADERS query frames. -func extractLabelNamesFromFrames(resp *dsQueryResponse) []string { +func extractLabelNamesFromFrames(resp *grafana.DSQueryResponse) []string { seen := make(map[string]bool) r, ok := resp.Results["A"] if !ok { @@ -574,7 +542,7 @@ func extractLabelNamesFromFrames(resp *dsQueryResponse) []string { } // extractLabelValuesFromFrames extracts unique values for a label from HEADERS query frames. 
-func extractLabelValuesFromFrames(resp *dsQueryResponse, labelName string) []string { +func extractLabelValuesFromFrames(resp *grafana.DSQueryResponse, labelName string) []string { seen := make(map[string]bool) r, ok := resp.Results["A"] if !ok { diff --git a/tools/prom_backend_cloudmonitoring_test.go b/tools/prom_backend_cloudmonitoring_test.go index 0b05ffd7..b9eb7a2b 100644 --- a/tools/prom_backend_cloudmonitoring_test.go +++ b/tools/prom_backend_cloudmonitoring_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/grafana/grafana-openapi-client-go/models" + "github.com/grafana/mcp-grafana/pkg/grafana" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" @@ -14,16 +15,16 @@ import ( func TestFramesToMatrix(t *testing.T) { t.Run("single series", func(t *testing.T) { - frames := []dsQueryFrame{ + frames := []grafana.DSQueryFrame{ { - Schema: dsQueryFrameSchema{ + Schema: grafana.DSQueryFrameSchema{ Name: "cpu_usage", - Fields: []dsQueryFrameField{ + Fields: []grafana.DSQueryFrameField{ {Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"host": "a"}}, }, }, - Data: dsQueryFrameData{ + Data: grafana.DSQueryFrameData{ Values: [][]interface{}{ {float64(1000), float64(2000), float64(3000)}, {float64(0.5), float64(0.7), float64(0.9)}, @@ -43,20 +44,20 @@ func TestFramesToMatrix(t *testing.T) { }) t.Run("multiple series", func(t *testing.T) { - frames := []dsQueryFrame{ + frames := []grafana.DSQueryFrame{ { - Schema: dsQueryFrameSchema{ + Schema: grafana.DSQueryFrameSchema{ Name: "cpu", - Fields: []dsQueryFrameField{{Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"host": "a"}}}, + Fields: []grafana.DSQueryFrameField{{Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"host": "a"}}}, }, - Data: dsQueryFrameData{Values: [][]interface{}{{float64(1000)}, {float64(0.5)}}}, + Data: 
grafana.DSQueryFrameData{Values: [][]interface{}{{float64(1000)}, {float64(0.5)}}}, }, { - Schema: dsQueryFrameSchema{ + Schema: grafana.DSQueryFrameSchema{ Name: "cpu", - Fields: []dsQueryFrameField{{Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"host": "b"}}}, + Fields: []grafana.DSQueryFrameField{{Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"host": "b"}}}, }, - Data: dsQueryFrameData{Values: [][]interface{}{{float64(1000)}, {float64(0.8)}}}, + Data: grafana.DSQueryFrameData{Values: [][]interface{}{{float64(1000)}, {float64(0.8)}}}, }, } @@ -72,12 +73,12 @@ func TestFramesToMatrix(t *testing.T) { }) t.Run("frame missing time field", func(t *testing.T) { - frames := []dsQueryFrame{ + frames := []grafana.DSQueryFrame{ { - Schema: dsQueryFrameSchema{ - Fields: []dsQueryFrameField{{Name: "Value", Type: "number"}}, + Schema: grafana.DSQueryFrameSchema{ + Fields: []grafana.DSQueryFrameField{{Name: "Value", Type: "number"}}, }, - Data: dsQueryFrameData{Values: [][]interface{}{{float64(1.0)}}}, + Data: grafana.DSQueryFrameData{Values: [][]interface{}{{float64(1.0)}}}, }, } @@ -89,16 +90,16 @@ func TestFramesToMatrix(t *testing.T) { func TestFramesToVector(t *testing.T) { t.Run("single sample", func(t *testing.T) { - frames := []dsQueryFrame{ + frames := []grafana.DSQueryFrame{ { - Schema: dsQueryFrameSchema{ + Schema: grafana.DSQueryFrameSchema{ Name: "up", - Fields: []dsQueryFrameField{ + Fields: []grafana.DSQueryFrameField{ {Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"job": "prometheus"}}, }, }, - Data: dsQueryFrameData{ + Data: grafana.DSQueryFrameData{ Values: [][]interface{}{ {float64(5000)}, {float64(1.0)}, @@ -117,15 +118,15 @@ func TestFramesToVector(t *testing.T) { }) t.Run("takes last value from multi-point frame", func(t *testing.T) { - frames := []dsQueryFrame{ + frames := []grafana.DSQueryFrame{ { - Schema: dsQueryFrameSchema{ - 
Fields: []dsQueryFrameField{ + Schema: grafana.DSQueryFrameSchema{ + Fields: []grafana.DSQueryFrameField{ {Name: "Time", Type: "time"}, {Name: "Value", Type: "number"}, }, }, - Data: dsQueryFrameData{ + Data: grafana.DSQueryFrameData{ Values: [][]interface{}{ {float64(1000), float64(2000), float64(3000)}, {float64(1.0), float64(2.0), float64(3.0)}, @@ -150,7 +151,7 @@ func TestFramesToVector(t *testing.T) { func TestFramesToPrometheusValue(t *testing.T) { t.Run("missing refId returns empty", func(t *testing.T) { - resp := &dsQueryResponse{Results: map[string]dsQueryResult{}} + resp := &grafana.DSQueryResponse{Results: map[string]grafana.DSQueryResult{}} v, err := framesToPrometheusValue(resp, "range") require.NoError(t, err) assert.Equal(t, model.Matrix{}, v) @@ -161,7 +162,7 @@ func TestFramesToPrometheusValue(t *testing.T) { }) t.Run("error in result", func(t *testing.T) { - resp := &dsQueryResponse{Results: map[string]dsQueryResult{ + resp := &grafana.DSQueryResponse{Results: map[string]grafana.DSQueryResult{ "A": {Error: "something went wrong"}, }} _, err := framesToPrometheusValue(resp, "range") @@ -234,29 +235,29 @@ func TestMapGCPMetricKind(t *testing.T) { } func TestExtractLabelValuesFromFrames(t *testing.T) { - resp := &dsQueryResponse{ - Results: map[string]dsQueryResult{ + resp := &grafana.DSQueryResponse{ + Results: map[string]grafana.DSQueryResult{ "A": { - Frames: []dsQueryFrame{ + Frames: []grafana.DSQueryFrame{ { - Schema: dsQueryFrameSchema{ - Fields: []dsQueryFrameField{ + Schema: grafana.DSQueryFrameSchema{ + Fields: []grafana.DSQueryFrameField{ {Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"zone": "us-east1-b", "project_id": "my-project"}}, }, }, }, { - Schema: dsQueryFrameSchema{ - Fields: []dsQueryFrameField{ + Schema: grafana.DSQueryFrameSchema{ + Fields: []grafana.DSQueryFrameField{ {Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"zone": "us-west1-a", 
"project_id": "my-project"}}, }, }, }, { - Schema: dsQueryFrameSchema{ - Fields: []dsQueryFrameField{ + Schema: grafana.DSQueryFrameSchema{ + Fields: []grafana.DSQueryFrameField{ {Name: "Time", Type: "time"}, {Name: "Value", Type: "number", Labels: map[string]string{"zone": "us-east1-b", "project_id": "other-project"}}, }, diff --git a/tools/sift.go b/tools/sift.go index 4499225a..440b83e5 100644 --- a/tools/sift.go +++ b/tools/sift.go @@ -28,7 +28,7 @@ const ( // errorPatternLogExampleLimit controls how many log examples are fetched per error pattern. const errorPatternLogExampleLimit = 3 -const siftResponseLimitBytes = 1024 * 1024 * 10 //10MB +const siftResponseLimitBytes = 1024 * 1024 * 10 // 10MB type analysisStatus string