Skip to content

feat(serde): Coerce narrow encoding to widened type during PrestoSerializer deserialization#2169

Open
yingsu00 wants to merge 1 commit into
IBM:boltfrom
yingsu00:deserialization-type-coercion
Open

feat(serde): Coerce narrow encoding to widened type during PrestoSerializer deserialization#2169
yingsu00 wants to merge 1 commit into
IBM:boltfrom
yingsu00:deserialization-type-coercion

Conversation

@yingsu00

Copy link
Copy Markdown
Collaborator

When PushDownWidenCast rewrites a consumer fragment's source operator (TableScan or RemoteSourceNode) to declare a wider type, the producer fragment still emits narrow-typed bytes on the wire (its outputLayout stays narrow). The native worker's Exchange operator deserializes those pages via PrestoVectorSerde and must coerce narrow->wide on receive.

Before this commit, the deserialization path threw on the encoding mismatch:
"Serialized encoding is not compatible with requested type: BIGINT.
Expected LONG_ARRAY. Got INT_ARRAY."

This commti patches two call sites of checkTypeEncoding, mirroring the two-pass deserialization in readTopColumns:

  1. readColumns (main read pass): Add tryReadWidenedColumn which reads the size + null header, then dispatches to readWideningValues <SourceT, TargetT, Converter> to read narrow bytes from the stream and write widened values into the result FlatVector.

  2. readStructNullsColumns (null-tracking pre-pass, only runs when hasNestedStructs(childTypes) is true): Add tryStructNullsSkipWidened which skips numValues * sizeof(SourceT) bytes. Critical: without this, the pre-pass threw before the main pass ever ran on queries whose row schema includes nested ROW types (real-world complex JOIN / aggregate plans hit this constantly).

Supported widening pairs (producer encoding -> consumer kind):

  • BYTE_ARRAY (TINYINT) -> SMALLINT, INTEGER, BIGINT
  • SHORT_ARRAY (SMALLINT) -> INTEGER, BIGINT
  • INT_ARRAY (INTEGER) -> BIGINT
  • INT_ARRAY (REAL) -> DOUBLE
  • INT_ARRAY (DATE) -> TIMESTAMP (days * kSecondsInDay)

Ordering matters: the widening check is inserted before the existing tryReadNullColumn check in readColumns. tryReadNullColumn consumes bytes from the stream as it tries to interpret the column as UNKNOWN-all-null; if it returns false the stream is past where the widening reader expects to start. Widening goes first; tryReadNullColumn keeps its original spot.

Tests

velox_serializer_test_PrestoSerializerTest adds eight new tests covering each widening pair plus nulls, the nested-struct two-pass path, and a negative test:

  • wideningCoercionTinyintToBigint
  • wideningCoercionSmallintToInteger
  • wideningCoercionIntegerToBigint
  • wideningCoercionRealToDouble
  • wideningCoercionDateToTimestamp
  • wideningCoercionWithNulls
  • wideningCoercionWithNestedStructInRow (exercises the pre-pass)
  • wideningCoercionUnsupportedPairStillThrows (BIGINT -> INTEGER is
    narrowing, must still throw)
    All pass under all 6 compression-kind test parameterizations (48 total
    test runs).

…alizer deserialization

When PushDownWidenCast rewrites a consumer fragment's source operator
(TableScan or RemoteSourceNode) to declare a wider type, the producer
fragment still emits narrow-typed bytes on the wire (its outputLayout
stays narrow). The native worker's Exchange operator deserializes those
pages via PrestoVectorSerde and must coerce narrow->wide on receive.

Previously the deserialization path threw on the encoding mismatch:
  "Serialized encoding is not compatible with requested type: BIGINT.
   Expected LONG_ARRAY. Got INT_ARRAY."

Two call sites of `checkTypeEncoding` are patched, mirroring the two-pass
deserialization in `readTopColumns`:

1. `readColumns` (main read pass): Add `tryReadWidenedColumn` which
   reads the size + null header, then dispatches to `readWideningValues
   <SourceT, TargetT, Converter>` to read narrow bytes from the stream
   and write widened values into the result FlatVector<TargetT>.

2. `readStructNullsColumns` (null-tracking pre-pass, only runs when
   `hasNestedStructs(childTypes)` is true): Add
   `tryStructNullsSkipWidened` which skips `numValues * sizeof(SourceT)`
   bytes. Critical: without this, the pre-pass threw before the main
   pass ever ran on queries whose row schema includes nested ROW types
   (real-world complex JOIN / aggregate plans hit this constantly).

Supported widening pairs (producer encoding -> consumer kind):
  - BYTE_ARRAY  (TINYINT)  -> SMALLINT, INTEGER, BIGINT
  - SHORT_ARRAY (SMALLINT) -> INTEGER, BIGINT
  - INT_ARRAY   (INTEGER)  -> BIGINT
  - INT_ARRAY   (REAL)     -> DOUBLE
  - INT_ARRAY   (DATE)     -> TIMESTAMP   (days * kSecondsInDay)

Ordering matters: the widening check is inserted *before* the existing
`tryReadNullColumn` check in `readColumns`. `tryReadNullColumn` consumes
bytes from the stream as it tries to interpret the column as
UNKNOWN-all-null; if it returns false the stream is past where the
widening reader expects to start. Widening goes first; tryReadNullColumn
keeps its original spot.

Tests
-----
velox_serializer_test_PrestoSerializerTest adds eight new tests covering
each widening pair plus nulls, the nested-struct two-pass path, and a
negative test:
  - wideningCoercionTinyintToBigint
  - wideningCoercionSmallintToInteger
  - wideningCoercionIntegerToBigint
  - wideningCoercionRealToDouble
  - wideningCoercionDateToTimestamp
  - wideningCoercionWithNulls
  - wideningCoercionWithNestedStructInRow      (exercises the pre-pass)
  - wideningCoercionUnsupportedPairStillThrows (BIGINT -> INTEGER is
    narrowing, must still throw)
All pass under all 6 compression-kind test parameterizations (48 total
test runs).
@yingsu00 yingsu00 requested a review from xin-zhang2 June 23, 2026 02:43
@yingsu00 yingsu00 self-assigned this Jun 23, 2026
@yingsu00 yingsu00 added the bolt label Jun 23, 2026
source, columnType, resultOffset, incomingNulls, numIncomingNulls, pool, result,
[](int32_t days) {
return Timestamp(
static_cast<int64_t>(days) * Timestamp::kSecondsInDay, 0);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should timezone be considered here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I have fixed it. DATE→TIMESTAMP converter now does Timestamp::fromMillis(days * 86'400'000) then timestamp.toGMT(*zone) if opts.sessionTimezone is non-null, exactly what CastExpr::castFromDate does.

return false;
}
}
if (encoding == kIntArray) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this introduce a risk that a type encoding mismatch that previously throw an error by checkTypeEncoding is now incorrectly processed silently?
For example, if the serialized data is REAL that is encoded as kIntArray and columnType->kind() is BIGINT.

@xin-zhang2 xin-zhang2 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yingsu00 I left a few comments. Please take a look. Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants