Skip to content

Refactor MSSQL scaler #6714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ require (
github.com/beanstalkd/go-beanstalk v0.2.0
github.com/bradleyfalzon/ghinstallation/v2 v2.14.0
github.com/cloudevents/sdk-go/v2 v2.16.0
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dysnix/predictkube-libs v0.0.4-0.20230109175007-5a82fccd31c7
github.com/dysnix/predictkube-proto v0.0.0-20241017230806-4c74c627f2bb
github.com/elastic/go-elasticsearch/v7 v7.17.10
Expand Down Expand Up @@ -128,6 +127,8 @@ require (
sigs.k8s.io/kustomize/kustomize/v5 v5.6.0
)

require github.com/microsoft/go-mssqldb v1.8.0 // indirect

replace (
// we need a version with a proper license
github.com/chzyer/logex => github.com/chzyer/logex v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -894,8 +894,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw=
github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo=
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down Expand Up @@ -1395,6 +1393,8 @@ github.com/microsoft/ApplicationInsights-Go v0.4.4 h1:G4+H9WNs6ygSCe6sUyxRc2U81T
github.com/microsoft/ApplicationInsights-Go v0.4.4/go.mod h1:fKRUseBqkw6bDiXTs3ESTiU/4YTIHsQS4W3fP2ieF4U=
github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5 h1:YH424zrwLTlyHSH/GzLMJeu5zhYVZSx5RQxGKm1h96s=
github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5/go.mod h1:PoGiBqKSQK1vIfQ+yVaFcGjDySHvym6FM1cNYnwzbrY=
github.com/microsoft/go-mssqldb v1.8.0 h1:7cyZ/AT7ycDsEoWPIXibd+aVKFtteUNhDGf3aobP+tw=
github.com/microsoft/go-mssqldb v1.8.0/go.mod h1:6znkekS3T2vp0waiMhen4GPU1BiAsrP+iXHcE7a7rFo=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
Expand Down
228 changes: 67 additions & 161 deletions pkg/scalers/mssql_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,73 +3,47 @@ package scalers
import (
"context"
"database/sql"
"errors"
"fmt"
"net"
"net/url"
"strconv"

// mssql driver required for this scaler
_ "github.com/denisenkom/go-mssqldb"
"github.com/go-logr/logr"
// Import the MS SQL driver so it can register itself with database/sql
_ "github.com/microsoft/go-mssqldb"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

var (
// ErrMsSQLNoQuery is returned when "query" is missing from the config.
ErrMsSQLNoQuery = errors.New("no query given")

// ErrMsSQLNoTargetValue is returned when "targetValue" is missing from the config.
ErrMsSQLNoTargetValue = errors.New("no targetValue given")
)

// mssqlScaler exposes a data pointer to mssqlMetadata and sql.DB connection
type mssqlScaler struct {
metricType v2.MetricTargetType
metadata *mssqlMetadata
connection *sql.DB
logger logr.Logger
}

// mssqlMetadata defines metadata used by KEDA to query a Microsoft SQL database
type mssqlMetadata struct {
// The connection string used to connect to the MSSQL database.
// Both URL syntax (sqlserver://host?database=dbName) and OLEDB syntax is supported.
// +optional
connectionString string
// The username credential for connecting to the MSSQL instance, if not specified in the connection string.
// +optional
username string
// The password credential for connecting to the MSSQL instance, if not specified in the connection string.
// +optional
password string
// The hostname of the MSSQL instance endpoint, if not specified in the connection string.
// +optional
host string
// The port number of the MSSQL instance endpoint, if not specified in the connection string.
// +optional
port int
// The name of the database to query, if not specified in the connection string.
// +optional
database string
// The T-SQL query to run against the target database - e.g. SELECT COUNT(*) FROM table.
// +required
query string
// The threshold that is used as targetAverageValue in the Horizontal Pod Autoscaler.
// +required
targetValue float64
// The threshold that is used in activation phase
// +optional
activationTargetValue float64
// The index of the scaler inside the ScaledObject
// +internal
triggerIndex int
ConnectionString string `keda:"name=connectionString, order=authParams;resolvedEnv, optional"`
Username string `keda:"name=username, order=authParams;triggerMetadata, optional"`
Password string `keda:"name=password, order=authParams;resolvedEnv, optional"`
Host string `keda:"name=host, order=authParams;triggerMetadata, optional"`
Port int `keda:"name=port, order=authParams;triggerMetadata, optional"`
Database string `keda:"name=database, order=authParams;triggerMetadata, optional"`
Query string `keda:"name=query, order=triggerMetadata"`
TargetValue float64 `keda:"name=targetValue, order=triggerMetadata"`
ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata, default=0"`

TriggerIndex int
}

func (m *mssqlMetadata) Validate() error {
if m.ConnectionString == "" && m.Host == "" {
return fmt.Errorf("must provide either connectionstring or host")
}
return nil
}

// NewMSSQLScaler creates a new mssql scaler
func NewMSSQLScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -80,158 +54,92 @@ func NewMSSQLScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

meta, err := parseMSSQLMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing mssql metadata: %w", err)
}

conn, err := newMSSQLConnection(meta, logger)
if err != nil {
return nil, fmt.Errorf("error establishing mssql connection: %w", err)
return nil, err
}

return &mssqlScaler{
scaler := &mssqlScaler{
metricType: metricType,
metadata: meta,
connection: conn,
logger: logger,
}, nil
}

// parseMSSQLMetadata takes a ScalerConfig and returns a mssqlMetadata or an error if the config is invalid
func parseMSSQLMetadata(config *scalersconfig.ScalerConfig) (*mssqlMetadata, error) {
meta := mssqlMetadata{}

// Query
if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, ErrMsSQLNoQuery
}

// Target query value
if val, ok := config.TriggerMetadata["targetValue"]; ok {
targetValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %w", err)
}
meta.targetValue = targetValue
} else {
if config.AsMetricSource {
meta.targetValue = 0
} else {
return nil, ErrMsSQLNoTargetValue
}
}

