Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
*/
public class PaimonService implements Closeable {

private static final String TAG = PaimonService.class.getName();
private final PaimonConfig config;
private Catalog catalog;

Expand Down Expand Up @@ -803,7 +804,7 @@ private WriteListResult<TapRecordEvent> writeRecordsWithStreamWriteInternal(List
Identifier identifier = Identifier.create(database, tableName);

try {
// Get or create cached writer and commit
// Get or create cached writer and commit ghv
StreamTableWrite writer = getOrCreateStreamWriter(tableKey, identifier);
StreamTableCommit commit = getOrCreateStreamCommit(tableKey, identifier);

Expand All @@ -821,91 +822,33 @@ private WriteListResult<TapRecordEvent> writeRecordsWithStreamWriteInternal(List
}
}

// Update accumulated record count
AtomicInteger recordCount = accumulatedRecordCount.computeIfAbsent(tableKey, k -> new AtomicInteger(0));
int currentCount = recordCount.addAndGet(recordEvents.size());

// Initialize last commit time if not exists
AtomicLong lastCommit = lastCommitTime.computeIfAbsent(tableKey, k -> new AtomicLong(System.currentTimeMillis()));

// Determine if we should commit based on:
// 1. Accumulated record count exceeds threshold
// 2. Time since last commit exceeds interval
// 3. Batch accumulation is disabled (size = 0)
boolean shouldCommit = false;
Integer batchSize = config.getBatchAccumulationSize();
Integer commitInterval = config.getCommitIntervalMs();

if (batchSize == null || batchSize <= 0) {
// Batch accumulation disabled, commit immediately
shouldCommit = true;
} else if (currentCount >= batchSize) {
// Record count threshold reached
shouldCommit = true;
} else if (commitInterval != null && commitInterval > 0) {
// Check time-based commit
long timeSinceLastCommit = System.currentTimeMillis() - lastCommit.get();
if (timeSinceLastCommit >= commitInterval) {
shouldCommit = true;
}
}
// Prepare commit with commitIdentifier
// Use atomic counter to generate unique, incrementing commit identifier
long commitIdentifier = commitIdentifierGenerator.incrementAndGet();
List<CommitMessage> messages = writer.prepareCommit(false, commitIdentifier);

// Perform commit if needed
if (shouldCommit) {
// Use lock to ensure only one thread commits at a time for this table
Object lock = commitLocks.computeIfAbsent(tableKey, k -> new Object());
synchronized (lock) {
// Double-check if we still need to commit (another thread might have committed)
int finalCount = recordCount.get();
if (finalCount > 0) {
// Prepare commit with commitIdentifier
// Use atomic counter to generate unique, incrementing commit identifier
long commitIdentifier = commitIdentifierGenerator.incrementAndGet();
List<CommitMessage> messages = writer.prepareCommit(false, commitIdentifier);

// Commit the batch
commit.commit(commitIdentifier, messages);

// Reset counters after successful commit
recordCount.set(0);
lastCommit.set(System.currentTimeMillis());

connectorContext.getLog().debug("Committed {} accumulated records for table {}",
finalCount, tableKey);
}
}
}
// Commit the batch
commit.commit(commitIdentifier, messages);

// StreamTableWrite can be reused, so we don't clean up here
return result;

} catch (Exception e) {
if (retryCount < maxRetries && (isThreadGroupDestroyedError(e) || isPaimonConflict(e))) {
if (retryCount < maxRetries) {
if (isThreadGroupDestroyedError(e)) {
connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... (attempt {}/{})", retryCount + 1, maxRetries);
} else if (isPaimonConflict(e)) {
connectorContext.getLog().warn("Commit conflict detected, retrying... (attempt {}/{})", retryCount + 1, maxRetries, e);
} else {
connectorContext.getLog().warn("Failed to write records to table {}, error message: {}, retrying... (attempt {}/{})", tableName, e.getMessage(), retryCount + 1, maxRetries);
}
retryCount++;
reinitCatalog();
TimeUnit.SECONDS.sleep(1L);
CommonUtils.ignoreAnyError(() -> TimeUnit.SECONDS.sleep(1L), TAG);
continue;
}

Throwable illegalThreadStateException = CommonUtils.matchThrowable(e, IllegalThreadStateException.class);
if (null != illegalThreadStateException) {
String message = String.format(
"Failed to write records to table %s occurred illegal thread state exception, current thread name: %s, thread group: %s",
tableName,
Thread.currentThread().getName(),
Thread.currentThread().getThreadGroup() != null
? Thread.currentThread().getThreadGroup().getName()
: "null"
);
throw new RuntimeException(message, e);
} else {
throw new RuntimeException("Failed to write records to table " + tableName, e);
}
throw new RuntimeException("Failed to write records to table " + tableName, e);
}
}
}
Expand Down
Loading