Skip to content

Commit 01620f5

Browse files
committed
Add option to disable crc32c in exchange
1 parent 0477c1c commit 01620f5

File tree

5 files changed

+33
-8
lines changed

5 files changed

+33
-8
lines changed

velox/core/QueryConfig.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,9 @@ class QueryConfig {
716716
static constexpr const char* kRowSizeTrackingEnabled =
717717
"row_size_tracking_enabled";
718718

719+
static constexpr const char* kDisableCrc32ForShuffle =
720+
"disable_crc32_for_shuffle";
721+
719722
bool selectiveNimbleReaderEnabled() const {
720723
return get<bool>(kSelectiveNimbleReaderEnabled, false);
721724
}
@@ -1291,6 +1294,10 @@ class QueryConfig {
12911294
return get<std::string>(kClientTags, "");
12921295
}
12931296

1297+
bool isDisableCrc32ForShuffleEnabled() const {
1298+
return get<bool>(kDisableCrc32ForShuffle, false);
1299+
}
1300+
12941301
template <typename T>
12951302
T get(const std::string& key, const T& defaultValue) const {
12961303
return config_->get<T>(key, defaultValue);

velox/exec/PartitionedOutput.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
3030
options->compressionKind =
3131
common::stringToCompressionKind(queryConfig.shuffleCompressionKind());
3232
options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
33+
options->disableCrc32c = queryConfig.isDisableCrc32ForShuffleEnabled();
3334
return options;
3435
}
3536
} // namespace
@@ -126,7 +127,12 @@ BlockingReason Destination::flush(
126127

127128
// Upper limit of message size with no columns.
128129
constexpr int32_t kMinMessageSize = 128;
129-
auto listener = bufferManager.newListener();
130+
131+
std::unique_ptr<OutputStreamListener> listener(nullptr);
132+
if (!serdeOptions_->disableCrc32c) {
133+
listener = bufferManager.newListener();
134+
}
135+
130136
IOBufOutputStream stream(
131137
*current_->pool(),
132138
listener.get(),

velox/serializers/PrestoSerializer.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,14 @@ void PrestoVectorSerde::serializeSingleColumn(
247247
Scratch scratch;
248248
serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch);
249249

250-
PrestoOutputStreamListener listener;
251-
OStreamOutputStream outputStream(output, &listener);
252-
stream->flush(&outputStream);
250+
if (opts->disableCrc32c) {
251+
OStreamOutputStream outputStream(output, nullptr);
252+
stream->flush(&outputStream);
253+
} else {
254+
PrestoOutputStreamListener listener;
255+
OStreamOutputStream outputStream(output, &listener);
256+
stream->flush(&outputStream);
257+
}
253258
}
254259

255260
// static

velox/serializers/PrestoSerializer.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,12 @@ class PrestoVectorSerde : public VectorSerde {
5757
common::CompressionKind _compressionKind,
5858
float _minCompressionRatio = 0.8,
5959
bool _nullsFirst = false,
60-
bool _preserveEncodings = false)
61-
: VectorSerde::Options(_compressionKind, _minCompressionRatio),
60+
bool _preserveEncodings = false,
61+
bool _disableCrc32c = false)
62+
: VectorSerde::Options(
63+
_compressionKind,
64+
_minCompressionRatio,
65+
_disableCrc32c),
6266
useLosslessTimestamp(_useLosslessTimestamp),
6367
nullsFirst(_nullsFirst),
6468
preserveEncodings(_preserveEncodings) {}

velox/vector/VectorStream.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,11 @@ class VectorSerde {
219219

220220
Options(
221221
common::CompressionKind _compressionKind,
222-
float _minCompressionRatio)
222+
float _minCompressionRatio,
223+
bool _disableCrc32c)
223224
: compressionKind(_compressionKind),
224-
minCompressionRatio(_minCompressionRatio) {}
225+
minCompressionRatio(_minCompressionRatio),
226+
disableCrc32c(_disableCrc32c) {}
225227

226228
virtual ~Options() = default;
227229

@@ -231,6 +233,7 @@ class VectorSerde {
231233
/// than this causes subsequent compression attempts to be skipped. The more
232234
/// times compression misses the target the less frequently it is tried.
233235
float minCompressionRatio{0.8};
236+
bool disableCrc32c{false};
234237
};
235238

236239
Kind kind() const {

0 commit comments

Comments
 (0)