Skip to content

Commit 191d4f9

Browse files
committed
Resolving race condition with Global configs in KDD
- Added a check for whether the error returned when setting a global config variable is of the type `errors.ErrorResourceUpdateConflict`, as this means another Node set the value while the current Node was trying to. Since all the Nodes are trying to set the same value, we can safely ignore this error and carry on. - Added a test that spins up an API server, creates 10 Nodes, and runs the default-config setup for each Node in its own goroutine to simulate a bunch of Nodes coming up at once.
1 parent db6af8d commit 191d4f9

File tree

3 files changed

+138
-5
lines changed

3 files changed

+138
-5
lines changed

calico_node/Makefile

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ dist/allocate-ipip-addr: $(ALLOCATE_IPIP_FILES) vendor
236236
# Default value: directory with Makefile
237237
SOURCE_DIR?=$(dir $(lastword $(MAKEFILE_LIST)))
238238
SOURCE_DIR:=$(abspath $(SOURCE_DIR))
239+
CRD_PATH=$(CURDIR)/vendor/github.com/projectcalico/libcalico-go/test/
239240
LOCAL_IP_ENV?=$(shell ip route get 8.8.8.8 | head -1 | awk '{print $$7}')
241+
K8S_VERSION=v1.7.4
240242
ST_TO_RUN?=tests/st/
241243

242244
# Can exclude the slower tests with "-a '!slow'"
@@ -396,6 +398,39 @@ run-etcd-host:
396398
--advertise-client-urls "http://$(LOCAL_IP_ENV):2379,http://127.0.0.1:2379" \
397399
--listen-client-urls "http://0.0.0.0:2379"
398400

401+
# Starts a detached hyperkube apiserver container named st-apiserver on the
# host network, pointed at etcd on 127.0.0.1:2379. It then binds the
# cluster-admin role to system:anonymous and applies the Calico CRD manifest
# (crds.yaml) found under CRD_PATH. The stop-k8s-apiserver prerequisite removes
# any previous st-apiserver container before a new one is started.
## Kubernetes apiserver used for tests
402+
run-k8s-apiserver: stop-k8s-apiserver run-etcd vendor
403+
docker run \
404+
--net=host --name st-apiserver \
405+
--detach \
406+
gcr.io/google_containers/hyperkube-amd64:${K8S_VERSION} \
407+
/hyperkube apiserver \
408+
--bind-address=0.0.0.0 \
409+
--insecure-bind-address=0.0.0.0 \
410+
--etcd-servers=http://127.0.0.1:2379 \
411+
--admission-control=NamespaceLifecycle,LimitRanger,DefaultStorageClass,ResourceQuota \
412+
--authorization-mode=RBAC \
413+
--service-cluster-ip-range=10.101.0.0/16 \
414+
--v=10 \
415+
--logtostderr=true
416+
417+
# Wait until we can configure a cluster role binding which allows anonymous auth (retries every 2 seconds, with no upper bound).
418+
while ! docker exec st-apiserver kubectl create clusterrolebinding anonymous-admin --clusterrole=cluster-admin --user=system:anonymous; do echo "Trying to create ClusterRoleBinding"; sleep 2; done
419+
420+
# Create the CustomResourceDefinitions (CRDs) for Calico resources
421+
# from the manifest crds.yaml, which is mounted from CRD_PATH at /manifests.
422+
docker run \
423+
--net=host \
424+
--rm \
425+
-v $(CRD_PATH):/manifests \
426+
lachlanevenson/k8s-kubectl:${K8S_VERSION} \
427+
--server=http://localhost:8080 \
428+
apply -f /manifests/crds.yaml
429+
430+
# Force-removes the st-apiserver container started by run-k8s-apiserver. The
# leading "-" tells make to ignore a failing exit status (for example when no
# such container exists) and "@" keeps the command from being echoed.
## Stop Kubernetes apiserver
431+
stop-k8s-apiserver:
432+
@-docker rm -f st-apiserver
433+
399434
###############################################################################
400435
# calico_node FVs
401436
###############################################################################
@@ -421,7 +456,7 @@ node-fv:
421456

422457
PHONY: node-test-containerized
423458
## Run the tests in a container. Useful for CI, Mac dev.
424-
node-test-containerized: vendor run-etcd-host
459+
node-test-containerized: vendor run-etcd-host run-k8s-apiserver
425460
docker run --rm \
426461
-v $(CURDIR):/go/src/$(PACKAGE_NAME):rw \
427462
-v $(VERSIONS_FILE):/versions.yaml:ro \

calico_node/startup/startup.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -788,12 +788,12 @@ func ensureDefaultConfig(cfg *api.CalicoAPIConfig, c *client.Client, node *api.N
788788
}
789789

