Skip to content

(feat): Add Sumo Logic scaler #6736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
794feb1
(feat): Add barebones for sumologic scaler
mittalvaibhav1 Mar 7, 2025
b33ff1e
(feat): refactoring...
mittalvaibhav1 Mar 8, 2025
f53592b
(feat): add client support for metric queries
mittalvaibhav1 Mar 8, 2025
fd34ff7
(feat): process and return log search response as floats
mittalvaibhav1 Mar 8, 2025
8abba75
(feat): fix tests
mittalvaibhav1 Mar 8, 2025
1e23068
Adding sumologic scaler file
Anoop-Bansal Mar 9, 2025
21b9ea2
(feat): refactoring sumologic scaler
Anoop-Bansal Mar 10, 2025
1d026de
(feat): bug fixes
mittalvaibhav1 Mar 16, 2025
78944d4
(feat): adopt a logging framework in sumologic package
mittalvaibhav1 Mar 16, 2025
43c0f27
(feat): fix tests..
mittalvaibhav1 Mar 16, 2025
0004f8b
(feat): fix metrics queries
mittalvaibhav1 Mar 16, 2025
ce545ca
(feat): bug fixes
Anoop-Bansal Mar 16, 2025
2a2de31
(feat): bug fixes
Anoop-Bansal Mar 16, 2025
3a28a88
(feat): bug fixes
Anoop-Bansal Mar 16, 2025
13777c4
(feat): bug fixes
Anoop-Bansal Mar 16, 2025
7f6e2b0
(feat): bug fixes
Anoop-Bansal Mar 16, 2025
0e3a2a9
(feat): fixing metric name
Anoop-Bansal Mar 16, 2025
2c52cfa
(feat): Adding resultfield and rollup for query result method
Anoop-Bansal Mar 16, 2025
f9c40ca
(feat): allow users to specify result field and rollup type
mittalvaibhav1 Mar 16, 2025
2a92abd
(feat): use cookies as recommended
mittalvaibhav1 Mar 16, 2025
84d2f40
(feat): Updating test cases for sumologic_scaler
Anoop-Bansal Mar 16, 2025
f75092e
(feat): Add support for multi metrics queries
mittalvaibhav1 Apr 21, 2025
25e03dd
(feat): fix tests
mittalvaibhav1 Apr 21, 2025
4b78137
(feat): bug fixes
mittalvaibhav1 Apr 21, 2025
2943ca0
(feat): fix typo + readability improvements
mittalvaibhav1 Apr 21, 2025
e26fb06
(feat): pre-commit changes + add tests
mittalvaibhav1 Apr 26, 2025
b37848a
(feat): Fix PR checks
mittalvaibhav1 Apr 26, 2025
b69b609
(feat): Update changelog
mittalvaibhav1 Apr 27, 2025
3913f63
(feat): PR checks
mittalvaibhav1 Apr 27, 2025
eefff32
(feat): use TypedConfig pattern
mittalvaibhav1 Apr 30, 2025
5f94c97
(feat): remove unnecessary fields
mittalvaibhav1 May 1, 2025
06b7443
(feat): Use builder
mittalvaibhav1 May 18, 2025
87159e9
(feat): Address comments
mittalvaibhav1 May 24, 2025
a97ae1d
(feat): Fix e2e
mittalvaibhav1 May 24, 2025
b4fdba7
(feat): Address comments
mittalvaibhav1 Jul 7, 2025
deab2e9
(feat): Trigger tests again
mittalvaibhav1 Jul 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- Enable support on s390x for KEDA ([#6543](https://github.com/kedacore/keda/issues/6543))
- **General**: Introduce new Sumo Logic Scaler ([#6734](https://github.com/kedacore/keda/issues/6734))

#### Experimental

Expand Down
124 changes: 124 additions & 0 deletions pkg/scalers/sumologic/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package sumologic

import (
"encoding/json"
"fmt"
"strconv"
"time"

"go.uber.org/zap"
)

const (
	// searchJobPath is the Search Job API path, relative to the configured host.
	searchJobPath = "api/v1/search/jobs"

	// Terminal states reported by the search job status endpoint.
	stateDone      = "DONE GATHERING RESULTS"
	stateCancelled = "CANCELLED"
	statePaused    = "FORCE PAUSED"
)

// createLogSearchJob starts an asynchronous log search over the given time
// window and returns the ID of the created search job.
//
// query is the log search expression; from, to, and tz are passed through to
// the API as the window boundaries and time zone.
func (c *Client) createLogSearchJob(query, from, to, tz string) (string, error) {
	requestPayload := LogSearchRequest{
		Query:    query,
		From:     from,
		To:       to,
		TimeZone: tz,
	}

	payload, err := json.Marshal(requestPayload)
	if err != nil {
		return "", fmt.Errorf("failed to marshal log search request: %w", err)
	}

	url := fmt.Sprintf("%s/%s", c.config.Host, searchJobPath)
	resp, err := c.makeRequest("POST", url, payload)
	if err != nil {
		return "", err
	}

	var jobResp LogSearchJobResponse
	if err := json.Unmarshal(resp, &jobResp); err != nil {
		// Wrap so the caller can tell a decode failure apart from an HTTP one.
		return "", fmt.Errorf("failed to unmarshal log search job response: %w", err)
	}

	return jobResp.ID, nil
}

// waitForLogSearchJobCompletion polls the search job status endpoint until
// the job reaches a terminal state and returns the final status on success.
//
// Polling is bounded: the original loop spun forever, so a job stuck in a
// non-terminal state would block the scaler indefinitely. The job is now
// abandoned with an error after maxPollAttempts iterations (~5 minutes).
func (c *Client) waitForLogSearchJobCompletion(jobID string) (*LogSearchJobStatus, error) {
	const (
		pollInterval    = 1 * time.Second
		maxPollAttempts = 300 // ~5 minutes at one poll per second
	)

	url := fmt.Sprintf("%s/%s/%s", c.config.Host, searchJobPath, jobID)

	for attempt := 0; attempt < maxPollAttempts; attempt++ {
		resp, err := c.makeRequest("GET", url, nil)
		if err != nil {
			return nil, err
		}

		var status LogSearchJobStatus
		if err := json.Unmarshal(resp, &status); err != nil {
			return nil, err
		}

		c.logger.Debug("log search job state", zap.String("state", status.State), zap.Int("recordCount", status.RecordCount))

		switch status.State {
		case stateDone:
			return &status, nil
		case stateCancelled, statePaused:
			return nil, fmt.Errorf("search job failed, state: %s", status.State)
		}

		time.Sleep(pollInterval)
	}

	return nil, fmt.Errorf("search job %s did not reach a terminal state after %d polls", jobID, maxPollAttempts)
}

// getLogSearchRecords pages through the records of a completed search job and
// returns, as floats, every value found under resultField. Records that do
// not contain resultField are skipped; values that contain it but fail to
// parse as a float are an error.
func (c *Client) getLogSearchRecords(jobID string, totalRecords int, resultField string) ([]float64, error) {
	const pageSize = 10000 // maximum records requested per API call

	var values []float64
	offset := 0
	for offset < totalRecords {
		limit := pageSize
		if remaining := totalRecords - offset; remaining < limit {
			limit = remaining
		}

		url := fmt.Sprintf("%s/%s/%s/records?offset=%d&limit=%d", c.config.Host, searchJobPath, jobID, offset, limit)
		resp, err := c.makeRequest("GET", url, nil)
		if err != nil {
			return nil, err
		}

		var page LogSearchRecordsResponse
		if err := json.Unmarshal(resp, &page); err != nil {
			return nil, err
		}

		// An empty page means the server has nothing more to return.
		if len(page.Records) == 0 {
			break
		}

		for _, record := range page.Records {
			raw, ok := record.Map[resultField]
			if !ok {
				continue
			}
			val, err := strconv.ParseFloat(raw, 64)
			if err != nil {
				return nil, fmt.Errorf("failed to parse resultField: %s value %w", resultField, err)
			}
			values = append(values, val)
		}

		offset += limit
	}

	c.logger.Debug("log search total records fetched", zap.Int("totalRecords", len(values)))

	return values, nil
}

// deleteLogSearchJob deletes a search job by ID and logs the deletion when it
// succeeds.
func (c *Client) deleteLogSearchJob(jobID string) error {
	url := fmt.Sprintf("%s/%s/%s", c.config.Host, searchJobPath, jobID)

	if _, err := c.makeRequest("DELETE", url, nil); err != nil {
		return err
	}

	c.logger.Debug("log search job deleted", zap.String("jobID", jobID))
	return nil
}
172 changes: 172 additions & 0 deletions pkg/scalers/sumologic/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package sumologic

import (
"encoding/json"
"errors"
"fmt"
"time"
)

const (
	// metricsQueryPath is the Metrics Query API path, relative to the configured host.
	metricsQueryPath = "api/v1/metricsQueries"
)

// validRollupTypes is the set of rollup values accepted for metrics queries;
// used by IsValidRollupType to validate user-supplied configuration.
var validRollupTypes = map[string]bool{
	"Avg":   true,
	"Sum":   true,
	"Min":   true,
	"Max":   true,
	"Count": true,
}

// validQueryAggregations is the set of aggregations accepted for combining
// query results (rollups plus "Latest"); used by IsValidQueryAggregation.
var validQueryAggregations = map[string]bool{
	"Latest": true,
	"Avg":    true,
	"Sum":    true,
	"Min":    true,
	"Max":    true,
	"Count":  true,
}

// createMetricsQuery submits a single metrics query (row ID "A") over the
// given ISO-8601 time window and returns the raw response body.
func (c *Client) createMetricsQuery(query string, quantization time.Duration, from, to, rollup string) ([]byte, error) {
	// boundary wraps an ISO-8601 timestamp in the boundary envelope the API expects.
	boundary := func(ts string) TimeRangeBoundary {
		return TimeRangeBoundary{
			Type:        "Iso8601TimeRangeBoundary",
			Iso8601Time: ts,
		}
	}

	requestPayload := MetricsQueryRequest{
		Queries: []MetricsQuery{{
			RowID:        "A",
			Query:        query,
			Quantization: int64(quantization / time.Millisecond),
			Rollup:       rollup,
		}},
		TimeRange: TimeRange{
			Type: "BeginBoundedTimeRange",
			From: boundary(from),
			To:   boundary(to),
		},
	}

	payload, err := json.Marshal(requestPayload)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal metrics query request: %w", err)
	}

	return c.makeRequest("POST", fmt.Sprintf("%s/%s", c.config.Host, metricsQueryPath), payload)
}

// createMultiMetricsQuery submits several metrics queries in one request,
// keyed by row ID, over the given ISO-8601 time window, and returns the raw
// response body. All rows share the same quantization and rollup.
func (c *Client) createMultiMetricsQuery(queries map[string]string, quantization time.Duration, from, to, rollup string) ([]byte, error) {
	// Pre-size: the number of rows is known up front.
	metricsQueries := make([]MetricsQuery, 0, len(queries))
	for rowID, query := range queries {
		metricsQueries = append(metricsQueries, MetricsQuery{
			RowID: rowID,
			Query: query,
			// Milliseconds() is the stdlib spelling of int64(d / time.Millisecond).
			Quantization: quantization.Milliseconds(),
			Rollup:       rollup,
		})
	}

	timeRange := TimeRange{
		Type: "BeginBoundedTimeRange",
		From: TimeRangeBoundary{
			Type:        "Iso8601TimeRangeBoundary",
			Iso8601Time: from,
		},
		To: TimeRangeBoundary{
			Type:        "Iso8601TimeRangeBoundary",
			Iso8601Time: to,
		},
	}

	requestPayload := MetricsQueryRequest{
		Queries:   metricsQueries,
		TimeRange: timeRange,
	}

	payload, err := json.Marshal(requestPayload)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal metrics query request: %w", err)
	}

	url := fmt.Sprintf("%s/%s", c.config.Host, metricsQueryPath)
	return c.makeRequest("POST", url, payload)
}

// parseMetricsQueryResponse decodes a raw metrics query response body and
// surfaces any query-level errors reported by the API as a Go error.
func (c *Client) parseMetricsQueryResponse(response []byte) (*MetricsQueryResponse, error) {
	parsed := new(MetricsQueryResponse)
	if err := json.Unmarshal(response, parsed); err != nil {
		return nil, fmt.Errorf("failed to unmarshal metrics query response: %w", err)
	}

	if apiErrs := parsed.Errors; apiErrs != nil && len(apiErrs.Errors) > 0 {
		return nil, fmt.Errorf("query execution failed with errors %v", apiErrs.Errors)
	}

	return parsed, nil
}

// metricsStats reduces a series of values to a single number according to
// dimension (Latest, Sum, Count, Avg, Min, or Max). An empty series or an
// unknown dimension is an error.
func (c *Client) metricsStats(values []float64, dimension string) (*float64, error) {
	if len(values) == 0 {
		return nil, errors.New("no values provided")
	}

	// total sums the series; shared by Sum and Avg. Iteration order matches
	// the original left-to-right accumulation, so float results are identical.
	total := func() float64 {
		s := 0.0
		for _, v := range values {
			s += v
		}
		return s
	}

	var result float64
	switch dimension {
	case "Latest":
		result = values[len(values)-1]
	case "Sum":
		result = total()
	case "Count":
		result = float64(len(values))
	case "Avg":
		result = total() / float64(len(values))
	case "Min":
		result = values[0]
		for _, v := range values {
			if v < result {
				result = v
			}
		}
	case "Max":
		result = values[0]
		for _, v := range values {
			if v > result {
				result = v
			}
		}
	default:
		return nil, fmt.Errorf("invalid aggregation '%s', supported values: Latest, Avg, Sum, Count, Min, Max", dimension)
	}

	return &result, nil
}

// IsValidRollupType returns nil if rollup is one of the rollup values the
// metrics API accepts (Avg, Sum, Min, Max, Count), and a descriptive error
// otherwise. Matching is case-sensitive.
//
// A switch over the handful of constant values replaces the package-level map
// lookup: it is self-contained and avoids a map access for a fixed small set.
func IsValidRollupType(rollup string) error {
	switch rollup {
	case "Avg", "Sum", "Min", "Max", "Count":
		return nil
	default:
		return fmt.Errorf("invalid rollup value: %s, must be one of Avg, Sum, Min, Max, Count", rollup)
	}
}

// IsValidQueryAggregation returns nil if aggregation is one of the supported
// result aggregations (Latest, Avg, Sum, Count, Min, Max), and a descriptive
// error otherwise. Matching is case-sensitive.
//
// A switch over the constant values replaces the package-level map lookup:
// it is self-contained and avoids a map access for a fixed small set.
func IsValidQueryAggregation(aggregation string) error {
	switch aggregation {
	case "Latest", "Avg", "Sum", "Min", "Max", "Count":
		return nil
	default:
		return fmt.Errorf("invalid aggregation '%s', supported values: Latest, Avg, Sum, Count, Min, Max", aggregation)
	}
}
80 changes: 80 additions & 0 deletions pkg/scalers/sumologic/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package sumologic

import "time"

type Query struct {
Type string
Query string
Queries map[string]string
ResultQueryRowID string
Quantization time.Duration
Rollup string
ResultField string
TimeRange time.Duration
Timezone string
Aggregator string
}

type QueryBuilder struct {
query Query
}

func NewQueryBuilder() *QueryBuilder {
return &QueryBuilder{
query: Query{},
}
}

func (qb *QueryBuilder) Type(qtype string) *QueryBuilder {
qb.query.Type = qtype
return qb
}

func (qb *QueryBuilder) Query(query string) *QueryBuilder {
qb.query.Query = query
return qb
}

func (qb *QueryBuilder) Queries(queries map[string]string) *QueryBuilder {
qb.query.Queries = queries
return qb
}

func (qb *QueryBuilder) ResultQueryRowID(resultQueryRowID string) *QueryBuilder {
qb.query.ResultQueryRowID = resultQueryRowID
return qb
}

func (qb *QueryBuilder) Quantization(qtz time.Duration) *QueryBuilder {
qb.query.Quantization = qtz
return qb
}

func (qb *QueryBuilder) Rollup(rollup string) *QueryBuilder {
qb.query.Rollup = rollup
return qb
}

func (qb *QueryBuilder) ResultField(resultField string) *QueryBuilder {
qb.query.ResultField = resultField
return qb
}

func (qb *QueryBuilder) TimeRange(timerange time.Duration) *QueryBuilder {
qb.query.TimeRange = timerange
return qb
}

func (qb *QueryBuilder) Timezone(timezone string) *QueryBuilder {
qb.query.Timezone = timezone
return qb
}

func (qb *QueryBuilder) Aggregator(aggregator string) *QueryBuilder {
qb.query.Aggregator = aggregator
return qb
}

func (qb *QueryBuilder) Build() Query {
return qb.query
}
Loading
Loading