Skip to content

Commit c85d911

Browse files
authored
Merge pull request #442 from buildkite/add-job-watcher
Add job watcher
2 parents 94ae79d + 1ea1b55 commit c85d911

File tree

17 files changed

+674
-93
lines changed

17 files changed

+674
-93
lines changed

.buildkite/rbac.yaml

+6
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,12 @@ rules:
3434
- pods/eviction
3535
verbs:
3636
- create
37+
- apiGroups:
38+
- ""
39+
resources:
40+
- events
41+
verbs:
42+
- list
3743
---
3844
apiVersion: v1
3945
kind: ServiceAccount

charts/agent-stack-k8s/templates/rbac.yaml.tpl

+6
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,12 @@ rules:
3333
- pods/eviction
3434
verbs:
3535
- create
36+
- apiGroups:
37+
- ""
38+
resources:
39+
- events
40+
verbs:
41+
- list
3642
---
3743
apiVersion: rbac.authorization.k8s.io/v1
3844
kind: RoleBinding

cmd/controller/controller.go

+15
Original file line number | Diff line number | Diff line change
@@ -90,6 +90,16 @@ func AddConfigFlags(cmd *cobra.Command) {
9090
)
9191
cmd.Flags().String("graphql-endpoint", "", "Buildkite GraphQL endpoint URL")
9292

