Skip to content

Commit e057d7a

Browse files
committed
misc: Add config and session property for exchange checksum
Add the exchange.checksum-enabled config property and the native_exchange_checksum session property. Both default to false, meaning CRC32 checksum computation is disabled by default.
1 parent 8e0b3a6 commit e057d7a

File tree

8 files changed

+38
-11
lines changed

8 files changed

+38
-11
lines changed

velox/core/QueryConfig.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,9 @@ class QueryConfig {
721721
static constexpr const char* kRowSizeTrackingEnabled =
722722
"row_size_tracking_enabled";
723723

724+
static constexpr const char* kExchangeChecksum =
725+
"exchange_checksum";
726+
724727
bool selectiveNimbleReaderEnabled() const {
725728
return get<bool>(kSelectiveNimbleReaderEnabled, false);
726729
}
@@ -1296,6 +1299,10 @@ class QueryConfig {
12961299
return get<std::string>(kClientTags, "");
12971300
}
12981301

1302+
bool isExchangeChecksumEnabled() const {
1303+
return get<bool>(kExchangeChecksum, false);
1304+
}
1305+
12991306
template <typename T>
13001307
T get(const std::string& key, const T& defaultValue) const {
13011308
return config_->get<T>(key, defaultValue);

velox/exec/PartitionedOutput.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
3030
options->compressionKind =
3131
common::stringToCompressionKind(queryConfig.shuffleCompressionKind());
3232
options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
33+
options->exchangeChecksum = queryConfig.isExchangeChecksumEnabled();
3334
return options;
3435
}
3536
} // namespace
@@ -126,7 +127,12 @@ BlockingReason Destination::flush(
126127

127128
// Upper limit of message size with no columns.
128129
constexpr int32_t kMinMessageSize = 128;
129-
auto listener = bufferManager.newListener();
130+
131+
std::unique_ptr<OutputStreamListener> listener(nullptr);
132+
if (serdeOptions_->exchangeChecksum) {
133+
listener = bufferManager.newListener();
134+
}
135+
130136
IOBufOutputStream stream(
131137
*current_->pool(),
132138
listener.get(),

velox/serializers/PrestoSerializer.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,14 @@ void PrestoVectorSerde::serializeSingleColumn(
247247
Scratch scratch;
248248
serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch);
249249

250-
PrestoOutputStreamListener listener;
251-
OStreamOutputStream outputStream(output, &listener);
252-
stream->flush(&outputStream);
250+
if (!opts->exchangeChecksum) {
251+
OStreamOutputStream outputStream(output, nullptr);
252+
stream->flush(&outputStream);
253+
} else {
254+
PrestoOutputStreamListener listener;
255+
OStreamOutputStream outputStream(output, &listener);
256+
stream->flush(&outputStream);
257+
}
253258
}
254259

255260
// static

velox/serializers/PrestoSerializer.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,12 @@ class PrestoVectorSerde : public VectorSerde {
5757
common::CompressionKind _compressionKind,
5858
float _minCompressionRatio = 0.8,
5959
bool _nullsFirst = false,
60-
bool _preserveEncodings = false)
61-
: VectorSerde::Options(_compressionKind, _minCompressionRatio),
60+
bool _preserveEncodings = false,
61+
bool _exchangeChecksum = false)
62+
: VectorSerde::Options(
63+
_compressionKind,
64+
_minCompressionRatio,
65+
_exchangeChecksum),
6266
useLosslessTimestamp(_useLosslessTimestamp),
6367
nullsFirst(_nullsFirst),
6468
preserveEncodings(_preserveEncodings) {}

velox/serializers/tests/CompactRowSerializerTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class CompactRowSerializerTest : public ::testing::Test,
7777
appendRow_ = GetParam().appendRow;
7878
compressionKind_ = GetParam().compressionKind;
7979
microBatchDeserialize_ = GetParam().microBatchDeserialize;
80-
options_ = std::make_unique<VectorSerde::Options>(compressionKind_, 0.8);
80+
options_ = std::make_unique<VectorSerde::Options>(compressionKind_, 0.8, false);
8181
}
8282

8383
void TearDown() override {

velox/serializers/tests/SerializedPageFileTest.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ class SerializedPageFileTest : public ::testing::TestWithParam<TestParams>,
101101
false); // preserveEncodings
102102
case SerdeType::kCompactRow:
103103
case SerdeType::kUnsafeRow:
104-
return std::make_unique<VectorSerde::Options>(compressionKind_, 0.8);
104+
return std::make_unique<VectorSerde::Options>(
105+
compressionKind_, 0.8, false);
105106
}
106107
return nullptr;
107108
}

velox/serializers/tests/UnsafeRowSerializerTest.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class UnsafeRowSerializerTest : public ::testing::Test,
7676
appendRow_ = GetParam().appendRow;
7777
compressionKind_ = GetParam().compressionKind;
7878
microBatchDeserialize_ = GetParam().microBatchDeserialize;
79-
options_ = std::make_unique<VectorSerde::Options>(compressionKind_, 0.8);
79+
options_ =
80+
std::make_unique<VectorSerde::Options>(compressionKind_, 0.8, false);
8081
}
8182

8283
void TearDown() override {

velox/vector/VectorStream.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,11 @@ class VectorSerde {
219219

220220
Options(
221221
common::CompressionKind _compressionKind,
222-
float _minCompressionRatio)
222+
float _minCompressionRatio,
223+
bool _exchangeChecksum)
223224
: compressionKind(_compressionKind),
224-
minCompressionRatio(_minCompressionRatio) {}
225+
minCompressionRatio(_minCompressionRatio),
226+
exchangeChecksum(_exchangeChecksum) {}
225227

226228
virtual ~Options() = default;
227229

@@ -231,6 +233,7 @@ class VectorSerde {
231233
/// than this causes subsequent compression attempts to be skipped. The more
232234
/// times compression misses the target the less frequently it is tried.
233235
float minCompressionRatio{0.8};
236+
bool exchangeChecksum{false};
234237
};
235238

236239
Kind kind() const {

0 commit comments

Comments
 (0)