Skip to content

Commit 5cf8b15

Browse files
authored
Merge pull request #3967 from ntnn/inter-shard-0.30
Inter-shard
2 parents 1703893 + a3349ab commit 5cf8b15

File tree

14 files changed

+477
-42
lines changed

14 files changed

+477
-42
lines changed

cmd/sharded-test-server/cache.go

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,19 @@ import (
3030
"github.com/abiosoft/lineprefix"
3131
"github.com/fatih/color"
3232

33+
"k8s.io/apiserver/pkg/authentication/user"
3334
"k8s.io/client-go/tools/clientcmd"
3435
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
3536
"k8s.io/klog/v2"
3637

3738
kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster"
3839
kcptestingserver "github.com/kcp-dev/sdk/testing/server"
40+
"github.com/kcp-dev/sdk/testing/third_party/library-go/crypto"
3941

4042
"github.com/kcp-dev/kcp/cmd/test-server/helpers"
4143
)
4244

43-
func startCacheServer(ctx context.Context, logDirPath, workingDir, hostIP string, syntheticDelay time.Duration) (<-chan error, string, error) {
45+
func startCacheServer(ctx context.Context, logDirPath, workingDir, hostIP string, syntheticDelay time.Duration, clientCA *crypto.CA, clientCAPath string) (<-chan error, string, error) {
4446
cyan := color.New(color.BgHiCyan, color.FgHiWhite).SprintFunc()
4547
inverse := color.New(color.BgHiWhite, color.FgHiCyan).SprintFunc()
4648
out := lineprefix.New(
@@ -52,6 +54,32 @@ func startCacheServer(ctx context.Context, logDirPath, workingDir, hostIP string
5254
lineprefix.Color(color.New(color.FgHiWhite)),
5355
)
5456
cacheWorkingDir := filepath.Join(workingDir, ".kcp-cache")
57+
58+
// Generate a client certificate for accessing the cache server.
59+
cacheClientCert := filepath.Join(cacheWorkingDir, "cache-client.crt")
60+
cacheClientKey := filepath.Join(cacheWorkingDir, "cache-client.key")
61+
if err := os.MkdirAll(cacheWorkingDir, 0755); err != nil {
62+
return nil, "", err
63+
}
64+
_, err := clientCA.MakeClientCertificate(cacheClientCert, cacheClientKey,
65+
&user.DefaultInfo{Name: "cache-client", Groups: []string{"system:masters"}}, 365)
66+
if err != nil {
67+
return nil, "", fmt.Errorf("failed to create cache client cert: %w", err)
68+
}
69+
70+
// Use absolute paths for the client cert/key so they resolve correctly
71+
// regardless of how the kubeconfig is loaded. ClientConfigLoadingRules
72+
// resolves relative paths relative to the kubeconfig file, while
73+
// LoadFromFile + NewNonInteractiveClientConfig uses them as-is from CWD.
74+
absCacheClientCert, err := filepath.Abs(cacheClientCert)
75+
if err != nil {
76+
return nil, "", fmt.Errorf("failed to resolve absolute path for cache client cert: %w", err)
77+
}
78+
absCacheClientKey, err := filepath.Abs(cacheClientKey)
79+
if err != nil {
80+
return nil, "", fmt.Errorf("failed to resolve absolute path for cache client key: %w", err)
81+
}
82+
5583
cachePort := 8012
5684
workdir, commandLine := kcptestingserver.Command("cache-server", "cache")
5785
commandLine = append(
@@ -62,6 +90,7 @@ func startCacheServer(ctx context.Context, logDirPath, workingDir, hostIP string
6290
"--embedded-etcd-peer-port=8011",
6391
fmt.Sprintf("--secure-port=%d", cachePort),
6492
fmt.Sprintf("--synthetic-delay=%s", syntheticDelay.String()),
93+
fmt.Sprintf("--client-ca-file=%s", clientCAPath),
6594
)
6695
fmt.Fprintf(out, "running: %v\n", strings.Join(commandLine, " "))
6796
cmd := exec.CommandContext(ctx, commandLine[0], commandLine[1:]...) //nolint:gosec
@@ -119,35 +148,40 @@ func startCacheServer(ctx context.Context, logDirPath, workingDir, hostIP string
119148
continue
120149
}
121150

122-
if _, err := os.Stat(cacheKubeconfigPath); os.IsNotExist(err) {
123-
cacheServerCert, err := os.ReadFile(filepath.Join(cacheWorkingDir, "apiserver.crt"))
124-
if err != nil {
125-
return nil, "", err
126-
}
127-
cacheServerKubeConfig := clientcmdapi.Config{
128-
Clusters: map[string]*clientcmdapi.Cluster{
129-
"cache": {
130-
Server: fmt.Sprintf("https://localhost:%d", cachePort),
131-
CertificateAuthorityData: cacheServerCert,
132-
},
151+
cacheServerCert, err := os.ReadFile(filepath.Join(cacheWorkingDir, "apiserver.crt"))
152+
if err != nil {
153+
return nil, "", err
154+
}
155+
cacheServerKubeConfig := clientcmdapi.Config{
156+
Clusters: map[string]*clientcmdapi.Cluster{
157+
"cache": {
158+
Server: fmt.Sprintf("https://localhost:%d", cachePort),
159+
CertificateAuthorityData: cacheServerCert,
133160
},
134-
Contexts: map[string]*clientcmdapi.Context{
135-
"cache": {
136-
Cluster: "cache",
137-
},
161+
},
162+
AuthInfos: map[string]*clientcmdapi.AuthInfo{
163+
"cache": {
164+
ClientCertificate: absCacheClientCert,
165+
ClientKey: absCacheClientKey,
138166
},
139-
CurrentContext: "cache",
140-
}
141-
if err := clientcmd.WriteToFile(cacheServerKubeConfig, cacheKubeconfigPath); err != nil {
142-
return nil, "", err
143-
}
167+
},
168+
Contexts: map[string]*clientcmdapi.Context{
169+
"cache": {
170+
Cluster: "cache",
171+
AuthInfo: "cache",
172+
},
173+
},
174+
CurrentContext: "cache",
175+
}
176+
if err := clientcmd.WriteToFile(cacheServerKubeConfig, cacheKubeconfigPath); err != nil {
177+
return nil, "", err
144178
}
145179

146-
cacheServerKubeConfig, err := clientcmd.LoadFromFile(cacheKubeconfigPath)
180+
loadedKubeConfig, err := clientcmd.LoadFromFile(cacheKubeconfigPath)
147181
if err != nil {
148182
return nil, "", err
149183
}
150-
cacheClientConfig := clientcmd.NewNonInteractiveClientConfig(*cacheServerKubeConfig, "cache", nil, nil)
184+
cacheClientConfig := clientcmd.NewNonInteractiveClientConfig(*loadedKubeConfig, "cache", nil, nil)
151185
cacheClientRestConfig, err := cacheClientConfig.ClientConfig()
152186
if err != nil {
153187
return nil, "", err

cmd/sharded-test-server/main.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,10 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb
197197
standaloneVW := sets.New[string](shardFlags...).Has("--run-virtual-workspaces=false")
198198

199199
cacheServerErrCh := make(chan indexErrTuple)
200-
cacheServerConfigPath := ""
201-
cacheServerCh, configPath, err := startCacheServer(ctx, logDirPath, workDirPath, hostIP.String(), cacheSyntheticDelay)
200+
cacheServerCh, cacheServerConfigPath, err := startCacheServer(ctx, logDirPath, workDirPath, hostIP.String(), cacheSyntheticDelay, clientCA, filepath.Join(workDirPath, ".kcp", "client-ca.crt"))
202201
if err != nil {
203202
return fmt.Errorf("error starting the cache server: %w", err)
204203
}
205-
cacheServerConfigPath = configPath
206204
go func() {
207205
err := <-cacheServerCh
208206
cacheServerErrCh <- indexErrTuple{0, err}

docs/content/concepts/sharding/cache-server.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,15 @@ ctx = cacheclient.WithShardInContext(ctx, shard.New("cache"))
6969

7070
### Authorization/Authentication
7171

72-
Not implemented at the moment
72+
The cache server can only be connected to using x509 client certificates
73+
with the `system:masters` group. It does not support authz webhook,
74+
service accounts or other means of authentication or authorization.
75+
76+
This is because the cache server is only meant to be used by the shards
77+
directly, not by operators or humans.
78+
79+
The only exception are the health check endpoints, which are always
80+
accessible without authentication.
7381

7482
### Built-in Resources
7583

@@ -78,6 +86,7 @@ Out of the box, the server supports the following resources:
7886
- `apiresourceschemas`
7987
- `apiexports`
8088
- `shards`
89+
- `logicalclusters`
8190

8291
All those resources are represented as CustomResourceDefinitions and
8392
stored in `system:cache:server` shard under `system:system-crds` cluster.

pkg/cache/server/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,10 @@ func NewConfig(opts *cacheserveroptions.CompletedOptions, optionalLocalShardRest
126126
}
127127
serverConfig.LoopbackClientConfig = rest.CopyConfig(optionalLocalShardRestConfig)
128128
}
129-
if err := opts.Authentication.ApplyTo(&serverConfig.Config.Authentication, serverConfig.SecureServing, serverConfig.OpenAPIConfig); err != nil {
129+
if err := opts.Authentication.ApplyTo(&serverConfig.Config.Authentication, serverConfig.Config.SecureServing); err != nil {
130130
return nil, err
131131
}
132-
if err := opts.Authorization.ApplyTo(&serverConfig.Config.Authorization); err != nil {
132+
if err := opts.Authorization.ApplyTo(&serverConfig.Config); err != nil {
133133
return nil, err
134134
}
135135

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright 2026 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package options
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/spf13/pflag"
23+
24+
"k8s.io/apiserver/pkg/authentication/authenticator"
25+
"k8s.io/apiserver/pkg/authentication/request/anonymous"
26+
"k8s.io/apiserver/pkg/authentication/request/union"
27+
"k8s.io/apiserver/pkg/authentication/request/x509"
28+
genericapiserver "k8s.io/apiserver/pkg/server"
29+
"k8s.io/apiserver/pkg/server/dynamiccertificates"
30+
)
31+
32+
type Authentication struct {
33+
ClientCAFile string
34+
35+
// EmbeddedAuthenticator is an optional authenticator delegating to the
36+
// parent server's authentication chain. Set by the shard server when
37+
// the cache server runs embedded.
38+
EmbeddedAuthenticator authenticator.Request
39+
}
40+
41+
func NewAuthentication() *Authentication {
42+
return &Authentication{}
43+
}
44+
45+
func (o *Authentication) AddFlags(fs *pflag.FlagSet) {
46+
fs.StringVar(&o.ClientCAFile, "client-ca-file", o.ClientCAFile, "Path to a PEM-encoded certificate bundle. If set, any request presenting a client certificate signed by one of the authorities in the bundle is authenticated with an identity corresponding to the CommonName of the client certificate.")
47+
}
48+
49+
func (o *Authentication) Validate() []error {
50+
return nil
51+
}
52+
53+
func (o *Authentication) ApplyTo(authenticationInfo *genericapiserver.AuthenticationInfo, servingInfo *genericapiserver.SecureServingInfo) error {
54+
if o.ClientCAFile == "" && o.EmbeddedAuthenticator == nil {
55+
// This validation cannot happen in .Validate because these
56+
// options may be set by the shard embedding the cache server.
57+
// For the standalone cache server it doesn't matter if it's
58+
// validated here or in .Validate.
59+
return fmt.Errorf("either --client-ca-file or an embedded authenticator must be configured")
60+
}
61+
62+
var authenticators []authenticator.Request
63+
64+
if o.ClientCAFile != "" {
65+
caProvider, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", o.ClientCAFile)
66+
if err != nil {
67+
return fmt.Errorf("unable to load client CA file %q: %w", o.ClientCAFile, err)
68+
}
69+
70+
if err := authenticationInfo.ApplyClientCert(caProvider, servingInfo); err != nil {
71+
return fmt.Errorf("unable to apply client cert: %w", err)
72+
}
73+
74+
authenticators = append(authenticators, x509.NewDynamic(caProvider.VerifyOptions, x509.CommonNameUserConversion))
75+
}
76+
if o.EmbeddedAuthenticator != nil {
77+
authenticators = append(authenticators, o.EmbeddedAuthenticator)
78+
}
79+
authenticators = append(authenticators, anonymous.NewAuthenticator(nil))
80+
81+
authenticationInfo.Authenticator = union.New(authenticators...)
82+
83+
return nil
84+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
Copyright 2026 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package options
18+
19+
import (
20+
"github.com/spf13/pflag"
21+
22+
"k8s.io/apiserver/pkg/authentication/user"
23+
"k8s.io/apiserver/pkg/authorization/authorizer"
24+
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
25+
"k8s.io/apiserver/pkg/authorization/path"
26+
"k8s.io/apiserver/pkg/authorization/union"
27+
genericapiserver "k8s.io/apiserver/pkg/server"
28+
)
29+
30+
type Authorization struct {
31+
AlwaysAllowPaths []string
32+
AlwaysAllowGroups []string
33+
}
34+
35+
func NewAuthorization() *Authorization {
36+
return &Authorization{
37+
AlwaysAllowPaths: []string{"/healthz", "/readyz", "/livez"},
38+
AlwaysAllowGroups: []string{user.SystemPrivilegedGroup},
39+
}
40+
}
41+
42+
func (o *Authorization) AddFlags(fs *pflag.FlagSet) {
43+
fs.StringSliceVar(&o.AlwaysAllowPaths, "authorization-always-allow-paths", o.AlwaysAllowPaths,
44+
"A list of HTTP paths to skip during authorization, i.e. these are authorized without contacting the 'core' kubernetes server.")
45+
}
46+
47+
func (o *Authorization) Validate() []error {
48+
return nil
49+
}
50+
51+
func (o *Authorization) ApplyTo(config *genericapiserver.Config) error {
52+
var authorizers []authorizer.Authorizer
53+
54+
if len(o.AlwaysAllowGroups) > 0 {
55+
authorizers = append(authorizers, authorizerfactory.NewPrivilegedGroups(o.AlwaysAllowGroups...))
56+
}
57+
58+
if len(o.AlwaysAllowPaths) > 0 {
59+
a, err := path.NewAuthorizer(o.AlwaysAllowPaths)
60+
if err != nil {
61+
return err
62+
}
63+
authorizers = append(authorizers, a)
64+
}
65+
66+
config.Authorization.Authorizer = union.New(authorizers...)
67+
return nil
68+
}

pkg/cache/server/options/options.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ type Options struct {
3232
ServerRunOptions *genericoptions.ServerRunOptions
3333
Etcd *genericoptions.EtcdOptions
3434
SecureServing *genericoptions.SecureServingOptionsWithLoopback
35-
Authentication *genericoptions.DelegatingAuthenticationOptions
36-
Authorization *genericoptions.DelegatingAuthorizationOptions
35+
Authentication *Authentication
36+
Authorization *Authorization
3737
APIEnablement *genericoptions.APIEnablementOptions
3838
EmbeddedEtcd etcdoptions.Options
3939
SyntheticDelay time.Duration
@@ -43,8 +43,8 @@ type completedOptions struct {
4343
ServerRunOptions *genericoptions.ServerRunOptions
4444
Etcd *genericoptions.EtcdOptions
4545
SecureServing *genericoptions.SecureServingOptionsWithLoopback
46-
Authentication *genericoptions.DelegatingAuthenticationOptions
47-
Authorization *genericoptions.DelegatingAuthorizationOptions
46+
Authentication *Authentication
47+
Authorization *Authorization
4848
APIEnablement *genericoptions.APIEnablementOptions
4949
EmbeddedEtcd etcdoptions.CompletedOptions
5050
SyntheticDelay time.Duration
@@ -72,8 +72,8 @@ func NewOptions(rootDir string) *Options {
7272
ServerRunOptions: genericoptions.NewServerRunOptions(),
7373
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil)),
7474
SecureServing: genericoptions.NewSecureServingOptions().WithLoopback(),
75-
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
76-
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
75+
Authentication: NewAuthentication(),
76+
Authorization: NewAuthorization(),
7777
APIEnablement: genericoptions.NewAPIEnablementOptions(),
7878
EmbeddedEtcd: *etcdoptions.NewOptions(rootDir),
7979
}
@@ -94,10 +94,6 @@ func (o *Options) Complete() (*CompletedOptions, error) {
9494
o.EmbeddedEtcd.Enabled = true
9595
}
9696

97-
// TODO: enable authN/Z stack
98-
o.Authentication = nil
99-
o.Authorization = nil
100-
10197
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, nil); err != nil {
10298
return nil, err
10399
}
@@ -116,5 +112,7 @@ func (o *Options) Complete() (*CompletedOptions, error) {
116112
func (o *Options) AddFlags(fs *pflag.FlagSet) {
117113
o.EmbeddedEtcd.AddFlags(fs)
118114
o.SecureServing.AddFlags(fs)
115+
o.Authentication.AddFlags(fs)
116+
o.Authorization.AddFlags(fs)
119117
fs.DurationVar(&o.SyntheticDelay, "synthetic-delay", 0, "The duration of time the cache server will inject a delay for to all inbound requests. Useful for testing.")
120118
}

pkg/server/cache.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func (s *Server) installCacheServer(ctx context.Context) error {
3434
// this could be reworked if we were providing Config/CompletedConfig for the additional servers
3535
wasEmbeddedEtcdEnabled := s.Options.EmbeddedEtcd.Enabled
3636
s.Options.Cache.Server.EmbeddedEtcd.Enabled = false
37+
s.Options.Cache.Server.Authentication.EmbeddedAuthenticator = s.GenericConfig.Authentication.Authenticator
3738
newCacheServerConfig, err := cacheserver.NewConfig(s.Options.Cache.Server, rest.CopyConfig(s.GenericConfig.LoopbackClientConfig))
3839
if err != nil {
3940
return err

0 commit comments

Comments
 (0)