
Commit d8b1672

PingLiu authored and Pingunidevel committed
Iceberg core code
Alchemy-item: (ID = 1153) Iceberg staging hub commit 1/6 - c5a69de3d1021073c13a99e1c7c6d6fcce355178

refactor: Move toValues from InPredicate.cpp to Filter.h

The function toValues removes duplicated values from the input vector and returns them in a std::vector. It was used to build an InPredicate. It will also be needed for building NOT IN filters for the Iceberg equality delete read, so this commit moves it from velox/functions/prestosql/InPredicate.cpp to velox/type/Filter.h and renames it to deDuplicateValues to make its purpose easier to understand.

feat(connector): Support reading Iceberg split with equality deletes

This commit introduces EqualityDeleteFileReader, which is used to read Iceberg splits with equality delete files. The equality delete files are read to construct domain filters or filter functions, which are then evaluated in the base file readers.

When there is only one equality delete field, and that field is an Iceberg identifier field (i.e. a non-floating-point primitive type), the values are converted to a value list for a NOT IN domain filter, with NULL treated separately. This domain filter is then pushed down to the ColumnReaders to filter out unwanted rows before they are read into Velox vectors.

When the equality delete column is a nested column, e.g. a sub-column of a struct or the key of a map, that column may not be in the base file ScanSpec. We need to add/remove these subfields to/from the SchemaWithId and ScanSpec recursively if they are not in the ScanSpec already. A test is also added for such a case.

If there is more than one equality delete field, or the field is not an Iceberg identifier field, the values are converted to a typed expression in conjunction-of-disjunctions form. This expression is evaluated as the remaining filter function after the rows are read into Velox vectors. Note that this currently works only for Presto, as the "neq" function is not registered by Spark.
See https://github.com/facebookincubator/issues/12667. Note that this commit only supports integral types; VARCHAR and VARBINARY need to be supported in future commits (see facebookincubator#12664).

Co-authored-by: Naveen Kumar Mahadevuni <Naveen.Mahadevuni@ibm.com>

Alchemy-item: (ID = 1153) Iceberg staging hub commit 2/6 - 14edb98c67f1c572a5f40682923795bd5b08e7c3

Support inserting data into an Iceberg table:
- Add Iceberg partition transforms. (Co-authored-by: Chengcheng Jin <Chengcheng.Jin@ibm.com>)
- Add NaN statistics to the Parquet writer.
- Collect Iceberg data file statistics in dwio.
- Integrate Iceberg data file statistics and add a unit test.
- Support writing field_id to the Parquet metadata SchemaElement.
- Implement Iceberg sort order.
- Add clustered Iceberg writer mode.
- Fix Parquet writer unit test.
- Add IcebergConnector.
- Fix unit test error.
- Resolve conflicts.
- Fix test build issue.
- Fix crash.
1 parent 446cce3 commit d8b1672

File tree

128 files changed: +9,749 −3,885 lines


velox/connectors/hive/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
@@ -31,7 +31,7 @@ velox_add_library(
   HiveDataSource.cpp
   FileIndexReader.cpp
   HiveIndexSource.cpp
-  HivePartitionName.cpp
+  HivePartitionUtil.cpp
   PartitionIdGenerator.cpp
   SplitReader.cpp
   TableHandle.cpp
@@ -45,6 +45,7 @@ velox_link_libraries(
   velox_dwio_catalog_fbhive
   velox_hive_partition_function
   velox_key_encoder
+  PUBLIC velox_hive_iceberg_splitreader
 )

 velox_add_library(velox_hive_partition_function HivePartitionFunction.cpp)

velox/connectors/hive/HiveConfig.cpp

Lines changed: 5 additions & 0 deletions
@@ -316,4 +316,9 @@ std::string HiveConfig::schema(const config::ConfigBase* session) const {
       kSchema, config_->get<std::string>(kSchema, ""));
 }

