Skip to content

Commit d8fee39

Browse files
committed
Pushdown integer upcasts to source operators in LocalPlanner
Changed pushdown_integer_upcasts_to_scan to pushdown_integer_upcasts_to_source PUshdown integer upcasts to source operators Only when there are join nodes or result output nodes in the fragment that an integer upcast is needed. In this case, pushdown the upcasts to source operators like TableScanNode or ExchangeNode.
1 parent 8a466df commit d8fee39

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1994
-214
lines changed

velox/connectors/Connector.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,8 @@ class Connector {
604604
const RowTypePtr& outputType,
605605
const ConnectorTableHandlePtr& tableHandle,
606606
const connector::ColumnHandleMap& columnHandles,
607-
ConnectorQueryCtx* connectorQueryCtx) = 0;
607+
ConnectorQueryCtx* connectorQueryCtx,
608+
bool pushdownCasts = false) = 0;
608609

609610
/// Returns true if addSplit of DataSource can use 'dataSource' from
610611
/// ConnectorSplit in addSplit(). If so, TableScan can preload splits

velox/connectors/fuzzer/FuzzerConnector.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ class FuzzerConnector final : public Connector {
111111
const RowTypePtr& outputType,
112112
const ConnectorTableHandlePtr& tableHandle,
113113
const connector::ColumnHandleMap& /*columnHandles*/,
114-
ConnectorQueryCtx* connectorQueryCtx) override final {
114+
ConnectorQueryCtx* connectorQueryCtx,
115+
bool pushdownCasts = false) override final {
115116
return std::make_unique<FuzzerDataSource>(
116117
outputType, tableHandle, connectorQueryCtx->memoryPool());
117118
}

velox/connectors/hive/HiveConnector.cpp

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,17 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
5757
const RowTypePtr& outputType,
5858
const ConnectorTableHandlePtr& tableHandle,
5959
const std::unordered_map<std::string, ColumnHandlePtr>& columnHandles,
60-
ConnectorQueryCtx* connectorQueryCtx) {
60+
ConnectorQueryCtx* connectorQueryCtx,
61+
bool pushdownCasts) {
6162
return std::make_unique<HiveDataSource>(
6263
outputType,
6364
tableHandle,
6465
columnHandles,
6566
&fileHandleFactory_,
6667
ioExecutor_,
6768
connectorQueryCtx,
68-
hiveConfig_);
69+
hiveConfig_,
70+
pushdownCasts);
6971
}
7072

7173
std::unique_ptr<DataSink> HiveConnector::createDataSink(
@@ -192,4 +194,55 @@ void HivePartitionFunctionSpec::registerSerDe() {
192194
"HivePartitionFunctionSpec", HivePartitionFunctionSpec::deserialize);
193195
}
194196

197+
std::shared_ptr<core::PartitionFunctionSpec>
198+
HivePartitionFunctionSpec::rewriteInputType(
199+
const RowTypePtr& oldInputType,
200+
const RowTypePtr& newInputType) const {
201+
// If the layout is identical, no change.
202+
if (*oldInputType == *newInputType) {
203+
return nullptr;
204+
}
205+
206+
std::vector<column_index_t> newChannels;
207+
newChannels.reserve(channels_.size());
208+
bool changed = false;
209+
210+
for (auto channel : channels_) {
211+
VELOX_CHECK_LT(
212+
channel,
213+
oldInputType->size(),
214+
"Hive bucket channel index {} is out of bounds for old input type {}",
215+
channel,
216+
oldInputType->toString());
217+
218+
const auto& name = oldInputType->nameOf(channel);
219+
auto newIndexOpt = newInputType->getChildIdxIfExists(name);
220+
VELOX_USER_CHECK(
221+
newIndexOpt.has_value(),
222+
"Failed to find Hive bucket column '{}' in LocalPartition new input type {}",
223+
name,
224+
newInputType->toString());
225+
226+
auto newIndex = static_cast<column_index_t>(*newIndexOpt);
227+
newChannels.push_back(newIndex);
228+
if (newIndex != channel) {
229+
changed = true;
230+
}
231+
}
232+
233+
if (!changed) {
234+
return nullptr;
235+
}
236+
237+
// If we don't have a bucketToPartition map yet, keep using the "late binding"
238+
// constructor. Otherwise preserve the bucketToPartition_.
239+
if (bucketToPartition_.empty()) {
240+
return std::make_shared<HivePartitionFunctionSpec>(
241+
numBuckets_, std::move(newChannels), constValues_);
242+
}
243+
244+
return std::make_shared<HivePartitionFunctionSpec>(
245+
numBuckets_, bucketToPartition_, std::move(newChannels), constValues_);
246+
}
247+
195248
} // namespace facebook::velox::connector::hive

velox/connectors/hive/HiveConnector.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ class HiveConnector : public Connector {
4242
const RowTypePtr& outputType,
4343
const ConnectorTableHandlePtr& tableHandle,
4444
const connector::ColumnHandleMap& columnHandles,
45-
ConnectorQueryCtx* connectorQueryCtx) override;
45+
ConnectorQueryCtx* connectorQueryCtx,
46+
bool pushdownCasts = false) override;
4647

4748
bool supportsSplitPreload() const override {
4849
return true;
@@ -138,6 +139,10 @@ class HivePartitionFunctionSpec : public core::PartitionFunctionSpec {
138139

139140
static void registerSerDe();
140141

142+
std::shared_ptr<core::PartitionFunctionSpec> rewriteInputType(
143+
const RowTypePtr& oldInputType,
144+
const RowTypePtr& newInputType) const override;
145+
141146
private:
142147
const int numBuckets_;
143148
const std::vector<int> bucketToPartition_;

0 commit comments

Comments
 (0)