Skip to content

Commit 3667a7a

Browse files
committed
fix getMessageIdByTimestamp cannot redirect
1 parent fd7e0b4 commit 3667a7a

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1767,8 +1767,8 @@ public void getMessageIdByTimestamp(
17671767
if (isNot307And404Exception(ex)) {
17681768
log.error("[{}] Failed to get message ID by timestamp {} from {}",
17691769
clientAppId(), timestamp, topicName, ex);
1770-
resumeAsyncResponseExceptionally(asyncResponse, ex);
17711770
}
1771+
resumeAsyncResponseExceptionally(asyncResponse, ex);
17721772
return null;
17731773
});
17741774
}

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@
2626
import java.util.Optional;
2727
import java.util.Set;
2828
import java.util.concurrent.ExecutionException;
29+
import lombok.Cleanup;
2930
import lombok.extern.slf4j.Slf4j;
3031
import org.apache.commons.lang3.RandomStringUtils;
3132
import org.apache.pulsar.broker.MultiBrokerBaseTest;
3233
import org.apache.pulsar.broker.PulsarService;
3334
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
3435
import org.apache.pulsar.client.admin.PulsarAdmin;
3536
import org.apache.pulsar.client.admin.PulsarAdminException;
37+
import org.apache.pulsar.client.api.MessageId;
38+
import org.apache.pulsar.client.api.Producer;
3639
import org.apache.pulsar.common.naming.TopicDomain;
3740
import org.apache.pulsar.common.naming.TopicName;
3841
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -122,4 +125,27 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws
122125
Assert.assertEquals(lookupResultSet.size(), 1);
123126
}
124127

128+
@Test(timeOut = 30 * 1000)
129+
public void testTopicGetMessageIdByTimestamp() throws Exception {
130+
PulsarAdmin admin0 = getAllAdmins().get(0);
131+
String topic = "public/default/t1";
132+
admin0.topics().createPartitionedTopic(topic, 1);
133+
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
134+
@Cleanup
135+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
136+
for (int i = 0; i < 20; i++) {
137+
producer.send("msg".getBytes());
138+
}
139+
String brokerUrl = admin0.lookups().lookupTopic(topic + "-partition-0");
140+
PulsarAdmin admin = null;
141+
for (PulsarService additionalBroker : additionalBrokers) {
142+
if (!brokerUrl.endsWith(String.valueOf(additionalBroker.getBrokerListenPort().get()))) {
143+
admin = additionalBroker.getAdminClient();
144+
}
145+
}
146+
Assert.assertNotNull(admin);
147+
MessageId msgId =
148+
admin.topics().getMessageIdByTimestamp(topic + "-partition-0", System.currentTimeMillis());
149+
Assert.assertNotNull(msgId);
150+
}
125151
}

0 commit comments

Comments
 (0)