Merged
1 change: 1 addition & 0 deletions CLAUDE.md
@@ -210,6 +210,7 @@ Before adding a new constant, check if it already exists in `ResourceLabels`, `P
- Prefer `List.of()` for empty immutable lists
- Use `LOG.infof()` / `LOG.debugf()` (JBoss logging with format strings)
- No `assert` statements (checkstyle enforced)
- Every file must end with a trailing newline; verify before committing.

### Javadoc

@@ -0,0 +1,32 @@
/*
* Copyright StreamsHub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streamshub.mcp.common.dto;

import java.util.List;

/**
* Result of collecting logs from a list of pods.
*
* @param podNames the names of pods logs were collected from
* @param logs the concatenated log content
* @param errorCount the number of lines containing errors or exceptions
* @param totalLines the total number of log lines collected
*/
public record PodLogsResult(
List<String> podNames,
String logs,
int errorCount,
int totalLines
) {

/**
* Whether any error lines were found in the logs.
*
* @return true if errors were detected
*/
public boolean hasErrors() {
return errorCount > 0;
}
}
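A minimal standalone sketch of how a caller might consume this record. The record is redeclared locally (as a nested copy, an assumption made purely so the snippet compiles on its own) rather than imported from `io.streamshub.mcp.common.dto`:

```java
import java.util.List;

public class PodLogsResultDemo {
    // Local copy of PodLogsResult so this sketch is self-contained.
    record PodLogsResult(List<String> podNames, String logs, int errorCount, int totalLines) {
        boolean hasErrors() {
            return errorCount > 0;
        }
    }

    public static void main(String[] args) {
        PodLogsResult clean = new PodLogsResult(List.of("kafka-0"), "INFO started\n", 0, 1);
        PodLogsResult failing = new PodLogsResult(List.of("kafka-1"), "ERROR boom\n", 1, 1);
        System.out.println(clean.hasErrors());   // false
        System.out.println(failing.hasErrors()); // true
    }
}
```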
@@ -16,6 +16,7 @@
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.streamshub.mcp.common.config.KubernetesConstants;
import io.streamshub.mcp.common.dto.PodLogsResult;
import io.streamshub.mcp.common.dto.PodSummaryResponse;
import io.streamshub.mcp.common.util.InputUtils;
import jakarta.enterprise.context.ApplicationScoped;
@@ -43,6 +44,7 @@ public class PodsService {
static final Set<String> FULL_SECTIONS = Set.of("full");
static final Set<String> NO_SECTIONS = Set.of();
private static final Logger LOG = Logger.getLogger(PodsService.class);
private static final int DEFAULT_LOG_TAIL_LINES = 100;

@Inject
KubernetesClient kubernetesClient;
@@ -477,4 +479,53 @@ private String determineComponentFromPodInfo(String name, Map<String, String> la

return "app";
}

/**
* Collect logs from a list of pods with error analysis.
* Tails the last {@value #DEFAULT_LOG_TAIL_LINES} lines from each pod
* and counts lines containing ERROR or EXCEPTION.
*
* @param namespace the namespace of the pods
* @param pods the list of pods to collect logs from
* @return the aggregated log result
*/
public PodLogsResult collectLogs(final String namespace, final List<Pod> pods) {
List<String> podNames = pods.stream()
.map(pod -> pod.getMetadata().getName())
.toList();

StringBuilder allLogs = new StringBuilder();
int errorCount = 0;
int totalLines = 0;

for (Pod pod : pods) {
String podName = pod.getMetadata().getName();
try {
String podLog = kubernetesClient.pods()
.inNamespace(namespace)
.withName(podName)
.tailingLines(DEFAULT_LOG_TAIL_LINES)
.getLog();

if (podLog != null && !podLog.isEmpty()) {
allLogs.append("=== Pod: ").append(podName).append(" ===\n");
allLogs.append(podLog).append("\n");

String[] lines = podLog.split("\n");
totalLines += lines.length;
for (String line : lines) {
String upperLine = line.toUpperCase(Locale.ENGLISH);
if (upperLine.contains("ERROR") || upperLine.contains("EXCEPTION")) {
errorCount++;
}
}
}
} catch (Exception e) {
LOG.debugf("Could not retrieve logs from pod %s: %s", podName, e.getMessage());
allLogs.append("=== Pod: ").append(podName).append(" === (logs unavailable)\n");
}
}

return new PodLogsResult(podNames, allLogs.toString(), errorCount, totalLines);
}
}
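The error heuristic in `collectLogs` can be exercised in isolation. This sketch replicates the counting loop; the helper name `countErrorLines` is illustrative and not part of the PR:

```java
import java.util.Locale;

public class LogErrorCounter {
    // Mirrors the logic in collectLogs: a line counts as an error line
    // if it contains ERROR or EXCEPTION, case-insensitively.
    static int countErrorLines(String log) {
        int errors = 0;
        for (String line : log.split("\n")) {
            String upper = line.toUpperCase(Locale.ENGLISH);
            if (upper.contains("ERROR") || upper.contains("EXCEPTION")) {
                errors++;
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        String log = "INFO ok\nerror: disk full\njava.lang.NullPointerException\n";
        System.out.println(countErrorLines(log)); // 2
    }
}
```

Note that the case-insensitive match also catches class names like `NullPointerException`, which is why exception stack traces are counted even when the word ERROR never appears.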
35 changes: 27 additions & 8 deletions strimzi-mcp/README.md
@@ -2,14 +2,6 @@

A Quarkus application that provides Strimzi Kafka management tools via **MCP (Model Context Protocol)** for AI assistants and automation.

## Features

- **Real Kubernetes Integration**: Live Strimzi operator management with K8s API calls
- **Pure MCP Server**: Standard Model Context Protocol for AI assistants (Claude, etc.)
- **Smart Discovery**: Auto-finds operators and clusters across namespaces
- **Structured Results**: Rich JSON responses with health analysis
- **Lightweight**: No LLM dependencies required

## Prerequisites

### Kubernetes Cluster with Strimzi
@@ -71,6 +63,33 @@ Use the MCP inspector to browse all available tools and their parameters:
npx @modelcontextprotocol/inspector http://localhost:8080/mcp
```

## Prompt Templates

MCP prompt templates encode Strimzi domain knowledge and guide LLMs through structured diagnostic workflows. Instead of relying on the LLM to figure out what to check, prompts tell it exactly which tools to call and in what order.

| Prompt | Parameters | Description |
|--------|-----------|-------------|
| `diagnose-cluster-issue` | `cluster_name` (required), `namespace`, `symptom` | Step-by-step cluster diagnosis: checks status, node pools, operator logs, pod health, and correlates findings to identify root causes. |
| `troubleshoot-connectivity` | `cluster_name` (required), `namespace`, `listener_name` | Connectivity troubleshooting: checks listeners, bootstrap addresses, listener accessibility by type, and pod health. |

**How they work**: The MCP client discovers available prompts, the user selects one and fills in the parameters, and the client injects the structured instructions into the LLM conversation. The LLM then follows the steps, calling the MCP tools automatically.

## Resource Templates

MCP resource templates expose Strimzi data as structured JSON that clients can attach to conversations for immediate context — without the LLM needing to call tools first.

| Resource URI | Description |
|-------------|-------------|
| `strimzi://cluster/{namespace}/{name}/status` | Kafka cluster status: readiness, version, replicas, listeners, authentication, and storage configuration. |
| `strimzi://cluster/{namespace}/{name}/topology` | Cluster topology: node pools with roles, replica counts, and storage. |
| `strimzi://operator/{namespace}/status` | Strimzi operator deployment status, version, readiness, and uptime. |
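Clients fill the `{namespace}` and `{name}` placeholders before reading a resource. A trivial sketch of that expansion (the helper is illustrative, not part of the server):

```java
public class StrimziResourceUri {
    // Builds the cluster-status resource URI from the template above.
    static String clusterStatusUri(String namespace, String name) {
        return "strimzi://cluster/" + namespace + "/" + name + "/status";
    }

    public static void main(String[] args) {
        System.out.println(clusterStatusUri("kafka", "my-cluster"));
        // strimzi://cluster/kafka/my-cluster/status
    }
}
```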

### Resource Subscriptions

The server watches Kafka CRs, KafkaNodePool CRs, and Strimzi operator Deployments via Kubernetes watches. When a resource changes (e.g., a cluster goes from Ready to NotReady), subscribed MCP clients receive a `notifications/resources/updated` notification and can re-read the resource to get the latest data.

This enables reactive LLM agents that detect and investigate issues automatically without polling.
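A library-agnostic sketch of that flow: a Kubernetes watch event triggers an update notification, and subscribers re-read the resource instead of polling. All class and method names here are illustrative stand-ins, not the actual server API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ResourceSubscriptions {
    // Illustrative stand-in for the MCP server's subscription registry.
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> onUpdated) {
        subscribers.add(onUpdated);
    }

    // Called from the Kubernetes watch when a watched resource changes.
    void onResourceChanged(String uri) {
        // Push only the URI; subscribers re-read the resource for fresh data.
        subscribers.forEach(s -> s.accept(uri));
    }

    public static void main(String[] args) {
        ResourceSubscriptions subs = new ResourceSubscriptions();
        subs.subscribe(uri -> System.out.println("notifications/resources/updated: " + uri));
        subs.onResourceChanged("strimzi://cluster/kafka/my-cluster/status");
    }
}
```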

## Container Deployment

### Build Container Image
@@ -0,0 +1,72 @@
/*
* Copyright StreamsHub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streamshub.mcp.strimzi.dto;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.List;

/**
* Response containing logs from Kafka cluster pods.
*
* @param clusterName the Kafka cluster name
* @param namespace the Kubernetes namespace
* @param pods the list of pod names logs were retrieved from
* @param hasErrors whether errors were found in the logs
* @param errorCount the number of error lines found
* @param logLines the total number of log lines retrieved
* @param logs the raw log content
* @param timestamp the time this result was generated
* @param message a human-readable summary of the result
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public record KafkaClusterLogsResponse(
@JsonProperty("cluster_name") String clusterName,
@JsonProperty("namespace") String namespace,
@JsonProperty("pods") List<String> pods,
@JsonProperty("has_errors") boolean hasErrors,
@JsonProperty("error_count") int errorCount,
@JsonProperty("log_lines") int logLines,
@JsonProperty("logs") String logs,
@JsonProperty("timestamp") Instant timestamp,
@JsonProperty("message") String message
) {

/**
* Creates a successful result with Kafka pod log data.
*
* @param clusterName the Kafka cluster name
* @param namespace the Kubernetes namespace
* @param pods the list of pod names
* @param hasErrors whether errors were found
* @param errorCount the number of error lines found
* @param logLines the total number of log lines
* @param logs the raw log content
* @return a response with the log data
*/
public static KafkaClusterLogsResponse of(String clusterName, String namespace, List<String> pods,
boolean hasErrors, int errorCount, int logLines, String logs) {
String msg = hasErrors
? String.format("Found %d errors in logs across %d pods", errorCount, pods.size())
: String.format("Logs retrieved from %d pods (no errors found)", pods.size());
return new KafkaClusterLogsResponse(clusterName, namespace, pods, hasErrors, errorCount,
logLines, logs, Instant.now(), msg);
}

/**
* Creates an empty result when no pods are found for the cluster.
*
* @param clusterName the Kafka cluster name
* @param namespace the Kubernetes namespace
* @return an empty response indicating no pods were found
*/
public static KafkaClusterLogsResponse empty(String clusterName, String namespace) {
return new KafkaClusterLogsResponse(clusterName, namespace, List.of(), false, 0, 0, null,
Instant.now(),
String.format("No Kafka pods found for cluster '%s' in namespace '%s'", clusterName, namespace));
}
}
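The summary-message branch in `of()` can be checked on its own. This sketch extracts it into a standalone helper (`summarize` is an illustrative name, not part of the PR):

```java
public class LogsResponseMessage {
    // Mirrors the message logic in KafkaClusterLogsResponse.of().
    static String summarize(boolean hasErrors, int errorCount, int podCount) {
        return hasErrors
            ? String.format("Found %d errors in logs across %d pods", errorCount, podCount)
            : String.format("Logs retrieved from %d pods (no errors found)", podCount);
    }

    public static void main(String[] args) {
        System.out.println(summarize(true, 3, 2));
        // Found 3 errors in logs across 2 pods
        System.out.println(summarize(false, 0, 2));
        // Logs retrieved from 2 pods (no errors found)
    }
}
```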
@@ -0,0 +1,112 @@
/*
* Copyright StreamsHub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streamshub.mcp.strimzi.prompt;

import io.quarkiverse.mcp.server.Prompt;
import io.quarkiverse.mcp.server.PromptArg;
import io.quarkiverse.mcp.server.PromptMessage;
import io.quarkiverse.mcp.server.PromptResponse;
import jakarta.inject.Singleton;

import java.util.List;

/**
* MCP prompt template for diagnosing Kafka cluster issues.
*
* <p>Guides the LLM through a structured diagnostic workflow:
* check cluster status, node pools, operator logs, pod health,
* and pod logs to identify root causes.</p>
*/
@Singleton
public class DiagnoseClusterIssuePrompt {

DiagnoseClusterIssuePrompt() {
}

/**
* Generate a diagnostic prompt for a Kafka cluster issue.
*
* @param clusterName the name of the Kafka cluster
* @param namespace the Kubernetes namespace of the cluster
* @param symptom optional description of the observed symptom
* @return prompt response with diagnostic instructions
*/
@Prompt(
name = "diagnose-cluster-issue",
description = "Step-by-step diagnosis of a Kafka cluster issue."
+ " Guides through status checks, operator logs, and pod inspection."
)
public PromptResponse diagnoseClusterIssue(
@PromptArg(
name = "cluster_name",
description = "Name of the Kafka cluster to diagnose."
) final String clusterName,
@PromptArg(
name = "namespace",
description = "Kubernetes namespace where the Kafka cluster is deployed.",
required = false
) final String namespace,
@PromptArg(
name = "symptom",
description = "Observed symptom, e.g. 'NotReady for 15 minutes' or 'pods restarting'.",
required = false
) final String symptom
) {
String nsClause = namespace != null && !namespace.isBlank()
? " in namespace `" + namespace + "`"
: "";
String symptomClause = symptom != null && !symptom.isBlank()
? " The reported symptom is: " + symptom + "."
: "";

String instructions = """
You are diagnosing a Kafka cluster issue for cluster `%s`%s.%s

Follow these steps in order. After each step, analyze the results \
before proceeding to the next.

## Step 1: Check Kafka cluster status
Use `get_kafka_cluster` to retrieve the cluster status and conditions.
Look for: NotReady conditions, stalled reconciliation, \
mismatched observed/expected generation, warning conditions.

## Step 2: Check KafkaNodePool statuses
Use `list_kafka_node_pools` to list all node pools for this cluster.
For any pool that looks unhealthy, use `get_kafka_node_pool` for details.
Look for: pools with fewer ready replicas than expected, \
pools in non-Ready state, role mismatches.

## Step 3: Check Strimzi operator
Use `list_strimzi_operators` to find the operator managing this cluster.
Use `get_strimzi_operator_logs` to read operator logs.
Look for: reconciliation errors, exceptions, warnings related to \
`%s`, repeated error patterns.

## Step 4: Check pod health
Use `get_kafka_cluster_pods` to check all pods for the cluster.
Look for: CrashLoopBackOff, Pending pods, high restart counts, \
pods not in Running phase, containers not ready.

## Step 5: Investigate unhealthy pods
For any unhealthy pods found in Step 4, use `get_strimzi_operator_pod` \
or the appropriate pod detail tool to get environment, resources, and conditions.
Look for: OOMKilled termination reason, resource limits that are too low, \
missing volumes, failed liveness/readiness probes.

## Step 6: Correlate and summarize
Correlate the findings from all steps.
Distinguish between:
- Operator-initiated changes (rolling updates, certificate renewal, configuration changes)
- Infrastructure failures (OOM, disk full, node issues)
- Configuration errors (invalid resource specs, missing secrets)

Provide a clear summary of the root cause and actionable recommendations.\
""".formatted(clusterName, nsClause, symptomClause, clusterName);

return PromptResponse.withMessages(List.of(
PromptMessage.withUserRole(instructions)
));
}
}
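The optional-argument handling in `diagnoseClusterIssue` — null or blank values contribute an empty clause to the prompt text — can be sketched in isolation (helper names are illustrative):

```java
public class PromptClauses {
    // Mirrors the clause building in diagnoseClusterIssue:
    // a null or blank argument produces an empty string.
    static String namespaceClause(String namespace) {
        return namespace != null && !namespace.isBlank()
                ? " in namespace `" + namespace + "`"
                : "";
    }

    static String symptomClause(String symptom) {
        return symptom != null && !symptom.isBlank()
                ? " The reported symptom is: " + symptom + "."
                : "";
    }

    public static void main(String[] args) {
        // All arguments supplied.
        System.out.println("cluster `my-cluster`"
                + namespaceClause("kafka") + "." + symptomClause("pods restarting"));
        // Optional arguments omitted: clauses vanish cleanly.
        System.out.println("cluster `my-cluster`"
                + namespaceClause(null) + "." + symptomClause("  "));
    }
}
```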