Skip to content

Commit 16927b2

Browse files
mbasmanovameta-codesync[bot]
authored andcommitted
refactor: Migrate production code to ConnectorRegistry API and deprecate free functions (facebookincubator#16986)
Summary: Pull Request resolved: facebookincubator#16986 Replace legacy free functions (registerConnector, unregisterConnector, getConnector, hasConnector) with ConnectorRegistry methods in all remaining velox/ production code: TableScan, TableWriter, IndexLookupJoin, TraceReplayRunner, PyConnectors, and experimental operators. Mark the free functions [[deprecated]] to prevent new usages. Reviewed By: jagill Differential Revision: D98963405
1 parent 7320219 commit 16927b2

File tree

9 files changed

+30
-11
lines changed

9 files changed

+30
-11
lines changed

velox/connectors/Connector.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -768,14 +768,18 @@ class Connector {
768768
trackers_;
769769
};
770770

771-
// Legacy free functions. Prefer ConnectorRegistry methods for new code.
771+
/// Deprecated free functions. Use ConnectorRegistry methods instead.
772772

773+
[[deprecated("Use ConnectorRegistry::global().insert() instead.")]]
773774
bool registerConnector(std::shared_ptr<Connector> connector);
774775

776+
[[deprecated("Use ConnectorRegistry::tryGet() instead.")]]
775777
bool hasConnector(const std::string& connectorId);
776778

779+
[[deprecated("Use ConnectorRegistry::global().erase() instead.")]]
777780
bool unregisterConnector(const std::string& connectorId);
778781

782+
[[deprecated("Use ConnectorRegistry::tryGet() instead.")]]
779783
std::shared_ptr<Connector> getConnector(const std::string& connectorId);
780784

781785
} // namespace facebook::velox::connector

