Skip to content

Commit 1021d06

Browse files
committed
Fix TableView's existing key-value will never be updated (#487)
(cherry picked from commit d9dd029)
1 parent a142d02 commit 1021d06

8 files changed

+65
-31
lines changed

lib/ClientImpl.cc

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,12 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
222222
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
223223
CreateProducerCallback callback, ProducerImplBasePtr producer) {
224224
if (result == ResultOk) {
225-
auto pair = producers_.emplace(producer.get(), producer);
226-
if (!pair.second) {
227-
auto existingProducer = pair.first->second.lock();
225+
auto address = producer.get();
226+
auto existingProducer = producers_.putIfAbsent(address, producer);
227+
if (existingProducer) {
228+
auto producer = existingProducer.value().lock();
228229
LOG_ERROR("Unexpected existing producer at the same address: "
229-
<< pair.first->first << ", producer: "
230-
<< (existingProducer ? existingProducer->getProducerName() : "(null)"));
230+
<< address << ", producer: " << (producer ? producer->getProducerName() : "(null)"));
231231
callback(ResultUnknownError, {});
232232
return;
233233
}
@@ -311,12 +311,12 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
311311
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
312312
auto consumer = weakConsumerPtr.lock();
313313
if (consumer) {
314-
auto pair = consumers_.emplace(consumer.get(), consumer);
315-
if (!pair.second) {
316-
auto existingConsumer = pair.first->second.lock();
314+
auto address = consumer.get();
315+
auto existingConsumer = consumers_.putIfAbsent(address, consumer);
316+
if (existingConsumer) {
317+
consumer = existingConsumer.value().lock();
317318
LOG_ERROR("Unexpected existing consumer at the same address: "
318-
<< pair.first->first
319-
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
319+
<< address << ", consumer: " << (consumer ? consumer->getName() : "(null)"));
320320
}
321321
} else {
322322
LOG_ERROR("Unexpected case: the consumer is somehow expired");
@@ -512,12 +512,12 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
512512
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
513513
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
514514
if (result == ResultOk) {
515-
auto pair = consumers_.emplace(consumer.get(), consumer);
516-
if (!pair.second) {
517-
auto existingConsumer = pair.first->second.lock();
515+
auto address = consumer.get();
516+
auto existingConsumer = consumers_.putIfAbsent(address, consumer);
517+
if (existingConsumer) {
518+
auto consumer = existingConsumer.value().lock();
518519
LOG_ERROR("Unexpected existing consumer at the same address: "
519-
<< pair.first->first
520-
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
520+
<< address << ", consumer: " << (consumer ? consumer->getName() : "(null)"));
521521
callback(ResultUnknownError, {});
522522
return;
523523
}

lib/ConsumerImpl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
619619
return;
620620
}
621621
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
622-
possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
622+
possibleSendToDeadLetterTopicMessages_.put(m.getMessageId(), std::vector<Message>{m});
623623
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
624624
redeliverUnacknowledgedMessages({m.getMessageId()});
625625
increaseAvailablePermits(cnx);
@@ -786,7 +786,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
786786
}
787787

788788
if (!possibleToDeadLetter.empty()) {
789-
possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter);
789+
possibleSendToDeadLetterTopicMessages_.put(batchedMessage.getMessageId(), possibleToDeadLetter);
790790
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
791791
redeliverUnacknowledgedMessages({batchedMessage.getMessageId()});
792792
}

lib/MultiTopicsConsumerImpl.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
260260
consumer->getConsumerCreatedFuture().addListener(std::bind(
261261
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
262262
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
263-
consumers_.emplace(topicName->toString(), consumer);
263+
consumers_.put(topicName->toString(), consumer);
264264
LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
265265
consumer->start();
266266

@@ -287,7 +287,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
287287
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
288288
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
289289
consumer->setPartitionIndex(i);
290-
consumers_.emplace(topicPartitionName, consumer);
290+
consumers_.put(topicPartitionName, consumer);
291291
LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
292292
consumer->start();
293293
}
@@ -1063,7 +1063,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
10631063
});
10641064
consumer->setPartitionIndex(partitionIndex);
10651065
consumer->start();
1066-
consumers_.emplace(topicPartitionName, consumer);
1066+
consumers_.put(topicPartitionName, consumer);
10671067
LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
10681068
<< " consumerSize: " << consumers_.size());
10691069
}

lib/SynchronizedHashMap.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,22 @@ class SynchronizedHashMap {
5959
}
6060
}
6161

