|
37 | 37 | import java.util.stream.Collectors; |
38 | 38 |
|
39 | 39 | import static io.tapdata.base.ConnectorBase.writeListResult; |
| 40 | +import static io.tapdata.entity.event.TapCallbackOffset.KEY_BATCH_OFFSET; |
| 41 | +import static io.tapdata.entity.event.TapCallbackOffset.KEY_STREAM_OFFSET; |
40 | 42 |
|
41 | 43 | /** |
42 | 44 | * @author jarad |
@@ -77,8 +79,8 @@ public class StarrocksStreamLoader { |
77 | 79 | // 表名到 TapTable 的映射,用于刷新时获取真正的 TapTable |
78 | 80 | private final Map<String, TapTable> tableNameToTapTableMap; |
79 | 81 |
|
80 | | - // 保存每个表的最后一个 TapOffset,用于在 flush 成功后回调 |
81 | | - private final Map<String, TapCallbackOffset> lastTapOffsetByTable; |
| 82 | + // 保存每个表的第一个 TapOffset,用于在 flush 成功后回调 |
| 83 | + private final Map<String, TapCallbackOffset> firstOffsetByTable; |
82 | 84 |
|
83 | 85 | // 日志打印控制 |
84 | 86 | private long lastLogTime; |
@@ -129,7 +131,7 @@ public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map<Stri |
129 | 131 | this.isFirstRecordByTable = new ConcurrentHashMap<>(); |
130 | 132 | this.pendingFlushTables = ConcurrentHashMap.newKeySet(); |
131 | 133 | this.tableNameToTapTableMap = new ConcurrentHashMap<>(); |
132 | | - this.lastTapOffsetByTable = new ConcurrentHashMap<>(); |
| 134 | + this.firstOffsetByTable = Collections.synchronizedMap(new LinkedHashMap<>()); |
133 | 135 |
|
134 | 136 | // 初始化定时刷新 |
135 | 137 | initializeFlushScheduler(); |
@@ -452,10 +454,13 @@ public void writeRecord(final List<TapRecordEvent> tapRecordEvents, final TapTab |
452 | 454 | .eventTime(tapRecordEvent.getReferenceTime()) |
453 | 455 | .nodeIds(nodeIds); |
454 | 456 |
|
455 | | - // 保存到 lastTapOffsetByTable,用于 flush 时回调 |
456 | 457 | // 只有当 offset 有效时才保存(避免覆盖之前的有效 offset) |
457 | 458 | if (tapOffset.hasValidOffset()) { |
458 | | - lastTapOffsetByTable.put(tableName, tapOffset); |
| 459 | + if (!firstOffsetByTable.containsKey(tableName)) { |
| 460 | + firstOffsetByTable.put(tableName, tapOffset); |
| 461 | + taplogger.debug("Saved first offset for table {}: streamOffset={}, batchOffset={}", |
| 462 | + tableName, streamOffset, batchOffset); |
| 463 | + } |
459 | 464 | } |
460 | 465 |
|
461 | 466 | byte[] bytes = messageSerializer.serialize(table, tapRecordEvent, isAgg); |
@@ -906,13 +911,40 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks |
906 | 911 | taplogger.debug("Table {} successfully flushed and removed from pending list. " + |
907 | 912 | "Remaining pending tables: {}", tableName, pendingFlushTables.size()); |
908 | 913 |
|
909 | | - // 如果所有表都已刷新完成,并且有 TapOffset 数据,主动通知引擎保存断点 |
910 | | - if (pendingFlushTables.isEmpty() && flushOffsetCallback != null) { |
911 | | - TapCallbackOffset tapOffset = lastTapOffsetByTable.get(tableName); |
912 | | - if (tapOffset != null && tapOffset.hasValidOffset()) { |
913 | | - taplogger.info("All tables flushed successfully, triggering flush offset callback with TapOffset: {}", tapOffset); |
| 914 | + if (flushOffsetCallback != null) { |
| 915 | + TapCallbackOffset offsetToSave = null; |
| 916 | + synchronized (firstOffsetByTable) { |
| 917 | + Map.Entry<String, TapCallbackOffset> firstEntry = firstOffsetByTable.entrySet() |
| 918 | + .stream() |
| 919 | + .findFirst() |
| 920 | + .orElse(null); |
| 921 | + |
| 922 | + if (firstEntry != null) { |
| 923 | + String firstTableName = firstEntry.getKey(); |
| 924 | + TapCallbackOffset firstOffset = firstEntry.getValue(); |
| 925 | + |
| 926 | + // 如果当前刷新的表是第一个表 |
| 927 | + offsetToSave = firstOffset; |
| 928 | + if (tableName.equals(firstTableName)) { |
| 929 | + firstOffsetByTable.remove(firstTableName); |
| 930 | + taplogger.info("Table {} is the first table in queue, saving its latest offset: " + |
| 931 | + "batchOffset={}, streamOffset={}", |
| 932 | + tableName, |
| 933 | + offsetToSave != null ? offsetToSave.get(KEY_BATCH_OFFSET) : null, |
| 934 | + offsetToSave != null ? offsetToSave.get(KEY_STREAM_OFFSET) : null); |
| 935 | + } else { |
| 936 | + taplogger.info("Table {} is not the first table, saving first table {}'s offset: " + |
| 937 | + "batchOffset={}, streamOffset={}", |
| 938 | + tableName, firstTableName, |
| 939 | + offsetToSave.get(KEY_BATCH_OFFSET), |
| 940 | + offsetToSave.get(KEY_STREAM_OFFSET)); |
| 941 | + } |
| 942 | + } |
| 943 | + } |
| 944 | + if (offsetToSave != null && offsetToSave.hasValidOffset()) { |
| 945 | + taplogger.info("Table flushed successfully, triggering flush offset callback with TapOffset: {}", offsetToSave); |
914 | 946 | try { |
915 | | - flushOffsetCallback.accept(tapOffset); |
| 947 | + flushOffsetCallback.accept(offsetToSave); |
916 | 948 | } catch (Exception e) { |
917 | 949 | taplogger.warn("Failed to flush offset callback: {}", e.getMessage(), e); |
918 | 950 | } |
|
0 commit comments