Skip to content

Commit 80b6b6e

Browse files
committed
Move delegating auth middleware into common package and add MaxInFlight
Adds maximum in-flight request limits to agent join and p2p peer info request request handlers. Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
1 parent 1924317 commit 80b6b6e

6 files changed

Lines changed: 106 additions & 93 deletions

File tree

pkg/agent/https/https.go

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,13 @@ package https
22

33
import (
44
"context"
5-
"net/http"
65
"strconv"
76
"sync"
87

98
"github.com/gorilla/mux"
109
"github.com/k3s-io/k3s/pkg/daemons/config"
11-
"github.com/k3s-io/api/pkg/generated/clientset/versioned/scheme"
10+
"github.com/k3s-io/k3s/pkg/server/auth"
1211
"github.com/k3s-io/k3s/pkg/util"
13-
"github.com/k3s-io/k3s/pkg/version"
14-
"k8s.io/apiserver/pkg/apis/apiserver"
15-
"k8s.io/apiserver/pkg/authentication/authenticator"
16-
"k8s.io/apiserver/pkg/authorization/authorizer"
17-
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
18-
apirequest "k8s.io/apiserver/pkg/endpoints/request"
1912
"k8s.io/apiserver/pkg/server"
2013
"k8s.io/apiserver/pkg/server/options"
2114
)
@@ -33,7 +26,7 @@ var err error
3326
func Start(ctx context.Context, nodeConfig *config.Node, runtime *config.ControlRuntime) (*mux.Router, error) {
3427
once.Do(func() {
3528
router = mux.NewRouter().SkipClean(true)
36-
config := server.Config{}
29+
config := &server.Config{}
3730

3831
if runtime == nil {
3932
// If we do not have an existing handler, set up a new listener
@@ -63,33 +56,7 @@ func Start(ctx context.Context, nodeConfig *config.Node, runtime *config.Control
6356
runtime.Handler = router
6457
}
6558

66-
authn := options.NewDelegatingAuthenticationOptions()
67-
authn.Anonymous = &apiserver.AnonymousAuthConfig{
68-
Enabled: false,
69-
}
70-
authn.SkipInClusterLookup = true
71-
authn.ClientCert = options.ClientCertAuthenticationOptions{
72-
ClientCA: nodeConfig.AgentConfig.ClientCA,
73-
}
74-
authn.RemoteKubeConfigFile = nodeConfig.AgentConfig.KubeConfigKubelet
75-
if applyErr := authn.ApplyTo(&config.Authentication, config.SecureServing, nil); applyErr != nil {
76-
err = applyErr
77-
return
78-
}
79-
80-
authz := options.NewDelegatingAuthorizationOptions()
81-
authz.AlwaysAllowPaths = []string{ // skip authz for paths that should not use SubjectAccessReview; basically everything that will use this router other than metrics
82-
"/v1-" + version.Program + "/p2p", // spegel libp2p peer discovery
83-
"/v2/*", // spegel registry mirror
84-
"/debug/pprof/*", // profiling
85-
}
86-
authz.RemoteKubeConfigFile = nodeConfig.AgentConfig.KubeConfigKubelet
87-
if applyErr := authz.ApplyTo(&config.Authorization); applyErr != nil {
88-
err = applyErr
89-
return
90-
}
91-
92-
router.Use(filterChain(config.Authentication.Authenticator, config.Authorization.Authorizer))
59+
router.Use(auth.Delegated(nodeConfig.AgentConfig.ClientCA, nodeConfig.AgentConfig.KubeConfigKubelet, config))
9360

9461
if config.SecureServing != nil {
9562
_, _, err = config.SecureServing.Serve(router, 0, ctx.Done())
@@ -98,16 +65,3 @@ func Start(ctx context.Context, nodeConfig *config.Node, runtime *config.Control
9865

9966
return router, err
10067
}
101-
102-
// filterChain runs the kubernetes authn/authz filter chain using the mux middleware API
103-
func filterChain(authn authenticator.Request, authz authorizer.Authorizer) mux.MiddlewareFunc {
104-
return func(handler http.Handler) http.Handler {
105-
requestInfoResolver := &apirequest.RequestInfoFactory{}
106-
failedHandler := genericapifilters.Unauthorized(scheme.Codecs)
107-
handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)
108-
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil, nil)
109-
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
110-
handler = genericapifilters.WithCacheControl(handler)
111-
return handler
112-
}
113-
}

pkg/cluster/https.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"errors"
7+
"fmt"
78
"io"
89
"log"
910
"net"
@@ -90,27 +91,37 @@ func (c *Cluster) filterCN(cn ...string) []string {
9091
// initClusterAndHTTPS sets up the dynamic tls listener, request router,
9192
// and cluster database. Once the database is up, it starts the supervisor http server.
9293
func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
93-
// Set up dynamiclistener TLS listener and request handler
94-
listener, handler, err := c.newListener(ctx)
94+
// Set up dynamiclistener TLS listener and request handler.
95+
// The dynamiclistener request handler is always called first as a middleware to add TLS SANs for host headers.
96+
// It does not actually do any request handling or send a response.
97+
listener, certHandler, err := c.newListener(ctx)
9598
if err != nil {
9699
return err
97100
}
98101

99-
// Get the base request handler
100-
handler, err = c.getHandler(handler)
101-
if err != nil {
102-
return err
103-
}
102+
// Create a stub request handler that returns a Service Unavailable response
103+
// if the core request handlers have not yet been started yet.
104+
var handler http.Handler = http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
105+
if c.config.Runtime.Handler != nil {
106+
c.config.Runtime.Handler.ServeHTTP(rw, req)
107+
} else {
108+
util.SendError(fmt.Errorf("starting"), rw, req, http.StatusServiceUnavailable)
109+
}
110+
})
104111

105-
// Register database request handlers and controller callbacks
112+
// Register database request handlers and controller callbacks.
113+
// The database handler wraps the stub handler, and calls it for any requests not related to database bootstrapping.
106114
handler, err = c.registerDBHandlers(handler)
107115
if err != nil {
108116
return err
109117
}
110118

111119
// Create a HTTP server with the registered request handlers, using logrus for logging
112120
server := http.Server{
113-
Handler: handler,
121+
Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
122+
certHandler.ServeHTTP(rw, req)
123+
handler.ServeHTTP(rw, req)
124+
}),
114125
}
115126

