Skip to content

Commit eb80370

Browse files
feat(UI): jetstream summary & raft details (#3466)
Signed-off-by: Surya Singh <suryapratap.personal@gmail.com>
1 parent c1429a1 commit eb80370

17 files changed

Lines changed: 1620 additions & 31 deletions

File tree

server/apis/v1/handler.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ type handler struct {
101101
dexObj *DexObject
102102
localUsersAuthObject *LocalUsersAuthObject
103103
healthChecker *HealthChecker
104-
opts *handlerOptions
104+
// httpClient is used for direct calls to pod-local monitor endpoints.
105+
httpClient *http.Client
106+
opts *handlerOptions
105107
}
106108

107109
// NewHandler is used to provide a new instance of the handler type
@@ -143,6 +145,7 @@ func NewHandler(ctx context.Context, dexObj *DexObject, localUsersAuthObject *Lo
143145
dexObj: dexObj,
144146
localUsersAuthObject: localUsersAuthObject,
145147
healthChecker: NewHealthChecker(ctx),
148+
httpClient: &http.Client{Timeout: 5 * time.Second},
146149
opts: o,
147150
}, nil
148151
}
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
/*
2+
Copyright 2026 The Numaproj Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package v1
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
"net"
24+
"net/http"
25+
"sort"
26+
"strconv"
27+
"time"
28+
29+
"github.com/gin-gonic/gin"
30+
natsserver "github.com/nats-io/nats-server/v2/server"
31+
corev1 "k8s.io/api/core/v1"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
34+
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
35+
)
36+
37+
// jetStreamMonitorPort is the TCP port used when calling the monitor endpoint (/jsz) on a JetStream pod.
const jetStreamMonitorPort = 8222
38+
39+
// GetISBServiceJetStream returns JetStream monitor summary and RAFT meta group information for an ISB service.
40+
func (h *handler) GetISBServiceJetStream(c *gin.Context) {
41+
ns, isbsvcName := c.Param("namespace"), c.Param("isb-service")
42+
_, pods, ok := h.getJetStreamISBMonitorPods(c, ns, isbsvcName)
43+
if !ok {
44+
return
45+
}
46+
response := ISBJetStreamDTO{
47+
Summary: []JetStreamSummaryDTO{},
48+
RaftMetaGroup: []JetStreamRaftMetaDTO{},
49+
}
50+
for _, pod := range pods {
51+
jsInfo, err := h.fetchJetStreamInfo(c.Request.Context(), pod)
52+
if err != nil {
53+
response.Errors = append(response.Errors, ISBMonitorErrorDTO{Pod: pod.Name, Message: err.Error()})
54+
continue
55+
}
56+
response.Summary = append(response.Summary, newJetStreamSummaryDTO(pod.Name, jsInfo))
57+
response.RaftMetaGroup = append(response.RaftMetaGroup, newJetStreamRaftMetaDTOs(pod.Name, jsInfo)...)
58+
}
59+
if len(response.Summary) == 0 && len(response.Errors) > 0 {
60+
message := fmt.Sprintf("Failed to fetch JetStream monitor data for interstepbuffer service %q namespace %q", isbsvcName, ns)
61+
c.JSON(http.StatusBadGateway, NewNumaflowAPIResponse(&message, nil))
62+
return
63+
}
64+
response.RaftMetaGroup = dedupeJetStreamRaftMetaDTOs(response.RaftMetaGroup)
65+
sort.Slice(response.Summary, func(i, j int) bool {
66+
return response.Summary[i].Server < response.Summary[j].Server
67+
})
68+
sort.Slice(response.RaftMetaGroup, func(i, j int) bool {
69+
if response.RaftMetaGroup[i].Name != response.RaftMetaGroup[j].Name {
70+
return response.RaftMetaGroup[i].Name < response.RaftMetaGroup[j].Name
71+
}
72+
return response.RaftMetaGroup[i].ID < response.RaftMetaGroup[j].ID
73+
})
74+
c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, response))
75+
}
76+
77+
// getJetStreamISBMonitorPods validates that the ISB service is JetStream backed and returns running pods with Pod IPs.
78+
func (h *handler) getJetStreamISBMonitorPods(c *gin.Context, ns, isbsvcName string) (*dfv1.InterStepBufferService, []corev1.Pod, bool) {
79+
isbsvc, err := h.numaflowClient.InterStepBufferServices(ns).Get(c, isbsvcName, metav1.GetOptions{})
80+
if err != nil {
81+
message := fmt.Sprintf("Failed to fetch interstepbuffer service %q namespace %q, %s", isbsvcName, ns, err.Error())
82+
c.JSON(http.StatusNotFound, NewNumaflowAPIResponse(&message, nil))
83+
return nil, nil, false
84+
}
85+
if isbsvc.Spec.JetStream == nil {
86+
message := fmt.Sprintf("Interstepbuffer service %q namespace %q is not JetStream backed", isbsvcName, ns)
87+
c.JSON(http.StatusBadRequest, NewNumaflowAPIResponse(&message, nil))
88+
return nil, nil, false
89+
}
90+
pods, err := h.kubeClient.CoreV1().Pods(ns).List(c, metav1.ListOptions{
91+
LabelSelector: fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyComponent, dfv1.ComponentISBSvc, dfv1.KeyISBSvcName, isbsvcName),
92+
})
93+
if err != nil {
94+
message := fmt.Sprintf("Failed to list JetStream pods for interstepbuffer service %q namespace %q, %s", isbsvcName, ns, err.Error())
95+
c.JSON(http.StatusBadGateway, NewNumaflowAPIResponse(&message, nil))
96+
return nil, nil, false
97+
}
98+
runningPods := make([]corev1.Pod, 0, len(pods.Items))
99+
for _, pod := range pods.Items {
100+
if pod.Status.Phase == corev1.PodRunning && pod.Status.PodIP != "" {
101+
runningPods = append(runningPods, pod)
102+
}
103+
}
104+
if len(runningPods) == 0 {
105+
message := fmt.Sprintf("No running JetStream pods found for interstepbuffer service %q namespace %q", isbsvcName, ns)
106+
c.JSON(http.StatusNotFound, NewNumaflowAPIResponse(&message, nil))
107+
return nil, nil, false
108+
}
109+
sort.Slice(runningPods, func(i, j int) bool {
110+
return runningPods[i].Name < runningPods[j].Name
111+
})
112+
return isbsvc, runningPods, true
113+
}
114+
115+
// fetchJetStreamInfo queries the NATS monitor endpoint on a JetStream pod and decodes the /jsz response.
116+
func (h *handler) fetchJetStreamInfo(ctx context.Context, pod corev1.Pod) (*natsserver.JSInfo, error) {
117+
client := h.httpClient
118+
if client == nil {
119+
client = &http.Client{Timeout: 5 * time.Second}
120+
}
121+
hostPort := net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(jetStreamMonitorPort))
122+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s/jsz?accounts=true&streams=true&consumers=true&config=true", hostPort), nil)
123+
if err != nil {
124+
return nil, err
125+
}
126+
resp, err := client.Do(req)
127+
if err != nil {
128+
return nil, err
129+
}
130+
defer func() {
131+
_ = resp.Body.Close()
132+
}()
133+
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
134+
return nil, fmt.Errorf("JetStream monitor returned status %d", resp.StatusCode)
135+
}
136+
var jsInfo natsserver.JSInfo
137+
if err = json.NewDecoder(resp.Body).Decode(&jsInfo); err != nil {
138+
return nil, fmt.Errorf("failed to decode JetStream monitor response: %w", err)
139+
}
140+
return &jsInfo, nil
141+
}
142+
143+
// newJetStreamSummaryDTO converts NATS JetStream monitor stats into the API summary payload.
144+
func newJetStreamSummaryDTO(defaultServer string, jsInfo *natsserver.JSInfo) JetStreamSummaryDTO {
145+
clusterName := ""
146+
metaLeader := false
147+
if jsInfo.Meta != nil {
148+
clusterName = jsInfo.Meta.Name
149+
metaLeader = jsInfo.Meta.Leader == defaultServer || jsInfo.Meta.Peer == jsInfo.ID
150+
}
151+
apiErrorRate := 0.0
152+
if jsInfo.API.Total > 0 {
153+
apiErrorRate = float64(jsInfo.API.Errors) / float64(jsInfo.API.Total)
154+
}
155+
return JetStreamSummaryDTO{
156+
Server: defaultServer,
157+
ServerID: jsInfo.ID,
158+
Cluster: clusterName,
159+
Streams: jsInfo.Streams,
160+
Consumers: jsInfo.Consumers,
161+
Messages: jsInfo.Messages,
162+
Bytes: jsInfo.Bytes,
163+
APIRequests: jsInfo.API.Total,
164+
APIErrors: jsInfo.API.Errors,
165+
APIErrorRate: apiErrorRate,
166+
MetaLeader: metaLeader,
167+
}
168+
}
169+
170+
// newJetStreamRaftMetaDTOs converts NATS meta cluster peer information into RAFT debug rows.
171+
func newJetStreamRaftMetaDTOs(defaultServer string, jsInfo *natsserver.JSInfo) []JetStreamRaftMetaDTO {
172+
if jsInfo.Meta == nil {
173+
return nil
174+
}
175+
if len(jsInfo.Meta.Replicas) == 0 {
176+
id := jsInfo.ID
177+
if id == "" {
178+
id = jsInfo.Meta.Peer
179+
}
180+
return []JetStreamRaftMetaDTO{
181+
{
182+
Name: defaultServer,
183+
ID: id,
184+
Leader: jsInfo.Meta.Leader == defaultServer || jsInfo.Meta.Leader == jsInfo.ID,
185+
Online: true,
186+
},
187+
}
188+
}
189+
current := true
190+
raftMeta := make([]JetStreamRaftMetaDTO, 0, len(jsInfo.Meta.Replicas)+1)
191+
seen := make(map[string]struct{})
192+
if jsInfo.Meta.Leader != "" {
193+
raftMeta = append(raftMeta, JetStreamRaftMetaDTO{
194+
Name: jsInfo.Meta.Leader,
195+
ID: jsInfo.Meta.Peer,
196+
Leader: true,
197+
Current: &current,
198+
Online: true,
199+
})
200+
seen[jsInfo.Meta.Leader] = struct{}{}
201+
if jsInfo.Meta.Peer != "" {
202+
seen[jsInfo.Meta.Peer] = struct{}{}
203+
}
204+
}
205+
for _, replica := range jsInfo.Meta.Replicas {
206+
if replica == nil {
207+
continue
208+
}
209+
if _, ok := seen[replica.Name]; ok {
210+
continue
211+
}
212+
if _, ok := seen[replica.Peer]; ok && replica.Peer != "" {
213+
continue
214+
}
215+
lag := replica.Lag
216+
raftMeta = append(raftMeta, JetStreamRaftMetaDTO{
217+
Name: replica.Name,
218+
ID: replica.Peer,
219+
Leader: replica.Name == jsInfo.Meta.Leader || replica.Peer == jsInfo.Meta.Peer,
220+
Current: &replica.Current,
221+
Online: !replica.Offline,
222+
Active: replica.Active.String(),
223+
Lag: &lag,
224+
})
225+
}
226+
return raftMeta
227+
}
228+
229+
// dedupeJetStreamRaftMetaDTOs removes duplicate peers reported by multiple JetStream pods.
230+
func dedupeJetStreamRaftMetaDTOs(items []JetStreamRaftMetaDTO) []JetStreamRaftMetaDTO {
231+
deduped := make([]JetStreamRaftMetaDTO, 0, len(items))
232+
seenIDs := make(map[string]int, len(items))
233+
seenNames := make(map[string]int, len(items))
234+
for _, item := range items {
235+
existingIndex := -1
236+
if item.ID != "" {
237+
if index, ok := seenIDs[item.ID]; ok {
238+
existingIndex = index
239+
}
240+
}
241+
if existingIndex == -1 && item.Name != "" {
242+
if index, ok := seenNames[item.Name]; ok {
243+
existingIndex = index
244+
}
245+
}
246+
if existingIndex != -1 {
247+
if isRicherJetStreamRaftMetaDTO(item, deduped[existingIndex]) {
248+
deduped[existingIndex] = item
249+
if item.ID != "" {
250+
seenIDs[item.ID] = existingIndex
251+
}
252+
if item.Name != "" {
253+
seenNames[item.Name] = existingIndex
254+
}
255+
}
256+
continue
257+
}
258+
if item.ID != "" {
259+
seenIDs[item.ID] = len(deduped)
260+
}
261+
if item.Name != "" {
262+
seenNames[item.Name] = len(deduped)
263+
}
264+
deduped = append(deduped, item)
265+
}
266+
return deduped
267+
}
268+
269+
func isRicherJetStreamRaftMetaDTO(candidate, existing JetStreamRaftMetaDTO) bool {
270+
return jetStreamRaftMetaDTOScore(candidate) > jetStreamRaftMetaDTOScore(existing)
271+
}
272+
273+
func jetStreamRaftMetaDTOScore(item JetStreamRaftMetaDTO) int {
274+
score := 0
275+
if item.ID != "" {
276+
score++
277+
}
278+
if item.Current != nil {
279+
score++
280+
}
281+
if item.Active != "" {
282+
score++
283+
}
284+
if item.Lag != nil {
285+
score++
286+
}
287+
return score
288+
}

0 commit comments

Comments
 (0)