|
| 1 | +package scalers |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "encoding/json" |
| 6 | + "fmt" |
| 7 | + "net/http" |
| 8 | + "net/url" |
| 9 | + |
| 10 | + "github.com/go-logr/logr" |
| 11 | + "golang.org/x/sync/errgroup" |
| 12 | + v2 "k8s.io/api/autoscaling/v2" |
| 13 | + "k8s.io/metrics/pkg/apis/external_metrics" |
| 14 | + |
| 15 | + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" |
| 16 | + kedautil "github.com/kedacore/keda/v2/pkg/util" |
| 17 | +) |
| 18 | + |
| 19 | +type pipelineStatus string |
| 20 | + |
| 21 | +const ( |
| 22 | + // pipelinePendingStatus is the status of the pending pipelines. |
| 23 | + pipelinePendingStatus pipelineStatus = "pending" |
| 24 | + // pipelineWaitingForResourceStatus is the status of the pipelines that are waiting for resources. |
| 25 | + pipelineWaitingForResourceStatus pipelineStatus = "waiting_for_resource" |
| 26 | + // pipelineRunningStatus is the status of the running pipelines. |
| 27 | + pipelineRunningStatus pipelineStatus = "running" |
| 28 | + |
| 29 | + // maxGitlabAPIPageCount is the maximum number of pages to query for pipelines. |
| 30 | + maxGitlabAPIPageCount = 50 |
| 31 | + // gitlabAPIPerPage is the number of pipelines to query per page. |
| 32 | + gitlabAPIPerPage = "200" |
| 33 | +) |
| 34 | + |
| 35 | +type gitlabRunnerScaler struct { |
| 36 | + metricType v2.MetricTargetType |
| 37 | + metadata *gitlabRunnerMetadata |
| 38 | + httpClient *http.Client |
| 39 | + logger logr.Logger |
| 40 | +} |
| 41 | + |
| 42 | +type gitlabRunnerMetadata struct { |
| 43 | + GitLabAPIURL *url.URL `keda:"name=gitlabAPIURL, order=triggerMetadata, default=https://gitlab.com, optional"` |
| 44 | + PersonalAccessToken string `keda:"name=personalAccessToken, order=authParams"` |
| 45 | + ProjectID string `keda:"name=projectID, order=triggerMetadata"` |
| 46 | + |
| 47 | + TargetPipelineQueueLength int64 `keda:"name=targetPipelineQueueLength, order=triggerMetadata, default=1, optional"` |
| 48 | + TriggerIndex int |
| 49 | +} |
| 50 | + |
| 51 | +// NewGitLabRunnerScaler creates a new GitLab Runner Scaler |
| 52 | +func NewGitLabRunnerScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { |
| 53 | + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) |
| 54 | + |
| 55 | + metricType, err := GetMetricTargetType(config) |
| 56 | + if err != nil { |
| 57 | + return nil, fmt.Errorf("error getting scaler metric type: %w", err) |
| 58 | + } |
| 59 | + |
| 60 | + meta, err := parseGitLabRunnerMetadata(config) |
| 61 | + if err != nil { |
| 62 | + return nil, fmt.Errorf("error parsing GitLab Runner metadata: %w", err) |
| 63 | + } |
| 64 | + |
| 65 | + return &gitlabRunnerScaler{ |
| 66 | + metricType: metricType, |
| 67 | + metadata: meta, |
| 68 | + httpClient: httpClient, |
| 69 | + logger: InitializeLogger(config, "gitlab_runner_scaler"), |
| 70 | + }, nil |
| 71 | +} |
| 72 | + |
| 73 | +func parseGitLabRunnerMetadata(config *scalersconfig.ScalerConfig) (*gitlabRunnerMetadata, error) { |
| 74 | + meta := gitlabRunnerMetadata{} |
| 75 | + |
| 76 | + meta.TriggerIndex = config.TriggerIndex |
| 77 | + if err := config.TypedConfig(&meta); err != nil { |
| 78 | + return nil, fmt.Errorf("error parsing gitlabRunner metadata: %w", err) |
| 79 | + } |
| 80 | + |
| 81 | + return &meta, nil |
| 82 | +} |
| 83 | + |
| 84 | +func (s *gitlabRunnerScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { |
| 85 | + // Get the number of pending, waiting, and running for resource pipelines |
| 86 | + eg, ctx := errgroup.WithContext(ctx) |
| 87 | + |
| 88 | + getLen := func(status pipelineStatus, length *int64) func() error { |
| 89 | + return func() error { |
| 90 | + uri := constructGitlabAPIPipelinesURL(*s.metadata.GitLabAPIURL, s.metadata.ProjectID, status) |
| 91 | + var err error |
| 92 | + *length, err = s.getPipelineQueueLength(ctx, uri) |
| 93 | + return err |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + var pendingLen, waitingForResourceLen, runningLen int64 |
| 98 | + |
| 99 | + eg.Go(getLen(pipelinePendingStatus, &pendingLen)) |
| 100 | + eg.Go(getLen(pipelineWaitingForResourceStatus, &waitingForResourceLen)) |
| 101 | + eg.Go(getLen(pipelineRunningStatus, &runningLen)) |
| 102 | + |
| 103 | + err := eg.Wait() |
| 104 | + if err != nil { |
| 105 | + s.logger.Error(err, "error getting pipeline queue length") |
| 106 | + return []external_metrics.ExternalMetricValue{}, false, err |
| 107 | + } |
| 108 | + |
| 109 | + queueLen := pendingLen + waitingForResourceLen + runningLen |
| 110 | + |
| 111 | + metric := GenerateMetricInMili(metricName, float64(queueLen)) |
| 112 | + |
| 113 | + return []external_metrics.ExternalMetricValue{metric}, queueLen >= s.metadata.TargetPipelineQueueLength, nil |
| 114 | +} |
| 115 | + |
| 116 | +func (s *gitlabRunnerScaler) GetMetricSpecForScaling(_ context.Context) []v2.MetricSpec { |
| 117 | + externalMetric := &v2.ExternalMetricSource{ |
| 118 | + Metric: v2.MetricIdentifier{ |
| 119 | + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("gitlab-runner-%s", s.metadata.ProjectID))), |
| 120 | + }, |
| 121 | + Target: GetMetricTarget(s.metricType, s.metadata.TargetPipelineQueueLength), |
| 122 | + } |
| 123 | + metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} |
| 124 | + return []v2.MetricSpec{metricSpec} |
| 125 | +} |
| 126 | + |
| 127 | +func (s *gitlabRunnerScaler) Close(_ context.Context) error { |
| 128 | + if s.httpClient != nil { |
| 129 | + s.httpClient.CloseIdleConnections() |
| 130 | + } |
| 131 | + return nil |
| 132 | +} |
| 133 | + |
| 134 | +func constructGitlabAPIPipelinesURL(baseURL url.URL, projectID string, status pipelineStatus) url.URL { |
| 135 | + baseURL.Path = "/api/v4/projects/" + projectID + "/pipelines" |
| 136 | + |
| 137 | + qParams := baseURL.Query() |
| 138 | + qParams.Set("status", string(status)) |
| 139 | + qParams.Set("per_page", gitlabAPIPerPage) |
| 140 | + |
| 141 | + baseURL.RawQuery = qParams.Encode() |
| 142 | + |
| 143 | + return baseURL |
| 144 | +} |
| 145 | + |
| 146 | +// getPipelineCount returns the number of pipelines in the GitLab project (per the page set in url) |
| 147 | +func (s *gitlabRunnerScaler) getPipelineCount(ctx context.Context, uri string) (int64, error) { |
| 148 | + req, err := http.NewRequestWithContext(ctx, http.MethodGet, uri, nil) |
| 149 | + if err != nil { |
| 150 | + return 0, fmt.Errorf("creating request: %w", err) |
| 151 | + } |
| 152 | + |
| 153 | + req.Header.Set("Accept", "application/json") |
| 154 | + req.Header.Set("Content-Type", "application/json") |
| 155 | + req.Header.Set("PRIVATE-TOKEN", s.metadata.PersonalAccessToken) |
| 156 | + |
| 157 | + res, err := s.httpClient.Do(req) |
| 158 | + if err != nil { |
| 159 | + return 0, fmt.Errorf("doing request: %w", err) |
| 160 | + } |
| 161 | + defer res.Body.Close() |
| 162 | + |
| 163 | + if res.StatusCode != http.StatusOK { |
| 164 | + return 0, fmt.Errorf("unexpected status code: %d", res.StatusCode) |
| 165 | + } |
| 166 | + |
| 167 | + gitlabPipelines := make([]struct{}, 0) |
| 168 | + if err := json.NewDecoder(res.Body).Decode(&gitlabPipelines); err != nil { |
| 169 | + return 0, fmt.Errorf("decoding response: %w", err) |
| 170 | + } |
| 171 | + |
| 172 | + return int64(len(gitlabPipelines)), nil |
| 173 | +} |
| 174 | + |
| 175 | +// getPipelineQueueLength returns the number of pipelines in the |
| 176 | +// GitLab project that are waiting for resources. |
| 177 | +func (s *gitlabRunnerScaler) getPipelineQueueLength(ctx context.Context, baseURL url.URL) (int64, error) { |
| 178 | + var count int64 |
| 179 | + |
| 180 | + page := 1 |
| 181 | + for ; page < maxGitlabAPIPageCount; page++ { |
| 182 | + pagedURL := pagedURL(baseURL, fmt.Sprint(page)) |
| 183 | + |
| 184 | + gitlabPipelinesLen, err := s.getPipelineCount(ctx, pagedURL.String()) |
| 185 | + if err != nil { |
| 186 | + return 0, err |
| 187 | + } |
| 188 | + |
| 189 | + if gitlabPipelinesLen == 0 { |
| 190 | + break |
| 191 | + } |
| 192 | + |
| 193 | + count += gitlabPipelinesLen |
| 194 | + } |
| 195 | + |
| 196 | + return count, nil |
| 197 | +} |
| 198 | + |
| 199 | +func pagedURL(uri url.URL, page string) url.URL { |
| 200 | + qParams := uri.Query() |
| 201 | + qParams.Set("page", fmt.Sprint(page)) |
| 202 | + |
| 203 | + uri.RawQuery = qParams.Encode() |
| 204 | + |
| 205 | + return uri |
| 206 | +} |
0 commit comments