@@ -37,6 +37,9 @@ type networkContext struct {
37
37
// the actual network allocation.
38
38
nwkAllocator networkallocator.NetworkAllocator
39
39
40
+ // The port allocator instance for allocating node ports
41
+ portAllocator * portAllocator
42
+
40
43
// A set of tasks which are ready to be allocated as a batch. This is
41
44
// distinct from "unallocatedTasks" which are tasks that failed to
42
45
// allocate on the first try, being held for a future retry.
@@ -95,6 +98,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
95
98
96
99
nc := & networkContext {
97
100
nwkAllocator : na ,
101
+ portAllocator : newPortAllocator (),
98
102
pendingTasks : make (map [string ]* api.Task ),
99
103
unallocatedTasks : make (map [string ]* api.Task ),
100
104
unallocatedServices : make (map [string ]* api.Service ),
@@ -233,7 +237,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
233
237
break
234
238
}
235
239
236
- if nc .nwkAllocator . IsServiceAllocated (s ) {
240
+ if nc .isServiceAllocated (s ) {
237
241
break
238
242
}
239
243
@@ -261,8 +265,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
261
265
break
262
266
}
263
267
264
- if nc .nwkAllocator . IsServiceAllocated (s ) {
265
- if ! nc .nwkAllocator . HostPublishPortsNeedUpdate (s ) {
268
+ if nc .isServiceAllocated (s ) {
269
+ if ! nc .portAllocator . hostPublishPortsNeedUpdate (s ) {
266
270
break
267
271
}
268
272
updatePortsInHostPublishMode (s )
@@ -284,7 +288,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
284
288
case api.EventDeleteService :
285
289
s := v .Service .Copy ()
286
290
287
- if err := nc .nwkAllocator . DeallocateService (s ); err != nil {
291
+ if err := nc .deallocateService (s ); err != nil {
288
292
log .G (ctx ).WithError (err ).Errorf ("Failed deallocation during delete of service %s" , s .ID )
289
293
} else {
290
294
nc .somethingWasDeallocated = true
@@ -681,7 +685,7 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly
681
685
682
686
var allocatedServices []* api.Service
683
687
for _ , s := range services {
684
- if nc .nwkAllocator . IsServiceAllocated (s , networkallocator .OnInit ) {
688
+ if nc .isServiceAllocated (s , networkallocator .OnInit ) {
685
689
continue
686
690
}
687
691
if existingAddressesOnly &&
@@ -713,6 +717,23 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly
713
717
return nil
714
718
}
715
719
720
+ // isServiceAllocated returns false if the passed service needs to have network resources allocated/updated.
721
+ func (nc * networkContext ) isServiceAllocated (s * api.Service , flags ... func (* networkallocator.ServiceAllocationOpts )) bool {
722
+ if ! nc .nwkAllocator .IsServiceAllocated (s , flags ... ) {
723
+ return false
724
+ }
725
+
726
+ var options networkallocator.ServiceAllocationOpts
727
+ for _ , flag := range flags {
728
+ flag (& options )
729
+ }
730
+ if (s .Spec .Endpoint != nil && len (s .Spec .Endpoint .Ports ) != 0 ) ||
731
+ (s .Endpoint != nil && len (s .Endpoint .Ports ) != 0 ) {
732
+ return nc .portAllocator .isPortsAllocatedOnInit (s , options .OnInit )
733
+ }
734
+ return true
735
+ }
736
+
716
737
// allocateTasks allocates tasks in the store so far before we started watching.
717
738
func (a * Allocator ) allocateTasks (ctx context.Context , existingAddressesOnly bool ) error {
718
739
var (
@@ -815,7 +836,7 @@ func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bo
815
836
// network configured or service endpoints have been
816
837
// allocated.
817
838
return (len (t .Networks ) == 0 || nc .nwkAllocator .IsTaskAllocated (t )) &&
818
- (s == nil || nc .nwkAllocator . IsServiceAllocated (s ))
839
+ (s == nil || nc .isServiceAllocated (s ))
819
840
}
820
841
821
842
func taskUpdateNetworks (t * api.Task , networks []* api.NetworkAttachment ) {
@@ -1200,13 +1221,13 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin
1200
1221
// is not there
1201
1222
// service has no user-defined endpoints while has already allocated network resources,
1202
1223
// need deallocated.
1203
- if err := nc .nwkAllocator . DeallocateService (s ); err != nil {
1224
+ if err := nc .deallocateService (s ); err != nil {
1204
1225
return err
1205
1226
}
1206
1227
nc .somethingWasDeallocated = true
1207
1228
}
1208
1229
1209
- if err := nc .nwkAllocator . AllocateService (s ); err != nil {
1230
+ if err := nc .allocateService (s ); err != nil {
1210
1231
nc .unallocatedServices [s .ID ] = s
1211
1232
return err
1212
1233
}
@@ -1229,6 +1250,26 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin
1229
1250
return nil
1230
1251
}
1231
1252
1253
+ func (nc * networkContext ) allocateService (s * api.Service ) error {
1254
+ if err := nc .portAllocator .serviceAllocatePorts (s ); err != nil {
1255
+ return err
1256
+ }
1257
+ if err := nc .nwkAllocator .AllocateService (s ); err != nil {
1258
+ nc .portAllocator .serviceDeallocatePorts (s )
1259
+ return err
1260
+ }
1261
+
1262
+ return nil
1263
+ }
1264
+
1265
+ func (nc * networkContext ) deallocateService (s * api.Service ) error {
1266
+ if err := nc .nwkAllocator .DeallocateService (s ); err != nil {
1267
+ return err
1268
+ }
1269
+ nc .portAllocator .serviceDeallocatePorts (s )
1270
+ return nil
1271
+ }
1272
+
1232
1273
func (a * Allocator ) commitAllocatedService (ctx context.Context , batch * store.Batch , s * api.Service ) error {
1233
1274
if err := batch .Update (func (tx store.Tx ) error {
1234
1275
err := store .UpdateService (tx , s )
@@ -1241,7 +1282,7 @@ func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Bat
1241
1282
1242
1283
return errors .Wrapf (err , "failed updating state in store transaction for service %s" , s .ID )
1243
1284
}); err != nil {
1244
- if err := a .netCtx .nwkAllocator . DeallocateService (s ); err != nil {
1285
+ if err := a .netCtx .deallocateService (s ); err != nil {
1245
1286
log .G (ctx ).WithError (err ).Errorf ("failed rolling back allocation of service %s" , s .ID )
1246
1287
}
1247
1288
@@ -1298,7 +1339,7 @@ func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
1298
1339
return
1299
1340
}
1300
1341
1301
- if ! nc .nwkAllocator . IsServiceAllocated (s ) {
1342
+ if ! nc .isServiceAllocated (s ) {
1302
1343
err = fmt .Errorf ("service %s to which task %s belongs has pending allocations" , s .ID , t .ID )
1303
1344
return
1304
1345
}
@@ -1423,7 +1464,7 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
1423
1464
nc := a .netCtx
1424
1465
var allocatedServices []* api.Service
1425
1466
for _ , s := range nc .unallocatedServices {
1426
- if ! nc .nwkAllocator . IsServiceAllocated (s ) {
1467
+ if ! nc .isServiceAllocated (s ) {
1427
1468
if err := a .allocateService (ctx , s , false ); err != nil {
1428
1469
log .G (ctx ).WithError (err ).Debugf ("Failed allocation of unallocated service %s" , s .ID )
1429
1470
continue
0 commit comments