Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <pulsar/TableView.h>
#include <pulsar/defines.h>

#include <optional>
#include <string>

namespace pulsar {
Expand All @@ -42,6 +43,12 @@ typedef std::function<void(Result, TableView)> TableViewCallback;
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
typedef std::function<void(Result)> CloseCallback;

struct PULSAR_PUBLIC ServiceInfo {
std::string serviceUrl;
std::optional<AuthenticationPtr> authentication;
std::optional<std::string> tlsTrustCertsFilePath;
};

class ClientImpl;
class PulsarFriend;
class PulsarWrapper;
Expand Down Expand Up @@ -414,6 +421,23 @@ class PULSAR_PUBLIC Client {
void getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback);

/**
* Update the service information of the client.
*
* This method is used to switch the connection to a different Pulsar cluster. All connections will be
* closed and the internal service info will be updated.
*
* @param serviceInfo the service URL, authentication and TLS trust certs file path to use
*/
void updateServiceInfo(ServiceInfo serviceInfo);

/**
* Get the current service information of the client.
*
* @return the current service information
*/
ServiceInfo getServiceInfo() const;

private:
Client(const std::shared_ptr<ClientImpl>&);

Expand Down
8 changes: 6 additions & 2 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,12 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers();

void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback) {
impl_->getLookup()
->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
.addListener(std::move(callback));
}

void Client::updateServiceInfo(ServiceInfo serviceInfo) { impl_->updateServiceInfo(std::move(serviceInfo)); }

ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }

} // namespace pulsar
5 changes: 4 additions & 1 deletion lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
startConsumerStatsTimer(consumerStatsRequests);
}

void ClientConnection::close(Result result, bool detach) {
void ClientConnection::close(Result result, bool detach, bool switchCluster) {
Lock lock(mutex_);
if (isClosed()) {
return;
Expand Down Expand Up @@ -1368,6 +1368,9 @@ void ClientConnection::close(Result result, bool detach) {
for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
auto consumer = it->second.lock();
if (consumer) {
if (switchCluster) {
consumer->onClusterSwitching();
}
consumer->handleDisconnection(result, self);
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*
* @param result all pending futures will complete with this result
* @param detach remove it from the pool if it's true
* @param switchCluster whether the close is triggered by cluster switching
*
* `detach` should only be false when the connection pool is closed.
*/
void close(Result result = ResultConnectError, bool detach = true);
void close(Result result = ResultConnectError, bool detach = true, bool switchCluster = false);

bool isClosed() const;

Expand Down
Loading
Loading