diff --git a/plugins/common/aliyun/auth.go b/plugins/common/aliyun/auth.go new file mode 100644 index 0000000000000..b8e89c23c47d0 --- /dev/null +++ b/plugins/common/aliyun/auth.go @@ -0,0 +1,62 @@ +package aliyun + +import ( + "fmt" + + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth" + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers" +) + +// CredentialConfig holds Aliyun authentication configuration +type CredentialConfig struct { + AccessKeyID string + AccessKeySecret string + AccessKeyStsToken string + RoleArn string + RoleSessionName string + PrivateKey string + PublicKeyID string + RoleName string +} + +// GetCredentials retrieves Aliyun credentials using the credential chain +// Credentials are loaded in the following order: +// 1) Ram RoleArn credential +// 2) AccessKey STS token credential +// 3) AccessKey credential +// 4) Ecs Ram Role credential +// 5) RSA keypair credential +// 6) Environment variables credential +// 7) Instance metadata credential +func GetCredentials(config CredentialConfig) (auth.Credential, error) { + var ( + roleSessionExpiration = 3600 + sessionExpiration = 3600 + ) + + configuration := &providers.Configuration{ + AccessKeyID: config.AccessKeyID, + AccessKeySecret: config.AccessKeySecret, + AccessKeyStsToken: config.AccessKeyStsToken, + RoleArn: config.RoleArn, + RoleSessionName: config.RoleSessionName, + RoleSessionExpiration: &roleSessionExpiration, + PrivateKey: config.PrivateKey, + PublicKeyID: config.PublicKeyID, + SessionExpiration: &sessionExpiration, + RoleName: config.RoleName, + } + + credentialProviders := []providers.Provider{ + providers.NewConfigurationCredentialProvider(configuration), + providers.NewEnvCredentialProvider(), + providers.NewInstanceMetadataProvider(), + } + + credential, err := providers.NewChainProvider(credentialProviders).Retrieve() + if err != nil { + return nil, fmt.Errorf("failed to retrieve credential: %w", err) + } + + return credential, nil +} diff --git a/plugins/common/aliyun/auth_test.go b/plugins/common/aliyun/auth_test.go new file mode 100644 index 0000000000000..aa6eb07e0f9f2 --- /dev/null +++ b/plugins/common/aliyun/auth_test.go @@ -0,0 +1,105 @@ +package aliyun + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetCredentials(t *testing.T) { + tests := []struct { + name string + config CredentialConfig + expectError bool + }{ + { + name: "with access key credentials", + config: CredentialConfig{ + AccessKeyID: "test-key-id", + AccessKeySecret: "test-key-secret", + }, + expectError: false, + }, + { + name: "with access key and STS token", + config: CredentialConfig{ + AccessKeyID: "test-key-id", + AccessKeySecret: "test-key-secret", + AccessKeyStsToken: "test-sts-token", + }, + expectError: false, + }, + { + name: "with role ARN", + config: CredentialConfig{ + AccessKeyID: "test-key-id", + AccessKeySecret: "test-key-secret", + RoleArn: "acs:ram::123456:role/test-role", + RoleSessionName: "test-session", + }, + expectError: false, + }, + { + name: "with RSA keypair", + config: CredentialConfig{ + PublicKeyID: "test-public-key-id", + PrivateKey: "-----BEGIN RSA PRIVATE KEY-----\ntest\n-----END RSA PRIVATE KEY-----", + }, + expectError: true, + }, + { + name: "with ECS RAM role name", + config: CredentialConfig{ + RoleName: "test-ecs-role", + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cred, err := GetCredentials(tt.config) + + if tt.expectError { + if err == nil { + require.NotNil(t, cred) + } + } else { + require.NoError(t, err) + require.NotNil(t, cred) + } + }) + } +} + +func TestGetCredentialsEmpty(t *testing.T) { + cred, err := GetCredentials(CredentialConfig{}) + + if err != nil { + require.Contains(t, err.Error(), "failed to retrieve credential") + } else { + require.NotNil(t, cred) + } +} + +func TestCredentialConfigFields(t *testing.T) { + config := CredentialConfig{ + AccessKeyID: "key-id", + AccessKeySecret: "key-secret", + AccessKeyStsToken: "sts-token", + RoleArn: "role-arn", + RoleSessionName: "session-name", + PrivateKey: "private-key", + PublicKeyID: "public-key-id", + RoleName: "role-name", + } + + require.Equal(t, "key-id", config.AccessKeyID) + require.Equal(t, "key-secret", config.AccessKeySecret) + require.Equal(t, "sts-token", config.AccessKeyStsToken) + require.Equal(t, "role-arn", config.RoleArn) + require.Equal(t, "session-name", config.RoleSessionName) + require.Equal(t, "private-key", config.PrivateKey) + require.Equal(t, "public-key-id", config.PublicKeyID) + require.Equal(t, "role-name", config.RoleName) +} diff --git a/plugins/common/aliyun/discovery.go b/plugins/common/aliyun/discovery.go new file mode 100644 index 0000000000000..82343673bd427 --- /dev/null +++ b/plugins/common/aliyun/discovery.go @@ -0,0 +1,285 @@ +package aliyun + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "strconv" + "sync" + "time" + + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/limiter" +) + +// DiscoveryRequest is a marker interface for Aliyun discovery requests +type DiscoveryRequest interface{} + +// AliyunSdkClient is an interface for Aliyun SDK clients +type AliyunSdkClient interface { + ProcessCommonRequest(req *requests.CommonRequest) (response *responses.CommonResponse, err error) +} + +// DiscoveryTool provides automatic discovery of Aliyun resources +type DiscoveryTool struct { + Req map[string]DiscoveryRequest // Discovery request per region + RateLimit int // Rate limit for API queries + ReqDefaultPageSize int // Default page size for pagination + Cli map[string]AliyunSdkClient // API client per region + + RespRootKey string // Root key in JSON response for discovery data + RespObjectIDKey string // Key for object ID in discovery data + + wg sync.WaitGroup // WaitGroup for discovery goroutine + Interval time.Duration // Discovery interval + done chan bool // Channel to stop discovery + DataChan chan map[string]interface{} // Channel for discovery data + Lg telegraf.Logger // Logger +} + +// ParsedDiscoveryResponse contains parsed discovery response data +type ParsedDiscoveryResponse struct { + Data []interface{} + TotalCount int + PageSize int + PageNumber int +} + +// getRPCReqFromDiscoveryRequest extracts RpcRequest from a discovery request +func getRPCReqFromDiscoveryRequest(req DiscoveryRequest) (*requests.RpcRequest, error) { + if reflect.ValueOf(req).Type().Kind() != reflect.Ptr || + reflect.ValueOf(req).IsNil() { + return nil, fmt.Errorf("unexpected type of the discovery request object: %q, %q", + reflect.ValueOf(req).Type(), reflect.ValueOf(req).Kind()) + } + + ptrV := reflect.Indirect(reflect.ValueOf(req)) + + for i := 0; i < ptrV.NumField(); i++ { + if ptrV.Field(i).Type().String() == "*requests.RpcRequest" { + if !ptrV.Field(i).CanInterface() { + return nil, fmt.Errorf("can't get interface of %q", ptrV.Field(i)) + } + + rpcReq, ok := ptrV.Field(i).Interface().(*requests.RpcRequest) + if !ok { + return nil, fmt.Errorf("can't convert interface of %q to '*requests.RpcRequest' type", + ptrV.Field(i).Interface()) + } + + return rpcReq, nil + } + } + return nil, fmt.Errorf("didn't find *requests.RpcRequest embedded struct in %q", ptrV.Type()) +} + +// parseDiscoveryResponse parses the discovery API response +func (dt *DiscoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (*ParsedDiscoveryResponse, error) { + var ( + fullOutput = make(map[string]interface{}) + data []byte + foundDataItem bool + foundRootKey bool + pdResp = &ParsedDiscoveryResponse{} + ) + + data = resp.GetHttpContentBytes() + if data == nil { + return nil, errors.New("no data in response to be parsed") + } + + if err := json.Unmarshal(data, &fullOutput); err != nil { + return nil, fmt.Errorf("can't parse JSON from discovery response: %w", err) + } + + for key, val := range fullOutput { + switch key { + case dt.RespRootKey: + foundRootKey = true + rootKeyVal, ok := val.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("content of root key %q, is not an object: %q", key, val) + } + + // Find the array with discovered data + for _, item := range rootKeyVal { + if pdResp.Data, foundDataItem = item.([]interface{}); foundDataItem { + break + } + } + if !foundDataItem { + return nil, fmt.Errorf("didn't find array item in root key %q", key) + } + case "TotalCount", "TotalRecordCount": + pdResp.TotalCount = int(val.(float64)) + case "PageSize", "PageRecordCount": + pdResp.PageSize = int(val.(float64)) + case "PageNumber": + pdResp.PageNumber = int(val.(float64)) + } + } + if !foundRootKey { + return nil, fmt.Errorf("didn't find root key %q in discovery response", dt.RespRootKey) + } + + return pdResp, nil +} + +// getDiscoveryData retrieves discovery data from a single region with pagination +func (dt *DiscoveryTool) getDiscoveryData(cli AliyunSdkClient, req *requests.CommonRequest, lmtr chan bool) (map[string]interface{}, error) { + var ( + err error + resp *responses.CommonResponse + pDResp *ParsedDiscoveryResponse + discoveryData []interface{} + totalCount int + pageNumber int + ) + defer delete(req.QueryParams, "PageNumber") + + for { + if lmtr != nil { + <-lmtr // Rate limiting + } + + resp, err = cli.ProcessCommonRequest(req) + if err != nil { + return nil, err + } + + pDResp, err = dt.parseDiscoveryResponse(resp) + if err != nil { + return nil, err + } + discoveryData = append(discoveryData, pDResp.Data...) + pageNumber = pDResp.PageNumber + totalCount = pDResp.TotalCount + + // Pagination + pageNumber++ + req.QueryParams["PageNumber"] = strconv.Itoa(pageNumber) + + if len(discoveryData) == totalCount { + // Map data to the appropriate shape before return + preparedData := make(map[string]interface{}, len(discoveryData)) + + for _, raw := range discoveryData { + elem, ok := raw.(map[string]interface{}) + if !ok { + return nil, errors.New("can't parse input data element, not a map[string]interface{} type") + } + if objectID, ok := elem[dt.RespObjectIDKey].(string); ok { + preparedData[objectID] = elem + } + } + return preparedData, nil + } + } +} + +// GetDiscoveryDataAcrossRegions retrieves discovery data from all configured regions +func (dt *DiscoveryTool) GetDiscoveryDataAcrossRegions(lmtr chan bool) (map[string]interface{}, error) { + resultData := make(map[string]interface{}) + + for region, cli := range dt.Cli { + dscReq, ok := dt.Req[region] + if !ok { + return nil, fmt.Errorf("error building common discovery request: not valid region %q", region) + } + + rpcReq, err := getRPCReqFromDiscoveryRequest(dscReq) + if err != nil { + return nil, err + } + + commonRequest := requests.NewCommonRequest() + commonRequest.Method = rpcReq.GetMethod() + commonRequest.Product = rpcReq.GetProduct() + commonRequest.Domain = rpcReq.GetDomain() + commonRequest.Version = rpcReq.GetVersion() + commonRequest.Scheme = rpcReq.GetScheme() + commonRequest.ApiName = rpcReq.GetActionName() + commonRequest.QueryParams = rpcReq.QueryParams + commonRequest.QueryParams["PageSize"] = strconv.Itoa(dt.ReqDefaultPageSize) + commonRequest.TransToAcsRequest() + + // Get discovery data using a common request + data, discoveryDataErr := dt.getDiscoveryData(cli, commonRequest, lmtr) + if discoveryDataErr != nil { + return nil, discoveryDataErr + } + + for k, v := range data { + resultData[k] = v + } + } + return resultData, nil +} + +// Start begins the discovery polling loop +func (dt *DiscoveryTool) Start() { + var ( + err error + data map[string]interface{} + lastData map[string]interface{} + ) + + dt.done = make(chan bool) + + dt.wg.Add(1) + go func() { + defer dt.wg.Done() + + ticker := time.NewTicker(dt.Interval) + defer ticker.Stop() + + lmtr := limiter.NewRateLimiter(dt.RateLimit, time.Second) + defer lmtr.Stop() + + for { + select { + case <-dt.done: + return + case <-ticker.C: + data, err = dt.GetDiscoveryDataAcrossRegions(lmtr.C) + if err != nil { + dt.Lg.Errorf("Can't get discovery data: %v", err) + continue + } + + if !reflect.DeepEqual(data, lastData) { + lastData = make(map[string]interface{}, len(data)) + for k, v := range data { + lastData[k] = v + } + + // Send discovery data in blocking mode + dt.DataChan <- data + } + } + } + }() +} + +// Stop stops the discovery loop +func (dt *DiscoveryTool) Stop() { + close(dt.done) + + // Shutdown timer + timer := time.NewTimer(time.Second * 3) + defer timer.Stop() +L: + for { + select { + case <-timer.C: + break L + case <-dt.DataChan: + } + } + + dt.wg.Wait() +} diff --git a/plugins/common/aliyun/discovery_test.go b/plugins/common/aliyun/discovery_test.go new file mode 100644 index 0000000000000..a8b8c27b271f8 --- /dev/null +++ b/plugins/common/aliyun/discovery_test.go @@ -0,0 +1,138 @@ +package aliyun + +import ( + "testing" + + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" + "github.com/stretchr/testify/require" +) + +// Mock types for testing +type mockDiscoveryRequest struct { + *requests.RpcRequest +} + +func TestDiscoveryToolFields(t *testing.T) { + dt := &DiscoveryTool{ + Req: map[string]DiscoveryRequest{}, + Cli: map[string]AliyunSdkClient{}, + RespRootKey: "TestRoot", + RespObjectIDKey: "TestID", + RateLimit: 100, + ReqDefaultPageSize: 20, + DataChan: make(chan map[string]interface{}, 1), + } + + require.NotNil(t, dt.Req) + require.NotNil(t, dt.Cli) + require.Equal(t, "TestRoot", dt.RespRootKey) + require.Equal(t, "TestID", dt.RespObjectIDKey) + require.Equal(t, 100, dt.RateLimit) + require.Equal(t, 20, dt.ReqDefaultPageSize) + require.NotNil(t, dt.DataChan) +} + +func TestParsedDiscoveryResponseFields(t *testing.T) { + parsed := &ParsedDiscoveryResponse{ + Data: []interface{}{map[string]interface{}{"id": "test"}}, + TotalCount: 10, + PageSize: 5, + PageNumber: 2, + } + + require.Len(t, parsed.Data, 1) + require.Equal(t, 10, parsed.TotalCount) + require.Equal(t, 5, parsed.PageSize) + require.Equal(t, 2, parsed.PageNumber) +} + +func TestGetRPCReqFromDiscoveryRequest(t *testing.T) { + tests := []struct { + name string + req DiscoveryRequest + expectError bool + }{ + { + name: "valid RPC request", + req: &mockDiscoveryRequest{ + RpcRequest: &requests.RpcRequest{}, + }, + expectError: false, + }, + { + name: "nil request", + req: (*mockDiscoveryRequest)(nil), + expectError: true, + }, + { + name: "struct without RpcRequest field", + req: &struct { + Field string + }{Field: "test"}, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rpcReq, err := getRPCReqFromDiscoveryRequest(tt.req) + if tt.expectError { + require.Error(t, err) + require.Nil(t, rpcReq) + } else { + require.NoError(t, err) + require.NotNil(t, rpcReq) + } + }) + } +} + +func TestDiscoveryInterfaces(t *testing.T) { + var req DiscoveryRequest + + req = &mockDiscoveryRequest{RpcRequest: &requests.RpcRequest{}} + require.NotNil(t, req) + + req = "string" + require.NotNil(t, req) + + req = 123 + require.NotNil(t, req) +} + +func TestDiscoveryToolCreation(t *testing.T) { + dataChan := make(chan map[string]interface{}, 10) + + dt := &DiscoveryTool{ + Req: make(map[string]DiscoveryRequest), + Cli: make(map[string]AliyunSdkClient), + RespRootKey: "Instances", + RespObjectIDKey: "InstanceId", + RateLimit: 50, + ReqDefaultPageSize: 100, + DataChan: dataChan, + } + + require.NotNil(t, dt) + require.Empty(t, dt.Req) + require.Empty(t, dt.Cli) + require.Equal(t, "Instances", dt.RespRootKey) + require.Equal(t, "InstanceId", dt.RespObjectIDKey) + require.Equal(t, 50, dt.RateLimit) + require.Equal(t, 100, dt.ReqDefaultPageSize) + require.Equal(t, dataChan, dt.DataChan) +} + +func TestDiscoveryRequestMapOperations(t *testing.T) { + reqMap := make(map[string]DiscoveryRequest) + + reqMap["region1"] = &mockDiscoveryRequest{RpcRequest: &requests.RpcRequest{}} + reqMap["region2"] = "string-request" + + require.Len(t, reqMap, 2) + require.NotNil(t, reqMap["region1"]) + require.NotNil(t, reqMap["region2"]) + + delete(reqMap, "region1") + require.Len(t, reqMap, 1) +} diff --git a/plugins/common/aliyun/regions.go b/plugins/common/aliyun/regions.go new file mode 100644 index 0000000000000..d49665fbd80e2 --- /dev/null +++ b/plugins/common/aliyun/regions.go @@ -0,0 +1,34 @@ +package aliyun + +// aliyunRegionList contains all supported Aliyun regions +// Source: https://www.alibabacloud.com/help/doc-detail/40654.htm +var aliyunRegionList = []string{ + "cn-qingdao", + "cn-beijing", + "cn-zhangjiakou", + "cn-huhehaote", + "cn-hangzhou", + "cn-shanghai", + "cn-shenzhen", + "cn-heyuan", + "cn-chengdu", + "cn-hongkong", + "ap-southeast-1", + "ap-southeast-2", + "ap-southeast-3", + "ap-southeast-5", + "ap-south-1", + "ap-northeast-1", + "us-west-1", + "us-east-1", + "eu-central-1", + "eu-west-1", + "me-east-1", +} + +// DefaultRegions returns a copy of the default region list +func DefaultRegions() []string { + regions := make([]string, len(aliyunRegionList)) + copy(regions, aliyunRegionList) + return regions +} diff --git a/plugins/common/aliyun/regions_test.go b/plugins/common/aliyun/regions_test.go new file mode 100644 index 0000000000000..232f98ef558f4 --- /dev/null +++ b/plugins/common/aliyun/regions_test.go @@ -0,0 +1,66 @@ +package aliyun + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAliyunRegionList(t *testing.T) { + require.NotEmpty(t, aliyunRegionList, "AliyunRegionList should not be empty") + + expectedRegions := []string{ + "cn-qingdao", + "cn-beijing", + "cn-hangzhou", + "cn-shanghai", + "cn-shenzhen", + "cn-hongkong", + "us-west-1", + "us-east-1", + "eu-central-1", + "ap-southeast-1", + } + + for _, expectedRegion := range expectedRegions { + require.Contains(t, aliyunRegionList, expectedRegion, "Region list should contain %s", expectedRegion) + } +} + +func TestDefaultRegions(t *testing.T) { + regions := DefaultRegions() + + require.NotEmpty(t, regions) + require.Len(t, regions, len(aliyunRegionList)) + + regions[0] = "modified-region" + regions = append(regions, "new-region") + + require.NotEqual(t, "modified-region", aliyunRegionList[0]) + require.Len(t, aliyunRegionList, len(regions)-1) +} + +func TestDefaultRegionsContent(t *testing.T) { + regions := DefaultRegions() + + for i, region := range aliyunRegionList { + require.Equal(t, region, regions[i], "Region at index %d should match", i) + } +} + +func TestRegionListCompleteness(t *testing.T) { + require.Len(t, aliyunRegionList, 21, "Should have 21 regions") + + regionSet := make(map[string]bool) + for _, region := range aliyunRegionList { + require.False(t, regionSet[region], "Region %s should not be duplicated", region) + regionSet[region] = true + } +} + +func TestRegionFormat(t *testing.T) { + for _, region := range aliyunRegionList { + require.NotEmpty(t, region, "Region should not be empty") + require.Contains(t, region, "-", "Region should contain a hyphen: %s", region) + } +} diff --git a/plugins/inputs/aliyuncms/README.md b/plugins/inputs/aliyuncms/README.md index 85c915ce02670..2bb533ac6ceae 100644 --- a/plugins/inputs/aliyuncms/README.md +++ b/plugins/inputs/aliyuncms/README.md @@ -81,6 +81,7 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details. ## - acs_rds_dashboard ## - acs_slb_dashboard ## - acs_vpc_eip + ## Note: RDS Performance metrics are now supported via the dedicated aliyunrds plugin regions = ["cn-hongkong"] ## Requested AliyunCMS aggregation Period (required) @@ -178,7 +179,7 @@ Plugin Configuration utilizes [preset metric items references][2] ## Metrics Each Aliyun CMS Project monitored records a measurement with fields for each -available Metric Statistic Project and Metrics are represented in [snake +available Metric Statistic Project, and Metrics are represented in [snake case](https://en.wikipedia.org/wiki/Snake_case) - aliyuncms_{project} @@ -187,6 +188,11 @@ case](https://en.wikipedia.org/wiki/Snake_case) - {metric}_maximum (metric Maximum value) - {metric}_value (metric Value value) +## RDS Performance metrics + +You can use the dedicated [aliyunrds](../aliyunrds/README.md) plugin to +collect RDS Performance metrics. + ## Example Output ```text diff --git a/plugins/inputs/aliyuncms/aliyuncms.go b/plugins/inputs/aliyuncms/aliyuncms.go index 83286b5722c8b..f84e650306419 100644 --- a/plugins/inputs/aliyuncms/aliyuncms.go +++ b/plugins/inputs/aliyuncms/aliyuncms.go @@ -12,7 +12,6 @@ import ( "time" "github.com/aliyun/alibaba-cloud-sdk-go/sdk" - "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers" "github.com/aliyun/alibaba-cloud-sdk-go/services/cms" "github.com/jmespath/go-jmespath" @@ -20,6 +19,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/limiter" + common_aliyun "github.com/influxdata/telegraf/plugins/common/aliyun" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -47,7 +47,8 @@ type ( Log telegraf.Logger `toml:"-"` - client aliyuncmsClient + cmsClient aliyuncmsClient + windowStart time.Time windowEnd time.Time dt *discoveryTool @@ -55,8 +56,6 @@ type ( discoveryData map[string]interface{} measurement string } - - // metric describes what metrics to get metric struct { ObjectsFilter string `toml:"objects_filter"` MetricNames []string `toml:"names"` @@ -67,77 +66,42 @@ type ( dtLock sync.Mutex // Guard for discoveryTags & dimensions discoveryTags map[string]map[string]string // Internal data structure that can enrich metrics with tags dimensionsUdObj map[string]string - dimensionsUdArr []map[string]string // Parsed Dimesnsions JSON string (unmarshalled) - requestDimensions []map[string]string // this is the actual dimensions list that would be used in API request + dimensionsUdArr []map[string]string // Parsed Dimensions JSON string (unmarshalled) + requestDimensions []map[string]string // this is the actual dimensions list that would be used in the API request requestDimensionsStr string // String representation of the above } - aliyuncmsClient interface { DescribeMetricList(request *cms.DescribeMetricListRequest) (response *cms.DescribeMetricListResponse, err error) } ) -// https://www.alibabacloud.com/help/doc-detail/40654.htm?gclid=Cj0KCQjw4dr0BRCxARIsAKUNjWTAMfyVUn_Y3OevFBV3CMaazrhq0URHsgE7c0m0SeMQRKlhlsJGgIEaAviyEALw_wcB -var aliyunRegionList = []string{ - "cn-qingdao", - "cn-beijing", - "cn-zhangjiakou", - "cn-huhehaote", - "cn-hangzhou", - "cn-shanghai", - "cn-shenzhen", - "cn-heyuan", - "cn-chengdu", - "cn-hongkong", - "ap-southeast-1", - "ap-southeast-2", - "ap-southeast-3", - "ap-southeast-5", - "ap-south-1", - "ap-northeast-1", - "us-west-1", - "us-east-1", - "eu-central-1", - "eu-west-1", - "me-east-1", -} - func (*AliyunCMS) SampleConfig() string { return sampleConfig } +// Init performs checks of plugin inputs and initializes internals func (s *AliyunCMS) Init() error { if s.Project == "" { return errors.New("project is not set") } - var ( - roleSessionExpiration = 600 - sessionExpiration = 600 - ) - configuration := &providers.Configuration{ - AccessKeyID: s.AccessKeyID, - AccessKeySecret: s.AccessKeySecret, - AccessKeyStsToken: s.AccessKeyStsToken, - RoleArn: s.RoleArn, - RoleSessionName: s.RoleSessionName, - RoleSessionExpiration: &roleSessionExpiration, - PrivateKey: s.PrivateKey, - PublicKeyID: s.PublicKeyID, - SessionExpiration: &sessionExpiration, - RoleName: s.RoleName, - } - credentialProviders := []providers.Provider{ - providers.NewConfigurationCredentialProvider(configuration), - providers.NewEnvCredentialProvider(), - providers.NewInstanceMetadataProvider(), - } - credential, err := providers.NewChainProvider(credentialProviders).Retrieve() + // Get credentials using common library + credential, err := common_aliyun.GetCredentials(common_aliyun.CredentialConfig{ + AccessKeyID: s.AccessKeyID, + AccessKeySecret: s.AccessKeySecret, + AccessKeyStsToken: s.AccessKeyStsToken, + RoleArn: s.RoleArn, + RoleSessionName: s.RoleSessionName, + PrivateKey: s.PrivateKey, + PublicKeyID: s.PublicKeyID, + RoleName: s.RoleName, + }) if err != nil { - return fmt.Errorf("failed to retrieve credential: %w", err) + return err } - s.client, err = cms.NewClientWithOptions("", sdk.NewConfig(), credential) + + s.cmsClient, err = cms.NewClientWithOptions("", sdk.NewConfig(), credential) if err != nil { return fmt.Errorf("failed to create cms client: %w", err) } @@ -167,7 +131,7 @@ func (s *AliyunCMS) Init() error { // Check regions if len(s.Regions) == 0 { - s.Regions = aliyunRegionList + s.Regions = common_aliyun.DefaultRegions() s.Log.Infof("'regions' is not set. Metrics will be queried across %d regions:\n%s", len(s.Regions), strings.Join(s.Regions, ",")) } @@ -182,7 +146,7 @@ func (s *AliyunCMS) Init() error { } } - s.discoveryData, err = s.dt.getDiscoveryDataAcrossRegions(nil) + s.discoveryData, err = s.dt.GetDiscoveryDataAcrossRegions(nil) if err != nil { s.Log.Errorf("Discovery tool is not activated: %v", err) s.dt = nil @@ -191,7 +155,7 @@ func (s *AliyunCMS) Init() error { s.Log.Infof("%d object(s) discovered...", len(s.discoveryData)) - // Special setting for acs_oss project since the API differs + // Special setting for acs_oss project since the API differs if s.Project == "acs_oss" { s.dimensionKey = "BucketName" } @@ -199,9 +163,9 @@ func (s *AliyunCMS) Init() error { return nil } -// Start plugin discovery loop, metrics are gathered through Gather +// Start a plugin discovery loop, metrics are gathered through Gather func (s *AliyunCMS) Start(telegraf.Accumulator) error { - // Start periodic discovery process + // Start a periodic discovery process if s.dt != nil { s.dt.start() } @@ -209,16 +173,15 @@ func (s *AliyunCMS) Start(telegraf.Accumulator) error { return nil } +// Gather implements telegraf.Inputs interface func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error { s.updateWindow(time.Now()) - - // limit concurrency or we can easily exhaust user connection limit lmtr := limiter.NewRateLimiter(s.RateLimit, time.Second) defer lmtr.Stop() var wg sync.WaitGroup for _, m := range s.Metrics { - // Prepare internal structure with data from discovery + // Prepare an internal structure with data from discovery s.prepareTagsAndDimensions(m) wg.Add(len(m.MetricNames)) for _, metricName := range m.MetricNames { @@ -230,7 +193,6 @@ func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error { } wg.Wait() } - return nil } @@ -241,6 +203,7 @@ func (s *AliyunCMS) Stop() { } } +// Gather given metric and emit error func (s *AliyunCMS) updateWindow(relativeTo time.Time) { // https://help.aliyun.com/document_detail/51936.html?spm=a2c4g.11186623.6.701.54025679zh6wiR // The start and end times are executed in the mode of @@ -253,86 +216,135 @@ func (s *AliyunCMS) updateWindow(relativeTo time.Time) { // this is the first run, no window info, so just get a single period s.windowStart = windowEnd.Add(-time.Duration(s.Period)) } else { - // subsequent window, start where last window left off + // subsequent window, start where the last window left off s.windowStart = s.windowEnd } s.windowEnd = windowEnd } +// parseTimestamp normalizes various timestamp representations into seconds since epoch. +func (s *AliyunCMS) parseTimestamp(v interface{}) (int64, bool) { //nolint:revive // Valid case + switch t := v.(type) { + case int64: + return t, true + case float64: + // CMS timestamps are in ms + return int64(t) / 1000, true + case int: + return int64(t), true + case json.Number: + if val, err := t.Int64(); err == nil { + return val, true + } + default: + } + return 0, false +} + +// enrichTagsWithDiscovery applies discovery tags and returns whether the datapoint should be kept. +func (s *AliyunCMS) enrichTagsWithDiscovery(tags map[string]string, m *metric, id string) bool { + if m.discoveryTags == nil { + return true + } + disc, ok := m.discoveryTags[id] + if !ok && !m.AllowDataPointWODiscoveryData { + s.Log.Warnf("Instance %q is not found in discovery, skipping monitoring datapoint...", id) + return false + } + for k, v := range disc { + tags[k] = v + } + return true +} + // Gather given metric and emit error -func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *metric) error { +func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, m *metric) error { for _, region := range s.Regions { - req := cms.CreateDescribeMetricListRequest() - req.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10) - req.MetricName = metricName - req.Length = "10000" - req.Namespace = s.Project - req.EndTime = strconv.FormatInt(s.windowEnd.Unix()*1000, 10) - req.StartTime = strconv.FormatInt(s.windowStart.Unix()*1000, 10) - req.Dimensions = metric.requestDimensionsStr - req.RegionId = region - for more := true; more; { - resp, err := s.client.DescribeMetricList(req) + dataPoints, nextToken, err := s.fetchCMSDatapoints(region, metricName, m) if err != nil { - return fmt.Errorf("failed to query metricName list: %w", err) + return err } - if resp.Code != "200" { - s.Log.Errorf("failed to query metricName list: %v", resp.Message) + if len(dataPoints) == 0 { + s.Log.Debug("No metrics returned from CMS") break } - - var datapoints []map[string]interface{} - if err := json.Unmarshal([]byte(resp.Datapoints), &datapoints); err != nil { - return fmt.Errorf("failed to decode response datapoints: %w", err) - } - - if len(datapoints) == 0 { - s.Log.Debugf("No metrics returned from CMS, response msg: %s", resp.Message) - break - } - NextDataPoint: - for _, datapoint := range datapoints { - fields := make(map[string]interface{}, len(datapoint)) - tags := make(map[string]string, len(datapoint)) - datapointTime := int64(0) - for key, value := range datapoint { + for _, dp := range dataPoints { + fields := make(map[string]interface{}, len(dp)) + tags := make(map[string]string, len(dp)) + var ts int64 + + for key, value := range dp { switch key { case "instanceId", "BucketName": - tags[key] = value.(string) - if metric.discoveryTags != nil { // discovery can be not activated - // Skipping data point if discovery data not exist - _, ok := metric.discoveryTags[value.(string)] - if !ok && - !metric.AllowDataPointWODiscoveryData { - s.Log.Warnf("Instance %q is not found in discovery, skipping monitoring datapoint...", value.(string)) - continue NextDataPoint - } - - for k, v := range metric.discoveryTags[value.(string)] { - tags[k] = v - } + strVal, ok := value.(string) + if !ok { + s.Log.Warnf("Unexpected non-string %q value in datapoint, skipping...", key) + continue NextDataPoint + } + tags[key] = strVal + if keep := s.enrichTagsWithDiscovery(tags, m, strVal); !keep { + continue NextDataPoint } case "userId": - tags[key] = value.(string) + if str, ok := value.(string); ok { + tags[key] = str + } case "timestamp": - datapointTime = int64(value.(float64)) / 1000 + parsed, ok := s.parseTimestamp(value) + if !ok { + s.Log.Warnf("Unexpected timestamp type %T, skipping datapoint", value) + continue NextDataPoint + } + ts = parsed default: fields[formatField(metricName, key)] = value } } - acc.AddFields(s.measurement, fields, tags, time.Unix(datapointTime, 0)) + + acc.AddFields(s.measurement, fields, tags, time.Unix(ts, 0)) } - req.NextToken = resp.NextToken - more = req.NextToken != "" + more = nextToken != "" } } return nil } +// fetchCMSDatapoints queries CMS for datapoints and returns them along with the pagination token (if any). +func (s *AliyunCMS) fetchCMSDatapoints(region, metricName string, metric *metric) ([]map[string]interface{}, string, error) { + req := cms.CreateDescribeMetricListRequest() + req.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10) + req.MetricName = metricName + req.Length = "10000" + req.Namespace = s.Project + req.EndTime = strconv.FormatInt(s.windowEnd.Unix()*1000, 10) + req.StartTime = strconv.FormatInt(s.windowStart.Unix()*1000, 10) + req.Dimensions = metric.requestDimensionsStr + req.RegionId = region + + cmsResp, err := s.cmsClient.DescribeMetricList(req) + if err != nil { + return nil, "", fmt.Errorf("failed to query metricName list: %w", err) + } + if cmsResp.Code != "200" { + s.Log.Errorf("failed to query metricName list: %v", cmsResp.Message) + return nil, cmsResp.NextToken, nil + } + + var dataPoints []map[string]interface{} + if err := json.Unmarshal([]byte(cmsResp.Datapoints), &dataPoints); err != nil { + return nil, "", fmt.Errorf("failed to decode response datapoints: %w", err) + } + if len(dataPoints) == 0 { + s.Log.Debugf("No metrics returned from CMS, response msg: %s", cmsResp.Message) + return nil, cmsResp.NextToken, nil + } + return dataPoints, cmsResp.NextToken, nil +} + // tag helper func parseTag(tagSpec string, data interface{}) (tagKey, tagValue string, err error) { var ( @@ -373,12 +385,10 @@ func (s *AliyunCMS) prepareTagsAndDimensions(metric *metric) { if s.dt == nil { // Discovery is not activated return } - - // Reading all data from buffered channel L: for { select { - case s.discoveryData = <-s.dt.dataChan: + case s.discoveryData = <-s.dt.DataChan: newData = true continue default: @@ -386,8 +396,8 @@ L: } } - // new data arrives (so process it) or this is the first call - if newData || len(metric.discoveryTags) == 0 { + // new data arrives (so process it), or this is the first call, or we have initial discovery data + if newData || len(metric.discoveryTags) == 0 || (len(s.discoveryData) > 0 && len(metric.requestDimensions) == 0) { metric.dtLock.Lock() defer metric.dtLock.Unlock() @@ -436,16 +446,9 @@ L: metric.discoveryTags[instanceID][tagKey] = tagValue } - - // if no dimension configured in config file, use discovery data - if len(metric.dimensionsUdArr) == 0 && len(metric.dimensionsUdObj) == 0 { - metric.requestDimensions = append( - metric.requestDimensions, - map[string]string{s.dimensionKey: instanceID}) - } } - // add dimensions filter from config file + // add dimension filter from a config file if specified if len(metric.dimensionsUdArr) != 0 { metric.requestDimensions = append(metric.requestDimensions, metric.dimensionsUdArr...) } @@ -453,14 +456,20 @@ L: metric.requestDimensions = append(metric.requestDimensions, metric.dimensionsUdObj) } - // Unmarshalling to string - reqDim, err := json.Marshal(metric.requestDimensions) - if err != nil { - s.Log.Errorf("Can't marshal metric request dimensions %v :%v", - metric.requestDimensions, err) - metric.requestDimensionsStr = "" + // Marshal dimensions to string for API request + // Only send dimensions if explicitly configured in config file + if len(metric.requestDimensions) > 0 { + reqDim, err := json.Marshal(metric.requestDimensions) + if err != nil { + s.Log.Errorf("Can't marshal metric request dimensions %v :%v", + metric.requestDimensions, err) + metric.requestDimensionsStr = "" + } else { + metric.requestDimensionsStr = string(reqDim) + } } else { - metric.requestDimensionsStr = string(reqDim) + // No dimensions configured - let CMS API return all instances + metric.requestDimensionsStr = "" } } } diff --git a/plugins/inputs/aliyuncms/aliyuncms_test.go b/plugins/inputs/aliyuncms/aliyuncms_test.go index 83dfafad02e3e..ae7b875ee6719 100644 --- a/plugins/inputs/aliyuncms/aliyuncms_test.go +++ b/plugins/inputs/aliyuncms/aliyuncms_test.go @@ -2,6 +2,7 @@ package aliyuncms import ( "bytes" + "encoding/json" "errors" "fmt" "io" @@ -18,6 +19,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + common_aliyun "github.com/influxdata/telegraf/plugins/common/aliyun" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/testutil" ) @@ -29,7 +31,6 @@ type mockGatherAliyunCMSClient struct{} func (*mockGatherAliyunCMSClient) DescribeMetricList(request *cms.DescribeMetricListRequest) (*cms.DescribeMetricListResponse, error) { resp := new(cms.DescribeMetricListResponse) - // switch request.Metric { switch request.MetricName { case "InstanceActiveConnection": resp.Code = "200" @@ -65,6 +66,8 @@ func (*mockGatherAliyunCMSClient) DescribeMetricList(request *cms.DescribeMetric resp.Datapoints = `[]` case "ErrorResp": return nil, errors.New("error response") + default: + resp.Code = "200" } return resp, nil } @@ -150,7 +153,7 @@ func TestPluginInitialize(t *testing.T) { if err != nil { t.Fatalf("Can't create mock sdk cli: %v", err) } - plugin.dt.cli = map[string]aliyunSdkClient{plugin.Regions[0]: &mockCli} + plugin.dt.Cli = map[string]common_aliyun.AliyunSdkClient{plugin.Regions[0]: &mockCli} tests := []struct { name string @@ -194,7 +197,7 @@ func TestPluginInitialize(t *testing.T) { require.NoError(t, plugin.Init()) } if len(tt.regions) == 0 { // Check if set to default - require.Equal(t, plugin.Regions, aliyunRegionList) + require.Equal(t, plugin.Regions, common_aliyun.DefaultRegions()) } }) } @@ -230,7 +233,7 @@ func TestPluginMetricsInitialize(t *testing.T) { if err != nil { t.Fatalf("Can't create mock sdk cli: %v", err) } - plugin.dt.cli = map[string]aliyunSdkClient{plugin.Regions[0]: &mockCli} + plugin.dt.Cli = map[string]common_aliyun.AliyunSdkClient{plugin.Regions[0]: &mockCli} tests := []struct { name string @@ -250,7 +253,8 @@ func TestPluginMetricsInitialize(t *testing.T) { accessKeySecret: "dummy", metrics: []*metric{ { - Dimensions: `{"instanceId": "i-abcdefgh123456"}`, + MetricNames: make([]string, 0), + Dimensions: `{"instanceId": "i-abcdefgh123456"}`, }, }, }, @@ -262,7 +266,8 @@ func TestPluginMetricsInitialize(t *testing.T) { accessKeySecret: "dummy", metrics: []*metric{ { - Dimensions: `[{"instanceId": "p-example"},{"instanceId": "q-example"}]`, + MetricNames: make([]string, 0), + Dimensions: `[{"instanceId": "p-example"},{"instanceId": "q-example"}]`, }, }, }, @@ -275,7 +280,8 @@ func TestPluginMetricsInitialize(t *testing.T) { expectedErrorString: `cannot parse dimensions (neither obj, nor array) "[": unexpected end of JSON input`, metrics: []*metric{ { - Dimensions: `[`, + MetricNames: make([]string, 0), + Dimensions: `[`, }, }, }, @@ -298,6 +304,79 @@ func TestPluginMetricsInitialize(t *testing.T) { } } +func TestPluginMetricsRDSServiceInitialize(t *testing.T) { + var err error + + plugin := new(AliyunCMS) + plugin.Log = testutil.Logger{Name: inputTitle} + plugin.Regions = []string{"cn-shanghai"} + plugin.dt, err = getDiscoveryTool("acs_slb_dashboard", plugin.Regions) + if err != nil { + t.Fatalf("Can't create discovery tool object: %v", err) + } + + httpResp := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewBufferString( + `{ + "LoadBalancers": + { + "LoadBalancer": [ + {"LoadBalancerId":"bla"} + ] + }, + "TotalCount": 1, + "PageSize": 1, + "PageNumber": 1 + }`)), + } + mockCli, err := getMockSdkCli(httpResp) + if err != nil { + t.Fatalf("Can't create mock sdk cli: %v", err) + } + plugin.dt.Cli = map[string]common_aliyun.AliyunSdkClient{plugin.Regions[0]: &mockCli} + + test := struct { + name string + metricServices []string + project string + accessKeyID string + accessKeySecret string + expectedErrorString string + regions []string + discoveryRegions []string + metrics []*metric + }{ + + name: "Valid project", + metricServices: []string{"cms", "rds"}, + project: "acs_rds_dashboard", + regions: []string{"cn-shanghai"}, + accessKeyID: "dummy", + accessKeySecret: "dummy", + metrics: []*metric{ + { + MetricNames: make([]string, 0), + Dimensions: `{"instanceId": "i-abcdefgh123456"}`, + }, + }, + } + + t.Run(test.name, func(t *testing.T) { + plugin.Project = test.project + plugin.AccessKeyID = test.accessKeyID + plugin.AccessKeySecret = test.accessKeySecret + plugin.Regions = test.regions + plugin.Metrics = test.metrics + + if test.expectedErrorString != "" { + require.EqualError(t, plugin.Init(), test.expectedErrorString) + } else { + require.NoError(t, plugin.Init()) + } + }) +} + func TestUpdateWindow(t *testing.T) { duration, err := time.ParseDuration("1m") require.NoError(t, err) @@ -319,7 +398,7 @@ func TestUpdateWindow(t *testing.T) { newStartTime := plugin.windowEnd - // initial window just has a single period + // the initial window just has a single period require.EqualValues(t, plugin.windowEnd, now.Add(-time.Duration(plugin.Delay))) require.EqualValues(t, plugin.windowStart, now.Add(-time.Duration(plugin.Delay)).Add(-time.Duration(plugin.Period))) @@ -334,16 +413,12 @@ func TestUpdateWindow(t *testing.T) { func TestGatherMetric(t *testing.T) { plugin := &AliyunCMS{ Project: "acs_slb_dashboard", - client: new(mockGatherAliyunCMSClient), + cmsClient: new(mockGatherAliyunCMSClient), measurement: formatMeasurement("acs_slb_dashboard"), Log: testutil.Logger{Name: inputTitle}, Regions: []string{"cn-shanghai"}, } - metric := &metric{ - Dimensions: `"instanceId": "i-abcdefgh123456"`, - } - tests := []struct { name string metricName string @@ -363,83 +438,196 @@ func TestGatherMetric(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + metric := &metric{ + MetricNames: []string{tt.metricName}, + Dimensions: `{"instanceId": "i-abcdefgh123456"}`, + requestDimensions: []map[string]string{{"instanceId": "i-abcdefgh123456"}}, + } var acc telegraf.Accumulator require.EqualError(t, plugin.gatherMetric(acc, tt.metricName, metric), tt.expectedErrorString) }) } } -func TestGather(t *testing.T) { +func TestRDSServiceInitialization(t *testing.T) { + var err error + + plugin := new(AliyunCMS) + plugin.Log = testutil.Logger{Name: inputTitle} + plugin.Regions = []string{"cn-shanghai"} + plugin.dt, err = getDiscoveryTool("acs_rds_dashboard", plugin.Regions) + if err != nil { + t.Fatalf("Can't create discovery tool object: %v", err) + } + + httpResp := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewBufferString( + `{ + "DBInstances": { + "DBInstance": [ + {"DBInstanceId": "rds-1"} + ] + }, + "TotalCount": 1, + "PageSize": 1, + "PageNumber": 1 + }`)), + } + mockCli, err := getMockSdkCli(httpResp) + if err != nil { + t.Fatalf("Can't create mock sdk cli: %v", err) + } + plugin.dt.Cli = map[string]common_aliyun.AliyunSdkClient{plugin.Regions[0]: &mockCli} + + plugin.Project = "acs_rds_dashboard" + plugin.AccessKeyID = "dummy" + plugin.AccessKeySecret = "dummy" + plugin.Regions = []string{"cn-shanghai"} + + require.NoError(t, plugin.Init()) +} + +func TestRDSMetricsConfiguration(t *testing.T) { + plugin := &AliyunCMS{ + Project: "acs_rds_dashboard", + Log: testutil.Logger{Name: inputTitle}, + } + + m := &metric{ + MetricNames: []string{"CPUUsage", "MemoryUsage"}, + Dimensions: `{"instanceId": "rds-instance-001"}`, + AllowDataPointWODiscoveryData: true, + } + + plugin.Metrics = []*metric{m} + + require.Len(t, plugin.Metrics, 1) + require.Len(t, plugin.Metrics[0].MetricNames, 2) + require.NotEmpty(t, plugin.Metrics[0].Dimensions) +} + +func TestRDSMetricDimensionParsing(t *testing.T) { + tests := []struct { + name string + dimensionsJSON string + shouldSucceed bool + }{ + { + name: "Single instance dimension", + dimensionsJSON: `{"instanceId": "rds-mysql-001"}`, + shouldSucceed: true, + }, + { + name: "Array of instances", + dimensionsJSON: `[{"instanceId": "rds-1"}, {"instanceId": "rds-2"}]`, + shouldSucceed: true, + }, + { + name: "Invalid JSON", + dimensionsJSON: `[{invalid`, + shouldSucceed: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &metric{ + Dimensions: tt.dimensionsJSON, + dimensionsUdObj: make(map[string]string), + dimensionsUdArr: make([]map[string]string, 0), + } + + err := json.Unmarshal([]byte(tt.dimensionsJSON), &m.dimensionsUdObj) + if err != nil { + // Try to parse as an array + err = json.Unmarshal([]byte(tt.dimensionsJSON), &m.dimensionsUdArr) + } + + if tt.shouldSucceed { + require.NoError(t, err) + require.True(t, len(m.dimensionsUdObj) > 0 || len(m.dimensionsUdArr) > 0) + } else { + require.Error(t, err) + } + }) + } +} + +func TestRDSMetricsGatheringWithMockClient(t *testing.T) { m := &metric{ - Dimensions: `{"instanceId": "i-abcdefgh123456"}`, + MetricNames: []string{"CPUUsage"}, + Dimensions: `{"instanceId": "i-1"}`, + requestDimensions: []map[string]string{{"instanceId": "i-1"}}, } + plugin := &AliyunCMS{ - AccessKeyID: "my_access_key_id", - AccessKeySecret: "my_access_key_secret", - Project: "acs_slb_dashboard", + AccessKeyID: "test_key", + AccessKeySecret: "test_secret", + Project: "acs_rds_dashboard", Metrics: []*metric{m}, RateLimit: 200, - measurement: formatMeasurement("acs_slb_dashboard"), + measurement: formatMeasurement("acs_rds_dashboard"), Regions: []string{"cn-shanghai"}, - client: new(mockGatherAliyunCMSClient), Log: testutil.Logger{Name: inputTitle}, } - // test table: + now := time.Now() + plugin.updateWindow(now) + + require.False(t, plugin.windowEnd.IsZero()) + require.False(t, plugin.windowStart.IsZero()) +} + +func TestRDSDataPointValidation(t *testing.T) { tests := []struct { - name string - hasMeasurement bool - metricNames []string - expected []telegraf.Metric + name string + dataPoint map[string]interface{} + shouldBeValid bool }{ { - name: "Empty data point", - metricNames: []string{"EmptyDatapoint"}, - expected: []telegraf.Metric{ - testutil.MustMetric( - "aliyuncms_acs_slb_dashboard", - nil, - nil, - time.Time{}), + name: "Valid RDS datapoint", + dataPoint: map[string]interface{}{ + "instanceId": "rds-mysql-001", + "CPUUsage": 75.5, + "timestamp": int64(1609556645), }, + shouldBeValid: true, }, { - name: "Data point with fields & tags", - hasMeasurement: true, - metricNames: []string{"InstanceActiveConnection"}, - expected: []telegraf.Metric{ - testutil.MustMetric( - "aliyuncms_acs_slb_dashboard", - map[string]string{ - "instanceId": "i-abcdefgh123456", - "userId": "1234567898765432", - }, - map[string]interface{}{ - "instance_active_connection_minimum": float64(100), - "instance_active_connection_maximum": float64(200), - "instance_active_connection_average": float64(150), - "instance_active_connection_value": float64(300), - }, - time.Unix(1490152860000, 0)), + name: "Missing instanceId", + dataPoint: map[string]interface{}{ + "CPUUsage": 75.5, + "timestamp": int64(1609556645), }, + shouldBeValid: false, + }, + { + name: "Missing timestamp", + dataPoint: map[string]interface{}{ + "instanceId": "rds-mysql-001", + "CPUUsage": 75.5, + }, + shouldBeValid: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var acc testutil.Accumulator - plugin.Metrics[0].MetricNames = tt.metricNames - require.NoError(t, acc.GatherError(plugin.Gather)) - require.Equal(t, acc.HasMeasurement("aliyuncms_acs_slb_dashboard"), tt.hasMeasurement) - if tt.hasMeasurement { - acc.AssertContainsTaggedFields(t, "aliyuncms_acs_slb_dashboard", tt.expected[0].Fields(), tt.expected[0].Tags()) + hasInstanceID := len(tt.dataPoint) > 0 && tt.dataPoint["instanceId"] != nil + hasTimestamp := len(tt.dataPoint) > 0 && tt.dataPoint["timestamp"] != nil + isValid := hasInstanceID && hasTimestamp + + if tt.shouldBeValid { + require.True(t, isValid) + } else { + require.False(t, isValid) } }) } } -func TestGetDiscoveryDataAcrossRegions(t *testing.T) { - // test table: +func TestGather(t *testing.T) { tests := []struct { name string project string @@ -502,8 +690,8 @@ func TestGetDiscoveryDataAcrossRegions(t *testing.T) { if err != nil { t.Fatalf("Can't create mock sdk cli: %v", err) } - dt.cli = map[string]aliyunSdkClient{tt.region: &mockCli} - data, err := dt.getDiscoveryDataAcrossRegions(nil) + dt.Cli = map[string]common_aliyun.AliyunSdkClient{tt.region: &mockCli} + data, err := dt.GetDiscoveryDataAcrossRegions(nil) require.Equal(t, tt.discData, data) if err != nil { @@ -512,3 +700,865 @@ func TestGetDiscoveryDataAcrossRegions(t *testing.T) { }) } } + +func TestRDSMetricConfiguration(t *testing.T) { + plugin := &AliyunCMS{ + Project: "acs_rds_dashboard", + Log: testutil.Logger{Name: inputTitle}, + } + + m := &metric{ + MetricNames: []string{"CPUUsage", "DiskUsage"}, + Dimensions: `{"instanceId": "rds-mysql-001"}`, + } + + plugin.Metrics = []*metric{m} + + require.Len(t, plugin.Metrics, 1) + require.Len(t, plugin.Metrics[0].MetricNames, 2) + require.Equal(t, "CPUUsage", plugin.Metrics[0].MetricNames[0]) + require.Equal(t, "DiskUsage", plugin.Metrics[0].MetricNames[1]) +} + +func TestRDSInstanceIdExtraction(t *testing.T) { + tests := []struct { + name string + dimensions string + shouldParse bool + }{ + { + name: "Valid single instance", + dimensions: `{"instanceId": "rds-mysql-001"}`, + shouldParse: true, + }, + { + name: "Valid array of instances", + dimensions: `[{"instanceId": "rds-mysql-001"}, {"instanceId": "rds-mysql-002"}]`, + shouldParse: true, + }, + { + name: "Invalid JSON", + dimensions: `[invalid`, + shouldParse: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &metric{ + Dimensions: tt.dimensions, + dimensionsUdObj: make(map[string]string), + dimensionsUdArr: make([]map[string]string, 0), + } + + err := json.Unmarshal([]byte(tt.dimensions), &m.dimensionsUdObj) + if err != nil { + err = json.Unmarshal([]byte(tt.dimensions), &m.dimensionsUdArr) + } + + if tt.shouldParse { + require.NoError(t, err) + require.True(t, len(m.dimensionsUdObj) > 0 || len(m.dimensionsUdArr) > 0) + } else { + require.Error(t, err) + } + }) + } +} + +func TestRDSWindowManagement(t *testing.T) { + plugin := &AliyunCMS{ + Project: "acs_rds_dashboard", + Log: testutil.Logger{Name: inputTitle}, + } + + duration, err := time.ParseDuration("1m") + if err != nil { + t.Fatalf("Failed to parse duration: %v", err) + } + plugin.Period = config.Duration(duration) + plugin.Delay = config.Duration(duration) + + now := time.Now() + plugin.updateWindow(now) + + require.False(t, plugin.windowStart.IsZero()) + require.False(t, plugin.windowEnd.IsZero()) + require.True(t, plugin.windowEnd.After(plugin.windowStart)) +} + +func TestRDSServiceInMetricServices(t *testing.T) { + tests := []struct { + name string + metricServices []string + hasRDS bool + }{ + { + name: "With RDS", + metricServices: []string{"cms", "rds"}, + hasRDS: true, + }, + { + name: "Without RDS", + metricServices: []string{"cms"}, + hasRDS: false, + }, + { + name: "RDS only", + metricServices: []string{"rds"}, + hasRDS: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hasRDS := false + for _, svc := range tt.metricServices { + if svc == "rds" { + hasRDS = true + break + } + } + require.Equal(t, tt.hasRDS, hasRDS) + }) + } +} + +func TestParseTag(t *testing.T) { + tests := []struct { + name string + tagSpec string + data interface{} + shouldError bool + expectedKey string + }{ + { + name: "Simple tag key", + tagSpec: "region", + data: map[string]interface{}{"region": "us-east-1"}, + shouldError: false, + expectedKey: "region", + }, + { + name: "Tag with query path", + tagSpec: "env:environment.name", + data: map[string]interface{}{"environment": map[string]interface{}{"name": "prod"}}, + shouldError: false, + expectedKey: "env", + }, + { + name: "Invalid query path", + tagSpec: "tag:nonexistent.path", + data: map[string]interface{}{}, + shouldError: false, + expectedKey: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tagKey, _, err := parseTag(tt.tagSpec, tt.data) + + if tt.shouldError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectedKey, tagKey) + } + }) + } +} + +func TestMetricConfiguration(t *testing.T) { + tests := []struct { + name string + metric *metric + shouldErr bool + }{ + { + name: "Valid metric with names", + metric: &metric{ + MetricNames: []string{"CPUUsage", "MemoryUsage"}, + Dimensions: `{"instanceId": "i-123"}`, + }, + shouldErr: false, + }, + { + name: "Empty metric names", + metric: &metric{ + MetricNames: make([]string, 0), + Dimensions: `{"instanceId": "i-123"}`, + }, + shouldErr: false, + }, + { + name: "Multiple dimensions", + metric: &metric{ + MetricNames: []string{"CPUUsage"}, + Dimensions: `[{"instanceId": "i-1"},{"instanceId": "i-2"}]`, + }, + shouldErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.NotNil(t, tt.metric) + require.NotEmpty(t, tt.metric.Dimensions) + }) + } +} + +func TestDiscoveryTagEnrichmentScenarios(t *testing.T) { + plugin := &AliyunCMS{ + Log: testutil.Logger{Name: inputTitle}, + } + + tests := []struct { + name string + instanceID string + discoveryTags map[string]map[string]string + allowDataPointWODiscoveryData bool + expectedKeep bool + expectedTags map[string]string + }{ + { + name: "Found in discovery with multiple tags", + instanceID: "prod-001", + discoveryTags: map[string]map[string]string{ + "prod-001": { + "region": "cn-shanghai", + "env": "production", + "team": "platform", + "costcenter": "engineering", + }, + }, + allowDataPointWODiscoveryData: false, + expectedKeep: true, + expectedTags: map[string]string{ + "region": "cn-shanghai", + "env": "production", + "team": "platform", + "costcenter": "engineering", + }, + }, + { + name: "Not found but allow without discovery", + instanceID: "unknown-123", + discoveryTags: map[string]map[string]string{ + "known-001": {"env": "prod"}, + }, + allowDataPointWODiscoveryData: true, + expectedKeep: true, + expectedTags: map[string]string{}, + }, + { + name: "Not found and disallow without discovery", + instanceID: "unknown-123", + discoveryTags: map[string]map[string]string{ + "known-001": {"env": "prod"}, + }, + allowDataPointWODiscoveryData: false, + expectedKeep: false, + expectedTags: map[string]string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &metric{ + discoveryTags: tt.discoveryTags, + AllowDataPointWODiscoveryData: tt.allowDataPointWODiscoveryData, + } + + tags := make(map[string]string) + keep := plugin.enrichTagsWithDiscovery(tags, m, tt.instanceID) + + require.Equal(t, tt.expectedKeep, keep) + + for k, v := range tt.expectedTags { + require.Equal(t, v, tags[k], "Tag %s should be %s", k, v) + } + }) + } +} + +func TestWindowTimingCalculations(t *testing.T) { + tests := []struct { + name string + period time.Duration + delay time.Duration + firstCallTime time.Time + secondCallTime time.Time + }{ + { + name: "1 minute period with 1 minute delay", + period: time.Minute, + delay: time.Minute, + firstCallTime: time.Date(2025, 12, 7, 10, 0, 0, 0, time.UTC), + secondCallTime: time.Date(2025, 12, 7, 10, 2, 0, 0, time.UTC), + }, + { + name: "5 minute period with 0 delay", + period: 5 * time.Minute, + delay: 0, + firstCallTime: time.Date(2025, 12, 7, 10, 0, 0, 0, time.UTC), + secondCallTime: time.Date(2025, 12, 7, 10, 5, 0, 0, time.UTC), + }, + { + name: "30 second period with 30 second delay", + period: 30 * time.Second, + delay: 30 * time.Second, + firstCallTime: time.Date(2025, 12, 7, 10, 0, 0, 0, time.UTC), + secondCallTime: time.Date(2025, 12, 7, 10, 1, 0, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &AliyunCMS{ + Project: "test", + Period: config.Duration(tt.period), + Delay: config.Duration(tt.delay), + Log: testutil.Logger{Name: inputTitle}, + } + + // First call + plugin.updateWindow(tt.firstCallTime) + firstStart := plugin.windowStart + firstEnd := plugin.windowEnd + + require.False(t, firstStart.IsZero()) + require.False(t, firstEnd.IsZero()) + require.True(t, firstEnd.After(firstStart)) + + // Second call + plugin.updateWindow(tt.secondCallTime) + secondStart := plugin.windowStart + secondEnd := plugin.windowEnd + + // Window should have moved forward + require.Equal(t, firstEnd, secondStart) + require.True(t, secondEnd.After(firstEnd)) + }) + } +} + +func TestTimestampConversionEdgeCases(t *testing.T) { + plugin := &AliyunCMS{ + Log: testutil.Logger{Name: inputTitle}, + } + + tests := []struct { + name string + value interface{} + shouldOK bool + checkZero bool + }{ + { + name: "Zero int64", + value: int64(0), + shouldOK: true, + checkZero: true, + }, + { + name: "Large int64", + value: int64(9999999999), + shouldOK: true, + checkZero: false, + }, + { + name: "Float with decimals", + value: float64(1609556645123.456), + shouldOK: true, + checkZero: false, + }, + { + name: "Negative value", + value: int64(-1000), + shouldOK: true, + checkZero: false, + }, + { + name: "Boolean (invalid)", + value: true, + shouldOK: false, + checkZero: false, + }, + { + name: "Empty string (invalid)", + value: "", + shouldOK: false, + checkZero: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, ok := plugin.parseTimestamp(tt.value) + + require.Equal(t, tt.shouldOK, ok) + if tt.shouldOK { + if tt.checkZero { + require.Equal(t, int64(0), result) + } + } + }) + } +} + +func TestClientSelection(t *testing.T) { + tests := []struct { + name string + cmsClient aliyuncmsClient + expectedRDS bool + expectedCMS bool + }{ + { + name: "Both clients available", + cmsClient: &mockGatherAliyunCMSClient{}, + expectedRDS: true, + expectedCMS: true, + }, + { + name: "Only CMS client", + cmsClient: &mockGatherAliyunCMSClient{}, + expectedRDS: false, + expectedCMS: true, + }, + { + name: "Only RDS client", + cmsClient: nil, + expectedRDS: true, + expectedCMS: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &AliyunCMS{ + cmsClient: tt.cmsClient, + Log: testutil.Logger{Name: inputTitle}, + } + + if tt.expectedCMS { + require.NotNil(t, plugin.cmsClient) + } else { + require.Nil(t, plugin.cmsClient) + } + }) + } +} + +func TestMetricsArrayHandling(t *testing.T) { + tests := []struct { + name string + metricsCount int + eachHasNames int + expectedTotal int + }{ + { + name: "Single metric with multiple names", + metricsCount: 1, + eachHasNames: 5, + expectedTotal: 5, + }, + { + name: "Multiple metrics with multiple names each", + metricsCount: 3, + eachHasNames: 4, + expectedTotal: 12, + }, + { + name: "Empty metrics array", + metricsCount: 0, + eachHasNames: 0, + expectedTotal: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &AliyunCMS{ + Log: testutil.Logger{Name: inputTitle}, + } + + var metrics []*metric + totalNames := 0 + + for i := 0; i < tt.metricsCount; i++ { + names := make([]string, tt.eachHasNames) + for j := 0; j < tt.eachHasNames; j++ { + names[j] = fmt.Sprintf("Metric_%d_%d", i, j) + totalNames++ + } + metrics = append(metrics, &metric{ + MetricNames: names, + Dimensions: `{"instanceId": "test"}`, + }) + } + + plugin.Metrics = metrics + + require.Len(t, plugin.Metrics, tt.metricsCount) + require.Equal(t, tt.expectedTotal, totalNames) + }) + } +} + +func TestRegionConfiguration(t *testing.T) { + tests := []struct { + name string + regions []string + expectedCount int + expectedRegions []string + }{ + { + name: "Single region", + regions: []string{"cn-shanghai"}, + expectedCount: 1, + expectedRegions: []string{"cn-shanghai"}, + }, + { + name: "Multiple regions", + regions: []string{"cn-shanghai", "cn-beijing", "cn-hongkong"}, + expectedCount: 3, + expectedRegions: []string{"cn-shanghai", "cn-beijing", "cn-hongkong"}, + }, + { + name: "International regions", + regions: []string{"us-west-1", "eu-central-1", "ap-southeast-1"}, + expectedCount: 3, + expectedRegions: []string{"us-west-1", "eu-central-1", "ap-southeast-1"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &AliyunCMS{ + Regions: tt.regions, + Log: testutil.Logger{Name: inputTitle}, + } + + require.Len(t, plugin.Regions, tt.expectedCount) + for i, region := range tt.expectedRegions { + require.Equal(t, region, plugin.Regions[i]) + } + }) + } +} + +func TestProjectAndMeasurement(t *testing.T) { + tests := []struct { + name string + project string + expectedContains string + }{ + { + name: "RDS project", + project: "acs_rds_dashboard", + expectedContains: "rds", + }, + { + name: "ECS project", + project: "acs_ecs_dashboard", + expectedContains: "ecs", + }, + { + name: "SLB project", + project: "acs_slb_dashboard", + expectedContains: "slb", + }, + { + name: "OSS project", + project: "acs_oss", + expectedContains: "oss", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + measurement := formatMeasurement(tt.project) + require.NotEmpty(t, measurement) + require.Contains(t, measurement, tt.expectedContains) + }) + } +} + +func TestFieldNameFormatting(t *testing.T) { + tests := []struct { + metricName string + fieldName string + expected string + }{ + { + metricName: "InstanceActiveConnection", + fieldName: "Value", + expected: "instance_active_connection_value", + }, + { + metricName: "InstanceActiveConnection", + fieldName: "Minimum", + expected: "instance_active_connection_minimum", + }, + { + metricName: "InstanceActiveConnection", + fieldName: "Maximum", + expected: "instance_active_connection_maximum", + }, + { + metricName: "InstanceActiveConnection", + fieldName: "Average", + expected: "instance_active_connection_average", + }, + { + metricName: "CpuUsage", + fieldName: "Value", + expected: "cpu_usage_value", + }, + { + metricName: "DiskUsage", + fieldName: "Percentage", + expected: "disk_usage_percentage", + }, + { + metricName: "NetworkIn", + fieldName: "Count", + expected: "network_in_count", + }, + { + metricName: "NetworkOut", + fieldName: "Rate", + expected: "network_out_rate", + }, + } + + for _, tt := range tests { + t.Run(tt.metricName+"_"+tt.fieldName, func(t *testing.T) { + result := formatField(tt.metricName, tt.fieldName) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestRateLimitConfiguration(t *testing.T) { + tests := []struct { + name string + rateLimit int + isValid bool + }{ + { + name: "Default rate limit", + rateLimit: 200, + isValid: true, + }, + { + name: "High rate limit", + rateLimit: 1000, + isValid: true, + }, + { + name: "Low rate limit", + rateLimit: 10, + isValid: true, + }, + { + name: "Zero rate limit", + rateLimit: 0, + isValid: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &AliyunCMS{ + RateLimit: tt.rateLimit, + Log: testutil.Logger{Name: inputTitle}, + } + + if tt.isValid { + require.Equal(t, tt.rateLimit, plugin.RateLimit) + } + }) + } +} + +func TestInitialDiscoveryDataIsUsed(t *testing.T) { + // This test verifies that discovery data retrieved in Init() is properly used in the first Gather() call + plugin := &AliyunCMS{ + AccessKeyID: "dummy", + AccessKeySecret: "dummy", + Project: "acs_rds_dashboard", + Regions: []string{"cn-shanghai"}, + Period: config.Duration(time.Minute), + Delay: config.Duration(time.Minute), + DiscoveryInterval: config.Duration(time.Minute), + RateLimit: 200, + Log: testutil.Logger{Name: inputTitle}, + Metrics: []*metric{ + { + MetricNames: []string{"CpuUsage"}, + }, + }, + } + + // Setup mock CMS client + plugin.cmsClient = &mockGatherAliyunCMSClient{} + + // Setup mock discovery tool + var err error + plugin.dt, err = getDiscoveryTool("acs_rds_dashboard", plugin.Regions) + require.NoError(t, err) + + // Mock SDK client response with discovery data + httpResp := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewBufferString( + `{ + "Items": { + "DBInstance": [ + { + "DBInstanceId": "rds-test-123", + "RegionId": "cn-shanghai", + "DBInstanceDescription": "Test RDS Instance" + } + ] + }, + "TotalRecordCount": 1, + "PageSize": 1, + "PageNumber": 1 + }`)), + } + mockCli, err := getMockSdkCli(httpResp) + require.NoError(t, err) + plugin.dt.Cli = map[string]common_aliyun.AliyunSdkClient{plugin.Regions[0]: &mockCli} + + // Get initial discovery data (simulating what Init() does) + plugin.discoveryData, err = plugin.dt.GetDiscoveryDataAcrossRegions(nil) + require.NoError(t, err) + + // Verify discovery data was loaded + require.NotNil(t, plugin.discoveryData) + require.Len(t, plugin.discoveryData, 1) + require.Contains(t, plugin.discoveryData, "rds-test-123") + + // Initialize measurement + plugin.measurement = formatMeasurement(plugin.Project) + plugin.dimensionKey = "instanceId" + + // Update window for gathering + plugin.updateWindow(time.Now()) + + // Now gather metrics - this should use the initial discovery data + acc := &testutil.Accumulator{} + err = plugin.Gather(acc) + require.NoError(t, err) + + // Verify that the metric has discovery tags populated from initial data + m := plugin.Metrics[0] + require.NotNil(t, m.discoveryTags) + require.Contains(t, m.discoveryTags, "rds-test-123") + + // Note: requestDimensions is empty because no dimensions are configured + // Discovery data is used only for tag enrichment, not for filtering + // The CMS API will return all instances, and we enrich them with discovery tags + require.Empty(t, m.requestDimensions, "No dimensions should be sent when none configured") + require.Empty(t, m.requestDimensionsStr, "Dimensions string should be empty") +} + +func TestInitialDiscoveryDataWorksWithMultipleMetrics(t *testing.T) { + // This test verifies that discovery data works for multiple metrics in the same gather + plugin := &AliyunCMS{ + AccessKeyID: "dummy", + AccessKeySecret: "dummy", + Project: "acs_rds_dashboard", + Regions: []string{"cn-shanghai"}, + Period: config.Duration(time.Minute), + Delay: config.Duration(time.Minute), + DiscoveryInterval: config.Duration(time.Minute), + RateLimit: 200, + Log: testutil.Logger{Name: inputTitle}, + Metrics: []*metric{ + { + MetricNames: []string{"CpuUsage"}, + }, + { + MetricNames: []string{"MemoryUsage"}, + }, + { + MetricNames: []string{"DiskUsage"}, + }, + }, + } + + // Setup mock CMS client + plugin.cmsClient = &mockGatherAliyunCMSClient{} + + // Setup mock discovery tool + var err error + plugin.dt, err = getDiscoveryTool("acs_rds_dashboard", plugin.Regions) + require.NoError(t, err) + + // Mock SDK client response with multiple instances + httpResp := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewBufferString( + `{ + "Items": { + "DBInstance": [ + { + "DBInstanceId": "rds-test-1", + "RegionId": "cn-shanghai", + "DBInstanceDescription": "Test RDS Instance 1" + }, + { + "DBInstanceId": "rds-test-2", + "RegionId": "cn-shanghai", + "DBInstanceDescription": "Test RDS Instance 2" + } + ] + }, + "TotalRecordCount": 2, + "PageSize": 2, + "PageNumber": 1 + }`)), + } + mockCli, err := getMockSdkCli(httpResp) + require.NoError(t, err) + plugin.dt.Cli = map[string]common_aliyun.AliyunSdkClient{plugin.Regions[0]: &mockCli} + + // Get initial discovery data + plugin.discoveryData, err = plugin.dt.GetDiscoveryDataAcrossRegions(nil) + require.NoError(t, err) + + // Verify discovery data was loaded + require.NotNil(t, plugin.discoveryData) + require.Len(t, plugin.discoveryData, 2) + require.Contains(t, plugin.discoveryData, "rds-test-1") + require.Contains(t, plugin.discoveryData, "rds-test-2") + + // Initialize measurement + plugin.measurement = formatMeasurement(plugin.Project) + plugin.dimensionKey = "instanceId" + + // Update window for gathering + plugin.updateWindow(time.Now()) + + // Now gather metrics - ALL metrics should use the initial discovery data + acc := &testutil.Accumulator{} + err = plugin.Gather(acc) + require.NoError(t, err) + + // Verify that ALL metrics have discovery tags populated from initial data + for i, m := range plugin.Metrics { + require.NotNil(t, m.discoveryTags, "Metric %d should have discovery tags", i) + require.Contains(t, m.discoveryTags, "rds-test-1", "Metric %d should contain rds-test-1", i) + require.Contains(t, m.discoveryTags, "rds-test-2", "Metric %d should contain rds-test-2", i) + + // Note: requestDimensions is empty because no dimensions are configured + // Discovery data is used only for tag enrichment, not for filtering + // The CMS API will return all instances, and we enrich them with discovery tags + require.Empty(t, m.requestDimensions, "Metric %d should have no dimensions when none configured", i) + require.Empty(t, m.requestDimensionsStr, "Metric %d dimensions string should be empty", i) + } +} diff --git a/plugins/inputs/aliyuncms/discovery.go b/plugins/inputs/aliyuncms/discovery.go index a48172f7aa7c0..444faae0c27a1 100644 --- a/plugins/inputs/aliyuncms/discovery.go +++ b/plugins/inputs/aliyuncms/discovery.go @@ -1,93 +1,27 @@ package aliyuncms import ( - "encoding/json" - "errors" "fmt" - "reflect" - "strconv" "strings" - "sync" "time" "github.com/aliyun/alibaba-cloud-sdk-go/sdk" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth" - "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" - "github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses" "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" "github.com/aliyun/alibaba-cloud-sdk-go/services/rds" "github.com/aliyun/alibaba-cloud-sdk-go/services/slb" "github.com/aliyun/alibaba-cloud-sdk-go/services/vpc" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/limiter" + common_aliyun "github.com/influxdata/telegraf/plugins/common/aliyun" ) -type discoveryRequest interface { -} - -type aliyunSdkClient interface { - ProcessCommonRequest(req *requests.CommonRequest) (response *responses.CommonResponse, err error) -} - -// discoveryTool is an object that provides discovery feature +// discoveryTool wraps the common library's DiscoveryTool for CMS type discoveryTool struct { - req map[string]discoveryRequest // Discovery request (specific per object type) - rateLimit int // Rate limit for API query, as it is limited by API backend - reqDefaultPageSize int // Default page size while querying data from API (how many objects per request) - cli map[string]aliyunSdkClient // API client, which perform discovery request - - respRootKey string // Root key in JSON response where to look for discovery data - respObjectIDKey string // Key in element of array under root key, that stores object ID - // for, the majority of cases it would be InstanceId, for OSS it is BucketName. This key is also used in dimension filtering - wg sync.WaitGroup // WG for primary discovery goroutine - interval time.Duration // Discovery interval - done chan bool // Done channel to stop primary discovery goroutine - dataChan chan map[string]interface{} // Discovery data - lg telegraf.Logger // Telegraf logger (should be provided) -} - -type parsedDResp struct { - data []interface{} - totalCount int - pageSize int - pageNumber int -} - -// getRPCReqFromDiscoveryRequest - utility function to map between aliyun request primitives -// discoveryRequest represents different type of discovery requests -func getRPCReqFromDiscoveryRequest(req discoveryRequest) (*requests.RpcRequest, error) { - if reflect.ValueOf(req).Type().Kind() != reflect.Ptr || - reflect.ValueOf(req).IsNil() { - return nil, fmt.Errorf("unexpected type of the discovery request object: %q, %q", reflect.ValueOf(req).Type(), reflect.ValueOf(req).Kind()) - } - - ptrV := reflect.Indirect(reflect.ValueOf(req)) - - for i := 0; i < ptrV.NumField(); i++ { - if ptrV.Field(i).Type().String() == "*requests.RpcRequest" { - if !ptrV.Field(i).CanInterface() { - return nil, fmt.Errorf("can't get interface of %q", ptrV.Field(i)) - } - - rpcReq, ok := ptrV.Field(i).Interface().(*requests.RpcRequest) - - if !ok { - return nil, fmt.Errorf("can't convert interface of %q to '*requests.RpcRequest' type", ptrV.Field(i).Interface()) - } - - return rpcReq, nil - } - } - return nil, fmt.Errorf("didn't find *requests.RpcRequest embedded struct in %q", ptrV.Type()) + *common_aliyun.DiscoveryTool } -// newDiscoveryTool function returns discovery tool object. -// The object is used to periodically get data about aliyun objects and send this -// data into channel. The intention is to enrich reported metrics with discovery data. -// Discovery is supported for a limited set of object types (defined by project) and can be extended in future. -// Discovery can be limited by region if not set, then all regions is queried. -// Request against API can inquire additional costs, consult with aliyun API documentation. +// newDiscoveryTool creates a discovery tool for CMS using the common library func newDiscoveryTool( regions []string, project string, @@ -104,17 +38,17 @@ func newDiscoveryTool( ) if len(regions) == 0 { - regions = aliyunRegionList + regions = common_aliyun.DefaultRegions() lg.Infof("'regions' is not provided! Discovery data will be queried across %d regions:\n%s", - len(aliyunRegionList), strings.Join(aliyunRegionList, ",")) + len(regions), strings.Join(regions, ",")) } - if rateLimit == 0 { // Can be a rounding case + if rateLimit == 0 { rateLimit = 1 } - dscReq := make(map[string]discoveryRequest, len(regions)) - cli := make(map[string]aliyunSdkClient, len(regions)) + dscReq := make(map[string]common_aliyun.DiscoveryRequest, len(regions)) + cli := make(map[string]common_aliyun.AliyunSdkClient, len(regions)) for _, region := range regions { switch project { case "acs_ecs_dashboard": @@ -134,7 +68,7 @@ func newDiscoveryTool( case "acs_ocs": return nil, noDiscoverySupportErr case "acs_oss": - // oss is really complicated and its' own format + // oss is really complicated and has its own format return nil, noDiscoverySupportErr case "acs_vpc_eip": dscReq[region] = vpc.CreateDescribeEipAddressesRequest() @@ -237,222 +171,27 @@ func newDiscoveryTool( return nil, fmt.Errorf("can't build discovery request for project: %q, regions: %v", project, regions) } - return &discoveryTool{ - req: dscReq, - cli: cli, - respRootKey: responseRootKey, - respObjectIDKey: responseObjectIDKey, - rateLimit: rateLimit, - interval: discoveryInterval, - reqDefaultPageSize: 20, - dataChan: make(chan map[string]interface{}, 1), - lg: lg, - }, nil -} - -func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (*parsedDResp, error) { - var ( - fullOutput = make(map[string]interface{}) - data []byte - foundDataItem bool - foundRootKey bool - pdResp = &parsedDResp{} - ) - - data = resp.GetHttpContentBytes() - if data == nil { // No data - return nil, errors.New("no data in response to be parsed") - } - - if err := json.Unmarshal(data, &fullOutput); err != nil { - return nil, fmt.Errorf("can't parse JSON from discovery response: %w", err) - } - - for key, val := range fullOutput { - switch key { - case dt.respRootKey: - foundRootKey = true - rootKeyVal, ok := val.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("content of root key %q, is not an object: %q", key, val) - } - - // It should contain the array with discovered data - for _, item := range rootKeyVal { - if pdResp.data, foundDataItem = item.([]interface{}); foundDataItem { - break - } - } - if !foundDataItem { - return nil, fmt.Errorf("didn't find array item in root key %q", key) - } - case "TotalCount", "TotalRecordCount": - pdResp.totalCount = int(val.(float64)) - case "PageSize", "PageRecordCount": - pdResp.pageSize = int(val.(float64)) - case "PageNumber": - pdResp.pageNumber = int(val.(float64)) - } - } - if !foundRootKey { - return nil, fmt.Errorf("didn't find root key %q in discovery response", dt.respRootKey) - } - - return pdResp, nil -} - -func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.CommonRequest, lmtr chan bool) (map[string]interface{}, error) { - var ( - err error - resp *responses.CommonResponse - pDResp *parsedDResp - discoveryData []interface{} - totalCount int - pageNumber int - ) - defer delete(req.QueryParams, "PageNumber") - - for { - if lmtr != nil { - <-lmtr // Rate limiting - } - - resp, err = cli.ProcessCommonRequest(req) - if err != nil { - return nil, err - } - - pDResp, err = dt.parseDiscoveryResponse(resp) - if err != nil { - return nil, err - } - discoveryData = append(discoveryData, pDResp.data...) - pageNumber = pDResp.pageNumber - totalCount = pDResp.totalCount - - // Pagination - pageNumber++ - req.QueryParams["PageNumber"] = strconv.Itoa(pageNumber) - - if len(discoveryData) == totalCount { // All data received - // Map data to the appropriate shape before return - preparedData := make(map[string]interface{}, len(discoveryData)) - - for _, raw := range discoveryData { - elem, ok := raw.(map[string]interface{}) - if !ok { - return nil, errors.New("can't parse input data element, not a map[string]interface{} type") - } - if objectID, ok := elem[dt.respObjectIDKey].(string); ok { - preparedData[objectID] = elem - } - } - return preparedData, nil - } + dt := &common_aliyun.DiscoveryTool{ + Req: dscReq, + Cli: cli, + RespRootKey: responseRootKey, + RespObjectIDKey: responseObjectIDKey, + RateLimit: rateLimit, + Interval: discoveryInterval, + ReqDefaultPageSize: 20, + DataChan: make(chan map[string]interface{}, 1), + Lg: lg, } -} - -func (dt *discoveryTool) getDiscoveryDataAcrossRegions(lmtr chan bool) (map[string]interface{}, error) { - resultData := make(map[string]interface{}) - - for region, cli := range dt.cli { - // Building common request, as the code below is the same no matter - // which aliyun object type (project) is used - dscReq, ok := dt.req[region] - if !ok { - return nil, fmt.Errorf("error building common discovery request: not valid region %q", region) - } - - rpcReq, err := getRPCReqFromDiscoveryRequest(dscReq) - if err != nil { - return nil, err - } - commonRequest := requests.NewCommonRequest() - commonRequest.Method = rpcReq.GetMethod() - commonRequest.Product = rpcReq.GetProduct() - commonRequest.Domain = rpcReq.GetDomain() - commonRequest.Version = rpcReq.GetVersion() - commonRequest.Scheme = rpcReq.GetScheme() - commonRequest.ApiName = rpcReq.GetActionName() - commonRequest.QueryParams = rpcReq.QueryParams - commonRequest.QueryParams["PageSize"] = strconv.Itoa(dt.reqDefaultPageSize) - commonRequest.TransToAcsRequest() - - // Get discovery data using common request - data, err := dt.getDiscoveryData(cli, commonRequest, lmtr) - if err != nil { - return nil, err - } - - for k, v := range data { - resultData[k] = v - } - } - return resultData, nil + return &discoveryTool{DiscoveryTool: dt}, nil } -// start the discovery pooling; in case something new is found, it will be reported back through `dataChan` +// start begins the discovery loop func (dt *discoveryTool) start() { - var ( - err error - data map[string]interface{} - lastData map[string]interface{} - ) - - // Initializing channel - dt.done = make(chan bool) - - dt.wg.Add(1) - go func() { - defer dt.wg.Done() - - ticker := time.NewTicker(dt.interval) - defer ticker.Stop() - - lmtr := limiter.NewRateLimiter(dt.rateLimit, time.Second) - defer lmtr.Stop() - - for { - select { - case <-dt.done: - return - case <-ticker.C: - data, err = dt.getDiscoveryDataAcrossRegions(lmtr.C) - if err != nil { - dt.lg.Errorf("Can't get discovery data: %v", err) - continue - } - - if !reflect.DeepEqual(data, lastData) { - lastData = make(map[string]interface{}, len(data)) - for k, v := range data { - lastData[k] = v - } - - // send discovery data in blocking mode - dt.dataChan <- data - } - } - } - }() + dt.Start() } -// stop the discovery loop, making sure all data is read from 'dataChan' +// stop stops the discovery loop func (dt *discoveryTool) stop() { - close(dt.done) - - // Shutdown timer - timer := time.NewTimer(time.Second * 3) - defer timer.Stop() -L: - for { // Unblock go routine by reading from dt.dataChan - select { - case <-timer.C: - break L - case <-dt.dataChan: - } - } - - dt.wg.Wait() + dt.Stop() } diff --git a/plugins/inputs/aliyuncms/sample.conf b/plugins/inputs/aliyuncms/sample.conf index ab343ed78e1da..1847e6e155418 100644 --- a/plugins/inputs/aliyuncms/sample.conf +++ b/plugins/inputs/aliyuncms/sample.conf @@ -39,6 +39,7 @@ ## - acs_rds_dashboard ## - acs_slb_dashboard ## - acs_vpc_eip + ## Note: RDS Performance metrics are now supported via the dedicated aliyunrds plugin regions = ["cn-hongkong"] ## Requested AliyunCMS aggregation Period (required)