Skip to content

Commit 3f611bf

Browse files
authored
[supervisor] fix ports forwarding hangs issue (#20841)
* [supervisor] fix ports forwarding hangs issue * fix unit tests * Fix lock issue * remove useless code
1 parent 919241c commit 3f611bf

File tree

4 files changed

+22
-20
lines changed

4 files changed

+22
-20
lines changed

components/supervisor/pkg/ports/exposed-ports.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ func NewGitpodExposedPorts(workspaceID string, instanceID string, workspaceUrl s
8787
WorkspaceUrl: workspaceUrl,
8888
gitpodService: gitpodService,
8989

90-
// allow clients to submit 30 expose requests without blocking
91-
requests: make(chan *exposePortRequest, 30),
92-
localExposedNotice: make(chan struct{}, 30),
90+
// allow clients to submit 3000 expose requests without blocking
91+
requests: make(chan *exposePortRequest, 3000),
92+
localExposedNotice: make(chan struct{}, 3000),
9393
}
9494
}
9595

components/supervisor/pkg/ports/ports.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ type managedPort struct {
125125
// Subscription is a Subscription to status updates
126126
type Subscription struct {
127127
updates chan []*api.PortsStatus
128-
Close func() error
128+
Close func(lock bool) error
129129
}
130130

131131
// Updates returns the updates channel
@@ -151,7 +151,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
151151
pm.mu.Unlock()
152152

153153
for _, s := range subs {
154-
_ = s.Close()
154+
_ = s.Close(true)
155155
}
156156
}()
157157
defer cancel()
@@ -324,7 +324,7 @@ func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, serve
324324
case sub.updates <- status:
325325
case <-time.After(5 * time.Second):
326326
log.Error("ports subscription droped out")
327-
_ = sub.Close()
327+
_ = sub.Close(false)
328328
}
329329
}
330330
}
@@ -766,20 +766,21 @@ func (pm *Manager) Subscribe() (*Subscription, error) {
766766
}
767767

768768
if len(pm.subscriptions) > maxSubscriptions {
769-
return nil, ErrTooManySubscriptions
769+
return nil, fmt.Errorf("too many subscriptions: %d", len(pm.subscriptions))
770+
// return nil, ErrTooManySubscriptions
770771
}
771772

772773
sub := &Subscription{updates: make(chan []*api.PortsStatus, 5)}
773774
var once sync.Once
774-
sub.Close = func() error {
775-
pm.mu.Lock()
776-
defer pm.mu.Unlock()
777-
775+
sub.Close = func(lock bool) error {
776+
if lock {
777+
pm.mu.Lock()
778+
defer pm.mu.Unlock()
779+
}
778780
once.Do(func() {
779781
close(sub.updates)
780782
})
781783
delete(pm.subscriptions, sub)
782-
783784
return nil
784785
}
785786
pm.subscriptions[sub] = struct{}{}

components/supervisor/pkg/ports/ports_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ func TestPortsUpdateState(t *testing.T) {
684684
}
685685
go func() {
686686
defer wg.Done()
687-
defer sub.Close()
687+
defer sub.Close(true)
688688

689689
for up := range sub.Updates() {
690690
updts = append(updts, up)
@@ -864,8 +864,8 @@ func TestPortsConcurrentSubscribe(t *testing.T) {
864864
}
865865
}()
866866

867-
eg, _ := errgroup.WithContext(context.Background())
868867
for i := 0; i < maxSubscriptions; i++ {
868+
eg, _ := errgroup.WithContext(context.Background())
869869
eg.Go(func() error {
870870
for j := 0; j < subscribes; j++ {
871871
sub, err := pm.Subscribe()
@@ -878,16 +878,17 @@ func TestPortsConcurrentSubscribe(t *testing.T) {
878878
// update
879879
case <-sub.Updates():
880880
}
881-
sub.Close()
881+
sub.Close(true)
882882
}
883883
return nil
884884
})
885+
err := eg.Wait()
886+
if err != nil {
887+
t.Fatal(err)
888+
}
889+
time.Sleep(50 * time.Millisecond)
885890
}
886-
err := eg.Wait()
887891
close(subscribing)
888-
if err != nil {
889-
t.Fatal(err)
890-
}
891892

892893
wg.Wait()
893894
}

components/supervisor/pkg/supervisor/services.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (s *statusService) PortsStatus(req *api.PortsStatusRequest, srv api.StatusS
243243
if err != nil {
244244
return status.Error(codes.Internal, err.Error())
245245
}
246-
defer sub.Close()
246+
defer sub.Close(true)
247247

248248
for {
249249
select {

0 commit comments

Comments
 (0)