+bool HiveConfig::fanoutEnabled(const config::ConfigBase* session) const {
+  return session->get<bool>(
+      kFanoutEnabledSession, config_->get<bool>(kFanoutEnabled, true));
+}
+
 } // namespace facebook::velox::connector::hive

velox/connectors/hive/HiveConfig.h

Lines changed: 9 additions & 0 deletions
@@ -254,6 +254,12 @@ class HiveConfig {
   static constexpr const char* kSource = "source";
   static constexpr const char* kSchema = "schema";

+  /// Controls the writer mode, whether the fanout mode writer is enabled,
+  /// default value is true, setting to false means clustered mode.
+  /// Currently applies only to the Iceberg writer.
+  static constexpr const char* kFanoutEnabled = "fanout-enabled";
+  static constexpr const char* kFanoutEnabledSession = "fanout_enabled";
+
   InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
       const config::ConfigBase* session) const;

@@ -364,6 +370,9 @@ class HiveConfig {
   /// Schema of the query. Used for storage logging.
   std::string schema(const config::ConfigBase* session) const;

+  /// Return if fanout writer mode is enabled.
+  bool fanoutEnabled(const config::ConfigBase* session) const;
+
   HiveConfig(std::shared_ptr<const config::ConfigBase> config) {
     VELOX_CHECK_NOT_NULL(
         config, "Config is null for HiveConfig initialization");

velox/connectors/hive/HiveConnector.cpp

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ HiveConnector::HiveConnector(
 std::unique_ptr<DataSource> HiveConnector::createDataSource(
     const RowTypePtr& outputType,
     const ConnectorTableHandlePtr& tableHandle,
-    const ColumnHandleMap& columnHandles,
+    const std::unordered_map<std::string, ColumnHandlePtr>& columnHandles,
     ConnectorQueryCtx* connectorQueryCtx) {
   return std::make_unique<HiveDataSource>(
       outputType,

velox/connectors/hive/HiveConnectorUtil.cpp

Lines changed: 8 additions & 1 deletion
@@ -25,6 +25,10 @@
 #include "velox/expression/ExprToSubfieldFilter.h"
 #include "velox/expression/FieldReference.h"

+#include <boost/lexical_cast.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+
 namespace facebook::velox::connector::hive {
 namespace {

@@ -884,7 +888,6 @@ std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
 }

 namespace {
-
 core::CallTypedExprPtr replaceInputs(
     const core::CallTypedExpr* call,
     std::vector<core::TypedExprPtr>&& inputs) {

@@ -1057,6 +1060,10 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
 }
 } // namespace

+std::string makeUuid() {
+  return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
+}
+
 core::TypedExprPtr extractFiltersFromRemainingFilter(
     const core::TypedExprPtr& expr,
     core::ExpressionEvaluator* evaluator,

velox/connectors/hive/HiveConnectorUtil.h

Lines changed: 2 additions & 0 deletions
@@ -256,4 +256,6 @@ std::unique_ptr<common::Filter> createRangeFilter(
     const variant& lower,
     const variant& upper);

+std::string makeUuid();
+
 } // namespace facebook::velox::connector::hive

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 59 additions & 84 deletions
@@ -120,23 +120,34 @@ RowVectorPtr makeDataInput(
       input->getNullCount());
 }

-// Creates a PartitionIdGenerator if the table is partitioned, otherwise returns
-// nullptr.
-std::unique_ptr<PartitionIdGenerator> createPartitionIdGenerator(
-    const RowTypePtr& inputType,
-    const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle,
-    const std::shared_ptr<const HiveConfig>& hiveConfig,
-    const ConnectorQueryCtx* connectorQueryCtx) {
-  auto partitionChannels = insertTableHandle->partitionChannels();
-  if (partitionChannels.empty()) {
-    return nullptr;
-  }
-  return std::make_unique<PartitionIdGenerator>(
-      inputType,
-      partitionChannels,
-      hiveConfig->maxPartitionsPerWriters(
-          connectorQueryCtx->sessionProperties()),
-      connectorQueryCtx->memoryPool());
+// Returns a subset of column indices corresponding to partition keys.
+std::vector<column_index_t> getPartitionChannels(
+    const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
+  std::vector<column_index_t> channels;
+
+  for (column_index_t i = 0; i < insertTableHandle->inputColumns().size();
+       i++) {
+    if (insertTableHandle->inputColumns()[i]->isPartitionKey()) {
+      channels.push_back(i);
+    }
+  }
+
+  return channels;
+}
+
+// Returns the column indices of non-partition data columns.
+std::vector<column_index_t> getNonPartitionChannels(
+    const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
+  std::vector<column_index_t> dataChannels;
+
+  for (column_index_t i = 0; i < insertTableHandle->inputColumns().size();
+       i++) {
+    if (!insertTableHandle->inputColumns()[i]->isPartitionKey()) {
+      dataChannels.push_back(i);
+    }
+  }
+
+  return dataChannels;
 }

 std::string makePartitionDirectory(
@@ -148,10 +159,6 @@ std::string makePartitionDirectory(
   return tableDirectory;
 }

-std::string makeUuid() {
-  return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
-}
-
 std::unordered_map<LocationHandle::TableType, std::string> tableTypeNames() {
   return {
       {LocationHandle::TableType::kNew, "kNew"},

@@ -418,52 +425,6 @@ std::string HiveBucketProperty::toString() const {
   return out.str();
 }

-HiveInsertTableHandle::HiveInsertTableHandle(
-    std::vector<std::shared_ptr<const HiveColumnHandle>> inputColumns,
-    std::shared_ptr<const LocationHandle> locationHandle,
-    dwio::common::FileFormat storageFormat,
-    std::shared_ptr<const HiveBucketProperty> bucketProperty,
-    std::optional<common::CompressionKind> compressionKind,
-    const std::unordered_map<std::string, std::string>& serdeParameters,
-    const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
-    // When this option is set the HiveDataSink will always write a file even
-    // if there's no data. This is useful when the table is bucketed, but the
-    // engine handles ensuring a 1 to 1 mapping from task to bucket.
-    const bool ensureFiles,
-    std::shared_ptr<const FileNameGenerator> fileNameGenerator)
-    : inputColumns_(std::move(inputColumns)),
-      locationHandle_(std::move(locationHandle)),
-      storageFormat_(storageFormat),
-      bucketProperty_(std::move(bucketProperty)),
-      compressionKind_(compressionKind),
-      serdeParameters_(serdeParameters),
-      writerOptions_(writerOptions),
-      ensureFiles_(ensureFiles),
-      fileNameGenerator_(std::move(fileNameGenerator)),
-      partitionChannels_(computePartitionChannels(inputColumns_)),
-      nonPartitionChannels_(computeNonPartitionChannels(inputColumns_)) {
-  if (compressionKind.has_value()) {
-    VELOX_CHECK(
-        compressionKind.value() != common::CompressionKind_MAX,
-        "Unsupported compression type: CompressionKind_MAX");
-  }
-
-  if (ensureFiles_) {
-    // If ensureFiles is set and either the bucketProperty is set or some
-    // partition keys are in the data, there is not a 1:1 mapping from Task to
-    // files so we can't proactively create writers.
-    VELOX_CHECK(
-        bucketProperty_ == nullptr || bucketProperty_->bucketCount() == 0,
-        "ensureFiles is not supported with bucketing");
-
-    for (const auto& inputColumn : inputColumns_) {
-      VELOX_CHECK(
-          !inputColumn->isPartitionKey(),
-          "ensureFiles is not supported with partition keys in the data");
-    }
-  }
-}
-
 HiveDataSink::HiveDataSink(
     RowTypePtr inputType,
     std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,

@@ -482,13 +443,18 @@ HiveDataSink::HiveDataSink(
           *insertTableHandle->bucketProperty(),
           inputType)
           : nullptr,
-      insertTableHandle->partitionChannels(),
-      insertTableHandle->nonPartitionChannels(),
-      createPartitionIdGenerator(
-          inputType,
-          insertTableHandle,
-          hiveConfig,
-          connectorQueryCtx)) {}
+      getPartitionChannels(insertTableHandle),
+      getNonPartitionChannels(insertTableHandle),
+      !getPartitionChannels(insertTableHandle).empty()
+          ? std::make_unique<PartitionIdGenerator>(
+                inputType,
+                getPartitionChannels(insertTableHandle),
+                hiveConfig->maxPartitionsPerWriters(
+                    connectorQueryCtx->sessionProperties()),
+                connectorQueryCtx->memoryPool(),
+                hiveConfig->isPartitionPathAsLowerCase(
+                    connectorQueryCtx->sessionProperties()))
+          : nullptr) {}

 HiveDataSink::HiveDataSink(
     RowTypePtr inputType,

@@ -592,6 +558,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
   // Compute partition and bucket numbers.
   computePartitionAndBucketIds(input);

+  splitInputRowsAndEnsureWriters(input);
+
   // All inputs belong to a single non-bucketed partition. The partition id
   // must be zero.
   if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {

@@ -600,8 +568,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
     return;
   }

-  splitInputRowsAndEnsureWriters();
-
   for (auto index = 0; index < writers_.size(); ++index) {
     const vector_size_t partitionSize = partitionSizes_[index];
     if (partitionSize == 0) {

@@ -1014,7 +980,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {

   std::optional<std::string> partitionName;
   if (isPartitioned()) {
-    partitionName = getPartitionName(id.partitionId.value());
+    partitionName = getPartitionName(id);
   }

   // Without explicitly setting flush policy, the default memory based flush
@@ -1082,13 +1048,14 @@ std::unique_ptr<dwio::common::Writer> HiveDataSink::createWriterForIndex(
   return maybeCreateBucketSortWriter(writerIndex, std::move(writer));
 }

-std::string HiveDataSink::getPartitionName(uint32_t partitionId) const {
-  VELOX_CHECK_NOT_NULL(partitionIdGenerator_);
-
-  return HivePartitionName::partitionName(
-      partitionId,
-      partitionIdGenerator_->partitionValues(),
-      partitionKeyAsLowerCase_);
+std::optional<std::string> HiveDataSink::getPartitionName(
+    const HiveWriterId& id) const {
+  std::optional<std::string> partitionName;
+  if (isPartitioned()) {
+    partitionName =
+        partitionIdGenerator_->partitionName(id.partitionId.value());
+  }
+  return partitionName;
 }

 std::unique_ptr<facebook::velox::dwio::common::Writer>
@@ -1120,6 +1087,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
       sortWriterFinishTimeSliceLimitMs_);
 }

+void HiveDataSink::extendBuffersForPartitionedTables() {
+  // Extends the buffer used for partition rows calculations.
+  partitionSizes_.emplace_back(0);
+  partitionRows_.emplace_back(nullptr);
+  rawPartitionRows_.emplace_back(nullptr);
+}
+
 HiveWriterId HiveDataSink::getWriterId(size_t row) const {
   std::optional<int32_t> partitionId;
   if (isPartitioned()) {

@@ -1152,7 +1126,7 @@ void HiveDataSink::updatePartitionRows(
   ++partitionSizes_[index];
 }

-void HiveDataSink::splitInputRowsAndEnsureWriters() {
+void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr /* input */) {
   VELOX_CHECK(isPartitioned() || isBucketed());
   if (isBucketed() && isPartitioned()) {
     VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());

@@ -1165,6 +1139,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
   for (auto row = 0; row < numRows; ++row) {
     const auto id = getWriterId(row);
     const uint32_t index = ensureWriter(id);
+
     updatePartitionRows(index, numRows, row);
   }