116127
if logrus.IsLevelEnabled(logrus.DebugLevel) {

pkg/cluster/router.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

pkg/server/auth/auth.go

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,20 @@ import (
88
"github.com/gorilla/mux"
99
"github.com/k3s-io/k3s/pkg/daemons/config"
1010
"github.com/k3s-io/k3s/pkg/util"
11+
"github.com/k3s-io/k3s/pkg/version"
1112
"github.com/sirupsen/logrus"
12-
"k8s.io/apiserver/pkg/endpoints/request"
13+
"k8s.io/apiserver/pkg/apis/apiserver"
14+
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
15+
apirequest "k8s.io/apiserver/pkg/endpoints/request"
16+
"k8s.io/apiserver/pkg/server"
17+
genericfilters "k8s.io/apiserver/pkg/server/filters"
18+
"k8s.io/apiserver/pkg/server/options"
19+
"k8s.io/client-go/kubernetes/scheme"
20+
)
21+
22+
var (
23+
requestInfoResolver = &apirequest.RequestInfoFactory{}
24+
failedHandler = genericapifilters.Unauthorized(scheme.Codecs)
1325
)
1426

1527
func hasRole(mustRoles []string, roles []string) bool {
@@ -48,7 +60,7 @@ func doAuth(roles []string, serverConfig *config.Control, next http.Handler, rw
4860
return
4961
}
5062

51-
ctx := request.WithUser(req.Context(), resp.User)
63+
ctx := apirequest.WithUser(req.Context(), resp.User)
5264
req = req.WithContext(ctx)
5365
next.ServeHTTP(rw, req)
5466
}
@@ -77,3 +89,54 @@ func IsLocalOrHasRole(serverConfig *config.Control, roles ...string) mux.Middlew
7789
})
7890
}
7991
}
92+
93+
// Delegated returns a middleware function that uses core Kubernetes
94+
// authentication/authorization via client certificate auth and the SubjectAccessReview API
95+
func Delegated(clientCA, kubeConfig string, config *server.Config) mux.MiddlewareFunc {
96+
if config == nil {
97+
config = &server.Config{}
98+
}
99+
100+
authn := options.NewDelegatingAuthenticationOptions()
101+
authn.Anonymous = &apiserver.AnonymousAuthConfig{
102+
Enabled: false,
103+
}
104+
authn.SkipInClusterLookup = true
105+
authn.ClientCert = options.ClientCertAuthenticationOptions{
106+
ClientCA: clientCA,
107+
}
108+
authn.RemoteKubeConfigFile = kubeConfig
109+
if err := authn.ApplyTo(&config.Authentication, config.SecureServing, nil); err != nil {
110+
logrus.Fatalf("Failed to apply authentication configuration: %v", err)
111+
}
112+
113+
authz := options.NewDelegatingAuthorizationOptions()
114+
authz.AlwaysAllowPaths = []string{ // skip authz for paths that should not use SubjectAccessReview; basically everything that will use this router other than metrics
115+
"/v1-" + version.Program + "/p2p", // spegel libp2p peer discovery
116+
"/v2/*", // spegel registry mirror
117+
"/debug/pprof/*", // profiling
118+
}
119+
authz.RemoteKubeConfigFile = kubeConfig
120+
if err := authz.ApplyTo(&config.Authorization); err != nil {
121+
logrus.Fatalf("Failed to apply authorization configuration: %v", err)
122+
}
123+
124+
return func(handler http.Handler) http.Handler {
125+
handler = genericapifilters.WithAuthorization(handler, config.Authorization.Authorizer, scheme.Codecs)
126+
handler = genericapifilters.WithAuthentication(handler, config.Authentication.Authenticator, failedHandler, nil, nil)
127+
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
128+
handler = genericapifilters.WithCacheControl(handler)
129+
return handler
130+
}
131+
}
132+
133+
// MaxInFlight returns a middleware function that limits the number of requests that are executed concurrently.
134+
// This is not strictly auth related, but it also uses the core Kubernetes request filters.
135+
func MaxInFlight(nonMutatingLimit, mutatingLimit int) mux.MiddlewareFunc {
136+
return func(handler http.Handler) http.Handler {
137+
handler = genericfilters.WithMaxInFlightLimit(handler, nonMutatingLimit, mutatingLimit, nil)
138+
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
139+
handler = genericapifilters.WithCacheControl(handler)
140+
return handler
141+
}
142+
}

