
[CELEBORN-2313] Extend E2E checked zone to batch assembly point#3670

Open
xumingming wants to merge 1 commit into apache:main from xumingming:extend-e2e-checked-zone

Conversation

@xumingming
Contributor

What changes were proposed in this pull request?

Celeborn's E2E integrity check computes CRC_M inside ShuffleClientImpl.pushOrMergeData(), which runs in the async DataPusher thread. This leaves the segment from batch assembly in the writer thread through the DataPusher queue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.

This change closes that gap and enables detection of a class of correctness bugs where data corruption occurs between batch assembly and async push dispatch, including bugs involving shared buffer pool references.

Introduce ShuffleClient.computeBatchCRC() and call it immediately before each assembled batch enters the async push pipeline, at 7 call sites across 3 classes: HashBasedShuffleWriter (spark-2/3) at flushSendBuffer(), pushGiantRecord(), and the per-partition flush in close(); and SortBasedPusher at the partition-change flush, buffer-overflow flush, pushGiantRecord(), and final flush. The now-redundant CRC computation inside pushOrMergeData() is removed.

This approach is less elegant than the original design, which had a single CRC call site inside pushOrMergeData() — one place to reason about and maintain. The new design scatters computeBatchCRC() across 7 call sites, but the trade-off is justified: the checked zone now starts at batch assembly rather than at async push dispatch, covering more of the data pipeline and enabling detection of a broader class of correctness bugs.
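For readers unfamiliar with the mechanism, the writer-side accumulation that `computeBatchCRC()` performs can be sketched roughly as follows. This is a simplified, hypothetical model rather than Celeborn's actual code: the class, method, and field names here are illustrative, and the byte-wise combine mirrors the strategy shown in this PR's unit test.

```java
import java.util.zip.CRC32;

// Simplified sketch of writer-side per-partition CRC accumulation.
// Names are illustrative; Celeborn's real implementation (CelebornCRC32,
// PushState, ShuffleClientImpl) differs in detail.
class BatchCrcSketch {
    private final int[] crcPerPartition;
    private final long[] bytesPerPartition;

    BatchCrcSketch(int numPartitions) {
        crcPerPartition = new int[numPartitions];
        bytesPerPartition = new long[numPartitions];
    }

    // Called immediately after batch assembly in the writer thread,
    // before the batch is handed to the async DataPusher queue.
    void computeBatchCRC(int partitionId, byte[] data, int offset, int length) {
        CRC32 crc = new CRC32();
        crc.update(data, offset, length);
        crcPerPartition[partitionId] =
            combine(crcPerPartition[partitionId], (int) crc.getValue());
        bytesPerPartition[partitionId] += length;
    }

    // Byte-wise addition of the four CRC bytes, mirroring the combine
    // strategy shown in the unit test later in this thread.
    static int combine(int running, int next) {
        return (((byte) next + (byte) running) & 0xFF)
            | ((((byte) (next >> 8) + (byte) (running >> 8)) & 0xFF) << 8)
            | ((((byte) (next >> 16) + (byte) (running >> 16)) & 0xFF) << 16)
            | (((byte) (next >> 24) + (byte) (running >> 24)) << 24);
    }

    int crc(int partitionId) { return crcPerPartition[partitionId]; }
    long bytes(int partitionId) { return bytesPerPartition[partitionId]; }
}
```

Because the accumulation happens in the writer thread at assembly time, any corruption introduced later in the queue or pusher thread shows up as a mismatch against this value.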

Why are the changes needed?

Celeborn's E2E integrity check computes CRC_M inside ShuffleClientImpl.pushOrMergeData(), which runs in the async DataPusher thread. This leaves the segment from batch assembly in the writer thread through the DataPusher queue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.

Does this PR resolve a correctness bug?

It could help us detect more correctness bugs.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

@xumingming xumingming force-pushed the extend-e2e-checked-zone branch 2 times, most recently from cf055e4 to a4ee01e on April 22, 2026 13:03
@codecov

codecov Bot commented Apr 22, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 67.05%. Comparing base (b4cb5a0) to head (a4ee01e).
⚠️ Report is 26 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3670      +/-   ##
==========================================
+ Coverage   66.91%   67.05%   +0.15%     
==========================================
  Files         358      359       +1     
  Lines       21986    22197     +211     
  Branches     1946     1970      +24     
==========================================
+ Hits        14710    14883     +173     
- Misses       6262     6292      +30     
- Partials     1014     1022       +8     



Copilot AI left a comment


Pull request overview

Extends Celeborn’s E2E shuffle integrity “checked zone” upstream by computing per-batch CRC immediately after batch assembly (writer thread) rather than inside the async DataPusher pipeline, aiming to detect corruption occurring between batch assembly and async dispatch.

Changes:

  • Adds ShuffleClient.computeBatchCRC() and implements it in ShuffleClientImpl (no-op in DummyShuffleClient).
  • Moves CRC accumulation call sites into Spark shuffle writers and SortBasedPusher right before enqueue/push/merge.
  • Adds a unit test validating CRC accumulation behavior.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java Adds UT for computeBatchCRC() accumulation behavior.
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java Removes CRC accumulation from pushOrMergeData() and adds computeBatchCRC() implementation.
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java Introduces the new computeBatchCRC() API with Javadoc.
client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java Implements new abstract method as a no-op.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java Calls computeBatchCRC() for giant-record pushes.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java Calls computeBatchCRC() before enqueue and per-partition final flush.
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java Calls computeBatchCRC() for giant-record pushes.
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java Calls computeBatchCRC() before enqueue and per-partition final flush.
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java Calls computeBatchCRC() before partition-change flush, overflow flush, and final flush.


Comment on lines +191 to +203
/**
* Pre-compute CRC for a batch immediately after assembly in the writer, before the data enters
* the async push pipeline. Calling this prevents double-computation in pushOrMergeData.
*/
public abstract void computeBatchCRC(
int shuffleId,
int mapId,
int attemptId,
int partitionId,
byte[] data,
int offset,
int length);


Copilot AI Apr 22, 2026


The computeBatchCRC() Javadoc says calling it “prevents double-computation in pushOrMergeData”, but this PR removes CRC accumulation from pushOrMergeData entirely. Please update the comment to match the new semantics (i.e., computeBatchCRC is now required for integrity-check metadata unless a fallback remains in pushOrMergeData), otherwise API users will be misled.

Contributor Author


Good catch, fixed.

Comment on lines +749 to +767
// compute expected CRC for partition 1: two batches accumulated via CelebornCRC32.combine
crcUtil.reset();
crcUtil.update(batch1a, 0, batch1a.length);
int crc1a = (int) crcUtil.getValue();
crcUtil.reset();
crcUtil.update(batch1b, 0, batch1b.length);
int crc1b = (int) crcUtil.getValue();
// mirrors CelebornCRC32.combine(second, first): byte-wise addition
int expected1 = celebornCrcCombine(celebornCrcCombine(0, crc1a), crc1b);
assertEquals(expected1, crcPerPartition[1]);
assertEquals(batch1a.length + batch1b.length, bytesPerPartition[1]);
}

// mirrors CelebornCRC32.combine(second=newChecksum, first=runningVal)
private static int celebornCrcCombine(int runningVal, int newChecksum) {
return (((byte) newChecksum + (byte) runningVal) & 0xFF)
| ((((byte) (newChecksum >> 8) + (byte) (runningVal >> 8)) & 0xFF) << 8)
| ((((byte) (newChecksum >> 16) + (byte) (runningVal >> 16)) & 0xFF) << 16)
| (((byte) (newChecksum >> 24) + (byte) (runningVal >> 24)) << 24);

Copilot AI Apr 22, 2026


This test re-implements CelebornCRC32.combine logic in celebornCrcCombine(), which duplicates production code and makes the test brittle if the checksum combination strategy ever changes (also the “second/first” wording is confusing given the current CelebornCRC32.combine signature). Prefer deriving the expected checksum via existing public APIs (e.g., build a CommitMetadata and call addDataWithOffsetAndLength twice, then assert its checksum/bytes match PushState), so the test validates behavior without copying internal implementation details.

Contributor Author


Good catch, fixed.

@@ -1044,12 +1044,6 @@ public int pushOrMergeData(
// increment batchId
final int nextBatchId = pushState.nextBatchId();


Copilot AI Apr 22, 2026


Removing commit-metadata/CRC tracking from pushOrMergeData means integrity-check metadata is now only updated when callers explicitly invoke computeBatchCRC(). There are existing call sites that still call pushData()/mergeData() directly without computeBatchCRC (e.g. client-tez/tez/CelebornTezWriter.mergeData, client-mr/mr/CelebornSortBasedPusher.pushData, client-spark/columnar/ColumnarHashBasedShuffleWriter.closeColumnarWrite mergeData), so enabling celeborn.client.shuffle.integrityCheck.enabled will silently stop producing correct per-partition CRC/bytes for those paths. Please either keep a fallback CRC accumulation in pushOrMergeData (only when not already precomputed) or update all remaining writers/clients that call pushData/mergeData to invoke computeBatchCRC before enqueue/push.

Suggested change
// Preserve integrity-check metadata for callers that still invoke pushData()/mergeData()
// directly without explicitly calling computeBatchCRC() first. This must run before
// compression so CRC / bytes are recorded for the original batch payload.
computeBatchCRC(shuffleId, mapId, attemptId, partitionId, nextBatchId, data, offset, length);

Contributor Author


End-to-end integrity checking did not support Tez/MR even before this change. If we want to add support, the changes will be similar.

@gauravkm
Contributor

Thanks for the fix!

I have been thinking about this change as well.
Copilot raises a valid point around potentially missing call sites where we should explicitly invoke the computation. That is where I sort of got stuck as well. I am going to spend more time thinking about what could be a more robust approach.

I am also interested in learning what other folks in the community think as well about the current explicit call approach from all writer call sites.

@gauravkm
Contributor

One option might be to have two checksums (from both the writer and the shuffle client) on the client as well, and then compare them before sending the metadata. If there is a mismatch, fail the task. This would catch both cases: either a call site has been missed, or the DataPusher did not push all the data.
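The dual-checksum idea above could be sketched along these lines. This is a hypothetical illustration only; `DualChecksumGuard`, `recordWriterCrc`, `recordClientCrc`, and `verifyBeforeCommit` are invented names, not existing Celeborn APIs.

```java
// Hypothetical sketch: the writer and the shuffle client each maintain an
// independent per-partition checksum, and the client compares them before
// sending commit metadata. A mismatch means either a push call site skipped
// the writer-side CRC or the DataPusher did not push all the data, so the
// task fails early at the mapper instead of corrupting reducer input.
class DualChecksumGuard {
    private final int[] writerCrc;
    private final int[] clientCrc;

    DualChecksumGuard(int numPartitions) {
        writerCrc = new int[numPartitions];
        clientCrc = new int[numPartitions];
    }

    // Recorded at batch assembly, in the writer thread.
    void recordWriterCrc(int partitionId, int crc) {
        writerCrc[partitionId] = crc;
    }

    // Recorded inside the shuffle client, in the DataPusher thread.
    void recordClientCrc(int partitionId, int crc) {
        clientCrc[partitionId] = crc;
    }

    // Called once per partition before commit metadata is sent.
    void verifyBeforeCommit(int partitionId) {
        if (writerCrc[partitionId] != clientCrc[partitionId]) {
            throw new IllegalStateException(
                "CRC mismatch for partition " + partitionId);
        }
    }
}
```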

@RexXiong
Contributor

RexXiong commented Apr 23, 2026

Thanks for this PR — the analysis of the coverage gap is spot-on. I have a suggestion on the API design that could make it more maintainable.

Concern: Implicit CRC contract

The current approach requires every call site to remember computeBatchCRC() before pushData/mergeData/addTask. This is an implicit contract — a new code path can easily call pushData directly and silently bypass the integrity check. The original single-site design was self-enforcing; this PR trades that away.

Suggestion: Encapsulate CRC + push as atomic operations

Introduce WithCRC variants that combine CRC computation and data push into a single call:

// ShuffleClient.java
public int pushDataWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] data, int offset, int length,
    int numMappers, int numPartitions) throws IOException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, data, offset, length);
  return pushData(shuffleId, mapId, attemptId, partitionId,
      data, offset, length, numMappers, numPartitions);
}

public int mergeDataWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] data, int offset, int length,
    int numMappers, int numPartitions) throws IOException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, data, offset, length);
  return mergeData(shuffleId, mapId, attemptId, partitionId,
      data, offset, length, numMappers, numPartitions);
}

// For the DataPusher path
public void addTaskWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] buffer, int size) throws InterruptedException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
  dataPusher.addTask(partitionId, buffer, size);
}

Then the 7 writer call sites each become a single call — no way to forget CRC:

// Before (two operations, easy to miss CRC)
shuffleClient.computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
dataPusher.addTask(partitionId, buffer, size);

// After (one atomic operation)
shuffleClient.addTaskWithCRC(shuffleId, mapId, attemptId, partitionId, buffer, size);

The bare pushData/mergeData can be annotated as internal-only (or made package-private if feasible), so new code naturally gravitates toward the WithCRC variants.

This doesn't eliminate the 7 call sites, but it makes them self-contained and impossible to get wrong. The invariant shifts from "callers must remember to compute CRC first" (implicit, fragile) to "use the CRC-inclusive API" (explicit, hard to misuse).


Other than that, a couple of minor notes on the test:

  • celebornCrcCombine in the test duplicates CelebornCRC32.combine. If combine's implementation changes, the test won't catch it. Consider calling CelebornCRC32.combine directly instead.
  • It would be valuable to add an integration test that exercises the full writer → computeBatchCRC → DataPusher → pushOrMergeData path to guard against future regressions (e.g., someone re-adding CRC to pushOrMergeData causing double-counting).

Review generated with the assistance of Claude AI

@RexXiong
Contributor

One option might be to have two checksums (from both the writer and the shuffle client) on the client as well, and then compare them before sending the metadata. If there is a mismatch, fail the task. This would catch both cases: either a call site has been missed, or the DataPusher did not push all the data.

@gauravkm interesting idea! Failing the task at the mapper side is far cheaper than discovering corruption at reducers and rerunning the entire job. We can explore this as a follow-up improvement.

@xumingming xumingming closed this Apr 23, 2026
@xumingming xumingming reopened this Apr 23, 2026
@xumingming
Contributor Author

@RexXiong Thanks for the review. After some thought, I find the suggestion not feasible. The reason I want to separate the CRC calculation from pushOrMergeData is that the current CRC calculation happens too late, so I moved it out of pushOrMergeData to an earlier call site. The call site could be:

  • shuffleClient.pushData
  • shuffleClient.mergeData
  • dataPusher.addTask
  • etc

Adding pushDataWithCRC to replace pushData is technically doable, but it is essentially similar to the original pushOrMergeData, which is where we started.
Adding addTaskWithCRC is trickier: addTask is a method on DataPusher, and DataPusher is a higher-level concept than ShuffleClient, so mixing DataPusher into ShuffleClient does not seem like a good idea.

Even if we added all the xxxWithCRC methods, the methods in ShuffleClient would be:

  • pushData: cannot be removed, because when data flows through dataPusher.addTask, the CRC is calculated there, but ultimately we still need to call pushData to do the actual push; we should not call pushDataWithCRC here.
  • mergeData: cannot be removed, for the same reason as above.
  • pushDataWithCRC: newly added
  • mergeDataWithCRC: newly added
  • addTaskWithCRC: newly added

It seems more complicated than my current solution. What do you think?

@xumingming
Contributor Author

One option might be to have two checksums (from both the writer and the shuffle client) on the client as well, and then compare them before sending the metadata. If there is a mismatch, fail the task. This would catch both cases: either a call site has been missed, or the DataPusher did not push all the data.

@gauravkm Interesting idea! Looking forward to detailed proposal.

@xumingming xumingming force-pushed the extend-e2e-checked-zone branch from a4ee01e to f3dd0f0 on April 23, 2026 09:02
@xumingming
Contributor Author

@RexXiong @SteNicholas Have made corresponding changes, please take a look again.

@gauravkm
Contributor

gauravkm commented Apr 23, 2026

@gauravkm Interesting idea! Looking forward to detailed proposal

There isn't a lot more to it. Essentially we keep the existing checksum computation, and separately store the CRC computation being added from the writers (in this PR). Then, before we send the metadata, we ensure that both computations match; otherwise we fail the task.

Essentially there are two layers (writer and shuffle client) computing their own checksums, and then shuffle client compares them before propagation at mapper end

@xumingming
Contributor Author

@RexXiong @SteNicholas Gentle ping :)
