
Commit d4720db

Merge pull request #955 from ncdc/e2e-pcluster-kubeconfig
Towards more stable syncer
2 parents 3831c02 + f0a2269 commit d4720db

File tree (5 files changed: +111 additions, -48 deletions)

pkg/reconciler/workload/namespace/namespace_controller.go
pkg/reconciler/workload/namespace/namespace_reconcile.go
pkg/syncer/statussyncer.go
test/e2e/framework/fixture.go
test/e2e/framework/util.go

pkg/reconciler/workload/namespace/namespace_controller.go

Lines changed: 1 addition & 6 deletions
@@ -32,7 +32,6 @@ import (
     "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
-    "k8s.io/apiserver/pkg/endpoints/request"
     "k8s.io/client-go/discovery"
     "k8s.io/client-go/dynamic"
     coreinformers "k8s.io/client-go/informers/core/v1"
@@ -392,11 +391,7 @@ func (c *Controller) processCluster(ctx context.Context, key string) error {
         // controller. Rescheduling will always happen eventually due
         // to namespace informer resync.

-        clusterName, err := request.ClusterNameFrom(ctx)
-        if err != nil {
-            klog.Errorf("Unable to enqueue namespaces related to WorkloadCluster %s: %v", key, err)
-            return nil
-        }
+        clusterName, _ := clusters.SplitClusterAwareKey(key)

         return c.enqueueNamespaces(clusterName, labels.Everything())
     } else if err != nil {
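
This change drops the dependency on the request context: the logical cluster name is now recovered directly from the cluster-aware work-queue key via clusters.SplitClusterAwareKey. As a rough illustration of the idea only, the splitKey helper and the "|" separator below are hypothetical stand-ins, not kcp's actual key encoding:

package main

import (
	"fmt"
	"strings"
)

// splitKey is a hypothetical stand-in for clusters.SplitClusterAwareKey: it
// assumes the queue key carries the logical cluster name as a prefix followed
// by a "|" separator (the real kcp encoding may differ).
func splitKey(key string) (clusterName, name string) {
	parts := strings.SplitN(key, "|", 2)
	if len(parts) == 1 {
		return "", parts[0]
	}
	return parts[0], parts[1]
}

func main() {
	cluster, name := splitKey("root:org:ws|us-east1")
	fmt.Println(cluster, name) // root:org:ws us-east1
}

The benefit is that the cluster name always travels with the queue key, so processing a key never depends on context values that may not be populated.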

pkg/reconciler/workload/namespace/namespace_reconcile.go

Lines changed: 22 additions & 15 deletions
@@ -127,8 +127,13 @@ func (c *Controller) reconcileResource(ctx context.Context, lclusterName logical
     }

     old, new := lbls[ClusterLabel], ns.Labels[ClusterLabel]
+
+    klog.Infof("ANDY comparing %s %s|%s/%s clusterlabel %q to ns clusterlabel %q",
+        gvr.String(), lclusterName, unstr.GetNamespace(), unstr.GetName(), old, new)
+
     if old == new {
         // Already assigned to the right cluster.
+        klog.Infof("ANDY no-op %s %s|%s/%s", gvr.String(), lclusterName, unstr.GetNamespace(), unstr.GetName())
         return nil
     }

@@ -167,7 +172,7 @@ func (c *Controller) reconcileGVR(ctx context.Context, gvr schema.GroupVersionRe
 // will succeed without error if a cluster is assigned or if there are no viable clusters
 // to assign to. The condition of not being scheduled to a cluster will be reflected in
 // the namespace's status rather than by returning an error.
-func (c *Controller) ensureScheduled(ctx context.Context, ns *corev1.Namespace) error {
+func (c *Controller) ensureScheduled(ctx context.Context, ns *corev1.Namespace) (*corev1.Namespace, error) {
     oldPClusterName := ns.Labels[ClusterLabel]

     scheduler := namespaceScheduler{
@@ -176,47 +181,46 @@ func (c *Controller) ensureScheduled(ctx context.Context, ns *corev1.Namespace)
     }
     newPClusterName, err := scheduler.AssignCluster(ns)
     if err != nil {
-        return err
+        return ns, err
     }

     if oldPClusterName == newPClusterName {
-        return nil
+        return ns, nil
     }

     klog.Infof("Patching to update cluster assignment for namespace %s|%s: %s -> %s",
         ns.ClusterName, ns.Name, oldPClusterName, newPClusterName)
     patchType, patchBytes := clusterLabelPatchBytes(newPClusterName)
     patchedNamespace, err := c.kubeClient.Cluster(logicalcluster.From(ns)).CoreV1().Namespaces().
         Patch(ctx, ns.Name, patchType, patchBytes, metav1.PatchOptions{})
-    if err == nil {
-        // Update the label to enable the caller to detect a scheduling change.
-        ns.Labels[ClusterLabel] = patchedNamespace.Labels[ClusterLabel]
+    if err != nil {
+        return ns, err
     }

-    return err
+    return patchedNamespace, nil
 }

 // ensureScheduledStatus ensures the status of the given namespace reflects the
 // namespace's scheduled state.
-func (c *Controller) ensureScheduledStatus(ctx context.Context, ns *corev1.Namespace) error {
+func (c *Controller) ensureScheduledStatus(ctx context.Context, ns *corev1.Namespace) (*corev1.Namespace, error) {
     updatedNs := setScheduledCondition(ns)

     if equality.Semantic.DeepEqual(ns.Status, updatedNs.Status) {
-        return nil
+        return ns, nil
     }

     patchBytes, err := statusPatchBytes(ns, updatedNs)
     if err != nil {
-        return err
+        return ns, err
     }

-    _, err = c.kubeClient.Cluster(logicalcluster.From(ns)).CoreV1().Namespaces().
+    patchedNamespace, err := c.kubeClient.Cluster(logicalcluster.From(ns)).CoreV1().Namespaces().
         Patch(ctx, ns.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
     if err != nil {
-        return fmt.Errorf("failed to patch status on namespace %s|%s: %w", ns.ClusterName, ns.Name, err)
+        return ns, fmt.Errorf("failed to patch status on namespace %s|%s: %w", ns.ClusterName, ns.Name, err)
     }

-    return nil
+    return patchedNamespace, nil
 }

 // reconcileNamespace is responsible for assigning a namespace to a cluster, if
@@ -240,11 +244,13 @@ func (c *Controller) reconcileNamespace(ctx context.Context, lclusterName logica
         ns.Labels = map[string]string{}
     }

-    if err := c.ensureScheduled(ctx, ns); err != nil {
+    ns, err = c.ensureScheduled(ctx, ns)
+    if err != nil {
         return err
     }

-    if err := c.ensureScheduledStatus(ctx, ns); err != nil {
+    ns, err = c.ensureScheduledStatus(ctx, ns)
+    if err != nil {
         return err
     }

@@ -259,6 +265,7 @@ func (c *Controller) reconcileNamespace(ctx context.Context, lclusterName logica
 func (c *Controller) enqueueResourcesForNamespace(ns *corev1.Namespace) error {
     lastScheduling, previouslyEnqueued := c.namepaceContentsEnqueuedFor(ns)
     if previouslyEnqueued && lastScheduling == ns.Labels[ClusterLabel] {
+        klog.Infof("ANDY enqueueResourcesForNamespace short-circuiting %s|%s", logicalcluster.From(ns), ns.Name)
         return nil
     }

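
The recurring change in this file is that each ensure step now returns the namespace the API server handed back from Patch, and reconcileNamespace threads that object into the next step instead of mutating a local copy. A minimal sketch of the pattern, using a plain client-go clientset and a hypothetical ensureLabel helper rather than kcp's cluster-aware client:

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// ensureLabel is a hypothetical ensure step in the same shape as
// ensureScheduled above: it patches the namespace if needed and returns the
// server's copy so the caller keeps reconciling against up-to-date state.
func ensureLabel(ctx context.Context, client kubernetes.Interface, ns *corev1.Namespace, key, value string) (*corev1.Namespace, error) {
	if ns.Labels[key] == value {
		// Nothing to change; hand the caller back the object it gave us.
		return ns, nil
	}

	patch := []byte(fmt.Sprintf(`{"metadata":{"labels":{%q:%q}}}`, key, value))
	patched, err := client.CoreV1().Namespaces().Patch(ctx, ns.Name, types.MergePatchType, patch, metav1.PatchOptions{})
	if err != nil {
		// On failure, return the original object so the caller still has a
		// consistent value to work with (mirroring the diff above).
		return ns, err
	}
	return patched, nil
}

The caller then chains steps the way reconcileNamespace now does, reassigning ns from each call before invoking the next ensure function.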

pkg/syncer/statussyncer.go

Lines changed: 2 additions & 2 deletions
@@ -91,10 +91,10 @@ func (c *Controller) updateStatusInUpstream(ctx context.Context, gvr schema.Grou

     upstreamObj.SetResourceVersion(existing.GetResourceVersion())
     if _, err := c.toClient.Resource(gvr).Namespace(upstreamNamespace).UpdateStatus(ctx, upstreamObj, metav1.UpdateOptions{}); err != nil {
-        klog.Errorf("Failed updating status of resource %s|%s/%s from pcluster namespace %s: %v", c.upstreamClusterName, upstreamNamespace, upstreamObj.GetName(), downstreamObj.GetNamespace(), err)
+        klog.Errorf("Failed updating status of resource %q %s|%s/%s from pcluster namespace %s: %v", gvr.String(), c.upstreamClusterName, upstreamNamespace, upstreamObj.GetName(), downstreamObj.GetNamespace(), err)
         return err
     }
-    klog.Infof("Updated status of resource %s|%s/%s from pcluster namespace %s", c.upstreamClusterName, upstreamNamespace, upstreamObj.GetName(), downstreamObj.GetNamespace())
+    klog.Infof("Updated status of resource %q %s|%s/%s from pcluster namespace %s", gvr.String(), c.upstreamClusterName, upstreamNamespace, upstreamObj.GetName(), downstreamObj.GetNamespace())

     return nil
 }
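
The only change here is that both log lines now include the GVR, so status-sync failures can be attributed to a concrete resource type. For reference, a quoted GroupVersionResource renders roughly as in this small sketch; the exact String() format is an apimachinery detail and may vary by version:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	// Prints something like: resource "apps/v1, Resource=deployments"
	fmt.Printf("resource %q\n", gvr.String())
}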

test/e2e/framework/fixture.go

Lines changed: 80 additions & 14 deletions
@@ -32,18 +32,23 @@ import (
     "github.com/kcp-dev/apimachinery/pkg/logicalcluster"
     "github.com/stretchr/testify/require"

+    appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/dynamic"
     "k8s.io/client-go/kubernetes"
     kubernetesclientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/clientcmd"
     clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
     "k8s.io/klog/v2"

+    apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1"
     tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
     kcpclient "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
     kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
@@ -183,6 +188,10 @@ func LogToConsoleEnvSet() bool {
     return inProcess
 }

+func preserveTestResources() bool {
+    return os.Getenv("PRESERVE") != ""
+}
+
 func NewOrganizationFixture(t *testing.T, server RunningServer) (orgClusterName logicalcluster.LogicalCluster) {
     ctx, cancelFunc := context.WithCancel(context.Background())
     t.Cleanup(cancelFunc)
@@ -202,6 +211,10 @@ func NewOrganizationFixture(t *testing.T, server RunningServer) (orgClusterName
     require.NoError(t, err, "failed to create organization workspace")

     t.Cleanup(func() {
+        if preserveTestResources() {
+            return
+        }
+
         ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
         defer cancelFn()

@@ -257,6 +270,10 @@ func NewWorkspaceWithWorkloads(t *testing.T, server RunningServer, orgClusterNam
     require.NoError(t, err, "failed to create workspace")

     t.Cleanup(func() {
+        if preserveTestResources() {
+            return
+        }
+
         ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
         defer cancelFn()

@@ -385,35 +402,86 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
     downstreamKubeClient, err := kubernetesclientset.NewForConfig(downstreamConfig)
     require.NoError(t, err)

-    if useDeployedSyncer {
-        // Ensure cleanup of pcluster resources
+    artifactDir, err := CreateTempDirForTest(t, "artifacts")
+    if err != nil {
+        t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
+    }
+
+    // collect both in deployed and in-process mode
+    t.Cleanup(func() {
+        ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
+        defer cancelFn()
+
+        t.Logf("Collecting imported resource info")
+        upstreamCfg := sf.UpstreamServer.DefaultConfig(t)
+
+        gather := func(client dynamic.Interface, gvr schema.GroupVersionResource) {
+            resourceClient := client.Resource(gvr)
+
+            t.Logf("gathering %q", gvr)
+            list, err := resourceClient.List(ctx, metav1.ListOptions{})
+            if err != nil {
+                // Don't fail the test
+                t.Logf("Error gathering %s: %v", gvr, err)
+                return
+            }

+            t.Logf("got %d items", len(list.Items))
+
+            for i := range list.Items {
+                item := list.Items[i]
+                sf.UpstreamServer.Artifact(t, func() (runtime.Object, error) {
+                    return &item, nil
+                })
+            }
+        }
+
+        upstreamDynamic, err := dynamic.NewClusterForConfig(upstreamCfg)
+        require.NoError(t, err, "error creating upstream dynamic client")
+
+        downstreamDynamic, err := dynamic.NewForConfig(downstreamConfig)
+        require.NoError(t, err, "error creating downstream dynamic client")
+
+        gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("apiresourceimports"))
+        gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("negotiatedapiresources"))
+        gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), corev1.SchemeGroupVersion.WithResource("namespaces"))
+        gather(downstreamDynamic, corev1.SchemeGroupVersion.WithResource("namespaces"))
+        gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), appsv1.SchemeGroupVersion.WithResource("deployments"))
+        gather(downstreamDynamic, appsv1.SchemeGroupVersion.WithResource("deployments"))
+    })
+
+    if useDeployedSyncer {
         syncerID := syncerConfig.ID()
         t.Cleanup(func() {
-            if useDeployedSyncer {
-                t.Logf("Collecting syncer %s logs", syncerID)
-
-                ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
-                defer cancelFn()
+            ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
+            defer cancelFn()

+            // collect syncer logs
+            t.Logf("Collecting syncer pod logs")
+            func() {
+                t.Logf("Listing downstream pods in namespace %s", syncerID)
                 pods, err := downstreamKubeClient.CoreV1().Pods(syncerID).List(ctx, metav1.ListOptions{})
                 if err != nil {
-                    t.Errorf("failed to list pods in %s: %v", syncerID, err)
-                }
-                artifactDir, err := CreateTempDirForTest(t, "artifacts")
-                if err != nil {
-                    t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
+                    t.Logf("failed to list pods in %s: %v", syncerID, err)
+                    return
                 }
+
                 for _, pod := range pods.Items {
                     t.Logf("Collecting downstream logs for pod %s/%s", syncerID, pod.Name)
                     logs := Kubectl(t, downstreamKubeconfigPath, "-n", syncerID, "logs", pod.Name)
+
                     artifactPath := filepath.Join(artifactDir, fmt.Sprintf("syncer-%s-%s.log", syncerID, pod.Name))
+
                     err = ioutil.WriteFile(artifactPath, logs, 0644)
                     if err != nil {
                         t.Logf("failed to write logs for pod %s in %s to %s: %v", pod.Name, syncerID, artifactPath, err)
                         continue // not fatal
                     }
                 }
+            }()
+
+            if preserveTestResources() {
+                return
             }

             t.Logf("Deleting syncer resources for logical cluster %q, workload cluster %q", syncerConfig.KCPClusterName, syncerConfig.WorkloadClusterName)
@@ -455,10 +523,8 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
             }
         }
     })
-
     } else {
         // Start an in-process syncer
-
         err := syncer.StartSyncer(ctx, syncerConfig, 2, 5*time.Second)
         require.NoError(t, err, "syncer failed to start")
     }
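
Two test-debugging aids land here: every SyncerFixture, deployed or in-process, now dumps upstream and downstream resources as artifacts on cleanup, and a PRESERVE environment variable lets teardown be skipped so the resources can be inspected by hand. A compact sketch of that gating pattern as it might be reused elsewhere; registerCleanup and deleteResources are hypothetical names for illustration:

package example

import (
	"os"
	"testing"
)

// preserve mirrors the fixture's preserveTestResources helper: any non-empty
// PRESERVE value skips cleanup so test resources survive the run.
func preserve() bool {
	return os.Getenv("PRESERVE") != ""
}

// registerCleanup shows the gating pattern used by the fixtures above: the
// cleanup closure returns early when PRESERVE is set, and deleteResources is a
// hypothetical callback standing in for the real teardown.
func registerCleanup(t *testing.T, deleteResources func()) {
	t.Cleanup(func() {
		if preserve() {
			return
		}
		deleteResources()
	})
}

Running the suite with something like PRESERVE=1 go test ./test/e2e/... would then leave workspaces, namespaces, and the deployed syncer in place after the run.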

test/e2e/framework/util.go

Lines changed: 6 additions & 11 deletions
@@ -40,10 +40,7 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/serializer/json"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-    "k8s.io/client-go/discovery"
-    cacheddiscovery "k8s.io/client-go/discovery/cached"
     "k8s.io/client-go/kubernetes/scheme"
-    "k8s.io/client-go/restmapper"
     clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

     apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1"
@@ -197,16 +194,14 @@ func artifact(t *testing.T, server RunningServer, producer func() (runtime.Objec
     gvk := gvks[0]
     data.GetObjectKind().SetGroupVersionKind(gvk)

-    cfg := server.DefaultConfig(t) // TODO(sttts): this doesn't make sense: discovery from a random workspace
-    discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
-    require.NoError(t, err, "could not get discovery client for server")
+    group := gvk.Group
+    if group == "" {
+        group = "core"
+    }

-    scopedDiscoveryClient := discoveryClient.WithCluster(logicalcluster.From(accessor))
-    mapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(scopedDiscoveryClient))
-    mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
-    require.NoError(t, err, "could not get REST mapping for artifact's GVK")
+    gvkForFilename := fmt.Sprintf("%s_%s", group, gvk.Kind)

-    file := path.Join(dir, fmt.Sprintf("%s_%s.yaml", mapping.Resource.GroupResource().String(), accessor.GetName()))
+    file := path.Join(dir, fmt.Sprintf("%s-%s.yaml", gvkForFilename, accessor.GetName()))
     t.Logf("saving artifact to %s", file)

     serializer := json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, json.SerializerOptions{Yaml: true})
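
The artifact helper no longer needs a discovery client or REST mapper just to name a file; it derives the name straight from the GVK, spelling the empty core API group as "core". A small standalone restatement of that naming logic, where artifactFileName is a hypothetical helper introduced for illustration:

package main

import (
	"fmt"
	"path"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// artifactFileName mirrors the scheme introduced above: <group>_<Kind>-<name>.yaml,
// with the empty (core) API group written out as "core".
func artifactFileName(dir string, gvk schema.GroupVersionKind, name string) string {
	group := gvk.Group
	if group == "" {
		group = "core"
	}
	return path.Join(dir, fmt.Sprintf("%s_%s-%s.yaml", group, gvk.Kind, name))
}

func main() {
	fmt.Println(artifactFileName("/tmp/artifacts", schema.GroupVersionKind{Version: "v1", Kind: "Namespace"}, "test-ns"))
	// /tmp/artifacts/core_Namespace-test-ns.yaml
}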
