From c676c7b28881cc48bd79df67c4c3600bf062c3ef Mon Sep 17 00:00:00 2001 From: ming luo Date: Fri, 17 Nov 2023 14:56:59 -0500 Subject: [PATCH] retry producer creation upon error --- pulsar/producer_partition.go | 44 +++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 65eef5b66b..62bca8ae4d 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -185,7 +185,49 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } else { p.userProvidedProducerName = false } - err := p.grabCnx() + // retry to create producer when failed with maxRetry + var maxRetry int + if p.options.MaxReconnectToBroker == nil { + maxRetry = -1 + } else { + maxRetry = int(*p.options.MaxReconnectToBroker) + } + + var delayReconnectTime time.Duration + defaultBackoff := internal.DefaultBackoff{} + + var err error + for maxRetry != 0 { + if p.options.BackoffPolicy == nil { + delayReconnectTime = defaultBackoff.Next() + } else { + delayReconnectTime = p.options.BackoffPolicy.Next() + } + + atomic.AddUint64(&p.epoch, 1) + err = p.grabCnx() + if err == nil { + break + } + p.log.WithError(err).Error("Failed to create producer at newPartitionProducer") + errMsg := err.Error() + if strings.Contains(errMsg, errTopicNotFount) { + // when topic is not found, do not attempt to reconnect + p.log.Warn("Failed to create producer due to Topic Not Found") + break + } + + if strings.Contains(errMsg, "TopicTerminatedError") { + p.log.Info("Topic was terminated, failing pending messages, will not create producer") + break + } + + if maxRetry > 0 { + maxRetry-- + } + logger.WithError(err).Error("Failed to create producer at newPartitionProducer retry to create producer", delayReconnectTime) + time.Sleep(delayReconnectTime) + } if err != nil { p.batchFlushTicker.Stop() logger.WithError(err).Error("Failed to create producer at newPartitionProducer")