Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,9 @@ class QueryConfig {
static constexpr const char* kRowSizeTrackingEnabled =
"row_size_tracking_enabled";

static constexpr const char* kDisableCrc32ForShuffle =
"disable_crc32_for_shuffle";

bool selectiveNimbleReaderEnabled() const {
return get<bool>(kSelectiveNimbleReaderEnabled, false);
}
Expand Down Expand Up @@ -1291,6 +1294,10 @@ class QueryConfig {
return get<std::string>(kClientTags, "");
}

bool isDisableCrc32ForShuffleEnabled() const {
return get<bool>(kDisableCrc32ForShuffle, false);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
8 changes: 7 additions & 1 deletion velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
options->compressionKind =
common::stringToCompressionKind(queryConfig.shuffleCompressionKind());
options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
options->disableCrc32c = queryConfig.isDisableCrc32ForShuffleEnabled();
return options;
}
} // namespace
Expand Down Expand Up @@ -126,7 +127,12 @@ BlockingReason Destination::flush(

// Upper limit of message size with no columns.
constexpr int32_t kMinMessageSize = 128;
auto listener = bufferManager.newListener();

std::unique_ptr<OutputStreamListener> listener(nullptr);
if (!serdeOptions_->disableCrc32c) {
listener = bufferManager.newListener();
}

IOBufOutputStream stream(
*current_->pool(),
listener.get(),
Expand Down
11 changes: 8 additions & 3 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,14 @@ void PrestoVectorSerde::serializeSingleColumn(
Scratch scratch;
serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch);

PrestoOutputStreamListener listener;
OStreamOutputStream outputStream(output, &listener);
stream->flush(&outputStream);
if (opts->disableCrc32c) {
OStreamOutputStream outputStream(output, nullptr);
stream->flush(&outputStream);
} else {
PrestoOutputStreamListener listener;
OStreamOutputStream outputStream(output, &listener);
stream->flush(&outputStream);
}
}

// static
Expand Down
8 changes: 6 additions & 2 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ class PrestoVectorSerde : public VectorSerde {
common::CompressionKind _compressionKind,
float _minCompressionRatio = 0.8,
bool _nullsFirst = false,
bool _preserveEncodings = false)
: VectorSerde::Options(_compressionKind, _minCompressionRatio),
bool _preserveEncodings = false,
bool _disableCrc32c = false)
: VectorSerde::Options(
_compressionKind,
_minCompressionRatio,
_disableCrc32c),
useLosslessTimestamp(_useLosslessTimestamp),
nullsFirst(_nullsFirst),
preserveEncodings(_preserveEncodings) {}
Expand Down
2 changes: 1 addition & 1 deletion velox/serializers/tests/CompactRowSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CompactRowSerializerTest : public ::testing::Test,
appendRow_ = GetParam().appendRow;
compressionKind_ = GetParam().compressionKind;
microBatchDeserialize_ = GetParam().microBatchDeserialize;
options_ = std::make_unique<VectorSerde::Options>(compressionKind_, 0.8);
options_ = std::make_unique<VectorSerde::Options>(compressionKind_, 0.8, false);
}

void TearDown() override {
Expand Down
3 changes: 2 additions & 1 deletion velox/serializers/tests/SerializedPageFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class SerializedPageFileTest : public ::testing::TestWithParam<TestParams>,
false); // preserveEncodings
case SerdeType::kCompactRow:
case SerdeType::kUnsafeRow:
return std::make_unique<VectorSerde::Options>(compressionKind_, 0.8);
return std::make_unique<VectorSerde::Options>(
compressionKind_, 0.8, false);
}
return nullptr;
}
Expand Down
3 changes: 2 additions & 1 deletion velox/serializers/tests/UnsafeRowSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class UnsafeRowSerializerTest : public ::testing::Test,
VectorSerde::Kind::kUnsafeRow);
appendRow_ = GetParam().appendRow;
compressionKind_ = GetParam().compressionKind;
options_ = std::make_unique<VectorSerde::Options>(compressionKind_, 0.8);
options_ =
std::make_unique<VectorSerde::Options>(compressionKind_, 0.8, false);
}

void TearDown() override {
Expand Down
7 changes: 5 additions & 2 deletions velox/vector/VectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,11 @@ class VectorSerde {

Options(
common::CompressionKind _compressionKind,
float _minCompressionRatio)
float _minCompressionRatio,
bool _disableCrc32c)
: compressionKind(_compressionKind),
minCompressionRatio(_minCompressionRatio) {}
minCompressionRatio(_minCompressionRatio),
disableCrc32c(_disableCrc32c) {}

virtual ~Options() = default;

Expand All @@ -231,6 +233,7 @@ class VectorSerde {
/// than this causes subsequent compression attempts to be skipped. The more
/// times compression misses the target the less frequently it is tried.
float minCompressionRatio{0.8};
bool disableCrc32c{false};
};

Kind kind() const {
Expand Down