Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support to enable backoff when rate limited by the Github API #6644

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Here is an overview of all new **experimental** features:
- **Elasticsearch Scaler**: Support IgnoreNullValues at Elasticsearch scaler ([#6599](https://github.com/kedacore/keda/pull/6599))
- **GitHub Scaler**: Add support to use ETag for conditional requests against the Github API ([#6503](https://github.com/kedacore/keda/issues/6503))
- **GitHub Scaler**: Filter workflows via query parameter for improved queue count accuracy ([#6519](https://github.com/kedacore/keda/pull/6519))
- **Github Scaler**: Implement backoff when receive rate limit errors ([#6643](https://github.com/kedacore/keda/issues/6643))
- **IBMMQ Scaler**: Handling StatusNotFound in IBMMQ scaler ([#6472](https://github.com/kedacore/keda/pull/6472))
- **MongoDB Scaler**: Support float queryValue for MongoDB scaler ([#6574](https://github.com/kedacore/keda/issues/6574))
- **RabbitMQ Scaler**: Support use of the ‘vhostName’ parameter in the ‘TriggerAuthentication’ resource ([#6369](https://github.com/kedacore/keda/issues/6369))
Expand Down
86 changes: 79 additions & 7 deletions pkg/scalers/github_runner_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type githubRunnerScaler struct {
previousRepos []string
previousWfrs map[string]map[string]*WorkflowRuns
previousJobs map[string][]Job
rateLimit RateLimit
}

type githubRunnerMetadata struct {
Expand All @@ -49,6 +50,7 @@ type githubRunnerMetadata struct {
labels []string
noDefaultLabels bool
enableEtags bool
enableBackoff bool
targetWorkflowQueueLength int64
triggerIndex int
applicationID *int64
Expand Down Expand Up @@ -331,6 +333,12 @@ type Job struct {
HeadBranch string `json:"head_branch"`
}

type RateLimit struct {
Remaining int `json:"remaining"`
ResetTime time.Time `json:"resetTime"`
RetryAfterTime time.Time `json:"retryAfterTime"`
}

// NewGitHubRunnerScaler creates a new GitHub Runner Scaler
func NewGitHubRunnerScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false)
Expand Down Expand Up @@ -359,6 +367,7 @@ func NewGitHubRunnerScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
previousRepos := []string{}
previousJobs := make(map[string][]Job)
previousWfrs := make(map[string]map[string]*WorkflowRuns)
rateLimit := RateLimit{}

return &githubRunnerScaler{
metricType: metricType,
Expand All @@ -369,6 +378,7 @@ func NewGitHubRunnerScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
previousRepos: previousRepos,
previousJobs: previousJobs,
previousWfrs: previousWfrs,
rateLimit: rateLimit,
}, nil
}

Expand Down Expand Up @@ -452,6 +462,12 @@ func parseGitHubRunnerMetadata(config *scalersconfig.ScalerConfig) (*githubRunne
meta.enableEtags = false
}

if val, err := getBoolValueFromMetaOrEnv("enableBackoff", config.TriggerMetadata, config.ResolvedEnv); err == nil {
meta.enableBackoff = val
} else {
meta.enableBackoff = false
}

if val, err := getValueFromMetaOrEnv("repos", config.TriggerMetadata, config.ResolvedEnv); err == nil && val != "" {
meta.repos = strings.Split(val, ",")
}
Expand Down Expand Up @@ -579,6 +595,32 @@ func (s *githubRunnerScaler) getRepositories(ctx context.Context) ([]string, err
return repoList, nil
}

func (s *githubRunnerScaler) getRateLimit(header http.Header) RateLimit {
var retryAfterTime time.Time

remaining, _ := strconv.Atoi(header.Get("X-RateLimit-Remaining"))
reset, _ := strconv.ParseInt(header.Get("X-RateLimit-Reset"), 10, 64)
resetTime := time.Unix(reset, 0)

if retryAfterStr := header.Get("Retry-After"); retryAfterStr != "" {
if retrySeconds, err := strconv.Atoi(retryAfterStr); err == nil {
retryAfterTime = time.Now().Add(time.Duration(retrySeconds) * time.Second)
}
}

if retryAfterTime.IsZero() {
s.logger.V(1).Info(fmt.Sprintf("Github API rate limit: Remaining: %d, ResetTime: %s", remaining, resetTime))
} else {
s.logger.V(1).Info(fmt.Sprintf("Github API rate limit: Remaining: %d, ResetTime: %s, Retry-After: %s", remaining, resetTime, retryAfterTime))
}

return RateLimit{
Remaining: remaining,
ResetTime: resetTime,
RetryAfterTime: retryAfterTime,
}
}

func (s *githubRunnerScaler) getGithubRequest(ctx context.Context, url string, metadata *githubRunnerMetadata, httpClient *http.Client) ([]byte, int, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
Expand Down Expand Up @@ -609,19 +651,22 @@ func (s *githubRunnerScaler) getGithubRequest(ctx context.Context, url string, m
}
_ = r.Body.Close()

if r.Header.Get("X-RateLimit-Remaining") != "" {
s.rateLimit = s.getRateLimit(r.Header)
}

if r.StatusCode != 200 {
if r.StatusCode == 304 && s.metadata.enableEtags {
s.logger.V(1).Info(fmt.Sprintf("The github rest api for the url: %s returned status %d %s", url, r.StatusCode, http.StatusText(r.StatusCode)))
return []byte{}, r.StatusCode, nil
}

if r.Header.Get("X-RateLimit-Remaining") != "" {
githubAPIRemaining, _ := strconv.Atoi(r.Header.Get("X-RateLimit-Remaining"))
if s.rateLimit.Remaining == 0 && !s.rateLimit.ResetTime.IsZero() {
return []byte{}, r.StatusCode, fmt.Errorf("GitHub API rate limit exceeded, reset time %s", s.rateLimit.ResetTime)
}

if githubAPIRemaining == 0 {
resetTime, _ := strconv.ParseInt(r.Header.Get("X-RateLimit-Reset"), 10, 64)
return []byte{}, r.StatusCode, fmt.Errorf("GitHub API rate limit exceeded, resets at %s", time.Unix(resetTime, 0))
}
if !s.rateLimit.RetryAfterTime.IsZero() && time.Now().Before(s.rateLimit.RetryAfterTime) {
return []byte{}, r.StatusCode, fmt.Errorf("GitHub API rate limit exceeded, retry after %s", s.rateLimit.RetryAfterTime)
}

return []byte{}, r.StatusCode, fmt.Errorf("the GitHub REST API returned error. url: %s status: %d response: %s", url, r.StatusCode, string(b))
Expand Down Expand Up @@ -780,9 +825,36 @@ func (s *githubRunnerScaler) GetWorkflowQueueLength(ctx context.Context) (int64,
return queueCount, nil
}

func (s *githubRunnerScaler) shouldWaitForRateLimit() (bool, time.Duration) {
if s.rateLimit.Remaining == 0 && !s.rateLimit.ResetTime.IsZero() && time.Now().Before(s.rateLimit.ResetTime) {
reset := time.Until(s.rateLimit.ResetTime)
s.logger.V(1).Info(fmt.Sprintf("Rate limit exceeded, resets at %s, waiting for %s", s.rateLimit.ResetTime, reset))
return true, reset
}

if !s.rateLimit.RetryAfterTime.IsZero() && time.Now().Before(s.rateLimit.RetryAfterTime) {
retry := time.Until(s.rateLimit.RetryAfterTime)
s.logger.V(1).Info(fmt.Sprintf("Rate limit exceeded, retry after %s, waiting for %s", s.rateLimit.RetryAfterTime, retry))
return true, retry
}

return false, 0
}

func (s *githubRunnerScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueLen, err := s.GetWorkflowQueueLength(ctx)
if s.metadata.enableBackoff {
wait, waitDuration := s.shouldWaitForRateLimit()
if wait {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
case <-time.After(waitDuration):
// Proceed after wait
}
}
}

queueLen, err := s.GetWorkflowQueueLength(ctx)
if err != nil {
s.logger.Error(err, "error getting workflow queue length")
return []external_metrics.ExternalMetricValue{}, false, err
Expand Down
43 changes: 43 additions & 0 deletions pkg/scalers/github_runner_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func apiStubHandlerCustomJob(hasRateLeft bool, exceeds30Repos bool, jobResponse
} else {
w.Header().Set("X-RateLimit-Remaining", "0")
w.WriteHeader(http.StatusForbidden)
return
}
if strings.HasSuffix(r.URL.String(), "jobs?per_page=100") {
// nosemgrep: no-direct-write-to-responsewriter
Expand Down Expand Up @@ -500,6 +501,48 @@ func TestNewGitHubRunnerScaler_QueueLength_SingleRepo_WithNotModified(t *testing
}
}

func TestNewGitHubRunnerScaler_ShouldWait_ResetTime(t *testing.T) {
mockGitHubRunnerScaler := githubRunnerScaler{
rateLimit: RateLimit{
Remaining: 0,
ResetTime: time.Now().Add(15 * time.Second),
RetryAfterTime: time.Now(),
},
}

wait, waitDuration := mockGitHubRunnerScaler.shouldWaitForRateLimit()

if !wait {
t.Fail()
}

expectedWait := 15 * time.Second
if waitDuration < expectedWait-1*time.Second || waitDuration > expectedWait+1*time.Second {
t.Fail()
}
}

func TestNewGitHubRunnerScaler_ShouldWait_RetryAfterTime(t *testing.T) {
mockGitHubRunnerScaler := githubRunnerScaler{
rateLimit: RateLimit{
Remaining: 0,
ResetTime: time.Now(),
RetryAfterTime: time.Now().Add(15 * time.Second),
},
}

wait, waitDuration := mockGitHubRunnerScaler.shouldWaitForRateLimit()

if !wait {
t.Fail()
}

expectedWait := 15 * time.Second
if waitDuration < expectedWait-1*time.Second || waitDuration > expectedWait+1*time.Second {
t.Fail()
}
}

func TestNewGitHubRunnerScaler_404(t *testing.T) {
var apiStub = apiStubHandler404()

Expand Down
1 change: 1 addition & 0 deletions tests/scalers/github_runner/github_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ spec:
labels: {{.Labels}}
runnerScopeFromEnv: "RUNNER_SCOPE"
enableEtags: "true"
enableBackoff: "true"
authenticationRef:
name: github-trigger-auth
`
Expand Down
Loading