Skip to content

Commit

Permalink
fix: fix the internalSend() return without sendRequest.callback() (#880)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleiphir2769 authored Nov 2, 2022
1 parent 44b64aa commit 0412f28
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
var err error
if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
request.callback(nil, request.msg, errors.New("can not set Value and Payload both"))
return
}

Expand All @@ -493,6 +494,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.publishSemaphore.Release()
request.callback(nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
}
Expand Down Expand Up @@ -528,6 +530,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if err != nil {
p.publishSemaphore.Release()
p.log.WithError(err).Error("get schema version fail")
request.callback(nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
return
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
Expand Down

0 comments on commit 0412f28

Please sign in to comment.