Fix: SendAsync callback was not invoked while the producer is reconnecting #1333

Open · wants to merge 8 commits into master
3 changes: 2 additions & 1 deletion pulsar/consumer_multitopic_test.go
@@ -332,7 +332,8 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
return nil
}

func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) error {
return nil
}

func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
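Since Connection.WriteData now returns an error, every test double implementing internal.Connection has to adopt the new signature; the dummyConnection update above is the minimal version of that. Below is a sketch of such a stub, assuming it lives inside the client module next to the existing tests (stubConnection is a name invented for the example; only the affected method is shown, the other interface methods are unchanged):

```go
package pulsar

import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar/internal"
)

// stubConnection is a hypothetical test double, not part of this PR. It shows
// the only change most fakes need: WriteData gains an error return, and
// returning nil preserves the old "always succeeds" behaviour for tests that
// never exercise the error path.
type stubConnection struct{}

func (stubConnection) WriteData(_ context.Context, _ internal.Buffer) error {
	return nil
}
```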
26 changes: 19 additions & 7 deletions pulsar/internal/connection.go
@@ -79,7 +79,7 @@ type ConnectionListener interface {
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(ctx context.Context, data Buffer)
WriteData(ctx context.Context, data Buffer) error
RegisterListener(id uint64, listener ConnectionListener) error
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler) error
@@ -456,26 +456,38 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
}
}

func (c *connection) WriteData(ctx context.Context, data Buffer) {
func (c *connection) WriteData(ctx context.Context, data Buffer) error {
// If the connection is closed, we should not write on it
if c.getState() != connectionReady {
c.log.Debug("Connection was already closed")
return errConnectionClosed
}

select {
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
// Channel is not full
return
return nil
case <-ctx.Done():
c.log.Debug("Write data context cancelled")
return
return ctx.Err()
default:
// Channel full, fallback to probe if connection is closed
}

// check if the connection is closed again
if c.getState() != connectionReady {
c.log.Debug("Connection was already closed")
return errConnectionClosed
}

for {
select {
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
// Successfully wrote on the channel
return
return nil
case <-ctx.Done():
c.log.Debug("Write data context cancelled")
return
return ctx.Err()
case <-time.After(100 * time.Millisecond):
// The channel is either:
// 1. blocked, in which case we need to wait until we have space
@@ -484,7 +496,7 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {

if c.getState() != connectionReady {
c.log.Debug("Connection was already closed")
return
return errConnectionClosed
}
}
}
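With the new signature, a caller can tell that a write was rejected instead of silently losing the buffer. The sketch below is a hypothetical helper, placed in pulsar/internal purely for illustration, showing one reasonable way to consume the error; the producer in this PR does essentially the same thing by logging the failure and keeping the item in its pending queue:

```go
package internal

import (
	"context"
	"log"
	"time"
)

// writeWithDeadline is a hypothetical caller of the reworked WriteData (it is
// not part of this PR). It bounds the attempt with a context deadline and
// surfaces the error instead of assuming the write was queued; the item stays
// owned by the caller, to be resent after reconnection or failed by a timeout.
func writeWithDeadline(cnx Connection, buf Buffer) error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	if err := cnx.WriteData(ctx, buf); err != nil {
		log.Printf("write failed, will be retried later: %v", err)
		return err
	}
	return nil
}
```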
32 changes: 29 additions & 3 deletions pulsar/producer_partition.go
@@ -113,6 +113,7 @@ type partitionProducer struct {
dataChan chan *sendRequest
cmdChan chan interface{}
connectClosedCh chan *connectionClosed
connectClosed atomic.Bool
publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
schemaInfo *SchemaInfo
@@ -394,7 +395,10 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
pi.sentAt = time.Now()
pi.Unlock()
p.pendingQueue.Put(pi)
p._getConn().WriteData(pi.ctx, pi.buffer)
err = p.writeConn(pi.ctx, pi.buffer)
if err != nil {
p.log.WithError(err).Warn("failed to write data, it will be retried later")
}

if pi == lastViewItem {
break
@@ -429,6 +433,8 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu
closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls())
}

// mark the connection as closed
p.markConnClosed(true)
select {
case p.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}:
default:
@@ -581,7 +587,8 @@ func (p *partitionProducer) runEventsLoop() {
}
case connectionClosed := <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
p.reconnectToBroker(connectionClosed)
// reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
go p.reconnectToBroker(connectionClosed)
case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}
@@ -907,7 +914,13 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
sequenceID: sequenceID,
sendRequests: callbacks,
})
p._getConn().WriteData(ctx, buffer)

// If the connection is closed, WriteData() will fail, but that is fine: the buffer is still kept in p.pendingQueue,
// and it will either be resent after reconnection or eventually fail with a send timeout.
err := p.writeConn(ctx, buffer)
if err != nil {
p.log.WithError(err).Warn("failed to write data, it will be retried later")
}
}
}

@@ -1758,6 +1771,7 @@ func (i *pendingItem) done(err error) {
// Note: should only be called by this partition producer when a new connection is available.
func (p *partitionProducer) _setConn(conn internal.Connection) {
p.conn.Store(conn)
p.markConnClosed(false)
}

// _getConn returns internal connection field of this partition producer atomically.
@@ -1769,6 +1783,18 @@ func (p *partitionProducer) _getConn() internal.Connection {
return p.conn.Load().(internal.Connection)
}

func (p *partitionProducer) markConnClosed(closed bool) {
p.connectClosed.Store(closed)
}

func (p *partitionProducer) writeConn(ctx context.Context, data internal.Buffer) error {
if p.connectClosed.Load() {
return errors.New("connection is closed")
}

return p._getConn().WriteData(ctx, data)
}

type chunkRecorder struct {
chunkedMsgID chunkMessageID
}
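The other half of the fix is that reconnection no longer runs inline in the event loop. The sketch below uses invented channel names and payload types, not the producer's real fields, to show why handing reconnection to a goroutine keeps the loop responsive:

```go
package pulsar

// eventLoopSketch is a simplified illustration of why reconnectToBroker now
// runs in its own goroutine: if reconnection were done inline, this select
// would stop draining the data and command channels while the broker is
// unreachable, and SendAsync callbacks would never fire, which is the hang
// reported in #1332.
func eventLoopSketch(dataCh <-chan []byte, connClosedCh <-chan struct{}, done <-chan struct{}) {
	for {
		select {
		case data := <-dataCh:
			_ = data // hand the payload to the batching / pending queue
		case <-connClosedCh:
			// Reconnect asynchronously so the loop keeps accepting work;
			// writes attempted in the meantime fail fast via the
			// connectClosed flag added in this PR.
			go func() {
				// reconnect with backoff (omitted in this sketch)
			}()
		case <-done:
			return
		}
	}
}
```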
100 changes: 100 additions & 0 deletions pulsar/producer_test.go
@@ -30,6 +30,8 @@ import (
"testing"
"time"

"github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
@@ -2575,6 +2577,104 @@ func TestProducerKeepReconnectingAndThenCallClose(t *testing.T) {
}, 30*time.Second, 1*time.Second)
}

func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
req := testcontainers.ContainerRequest{
Image: getPulsarTestImage(),
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
WaitingFor: wait.ForExposedPort(),
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
// use fixed port bindings so that the client can reconnect to the same address after the container restarts
HostConfigModifier: func(hostConfig *container.HostConfig) {
hostConfig.PortBindings = map[nat.Port][]nat.PortBinding{
"6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6650"}},
"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8080"}},
}
},
}
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
require.NoError(t, err, "Failed to start the pulsar container")
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
require.NoError(t, err, "Failed to get the pulsar endpoint")

client, err := NewClient(ClientOptions{
URL: endpoint,
ConnectionTimeout: 5 * time.Second,
OperationTimeout: 5 * time.Second,
})
require.NoError(t, err)
defer client.Close()

var testProducer Producer
require.Eventually(t, func() bool {
testProducer, err = client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
Schema: NewBytesSchema(nil),
SendTimeout: 3 * time.Second,
})
return err == nil
}, 30*time.Second, 1*time.Second)

// send a message
errChan := make(chan error)
defer close(errChan)

testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
errChan <- err
})
// should succeed
err = <-errChan
require.NoError(t, err)

// stop pulsar server
timeout := 10 * time.Second
_ = c.Stop(context.Background(), &timeout)

// send again
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
errChan <- err
})
// should get a timeout error
err = <-errChan
require.True(t, errors.Is(err, ErrSendTimeout))

oldConn := testProducer.(*producer).producers[0].(*partitionProducer)._getConn()
// restart pulsar server
err = c.Start(context.Background())
require.NoError(t, err)
defer c.Terminate(context.Background())

// wait for the reconnection to succeed
waitTime := 0
for {
newConn := testProducer.(*producer).producers[0].(*partitionProducer)._getConn()
if oldConn != newConn {
break
}
time.Sleep(5 * time.Second)
waitTime += 5
if waitTime > 60 {
break
}
}

// send again
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
errChan <- err
})
// should succeed
err = <-errChan
require.NoError(t, err)
}

func TestSelectConnectionForSameProducer(t *testing.T) {
topicName := newTopicName()

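For completeness, here is a hedged sketch of how an application might observe the behaviour the new test asserts, using only the public pulsar package; the producer setup is assumed to match the test above, and sendWithStatus is a name invented for the example:

```go
package example

import (
	"context"
	"errors"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

// sendWithStatus relies on the guarantee exercised by the test: after this fix
// the callback always fires, with a nil error once the broker acknowledges the
// message and with ErrSendTimeout while the broker is unreachable, instead of
// never being invoked during reconnection.
func sendWithStatus(p pulsar.Producer, payload []byte) {
	p.SendAsync(context.Background(), &pulsar.ProducerMessage{
		Payload: payload,
	}, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
		switch {
		case err == nil:
			log.Println("message persisted")
		case errors.Is(err, pulsar.ErrSendTimeout):
			log.Println("send timed out while the broker was unreachable; retry or surface the error")
		default:
			log.Printf("send failed: %v", err)
		}
	})
}
```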