Skip to content

Commit 51b3c60

Browse files
committed
Add configurable probe timeout and frequency for activator
Signed-off-by: Dhruv Bindra <[email protected]>
1 parent a94c607 commit 51b3c60

File tree

4 files changed

+30
-29
lines changed

4 files changed

+30
-29
lines changed

cmd/activator/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ type config struct {
8383
// TODO: run loadtests using these flags to determine optimal default values.
8484
MaxIdleProxyConns int `split_words:"true" default:"1000"`
8585
MaxIdleProxyConnsPerHost int `split_words:"true" default:"100"`
86+
87+
ProbeTimeout int `split_words:"true" default:"300"`
88+
ProbeFrequency int `split_words:"true" default:"200"`
8689
}
8790

8891
func main() {
@@ -158,7 +161,8 @@ func main() {
158161
// transport so that throttler probe connections can be reused after probing
159162
// (via keep-alive) to send real requests, avoiding needing an extra
160163
// reconnect for the first request after the probe succeeds.
161-
logger.Debugf("MaxIdleProxyConns: %d, MaxIdleProxyConnsPerHost: %d", env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost)
164+
logger.Debugf("MaxIdleProxyConns: %d, MaxIdleProxyConnsPerHost: %d, ProbeTimeout: %dms, ProbeFrequency: %dms",
165+
env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost, env.ProbeTimeout, env.ProbeFrequency)
162166
transport := pkgnet.NewProxyAutoTransport(env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost)
163167

164168
// Fetch networking configuration to determine whether EnableMeshPodAddressability
@@ -191,7 +195,9 @@ func main() {
191195

192196
// Start throttler.
193197
throttler := activatornet.NewThrottler(ctx, env.PodIP)
194-
go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode)
198+
probeTimeout := time.Duration(env.ProbeTimeout) * time.Millisecond
199+
probeFrequency := time.Duration(env.ProbeFrequency) * time.Millisecond
200+
go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode, probeTimeout, probeFrequency)
195201

196202
// Set up our config store
197203
configMapWatcher := configmapinformer.NewInformedWatcher(kubeClient, system.Namespace())

pkg/activator/net/revision_backends.go

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,6 @@ func (d dests) MarshalLogObject(enc zapcore.ObjectEncoder) error {
8787
return nil
8888
}
8989

90-
const (
91-
probeTimeout time.Duration = 300 * time.Millisecond
92-
defaultProbeFrequency time.Duration = 200 * time.Millisecond
93-
)
94-
9590
// revisionWatcher watches the podIPs and ClusterIP of the service for a revision. It implements the logic
9691
// to supply revisionDestsUpdate events on updateCh
9792
type revisionWatcher struct {
@@ -131,13 +126,15 @@ type revisionWatcher struct {
131126
// cover the revision's ready conditions, for example when an exec probe is
132127
// being used.
133128
enableProbeOptimisation bool
129+
130+
probeTimeout time.Duration
134131
}
135132

136133
func newRevisionWatcher(ctx context.Context, rev types.NamespacedName, protocol pkgnet.ProtocolType,
137134
updateCh chan<- revisionDestsUpdate, destsCh chan dests,
138135
transport http.RoundTripper, serviceLister corev1listers.ServiceLister,
139136
usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode,
140-
enableProbeOptimisation bool,
137+
enableProbeOptimisation bool, probeTimeout time.Duration,
141138
logger *zap.SugaredLogger,
142139
) *revisionWatcher {
143140
ctx, cancel := context.WithCancel(ctx)
@@ -155,6 +152,7 @@ func newRevisionWatcher(ctx context.Context, rev types.NamespacedName, protocol
155152
usePassthroughLb: usePassthroughLb,
156153
meshMode: meshMode,
157154
enableProbeOptimisation: enableProbeOptimisation,
155+
probeTimeout: probeTimeout,
158156
logger: logger.With(zap.String(logkey.Key, rev.String())),
159157
}
160158
}
@@ -219,7 +217,7 @@ func (rw *revisionWatcher) getDest() (string, error) {
219217
}
220218

221219
func (rw *revisionWatcher) probeClusterIP(dest string) (bool, error) {
222-
ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
220+
ctx, cancel := context.WithTimeout(context.Background(), rw.probeTimeout)
223221
defer cancel()
224222
match, _, err := rw.probe(ctx, dest)
225223
return match, err
@@ -248,7 +246,7 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.Set[string]) (succee
248246
}
249247

250248
// Context used for our probe requests.
251-
ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
249+
ctx, cancel := context.WithTimeout(context.Background(), rw.probeTimeout)
252250
defer cancel()
253251

254252
// Empty errgroup is used as cancellation on first error is not desired, all probes should be
@@ -459,19 +457,12 @@ type revisionBackendsManager struct {
459457
usePassthroughLb bool
460458
meshMode netcfg.MeshCompatibilityMode
461459
logger *zap.SugaredLogger
460+
probeTimeout time.Duration
462461
probeFrequency time.Duration
463462
}
464463

465-
// NewRevisionBackendsManager returns a new RevisionBackendsManager with default
466-
// probe time out.
467-
func newRevisionBackendsManager(ctx context.Context, tr http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode) *revisionBackendsManager {
468-
return newRevisionBackendsManagerWithProbeFrequency(ctx, tr, usePassthroughLb, meshMode, defaultProbeFrequency)
469-
}
470-
471-
// newRevisionBackendsManagerWithProbeFrequency creates a fully spec'd RevisionBackendsManager.
472-
func newRevisionBackendsManagerWithProbeFrequency(ctx context.Context, tr http.RoundTripper,
473-
usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeFreq time.Duration,
474-
) *revisionBackendsManager {
464+
// newRevisionBackendsManager returns a new RevisionBackendsManager with configurable probe settings.
465+
func newRevisionBackendsManager(ctx context.Context, tr http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeTimeout, probeFreq time.Duration) *revisionBackendsManager {
475466
rbm := &revisionBackendsManager{
476467
ctx: ctx,
477468
revisionLister: revisioninformer.Get(ctx).Lister(),
@@ -482,6 +473,7 @@ func newRevisionBackendsManagerWithProbeFrequency(ctx context.Context, tr http.R
482473
usePassthroughLb: usePassthroughLb,
483474
meshMode: meshMode,
484475
logger: logging.FromContext(ctx),
476+
probeTimeout: probeTimeout,
485477
probeFrequency: probeFreq,
486478
}
487479
endpointsInformer := endpointsinformer.Get(ctx)
@@ -565,7 +557,7 @@ func (rbm *revisionBackendsManager) getOrCreateRevisionWatcher(revID types.Names
565557
}
566558

567559
destsCh := make(chan dests)
568-
rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.logger)
560+
rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.probeTimeout, rbm.logger)
569561
rbm.revisionWatchers[revID] = rw
570562
go rw.run(rbm.probeFrequency)
571563
return rw, nil

pkg/activator/net/revision_backends_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ const (
5656
testNamespace = "test-namespace"
5757
testRevision = "test-revision"
5858

59-
probeFreq = 50 * time.Millisecond
60-
updateTimeout = 16 * probeFreq
59+
probeFreq = 50 * time.Millisecond
60+
updateTimeout = 16 * probeFreq
61+
defaultProbeTimeout = 300 * time.Millisecond
6162

6263
meshErrorStatusCode = http.StatusServiceUnavailable
6364
)
@@ -555,6 +556,7 @@ func TestRevisionWatcher(t *testing.T) {
555556
tc.usePassthroughLb, // usePassthroughLb
556557
tc.meshMode,
557558
true,
559+
defaultProbeTimeout,
558560
logger)
559561
rw.clusterIPHealthy = tc.initialClusterIPState
560562

@@ -993,7 +995,7 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
993995
t.Fatal("Failed to start informers:", err)
994996
}
995997

996-
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, rt, false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
998+
rbm := newRevisionBackendsManager(ctx, rt, false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq)
997999
defer func() {
9981000
cancel()
9991001
waitInformers()
@@ -1456,7 +1458,7 @@ func TestRevisionDeleted(t *testing.T) {
14561458
ri.Informer().GetIndexer().Add(rev)
14571459

14581460
fakeRT := activatortest.FakeRoundTripper{}
1459-
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
1461+
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq)
14601462
defer func() {
14611463
cancel()
14621464
waitInformers()
@@ -1512,7 +1514,7 @@ func TestServiceDoesNotExist(t *testing.T) {
15121514
}},
15131515
},
15141516
}
1515-
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
1517+
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq)
15161518
defer func() {
15171519
cancel()
15181520
waitInformers()
@@ -1576,7 +1578,7 @@ func TestServiceMoreThanOne(t *testing.T) {
15761578
}},
15771579
},
15781580
}
1579-
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
1581+
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq)
15801582
defer func() {
15811583
cancel()
15821584
waitInformers()

pkg/activator/net/throttler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sort"
2323
"sync"
2424
"sync/atomic"
25+
"time"
2526

2627
"go.uber.org/zap"
2728
"k8s.io/apimachinery/pkg/util/sets"
@@ -497,8 +498,8 @@ func NewThrottler(ctx context.Context, ipAddr string) *Throttler {
497498
}
498499

499500
// Run starts the throttler and blocks until the context is done.
500-
func (t *Throttler) Run(ctx context.Context, probeTransport http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode) {
501-
rbm := newRevisionBackendsManager(ctx, probeTransport, usePassthroughLb, meshMode)
501+
func (t *Throttler) Run(ctx context.Context, probeTransport http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeTimeout, probeFrequency time.Duration) {
502+
rbm := newRevisionBackendsManager(ctx, probeTransport, usePassthroughLb, meshMode, probeTimeout, probeFrequency)
502503
// Update channel is closed when ctx is done.
503504
t.run(rbm.updates())
504505
}

0 commit comments

Comments
 (0)