Skip to content

Commit 33889bc

Browse files
committed
refactor: replace global sharedResourceLock with a fine-grained resource-specific mutex pool for GCE load balancer operations
1 parent 9bff285 commit 33889bc

File tree

4 files changed

+308
-31
lines changed

4 files changed

+308
-31
lines changed

providers/gce/gce.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,9 @@ type Cloud struct {
176176
// sharedResourceLock is used to serialize GCE operations that may mutate shared state to
177177
// prevent inconsistencies. For example, load balancers manipulation methods will take the
178178
// lock to prevent shared resources from being prematurely deleted while the operation is
179-
// in progress.
180179
sharedResourceLock sync.Mutex
180+
// sharedResourceLocks is a concurrent map used for resource-specific fine-grained locking of shared resources (e.g. InstanceGroups, shared HealthChecks).
181+
sharedResourceLocks sync.Map // map[string]*sync.Mutex
181182
// AlphaFeatureGate gates gce alpha features in Cloud instance.
182183
// Related wrapper functions that interacts with gce alpha api should examine whether
183184
// the corresponding api is enabled.
@@ -219,6 +220,30 @@ type Cloud struct {
219220
enableL4DenyFirewallRollbackCleanup bool
220221
}
221222

223+
type SharedResourceType string
224+
225+
const (
226+
ResourceTypeHealthCheck SharedResourceType = "hc"
227+
ResourceTypeInstanceGroup SharedResourceType = "ig"
228+
ResourceTypeFirewall SharedResourceType = "fw"
229+
)
230+
231+
func (g *Cloud) getLockForResource(resType SharedResourceType, name string) *sync.Mutex {
232+
key := string(resType) + ":" + name
233+
v, _ := g.sharedResourceLocks.LoadOrStore(key, &sync.Mutex{})
234+
return v.(*sync.Mutex)
235+
}
236+
237+
// lockResourceIfShared conditionally acquires a lock and returns a func to defer for unlocking.
238+
func (g *Cloud) lockResourceIfShared(shared bool, resType SharedResourceType, name string) func() {
239+
if !shared {
240+
return func() {} // No-op
241+
}
242+
lock := g.getLockForResource(resType, name)
243+
lock.Lock()
244+
return lock.Unlock
245+
}
246+
222247
// ConfigGlobal is the in memory representation of the gce.conf config data
223248
// TODO: replace gcfg with json
224249
type ConfigGlobal struct {

providers/gce/gce_loadbalancer_internal.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,6 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
128128
}
129129
}
130130

131-
// Lock the sharedResourceLock to prevent any deletions of shared resources while assembling shared resources here
132-
g.sharedResourceLock.Lock()
133-
defer g.sharedResourceLock.Unlock()
134-
135131
// Ensure health check exists before creating the backend service. The health check is shared
136132
// if externalTrafficPolicy=Cluster.
137133
sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)
@@ -354,9 +350,6 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v
354350
klog.V(2).Infof("Skipped updateInternalLoadBalancer for service %s/%s as service contains %q loadBalancerClass.", svc.Namespace, svc.Name, *svc.Spec.LoadBalancerClass)
355351
return cloudprovider.ImplementedElsewhere
356352
}
357-
g.sharedResourceLock.Lock()
358-
defer g.sharedResourceLock.Unlock()
359-
360353
igName := makeInstanceGroupName(clusterID)
361354
igLinks, err := g.ensureInternalInstanceGroups(igName, nodes)
362355
if err != nil {
@@ -393,9 +386,6 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
393386
sharedBackend := shareBackendService(svc)
394387
sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)
395388

396-
g.sharedResourceLock.Lock()
397-
defer g.sharedResourceLock.Unlock()
398-
399389
klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName)
400390
ensureAddressDeleted(g, loadBalancerName, g.region)
401391

@@ -500,7 +490,9 @@ func (g *Cloud) teardownInternalHealthCheckAndFirewall(svc *v1.Service, hcName s
500490
return nil
501491
}
502492

503-
func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error {
493+
func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string, shared bool) error {
494+
defer g.lockResourceIfShared(shared, ResourceTypeFirewall, fwName)()
495+
504496
klog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName)
505497
targetTags, err := g.GetNodeTags(nodeNames(nodes))
506498
if err != nil {
@@ -587,15 +579,15 @@ func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID s
587579
return err
588580
}
589581

590-
err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName)
582+
err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName, false)
591583
if err != nil {
592584
return err
593585
}
594586

595587
// Second firewall is for health checking nodes / services
596588
fwHCName := makeHealthCheckFirewallName(loadBalancerName, clusterID, sharedHealthCheck)
597589
hcSrcRanges := L4LoadBalancerSrcRanges()
598-
return g.ensureInternalFirewall(svc, fwHCName, "", "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "")
590+
return g.ensureInternalFirewall(svc, fwHCName, "", "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "", sharedHealthCheck)
599591
}
600592

601593
func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) (*compute.HealthCheck, error) {
@@ -608,17 +600,34 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN
608600
}
609601

610602
if hc == nil {
611-
klog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path)
612-
if err = g.CreateHealthCheck(expectedHC); err != nil {
613-
return nil, err
614-
}
615-
hc, err = g.GetHealthCheck(name)
603+
var created bool
604+
err = func() error {
605+
defer g.lockResourceIfShared(shared, ResourceTypeHealthCheck, name)()
606+
607+
hc, err = g.GetHealthCheck(name)
608+
if err != nil && !isNotFound(err) {
609+
return err
610+
}
611+
612+
if hc == nil {
613+
klog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path)
614+
if err = g.CreateHealthCheck(expectedHC); err != nil {
615+
return err
616+
}
617+
hc, err = g.GetHealthCheck(name)
618+
if err != nil {
619+
return err
620+
}
621+
created = true
622+
}
623+
return nil
624+
}()
616625
if err != nil {
617-
klog.Errorf("Failed to get http health check %v", err)
618626
return nil, err
619627
}
620-
klog.V(2).Infof("ensureInternalHealthCheck: created health check %v", name)
621-
return hc, nil
628+
if created {
629+
return hc, nil
630+
}
622631
}
623632

624633
if needToUpdateHealthChecks(hc, expectedHC) {
@@ -638,6 +647,10 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN
638647
}
639648

640649
func (g *Cloud) ensureInternalInstanceGroup(name, zone string, nodes []*v1.Node, emptyZoneNodes []*v1.Node) (string, error) {
650+
lock := g.getLockForResource(ResourceTypeInstanceGroup, name+"-"+zone)
651+
lock.Lock()
652+
defer lock.Unlock()
653+
641654
klog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): checking group that it contains %v nodes [node names limited, total number of nodes: %d], the following nodes have empty string in the zone field and won't be deleted: %v", name, zone, loggableNodeNames(nodes), len(nodes), loggableNodeNames(emptyZoneNodes))
642655
ig, err := g.GetInstanceGroup(name, zone)
643656
if err != nil && !isNotFound(err) {

0 commit comments

Comments
 (0)