velox/exec/IndexLookupJoin.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "velox/buffer/Buffer.h"
1919
#include "velox/common/testutil/TestValue.h"
2020
#include "velox/connectors/Connector.h"
21+
#include "velox/connectors/ConnectorRegistry.h"
2122
#include "velox/core/QueryConfig.h"
2223
#include "velox/exec/OperatorTraceWriter.h"
2324
#include "velox/exec/OperatorType.h"
@@ -347,7 +348,9 @@ IndexLookupJoin::IndexLookupJoin(
347348
operatorType(),
348349
lookupTableHandle_->connectorId()),
349350
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr)},
350-
connector_(connector::getConnector(lookupTableHandle_->connectorId())),
351+
connector_(
352+
connector::ConnectorRegistry::tryGet(
353+
lookupTableHandle_->connectorId())),
351354
maxNumInputBatches_(
352355
1 + driverCtx->queryConfig().indexLookupJoinMaxPrefetchBatches()),
353356
joinNode_{joinNode} {

velox/exec/TableScan.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "velox/exec/TableScan.h"
1717
#include "velox/common/testutil/TestValue.h"
1818
#include "velox/common/time/Timer.h"
19+
#include "velox/connectors/ConnectorRegistry.h"
1920
#include "velox/exec/OperatorType.h"
2021
#include "velox/exec/Task.h"
2122

@@ -90,7 +91,8 @@ TableScan::TableScan(
9091
driverCtx_->driverId,
9192
operatorType(),
9293
tableHandle_->connectorId())),
93-
connector_(connector::getConnector(tableHandle_->connectorId())),
94+
connector_(
95+
connector::ConnectorRegistry::tryGet(tableHandle_->connectorId())),
9496
getOutputTimeLimitMs_(
9597
driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()),
9698
outputBatchRowsOverride_(

velox/exec/TableWriter.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
#include "velox/exec/TableWriter.h"
18+
#include "velox/connectors/ConnectorRegistry.h"
1819
#include "velox/exec/OperatorType.h"
1920
#include "velox/exec/Task.h"
2021

@@ -63,7 +64,7 @@ TableWriter::TableWriter(
6364
&nonReclaimableSection_);
6465
}
6566
const auto& connectorId = tableWriteNode->insertTableHandle()->connectorId();
66-
connector_ = connector::getConnector(connectorId);
67+
connector_ = connector::ConnectorRegistry::tryGet(connectorId);
6768
connectorQueryCtx_ = operatorCtx_->createConnectorQueryCtx(
6869
connectorId,
6970
planNodeId(),

velox/experimental/cudf/exec/OperatorAdapters.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "velox/experimental/cudf/exec/Validation.h"
3030
#include "velox/experimental/cudf/expression/ExpressionEvaluator.h"
3131

32+
#include "velox/connectors/ConnectorRegistry.h"
3233
#include "velox/exec/AssignUniqueId.h"
3334
#include "velox/exec/CallbackSink.h"
3435
#include "velox/exec/FilterProject.h"
@@ -99,7 +100,7 @@ class TableScanAdapter : public OperatorAdapter {
99100
planNode->id());
100101
return false;
101102
}
102-
auto const& connector = velox::connector::getConnector(
103+
auto const& connector = velox::connector::ConnectorRegistry::tryGet(
103104
tableScanNode->tableHandle()->connectorId());
104105
auto cudfHiveConnector = std::dynamic_pointer_cast<
105106
facebook::velox::cudf_velox::connector::hive::CudfHiveConnector>(

velox/experimental/wave/exec/TableScan.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
#pragma once
1717

18+
#include "velox/connectors/ConnectorRegistry.h"
1819
#include "velox/experimental/wave/exec/WaveOperator.h"
1920

2021
#include "velox/common/time/Timer.h"
@@ -49,7 +50,8 @@ class TableScan : public WaveSourceOperator {
4950
->queryConfig()
5051
.preferredOutputBatchRows()) {
5152
defines_ = std::move(defines);
52-
connector_ = connector::getConnector(tableHandle_->connectorId());
53+
connector_ =
54+
connector::ConnectorRegistry::tryGet(tableHandle_->connectorId());
5355
}
5456

5557
std::vector<AdvanceResult> canAdvance(WaveStream& stream) override;

velox/experimental/wave/exec/WaveHiveDataSource.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
#include "velox/experimental/wave/exec/WaveHiveDataSource.h"
18+
#include "velox/connectors/ConnectorRegistry.h"
1819

1920
namespace facebook::velox::wave {
2021

@@ -164,7 +165,8 @@ void WaveHiveDataSource::registerConnector() {
164165
// Create hive connector with config...
165166
connector::hive::HiveConnectorFactory factory;
166167
auto hiveConnector = factory.newConnector("wavemock", config, nullptr);
167-
connector::registerConnector(hiveConnector);
168+
connector::ConnectorRegistry::global().insert(
169+
hiveConnector->connectorId(), hiveConnector);
168170
connector::hive::HiveDataSource::registerWaveDelegateHook(
169171
[](const HiveTableHandlePtr& hiveTableHandle,
170172
const std::shared_ptr<common::ScanSpec>& scanSpec,

velox/python/runner/PyConnectors.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
#include "velox/python/runner/PyConnectors.h"
18+
#include "velox/connectors/ConnectorRegistry.h"
1819
#include "velox/connectors/hive/HiveConnector.h"
1920
#include "velox/connectors/tpch/TpchConnector.h"
2021

@@ -35,7 +36,8 @@ void registerConnector(
3536
TConnectorFactory factory;
3637
auto connector = factory.newConnector(
3738
connectorId, configBase, folly::getGlobalCPUExecutor().get());
38-
connector::registerConnector(connector);
39+
connector::ConnectorRegistry::global().insert(
40+
connector->connectorId(), connector);
3941
connectorRegistry().insert(connectorId);
4042
}
4143

@@ -57,7 +59,8 @@ void registerTpch(
5759

5860
// Is it ok to unregister connectors that were not registered.
5961
void unregister(const std::string& connectorId) {
60-
if (!facebook::velox::connector::unregisterConnector(connectorId)) {
62+
if (!facebook::velox::connector::ConnectorRegistry::global().erase(
63+
connectorId)) {
6164
throw std::runtime_error(
6265
fmt::format("Unable to unregister connector '{}'", connectorId));
6366
}

velox/tool/trace/TraceReplayRunner.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,14 +381,15 @@ TraceReplayRunner::createReplayer() const {
381381
taskTraceMetadataReader_->connectorId(FLAGS_node_id);
382382
VELOX_CHECK(connectorId.has_value());
383383

384-
if (!connector::hasConnector(connectorId.value())) {
384+
if (!connector::ConnectorRegistry::tryGet(connectorId.value())) {
385385
connector::hive::HiveConnectorFactory factory;
386386
const auto hiveConnector = factory.newConnector(
387387
connectorId.value(),
388388
std::make_shared<config::ConfigBase>(
389389
std::unordered_map<std::string, std::string>()),
390390
ioExecutor_.get());
391-
connector::registerConnector(hiveConnector);
391+
connector::ConnectorRegistry::global().insert(
392+
hiveConnector->connectorId(), hiveConnector);
392393
}
393394
replayer = std::make_unique<tool::trace::TableScanReplayer>(
394395
FLAGS_root_dir,

0 commit comments

Comments
 (0)