pkg/server/handlers/router.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,21 @@ const (
1919
staticURL = "/static/"
2020
)
2121

22+
var (
23+
// When starting, each agent sequentially makes requests for certs, config, and apiservers, and will poll the readyz endpoint
24+
// before starting kube-proxy. These limits effectively cap the number of agents that can join simultaneously.
25+
// Agents will automatically retry with jitter when rate-limited.
26+
maxNonMutatingAgentRequests = 20 // max concurrent get/list/watch requests
27+
maxMutatingAgentRequests = 10 // max concurrent other requests; cert generation with client-provided private key uses post.
28+
)
29+
2230
func NewHandler(ctx context.Context, control *config.Control, cfg *cmds.Server) http.Handler {
2331
nodeAuth := nodepassword.GetNodeAuthValidator(ctx, control)
2432

2533
prefix := "/v1-{program}"
2634
authed := mux.NewRouter().SkipClean(true)
2735
authed.NotFoundHandler = APIServer(control, cfg)
28-
authed.Use(auth.HasRole(control, version.Program+":agent", user.NodesGroup, bootstrapapi.BootstrapDefaultGroup))
36+
authed.Use(auth.HasRole(control, version.Program+":agent", user.NodesGroup, bootstrapapi.BootstrapDefaultGroup), auth.MaxInFlight(maxNonMutatingAgentRequests, maxMutatingAgentRequests))
2937
authed.Handle(prefix+"/serving-kubelet.crt", ServingKubeletCert(control, nodeAuth))
3038
authed.Handle(prefix+"/client-kubelet.crt", ClientKubeletCert(control, nodeAuth))
3139
authed.Handle(prefix+"/client-kube-proxy.crt", ClientKubeProxyCert(control))

pkg/spegel/spegel.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/k3s-io/k3s/pkg/agent/https"
1919
"github.com/k3s-io/k3s/pkg/clientaccess"
2020
"github.com/k3s-io/k3s/pkg/daemons/config"
21+
"github.com/k3s-io/k3s/pkg/server/auth"
2122
"github.com/k3s-io/k3s/pkg/version"
2223
"github.com/rancher/dynamiclistener/cert"
2324
"k8s.io/apimachinery/pkg/util/wait"
@@ -56,6 +57,11 @@ var (
5657
P2pEnableLatestEnv = version.ProgramUpper + "_P2P_ENABLE_LATEST"
5758

5859
resolveLatestTag = false
60+
61+
// Agents request a list of peers when joining, and then again periodically afterwards.
62+
// Limit the number of concurrent peer list requests that will be served simultaneously.
63+
maxNonMutatingPeerInfoRequests = 20 // max concurrent get/list/watch requests
64+
maxMutatingPeerInfoRequests = 0 // max concurrent other requests; not used
5965
)
6066

6167
// Config holds fields for a distributed registry
@@ -231,7 +237,9 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
231237
return err
232238
}
233239
mRouter.PathPrefix("/v2").Handler(regSvr.Handler)
234-
mRouter.PathPrefix("/v1-{program}/p2p").Handler(c.peerInfo())
240+
sRouter := mRouter.PathPrefix("/v1-{program}/p2p").Subrouter()
241+
sRouter.Use(auth.MaxInFlight(maxNonMutatingPeerInfoRequests, maxMutatingPeerInfoRequests))
242+
sRouter.Handle("", c.peerInfo())
235243

236244
// Wait up to 5 seconds for the p2p network to find peers. This will return
237245
// immediately if the node is bootstrapping from itself.

0 commit comments

Comments
 (0)