// Activation target value
meta.activationTargetValue = 0
if val, ok := config.TriggerMetadata["activationTargetValue"]; ok {
activationTargetValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetValue parsing error %w", err)
}
meta.activationTargetValue = activationTargetValue
conn, err := newMSSQLConnection(scaler)
if err != nil {
return nil, fmt.Errorf("error establishing mssql connection: %w", err)
}

// Connection string, which can either be provided explicitly or via the helper fields
switch {
case config.AuthParams["connectionString"] != "":
meta.connectionString = config.AuthParams["connectionString"]
case config.TriggerMetadata["connectionStringFromEnv"] != "":
meta.connectionString = config.ResolvedEnv[config.TriggerMetadata["connectionStringFromEnv"]]
default:
meta.connectionString = ""
var err error

host, err := GetFromAuthOrMeta(config, "host")
if err != nil {
return nil, err
}
meta.host = host

var paramPort string
paramPort, _ = GetFromAuthOrMeta(config, "port")
if paramPort != "" {
port, err := strconv.Atoi(paramPort)
if err != nil {
return nil, fmt.Errorf("port parsing error %w", err)
}
meta.port = port
}
scaler.connection = conn

meta.username, _ = GetFromAuthOrMeta(config, "username")
return scaler, nil
}

// database is optional in SQL s
meta.database, _ = GetFromAuthOrMeta(config, "database")
func parseMSSQLMetadata(config *scalersconfig.ScalerConfig) (*mssqlMetadata, error) {
meta := &mssqlMetadata{}
meta.TriggerIndex = config.TriggerIndex
if err := config.TypedConfig(meta); err != nil {
return nil, err
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}
if !config.AsMetricSource && meta.TargetValue == 0 {
return nil, fmt.Errorf("no targetValue given")
}
meta.triggerIndex = config.TriggerIndex
return &meta, nil

return meta, nil
}

