|
18 | 18 | */
|
19 | 19 | package org.apache.pulsar.client.impl;
|
20 | 20 |
|
| 21 | +import static org.mockito.ArgumentMatchers.any; |
21 | 22 | import java.lang.reflect.Field;
|
22 | 23 | import java.nio.charset.StandardCharsets;
|
| 24 | +import java.util.concurrent.TimeUnit; |
23 | 25 | import lombok.Cleanup;
|
24 | 26 | import org.apache.pulsar.client.api.MessageId;
|
25 | 27 | import org.apache.pulsar.client.api.ProducerConsumerBase;
|
26 | 28 | import org.apache.pulsar.client.api.PulsarClientException;
|
27 | 29 | import org.apache.pulsar.client.api.Schema;
|
28 | 30 | import org.apache.pulsar.common.api.proto.PulsarApi;
|
29 | 31 | import org.apache.pulsar.common.util.FutureUtil;
|
| 32 | +import org.mockito.Mockito; |
30 | 33 | import org.testng.Assert;
|
31 | 34 | import org.testng.annotations.AfterMethod;
|
32 | 35 | import org.testng.annotations.BeforeMethod;
|
@@ -225,4 +228,44 @@ public void testEnsureNotBlockOnThePendingQueue() throws Exception {
|
225 | 228 | FutureUtil.waitForAll(futures).get();
|
226 | 229 | Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
|
227 | 230 | }
|
| 231 | + |
| 232 | + @Test(timeOut = 10_000) |
| 233 | + public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception { |
| 234 | + final int pendingQueueSize = 10; |
| 235 | + @Cleanup |
| 236 | + ProducerImpl<byte[]> producer = |
| 237 | + (ProducerImpl<byte[]>) pulsarClient.newProducer() |
| 238 | + .topic("testProducerSemaphoreRelease") |
| 239 | + .sendTimeout(2, TimeUnit.SECONDS) |
| 240 | + .maxPendingMessages(pendingQueueSize) |
| 241 | + .enableBatching(true) |
| 242 | + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) |
| 243 | + .batchingMaxBytes(15) |
| 244 | + .create(); |
| 245 | + this.stopBroker(); |
| 246 | + try { |
| 247 | + ProducerImpl<byte[]> spyProducer = Mockito.spy(producer); |
| 248 | + // Make the pendingMessages not empty |
| 249 | + spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync(); |
| 250 | + spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync(); |
| 251 | + |
| 252 | + Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer"); |
| 253 | + batchMessageContainerField.setAccessible(true); |
| 254 | + BatchMessageContainerImpl batchMessageContainer = |
| 255 | + (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer); |
| 256 | + batchMessageContainer.setProducer(spyProducer); |
| 257 | + Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer) |
| 258 | + .encryptMessage(any(), any()); |
| 259 | + |
| 260 | + try { |
| 261 | + spyProducer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get(); |
| 262 | + } catch (Exception e) { |
| 263 | + throw PulsarClientException.unwrap(e); |
| 264 | + } |
| 265 | + |
| 266 | + throw new IllegalStateException("can not reach here"); |
| 267 | + } catch (PulsarClientException.TimeoutException ex) { |
| 268 | + Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize); |
| 269 | + } |
| 270 | + } |
228 | 271 | }
|
0 commit comments