62-
template <typename... Args>
63-
std::pair<Iterator, bool> emplace(Args&&... args) {
62+
// Put a new key-value pair if the key does not exist.
63+
// Return boost::none if the key already exists or the existing value.
64+
OptValue putIfAbsent(const K& key, const V& value) {
6465
Lock lock(mutex_);
65-
return data_.emplace(std::forward<Args>(args)...);
66+
auto pair = data_.emplace(key, value);
67+
if (pair.second) {
68+
return boost::none;
69+
} else {
70+
return pair.first->second;
71+
}
72+
}
73+
74+
// Put a key-value pair no matter if the key exists.
75+
void put(const K& key, const V& value) {
76+
Lock lock(mutex_);
77+
data_[key] = value;
6678
}
6779

6880
void forEach(std::function<void(const K&, const V&)> f) const {

lib/TableViewImpl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ void TableViewImpl::handleMessage(const Message& msg) {
104104
if (msg.getLength() == 0) {
105105
data_.remove(msg.getPartitionKey());
106106
} else {
107-
data_.emplace(msg.getPartitionKey(), value);
107+
data_.put(msg.getPartitionKey(), value);
108108
}
109109

110110
Lock lock(listenersMutex_);
@@ -167,4 +167,4 @@ void TableViewImpl::readTailMessage() {
167167
});
168168
}
169169

170-
} // namespace pulsar
170+
} // namespace pulsar

tests/SynchronizedHashMapTest.cc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <algorithm>
2222
#include <atomic>
23+
#include <boost/optional/optional_io.hpp>
2324
#include <chrono>
2425
#include <thread>
2526
#include <vector>
@@ -100,18 +101,27 @@ TEST(SynchronizedHashMapTest, testForEach) {
100101
ASSERT_TRUE(values.empty());
101102
ASSERT_EQ(result, 1);
102103

103-
m.emplace(1, 100);
104+
ASSERT_EQ(m.putIfAbsent(1, 100), boost::none);
105+
ASSERT_EQ(m.putIfAbsent(1, 101), boost::optional<int>(100));
104106
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
105107
[&result] { result = 2; });
106108
ASSERT_EQ(values, (std::vector<int>({100})));
107109
ASSERT_EQ(result, 1);
108110

111+
m.put(1, 102);
109112
values.clear();
110-
m.emplace(2, 200);
113+
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
114+
[&result] { result = 2; });
115+
ASSERT_EQ(values, (std::vector<int>({102})));
116+
ASSERT_EQ(result, 1);
117+
118+
values.clear();
119+
ASSERT_EQ(m.putIfAbsent(2, 200), boost::none);
120+
ASSERT_EQ(m.putIfAbsent(2, 201), boost::optional<int>(200));
111121
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
112122
[&result] { result = 2; });
113123
std::sort(values.begin(), values.end());
114-
ASSERT_EQ(values, (std::vector<int>({100, 200})));
124+
ASSERT_EQ(values, (std::vector<int>({102, 200})));
115125
ASSERT_EQ(result, 1);
116126
}
117127

tests/TableViewTest.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,22 @@ TEST(TableViewTest, testSimpleTableView) {
9696

9797
// assert interfaces.
9898
std::string value;
99+
ASSERT_TRUE(tableView.containsKey("key1"));
99100
ASSERT_TRUE(tableView.getValue("key1", value));
100101
ASSERT_EQ(value, "value1");
102+
103+
// Test value update
104+
ASSERT_EQ(ResultOk,
105+
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-update").build()));
106+
ASSERT_TRUE(waitUntil(std::chrono::seconds(2), [&tableView]() {
107+
std::string value;
108+
tableView.getValue("key1", value);
109+
return value == "value1-update";
110+
}));
111+
112+
// retrieveValue will remove the key/value from the table view.
101113
ASSERT_TRUE(tableView.retrieveValue("key1", value));
102-
ASSERT_EQ(value, "value1");
114+
ASSERT_EQ(value, "value1-update");
103115
ASSERT_FALSE(tableView.containsKey("key1"));
104116
ASSERT_EQ(tableView.snapshot().size(), count * 2 - 1);
105117
ASSERT_EQ(tableView.size(), 0);

tests/extensibleLM/ExtensibleLoadManagerTest.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
116116
ASSERT_EQ(sendResult, ResultOk);
117117
ASSERT_TRUE(elapsed < maxWaitTimeMs);
118118

119-
producedMsgs.emplace(i, i);
119+
producedMsgs.put(i, i);
120120
i++;
121121
}
122122
LOG_INFO("producer finished");
@@ -143,7 +143,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
143143
LOG_INFO("acked i:" << i << " " << elapsed << " ms");
144144
ASSERT_TRUE(elapsed < maxWaitTimeMs);
145145
ASSERT_EQ(ackResult, ResultOk);
146-
consumedMsgs.emplace(i, i);
146+
consumedMsgs.put(i, i);
147147
}
148148
LOG_INFO("consumer finished");
149149
};

0 commit comments

Comments
 (0)