Skip to content

Commit 6cd2b21

Browse files
zhouyuan authored and JkSelf committed
fix iceberg reader
Signed-off-by: Yuan <yuanzhou@apache.org>
1 parent 46be3ab commit 6cd2b21

File tree

3 files changed

+75
-1
lines changed

3 files changed

+75
-1
lines changed

velox/connectors/hive/HiveDataSource.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -137,7 +137,7 @@ std::unique_ptr<FileSplitReader> HiveDataSource::createSplitReader() {
137137
auto bucketChannels = prepareSplit();
138138
auto hiveSplit = checkedPointerCast<const HiveConnectorSplit>(split_);
139139

140-
return std::make_unique<HiveSplitReader>(
140+
return HiveSplitReader::create(
141141
hiveSplit,
142142
tableHandle_,
143143
&partitionKeys_,

velox/connectors/hive/HiveSplitReader.cpp

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,64 @@
1919
#include "velox/connectors/hive/FileConfig.h"
2020
#include "velox/connectors/hive/HiveConnectorSplit.h"
2121
#include "velox/connectors/hive/HiveConnectorUtil.h"
22+
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
23+
#include "velox/connectors/hive/iceberg/IcebergSplitReader.h"
2224

2325
namespace facebook::velox::connector::hive {
2426

27+
// Factory that picks the split reader implementation for a Hive split.
//
// If hiveSplit->customSplitInfo contains the key "table_format" with the value
// "hive-iceberg", the split is downcast to iceberg::HiveIcebergSplit and an
// iceberg::IcebergSplitReader is returned. In every other case a plain
// HiveSplitReader is returned.
//
// NOTE(review): 'subfieldFiltersForValidation' is accepted but not referenced
// anywhere in this function body; 'infoColumns' and 'bucketChannels' are only
// forwarded on the HiveSplitReader path. Confirm this is intentional.
std::unique_ptr<FileSplitReader> HiveSplitReader::create(
28+
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
29+
const FileTableHandlePtr& tableHandle,
30+
const std::unordered_map<std::string, FileColumnHandlePtr>* partitionKeys,
31+
const ConnectorQueryCtx* connectorQueryCtx,
32+
const std::shared_ptr<const FileConfig>& fileConfig,
33+
const RowTypePtr& readerOutputType,
34+
const std::shared_ptr<io::IoStatistics>& ioStatistics,
35+
const std::shared_ptr<IoStats>& ioStats,
36+
FileHandleFactory* fileHandleFactory,
37+
folly::Executor* ioExecutor,
38+
const std::shared_ptr<common::ScanSpec>& scanSpec,
39+
const std::unordered_map<std::string, FileColumnHandlePtr>* infoColumns,
40+
std::vector<column_index_t> bucketChannels,
41+
const common::SubfieldFilters* subfieldFiltersForValidation) {
42+
// Select the reader from hiveSplit->customSplitInfo["table_format"]. The key
// is tested with count() first, so at() is only called when the key exists.
43+
if (hiveSplit->customSplitInfo.count("table_format") > 0 &&
44+
hiveSplit->customSplitInfo.at("table_format") == "hive-iceberg") {
45+
// The table_format tag alone is not trusted: the split object itself must be
// a HiveIcebergSplit, which is verified through the dynamic cast below.
auto icebergSplit =
46+
std::dynamic_pointer_cast<const iceberg::HiveIcebergSplit>(hiveSplit);
47+
// Presumably raises a Velox error when the cast produced null (macro is
// defined elsewhere) — TODO confirm.
VELOX_CHECK_NOT_NULL(
48+
icebergSplit,
49+
"Expected HiveIcebergSplit for table_format=hive-iceberg");
50+
// Iceberg path: infoColumns, bucketChannels and subfieldFiltersForValidation
// are not passed to the Iceberg reader.
return std::make_unique<iceberg::IcebergSplitReader>(
51+
icebergSplit,
52+
tableHandle,
53+
partitionKeys,
54+
connectorQueryCtx,
55+
fileConfig,
56+
readerOutputType,
57+
ioStatistics,
58+
ioStats,
59+
fileHandleFactory,
60+
ioExecutor,
61+
scanSpec);
62+
} else {
63+
// Default path: regular Hive reader; bucketChannels is moved into it.
return std::make_unique<HiveSplitReader>(
64+
hiveSplit,
65+
tableHandle,
66+
partitionKeys,
67+
connectorQueryCtx,
68+
fileConfig,
69+
readerOutputType,
70+
ioStatistics,
71+
ioStats,
72+
fileHandleFactory,
73+
ioExecutor,
74+
scanSpec,
75+
infoColumns,
76+
std::move(bucketChannels));
77+
}
78+
}
79+
2580
HiveSplitReader::HiveSplitReader(
2681
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
2782
const FileTableHandlePtr& tableHandle,

velox/connectors/hive/HiveSplitReader.h

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,25 @@ struct HiveConnectorSplit;
3030
/// to keep only rows belonging to the target bucket.
3131
class HiveSplitReader : public FileSplitReader {
3232
public:
33+
/// Factory method to create the appropriate split reader based on split type.
34+
/// For Iceberg splits (identified by customSplitInfo["table_format"] == "hive-iceberg"),
35+
/// creates an IcebergSplitReader. Otherwise, creates a HiveSplitReader.
///
/// In the Iceberg case the definition also downcasts 'hiveSplit' to
/// iceberg::HiveIcebergSplit and checks that the result is not null.
///
/// 'bucketChannels' defaults to an empty vector and
/// 'subfieldFiltersForValidation' defaults to nullptr, so callers may omit
/// both trailing arguments.
///
/// NOTE(review): the definition does not reference
/// 'subfieldFiltersForValidation', and forwards 'infoColumns' and
/// 'bucketChannels' only when it creates a HiveSplitReader.
36+
static std::unique_ptr<FileSplitReader> create(
37+
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
38+
const FileTableHandlePtr& tableHandle,
39+
const std::unordered_map<std::string, FileColumnHandlePtr>* partitionKeys,
40+
const ConnectorQueryCtx* connectorQueryCtx,
41+
const std::shared_ptr<const FileConfig>& fileConfig,
42+
const RowTypePtr& readerOutputType,
43+
const std::shared_ptr<io::IoStatistics>& ioStatistics,
44+
const std::shared_ptr<IoStats>& ioStats,
45+
FileHandleFactory* fileHandleFactory,
46+
folly::Executor* ioExecutor,
47+
const std::shared_ptr<common::ScanSpec>& scanSpec,
48+
const std::unordered_map<std::string, FileColumnHandlePtr>* infoColumns,
49+
std::vector<column_index_t> bucketChannels = {},
50+
const common::SubfieldFilters* subfieldFiltersForValidation = nullptr);
51+
3352
HiveSplitReader(
3453
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
3554
const FileTableHandlePtr& tableHandle,

0 commit comments

Comments
 (0)