diff --git a/pulsar/consumer_multitopic_test.go b/pulsar/consumer_multitopic_test.go
index 30ae5ccd17..1facae3b05 100644
--- a/pulsar/consumer_multitopic_test.go
+++ b/pulsar/consumer_multitopic_test.go
@@ -332,7 +332,8 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
 	return nil
 }
 
-func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
+func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) error {
+	return nil
 }
 
 func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2ad1acb5fd..3583995b74 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -79,7 +79,7 @@ type ConnectionListener interface {
 type Connection interface {
 	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
 	SendRequestNoWait(req *pb.BaseCommand) error
-	WriteData(ctx context.Context, data Buffer)
+	WriteData(ctx context.Context, data Buffer) error
 	RegisterListener(id uint64, listener ConnectionListener) error
 	UnregisterListener(id uint64)
 	AddConsumeHandler(id uint64, handler ConsumerHandler) error
@@ -456,26 +456,38 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
 	}
 }
 
-func (c *connection) WriteData(ctx context.Context, data Buffer) {
+func (c *connection) WriteData(ctx context.Context, data Buffer) error {
+	// If the connection is closed, we should not write on it
+	if c.getState() != connectionReady {
+		c.log.Debug("Connection was already closed")
+		return errConnectionClosed
+	}
+
 	select {
 	case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
 		// Channel is not full
-		return
+		return nil
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")
-		return
+		return ctx.Err()
 	default:
 		// Channel full, fallback to probe if connection is closed
 	}
 
+	// check if the connection is closed again
+	if c.getState() != connectionReady {
+		c.log.Debug("Connection was already closed")
+		return errConnectionClosed
+	}
+
 	for {
 		select {
 		case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
 			// Successfully wrote on the channel
-			return
+			return nil
 		case <-ctx.Done():
 			c.log.Debug("Write data context cancelled")
-			return
+			return ctx.Err()
 		case <-time.After(100 * time.Millisecond):
 			// The channel is either:
 			// 1. blocked, in which case we need to wait until we have space
@@ -484,7 +496,7 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
 			if c.getState() != connectionReady {
 				c.log.Debug("Connection was already closed")
-				return
+				return errConnectionClosed
 			}
 		}
 	}
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index a4c2e3f8c4..bbbf728861 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -113,6 +113,7 @@ type partitionProducer struct {
 	dataChan         chan *sendRequest
 	cmdChan          chan interface{}
 	connectClosedCh  chan *connectionClosed
+	connectClosed    atomic.Bool
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
 	schemaInfo       *SchemaInfo
@@ -394,7 +395,10 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
 		pi.sentAt = time.Now()
 		pi.Unlock()
 		p.pendingQueue.Put(pi)
-		p._getConn().WriteData(pi.ctx, pi.buffer)
+		err = p.writeConn(pi.ctx, pi.buffer)
+		if err != nil {
+			p.log.WithError(err).Warn("failed to write data, it will be retried later")
+		}
 
 		if pi == lastViewItem {
 			break
@@ -429,6 +433,8 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu
 			closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls())
 	}
 
+	// mark the connection as closed
+	p.markConnClosed(true)
 	select {
 	case p.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}:
 	default:
@@ -581,7 +587,8 @@ func (p *partitionProducer) runEventsLoop() {
 			}
 		case connectionClosed := <-p.connectClosedCh:
 			p.log.Info("runEventsLoop will reconnect in producer")
-			p.reconnectToBroker(connectionClosed)
+			// reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
+			go p.reconnectToBroker(connectionClosed)
 		case <-p.batchFlushTicker.C:
 			p.internalFlushCurrentBatch()
 		}
@@ -907,7 +914,13 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
 			sequenceID:   sequenceID,
 			sendRequests: callbacks,
 		})
-		p._getConn().WriteData(ctx, buffer)
+
+		// If the connection is closed, WriteData() will fail, but that is fine: the buffer is still kept in p.pendingQueue,
+		// and it will either be sent out or time out eventually.
+		err := p.writeConn(ctx, buffer)
+		if err != nil {
+			p.log.WithError(err).Warn("failed to write data, it will be retried later")
+		}
 	}
 }
 
