Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 237 additions & 0 deletions velox/serializers/PrestoSerializerDeserializationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,56 @@ void readStructNulls(
source->skip(numValues * sizeof(T));
}

// Companion to `tryReadWidenedColumn` for the two-pass null-tracking pre-pass.
// When the producer emitted a narrow encoding but the consumer declared a wider
// type, the first (structured-nulls) pass would otherwise throw via
// `checkTypeEncoding` before the main read pass had a chance to coerce. Here we
// just need to skip the right number of bytes — `numValues * sizeof(SourceT)`,
// using the SOURCE width implied by the encoding, not the wider target. Returns
// true if the (encoding, columnType) pair is a known widening and the source
// bytes have been consumed; false otherwise (caller falls through to the
// regular encoding-mismatch path).
bool tryStructNullsSkipWidened(
std::string_view encoding,
const TypePtr& columnType,
ByteInputStream* source,
Scratch& scratch) {
size_t sourceBytes;
bool isWidening = false;
if (encoding == kByteArray) {
// TINYINT (int8_t) on the wire.
sourceBytes = 1;
auto k = columnType->kind();
isWidening = (k == TypeKind::SMALLINT || k == TypeKind::INTEGER ||
k == TypeKind::BIGINT);
} else if (encoding == kShortArray) {
// SMALLINT (int16_t) on the wire.
sourceBytes = 2;
auto k = columnType->kind();
isWidening = (k == TypeKind::INTEGER || k == TypeKind::BIGINT);
} else if (encoding == kIntArray) {
// INTEGER / REAL / DATE on the wire (all 4 bytes).
sourceBytes = 4;
if (columnType->isDate()) {
// DATE on the wire as the *consumer* type means encoding == kIntArray ==
// typeToEncodingName(DATE) — not a widening, normal path handles it.
return false;
}
auto k = columnType->kind();
isWidening = (k == TypeKind::BIGINT || k == TypeKind::DOUBLE ||
k == TypeKind::TIMESTAMP);
} else {
return false;
}
if (!isWidening) {
return false;
}
const int32_t size = source->read<int32_t>();
auto numValues = valueCount(source, size, scratch);
source->skip(numValues * sourceBytes);
return true;
}

template <>
void readStructNulls<StringView>(
ByteInputStream* source,
Expand Down Expand Up @@ -264,6 +314,14 @@ void readStructNullsColumns(
readDictionaryVectorStructNulls(
source, columnType, useLosslessTimestamp, scratch);
} else {
// Consumer-side widening (PushDownWidenCast). If the producer emitted a
// narrower encoding for this column than the consumer declared, skip the
// right number of *source* bytes here so the next pass can deserialize
// with the matching `tryReadWidenedColumn` path. Without this, the
// checkTypeEncoding call below throws on the first (null-tracking) pass.
if (tryStructNullsSkipWidened(encoding, columnType, source, scratch)) {
continue;
}
checkTypeEncoding(encoding, columnType);
const auto it = readers.find(
isIPPrefixType(columnType) ? TypeKind::VARCHAR : columnType->kind());
Expand Down Expand Up @@ -1271,6 +1329,169 @@ bool tryReadNullColumn(
return true;
}

// Reads `size` narrow-typed values (`SourceT`) from `source`, applies `cvt` to
// each, and writes the widened value into the `TargetT`-typed `values` buffer.
// Used to support PushDownWidenCast-style consumer-side coercion: when the
// producer fragment emits narrow-typed data on the wire but the consumer's
// RemoteSource has been rewritten to a wider declared type, the Exchange
// operator (which calls into this serializer) coerces narrow->wide as it
// emits pages.
template <typename SourceT, typename TargetT, typename Converter>
void readWideningValues(
ByteInputStream* source,
vector_size_t size,
vector_size_t offset,
const BufferPtr& nulls,
vector_size_t nullCount,
const BufferPtr& values,
Converter cvt) {
auto* rawValues = values->asMutable<TargetT>();
if (nullCount) {
int32_t toClear = offset;
bits::forEachSetBit(
nulls->as<uint64_t>(), offset, offset + size, [&](int32_t row) {
// Set the values between the last non-null and this to type default.
for (; toClear < row; ++toClear) {
rawValues[toClear] = TargetT{};
}
rawValues[row] = cvt(source->read<SourceT>());
toClear = row + 1;
});
} else {
for (int32_t row = offset; row < offset + size; ++row) {
rawValues[row] = cvt(source->read<SourceT>());
}
}
}

// Top-level widened column reader: reads the size + nulls header, then dispatches
// to `readWideningValues<SourceT, TargetT>` to fill `result`. `result` must be a
// FlatVector<TargetT> of the wide `columnType` (the caller ensures this).
template <typename SourceT, typename TargetT, typename Converter>
void readWidened(
ByteInputStream* source,
const TypePtr& columnType,
vector_size_t resultOffset,
const uint64_t* incomingNulls,
int32_t numIncomingNulls,
velox::memory::MemoryPool* pool,
VectorPtr& result,
Converter cvt) {
if (result == nullptr) {
result = BaseVector::create(columnType, 0, pool);
} else if (
result->encoding() == VectorEncoding::Simple::CONSTANT ||
result->encoding() == VectorEncoding::Simple::DICTIONARY) {
BaseVector::ensureWritable(SelectivityVector::empty(), columnType, pool, result);
}
const int32_t size = source->read<int32_t>();
const auto numNewValues = sizeWithIncomingNulls(size, numIncomingNulls);
result->resize(resultOffset + numNewValues);
auto* flatResult = result->asUnchecked<FlatVector<TargetT>>();
auto nullCount = readNulls(
source, size, resultOffset, incomingNulls, numIncomingNulls, *flatResult);
BufferPtr values = flatResult->mutableValues();
readWideningValues<SourceT, TargetT>(
source, numNewValues, resultOffset, flatResult->nulls(), nullCount, values, cvt);
}

// Returns true if `encoding` is a narrow wire encoding that can be widened to
// `columnType`, and reads + widens the column into `result`. Returns false (and
// leaves the stream / result unchanged) if the pair is not a supported widening.
//
// Supported widening pairs (mirrors WIDENING_CAST_MAP in
// PushDownWidenCast.java):
// BYTE_ARRAY (TINYINT) -> SMALLINT, INTEGER, BIGINT
// SHORT_ARRAY (SMALLINT) -> INTEGER, BIGINT
// INT_ARRAY (INTEGER) -> BIGINT
// INT_ARRAY (REAL) -> DOUBLE
// INT_ARRAY (DATE) -> TIMESTAMP
bool tryReadWidenedColumn(
std::string_view encoding,
const TypePtr& columnType,
ByteInputStream* source,
vector_size_t resultOffset,
const uint64_t* incomingNulls,
int32_t numIncomingNulls,
velox::memory::MemoryPool* pool,
VectorPtr& result) {
if (encoding == kByteArray) {
// TINYINT (int8_t) on the wire.
switch (columnType->kind()) {
case TypeKind::SMALLINT:
readWidened<int8_t, int16_t>(
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](int8_t v) { return static_cast<int16_t>(v); });
return true;
case TypeKind::INTEGER:
readWidened<int8_t, int32_t>(
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](int8_t v) { return static_cast<int32_t>(v); });
return true;
case TypeKind::BIGINT:
readWidened<int8_t, int64_t>(
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](int8_t v) { return static_cast<int64_t>(v); });
return true;
default:
return false;
}
}
if (encoding == kShortArray) {
// SMALLINT (int16_t) on the wire.
switch (columnType->kind()) {
case TypeKind::INTEGER:
readWidened<int16_t, int32_t>(
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](int16_t v) { return static_cast<int32_t>(v); });
return true;
case TypeKind::BIGINT:
readWidened<int16_t, int64_t>(
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](int16_t v) { return static_cast<int64_t>(v); });
return true;
default:
return false;
}
}
if (encoding == kIntArray) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this introduce a risk that a type encoding mismatch that previously throw an error by checkTypeEncoding is now incorrectly processed silently?
For example, if the serialized data is REAL that is encoded as kIntArray and columnType->kind() is BIGINT.

