[Bug] [Zeta] Fix negative array size exception #10827

Open

lm-ylj wants to merge 6 commits into apache:dev from DobestTech:fix-negative-array-size-exception

Conversation

@lm-ylj
Contributor

@lm-ylj lm-ylj commented Apr 27, 2026

Purpose of this pull request

close #10826

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@github-actions github-actions Bot added the Zeta label Apr 27, 2026
dybyte previously approved these changes Apr 27, 2026
Contributor

@dybyte dybyte left a comment


+1 if CI passes

@nzw921rx
Collaborator

@lm-ylj Thank you for your submission. This is a great fix.

One compatibility concern remains with the current change.

Legacy data with arity <= 127 is encoded as 1 byte, but the new code reads it as 4 bytes using readInt(). This causes the stream position to shift by 3 bytes and breaks deserialization of all subsequent fields.
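
Concretely, the 3-byte shift can be reproduced with plain java.io streams (used here as a stand-in for Hazelcast's ObjectDataOutput/ObjectDataInput; the values are illustrative only):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StreamShiftSketch {
    public static void main(String[] args) throws IOException {
        // Legacy writer: 1-byte arity (5) followed by a 4-byte field.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(5);
        out.writeInt(42);

        // A new reader that calls readInt() for arity consumes 4 bytes instead of 1,
        // swallowing 3 bytes of the next field and misreading everything after it.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(in.readInt()); // 0x05000000 = 83886080, not 5
    }
}
```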

Impact:

  • This primarily affects users performing rolling upgrades with mixed old/new nodes.
  • New nodes may misread valid legacy records (arity <= 127), leading to deserialization failures, corrupted field decoding, task instability, and possible temporary service unavailability.
  • This is an upgrade-compatibility issue, not just an edge case for large arity values.

Fix scope:

  • Preserve backward compatibility for all valid legacy records, especially arity <= 127.
  • Fix example: on write, keep using writeByte(arity) when arity <= Byte.MAX_VALUE; for arity > Byte.MAX_VALUE, write a special marker first (for example, -1), followed by writeInt(arity). On read, read 1 byte first: if it is non-negative, treat it as the legacy arity; if it is the marker, read the following int as the actual arity.
  • Add compatibility tests for the old-writer -> new-reader path during rolling upgrades, including boundary cases such as 127 and 128.
    
    cc @davidzollo Could you please share your advice?
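
As a sketch of that marker scheme (plain java.io stand-ins for Hazelcast's ObjectDataOutput/ObjectDataInput; the constant name and method names are hypothetical, not the PR's actual code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class ArityCodecSketch {
    // Hypothetical sentinel: a value the legacy writer could never emit as an arity byte.
    static final byte EXTENDED_ARITY_MARKER = -1;

    static void writeArity(DataOutput out, int arity) throws IOException {
        if (arity <= Byte.MAX_VALUE) {
            out.writeByte(arity);                 // legacy 1-byte path, wire format unchanged
        } else {
            out.writeByte(EXTENDED_ARITY_MARKER); // marker first ...
            out.writeInt(arity);                  // ... then the real arity as 4 bytes
        }
    }

    static int readArity(DataInput in) throws IOException {
        byte b = in.readByte();
        if (b >= 0) {
            return b;            // legacy record: the byte is the arity
        }
        return in.readInt();     // extended record: marker seen, read the int
    }

    public static void main(String[] args) throws IOException {
        for (int arity : new int[] {0, 127, 128, 70000}) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            writeArity(new DataOutputStream(bos), arity);
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            System.out.println(arity + " -> " + readArity(in));
        }
    }
}
```

Legacy records stay decodable because a non-negative first byte is unambiguously an arity, while the marker is a byte value the old writer never produced.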


@DanielLeens DanielLeens left a comment


Thanks for chasing the negative-array-size issue. I pulled the latest head locally and rechecked the real cross-task-group row transport path through RecordSerializer.

Runtime path:

Source / Transform / Sink row transport
  -> RecordSerializer.write()
      -> writes type, tableId, rowKind, arity, fields
  -> Hazelcast byte transport
  -> RecordSerializer.read()
      -> reconstructs SeaTunnelRow

The bug is real, but the current fix still changes the on-wire row layout in a backward-incompatible way. Before this PR, arity was encoded in 1 byte. In the current head, the serializer switches to writeInt() / readInt(), so a new reader will consume 4 bytes from an old stream and shift the remaining field offsets. That makes rolling-upgrade / mixed-version clusters unsafe even for valid legacy rows with arity <= 127.

Blocking items:

  1. Please keep backward-compatible row decoding/encoding. The safer shape is to preserve the legacy 1-byte path for legacy-compatible arity values and use a sentinel + int only for larger arity.
  2. Please add compatibility coverage for old-writer -> new-reader, especially around 127 / 128, not only a new-format round trip.
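
A minimal old-writer -> new-reader check along those lines might look like this (plain java.io stand-ins for Hazelcast's interfaces; readArity mirrors the hypothetical sentinel decode shape from item 1):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RollingUpgradeCompatSketch {
    // New-reader decode path (hypothetical shape of the fixed readArity).
    static int readArity(DataInput in) throws IOException {
        byte b = in.readByte();
        return b >= 0 ? b : in.readInt();
    }

    public static void main(String[] args) throws IOException {
        // Old writer: always one signed byte for arity, then the next serialized field.
        ByteArrayOutputStream legacy = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(legacy);
        out.writeByte(127);   // boundary case: largest arity the legacy format can encode
        out.writeInt(0xCAFE); // stand-in for the field that follows arity on the wire

        // New reader must consume exactly 1 byte for arity, leaving offsets intact.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(legacy.toByteArray()));
        System.out.println(readArity(in) + " " + Integer.toHexString(in.readInt()));
    }
}
```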

Because this touches seatunnel-engine serialization, I do not recommend merging the current head until the compatibility path is fixed.

@lm-ylj
Contributor Author

lm-ylj commented Apr 28, 2026

I appreciate your feedback and suggestions. I will address and fix this backward compatibility issue accordingly.


@DanielLeens DanielLeens left a comment


Hi @lm-ylj, thanks for the quick follow-up, and thanks for confirming you plan to address the compatibility concern.

I re-checked the current head locally after your latest reply. There is still no new code on top of commit 9b710c2fe, so the technical conclusion remains unchanged for now.

What this PR is trying to fix is real:

  • User pain: when a SeaTunnelRow has more than 127 fields, the current engine serializer stores arity in a signed byte, and deserialization can end up constructing new SeaTunnelRow(negativeValue), which triggers NegativeArraySizeException.
  • Fix approach in the current head: switch arity from writeByte/readByte to writeInt/readInt, and add a regression test for arity = 128.
  • One-line summary: the PR fixes the overflow case for single-version clusters, but it currently does so by changing the engine wire format in a backward-incompatible way.
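
For reference, the overflow itself is easy to reproduce with plain java.io streams (illustrative values only):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SignedByteOverflowSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeByte(128); // legacy writer truncates arity to one signed byte
        byte arity = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())).readByte();
        // Prints -128: allocating new Object[arity] is what raises NegativeArraySizeException.
        System.out.println(arity);
    }
}
```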

Runtime chain I re-verified locally:

Source / Transform produces SeaTunnelRow
  -> SeaTunnelSourceCollector.collect() [SeaTunnelSourceCollector.java:93-112]
      -> sendRecordToNext(new Record<>(row))
  -> task-group queue publish
      -> RecordEventProducer.onData() [RecordEventProducer.java:29-53]
  -> Hazelcast serializer hook
      -> RecordSerializerHook.createSerializer() [RecordSerializerHook.java:32-35]
      -> RecordSerializer.write() [RecordSerializer.java:40-58]
  -> remote task-group receives bytes
      -> RecordSerializer.read() [RecordSerializer.java:66-92]
      -> RecordEventHandler.handleRecord() [RecordEventHandler.java:52-66]

Because this serializer sits on the real Zeta cross-task-group transport path, compatibility is the blocking concern here.

The main blocker is still:

  1. The current head changes the on-wire layout from 1 byte to 4 bytes for arity, but the new reader does not preserve any legacy decode path. That means a new node reading bytes written by an old node will consume 4 bytes where the old stream only wrote 1, shifting all following field boundaries. This makes rolling upgrades / mixed-version clusters unsafe.

What I recommend:

  • Preferred option: keep the legacy 1-byte encoding for compatible values, and only use a sentinel + int path for larger arity values.
  • Please also add compatibility coverage for old writer -> new reader, especially around the 127 / 128 boundary, not only a new-format round trip.

CI note:

  • I did not see a code-path failure from this PR itself in the current status rollup. The visible failing item is a PR labeler workflow, which does not change the engine compatibility conclusion above.

Conclusion: merge after fixes

Blocking items:

  • Fix the serializer to keep backward-compatible decoding/encoding for historical row bytes.
  • Add old-writer -> new-reader compatibility tests for the boundary cases.

Non-blocking suggestions:

  • No extra non-blocking asks from my side right now. Once the compatibility path is fixed, I’m happy to re-check.

Overall, this is still a worthwhile fix to continue, but because it touches seatunnel-engine serialization, I do not recommend merging the current head until the compatibility path is corrected.

@nzw921rx
Collaborator

@lm-ylj Thank you for the quick fix.

The write path is already deterministic: writeRowArity emits either:

  • a non-negative single byte (0..127), or
  • the extended tuple {-1, MAGIC, int}.

Given that contract, both checks below in readRowArity are effectively unreachable:

if (encodedArity != EXTENDED_ROW_ARITY_MARKER) { ... }
if (extensionMagic != EXTENDED_ROW_ARITY_MAGIC) { ... }

Also, pre-fix records with arity > 127 were already non-recoverable under the legacy encoding, so keeping defensive branches for that path adds noise without practical value.

Suggested simplification:

private int readRowArity(ObjectDataInput in) throws IOException {
    byte b = in.readByte();
    if (b >= 0) {
        return b;
    }
    // Extended encoding written by writeRowArity: {-1, MAGIC, int}
    in.readInt(); // magic
    return in.readInt(); // arity
}

This keeps behavior aligned with the writer invariant and makes the decoding path easier to read and maintain.

@lm-ylj
Contributor Author

lm-ylj commented Apr 28, 2026

> @lm-ylj Thank you for the quick fix.
>
> The write path is already deterministic: writeRowArity emits either:
>
>   • a non-negative single byte (0..127), or
>   • the extended tuple {-1, MAGIC, int}.
>
> Given that contract, both checks below in readRowArity are effectively unreachable:
>
> if (encodedArity != EXTENDED_ROW_ARITY_MARKER) { ... }
> if (extensionMagic != EXTENDED_ROW_ARITY_MAGIC) { ... }
>
> Also, pre-fix records with arity > 127 were already non-recoverable under the legacy encoding, so keeping defensive branches for that path adds noise without practical value.
>
> Suggested simplification:
>
> private int readRowArity(ObjectDataInput in) throws IOException {
>     byte b = in.readByte();
>     if (b >= 0) {
>         return b;
>     }
>     // Extended encoding written by writeRowArity: {-1, MAGIC, int}
>     in.readInt(); // magic
>     return in.readInt(); // arity
> }
>
> This keeps behavior aligned with the writer invariant and makes the decoding path easier to read and maintain.

Updated

@nzw921rx
Collaborator

+1 LGTM



Development

Successfully merging this pull request may close these issues.

[Bug] [Zeta] NegativeArraySizeException in SeaTunnelRow when executing Shuffle task

4 participants