@@ -73,6 +73,18 @@ func (l *MaxInFlight) RegisterInformer(ctx context.Context, factory informers.Sh
73
73
return fmt .Errorf ("failed to sync informer cache" )
74
74
}
75
75
76
+ informerStore := jobInformer .GetStore ()
77
+ informerCacheKeys := informerStore .ListKeys ()
78
+ l .logger .Debug ("informer cache sync complete, dump informer cache keys..." ,
79
+ zap .Int ("cache-keys-found" , len (informerCacheKeys )),
80
+ )
81
+
82
+ for _ , cacheKey := range informerCacheKeys {
83
+ l .logger .Debug ("informer cache key found" ,
84
+ zap .String ("informer-cache-key" , fmt .Sprint (cacheKey )),
85
+ )
86
+ }
87
+
76
88
return nil
77
89
}
78
90
@@ -83,6 +95,17 @@ func (l *MaxInFlight) Handle(ctx context.Context, job model.Job) error {
83
95
// Block until there's a token in the bucket, or cancel if the job
84
96
// information becomes too stale.
85
97
start := time .Now ()
98
+
99
+ l .logger .Debug ("Waiting for token" ,
100
+ zap .String ("job-uuid" , job .Uuid ),
101
+ zap .Int ("max-in-flight" , l .MaxInFlight ),
102
+ zap .Int ("available-tokens" , len (l .tokenBucket )),
103
+ zap .Int ("running-jobs" , l .MaxInFlight - len (l .tokenBucket )),
104
+ )
105
+
106
+ waitingForTokenGauge .Inc ()
107
+ defer waitingForTokenGauge .Dec ()
108
+
86
109
select {
87
110
case <- ctx .Done ():
88
111
return context .Cause (ctx )
@@ -92,8 +115,11 @@ func (l *MaxInFlight) Handle(ctx context.Context, job model.Job) error {
92
115
l .logger .Debug ("Unable to create job before stale-job-data-timeout" ,
93
116
zap .String ("job-uuid" , job .Uuid ),
94
117
zap .Int ("max-in-flight" , l .MaxInFlight ),
118
+ zap .Int ("available-tokens" , len (l .tokenBucket )),
95
119
zap .Int ("running-jobs" , l .MaxInFlight - len (l .tokenBucket )),
120
+ zap .Duration ("wait-time" , time .Since (start )),
96
121
)
122
+ jobStaleWhileWaitingCounter .Inc ()
97
123
return model .ErrStaleJob
98
124
99
125
case <- l .tokenBucket :
@@ -198,7 +224,15 @@ func (l *MaxInFlight) tryTakeToken(source string) {
198
224
select {
199
225
case <- l .tokenBucket :
200
226
// Success.
227
+ l .logger .Debug ("Token taken successfully" ,
228
+ zap .String ("source" , source ),
229
+ zap .Int ("remaining-tokens" , len (l .tokenBucket )),
230
+ )
201
231
default :
232
+ l .logger .Debug ("Failed to take token - bucket empty" ,
233
+ zap .String ("source" , source ),
234
+ zap .Int ("max-in-flight" , l .MaxInFlight ),
235
+ )
202
236
tokenUnderflowCounter .WithLabelValues (source ).Inc ()
203
237
}
204
238
}
@@ -208,7 +242,15 @@ func (l *MaxInFlight) tryReturnToken(source string) {
208
242
select {
209
243
case l .tokenBucket <- struct {}{}:
210
244
// Success.
245
+ l .logger .Debug ("Token returned successfully" ,
246
+ zap .String ("source" , source ),
247
+ zap .Int ("available-tokens" , len (l .tokenBucket )),
248
+ )
211
249
default :
250
+ l .logger .Warn ("Failed to return token - bucket full" ,
251
+ zap .String ("source" , source ),
252
+ zap .Int ("max-in-flight" , l .MaxInFlight ),
253
+ )
212
254
tokenOverflowCounter .WithLabelValues (source ).Inc ()
213
255
}
214
256
}
0 commit comments