Skip to content

Commit 23acbac

Browse files
Yuhta and facebook-github-bot
authored and committed
Move Velox batch reader and writer to OSS nimble repo (facebookincubator#62)
Summary: Pull Request resolved: facebookincubator#62 Just move files around, no functionality change. Differential Revision: D58109796
1 parent acad60d commit 23acbac

12 files changed

+1397
-0
lines changed

dwio/nimble/velox/NimbleConfig.cpp

Lines changed: 162 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "dwio/nimble/velox/NimbleConfig.h"
17+
18+
#include "dwio/nimble/encodings/EncodingSelectionPolicy.h"
19+
20+
#include <folly/json/dynamic.h>
21+
#include <folly/json/json.h>
22+
23+
// Command-line flags that supply the defaults for several Config entries
// defined further down in this file.

// Default "<encoding>=<factor>" list, parsed by
// ManualEncodingSelectionPolicyFactory::parseReadFactors.
DEFINE_string(
    nimble_selection_read_factors,
    "Constant=1.0;Trivial=0.5;FixedBitWidth=0.9;MainlyConstant=1.0;"
    "SparseBool=1.0;Dictionary=1.0;RLE=1.0;Varint=1.0",
    "Encoding selection read factors, in the format: "
    "<EncodingName>=<FactorFloatValue>;<EncodingName>=<FactorFloatValue>;...");

// Default for Config::ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO.
DEFINE_double(
    nimble_selection_compression_accept_ratio,
    0.97,
    "Encoding selection compression accept ratio.");

// Default for Config::ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR.
DEFINE_bool(
    nimble_zstrong_enable_variable_bit_width_compressor,
    false,
    "Enable zstrong variable bit width compressor at write time. "
    "Transparent at read time.");

// Default for Config::INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS; the value is a JSON
// object that parseGrowthConfigMap turns into a map.
DEFINE_string(
    nimble_writer_input_buffer_default_growth_config,
    "{\"32\":4.0,\"512\":1.414,\"4096\":1.189}",
    "Default growth config for writer input buffers, "
    "each entry in the format of {range_start,growth_factor}");
43+
44+
namespace facebook::nimble {
45+
namespace {
46+
template <typename T>
47+
std::vector<T> parseVector(const std::string& str) {
48+
std::vector<T> result;
49+
if (!str.empty()) {
50+
std::vector<folly::StringPiece> pieces;
51+
folly::split(',', str, pieces, true);
52+
for (auto& p : pieces) {
53+
const auto& trimmedCol = folly::trimWhitespace(p);
54+
if (!trimmedCol.empty()) {
55+
result.push_back(folly::to<T>(trimmedCol));
56+
}
57+
}
58+
}
59+
return result;
60+
}
61+
62+
std::map<uint64_t, float> parseGrowthConfigMap(const std::string& str) {
63+
std::map<uint64_t, float> ret;
64+
NIMBLE_CHECK(!str.empty(), "Can't supply an empty growth config.");
65+
folly::dynamic json = folly::parseJson(str);
66+
for (const auto& pair : json.items()) {
67+
auto [_, inserted] = ret.emplace(
68+
folly::to<uint64_t>(pair.first.asString()), pair.second.asDouble());
69+
NIMBLE_CHECK(
70+
inserted, fmt::format("Duplicate key: {}.", pair.first.asString()));
71+
}
72+
return ret;
73+
}
74+
} // namespace
75+
76+
/* static */ Config::Entry<bool> Config::FLATTEN_MAP("orc.flatten.map", false);
77+
78+
/* static */ Config::Entry<const std::vector<uint32_t>> Config::MAP_FLAT_COLS(
79+
"orc.map.flat.cols",
80+
{},
81+
[](const std::vector<uint32_t>& val) { return folly::join(",", val); },
82+
[](const std::string& /* key */, const std::string& val) {
83+
return parseVector<uint32_t>(val);
84+
});
85+
86+
/* static */ Config::Entry<const std::vector<uint32_t>>
87+
Config::DEDUPLICATED_COLS(
88+
"alpha.map.deduplicated.cols",
89+
{},
90+
[](const std::vector<uint32_t>& val) { return folly::join(",", val); },
91+
[](const std::string& /* key */, const std::string& val) {
92+
return parseVector<uint32_t>(val);
93+
});
94+
95+
/* static */ Config::Entry<const std::vector<uint32_t>>
96+
Config::BATCH_REUSE_COLS(
97+
"alpha.dictionaryarray.cols",
98+
{},
99+
[](const std::vector<uint32_t>& val) { return folly::join(",", val); },
100+
[](const std::string& /* key */, const std::string& val) {
101+
return parseVector<uint32_t>(val);
102+
});
103+
104+
/* static */ Config::Entry<uint64_t> Config::RAW_STRIPE_SIZE(
105+
"alpha.raw.stripe.size",
106+
512L * 1024L * 1024L);
107+
108+
/* static */ Config::Entry<const std::vector<std::pair<EncodingType, float>>>
109+
Config::MANUAL_ENCODING_SELECTION_READ_FACTORS(
110+
"alpha.encodingselection.read.factors",
111+
ManualEncodingSelectionPolicyFactory::parseReadFactors(
112+
FLAGS_nimble_selection_read_factors),
113+
[](const std::vector<std::pair<EncodingType, float>>& val) {
114+
std::vector<std::string> encodingFactorStrings;
115+
std::transform(
116+
val.cbegin(),
117+
val.cend(),
118+
std::back_inserter(encodingFactorStrings),
119+
[](const auto& readFactor) {
120+
return fmt::format(
121+
"{}={}", toString(readFactor.first), readFactor.second);
122+
});
123+
return folly::join(";", encodingFactorStrings);
124+
},
125+
[](const std::string& /* key */, const std::string& val) {
126+
return ManualEncodingSelectionPolicyFactory::parseReadFactors(val);
127+
});
128+
129+
// Static member definition: float entry keyed
// "alpha.encodingselection.compression.accept.ratio". The default comes from
// the nimble_selection_compression_accept_ratio flag.
Config::Entry<float> Config::ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO(
    "alpha.encodingselection.compression.accept.ratio",
    FLAGS_nimble_selection_compression_accept_ratio);

// Static member definition: entry keyed "alpha.zstrong.compression.level",
// default 4.
Config::Entry<uint32_t> Config::ZSTRONG_COMPRESSION_LEVEL(
    "alpha.zstrong.compression.level",
    4);

// Static member definition: entry keyed "alpha.zstrong.decompression.level",
// default 2.
Config::Entry<uint32_t> Config::ZSTRONG_DECOMPRESSION_LEVEL(
    "alpha.zstrong.decompression.level",
    2);

// Static member definition: boolean entry keyed
// "alpha.zstrong.enable.variable.bit.width.compressor". The default comes from
// the nimble_zstrong_enable_variable_bit_width_compressor flag.
Config::Entry<bool> Config::ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR(
    "alpha.zstrong.enable.variable.bit.width.compressor",
    FLAGS_nimble_zstrong_enable_variable_bit_width_compressor);
146+
147+
/* static */ Config::Entry<const std::map<uint64_t, float>>
148+
Config::INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS(
149+
"alpha.writer.input.buffer.default.growth.configs",
150+
parseGrowthConfigMap(
151+
FLAGS_nimble_writer_input_buffer_default_growth_config),
152+
[](const std::map<uint64_t, float>& val) {
153+
folly::dynamic obj = folly::dynamic::object;
154+
for (const auto& [rangeStart, growthFactor] : val) {
155+
obj[folly::to<std::string>(rangeStart)] = growthFactor;
156+
}
157+
return folly::toJson(obj);
158+
},
159+
[](const std::string& /* key */, const std::string& val) {
160+
return parseGrowthConfigMap(val);
161+
});
162+
} // namespace facebook::nimble

dwio/nimble/velox/NimbleConfig.h

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "dwio/nimble/common/Types.h"
19+
20+
#include "velox/common/config/Config.h"
21+
22+
namespace facebook::nimble {
23+
24+
// Nimble configuration object built on velox::common::ConfigBase.
//
// The static Entry members below are only declared here; their keys, default
// values and string converters are supplied where they are defined
// (NimbleConfig.cpp).
class Config : public velox::common::ConfigBase<Config> {
 public:
  // Shorthand for the entry type exposed by the base class.
  template <typename T>
  using Entry = velox::common::ConfigBase<Config>::Entry<T>;

  static Entry<bool> FLATTEN_MAP;
  static Entry<const std::vector<uint32_t>> MAP_FLAT_COLS;
  static Entry<const std::vector<uint32_t>> BATCH_REUSE_COLS;
  static Entry<const std::vector<uint32_t>> DEDUPLICATED_COLS;
  static Entry<uint64_t> RAW_STRIPE_SIZE;
  static Entry<const std::vector<std::pair<EncodingType, float>>>
      MANUAL_ENCODING_SELECTION_READ_FACTORS;
  static Entry<float> ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO;
  static Entry<uint32_t> ZSTRONG_COMPRESSION_LEVEL;
  static Entry<uint32_t> ZSTRONG_DECOMPRESSION_LEVEL;
  static Entry<bool> ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR;
  static Entry<const std::map<uint64_t, float>>
      INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS;

  // Creates a Config and inserts every key/value pair of `map` into its
  // configs_ container.
  static std::shared_ptr<Config> fromMap(
      const std::map<std::string, std::string>& map) {
    auto config = std::make_shared<Config>();
    config->configs_.insert(map.begin(), map.end());
    return config;
  }
};
50+
51+
} // namespace facebook::nimble

dwio/nimble/velox/NimbleReader.cpp

Lines changed: 178 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "dwio/nimble/velox/NimbleReader.h"
18+
19+
#include "dwio/nimble/tablet/Constants.h"
20+
#include "dwio/nimble/velox/VeloxUtil.h"
21+
22+
namespace facebook::velox::nimble {
23+
24+
namespace {
25+
26+
const std::vector<std::string> kPreloadOptionalSections = {
27+
std::string(facebook::nimble::kSchemaSection)};
28+
29+
class NimbleRowReader : public dwio::common::RowReader {
30+
public:
31+
NimbleRowReader(
32+
std::unique_ptr<facebook::nimble::VeloxReader> reader,
33+
const std::shared_ptr<common::ScanSpec>& scanSpec)
34+
: reader_(std::move(reader)), scanSpec_(scanSpec) {
35+
reader_->loadStripeIfAny();
36+
}
37+
38+
int64_t nextRowNumber() override {
39+
VELOX_NYI();
40+
}
41+
42+
int64_t nextReadSize(uint64_t /*size*/) override {
43+
VELOX_NYI();
44+
}
45+
46+
uint64_t next(
47+
uint64_t size,
48+
VectorPtr& result,
49+
const dwio::common::Mutation* mutation) override {
50+
TypePtr resultType;
51+
VectorPtr rawResult;
52+
if (result) {
53+
resultType = result->type();
54+
rawResult = std::move(rawVectorForBatchReader(*result));
55+
result.reset();
56+
}
57+
if (!reader_->next(size, rawResult)) {
58+
if (rawResult) {
59+
result = BaseVector::create(resultType, 0, &reader_->memoryPool());
60+
rawVectorForBatchReader(*result) = std::move(rawResult);
61+
}
62+
return 0;
63+
}
64+
auto scanned = rawResult->size();
65+
result = projectColumns(rawResult, *scanSpec_, mutation);
66+
rawVectorForBatchReader(*result) = std::move(rawResult);
67+
return scanned;
68+
}
69+
70+
void updateRuntimeStats(
71+
dwio::common::RuntimeStatistics& /*stats*/) const override {
72+
// No-op for non-selective reader.
73+
}
74+
75+
void resetFilterCaches() override {
76+
// No-op for non-selective reader.
77+
}
78+
79+
std::optional<size_t> estimatedRowSize() const override {
80+
return std::optional(reader_->estimatedRowSize());
81+
}
82+
83+
private:
84+
std::unique_ptr<facebook::nimble::VeloxReader> reader_;
85+
std::shared_ptr<common::ScanSpec> scanSpec_;
86+
87+
static VectorPtr& rawVectorForBatchReader(BaseVector& vector) {
88+
auto* rowVector = vector.as<RowVector>();
89+
VELOX_CHECK_NOT_NULL(rowVector);
90+
return rowVector->rawVectorForBatchReader();
91+
}
92+
};
93+
94+
class NimbleReader : public dwio::common::Reader {
95+
public:
96+
NimbleReader(
97+
const dwio::common::ReaderOptions& options,
98+
const std::shared_ptr<ReadFile>& readFile)
99+
: options_(options),
100+
readFile_(readFile),
101+
tabletReader_(std::make_shared<facebook::nimble::TabletReader>(
102+
options.getMemoryPool(),
103+
readFile_.get(),
104+
kPreloadOptionalSections)) {
105+
if (!options_.getFileSchema()) {
106+
facebook::nimble::VeloxReader tmpReader(
107+
options.getMemoryPool(), tabletReader_);
108+
options_.setFileSchema(tmpReader.type());
109+
}
110+
}
111+
112+
std::optional<uint64_t> numberOfRows() const override {
113+
return tabletReader_->tabletRowCount();
114+
}
115+
116+
std::unique_ptr<dwio::common::ColumnStatistics> columnStatistics(
117+
uint32_t /*index*/) const override {
118+
// TODO
119+
return nullptr;
120+
}
121+
122+
const RowTypePtr& rowType() const override {
123+
return options_.getFileSchema();
124+
}
125+
126+
const std::shared_ptr<const dwio::common::TypeWithId>& typeWithId()
127+
const override {
128+
if (!typeWithId_) {
129+
typeWithId_ = dwio::common::TypeWithId::create(rowType());
130+
}
131+
return typeWithId_;
132+
}
133+
134+
std::unique_ptr<dwio::common::RowReader> createRowReader(
135+
const dwio::common::RowReaderOptions& options) const override {
136+
facebook::nimble::VeloxReadParams params;
137+
params.fileRangeStartOffset = options.getOffset();
138+
params.fileRangeEndOffset = options.getLimit();
139+
params.decodingExecutor = options.getDecodingExecutor();
140+
auto selector = options.getSelector();
141+
if (!selector) {
142+
selector = std::make_shared<dwio::common::ColumnSelector>(rowType());
143+
}
144+
facebook::dwio::api::populateFeatureSelector(
145+
*selector, options.getMapColumnIdAsStruct(), params);
146+
auto reader = std::make_unique<facebook::nimble::VeloxReader>(
147+
options_.getMemoryPool(),
148+
tabletReader_,
149+
std::move(selector),
150+
std::move(params));
151+
return std::make_unique<NimbleRowReader>(
152+
std::move(reader), options.getScanSpec());
153+
}
154+
155+
private:
156+
dwio::common::ReaderOptions options_;
157+
std::shared_ptr<ReadFile> readFile_;
158+
std::shared_ptr<facebook::nimble::TabletReader> tabletReader_;
159+
mutable std::shared_ptr<const dwio::common::TypeWithId> typeWithId_;
160+
};
161+
162+
} // namespace
163+
164+
std::unique_ptr<dwio::common::Reader> NimbleReaderFactory::createReader(
165+
std::unique_ptr<dwio::common::BufferedInput> input,
166+
const dwio::common::ReaderOptions& options) {
167+
return std::make_unique<NimbleReader>(options, input->getReadFile());
168+
}
169+
170+
// Registers a NimbleReaderFactory with the dwio reader-factory registry.
void registerNimbleReaderFactory() {
  auto factory = std::make_shared<NimbleReaderFactory>();
  dwio::common::registerReaderFactory(std::move(factory));
}
173+
174+
// Removes the reader factory registered for the NIMBLE file format.
void unregisterNimbleReaderFactory() {
  constexpr auto kFormat = dwio::common::FileFormat::NIMBLE;
  dwio::common::unregisterReaderFactory(kFormat);
}
177+
178+
} // namespace facebook::velox::nimble

0 commit comments

Comments
 (0)