@@ -1758,6 +1771,7 @@ func (i *pendingItem) done(err error) {
 // Note: should only be called by this partition producer when a new connection is available.
 func (p *partitionProducer) _setConn(conn internal.Connection) {
 	p.conn.Store(conn)
+	p.markConnClosed(false)
 }
 
 // _getConn returns internal connection field of this partition producer atomically.
@@ -1769,6 +1783,18 @@ func (p *partitionProducer) _getConn() internal.Connection {
 	return p.conn.Load().(internal.Connection)
 }
 
+func (p *partitionProducer) markConnClosed(closed bool) {
+	p.connectClosed.Store(closed)
+}
+
+func (p *partitionProducer) writeConn(ctx context.Context, data internal.Buffer) error {
+	if p.connectClosed.Load() {
+		return errors.New("connection is closed")
+	}
+
+	return p._getConn().WriteData(ctx, data)
+}
+
 type chunkRecorder struct {
 	chunkedMsgID chunkMessageID
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index b827929964..32f026d728 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -30,6 +30,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/docker/api/types/container"
+	"github.com/docker/go-connections/nat"
 	"github.com/stretchr/testify/require"
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
@@ -2575,6 +2577,104 @@ func TestProducerKeepReconnectingAndThenCallClose(t *testing.T) {
 	}, 30*time.Second, 1*time.Second)
 }
 
+func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
+	req := testcontainers.ContainerRequest{
+		Image:        getPulsarTestImage(),
+		ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+		WaitingFor:   wait.ForExposedPort(),
+		Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+		// use fixed port binding so that it can be reconnected after restart
+		HostConfigModifier: func(hostConfig *container.HostConfig) {
+			hostConfig.PortBindings = map[nat.Port][]nat.PortBinding{
+				"6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6650"}},
+				"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8080"}},
+			}
+		},
+	}
+	c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+	})
+	require.NoError(t, err, "Failed to start the pulsar container")
+	endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+	require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+	client, err := NewClient(ClientOptions{
+		URL:               endpoint,
+		ConnectionTimeout: 5 * time.Second,
+		OperationTimeout:  5 * time.Second,
+	})
+	require.NoError(t, err)
+	defer client.Close()
+
+	var testProducer Producer
+	require.Eventually(t, func() bool {
+		testProducer, err = client.CreateProducer(ProducerOptions{
+			Topic:       newTopicName(),
+			Schema:      NewBytesSchema(nil),
+			SendTimeout: 3 * time.Second,
+		})
+		return err == nil
+	}, 30*time.Second, 1*time.Second)
+
+	// send a message
+	errChan := make(chan error)
+	defer close(errChan)
+
+	testProducer.SendAsync(context.Background(), &ProducerMessage{
+		Payload: []byte("test"),
+	}, func(_ MessageID, _ *ProducerMessage, err error) {
+		errChan <- err
+	})
+	// should succeed
+	err = <-errChan
+	require.NoError(t, err)
+
+	// stop pulsar server
+	timeout := 10 * time.Second
+	_ = c.Stop(context.Background(), &timeout)
+
+	// send again
+	testProducer.SendAsync(context.Background(), &ProducerMessage{
+		Payload: []byte("test"),
+	}, func(_ MessageID, _ *ProducerMessage, err error) {
+		errChan <- err
+	})
+	// should get a timeout error
+	err = <-errChan
+	require.True(t, errors.Is(err, ErrSendTimeout))
+
+	oldConn := testProducer.(*producer).producers[0].(*partitionProducer)._getConn()
+	// restart pulsar server
+	err = c.Start(context.Background())
+	require.NoError(t, err)
+	defer c.Terminate(context.Background())
+
+	// wait for reconnection success
+	waitTime := 0
+	for {
+		newConn := testProducer.(*producer).producers[0].(*partitionProducer)._getConn()
+		if oldConn != newConn {
+			break
+		}
+		time.Sleep(5 * time.Second)
+		waitTime += 5
+		if waitTime > 60 {
+			break
+		}
+	}
+
+	// send again
+	testProducer.SendAsync(context.Background(), &ProducerMessage{
+		Payload: []byte("test"),
+	}, func(_ MessageID, _ *ProducerMessage, err error) {
+		errChan <- err
+	})
+	// should succeed
+	err = <-errChan
+	require.NoError(t, err)
+}
+
 func TestSelectConnectionForSameProducer(t *testing.T) {
 	topicName := newTopicName()