Skip to content

Commit 531342e

Browse files
authored
Merge pull request #234 from sak0/fix_issue225
fix leak for watch
2 parents 5141069 + ae4e6e2 commit 531342e

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

pkg/flow/watch_flow.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,9 @@ func (w *WatchEngine) longPullAllServices(
235235
w.rwMutex.Unlock()
236236
defer func() {
237237
w.rwMutex.Lock()
238-
delete(w.watchContexts, nextId)
238+
if watchers, ok := w.servicesWatch[request.Namespace]; ok {
239+
delete(watchers, nextId)
240+
}
239241
w.rwMutex.Unlock()
240242
}()
241243
if !serivcesResp.IsInitialized() {
@@ -328,7 +330,11 @@ func (w *WatchEngine) longPullAllInstances(
328330
w.rwMutex.Unlock()
329331
defer func() {
330332
w.rwMutex.Lock()
331-
delete(w.watchContexts, nextId)
333+
if nsMap, ok := w.instancesWatch[request.Namespace]; ok {
334+
if svcMap, ok := nsMap[request.Service]; ok {
335+
delete(svcMap, nextId)
336+
}
337+
}
332338
w.rwMutex.Unlock()
333339
}()
334340
if !svcInstances.IsInitialized() {
@@ -345,7 +351,7 @@ func (w *WatchEngine) longPullAllInstances(
345351
latestSvcInstances = w.registry.GetInstances(&request.ServiceKey, false, false)
346352
}
347353
instancesResponse := data.BuildInstancesResponse(request.ServiceKey, nil, latestSvcInstances)
348-
return model.NewWatchAllInstancesResponse(nextId, instancesResponse, nil), nil
354+
return model.NewWatchAllInstancesResponse(nextId, instancesResponse, w.CancelWatch), nil
349355
}
350356

351357
type NotifyUpdateContext struct {

0 commit comments

Comments
 (0)