Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
531 changes: 465 additions & 66 deletions docs/en/14-reference/01-components/06-taoskeeper.md

Large diffs are not rendered by default.

534 changes: 467 additions & 67 deletions docs/zh/14-reference/01-components/06-taoskeeper.md

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion tools/keeper/api/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func TestMain(m *testing.M) {
CreatTables(conf.TDengine.Username, conf.TDengine.Password, conf.TDengine.Host, conf.TDengine.Port, conf.TDengine.Usessl, conf.Metrics.Database.Name, createList)

processor := process.NewProcessor(conf)
node := NewNodeExporter(processor)
memoryStore, _ := process.NewMemoryStore(5 * time.Minute)
defer memoryStore.Close()
node := NewNodeExporter(processor, memoryStore, reporter)
node.Init(router)
m.Run()
if _, err = conn.Exec(ctx, fmt.Sprintf("drop database if exists %s", dbName), util.GetQidOwn(config.Conf.InstanceID)); err != nil {
Expand Down
70 changes: 70 additions & 0 deletions tools/keeper/api/metric_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package api

import (
"bytes"
"io"
"strings"

"github.com/gin-gonic/gin"
"github.com/taosdata/taoskeeper/infrastructure/log"
)

const maxRequestBodySize = 1 << 20 // 1MB - maximum request body size for metric endpoints

var middlewareLogger = log.GetLogger("METRIC_MIDDLEWARE")

// MetricCacheMiddleware AOP middleware (synchronous version)
func MetricCacheMiddleware(parser *MetricParser) gin.HandlerFunc {
return func(c *gin.Context) {
// Fast path 1: non-POST request
if c.Request.Method != "POST" {
c.Next()
return
}

// Fast path 2: path not matched
path := c.Request.URL.Path
if !shouldCachePath(path) {
c.Next()
return
}

// Limit request body size to prevent DoS attacks
limitedReader := io.LimitReader(c.Request.Body, maxRequestBodySize)
body, err := io.ReadAll(limitedReader)
if err != nil {
c.Next()
return
}

// Check if body was truncated (exceeded max size)
if len(body) == maxRequestBodySize {
middlewareLogger.Warn("Request body exceeded 1MB limit, may have been truncated")
}
c.Request.Body = io.NopCloser(bytes.NewBuffer(body))

// Synchronous parsing (~5µs only, negligible latency)
if err := parser.ParseAndStore(c, body); err != nil {
middlewareLogger.Debugf("Failed to parse metrics: %v", err)
}

c.Next()
}
}

// shouldCachePath matches paths
var cachePaths = []string{
"/general-metric",
"/taosd-cluster-basic",
"/slow-sql-detail-batch",
"/adapter_report",
}

func shouldCachePath(path string) bool {
for _, prefix := range cachePaths {
if strings.HasPrefix(path, prefix) {
return true
}
}
return false
}
187 changes: 187 additions & 0 deletions tools/keeper/api/metric_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package api

import (
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
"github.com/taosdata/taoskeeper/process"
)

func TestMetricCacheMiddleware_ShouldCachePath(t *testing.T) {
tests := []struct {
name string
path string
expected bool
}{
{
name: "general-metric should cache",
path: "/general-metric",
expected: true,
},
{
name: "taosd-cluster-basic should cache",
path: "/taosd-cluster-basic",
expected: true,
},
{
name: "slow-sql-detail-batch should cache",
path: "/slow-sql-detail-batch",
expected: true,
},
{
name: "adapter_report should cache",
path: "/adapter_report",
expected: true,
},
{
name: "other paths should not cache",
path: "/metrics",
expected: false,
},
{
name: "health check should not cache",
path: "/check_health",
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := shouldCachePath(tt.path)
assert.Equal(t, tt.expected, result)
})
}
}

func TestMetricCacheMiddleware_InterceptAndCache(t *testing.T) {
gin.SetMode(gin.TestMode)

store, _ := process.NewMemoryStore(5 * time.Minute)
defer store.Close()
parser := NewMetricParser(store, []string{})

// Setup router and middleware
router := gin.New()
router.Use(MetricCacheMiddleware(parser))
router.POST("/general-metric", func(c *gin.Context) {
c.JSON(200, gin.H{"status": "ok"})
})

// Send request
requestBody := `[{
"ts": "1703226836761",
"protocol": 2,
"tables": [{
"name": "taosd_cluster_info",
"metric_groups": [{
"tags": [{"name": "cluster_id", "value": "123"}],
"metrics": [{"name": "dbs_total", "value": 1}]
}]
}]
}]`

req := httptest.NewRequest("POST", "/general-metric", strings.NewReader(requestBody))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()

router.ServeHTTP(w, req)

// Verify response
assert.Equal(t, 200, w.Code)

// Synchronous parsing, immediately available
allData := store.GetAllFiltered(time.Unix(0, 0))
assert.Equal(t, 1, len(allData))
assert.Equal(t, "taosd_cluster_info", allData[0].TableName)
}

func TestMetricCacheMiddleware_SkipNonPostRequests(t *testing.T) {
gin.SetMode(gin.TestMode)

store, _ := process.NewMemoryStore(5 * time.Minute)
defer store.Close()
parser := NewMetricParser(store, []string{})

router := gin.New()
router.Use(MetricCacheMiddleware(parser))
router.GET("/general-metric", func(c *gin.Context) {
c.JSON(200, gin.H{"status": "ok"})
})

req := httptest.NewRequest("GET", "/general-metric", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)

assert.Equal(t, 200, w.Code)

// GET requests should not cache data
allData := store.GetAllFiltered(time.Unix(0, 0))
assert.Equal(t, 0, len(allData))
}

func TestMetricCacheMiddleware_SkipNonMatchingPaths(t *testing.T) {
gin.SetMode(gin.TestMode)

store, _ := process.NewMemoryStore(5 * time.Minute)
defer store.Close()
parser := NewMetricParser(store, []string{})

router := gin.New()
router.Use(MetricCacheMiddleware(parser))
router.POST("/other-path", func(c *gin.Context) {
c.JSON(200, gin.H{"status": "ok"})
})

requestBody := `{"test": "data"}`
req := httptest.NewRequest("POST", "/other-path", strings.NewReader(requestBody))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()

router.ServeHTTP(w, req)

assert.Equal(t, 200, w.Code)

// Non-matching paths should not cache
allData := store.GetAllFiltered(time.Unix(0, 0))
assert.Equal(t, 0, len(allData))
}

func TestMetricCacheMiddleware_PreserveRequestBody(t *testing.T) {
gin.SetMode(gin.TestMode)

store, _ := process.NewMemoryStore(5 * time.Minute)
defer store.Close()
parser := NewMetricParser(store, []string{})

router := gin.New()
router.Use(MetricCacheMiddleware(parser))
router.POST("/general-metric", func(c *gin.Context) {
// Handler can read request body normally
c.JSON(200, gin.H{"status": "ok"})
})

requestBody := `[{
"ts": "1703226836761",
"protocol": 2,
"tables": [{
"name": "taosd_cluster_info",
"metric_groups": [{
"tags": [{"name": "cluster_id", "value": "123"}],
"metrics": [{"name": "dbs_total", "value": 1}]
}]
}]
}]`

req := httptest.NewRequest("POST", "/general-metric", strings.NewReader(requestBody))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()

router.ServeHTTP(w, req)

// Verify handler responds normally (request body correctly restored)
assert.Equal(t, 200, w.Code)
}
Loading
Loading