Skip to content

Commit a87ae0f

Browse files
Merge pull request #1070 from heschlie/config-race
Resolving race condition with Global configs in KDD
2 parents db6af8d + 191d4f9 commit a87ae0f

File tree

3 files changed

+138
-5
lines changed

3 files changed

+138
-5
lines changed

calico_node/Makefile

+36-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ dist/allocate-ipip-addr: $(ALLOCATE_IPIP_FILES) vendor
236236
# Default value: directory with Makefile
237237
SOURCE_DIR?=$(dir $(lastword $(MAKEFILE_LIST)))
238238
SOURCE_DIR:=$(abspath $(SOURCE_DIR))
239+
CRD_PATH=$(CURDIR)/vendor/github.com/projectcalico/libcalico-go/test/
239240
LOCAL_IP_ENV?=$(shell ip route get 8.8.8.8 | head -1 | awk '{print $$7}')
241+
K8S_VERSION=v1.7.4
240242
ST_TO_RUN?=tests/st/
241243

242244
# Can exclude the slower tests with "-a '!slow'"
@@ -396,6 +398,39 @@ run-etcd-host:
396398
--advertise-client-urls "http://$(LOCAL_IP_ENV):2379,http://127.0.0.1:2379" \
397399
--listen-client-urls "http://0.0.0.0:2379"
398400

401+
## Kubernetes apiserver used for tests
402+
run-k8s-apiserver: stop-k8s-apiserver run-etcd vendor
403+
docker run \
404+
--net=host --name st-apiserver \
405+
--detach \
406+
gcr.io/google_containers/hyperkube-amd64:${K8S_VERSION} \
407+
/hyperkube apiserver \
408+
--bind-address=0.0.0.0 \
409+
--insecure-bind-address=0.0.0.0 \
410+
--etcd-servers=http://127.0.0.1:2379 \
411+
--admission-control=NamespaceLifecycle,LimitRanger,DefaultStorageClass,ResourceQuota \
412+
--authorization-mode=RBAC \
413+
--service-cluster-ip-range=10.101.0.0/16 \
414+
--v=10 \
415+
--logtostderr=true
416+
417+
# Wait until we can configure a cluster role binding which allows anonymous auth.
418+
while ! docker exec st-apiserver kubectl create clusterrolebinding anonymous-admin --clusterrole=cluster-admin --user=system:anonymous; do echo "Trying to create ClusterRoleBinding"; sleep 2; done
419+
420+
# Create CustomResourceDefinition (CRD) for Calico resources
421+
# from the manifest crds.yaml
422+
docker run \
423+
--net=host \
424+
--rm \
425+
-v $(CRD_PATH):/manifests \
426+
lachlanevenson/k8s-kubectl:${K8S_VERSION} \
427+
--server=http://localhost:8080 \
428+
apply -f /manifests/crds.yaml
429+
430+
## Stop Kubernetes apiserver
431+
stop-k8s-apiserver:
432+
@-docker rm -f st-apiserver
433+
399434
###############################################################################
400435
# calico_node FVs
401436
###############################################################################
@@ -421,7 +456,7 @@ node-fv:
421456

422457
PHONY: node-test-containerized
423458
## Run the tests in a container. Useful for CI, Mac dev.
424-
node-test-containerized: vendor run-etcd-host
459+
node-test-containerized: vendor run-etcd-host run-k8s-apiserver
425460
docker run --rm \
426461
-v $(CURDIR):/go/src/$(PACKAGE_NAME):rw \
427462
-v $(VERSIONS_FILE):/versions.yaml:ro \

calico_node/startup/startup.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -788,12 +788,12 @@ func ensureDefaultConfig(cfg *api.CalicoAPIConfig, c *client.Client, node *api.N
788788
}
789789

