Skip to content

Commit

Permalink
retry producer creation upon error
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzming committed Nov 17, 2023
1 parent ec846ff commit c676c7b
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,49 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
} else {
p.userProvidedProducerName = false
}
err := p.grabCnx()
// retry to create producer when failed with maxRetry
var maxRetry int
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*p.options.MaxReconnectToBroker)
}

var delayReconnectTime time.Duration
defaultBackoff := internal.DefaultBackoff{}

var err error
for maxRetry != 0 {
if p.options.BackoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = p.options.BackoffPolicy.Next()
}

atomic.AddUint64(&p.epoch, 1)
err = p.grabCnx()
if err == nil {
break
}
p.log.WithError(err).Error("Failed to create producer at newPartitionProducer")
errMsg := err.Error()
if strings.Contains(errMsg, errTopicNotFount) {
// when topic is not found, do not attempt to reconnect
p.log.Warn("Failed to create producer due to Topic Not Found")
break
}

if strings.Contains(errMsg, "TopicTerminatedError") {
p.log.Info("Topic was terminated, failing pending messages, will not create producer")
break
}

if maxRetry > 0 {
maxRetry--
}
logger.WithError(err).Error("Failed to create producer at newPartitionProducer retry to create producer", delayReconnectTime)
time.Sleep(delayReconnectTime)
}
if err != nil {
p.batchFlushTicker.Stop()
logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
Expand Down

0 comments on commit c676c7b

Please sign in to comment.