// newMSSQLConnection returns a new, opened SQL connection for the provided mssqlMetadata
func newMSSQLConnection(meta *mssqlMetadata, logger logr.Logger) (*sql.DB, error) {
connStr := getMSSQLConnectionString(meta)
func newMSSQLConnection(s *mssqlScaler) (*sql.DB, error) {
connStr := getMSSQLConnectionString(s)

db, err := sql.Open("sqlserver", connStr)
if err != nil {
logger.Error(err, fmt.Sprintf("Found error opening mssql: %s", err))
s.logger.Error(err, "Found error opening mssql")
return nil, err
}

err = db.Ping()
if err != nil {
logger.Error(err, fmt.Sprintf("Found error pinging mssql: %s", err))
s.logger.Error(err, "Found error pinging mssql")
return nil, err
}

return db, nil
}

// getMSSQLConnectionString returns a connection string from a mssqlMetadata
func getMSSQLConnectionString(meta *mssqlMetadata) string {
var connStr string

if meta.connectionString != "" {
connStr = meta.connectionString
} else {
query := url.Values{}
if meta.database != "" {
query.Add("database", meta.database)
}
func getMSSQLConnectionString(s *mssqlScaler) string {
meta := s.metadata
if meta.ConnectionString != "" {
return meta.ConnectionString
}

connectionURL := &url.URL{Scheme: "sqlserver", RawQuery: query.Encode()}
if meta.username != "" {
if meta.password != "" {
connectionURL.User = url.UserPassword(meta.username, meta.password)
} else {
connectionURL.User = url.User(meta.username)
}
}
query := url.Values{}
if meta.Database != "" {
query.Add("database", meta.Database)
}

if meta.port > 0 {
connectionURL.Host = net.JoinHostPort(meta.host, fmt.Sprintf("%d", meta.port))
connectionURL := &url.URL{Scheme: "sqlserver", RawQuery: query.Encode()}
if meta.Username != "" {
if meta.Password != "" {
connectionURL.User = url.UserPassword(meta.Username, meta.Password)
} else {
connectionURL.Host = meta.host
connectionURL.User = url.User(meta.Username)
}
}

connStr = connectionURL.String()
if meta.Port > 0 {
connectionURL.Host = net.JoinHostPort(meta.Host, fmt.Sprintf("%d", meta.Port))
} else {
connectionURL.Host = meta.Host
}

return connStr
return connectionURL.String()
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *mssqlScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, "mssql"),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, "mssql"),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.targetValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.TargetValue),
}

metricSpec := v2.MetricSpec{
Expand All @@ -241,7 +149,6 @@ func (s *mssqlScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns a value for a supported metric or an error if there is a problem getting the metric
func (s *mssqlScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
num, err := s.getQueryResult(ctx)
if err != nil {
Expand All @@ -250,13 +157,13 @@ func (s *mssqlScaler) GetMetricsAndActivity(ctx context.Context, metricName stri

metric := GenerateMetricInMili(metricName, num)

return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.activationTargetValue, nil
return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.ActivationTargetValue, nil
}

// getQueryResult returns the result of the scaler query
func (s *mssqlScaler) getQueryResult(ctx context.Context) (float64, error) {
var value float64
err := s.connection.QueryRowContext(ctx, s.metadata.query).Scan(&value)

err := s.connection.QueryRowContext(ctx, s.metadata.Query).Scan(&value)
switch {
case err == sql.ErrNoRows:
value = 0
Expand All @@ -268,7 +175,6 @@ func (s *mssqlScaler) getQueryResult(ctx context.Context) (float64, error) {
return value, nil
}

// Close closes the mssql database connections
func (s *mssqlScaler) Close(context.Context) error {
err := s.connection.Close()
if err != nil {
Expand Down
Loading
Loading