Skip to content

Solace Scaler: Enhancement to support a hostlist of Solace brokers #6541

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
424edc1
Solace Scaler: Enhancement to support a hostlist of Solace brokers
dpochopsky Feb 12, 2025
22f8ae9
Merge branch 'main' into solace_scaler/hostlist-enhancement
dpochopsky Feb 12, 2025
97e35e5
Add feature enhancement for hostlist to CHANGELOG
dpochopsky Feb 12, 2025
6824360
Merge branch 'solace_scaler/hostlist-enhancement' of github.com:dpoch…
dpochopsky Feb 12, 2025
3cdcf33
Update scaler to use updated metadata field names
dpochopsky Feb 12, 2025
09cda29
Update test cases to deal with hostlists
dpochopsky Feb 12, 2025
5398636
Update test cases for hostlist feature
dpochopsky Feb 12, 2025
4d38019
Remove print statement as requested and update logs
dpochopsky Feb 12, 2025
e0dcd45
Fix test errors
dpochopsky Feb 12, 2025
901d2d5
remove unused variable
dpochopsky Feb 12, 2025
ee0fb16
remove unused package
dpochopsky Feb 12, 2025
94f90d3
remove unused strings package
dpochopsky Feb 12, 2025
76d9309
Merge branch 'main' into solace_scaler/hostlist-enhancement
dpochopsky Feb 18, 2025
aa64f4a
maintain state on last successful connection to host
dpochopsky Feb 18, 2025
92be34b
fix checkin errors with fmt
dpochopsky Feb 18, 2025
092f83a
fix checkin errors with fmt
dpochopsky Feb 18, 2025
cd14b80
fix checkin errors with fmt
dpochopsky Feb 18, 2025
de8d346
run go fmt against code
dpochopsky Feb 18, 2025
83a7d12
Merge branch 'main' into solace_scaler/hostlist-enhancement
dpochopsky Feb 19, 2025
c66cb8d
Link Issue #6566 to changelog
dpochopsky Feb 24, 2025
ab9be5f
Merge branch 'solace_scaler/hostlist-enhancement' of github.com:dpoch…
dpochopsky Feb 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ Here is an overview of all new **experimental** features:
- **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
- **MongoDB Scaler**: Add scheme field support srv record ([#5544](https://github.com/kedacore/keda/issues/5544))
- **Solace Scaler**: Add hostlist support for Solace brokers ([#6566](https://github.com/kedacore/keda/issues/6566))

### Fixes

Expand Down
163 changes: 132 additions & 31 deletions pkg/scalers/solace_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -24,6 +25,7 @@ const (
// REST ENDPOINT String Patterns
solaceSempQueryFieldURLSuffix = "?select=msgs,msgSpoolUsage,averageRxMsgRate"
solaceSempEndpointURLTemplate = "%s/%s/%s/monitor/msgVpns/%s/%ss/%s" + solaceSempQueryFieldURLSuffix
solaceSempVpnStateURLTemplate = "%s/%s/%s/monitor/msgVpns/%s" + "?select=state"

// SEMP REST API Context
solaceAPIName = "SEMP"
Expand Down Expand Up @@ -75,6 +77,8 @@ type SolaceScaler struct {
metadata *SolaceMetadata
httpClient *http.Client
logger logr.Logger
stateLock sync.Mutex
curHostIdx int
}

type SolaceMetadata struct {
Expand All @@ -84,7 +88,8 @@ type SolaceMetadata struct {
SolaceMetaSempBaseURL string `keda:"name=solaceSempBaseURL, order=triggerMetadata"`

// Full SEMP URL to target queue (CONSTRUCTED IN CODE)
EndpointURL string
endpointURLsList []string
vpnStateURLsList []string

// Solace Message VPN
MessageVpn string `keda:"name=messageVpn, order=triggerMetadata"`
Expand Down Expand Up @@ -119,20 +124,31 @@ func (s *SolaceMetadata) Validate() error {
return nil
}

// SEMP API Response Root Struct
type solaceSEMPResponse struct {
Collections solaceSEMPCollections `json:"collections"`
Data solaceSEMPData `json:"data"`
Meta solaceSEMPMetadata `json:"meta"`
// SEMP API Response Root Struct for VPNs
type solaceVpnSEMPResponse struct {
Data solaceVpnSEMPData `json:"data"`
Meta solaceSEMPMetadata `json:"meta"`
}

// SEMP API Response Queue Data Struct
type solaceVpnSEMPData struct {
State string `json:"state"`
}

// SEMP API Response Root Struct for Queues
type solaceQueueSEMPResponse struct {
Collections solaceQueueSEMPCollections `json:"collections"`
Data solaceQueueSEMPData `json:"data"`
Meta solaceSEMPMetadata `json:"meta"`
}

// SEMP API Response Collections Struct
type solaceSEMPCollections struct {
type solaceQueueSEMPCollections struct {
Msgs solaceSEMPMessages `json:"msgs"`
}

// SEMP API Response Queue Data Struct
type solaceSEMPData struct {
type solaceQueueSEMPData struct {
MsgSpoolUsage int `json:"msgSpoolUsage"`
MsgRcvRate int `json:"averageRxMsgRate"`
}
Expand Down Expand Up @@ -171,6 +187,7 @@ func NewSolaceScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metadata: solaceMetadata,
httpClient: httpClient,
logger: logger,
curHostIdx: 0,
}, nil
}

Expand All @@ -183,15 +200,26 @@ func parseSolaceMetadata(config *scalersconfig.ScalerConfig) (*SolaceMetadata, e
meta.triggerIndex = config.TriggerIndex

// Format Solace SEMP Queue Endpoint (REST URL)
meta.EndpointURL = fmt.Sprintf(
solaceSempEndpointURLTemplate,
meta.SolaceMetaSempBaseURL,
solaceAPIName,
solaceAPIVersion,
meta.MessageVpn,
solaceAPIObjectTypeQueue,
url.QueryEscape(meta.QueueName),
)
sempURLs := strings.Split(meta.SolaceMetaSempBaseURL, ",")
for i := 0; i < len(sempURLs); i++ {
sempURL := strings.TrimSpace(sempURLs[i])
meta.endpointURLsList = append(meta.endpointURLsList, fmt.Sprintf(
solaceSempEndpointURLTemplate,
sempURL,
solaceAPIName,
solaceAPIVersion,
meta.MessageVpn,
solaceAPIObjectTypeQueue,
url.QueryEscape(meta.QueueName),
))
meta.vpnStateURLsList = append(meta.vpnStateURLsList, fmt.Sprintf(
solaceSempVpnStateURLTemplate,
sempURL,
solaceAPIName,
solaceAPIVersion,
meta.MessageVpn,
))
}

return meta, nil
}
Expand Down Expand Up @@ -245,18 +273,53 @@ func (s *SolaceScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec
return metricSpecList
}

// returns SolaceMetricValues struct populated from broker SEMP endpoint
func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP(ctx context.Context) (SolaceMetricValues, error) {
var scaledMetricEndpointURL = s.metadata.EndpointURL
func (s *SolaceScaler) getVpnState(ctx context.Context, sempURL string) (string, error) {
var httpClient = s.httpClient
var sempResponse solaceSEMPResponse
var sempResponse solaceVpnSEMPResponse

// Define HTTP Request
request, err := http.NewRequestWithContext(ctx, "GET", sempURL, nil)
if err != nil {
return "", fmt.Errorf("failed attempting request to solace semp api: %s url: %s", err.Error(), sempURL)
}

// Add HTTP Auth and Headers
request.SetBasicAuth(s.metadata.Username, s.metadata.Password)
request.Header.Set("Content-Type", "application/json")

// Call Solace SEMP API
response, err := httpClient.Do(request)
if err != nil {
return "", fmt.Errorf("call to solace semp api failed: %s, url: %s", err.Error(), sempURL)
}
defer response.Body.Close()

// Check HTTP Status Code
if response.StatusCode < 200 || response.StatusCode > 299 {
return "", fmt.Errorf("semp request http status code: %d - %s, url: %s", response.StatusCode, response.Status, sempURL)
}

// Decode SEMP Response and Test
if err := json.NewDecoder(response.Body).Decode(&sempResponse); err != nil {
return "", fmt.Errorf("failed to read semp response body: %s, url: %s", err.Error(), sempURL)
}

if sempResponse.Meta.ResponseCode < 200 || sempResponse.Meta.ResponseCode > 299 {
return "", fmt.Errorf("solace semp api returned error status: %d, URL: %s", sempResponse.Meta.ResponseCode, sempURL)
}

return sempResponse.Data.State, nil
}

func (s *SolaceScaler) getQueueMetrics(ctx context.Context, sempURL string) (SolaceMetricValues, error) {
var httpClient = s.httpClient
var sempResponse solaceQueueSEMPResponse
var metricValues SolaceMetricValues

// RETRIEVE METRICS FROM SOLACE SEMP API
// Define HTTP Request
request, err := http.NewRequestWithContext(ctx, "GET", scaledMetricEndpointURL, nil)
request, err := http.NewRequestWithContext(ctx, "GET", sempURL, nil)
if err != nil {
return SolaceMetricValues{}, fmt.Errorf("failed attempting request to solace semp api: %w", err)
return SolaceMetricValues{}, fmt.Errorf("failed attempting request to solace semp api: %s, url: %s", err.Error(), sempURL)
}

// Add HTTP Auth and Headers
Expand All @@ -266,40 +329,78 @@ func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP(ctx context.Context) (Solac
// Call Solace SEMP API
response, err := httpClient.Do(request)
if err != nil {
return SolaceMetricValues{}, fmt.Errorf("call to solace semp api failed: %w", err)
return SolaceMetricValues{}, fmt.Errorf("call to solace semp api failed: %s, url: %s", err.Error(), sempURL)
}
defer response.Body.Close()

// Check HTTP Status Code
if response.StatusCode < 200 || response.StatusCode > 299 {
sempError := fmt.Errorf("semp request http status code: %s - %s", strconv.Itoa(response.StatusCode), response.Status)
return SolaceMetricValues{}, sempError
return SolaceMetricValues{}, fmt.Errorf("semp request http status code: %d - %s, url: %s", response.StatusCode, response.Status, sempURL)
}

// Decode SEMP Response and Test
if err := json.NewDecoder(response.Body).Decode(&sempResponse); err != nil {
return SolaceMetricValues{}, fmt.Errorf("failed to read semp response body: %w", err)
return SolaceMetricValues{}, fmt.Errorf("failed to read semp response body: %s, url: %s", err.Error(), sempURL)
}

if sempResponse.Meta.ResponseCode < 200 || sempResponse.Meta.ResponseCode > 299 {
return SolaceMetricValues{}, fmt.Errorf("solace semp api returned error status: %d", sempResponse.Meta.ResponseCode)
return SolaceMetricValues{}, fmt.Errorf("solace semp api returned error status: %d, url: %s", sempResponse.Meta.ResponseCode, sempURL)
}

// Set Return Values
metricValues.msgCount = sempResponse.Collections.Msgs.Count
metricValues.msgSpoolUsage = sempResponse.Data.MsgSpoolUsage
metricValues.msgRcvRate = sempResponse.Data.MsgRcvRate

return metricValues, nil
}

// returns SolaceMetricValues struct populated from broker SEMP endpoint
func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP(ctx context.Context) (SolaceMetricValues, error) {
var metricValues SolaceMetricValues
var errorList []string

s.stateLock.Lock()
defer s.stateLock.Unlock()

// RETRIEVE METRICS FROM SOLACE SEMP API
for i := 0; i < len(s.metadata.endpointURLsList); i++ {
idx := (s.curHostIdx + i) % len(s.metadata.endpointURLsList)
s.curHostIdx = idx
sempQueueURL := s.metadata.endpointURLsList[idx]
sempVpnStateURL := s.metadata.vpnStateURLsList[idx]

vpnState, err := s.getVpnState(ctx, sempVpnStateURL)
if err != nil {
errorList = append(errorList, "Host "+strconv.Itoa(idx+1)+" Error: "+err.Error())
continue
}

if vpnState != "up" {
errorList = append(errorList, "Host "+strconv.Itoa(idx+1)+" Error: Message vpn is not up ("+vpnState+") url: "+sempVpnStateURL)
continue
}

metricValues, err = s.getQueueMetrics(ctx, sempQueueURL)
if err != nil {
errorList = append(errorList, "Host "+strconv.Itoa(idx+1)+" Error: "+err.Error())
continue
}

return metricValues, nil
}

return SolaceMetricValues{}, fmt.Errorf("unable to collect metrics, error(s): %s", strings.Join(errorList, "\n "))
}

// INTERFACE METHOD
// Call SEMP API to retrieve metrics
// returns value for named metric
// returns true if queue messageCount > 0 || msgSpoolUsage > 0
func (s *SolaceScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var metricValues, mv SolaceMetricValues
var mve error

if mv, mve = s.getSolaceQueueMetricsFromSEMP(ctx); mve != nil {
s.logger.Error(mve, "call to semp endpoint failed")
return []external_metrics.ExternalMetricValue{}, false, mve
}
metricValues = mv
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/solace_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
"testing"

v2 "k8s.io/api/autoscaling/v2"
Expand Down Expand Up @@ -534,7 +532,7 @@ var testSolaceExpectedMetricNames = map[string]string{
func TestSolaceParseSolaceMetadata(t *testing.T) {
for _, testData := range testParseSolaceMetadata {
fmt.Print(testData.testID)
meta, err := parseSolaceMetadata(&scalersconfig.ScalerConfig{ResolvedEnv: nil, TriggerMetadata: testData.metadata, AuthParams: nil, TriggerIndex: testData.triggerIndex})
_, err := parseSolaceMetadata(&scalersconfig.ScalerConfig{ResolvedEnv: nil, TriggerMetadata: testData.metadata, AuthParams: nil, TriggerIndex: testData.triggerIndex})
switch {
case err != nil && !testData.isError:
t.Error("expected success but got error: ", err)
Expand All @@ -545,10 +543,6 @@ func TestSolaceParseSolaceMetadata(t *testing.T) {
default:
fmt.Println(" --> PASS")
}
if !testData.isError && strings.Contains(testData.metadata["queueName"], "/") && !strings.Contains(meta.EndpointURL, url.QueryEscape(testData.metadata["queueName"])) {
t.Error("expected endpointURL to query escape special characters in the URL but got:", meta.EndpointURL)
fmt.Println(" --> FAIL")
}
}
for _, testData := range testSolaceEnvCreds {
fmt.Print(testData.testID)
Expand Down