[Bug] [Zeta] Memory leak in SinkAggregatedCommitterTask due to uncleaned checkpoint caches #10188

@jw-itq

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

Background

I am running approximately 25 real-time streaming synchronization jobs on the SeaTunnel Zeta engine in a production environment. Each job uses a checkpoint interval of 5000 ms (5 seconds). The jobs primarily synchronize data from MySQL CDC to Iceberg.

Symptom Timeline

Phase 1 - Normal Operation (Day 1-2):

  • All jobs started successfully and ran stably
  • Heap memory usage remained low and stable
  • ZGC (Z Garbage Collector) worked efficiently with:
    • ~50 GC cycles
    • ~4 minutes cycle duration
    • Heap memory close to 0

Phase 2 - Sudden Memory Spike (Around Day 3):

  • Heap memory suddenly spiked from near 0 to approximately 18.6 GiB
  • ZGC cycles increased dramatically from ~50 to ~150 cycles
  • ZGC cycle duration increased from ~4 minutes to ~12.5 minutes
  • ZGC trigger reasons shifted to "Allocation Rate" and "High Usage"
  • The system eventually crashed with OutOfMemoryError

This pattern repeated after restarting the jobs - stable for a few days, then sudden memory explosion.

Investigation

I captured a heap dump and analyzed it with the Alibaba Cloud JVM analysis tool. The analysis revealed:

  1. Largest Memory Consumer: SinkAggregatedCommitterTask instances
  2. GC Root Path:
    TaskExecutionService$BlockingWorker (Thread)
      └── TaskTracker
            └── SinkAggregatedCommitterTask
                  ├── commitInfoCache (ConcurrentHashMap) - continuously growing
                  └── checkpointBarrierCounter (ConcurrentHashMap) - continuously growing
    

Root Cause Analysis (Code Deep Dive)

After analyzing the source code of SinkAggregatedCommitterTask.java, I identified the root cause:

Two internal maps are never cleaned up after checkpoint completion:

  1. commitInfoCache (ConcurrentMap<Long, List<CommandInfoT>>):

    // Data is ADDED here (receivedWriterCommitInfo method, line 284-291):
    public void receivedWriterCommitInfo(long checkpointID, CommandInfoT commitInfos) {
        commitInfoCache.computeIfAbsent(checkpointID, id -> new CopyOnWriteArrayList<>());
        commitInfoCache.get(checkpointID).add(commitInfos);  // ← Added but NEVER removed
    }
  2. checkpointBarrierCounter (Map<Long, Integer>):

    // Counter is INCREMENTED here (triggerBarrier method, line 211-213):
    Integer count = checkpointBarrierCounter.compute(
            barrier.getId(), (id, num) -> num == null ? 1 : ++num);  // ← Incremented but NEVER removed
  3. checkpointCommitInfoMap is properly cleaned in notifyCheckpointComplete() - but the other two maps are NOT:

    // notifyCheckpointComplete method (line 304-320):
    checkpointCommitInfoMap.forEach((key, value) -> {
        if (key > checkpointId) {
            return;
        }
        aggregatedCommitInfo.addAll(value);
        checkpointCommitInfoMap.remove(key);  // ✅ This is cleaned
        // ❌ commitInfoCache is NOT cleaned!
        // ❌ checkpointBarrierCounter is NOT cleaned!
    });
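
A possible direction for the fix, as a minimal sketch rather than the actual patch: remove entries from both maps once a checkpoint completes. The class name `CheckpointCacheCleanupSketch`, the `String` payload standing in for `CommandInfoT`, and the cleanup rule (drop every key at or below the completed checkpoint ID) are assumptions for illustration. Whether older entries are always safe to drop at that point would need to be confirmed against the real task lifecycle.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the two leaking fields; NOT the real SinkAggregatedCommitterTask.
class CheckpointCacheCleanupSketch {
    final Map<Long, List<String>> commitInfoCache = new ConcurrentHashMap<>();
    final Map<Long, Integer> checkpointBarrierCounter = new ConcurrentHashMap<>();

    // Mirrors receivedWriterCommitInfo: one cache entry per checkpoint ID.
    void receivedWriterCommitInfo(long checkpointId, String commitInfo) {
        commitInfoCache
                .computeIfAbsent(checkpointId, id -> new CopyOnWriteArrayList<>())
                .add(commitInfo);
    }

    // Mirrors the counter update in triggerBarrier; returns the new count.
    int triggerBarrier(long checkpointId) {
        return checkpointBarrierCounter.merge(checkpointId, 1, Integer::sum);
    }

    // Assumed cleanup rule: drop every entry at or below the completed checkpoint ID.
    void notifyCheckpointComplete(long checkpointId) {
        commitInfoCache.keySet().removeIf(id -> id <= checkpointId);
        checkpointBarrierCounter.keySet().removeIf(id -> id <= checkpointId);
    }

    public static void main(String[] args) {
        CheckpointCacheCleanupSketch task = new CheckpointCacheCleanupSketch();
        for (long id = 1; id <= 1_000; id++) {
            task.receivedWriterCommitInfo(id, "commit-" + id);
            task.triggerBarrier(id);
        }
        // Without cleanup, both maps hold one entry per checkpoint.
        System.out.println(task.commitInfoCache.size() + " / " + task.checkpointBarrierCounter.size());

        task.notifyCheckpointComplete(990);
        // Only checkpoints 991..1000 remain after cleanup.
        System.out.println(task.commitInfoCache.size() + " / " + task.checkpointBarrierCounter.size());
    }
}
```

With cleanup tied to checkpoint completion, the size of each map is bounded by the number of in-flight checkpoints instead of growing with job uptime.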

Why the Memory Spike Appears "Sudden"

The memory leak accumulates silently over time:

Metric                     Calculation
Checkpoint frequency       Every 5 seconds
Checkpoints per hour       720 per job
Checkpoints per day        17,280 per job
Total (25 jobs) per day    432,000 uncleaned map entries
After 3 days               ~1.3 million uncleaned entries
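
The figures above can be reproduced with a short calculation. This is a sketch that assumes one leaked entry per checkpoint per job in a given map; the class and method names are hypothetical.

```java
import java.time.Duration;

// Hypothetical helper that reproduces the growth estimate from the table.
class LeakGrowthEstimate {
    // Number of checkpoints triggered in the given period at a fixed interval.
    static long checkpointsIn(Duration period, Duration interval) {
        return period.toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMillis(5000); // checkpoint.interval from the job config
        int jobs = 25;

        long perHour = checkpointsIn(Duration.ofHours(1), interval);
        long perDay = checkpointsIn(Duration.ofDays(1), interval);
        long allJobsPerDay = perDay * jobs;
        long afterThreeDays = allJobsPerDay * 3;

        System.out.println("per hour, per job: " + perHour);        // 720
        System.out.println("per day, per job:  " + perDay);         // 17280
        System.out.println("per day, 25 jobs:  " + allJobsPerDay);  // 432000
        System.out.println("after 3 days:      " + afterThreeDays); // 1296000
    }
}
```

Since two maps leak, the total number of retained entries across both could be up to twice these figures.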

The "sudden" spike occurs because:

  1. Initially, the leaked memory is small and ZGC handles it easily
  2. As entries accumulate, the retained heap grows toward the point where ZGC can no longer keep up
  3. ZGC cannot reclaim these objects (they're still referenced)
  4. This appears to trigger a cascade: more frequent GC cycles → longer cycle durations → allocation stalls → OutOfMemoryError

This explains why monitoring shows stable memory for days, then a sudden vertical spike.

SeaTunnel Version

2.3.12

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    # CDC configuration
  }
}

sink {
  Iceberg {
    # Iceberg sink configuration
  }
}

Running Command

seatunnel.sh -c mysql_to_iceberg.conf --async

Error Exception

java.lang.OutOfMemoryError: Java heap space

=== Heap Dump Analysis (Alibaba Cloud JVM Tool) ===

Largest Object by Retained Size:
  - Class: org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask
  - Count: 25 instances (one per job)
  - Retained Size: Majority of heap

GC Root Path:
  java.lang.Thread @ TaskExecutionService$BlockingWorker
    └── org.apache.seatunnel.engine.server.execution.TaskTracker
          └── org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask
                ├── commitInfoCache: java.util.concurrent.ConcurrentHashMap
                │     └── Size: continuously growing (never cleaned)
                └── checkpointBarrierCounter: java.util.concurrent.ConcurrentHashMap
                      └── Size: continuously growing (never cleaned)

Code Search Verification:
  - "commitInfoCache.remove" → NOT FOUND in entire codebase
  - "commitInfoCache.clear" → NOT FOUND in entire codebase
  - "checkpointBarrierCounter.remove" → NOT FOUND in entire codebase
  - "checkpointBarrierCounter.clear" → NOT FOUND in entire codebase

Zeta or Flink or Spark Version

Zeta Engine (SeaTunnel native engine)

Java or Scala Version

JDK 17 (with ZGC enabled)

Screenshots

ZGC Monitoring Dashboard (6-hour intervals):

(Screenshot not attached. The dashboard showed:)

  • ZGC Cycles: Increased from ~50 to ~150
  • ZGC Heap Memory: Sudden spike from ~0 to ~18.6 GiB around day 3
  • ZGC Cycle Duration: Increased from ~4 min to ~12.5 min
  • Trigger Reasons: "Allocation Rate" and "High Usage" increased significantly

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