790790
// Store the Calico Version as a global felix config setting.
791-
if err := c.Config().SetFelixConfig("CalicoVersion", "", VERSION); err != nil {
791+
if err := checkConflictError(c.Config().SetFelixConfig("CalicoVersion", "", VERSION)); err != nil {
792792
return err
793793
}
794794

795795
// Update the ClusterType with the type in CLUSTER_TYPE
796-
if err := updateClusterType(c, "ClusterType", os.Getenv("CLUSTER_TYPE")); err != nil {
796+
if err := checkConflictError(updateClusterType(c, "ClusterType", os.Getenv("CLUSTER_TYPE"))); err != nil {
797797
return err
798798
}
799799

@@ -818,7 +818,7 @@ func ensureGlobalFelixConfig(c *client.Client, key, def string) error {
818818
if val, assigned, err := c.Config().GetFelixConfig(key, ""); err != nil {
819819
return err
820820
} else if !assigned {
821-
return c.Config().SetFelixConfig(key, "", def)
821+
return checkConflictError(c.Config().SetFelixConfig(key, "", def))
822822
} else {
823823
log.WithField(key, val).Debug("Global Felix value already assigned")
824824
return nil
@@ -889,13 +889,24 @@ func ensureGlobalBGPConfig(c *client.Client, key, def string) error {
889889
if val, assigned, err := c.Config().GetBGPConfig(key, ""); err != nil {
890890
return err
891891
} else if !assigned {
892-
return c.Config().SetBGPConfig(key, "", def)
892+
return checkConflictError(c.Config().SetBGPConfig(key, "", def))
893893
} else {
894894
log.WithField(key, val).Debug("Global BGP value already assigned")
895895
return nil
896896
}
897897
}
898898

899+
// checkConflictError checks to see if the given error is of the type ErrorResourceUpdateConflict
900+
// and ignore it if so. This is to allow our global configs to ignore conflict from multiple Nodes
901+
// trying to set the same value at the same time.
902+
func checkConflictError(err error) error {
903+
if conflict, ok := err.(errors.ErrorResourceUpdateConflict); ok {
904+
log.Infof("Ignoring conflict when setting value %s", conflict.Identifier)
905+
return nil
906+
}
907+
return err
908+
}
909+
899910
// message prints a message to screen and to log. A newline terminator is
900911
// not required in the format string.
901912
func message(format string, args ...interface{}) {

calico_node/startup/startup_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
package main
1616

1717
import (
18+
"fmt"
1819
"log"
1920
"os"
2021
"strings"
22+
"sync"
2123
"time"
2224

2325
. "github.com/onsi/ginkgo"
@@ -29,6 +31,12 @@ import (
2931
"github.com/projectcalico/libcalico-go/lib/ipip"
3032
"github.com/projectcalico/libcalico-go/lib/net"
3133
"github.com/projectcalico/libcalico-go/lib/testutils"
34+
"k8s.io/client-go/tools/clientcmd"
35+
36+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37+
"k8s.io/client-go/kubernetes"
38+
"k8s.io/client-go/pkg/api/v1"
39+
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
3240
)
3341

3442
var exitCode int
@@ -454,3 +462,82 @@ var _ = Describe("UT for Node IP assignment and conflict checking.", func() {
454462
Entry("Test with \"IP6\" env var set to IP and BGP spec populated with different IP", makeNode("192.168.1.10/24", "2001:db8:85a3:8d3:1319:8a2e:370:7348/32"), []EnvItem{{"IP", "192.168.1.10/24"}, {"IP6", "2001:db8:85a3:8d3:1319:8a2e:370:7349/32"}}, true),
455463
)
456464
})
465+
466+
var _ = Describe("FV tests against K8s API server.", func() {
467+
It("should not throw an error when multiple Nodes configure the same global CRD value.", func() {
468+
// How many Nodes we want to "create".
469+
numNodes := 10
470+
471+
// Create a K8s client.
472+
configOverrides := &clientcmd.ConfigOverrides{
473+
ClusterInfo: clientcmdapi.Cluster{
474+
Server: "http://127.0.0.1:8080",
475+
InsecureSkipTLSVerify: true,
476+
},
477+
}
478+
479+
kcfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&clientcmd.ClientConfigLoadingRules{}, configOverrides).ClientConfig()
480+
if err != nil {
481+
Fail(fmt.Sprintf("Failed to create K8s config: %v", err))
482+
}
483+
484+
cs, err := kubernetes.NewForConfig(kcfg)
485+
if err != nil {
486+
Fail(fmt.Sprintf("Could not create K8s client: %v", err))
487+
}
488+
489+
// Create Calico client with k8s backend.
490+
cfg := api.NewCalicoAPIConfig()
491+
cfg.Spec = api.CalicoAPIConfigSpec{
492+
DatastoreType: api.Kubernetes,
493+
KubeConfig: api.KubeConfig{
494+
K8sAPIEndpoint: "http://127.0.0.1:8080",
495+
K8sInsecureSkipTLSVerify: true,
496+
},
497+
}
498+
499+
c := testutils.CreateClient(*cfg)
500+
501+
// Create some Nodes using K8s client, Calico client does not support Node creation for KDD.
502+
kNodes := []*v1.Node{}
503+
for i := 0; i < numNodes; i++ {
504+
n := &v1.Node{
505+
ObjectMeta: metav1.ObjectMeta{
506+
Name: fmt.Sprintf("raceNode%02d", i+1),
507+
},
508+
}
509+
kNodes = append(kNodes, n)
510+
cs.Nodes().Create(n)
511+
}
512+
513+
// Pull above Nodes using Calico client.
514+
nodes, err := c.Nodes().List(api.NodeMetadata{})
515+
if err != nil {
516+
Fail(fmt.Sprintf("Could not retrieve Nodes %v", err))
517+
}
518+
519+
// Run ensureDefaultConfig against each of the Nodes using goroutines to simulate multiple Nodes coming online.
520+
var wg sync.WaitGroup
521+
errors := []error{}
522+
for _, node := range nodes.Items {
523+
wg.Add(1)
524+
go func() {
525+
defer wg.Done()
526+
err = ensureDefaultConfig(cfg, c, &node)
527+
if err != nil {
528+
errors = append(errors, err)
529+
}
530+
}()
531+
}
532+
533+
wg.Wait()
534+
535+
// Verify all runs complete without error.
536+
Expect(len(errors)).To(Equal(0))
537+
538+
// Clean up our Nodes.
539+
for _, node := range nodes.Items {
540+
cs.Nodes().Delete(node.Metadata.Name, &metav1.DeleteOptions{})
541+
}
542+
})
543+
})

0 commit comments

Comments
 (0)