Skip to content

Commit 92d9328

Browse files
authored
feat: producer double close will not panic (#356)
* monitor safe close * sts ch close
1 parent a5d643f commit 92d9328

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

producer/monitor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type ProducerMetrics struct {
2525
type ProducerMonitor struct {
2626
metrics atomic.Value // *ProducerMetrics
2727
stopCh chan struct{}
28+
once sync.Once
2829
wg sync.WaitGroup
2930
}
3031

@@ -37,7 +38,9 @@ func newProducerMonitor() *ProducerMonitor {
3738
}
3839

3940
func (m *ProducerMonitor) Stop() {
40-
close(m.stopCh)
41+
m.once.Do(func() {
42+
close(m.stopCh)
43+
})
4144
m.wg.Wait()
4245
}
4346

@@ -82,6 +85,7 @@ func (m *ProducerMonitor) getAndResetMetrics() *ProducerMetrics {
8285
}
8386

8487
func (m *ProducerMonitor) reportThread(reportInterval time.Duration, logger log.Logger) {
88+
m.wg.Add(1)
8589
defer m.wg.Done()
8690
ticker := time.NewTicker(reportInterval)
8791
for {

producer/producer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Producer struct {
2828
logger log.Logger
2929
producerLogGroupSize int64
3030
monitor *ProducerMonitor
31+
stsCloseOnce sync.Once
3132
}
3233

3334
func NewProducer(producerConfig *ProducerConfig) (*Producer, error) {
@@ -288,7 +289,6 @@ func (producer *Producer) Start() {
288289
producer.ioThreadPoolWaitGroup.Add(1)
289290
go producer.threadPool.start(producer.ioWorkerWaitGroup, producer.ioThreadPoolWaitGroup)
290291
if !producer.producerConfig.DisableRuntimeMetrics {
291-
producer.monitor.wg.Add(1)
292292
go producer.monitor.reportThread(time.Minute, producer.logger)
293293
}
294294
}
@@ -333,7 +333,9 @@ func (producer *Producer) sendCloseProdcerSignal() {
333333

334334
func (producer *Producer) closeStstokenChannel() {
335335
if producer.producerConfig.StsTokenShutDown != nil {
336-
close(producer.producerConfig.StsTokenShutDown)
337-
level.Info(producer.logger).Log("msg", "producer closed ststoken")
336+
producer.stsCloseOnce.Do(func() {
337+
close(producer.producerConfig.StsTokenShutDown)
338+
level.Info(producer.logger).Log("msg", "producer closed ststoken")
339+
})
338340
}
339341
}

0 commit comments

Comments
 (0)