Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions velox/common/ScopedRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

#pragma once

#include <memory>
#include <utility>
#include <vector>

#include "folly/Synchronized.h"
#include "folly/container/F14Map.h"
#include "folly/container/F14Set.h"
Expand Down
15 changes: 11 additions & 4 deletions velox/common/tests/ScopedRegistryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

#include "velox/common/ScopedRegistry.h"

#include <gmock/gmock.h>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include <gtest/gtest.h>

namespace facebook::velox {
Expand All @@ -25,7 +30,7 @@ namespace {
// Minimal value type for testing.
class TestEntry {
public:
explicit TestEntry(const std::string& name) : name_{name} {}
explicit TestEntry(std::string name) : name_{std::move(name)} {}

const std::string& name() const {
return name_;
Expand Down Expand Up @@ -94,15 +99,17 @@ TEST(ScopedRegistryTest, snapshot) {
for (const auto& [key, _] : entries) {
keys.insert(key);
}
EXPECT_THAT(keys, testing::UnorderedElementsAre("a", "b"));
EXPECT_EQ(keys.size(), 2);
EXPECT_TRUE(keys.count("a"));
EXPECT_TRUE(keys.count("b"));
}

TEST(ScopedRegistryTest, parentFallback) {
ScopedRegistry<std::string, TestEntry> parent;
auto entry = std::make_shared<TestEntry>("from-parent");
parent.insert("key", entry);

ScopedRegistry<std::string, TestEntry> child(&parent);
const ScopedRegistry<std::string, TestEntry> child(&parent);
EXPECT_EQ(child.find("key"), entry);
}

Expand Down
8 changes: 7 additions & 1 deletion velox/connectors/Connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
*/

#include "velox/connectors/Connector.h"

#include <memory>
#include <string>

#include "velox/common/ScopedRegistry.h"
#include "velox/common/base/Exceptions.h"
#include "velox/connectors/ConnectorRegistryInternal.h"

namespace facebook::velox::connector {
Expand All @@ -24,7 +30,7 @@ ScopedRegistry<std::string, Connector>& connectors() {
return instance;
}

bool registerConnector(std::shared_ptr<Connector> connector) {
bool registerConnector(const std::shared_ptr<Connector>& connector) {
connectors().insert(connector->connectorId(), connector);
return true;
}
Expand Down
8 changes: 6 additions & 2 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -768,14 +768,18 @@ class Connector {
trackers_;
};

// Legacy free functions. Prefer ConnectorRegistry methods for new code.
/// Deprecated free functions. Use ConnectorRegistry methods instead.

bool registerConnector(std::shared_ptr<Connector> connector);
[[deprecated("Use ConnectorRegistry::global().insert() instead.")]]
bool registerConnector(const std::shared_ptr<Connector>& connector);

[[deprecated("Use ConnectorRegistry::tryGet() instead.")]]
bool hasConnector(const std::string& connectorId);

[[deprecated("Use ConnectorRegistry::global().erase() instead.")]]
bool unregisterConnector(const std::string& connectorId);

[[deprecated("Use ConnectorRegistry::tryGet() instead.")]]
std::shared_ptr<Connector> getConnector(const std::string& connectorId);

} // namespace facebook::velox::connector
8 changes: 7 additions & 1 deletion velox/connectors/ConnectorRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
*/

#include "velox/connectors/ConnectorRegistry.h"
#include "velox/connectors/ConnectorRegistryInternal.h"

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "velox/connectors/Connector.h"
#include "velox/connectors/ConnectorRegistryInternal.h"
#include "velox/core/QueryCtx.h"

namespace facebook::velox::connector {
Expand Down
6 changes: 6 additions & 0 deletions velox/connectors/ConnectorRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

#pragma once

#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "velox/common/ScopedRegistry.h"
#include "velox/connectors/Connector.h"

Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/ConnectorRegistryInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <string>

#include "velox/common/ScopedRegistry.h"
#include "velox/connectors/Connector.h"

Expand Down
11 changes: 7 additions & 4 deletions velox/connectors/tests/ConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/

#include "velox/connectors/Connector.h"
#include "velox/common/config/Config.h"

#include <memory>
#include <utility>

#include <fmt/format.h>
#include <gtest/gtest.h>

#include "velox/common/memory/Memory.h"
#include "velox/connectors/ConnectorRegistry.h"
#include "velox/core/QueryCtx.h"

#include <gmock/gmock.h>
#include <gtest/gtest.h>

namespace facebook::velox::connector {
namespace {

Expand Down
2 changes: 1 addition & 1 deletion velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <folly/Executor.h>
#include <folly/Synchronized.h>
#include <folly/container/F14Map.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <deque>
#include <functional>
#include <string_view>
#include <typeindex>
#include "velox/common/base/Exceptions.h"
#include "velox/common/caching/AsyncDataCache.h"
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/IndexLookupJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/buffer/Buffer.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/Connector.h"
#include "velox/connectors/ConnectorRegistry.h"
#include "velox/core/QueryConfig.h"
#include "velox/exec/OperatorTraceWriter.h"
#include "velox/exec/OperatorType.h"
Expand Down Expand Up @@ -347,7 +348,9 @@ IndexLookupJoin::IndexLookupJoin(
operatorType(),
lookupTableHandle_->connectorId()),
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr)},
connector_(connector::getConnector(lookupTableHandle_->connectorId())),
connector_(
connector::ConnectorRegistry::tryGet(
lookupTableHandle_->connectorId())),
maxNumInputBatches_(
1 + driverCtx->queryConfig().indexLookupJoinMaxPrefetchBatches()),
joinNode_{joinNode} {
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "velox/exec/TableScan.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/common/time/Timer.h"
#include "velox/connectors/ConnectorRegistry.h"
#include "velox/exec/OperatorType.h"
#include "velox/exec/Task.h"

Expand Down Expand Up @@ -90,7 +91,8 @@ TableScan::TableScan(
driverCtx_->driverId,
operatorType(),
tableHandle_->connectorId())),
connector_(connector::getConnector(tableHandle_->connectorId())),
connector_(
connector::ConnectorRegistry::tryGet(tableHandle_->connectorId())),
getOutputTimeLimitMs_(
driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()),
outputBatchRowsOverride_(
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/exec/TableWriter.h"
#include "velox/connectors/ConnectorRegistry.h"
#include "velox/exec/OperatorType.h"
#include "velox/exec/Task.h"

Expand Down Expand Up @@ -63,7 +64,7 @@ TableWriter::TableWriter(
&nonReclaimableSection_);
}
const auto& connectorId = tableWriteNode->insertTableHandle()->connectorId();
connector_ = connector::getConnector(connectorId);
connector_ = connector::ConnectorRegistry::tryGet(connectorId);
connectorQueryCtx_ = operatorCtx_->createConnectorQueryCtx(
connectorId,
planNodeId(),
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/IndexLookupJoinTestExtra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/Connector.h"
#include "velox/connectors/ConnectorRegistry.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/IndexLookupJoin.h"
#include "velox/exec/PlanNodeStats.h"
Expand Down Expand Up @@ -115,7 +116,7 @@ class IndexLookupJoinTest : public IndexLookupJoinTestBase,
}

void TearDown() override {
connector::unregisterConnector(kTestIndexConnectorName);
connector::ConnectorRegistry::global().erase(kTestIndexConnectorName);
HiveConnectorTestBase::TearDown();
}

Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/TableEvolutionFuzzerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/exec/tests/TableEvolutionFuzzer.h"
#include "velox/connectors/ConnectorRegistry.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/utils/HiveConnectorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "velox/common/file/tests/FaultyFileSystem.h"
#include "velox/connectors/ConnectorRegistry.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"
Expand Down
3 changes: 2 additions & 1 deletion velox/experimental/cudf/exec/OperatorAdapters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "velox/experimental/cudf/exec/Validation.h"
#include "velox/experimental/cudf/expression/ExpressionEvaluator.h"

#include "velox/connectors/ConnectorRegistry.h"
#include "velox/exec/AssignUniqueId.h"
#include "velox/exec/CallbackSink.h"
#include "velox/exec/FilterProject.h"
Expand Down Expand Up @@ -100,7 +101,7 @@ class TableScanAdapter : public OperatorAdapter {
planNode->id());
return false;
}
auto const& connector = velox::connector::getConnector(
auto const& connector = velox::connector::ConnectorRegistry::tryGet(
tableScanNode->tableHandle()->connectorId());
auto cudfHiveConnector = std::dynamic_pointer_cast<
facebook::velox::cudf_velox::connector::hive::CudfHiveConnector>(
Expand Down
4 changes: 3 additions & 1 deletion velox/experimental/wave/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include "velox/connectors/ConnectorRegistry.h"
#include "velox/experimental/wave/exec/WaveOperator.h"

#include "velox/common/time/Timer.h"
Expand Down Expand Up @@ -49,7 +50,8 @@ class TableScan : public WaveSourceOperator {
->queryConfig()
.preferredOutputBatchRows()) {
defines_ = std::move(defines);
connector_ = connector::getConnector(tableHandle_->connectorId());
connector_ =
connector::ConnectorRegistry::tryGet(tableHandle_->connectorId());
}

std::vector<AdvanceResult> canAdvance(WaveStream& stream) override;
Expand Down
5 changes: 4 additions & 1 deletion velox/experimental/wave/exec/WaveHiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/experimental/wave/exec/WaveHiveDataSource.h"
#include "velox/connectors/ConnectorRegistry.h"
#include "velox/connectors/hive/HiveDataSource.h"

namespace facebook::velox::wave {

Expand Down Expand Up @@ -164,7 +166,8 @@ void WaveHiveDataSource::registerConnector() {
// Create hive connector with config...
connector::hive::HiveConnectorFactory factory;
auto hiveConnector = factory.newConnector("wavemock", config, nullptr);
connector::registerConnector(hiveConnector);
connector::ConnectorRegistry::global().insert(
hiveConnector->connectorId(), hiveConnector);
connector::hive::HiveDataSource::registerWaveDelegateHook(
[](const HiveTableHandlePtr& hiveTableHandle,
const std::shared_ptr<common::ScanSpec>& scanSpec,
Expand Down
10 changes: 8 additions & 2 deletions velox/python/runner/PyConnectors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/

#include "velox/python/runner/PyConnectors.h"

#include <stdexcept>

#include "velox/connectors/ConnectorRegistry.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/tpch/TpchConnector.h"

Expand All @@ -35,7 +39,8 @@ void registerConnector(
TConnectorFactory factory;
auto connector = factory.newConnector(
connectorId, configBase, folly::getGlobalCPUExecutor().get());
connector::registerConnector(connector);
connector::ConnectorRegistry::global().insert(
connector->connectorId(), connector);
connectorRegistry().insert(connectorId);
}

Expand All @@ -57,7 +62,8 @@ void registerTpch(

// Is it ok to unregister connectors that were not registered.
void unregister(const std::string& connectorId) {
if (!facebook::velox::connector::unregisterConnector(connectorId)) {
if (!facebook::velox::connector::ConnectorRegistry::global().erase(
connectorId)) {
throw std::runtime_error(
fmt::format("Unable to unregister connector '{}'", connectorId));
}
Expand Down
5 changes: 3 additions & 2 deletions velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,15 @@ TraceReplayRunner::createReplayer() const {
taskTraceMetadataReader_->connectorId(FLAGS_node_id);
VELOX_CHECK(connectorId.has_value());

if (!connector::hasConnector(connectorId.value())) {
if (!connector::ConnectorRegistry::tryGet(connectorId.value())) {
connector::hive::HiveConnectorFactory factory;
const auto hiveConnector = factory.newConnector(
connectorId.value(),
std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>()),
ioExecutor_.get());
connector::registerConnector(hiveConnector);
connector::ConnectorRegistry::global().insert(
hiveConnector->connectorId(), hiveConnector);
}
replayer = std::make_unique<tool::trace::TableScanReplayer>(
FLAGS_root_dir,
Expand Down
Loading