Skip to content

Commit 68730c3

Browse files
committed
feat: support metrics v2 interface, which is memory mode.
Close: https://project.feishu.cn/taosdata_td/feature/detail/6622579928
1 parent 5f555f1 commit 68730c3

15 files changed

+2021
-11
lines changed

tools/keeper/api/exporter_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ func TestMain(m *testing.M) {
6363
CreatTables(conf.TDengine.Username, conf.TDengine.Password, conf.TDengine.Host, conf.TDengine.Port, conf.TDengine.Usessl, conf.Metrics.Database.Name, createList)
6464

6565
processor := process.NewProcessor(conf)
66-
node := NewNodeExporter(processor)
66+
memoryStore := process.NewMemoryStore()
67+
node := NewNodeExporter(processor, memoryStore, reporter)
6768
node.Init(router)
6869
m.Run()
6970
if _, err = conn.Exec(ctx, fmt.Sprintf("drop database if exists %s", dbName), util.GetQidOwn(config.Conf.InstanceID)); err != nil {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package api
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"strings"
7+
8+
"github.com/gin-gonic/gin"
9+
"github.com/taosdata/taoskeeper/infrastructure/log"
10+
)
11+
12+
var middlewareLogger = log.GetLogger("METRIC_MIDDLEWARE")
13+
14+
// MetricCacheMiddleware AOP middleware (synchronous version)
15+
func MetricCacheMiddleware(parser *MetricParser) gin.HandlerFunc {
16+
return func(c *gin.Context) {
17+
// Fast path 1: non-POST request
18+
if c.Request.Method != "POST" {
19+
c.Next()
20+
return
21+
}
22+
23+
// Fast path 2: path not matched
24+
path := c.Request.URL.Path
25+
if !shouldCachePath(path) {
26+
c.Next()
27+
return
28+
}
29+
30+
// Read request body
31+
body, err := io.ReadAll(c.Request.Body)
32+
if err != nil {
33+
c.Next()
34+
return
35+
}
36+
c.Request.Body = io.NopCloser(bytes.NewBuffer(body))
37+
38+
// Synchronous parsing (~5µs only, negligible latency)
39+
if err := parser.ParseAndStore(c, body); err != nil {
40+
middlewareLogger.Debugf("Failed to parse metrics: %v", err)
41+
}
42+
43+
c.Next()
44+
}
45+
}
46+
47+
// shouldCachePath matches paths
48+
var cachePathMap = map[string]bool{
49+
"/general-metric": true,
50+
"/taosd-cluster-basic": true,
51+
"/slow-sql-detail-batch": true,
52+
"/adapter_report": true,
53+
}
54+
55+
func shouldCachePath(path string) bool {
56+
for prefix := range cachePathMap {
57+
if strings.HasPrefix(path, prefix) {
58+
return true
59+
}
60+
}
61+
return false
62+
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package api
2+
3+
import (
4+
"net/http/httptest"
5+
"strings"
6+
"testing"
7+
8+
"github.com/gin-gonic/gin"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/taosdata/taoskeeper/process"
11+
)
12+
13+
func TestMetricCacheMiddleware_ShouldCachePath(t *testing.T) {
14+
tests := []struct {
15+
name string
16+
path string
17+
expected bool
18+
}{
19+
{
20+
name: "general-metric should cache",
21+
path: "/general-metric",
22+
expected: true,
23+
},
24+
{
25+
name: "taosd-cluster-basic should cache",
26+
path: "/taosd-cluster-basic",
27+
expected: true,
28+
},
29+
{
30+
name: "slow-sql-detail-batch should cache",
31+
path: "/slow-sql-detail-batch",
32+
expected: true,
33+
},
34+
{
35+
name: "adapter_report should cache",
36+
path: "/adapter_report",
37+
expected: true,
38+
},
39+
{
40+
name: "other paths should not cache",
41+
path: "/metrics",
42+
expected: false,
43+
},
44+
{
45+
name: "health check should not cache",
46+
path: "/check_health",
47+
expected: false,
48+
},
49+
}
50+
51+
for _, tt := range tests {
52+
t.Run(tt.name, func(t *testing.T) {
53+
result := shouldCachePath(tt.path)
54+
assert.Equal(t, tt.expected, result)
55+
})
56+
}
57+
}
58+
59+
func TestMetricCacheMiddleware_InterceptAndCache(t *testing.T) {
60+
gin.SetMode(gin.TestMode)
61+
62+
store := process.NewMemoryStore()
63+
parser := NewMetricParser(store, []string{})
64+
65+
// Setup router and middleware
66+
router := gin.New()
67+
router.Use(MetricCacheMiddleware(parser))
68+
router.POST("/general-metric", func(c *gin.Context) {
69+
c.JSON(200, gin.H{"status": "ok"})
70+
})
71+
72+
// Send request
73+
requestBody := `[{
74+
"ts": "1703226836761",
75+
"protocol": 2,
76+
"tables": [{
77+
"name": "taosd_cluster_info",
78+
"metric_groups": [{
79+
"tags": [{"name": "cluster_id", "value": "123"}],
80+
"metrics": [{"name": "dbs_total", "value": 1}]
81+
}]
82+
}]
83+
}]`
84+
85+
req := httptest.NewRequest("POST", "/general-metric", strings.NewReader(requestBody))
86+
req.Header.Set("Content-Type", "application/json")
87+
w := httptest.NewRecorder()
88+
89+
router.ServeHTTP(w, req)
90+
91+
// Verify response
92+
assert.Equal(t, 200, w.Code)
93+
94+
// Synchronous parsing, immediately available
95+
allData := store.GetAll()
96+
assert.Equal(t, 1, len(allData))
97+
assert.Equal(t, "taosd_cluster_info", allData[0].TableName)
98+
}
99+
100+
func TestMetricCacheMiddleware_SkipNonPostRequests(t *testing.T) {
101+
gin.SetMode(gin.TestMode)
102+
103+
store := process.NewMemoryStore()
104+
parser := NewMetricParser(store, []string{})
105+
106+
router := gin.New()
107+
router.Use(MetricCacheMiddleware(parser))
108+
router.GET("/general-metric", func(c *gin.Context) {
109+
c.JSON(200, gin.H{"status": "ok"})
110+
})
111+
112+
req := httptest.NewRequest("GET", "/general-metric", nil)
113+
w := httptest.NewRecorder()
114+
router.ServeHTTP(w, req)
115+
116+
assert.Equal(t, 200, w.Code)
117+
118+
// GET requests should not cache data
119+
allData := store.GetAll()
120+
assert.Equal(t, 0, len(allData))
121+
}
122+
123+
func TestMetricCacheMiddleware_SkipNonMatchingPaths(t *testing.T) {
124+
gin.SetMode(gin.TestMode)
125+
126+
store := process.NewMemoryStore()
127+
parser := NewMetricParser(store, []string{})
128+
129+
router := gin.New()
130+
router.Use(MetricCacheMiddleware(parser))
131+
router.POST("/other-path", func(c *gin.Context) {
132+
c.JSON(200, gin.H{"status": "ok"})
133+
})
134+
135+
requestBody := `{"test": "data"}`
136+
req := httptest.NewRequest("POST", "/other-path", strings.NewReader(requestBody))
137+
req.Header.Set("Content-Type", "application/json")
138+
w := httptest.NewRecorder()
139+
140+
router.ServeHTTP(w, req)
141+
142+
assert.Equal(t, 200, w.Code)
143+
144+
// Non-matching paths should not cache
145+
allData := store.GetAll()
146+
assert.Equal(t, 0, len(allData))
147+
}
148+
149+
func TestMetricCacheMiddleware_PreserveRequestBody(t *testing.T) {
150+
gin.SetMode(gin.TestMode)
151+
152+
store := process.NewMemoryStore()
153+
parser := NewMetricParser(store, []string{})
154+
155+
router := gin.New()
156+
router.Use(MetricCacheMiddleware(parser))
157+
router.POST("/general-metric", func(c *gin.Context) {
158+
// Handler can read request body normally
159+
c.JSON(200, gin.H{"status": "ok"})
160+
})
161+
162+
requestBody := `[{
163+
"ts": "1703226836761",
164+
"protocol": 2,
165+
"tables": [{
166+
"name": "taosd_cluster_info",
167+
"metric_groups": [{
168+
"tags": [{"name": "cluster_id", "value": "123"}],
169+
"metrics": [{"name": "dbs_total", "value": 1}]
170+
}]
171+
}]
172+
}]`
173+
174+
req := httptest.NewRequest("POST", "/general-metric", strings.NewReader(requestBody))
175+
req.Header.Set("Content-Type", "application/json")
176+
w := httptest.NewRecorder()
177+
178+
router.ServeHTTP(w, req)
179+
180+
// Verify handler responds normally (request body correctly restored)
181+
assert.Equal(t, 200, w.Code)
182+
}

0 commit comments

Comments
 (0)