Skip to content

Commit 6531bf8

Browse files
committed
manager/allocator: lift portAllocator out of CNM
The port allocation logic does not depend on the network allocator implementation in any meaningful way. It has no knowledge of the CNM network allocator's state, and it does not need to change if the network allocator changes. Allocating node ports is fundamentally a seaparate concern from allocating network resources for services and tasks. Therefore the low-level network allocator should not be responsible for allocating both. Lift the port allocator into the Allocator's network context as a sibling of the low-level network allocator. Signed-off-by: Cory Snider <[email protected]>
1 parent 80fc22d commit 6531bf8

File tree

7 files changed

+57
-273
lines changed

7 files changed

+57
-273
lines changed

manager/allocator/allocator_test.go

-13
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,6 @@ func init() {
2121
retryInterval = 5 * time.Millisecond
2222
}
2323

24-
// Temporary copy of constants from cnmallocator/portallocator.go
25-
// to allow tests to build before portallocator.go is moved into
26-
// this package.
27-
const (
28-
// Start of the dynamic port range from which node ports will
29-
// be allocated when the user did not specify a port.
30-
dynamicPortStart = 30000
31-
32-
// End of the dynamic port range from which node ports will be
33-
// allocated when the user did not specify a port.
34-
dynamicPortEnd = 32767
35-
)
36-
3724
func TestAllocator(t *testing.T) {
3825
s := store.NewMemoryStore(nil)
3926
assert.NotNil(t, s)

manager/allocator/cnmallocator/networkallocator.go

+2-26
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,6 @@ type cnmNetworkAllocator struct {
4040
// The driver registry for all internal and external network drivers.
4141
networkRegistry drvregistry.Networks
4242

43-
// The port allocator instance for allocating node ports
44-
portAllocator *portAllocator
45-
4643
// Local network state used by cnmNetworkAllocator to do network management.
4744
networks map[string]*network
4845

@@ -108,8 +105,6 @@ func New(pg plugingetter.PluginGetter, netConfig *NetworkConfig) (networkallocat
108105
tasks: make(map[string]struct{}),
109106
nodes: make(map[string]map[string]struct{}),
110107
pg: pg,
111-
112-
portAllocator: newPortAllocator(),
113108
}
114109

115110
for ntype, i := range initializers {
@@ -205,11 +200,8 @@ func (na *cnmNetworkAllocator) Deallocate(n *api.Network) error {
205200
}
206201

207202
// AllocateService allocates all the network resources such as virtual
208-
// IP and ports needed by the service.
203+
// IP needed by the service.
209204
func (na *cnmNetworkAllocator) AllocateService(s *api.Service) (err error) {
210-
if err = na.portAllocator.serviceAllocatePorts(s); err != nil {
211-
return err
212-
}
213205
defer func() {
214206
if err != nil {
215207
na.DeallocateService(s)
@@ -296,7 +288,7 @@ networkLoop:
296288
}
297289

298290
// DeallocateService de-allocates all the network resources such as
299-
// virtual IP and ports associated with the service.
291+
// virtual IP associated with the service.
300292
func (na *cnmNetworkAllocator) DeallocateService(s *api.Service) error {
301293
if s.Endpoint == nil {
302294
return nil
@@ -312,7 +304,6 @@ func (na *cnmNetworkAllocator) DeallocateService(s *api.Service) error {
312304
}
313305
s.Endpoint.VirtualIPs = nil
314306

315-
na.portAllocator.serviceDeallocatePorts(s)
316307
delete(na.services, s.ID)
317308

318309
return nil
@@ -369,19 +360,8 @@ func (na *cnmNetworkAllocator) IsTaskAllocated(t *api.Task) bool {
369360
return true
370361
}
371362

372-
// HostPublishPortsNeedUpdate returns true if the passed service needs
373-
// allocations for its published ports in host (non ingress) mode
374-
func (na *cnmNetworkAllocator) HostPublishPortsNeedUpdate(s *api.Service) bool {
375-
return na.portAllocator.hostPublishPortsNeedUpdate(s)
376-
}
377-
378363
// IsServiceAllocated returns false if the passed service needs to have network resources allocated/updated.
379364
func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool {
380-
var options networkallocator.ServiceAllocationOpts
381-
for _, flag := range flags {
382-
flag(&options)
383-
}
384-
385365
specNetworks := serviceNetworks(s)
386366

387367
// If endpoint mode is VIP and allocator does not have the
@@ -443,10 +423,6 @@ func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(
443423
}
444424
}
445425

446-
if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) ||
447-
(s.Endpoint != nil && len(s.Endpoint.Ports) != 0) {
448-
return na.portAllocator.isPortsAllocatedOnInit(s, options.OnInit)
449-
}
450426
return true
451427
}
452428

manager/allocator/cnmallocator/networkallocator_test.go

+1-217
Original file line numberDiff line numberDiff line change
@@ -563,11 +563,7 @@ func TestAllocateService(t *testing.T) {
563563

564564
err = na.AllocateService(s)
565565
assert.NoError(t, err)
566-
assert.Equal(t, 2, len(s.Endpoint.Ports))
567-
assert.True(t, s.Endpoint.Ports[0].PublishedPort >= dynamicPortStart &&
568-
s.Endpoint.Ports[0].PublishedPort <= dynamicPortEnd)
569-
assert.True(t, s.Endpoint.Ports[1].PublishedPort >= dynamicPortStart &&
570-
s.Endpoint.Ports[1].PublishedPort <= dynamicPortEnd)
566+
assert.Len(t, s.Endpoint.Ports, 0) // Network allocator is not responsible for allocating ports.
571567

572568
assert.Equal(t, 1, len(s.Endpoint.VirtualIPs))
573569

@@ -579,94 +575,6 @@ func TestAllocateService(t *testing.T) {
579575
assert.Equal(t, true, subnet.Contains(ip))
580576
}
581577

582-
func TestAllocateServiceUserDefinedPorts(t *testing.T) {
583-
na := newNetworkAllocator(t)
584-
s := &api.Service{
585-
ID: "testID1",
586-
Spec: api.ServiceSpec{
587-
Endpoint: &api.EndpointSpec{
588-
Ports: []*api.PortConfig{
589-
{
590-
Name: "some_tcp",
591-
TargetPort: 1234,
592-
PublishedPort: 1234,
593-
},
594-
{
595-
Name: "some_udp",
596-
TargetPort: 1234,
597-
PublishedPort: 1234,
598-
Protocol: api.ProtocolUDP,
599-
},
600-
},
601-
},
602-
},
603-
}
604-
605-
err := na.AllocateService(s)
606-
assert.NoError(t, err)
607-
assert.Equal(t, 2, len(s.Endpoint.Ports))
608-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
609-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[1].PublishedPort)
610-
}
611-
612-
func TestAllocateServiceConflictingUserDefinedPorts(t *testing.T) {
613-
na := newNetworkAllocator(t)
614-
s := &api.Service{
615-
ID: "testID1",
616-
Spec: api.ServiceSpec{
617-
Endpoint: &api.EndpointSpec{
618-
Ports: []*api.PortConfig{
619-
{
620-
Name: "some_tcp",
621-
TargetPort: 1234,
622-
PublishedPort: 1234,
623-
},
624-
{
625-
Name: "some_other_tcp",
626-
TargetPort: 1234,
627-
PublishedPort: 1234,
628-
},
629-
},
630-
},
631-
},
632-
}
633-
634-
err := na.AllocateService(s)
635-
assert.Error(t, err)
636-
}
637-
638-
func TestDeallocateServiceAllocate(t *testing.T) {
639-
na := newNetworkAllocator(t)
640-
s := &api.Service{
641-
ID: "testID1",
642-
Spec: api.ServiceSpec{
643-
Endpoint: &api.EndpointSpec{
644-
Ports: []*api.PortConfig{
645-
{
646-
Name: "some_tcp",
647-
TargetPort: 1234,
648-
PublishedPort: 1234,
649-
},
650-
},
651-
},
652-
},
653-
}
654-
655-
err := na.AllocateService(s)
656-
assert.NoError(t, err)
657-
assert.Equal(t, 1, len(s.Endpoint.Ports))
658-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
659-
660-
err = na.DeallocateService(s)
661-
assert.NoError(t, err)
662-
assert.Equal(t, 0, len(s.Endpoint.Ports))
663-
// Allocate again.
664-
err = na.AllocateService(s)
665-
assert.NoError(t, err)
666-
assert.Equal(t, 1, len(s.Endpoint.Ports))
667-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
668-
}
669-
670578
func TestDeallocateServiceAllocateIngressMode(t *testing.T) {
671579
na := newNetworkAllocator(t)
672580

@@ -705,8 +613,6 @@ func TestDeallocateServiceAllocateIngressMode(t *testing.T) {
705613

706614
err = na.AllocateService(s)
707615
assert.NoError(t, err)
708-
assert.Len(t, s.Endpoint.Ports, 1)
709-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
710616
assert.Len(t, s.Endpoint.VirtualIPs, 1)
711617

712618
err = na.DeallocateService(s)
@@ -719,129 +625,7 @@ func TestDeallocateServiceAllocateIngressMode(t *testing.T) {
719625

720626
err = na.AllocateService(s)
721627
assert.NoError(t, err)
722-
assert.Len(t, s.Endpoint.Ports, 1)
723-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
724-
assert.Len(t, s.Endpoint.VirtualIPs, 1)
725-
}
726-
727-
func TestServiceAddRemovePortsIngressMode(t *testing.T) {
728-
na := newNetworkAllocator(t)
729-
730-
n := &api.Network{
731-
ID: "testNetID1",
732-
Spec: api.NetworkSpec{
733-
Annotations: api.Annotations{
734-
Name: "test",
735-
},
736-
Ingress: true,
737-
},
738-
}
739-
740-
err := na.Allocate(n)
741-
assert.NoError(t, err)
742-
743-
s := &api.Service{
744-
ID: "testID1",
745-
Spec: api.ServiceSpec{
746-
Endpoint: &api.EndpointSpec{
747-
Ports: []*api.PortConfig{
748-
{
749-
Name: "some_tcp",
750-
TargetPort: 1234,
751-
PublishedPort: 1234,
752-
PublishMode: api.PublishModeIngress,
753-
},
754-
},
755-
},
756-
},
757-
Endpoint: &api.Endpoint{},
758-
}
759-
760-
s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
761-
&api.Endpoint_VirtualIP{NetworkID: n.ID})
762-
763-
err = na.AllocateService(s)
764-
assert.NoError(t, err)
765-
assert.Len(t, s.Endpoint.Ports, 1)
766-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
767628
assert.Len(t, s.Endpoint.VirtualIPs, 1)
768-
allocatedVIP := s.Endpoint.VirtualIPs[0].Addr
769-
770-
// Unpublish port
771-
s.Spec.Endpoint.Ports = s.Spec.Endpoint.Ports[:0]
772-
err = na.AllocateService(s)
773-
assert.NoError(t, err)
774-
assert.Len(t, s.Endpoint.Ports, 0)
775-
assert.Len(t, s.Endpoint.VirtualIPs, 0)
776-
777-
// Publish port again and ensure VIP is not the same that was deallocated.
778-
// Since IP allocation is serial we should receive the next available IP.
779-
s.Spec.Endpoint.Ports = append(s.Spec.Endpoint.Ports, &api.PortConfig{Name: "some_tcp",
780-
TargetPort: 1234,
781-
PublishedPort: 1234,
782-
PublishMode: api.PublishModeIngress,
783-
})
784-
s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
785-
&api.Endpoint_VirtualIP{NetworkID: n.ID})
786-
err = na.AllocateService(s)
787-
assert.NoError(t, err)
788-
assert.Len(t, s.Endpoint.Ports, 1)
789-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
790-
assert.Len(t, s.Endpoint.VirtualIPs, 1)
791-
assert.NotEqual(t, allocatedVIP, s.Endpoint.VirtualIPs[0].Addr)
792-
}
793-
794-
func TestServiceUpdate(t *testing.T) {
795-
na1 := newNetworkAllocator(t)
796-
na2 := newNetworkAllocator(t)
797-
s := &api.Service{
798-
ID: "testID1",
799-
Spec: api.ServiceSpec{
800-
Endpoint: &api.EndpointSpec{
801-
Ports: []*api.PortConfig{
802-
{
803-
Name: "some_tcp",
804-
TargetPort: 1234,
805-
PublishedPort: 1234,
806-
},
807-
{
808-
Name: "some_other_tcp",
809-
TargetPort: 1235,
810-
PublishedPort: 0,
811-
},
812-
},
813-
},
814-
},
815-
}
816-
817-
err := na1.AllocateService(s)
818-
assert.NoError(t, err)
819-
assert.True(t, na1.IsServiceAllocated(s))
820-
assert.Equal(t, 2, len(s.Endpoint.Ports))
821-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
822-
assert.NotEqual(t, 0, s.Endpoint.Ports[1].PublishedPort)
823-
824-
// Cache the secode node port
825-
allocatedPort := s.Endpoint.Ports[1].PublishedPort
826-
827-
// Now allocate the same service in another allocator instance
828-
err = na2.AllocateService(s)
829-
assert.NoError(t, err)
830-
assert.True(t, na2.IsServiceAllocated(s))
831-
assert.Equal(t, 2, len(s.Endpoint.Ports))
832-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
833-
// Make sure we got the same port
834-
assert.Equal(t, allocatedPort, s.Endpoint.Ports[1].PublishedPort)
835-
836-
s.Spec.Endpoint.Ports[1].PublishedPort = 1235
837-
assert.False(t, na1.IsServiceAllocated(s))
838-
839-
err = na1.AllocateService(s)
840-
assert.NoError(t, err)
841-
assert.True(t, na1.IsServiceAllocated(s))
842-
assert.Equal(t, 2, len(s.Endpoint.Ports))
843-
assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort)
844-
assert.Equal(t, uint32(1235), s.Endpoint.Ports[1].PublishedPort)
845629
}
846630

847631
func TestServiceNetworkUpdate(t *testing.T) {

0 commit comments

Comments
 (0)