790790
// Store the Calico Version as a global felix config setting.
791-
if err := c.Config().SetFelixConfig("CalicoVersion", "", VERSION); err != nil {
791+
if err := checkConflictError(c.Config().SetFelixConfig("CalicoVersion", "", VERSION)); err != nil {
792792
return err
793793
}
794794

795795
// Update the ClusterType with the type in CLUSTER_TYPE
796-
if err := updateClusterType(c, "ClusterType", os.Getenv("CLUSTER_TYPE")); err != nil {
796+
if err := checkConflictError(updateClusterType(c, "ClusterType", os.Getenv("CLUSTER_TYPE"))); err != nil {
797797
return err
798798
}
799799

@@ -818,7 +818,7 @@ func ensureGlobalFelixConfig(c *client.Client, key, def string) error {
818818
if val, assigned, err := c.Config().GetFelixConfig(key, ""); err != nil {
819819
return err
820820
} else if !assigned {
821-
return c.Config().SetFelixConfig(key, "", def)
821+
return checkConflictError(c.Config().SetFelixConfig(key, "", def))
822822
} else {
823823
log.WithField(key, val).Debug("Global Felix value already assigned")
824824
return nil
@@ -889,13 +889,24 @@ func ensureGlobalBGPConfig(c *client.Client, key, def string) error {
889889
if val, assigned, err := c.Config().GetBGPConfig(key, ""); err != nil {
890890
return err
891891
} else if !assigned {
892-
return c.Config().SetBGPConfig(key, "", def)
892+
return checkConflictError(c.Config().SetBGPConfig(key, "", def))
893893
} else {
894894
log.WithField(key, val).Debug("Global BGP value already assigned")
895895
return nil
896896
}
897897
}
898898

899+
// checkConflictError checks whether the given error is of type ErrorResourceUpdateConflict
900+
// and ignores it if so. This allows our global configs to ignore conflicts from multiple Nodes
901+
// trying to set the same value at the same time.
902+
func checkConflictError(err error) error {
903+
if conflict, ok := err.(errors.ErrorResourceUpdateConflict); ok {
904+
log.Infof("Ignoring conflict when setting value %s", conflict.Identifier)
905+
return nil
906+
}
907+
return err
908+
}
909+
899910
// message prints a message to screen and to log. A newline terminator is
900911
// not required in the format string.
901912
func message(format string, args ...interface{}) {

calico_node/startup/startup_test.go

+87
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
package main
1616

1717
import (
18+
"fmt"
1819
"log"
1920
"os"
2021
"strings"
22+
"sync"
2123
"time"
2224

2325
. "github.com/onsi/ginkgo"
@@ -29,6 +31,12 @@ import (
2931
"github.com/projectcalico/libcalico-go/lib/ipip"
3032
"github.com/projectcalico/libcalico-go/lib/net"
3133
"github.com/projectcalico/libcalico-go/lib/testutils"
34+
"k8s.io/client-go/tools/clientcmd"
35+
36+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37+
"k8s.io/client-go/kubernetes"
38+
"k8s.io/client-go/pkg/api/v1"
39+
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
3240
)
3341

3442
var exitCode int
@@ -454,3 +462,82 @@ var _ = Describe("UT for Node IP assignment and conflict checking.", func() {
454462
Entry("Test with \"IP6\" env var set to IP and BGP spec populated with different IP", makeNode("192.168.1.10/24", "2001:db8:85a3:8d3:1319:8a2e:370:7348/32"), []EnvItem{{"IP", "192.168.1.10/24"}, {"IP6", "2001:db8:85a3:8d3:1319:8a2e:370:7349/32"}}, true),
455463
)
456464
})
465+
466+
var _ = Describe("FV tests against K8s API server.", func() {
467+
It("should not throw an error when multiple Nodes configure the same global CRD value.", func() {
468+
// How many Nodes we want to "create".
469+
numNodes := 10
470+
471+
// Create a K8s client.
472+
configOverrides := &clientcmd.ConfigOverrides{
473+
ClusterInfo: clientcmdapi.Cluster{
474+
Server: "http://127.0.0.1:8080",
475+
InsecureSkipTLSVerify: true,
476+
},
477+
}
478+
479+
kcfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&clientcmd.ClientConfigLoadingRules{}, configOverrides).ClientConfig()
480+
if err != nil {
481+
Fail(fmt.Sprintf("Failed to create K8s config: %v", err))
482+
}
483+
484+
cs, err := kubernetes.NewForConfig(kcfg)
485+
if err != nil {
486+
Fail(fmt.Sprintf("Could not create K8s client: %v", err))
487+
}
488+
489+
// Create Calico client with k8s backend.
490+
cfg := api.NewCalicoAPIConfig()
491+
cfg.Spec = api.CalicoAPIConfigSpec{
492+
DatastoreType: api.Kubernetes,
493+
KubeConfig: api.KubeConfig{
494+
K8sAPIEndpoint: "http://127.0.0.1:8080",
495+
K8sInsecureSkipTLSVerify: true,
496+
},
497+
}
498+
499+
c := testutils.CreateClient(*cfg)
500+
501+
// Create some Nodes using K8s client, Calico client does not support Node creation for KDD.
502+
kNodes := []*v1.Node{}
503+
for i := 0; i < numNodes; i++ {
504+
n := &v1.Node{
505+
ObjectMeta: metav1.ObjectMeta{
506+
Name: fmt.Sprintf("raceNode%02d", i+1),
507+
},
508+
}
509+
kNodes = append(kNodes, n)
510+
cs.Nodes().Create(n)
511+
}
512+
513+
// Pull above Nodes using Calico client.
514+
nodes, err := c.Nodes().List(api.NodeMetadata{})
515+
if err != nil {
516+
Fail(fmt.Sprintf("Could not retrieve Nodes %v", err))
517+
}
518+
519+
// Run ensureDefaultConfig against each of the Nodes using goroutines to simulate multiple Nodes coming online.
520+
var wg sync.WaitGroup
521+
errors := []error{}
522+
for _, node := range nodes.Items {
523+
wg.Add(1)
524+
go func() {
525+
defer wg.Done()
526+
err = ensureDefaultConfig(cfg, c, &node)
527+
if err != nil {
528+
errors = append(errors, err)
529+
}
530+
}()
531+
}
532+
533+
wg.Wait()
534+
535+
// Verify all runs complete without error.
536+
Expect(len(errors)).To(Equal(0))
537+
538+
// Clean up our Nodes.
539+
for _, node := range nodes.Items {
540+
cs.Nodes().Delete(node.Metadata.Name, &metav1.DeleteOptions{})
541+
}
542+
})
543+
})

0 commit comments

Comments
 (0)