Skip to content

Commit f736ec1

Browse files
mbasmanovameta-codesync[bot]
authored andcommitted
refactor: Add thread safety to connector registry (facebookincubator#16978)
Summary: Pull Request resolved: facebookincubator#16978 Wrap the connector registry in folly::Synchronized to make all operations thread-safe. Replace std::unordered_map with folly::F14FastMap for the internal storage. Remove the deprecated getAllConnectors() API. Reviewed By: srsuryadev Differential Revision: D98913848 fbshipit-source-id: b589ae001fef624682af13c0e294d3ca6e77cc2e
1 parent b79f0d1 commit f736ec1

File tree

5 files changed

+53
-43
lines changed

5 files changed

+53
-43
lines changed

velox/connectors/Connector.cpp

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,37 @@
2020
namespace facebook::velox::connector {
2121

2222
bool registerConnector(std::shared_ptr<Connector> connector) {
23-
bool ok = connectors().insert({connector->connectorId(), connector}).second;
24-
VELOX_CHECK(
25-
ok,
26-
"Connector with ID is already registered: {}",
27-
connector->connectorId());
28-
return true;
23+
return connectors().withWLock([&](auto& registry) {
24+
bool ok = registry.insert({connector->connectorId(), connector}).second;
25+
VELOX_CHECK(
26+
ok,
27+
"Connector with ID is already registered: {}",
28+
connector->connectorId());
29+
return true;
30+
});
2931
}
3032

3133
bool unregisterConnector(const std::string& connectorId) {
32-
return connectors().erase(connectorId) == 1;
34+
return connectors().withWLock(
35+
[&](auto& registry) { return registry.erase(connectorId) == 1; });
3336
}
3437

3538
std::shared_ptr<Connector> getConnector(const std::string& connectorId) {
36-
auto it = connectors().find(connectorId);
37-
VELOX_CHECK(
38-
it != connectors().end(),
39-
"Connector with ID is not registered: {}",
40-
connectorId);
41-
return it->second;
39+
return connectors().withRLock(
40+
[&](const auto& registry) -> std::shared_ptr<Connector> {
41+
auto it = registry.find(connectorId);
42+
VELOX_CHECK(
43+
it != registry.end(),
44+
"Connector with ID is not registered: {}",
45+
connectorId);
46+
return it->second;
47+
});
4248
}
4349

4450
bool hasConnector(const std::string& connectorId) {
45-
return connectors().find(connectorId) != connectors().end();
46-
}
47-
48-
const std::unordered_map<std::string, std::shared_ptr<Connector>>&
49-
getAllConnectors() {
50-
return connectors();
51+
return connectors().withRLock([&](const auto& registry) {
52+
return registry.find(connectorId) != registry.end();
53+
});
5154
}
5255

5356
bool DataSink::Stats::empty() const {

velox/connectors/Connector.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -785,10 +785,4 @@ bool unregisterConnector(const std::string& connectorId);
785785
/// exist.
786786
std::shared_ptr<Connector> getConnector(const std::string& connectorId);
787787

788-
/// Returns a map of all (connectorId -> connector) pairs currently
789-
/// registered.
790-
[[deprecated("Use ConnectorRegistry methods instead.")]] const std::
791-
unordered_map<std::string, std::shared_ptr<Connector>>&
792-
getAllConnectors();
793-
794788
} // namespace facebook::velox::connector

velox/connectors/ConnectorRegistry.cpp

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,30 @@ namespace facebook::velox::connector {
2222
// static
2323
std::shared_ptr<Connector> ConnectorRegistry::tryGet(
2424
const std::string& connectorId) {
25-
auto it = connectors().find(connectorId);
26-
if (it != connectors().end()) {
27-
return it->second;
28-
}
29-
return nullptr;
25+
return connectors().withRLock(
26+
[&](const auto& registry) -> std::shared_ptr<Connector> {
27+
auto it = registry.find(connectorId);
28+
if (it != registry.end()) {
29+
return it->second;
30+
}
31+
return nullptr;
32+
});
3033
}
3134

3235
// static
3336
void ConnectorRegistry::unregisterAll() {
34-
connectors().clear();
37+
folly::F14FastMap<std::string, std::shared_ptr<Connector>> entries;
38+
connectors().withWLock([&](auto& registry) { entries.swap(registry); });
3539
}
3640

3741
// static
38-
const std::unordered_map<std::string, std::shared_ptr<Connector>>&
39-
ConnectorRegistry::all() {
40-
return connectors();
42+
void ConnectorRegistry::forEach(
43+
std::function<void(const std::shared_ptr<Connector>&)> func) {
44+
connectors().withRLock([&](const auto& registry) {
45+
for (const auto& [_, connector] : registry) {
46+
func(connector);
47+
}
48+
});
4149
}
4250

4351
} // namespace facebook::velox::connector

velox/connectors/ConnectorRegistry.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
namespace facebook::velox::connector {
2222

2323
/// Provides static methods for connector registry operations.
24+
/// All methods are thread-safe.
2425
class ConnectorRegistry {
2526
public:
2627
/// Return the connector with the specified ID, or nullptr if not registered.
@@ -30,21 +31,21 @@ class ConnectorRegistry {
3031
template <typename T>
3132
static std::vector<std::shared_ptr<T>> findAll() {
3233
std::vector<std::shared_ptr<T>> result;
33-
for (const auto& [_, connector] : all()) {
34+
forEach([&](const std::shared_ptr<Connector>& connector) {
3435
if (auto casted = std::dynamic_pointer_cast<T>(connector)) {
3536
result.push_back(std::move(casted));
3637
}
37-
}
38+
});
3839
return result;
3940
}
4041

4142
/// Unregister all connectors.
4243
static void unregisterAll();
4344

4445
private:
45-
// Return a reference to the internal connector map.
46-
static const std::unordered_map<std::string, std::shared_ptr<Connector>>&
47-
all();
46+
// Invoke 'func' on each registered connector under the read lock.
47+
static void forEach(
48+
std::function<void(const std::shared_ptr<Connector>&)> func);
4849
};
4950

5051
} // namespace facebook::velox::connector

velox/connectors/ConnectorRegistryInternal.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,21 @@
1818

1919
#include <memory>
2020
#include <string>
21-
#include <unordered_map>
21+
22+
#include "folly/Synchronized.h"
23+
#include "folly/container/F14Map.h"
2224

2325
namespace facebook::velox::connector {
2426

2527
class Connector;
2628

2729
// Internal helper shared by Connector.cpp and ConnectorRegistry.cpp.
2830
// Not part of the public API. Do not include from outside velox/connectors/.
29-
inline std::unordered_map<std::string, std::shared_ptr<Connector>>&
30-
connectors() {
31-
static std::unordered_map<std::string, std::shared_ptr<Connector>> instance;
31+
using ConnectorMap = folly::Synchronized<
32+
folly::F14FastMap<std::string, std::shared_ptr<Connector>>>;
33+
34+
inline ConnectorMap& connectors() {
35+
static ConnectorMap instance;
3236
return instance;
3337
}
3438

0 commit comments

Comments
 (0)