93+
cmd.Flags().Duration(
94+
"stale-job-data-timeout",
95+
config.DefaultStaleJobDataTimeout,
96+
"Duration after querying jobs in Buildkite that the data is considered valid",
97+
)
98+
cmd.Flags().Int(
99+
"job-creation-concurrency",
100+
config.DefaultJobCreationConcurrency,
101+
"Number of concurrent goroutines to run for converting Buildkite jobs into Kubernetes jobs",
102+
)
93103
cmd.Flags().Duration(
94104
"image-pull-backoff-grace-period",
95105
config.DefaultImagePullBackOffGracePeriod,
@@ -100,6 +110,11 @@ func AddConfigFlags(cmd *cobra.Command) {
100110
config.DefaultJobCancelCheckerPollInterval,
101111
"Controls the interval between job state queries while a pod is still Pending",
102112
)
113+
cmd.Flags().Duration(
114+
"empty-job-grace-period",
115+
config.DefaultEmptyJobGracePeriod,
116+
"Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. when the podSpec specifies a missing service account)",
117+
)
103118
cmd.Flags().Bool(
104119
"prohibit-kubernetes-plugin",
105120
false,

cmd/controller/controller_test.go

+1
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ func TestReadAndParseConfig(t *testing.T) {
2626
JobTTL: 300 * time.Second,
2727
ImagePullBackOffGracePeriod: 60 * time.Second,
2828
JobCancelCheckerPollInterval: 10 * time.Second,
29+
EmptyJobGracePeriod: 50 * time.Second,
2930
PollInterval: 5 * time.Second,
3031
StaleJobDataTimeout: 10 * time.Second,
3132
JobCreationConcurrency: 5,

examples/config.yaml

+1
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ image: my.registry.dev/buildkite-agent:latest
44
job-ttl: 5m
55
image-pull-backoff-grace-period: 60s
66
job-cancel-checker-poll-interval: 10s
7+
empty-job-grace-period: 50s
78
poll-interval: 5s
89
stale-job-data-timeout: 10s
910
job-creation-concurrency: 5

internal/controller/config/config.go

+4
Original file line number | Diff line number | Diff line change
@@ -15,8 +15,11 @@ const (
1515
BuildURLAnnotation = "buildkite.com/build-url"
1616
JobURLAnnotation = "buildkite.com/job-url"
1717
DefaultNamespace = "default"
18+
DefaultStaleJobDataTimeout = 10 * time.Second
1819
DefaultImagePullBackOffGracePeriod = 30 * time.Second
1920
DefaultJobCancelCheckerPollInterval = 5 * time.Second
21+
DefaultEmptyJobGracePeriod = 30 * time.Second
22+
DefaultJobCreationConcurrency = 5
2023
)
2124

2225
var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version()
@@ -49,6 +52,7 @@ type Config struct {
4952
PodSpecPatch *corev1.PodSpec `json:"pod-spec-patch" validate:"omitempty"`
5053
ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"`
5154
JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"`
55+
EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"`
5256

5357
// WorkspaceVolume allows supplying a volume for /workspace. By default
5458
// an EmptyDir volume is created for it.

internal/controller/controller.go

+13
Original file line number | Diff line number | Diff line change
@@ -136,7 +136,20 @@ func Run(
136136
logger.Fatal("failed to register completions informer", zap.Error(err))
137137
}
138138

139+
// JobWatcher watches for jobs in bad conditions to clean up:
140+
// * Jobs that fail without ever creating a pod
141+
// * Jobs that stall forever without ever creating a pod
142+
jobWatcher := scheduler.NewJobWatcher(
143+
logger.Named("jobWatcher"),
144+
k8sClient,
145+
cfg,
146+
)
147+
if err := jobWatcher.RegisterInformer(ctx, informerFactory); err != nil {
148+
logger.Fatal("failed to register jobWatcher informer", zap.Error(err))
149+
}
150+
139151
// PodWatcher watches for other conditions to clean up pods:
152+
// * Pods where an init container failed for any reason
140153
// * Pods where a container is in ImagePullBackOff for too long
141154
// * Pods that are still pending, but the Buildkite job has been cancelled
142155
podWatcher := scheduler.NewPodWatcher(

internal/controller/monitor/monitor.go

+3 -2
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/Khan/genqlient/graphql"
1515
"github.com/buildkite/agent-stack-k8s/v2/api"
1616
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
17+
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
1718
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"
1819
"go.uber.org/zap"
1920
"k8s.io/client-go/kubernetes"
@@ -46,12 +47,12 @@ func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, er
4647

4748
// Default StaleJobDataTimeout to 10s.
4849
if cfg.StaleJobDataTimeout <= 0 {
49-
cfg.StaleJobDataTimeout = 10 * time.Second
50+
cfg.StaleJobDataTimeout = config.DefaultStaleJobDataTimeout
5051
}
5152

5253
// Default CreationConcurrency to 5.
5354
if cfg.JobCreationConcurrency <= 0 {
54-
cfg.JobCreationConcurrency = 5
55+
cfg.JobCreationConcurrency = config.DefaultJobCreationConcurrency
5556
}
5657

5758
return &Monitor{

internal/controller/scheduler/completions.go

+15 -8
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
77

88
"go.uber.org/zap"
9+
910
v1 "k8s.io/api/core/v1"
1011
kerrors "k8s.io/apimachinery/pkg/api/errors"
1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -16,9 +17,18 @@ import (
1617
"k8s.io/utils/ptr"
1718
)
1819

20+
const defaultTermGracePeriodSeconds = 60
21+
1922
type completionsWatcher struct {
2023
logger *zap.Logger
2124
k8s kubernetes.Interface
25+
26+
// This is the context passed to RegisterInformer.
27+
// It's being stored here (grrrr!) because the k8s ResourceEventHandler
28+
// interface doesn't have context args. (Working around an interface in a
29+
// library outside of our control is a carve-out from the usual rule.)
30+
// The context is needed to ensure goroutines are cleaned up.
31+
resourceEventHandlerCtx context.Context
2232
}
2333

2434
func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *completionsWatcher {
@@ -30,14 +40,12 @@ func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *comp
3040
}
3141

3242
// Creates a Pods informer and registers the handler on it
33-
func (w *completionsWatcher) RegisterInformer(
34-
ctx context.Context,
35-
factory informers.SharedInformerFactory,
36-
) error {
43+
func (w *completionsWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
3744
informer := factory.Core().V1().Pods().Informer()
3845
if _, err := informer.AddEventHandler(w); err != nil {
3946
return err
4047
}
48+
w.resourceEventHandlerCtx = ctx // see note on field
4149
go factory.Start(ctx.Done())
4250
return nil
4351
}
@@ -49,7 +57,7 @@ func (w *completionsWatcher) OnDelete(obj any) {}
4957
func (w *completionsWatcher) OnAdd(obj any, isInInitialList bool) {
5058
completionWatcherOnAddEventCounter.Inc()
5159
pod := obj.(*v1.Pod)
52-
w.cleanupSidecars(pod)
60+
w.cleanupSidecars(w.resourceEventHandlerCtx, pod)
5361
}
5462

5563
func (w *completionsWatcher) OnUpdate(old any, new any) {
@@ -62,15 +70,15 @@ func (w *completionsWatcher) OnUpdate(old any, new any) {
6270
}
6371

6472
newPod := new.(*v1.Pod)
65-
w.cleanupSidecars(newPod)
73+
w.cleanupSidecars(w.resourceEventHandlerCtx, newPod)
6674
}
6775

6876
// cleanupSidecars first checks if the container status of the agent container
6977
// in the pod is Terminated. If so, it ensures the job is cleaned up by updating
7078
// it with an ActiveDeadlineSeconds value (defaultTermGracePeriodSeconds).
7179
// (So this is not actually sidecar-specific, but is needed because sidecars
7280
// would otherwise cause the pod to continue running.)
73-
func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
81+
func (w *completionsWatcher) cleanupSidecars(ctx context.Context, pod *v1.Pod) {
7482
terminated := getTermination(pod)
7583
if terminated == nil {
7684
return
@@ -82,7 +90,6 @@ func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
8290
)
8391

8492
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
85-
ctx := context.TODO()
8693
job, err := w.k8s.BatchV1().Jobs(pod.Namespace).Get(ctx, pod.Labels["job-name"], metav1.GetOptions{})
8794
if err != nil {
8895
return err

internal/controller/scheduler/fail_job.go

+37 -2
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,8 @@ import (
66
"fmt"
77
"os"
88

9+
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
10+
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
911
"github.com/buildkite/agent-stack-k8s/v2/internal/version"
1012

1113
agentcore "github.com/buildkite/agent/v3/core"
@@ -16,9 +18,42 @@ import (
1618
"k8s.io/client-go/kubernetes"
1719
)
1820

19-
// failJob fails the job in Buildkite. agentToken needs to be the token value.
21+
// acquireAndFailForObject figures out how to fail the BK job corresponding to
22+
// the k8s object (a pod or job) by inspecting the object's labels.
23+
func acquireAndFailForObject(
24+
ctx context.Context,
25+
logger *zap.Logger,
26+
k8sClient kubernetes.Interface,
27+
cfg *config.Config,
28+
obj metav1.Object,
29+
message string,
30+
) error {
31+
agentToken, err := fetchAgentToken(ctx, logger, k8sClient, obj.GetNamespace(), cfg.AgentTokenSecret)
32+
if err != nil {
33+
logger.Error("fetching agent token from secret", zap.Error(err))
34+
return err
35+
}
36+
37+
// Matching tags are required in order to connect the temporary agent.
38+
labels := obj.GetLabels()
39+
jobUUID := labels[config.UUIDLabel]
40+
if jobUUID == "" {
41+
logger.Error("object missing UUID label", zap.String("label", config.UUIDLabel))
42+
return errors.New("missing UUID label")
43+
}
44+
tags := agenttags.TagsFromLabels(labels)
45+
opts := cfg.AgentConfig.ControllerOptions()
46+
47+
if err := acquireAndFail(ctx, logger, agentToken, jobUUID, tags, message, opts...); err != nil {
48+
logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err))
49+
return err
50+
}
51+
return nil
52+
}
53+
54+
// acquireAndFail fails the job in Buildkite. agentToken needs to be the token value.
2055
// Use fetchAgentToken to fetch it from the k8s secret.
21-
func failJob(
56+
func acquireAndFail(
2257
ctx context.Context,
2358
zapLogger *zap.Logger,
2459
agentToken string,

0 commit comments

Comments
 (0)