// 4-byte values on the wire (INTEGER, REAL, or DATE — distinguished by the
// target columnType, which the planner-side PushDownWidenCast picks
// exclusively from the WIDENING_CAST_MAP).
if (columnType->isDate()) {
// DATE (days since epoch, int32) -> any wider type. Only TIMESTAMP is in
// the widening map.
return false;
}
switch (columnType->kind()) {
case TypeKind::BIGINT:
// INTEGER -> BIGINT.
readWidened<int32_t, int64_t>(
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](int32_t v) { return static_cast<int64_t>(v); });
return true;
case TypeKind::DOUBLE:
// REAL -> DOUBLE. The 4 bytes are float bits.
readWidened<float, double>(
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](float v) { return static_cast<double>(v); });
return true;
case TypeKind::TIMESTAMP:
// DATE (int32 days since epoch) -> TIMESTAMP (seconds, nanos).
readWidened<int32_t, Timestamp>(
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](int32_t days) {
return Timestamp(
static_cast<int64_t>(days) * Timestamp::kSecondsInDay, 0);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should timezone be considered here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I have fixed it. DATE→TIMESTAMP converter now does Timestamp::fromMillis(days * 86'400'000) then timestamp.toGMT(*zone) if opts.sessionTimezone is non-null, exactly what CastExpr::castFromDate does.

});
return true;
default:
return false;
}
}
return false;
}

void readColumns(
ByteInputStream* source,
const std::vector<TypePtr>& types,
Expand Down Expand Up @@ -1339,6 +1560,22 @@ void readColumns(
} else {
auto typeToEncoding = typeToEncodingName(columnType);
if (encoding != typeToEncoding) {
// Consumer-side widening (PushDownWidenCast). Check this BEFORE
// tryReadNullColumn — tryReadNullColumn consumes bytes from the stream
// (it reads the column as UNKNOWN to test if it's all-null), so if it
// doesn't apply, our widening reader would see a partially-consumed
// stream and read garbage.
if (tryReadWidenedColumn(
encoding,
columnType,
source,
resultOffset,
incomingNulls,
numIncomingNulls,
pool,
columnResult)) {
continue;
}
if (encoding == kByteArray &&
tryReadNullColumn(
source,
Expand Down
Loading
Loading