
Commit a5d643f

fix: producer not close after shutdown (#355)
* fix: producer close does not release memory and does not stop the monitor
1 parent: 6ea69c9

3 files changed (+34, -18 lines)

producer/io_thread_pool.go

Lines changed: 2 additions & 6 deletions
@@ -33,18 +33,14 @@ func (threadPool *IoThreadPool) addTask(batch *ProducerBatch) {
 func (threadPool *IoThreadPool) start(ioWorkerWaitGroup *sync.WaitGroup, ioThreadPoolwait *sync.WaitGroup) {
     defer ioThreadPoolwait.Done()
     for task := range threadPool.taskCh {
-        if task == nil {
-            level.Info(threadPool.logger).Log("msg", "All cache tasks in the thread pool have been successfully sent")
-            threadPool.stopped.Store(true)
-            return
-        }
         threadPool.ioworker.startSendTask(ioWorkerWaitGroup)
         go func(producerBatch *ProducerBatch) {
             defer threadPool.ioworker.closeSendTask(ioWorkerWaitGroup)
             threadPool.ioworker.sendToServer(producerBatch)
         }(task)
     }
+    level.Info(threadPool.logger).Log("msg", "All cache tasks in the thread pool have been successfully sent")
+    threadPool.stopped.Store(true)
 }
 
 func (threadPool *IoThreadPool) ShutDown() {
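The change above drops the nil-task sentinel: the worker loop now runs until the task channel itself is closed (presumably by ShutDown, whose body is not shown in this diff), and the completion log plus the stopped flag move to after the loop, so they run exactly once, after the channel has been drained. A minimal, self-contained sketch of that close-to-drain pattern, using simplified names rather than the library's actual types:

package main

import (
    "fmt"
    "sync"
)

func main() {
    taskCh := make(chan int, 8)
    var worker sync.WaitGroup

    worker.Add(1)
    go func() {
        defer worker.Done()
        for task := range taskCh { // ranges until taskCh is closed and empty
            fmt.Println("send batch", task)
        }
        // Runs once, after every queued task has been consumed; this is the
        // spot the diff moves the "successfully sent" log and stopped flag to.
        fmt.Println("all cached tasks sent")
    }()

    for i := 0; i < 3; i++ {
        taskCh <- i
    }
    close(taskCh) // shutdown signal: no sentinel value needed
    worker.Wait()
}

Closing the channel doubles as the drain guarantee: range only exits after every buffered task has been handed off, so no queued batch is lost on shutdown.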

producer/monitor.go

Lines changed: 29 additions & 12 deletions
@@ -1,6 +1,7 @@
 package producer
 
 import (
+    "sync"
     "sync/atomic"
     "time"
 
@@ -23,14 +24,23 @@ type ProducerMetrics struct {
 
 type ProducerMonitor struct {
     metrics atomic.Value // *ProducerMetrics
+    stopCh  chan struct{}
+    wg      sync.WaitGroup
 }
 
 func newProducerMonitor() *ProducerMonitor {
-    m := &ProducerMonitor{}
+    m := &ProducerMonitor{
+        stopCh: make(chan struct{}),
+    }
     m.metrics.Store(&ProducerMetrics{})
     return m
 }
 
+func (m *ProducerMonitor) Stop() {
+    close(m.stopCh)
+    m.wg.Wait()
+}
+
 func (m *ProducerMonitor) recordSuccess(sendBegin time.Time, sendEnd time.Time) {
     metrics := m.metrics.Load().(*ProducerMetrics)
     metrics.sendBatch.AddSample(float64(sendEnd.Sub(sendBegin).Microseconds()))
@@ -72,17 +82,24 @@ func (m *ProducerMonitor) getAndResetMetrics() *ProducerMetrics {
 }
 
 func (m *ProducerMonitor) reportThread(reportInterval time.Duration, logger log.Logger) {
+    defer m.wg.Done()
     ticker := time.NewTicker(reportInterval)
-    for range ticker.C {
-        metrics := m.getAndResetMetrics()
-        level.Info(logger).Log("msg", "report status",
-            "sendBatch", metrics.sendBatch.String(),
-            "retryCount", metrics.retryCount.Load(),
-            "createBatch", metrics.createBatch.Load(),
-            "onSuccess", metrics.onSuccess.String(),
-            "onFail", metrics.onFail.String(),
-            "waitMemory", metrics.waitMemory.String(),
-            "waitMemoryFailCount", metrics.waitMemoryFailCount.Load(),
-        )
+    for {
+        select {
+        case <-ticker.C:
+            metrics := m.getAndResetMetrics()
+            level.Info(logger).Log("msg", "report status",
+                "sendBatch", metrics.sendBatch.String(),
+                "retryCount", metrics.retryCount.Load(),
+                "createBatch", metrics.createBatch.Load(),
+                "onSuccess", metrics.onSuccess.String(),
+                "onFail", metrics.onFail.String(),
+                "waitMemory", metrics.waitMemory.String(),
+                "waitMemoryFailCount", metrics.waitMemoryFailCount.Load(),
+            )
+        case <-m.stopCh:
+            ticker.Stop()
+            return
+        }
     }
 }
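reportThread previously ranged over ticker.C forever, so the reporting goroutine (and its ticker) outlived Close and kept the monitor's metrics alive. The new stopCh/WaitGroup pair gives it an explicit shutdown handshake: Stop closes the channel and then waits until the goroutine has actually returned. A minimal sketch of that pattern, with simplified names and a placeholder report body standing in for the metrics log line:

package main

import (
    "fmt"
    "sync"
    "time"
)

type monitor struct {
    stopCh chan struct{}
    wg     sync.WaitGroup
}

func (m *monitor) start(interval time.Duration) {
    m.wg.Add(1) // registered by the caller, before the goroutine launches
    go func() {
        defer m.wg.Done()
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                fmt.Println("report status") // stand-in for the metrics log
            case <-m.stopCh:
                return // exits promptly instead of blocking on the next tick
            }
        }
    }()
}

func (m *monitor) Stop() {
    close(m.stopCh)
    m.wg.Wait() // returns only after the goroutine has exited
}

func main() {
    m := &monitor{stopCh: make(chan struct{})}
    m.start(10 * time.Millisecond)
    time.Sleep(25 * time.Millisecond)
    m.Stop()
    fmt.Println("monitor stopped")
}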

producer/producer.go

Lines changed: 3 additions & 0 deletions
@@ -288,12 +288,14 @@ func (producer *Producer) Start() {
     producer.ioThreadPoolWaitGroup.Add(1)
     go producer.threadPool.start(producer.ioWorkerWaitGroup, producer.ioThreadPoolWaitGroup)
     if !producer.producerConfig.DisableRuntimeMetrics {
+        producer.monitor.wg.Add(1)
         go producer.monitor.reportThread(time.Minute, producer.logger)
     }
 }
 
 // Limited closing transfer parameter nil, safe closing transfer timeout time, timeout Ms parameter in milliseconds
 func (producer *Producer) Close(timeoutMs int64) error {
+    producer.monitor.Stop()
     startCloseTime := time.Now()
     producer.sendCloseProdcerSignal()
     producer.moverWaitGroup.Wait()
@@ -310,6 +312,7 @@
 }
 
 func (producer *Producer) SafeClose() {
+    producer.monitor.Stop()
     producer.sendCloseProdcerSignal()
     producer.moverWaitGroup.Wait()
     level.Info(producer.logger).Log("msg", "Mover close finish")
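Note that the new wg.Add(1) lives in Start, right before the go statement, rather than as the first line of reportThread. That ordering matters: Add must happen before any possible Wait, or Close could observe a zero counter and return while the reporter is still starting up. A small, self-contained illustration of the rule (hypothetical names, not the producer's actual code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    wg.Add(1) // correct: counted by the starter, before the goroutine runs
    go func() {
        defer wg.Done()
        fmt.Println("reporting...")
    }()

    // Racy alternative (avoid): putting wg.Add(1) inside the goroutine lets a
    // concurrent wg.Wait() see a zero counter and return before the goroutine
    // has even been scheduled.

    wg.Wait()
    fmt.Println("shutdown complete")
}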
