Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1ec8a10
[TT-16809] Added MCP pump support for mongo, psql, hybrid, prometheus…
lghiur Mar 18, 2026
1a2d024
Merge branch 'master' into TT-16809-Pump-generate-mcp-analytics
andrei-tyk Mar 25, 2026
a299536
TT-16809, covereage and refactoring
andrei-tyk Mar 26, 2026
a2a4e09
TT-16809, covereage and refactoring
andrei-tyk Mar 26, 2026
238656d
TT-16809, covereage and refactoring3
andrei-tyk Mar 26, 2026
ea6c04a
trigger CI
andrei-tyk Mar 26, 2026
35d30df
trigger CI2
andrei-tyk Mar 26, 2026
9bb3229
TT-16809,excluded proto file via CI
andrei-tyk Mar 26, 2026
e5ac48a
trigger CI6
andrei-tyk Mar 26, 2026
d4b5281
Merge branch 'master' into TT-16809-Pump-generate-mcp-analytics
andrei-tyk Apr 9, 2026
d41b925
TT-16809, gate hybrid pump MCP aggregation behind EnableMCPAggregatio…
andrei-tyk Apr 9, 2026
3993156
TT-16809, document MCP custom Prometheus metrics, mcp_only and latenc…
andrei-tyk Apr 9, 2026
dd13241
Revert "TT-16809, document MCP custom Prometheus metrics, mcp_only an…
andrei-tyk Apr 9, 2026
de94185
TT-16809, addrssed comments
andrei-tyk Apr 9, 2026
2c16cbc
TT-16809, fix CI
andrei-tyk Apr 9, 2026
369bd17
TT-16809, bump gotestsum to v1.13.0 for Go 1.25 compatibility
andrei-tyk Apr 9, 2026
34b8ee9
TT-16809, fix for duplicate logging on requests made to non-mcp apis
andrei-tyk Apr 9, 2026
f2c2563
TT-16809, fixed mcp records not being excluded in some non-mcp pumps
andrei-tyk Apr 14, 2026
193b797
TT-16809,feature creep cleanup
andrei-tyk Apr 14, 2026
9643c89
TT-16809, fixed logs
andrei-tyk Apr 15, 2026
3f5d5ce
TT-16809, fixed raw mongo pump
andrei-tyk Apr 15, 2026
04403a3
skip MCP records in MongoAggregatePump WriteData
lghiur Apr 15, 2026
a4340f6
add composite index on tyk_mcp_aggregated for query performance
andrei-tyk Apr 16, 2026
10bc0d2
add sharded table migration for MCP SQL aggregate pump
andrei-tyk Apr 16, 2026
568b199
Merge remote-tracking branch 'origin/master' into TT-16809-Pump-gener…
andrei-tyk Apr 16, 2026
98ca874
Merge branch 'master' into TT-16809-Pump-generate-mcp-analytics
lghiur Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ jobs:
needs: [gotest, golangci]
uses: TykTechnologies/github-actions/.github/workflows/sonarcloud.yaml@d3fa20888fa2878e877e22bb7702141217290e7c # main
with:
exclusions: ""
exclusions: "analytics/proto/**/*.pb.go"
secrets:
GH_TOKEN: ${{ secrets.ORG_GH_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
6 changes: 5 additions & 1 deletion analytics/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,6 @@ func (f *AnalyticsRecordAggregate) AsTimeUpdate() model.DBM {

// We need to create lists of API data so that we can aggregate across the list
// in order to present top-20 style lists of APIs, Tokens etc.
// apis := make([]Counter, 0)
newUpdate["$set"].(model.DBM)["lists.apiid"] = f.getRecords("apiid", f.APIID, newUpdate)

newUpdate["$set"].(model.DBM)["lists.errors"] = f.getRecords("errors", f.Errors, newUpdate)
Expand Down Expand Up @@ -711,6 +710,11 @@ func AggregateData(data []interface{}, trackAllPaths bool, ignoreTagPrefixList [
continue
}

// We don't want to aggregate MCP Data with REST data - there is a different type for that.
if thisV.IsMCPRecord() {
continue
}

thisAggregate, found := analyticsPerOrg[orgID]

if !found {
Expand Down
165 changes: 165 additions & 0 deletions analytics/aggregate_mcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package analytics

import (
"fmt"

"github.com/TykTechnologies/storage/persistent/model"
)

const (
MCPAggregateMixedCollectionName = "tyk_mcp_analytics_aggregate"
AggregateMCPSQLTable = "tyk_mcp_aggregated"
)

// MCPRecordAggregate holds aggregated MCP analytics grouped by API.
// It embeds AnalyticsRecordAggregate for all standard dimensions and adds
// MCP-specific dimension maps for method, primitive type, and primitive name.
type MCPRecordAggregate struct {
AnalyticsRecordAggregate `bson:",inline"`

Methods map[string]*Counter // keyed by JSONRPCMethod
Primitives map[string]*Counter // keyed by PrimitiveType
Names map[string]*Counter // keyed by PrimitiveName
}

// MCPSQLAnalyticsRecordAggregate is the SQL representation of an MCP aggregate record.
type MCPSQLAnalyticsRecordAggregate struct {
ID string `gorm:"primaryKey"`

OrgID string `json:"org_id"`
Dimension string `json:"dimension"`
DimensionValue string `json:"dimension_value"`
APIID string `json:"api_id"`

Counter `json:"counter" gorm:"embedded"`
Code `json:"code" gorm:"embedded"`

TimeStamp int64 `json:"timestamp"`
}

func (m *MCPSQLAnalyticsRecordAggregate) TableName() string {
return AggregateMCPSQLTable
}

// TableName returns the MongoDB collection name for this aggregate.
func (a *MCPRecordAggregate) TableName() string {
if a.Mixed {
return MCPAggregateMixedCollectionName
}
return "z_tyk_mcp_analyticz_aggregate_" + a.OrgID
}

// Dimensions returns all dimensions for MCP records, including MCP-specific
// dimension maps. This is required for AsChange() and AsTimeUpdate() to work.
func (a *MCPRecordAggregate) Dimensions() []Dimension {
dims := a.AnalyticsRecordAggregate.Dimensions()

for key, inc := range a.Methods {
dims = append(dims, Dimension{Name: "methods", Value: key, Counter: fnLatencySetter(inc)})
}

for key, inc := range a.Primitives {
dims = append(dims, Dimension{Name: "primitives", Value: key, Counter: fnLatencySetter(inc)})
}

for key, inc := range a.Names {
dims = append(dims, Dimension{Name: "names", Value: key, Counter: fnLatencySetter(inc)})
}

return dims
}

// NewMCPRecordAggregate creates a new MCPRecordAggregate with all maps initialized.
func NewMCPRecordAggregate() MCPRecordAggregate {
return MCPRecordAggregate{
AnalyticsRecordAggregate: AnalyticsRecordAggregate{}.New(),
Methods: make(map[string]*Counter),
Primitives: make(map[string]*Counter),
Names: make(map[string]*Counter),
}
}

// initMCPAggregateForRecord creates and initialises a new MCPRecordAggregate
// seeded from the first record seen for a given API.
func initMCPAggregateForRecord(record AnalyticsRecord, dbIdentifier string, aggregationTime int) MCPRecordAggregate {
agg := NewMCPRecordAggregate()
asTime := record.TimeStamp
agg.TimeStamp = setAggregateTimestamp(dbIdentifier, asTime, aggregationTime)
agg.ExpireAt = record.ExpireAt
agg.TimeID.Year = asTime.Year()
agg.TimeID.Month = int(asTime.Month())
agg.TimeID.Day = asTime.Day()
agg.TimeID.Hour = asTime.Hour()
agg.OrgID = record.OrgID
agg.LastTime = record.TimeStamp
agg.Total.ErrorMap = make(map[string]int)
return agg
}

// incrementMCPDimensions updates the MCP-specific dimension counters (method,
// primitive type, primitive name) for a single record.
func (a *MCPRecordAggregate) incrementMCPDimensions(counter Counter, rec MCPRecord) {
if method := rec.JSONRPCMethod; method != "" {
c := incrementOrSetUnit(&counter, a.Methods[method])
a.Methods[method] = c
a.Methods[method].Identifier = method
a.Methods[method].HumanIdentifier = method
}

if primType := rec.PrimitiveType; primType != "" {
c := incrementOrSetUnit(&counter, a.Primitives[primType])
a.Primitives[primType] = c
a.Primitives[primType].Identifier = primType
a.Primitives[primType].HumanIdentifier = primType
}

if primName := rec.PrimitiveName; primName != "" {
label := primName
if rec.PrimitiveType != "" {
label = fmt.Sprintf("%s_%s", rec.PrimitiveType, primName)
}
c := incrementOrSetUnit(&counter, a.Names[label])
a.Names[label] = c
a.Names[label].Identifier = label
a.Names[label].HumanIdentifier = primName
}
}

// AsTimeUpdate builds the MongoDB $set document for recalculating averages and lists.
// It extends the base AsTimeUpdate with MCP-specific lists for methods, primitives, and names.
func (a *MCPRecordAggregate) AsTimeUpdate() model.DBM {
newUpdate := a.AnalyticsRecordAggregate.AsTimeUpdate()

newUpdate["$set"].(model.DBM)["lists.methods"] = a.AnalyticsRecordAggregate.getRecords("methods", a.Methods, newUpdate)
newUpdate["$set"].(model.DBM)["lists.primitives"] = a.AnalyticsRecordAggregate.getRecords("primitives", a.Primitives, newUpdate)
newUpdate["$set"].(model.DBM)["lists.names"] = a.AnalyticsRecordAggregate.getRecords("names", a.Names, newUpdate)

return newUpdate
}

// AggregateMCPData collects MCP records into a map of MCPRecordAggregate keyed by APIID.
func AggregateMCPData(data []interface{}, dbIdentifier string, aggregationTime int) map[string]MCPRecordAggregate {
aggregateMap := make(map[string]MCPRecordAggregate)

Check warning on line 142 in analytics/aggregate_mcp.go

View check run for this annotation

probelabs / Visor: architecture

architecture Issue

The `AggregateMCPData` function duplicates the high-level structure and initialization logic found in the existing `AggregateData` function in `analytics/aggregate.go`. While the core `incrementAggregate` function is correctly reused, the surrounding boilerplate for iterating data, managing the aggregate map, and initializing new aggregate records is repeated.
Raw output
Refactor the common aggregation workflow into a single, generic function. This function could accept a strategy or configuration object that defines the type-specific logic, such as how to filter records, how to initialize a new aggregate struct, and how to increment type-specific dimensions. This would reduce code duplication and make the aggregation logic easier to extend in the future.

for _, item := range data {
record, ok := item.(AnalyticsRecord)
if !ok || !record.IsMCPRecord() {
continue
}

mcpRec := record.ToMCPRecord()

aggregate, found := aggregateMap[record.APIID]
if !found {
aggregate = initMCPAggregateForRecord(record, dbIdentifier, aggregationTime)
}

var counter Counter
aggregate.AnalyticsRecordAggregate, counter = incrementAggregate(&aggregate.AnalyticsRecordAggregate, &mcpRec.AnalyticsRecord, false, nil)
aggregate.incrementMCPDimensions(counter, mcpRec)

aggregateMap[record.APIID] = aggregate
}

Check warning on line 163 in analytics/aggregate_mcp.go

View check run for this annotation

probelabs / Visor: security

security Issue

The MCP analytics aggregation logic uses fields like `JSONRPCMethod`, `PrimitiveType`, and `PrimitiveName` from incoming analytics records as keys for in-memory maps. These records originate from an upstream source (e.g., Tyk Gateway). If an attacker can craft requests that generate a high number of unique values for these fields (high cardinality), it can lead to unbounded growth of these maps. This can cause excessive memory consumption in the Tyk Pump, leading to performance degradation or a denial-of-service (DoS) crash.
Raw output
To mitigate this, consider implementing a limit on the cardinality of each dimension within a single aggregation window. A configurable threshold could be introduced for the maximum number of unique methods, primitives, and names to be tracked per API. Once the threshold is reached, subsequent new values could be ignored, logged, or grouped into a generic "other" category. This would prevent uncontrolled memory growth and protect the pump from resource exhaustion attacks.
return aggregateMap
}
Loading
Loading