88import io .tapdata .connector .starrocks .streamload .exception .StreamLoadException ;
99import io .tapdata .connector .starrocks .streamload .rest .models .RespContent ;
1010import io .tapdata .connector .starrocks .util .MinuteWriteLimiter ;
11+ import io .tapdata .entity .event .TapCallbackOffset ;
1112import io .tapdata .entity .event .dml .TapDeleteRecordEvent ;
1213import io .tapdata .entity .event .dml .TapInsertRecordEvent ;
1314import io .tapdata .entity .event .dml .TapRecordEvent ;
@@ -76,6 +77,9 @@ public class StarrocksStreamLoader {
7677 // 表名到 TapTable 的映射,用于刷新时获取真正的 TapTable
7778 private final Map <String , TapTable > tableNameToTapTableMap ;
7879
80+ // 保存每个表的最后一个 TapOffset,用于在 flush 成功后回调
81+ private final Map <String , TapCallbackOffset > lastTapOffsetByTable ;
82+
7983 // 日志打印控制
8084 private long lastLogTime ;
8185 private static final long LOG_INTERVAL_MS = 30 * 1000 ; // 30秒
@@ -91,6 +95,8 @@ public class StarrocksStreamLoader {
9195 private final Log taplogger ;
9296 private boolean cannotClean ;
9397 private AtomicReference <Exception > globalException = new AtomicReference <>();
98+ // 回调 flush offset
99+ private Consumer <Object > flushOffsetCallback ;
94100
95101 public StarrocksStreamLoader (StarrocksJdbcContext StarrocksJdbcContext , Map <String , CloseableHttpClient > httpClientMap , boolean useHttps , Log taplogger ) {
96102 this .StarrocksConfig = (StarrocksConfig ) StarrocksJdbcContext .getConfig ();
@@ -123,6 +129,7 @@ public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map<Stri
123129 this .isFirstRecordByTable = new ConcurrentHashMap <>();
124130 this .pendingFlushTables = ConcurrentHashMap .newKeySet ();
125131 this .tableNameToTapTableMap = new ConcurrentHashMap <>();
132+ this .lastTapOffsetByTable = new ConcurrentHashMap <>();
126133
127134 // 初始化定时刷新
128135 initializeFlushScheduler ();
@@ -425,6 +432,32 @@ public void writeRecord(final List<TapRecordEvent> tapRecordEvents, final TapTab
425432 long batchDataSize = 0 ;
426433
427434 for (TapRecordEvent tapRecordEvent : tapRecordEvents ) {
435+ // 构建 TapOffset 对象,用于在 flush 成功后回调
436+ TapCallbackOffset tapOffset = new TapCallbackOffset ();
437+
438+ // 从 TapRecordEvent.info 中提取 offset 信息
439+ // 这些信息由 HazelcastTargetPdkBaseNode.handleTapdataEventDML 方法添加
440+ Object batchOffset = tapRecordEvent .getInfo ("batchOffset" );
441+ Object streamOffset = tapRecordEvent .getInfo ("streamOffset" );
442+ Object syncStage = tapRecordEvent .getInfo ("syncStage" );
443+ Object sourceTime = tapRecordEvent .getInfo ("sourceTime" );
444+ Object nodeIds = tapRecordEvent .getInfo ("nodeIds" );
445+
446+ // 填充 TapOffset
447+ tapOffset .batchOffset (batchOffset )
448+ .streamOffset (streamOffset )
449+ .tableId (tapRecordEvent .getTableId ())
450+ .syncStage (syncStage != null ? syncStage .toString () : null )
451+ .sourceTime (sourceTime instanceof Long ? (Long ) sourceTime : null )
452+ .eventTime (tapRecordEvent .getReferenceTime ())
453+ .nodeIds (nodeIds );
454+
455+ // 保存到 lastTapOffsetByTable,用于 flush 时回调
456+ // 只有当 offset 有效时才保存(避免覆盖之前的有效 offset)
457+ if (tapOffset .hasValidOffset ()) {
458+ lastTapOffsetByTable .put (tableName , tapOffset );
459+ }
460+
428461 byte [] bytes = messageSerializer .serialize (table , tapRecordEvent , isAgg );
429462 batchDataSize += bytes .length ;
430463
@@ -862,30 +895,52 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks
862895 taplogger .info ("Updated last flush time for table {}: {} -> {} (diff: {} ms)" ,
863896 tableName , oldFlushTime , newFlushTime , newFlushTime - oldFlushTime );
864897
865- // 清理该表的缓存文件
866- cleanupCacheFileForTable (tableName );
898+ // 成功: 清理该表的缓存文件(删除文件)
899+ cleanupCacheFileForTable (tableName , true );
867900 // 从待刷新列表中移除该表
868901 pendingFlushTables .remove (tableName );
869902 // 清理该表的批次大小
870903 currentBatchSizeByTable .remove (tableName );
871- cannotClean = false ;
872- // 注意:刷新时间只在成功时更新,失败时不更新以便重试
904+
905+ // 数据成功刷新后,主动通知引擎可以保存断点
906+ taplogger .debug ("Table {} successfully flushed and removed from pending list. " +
907+ "Remaining pending tables: {}" , tableName , pendingFlushTables .size ());
908+
909+ // 如果所有表都已刷新完成,并且有 TapOffset 数据,主动通知引擎保存断点
910+ if (pendingFlushTables .isEmpty () && flushOffsetCallback != null ) {
911+ TapCallbackOffset tapOffset = lastTapOffsetByTable .get (tableName );
912+ if (tapOffset != null && tapOffset .hasValidOffset ()) {
913+ taplogger .info ("All tables flushed successfully, triggering flush offset callback with TapOffset: {}" , tapOffset );
914+ try {
915+ flushOffsetCallback .accept (tapOffset );
916+ } catch (Exception e ) {
917+ taplogger .warn ("Failed to flush offset callback: {}" , e .getMessage (), e );
918+ }
919+ } else {
920+ taplogger .debug ("No valid TapOffset found for table {}, skipping callback" , tableName );
921+ }
922+ }
923+
873924 return respContent ;
874925 } catch (StarrocksRetryableException e ) {
875926 long flushEndTime = System .currentTimeMillis ();
876927 long flushDuration = flushEndTime - flushStartTime ;
877928 taplogger .warn ("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
878929 "flush_duration={} ms, error={}" ,
879930 tableName , formatBytes (tableDataSize ), waitTime , flushDuration , e .getMessage ());
880- cannotClean = true ;
931+
932+ // 失败:保留缓存文件(不删除),仅关闭文件流
933+ cleanupCacheFileForTable (tableName , false );
881934 throw e ;
882935 } catch (Exception e ) {
883936 long flushEndTime = System .currentTimeMillis ();
884937 long flushDuration = flushEndTime - flushStartTime ;
885938 taplogger .warn ("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
886939 "flush_duration={} ms, error={}" ,
887940 tableName , formatBytes (tableDataSize ), waitTime , flushDuration , e .getMessage ());
888- cannotClean = true ;
941+
942+ // 失败:保留缓存文件(不删除),仅关闭文件流
943+ cleanupCacheFileForTable (tableName , false );
889944 throw new StarrocksRuntimeException (e );
890945 } finally {
891946
@@ -957,17 +1012,16 @@ public void shutdown() {
9571012
9581013 // 清理所有Map,释放内存
9591014 cacheFileStreamsByTable .clear ();
960- if (!cannotClean ) {
961- // 清理所有表的缓存文件
962- cleanupAllCacheFiles ();
963-
964- tempCacheFilesByTable .clear ();
965- isFirstRecordByTable .clear ();
966- dataColumnsByTable .clear ();
967- pendingFlushTables .clear ();
968- currentBatchSizeByTable .clear ();
969- lastFlushTimeByTable .clear ();
970- }
1015+
1016+ // 清理所有表的缓存文件
1017+ cleanupAllCacheFiles ();
1018+
1019+ tempCacheFilesByTable .clear ();
1020+ isFirstRecordByTable .clear ();
1021+ dataColumnsByTable .clear ();
1022+ pendingFlushTables .clear ();
1023+ currentBatchSizeByTable .clear ();
1024+ lastFlushTimeByTable .clear ();
9711025 // 注意:tableNameToTapTableMap 不清理,因为表结构信息需要持久保存
9721026
9731027 // 强制垃圾回收
@@ -1066,13 +1120,36 @@ private void finalizeCacheFileForTable(String tableName) throws IOException {
10661120 cacheFileStream = new FileOutputStream (tempCacheFile .toFile (), true );
10671121 cacheFileStreamsByTable .put (tableName , cacheFileStream );
10681122 }
1069- if (!cannotClean ) {
1070- // 写入批次结束标记
1123+
1124+ // 检查文件是否已经有结束标记
1125+ boolean needsEndMarker = true ;
1126+ if (Files .exists (tempCacheFile ) && Files .size (tempCacheFile ) > 0 ) {
1127+ // 读取文件最后几个字节,检查是否已经有 ']'
1128+ byte [] lastBytes = new byte [10 ];
1129+ try (FileInputStream fis = new FileInputStream (tempCacheFile .toFile ())) {
1130+ long fileSize = Files .size (tempCacheFile );
1131+ long skipBytes = Math .max (0 , fileSize - 10 );
1132+ fis .skip (skipBytes );
1133+ int bytesRead = fis .read (lastBytes );
1134+ String lastContent = new String (lastBytes , 0 , bytesRead , java .nio .charset .StandardCharsets .UTF_8 );
1135+ needsEndMarker = !lastContent .trim ().endsWith ("]" );
1136+ }
1137+ }
1138+
1139+ // 只在需要时写入结束标记
1140+ if (needsEndMarker ) {
10711141 cacheFileStream .write (messageSerializer .batchEnd ());
1142+ taplogger .debug ("Added end marker ']' to cache file for table {}" , tableName );
1143+ } else {
1144+ taplogger .debug ("Cache file for table {} already has end marker, skipping" , tableName );
10721145 }
1146+
10731147 cacheFileStream .flush ();
10741148 cacheFileStream .close ();
10751149
1150+ // 验证文件完整性
1151+ verifyFileCompleteness (tableName , tempCacheFile );
1152+
10761153 taplogger .debug ("Finalized cache file for table {}: {}, size: {}" ,
10771154 tableName , tempCacheFile .toString (), formatBytes (Files .size (tempCacheFile )));
10781155 }
@@ -1082,10 +1159,35 @@ private void finalizeCacheFileForTable(String tableName) throws IOException {
10821159 }
10831160 }
10841161
1162+ private void verifyFileCompleteness (String tableName , Path tempCacheFile ) {
1163+ try {
1164+ if (Files .exists (tempCacheFile ) && Files .size (tempCacheFile ) > 0 ) {
1165+ byte [] lastBytes = new byte [100 ];
1166+ try (FileInputStream fis = new FileInputStream (tempCacheFile .toFile ())) {
1167+ long fileSize = Files .size (tempCacheFile );
1168+ long skipBytes = Math .max (0 , fileSize - 100 );
1169+ fis .skip (skipBytes );
1170+ int bytesRead = fis .read (lastBytes );
1171+ String lastContent = new String (lastBytes , 0 , bytesRead , java .nio .charset .StandardCharsets .UTF_8 );
1172+
1173+ if (lastContent .trim ().endsWith ("]" )) {
1174+ taplogger .info ("File verification passed for table {}: JSON is complete" , tableName );
1175+ } else {
1176+ taplogger .warn ("File verification FAILED for table {}: JSON is incomplete, last 100 chars: {}" , tableName , lastContent );
1177+ }
1178+ }
1179+ }
1180+ } catch (Exception e ) {
1181+ taplogger .warn ("Failed to verify file completeness for table {}: {}" , tableName , e .getMessage ());
1182+ }
1183+ }
1184+
10851185 /**
10861186 * 清理指定表的缓存文件
1187+ * @param tableName 表名
1188+ * @param deleteFile 是否删除文件(true=删除,false=仅关闭流但保留文件)
10871189 */
1088- private void cleanupCacheFileForTable (String tableName ) {
1190+ private void cleanupCacheFileForTable (String tableName , boolean deleteFile ) {
10891191 try {
10901192 FileOutputStream cacheFileStream = cacheFileStreamsByTable .get (tableName );
10911193 Path tempCacheFile = tempCacheFilesByTable .get (tableName );
@@ -1103,18 +1205,31 @@ private void cleanupCacheFileForTable(String tableName) {
11031205 try {
11041206 long fileSize = Files .size (tempCacheFile );
11051207
1106- // 直接删除缓存文件
1107- Files .deleteIfExists (tempCacheFile );
1108-
1109- taplogger .info ("=== File Cleanup Completed ===" );
1110- taplogger .info ("Table: {}" , tableName );
1111- taplogger .info ("Deleted File: {}" , tempCacheFile .toString ());
1112- taplogger .info ("File Size: {}" , formatBytes (fileSize ));
1113- taplogger .info ("==============================" );
1208+ if (deleteFile ) {
1209+ // 成功时删除文件
1210+ Files .deleteIfExists (tempCacheFile );
1211+ taplogger .info ("=== File Cleanup Completed ===" );
1212+ taplogger .info ("Table: {}" , tableName );
1213+ taplogger .info ("Deleted File: {}" , tempCacheFile .toString ());
1214+ taplogger .info ("File Size: {}" , formatBytes (fileSize ));
1215+ taplogger .info ("==============================" );
1216+ } else {
1217+ // 失败时保留文件
1218+ taplogger .warn ("=== File Preserved for Debugging ===" );
1219+ taplogger .warn ("Table: {}" , tableName );
1220+ taplogger .warn ("Preserved File: {}" , tempCacheFile .toString ());
1221+ taplogger .warn ("File Size: {}" , formatBytes (fileSize ));
1222+ taplogger .warn ("Reason: Flush failed, file kept for troubleshooting" );
1223+ taplogger .warn ("=====================================" );
1224+ }
11141225 } catch (IOException e ) {
1115- taplogger .warn ("Failed to delete cache file for table {}: {}" , tableName , e .getMessage ());
1226+ taplogger .warn ("Failed to process cache file for table {}: {}" , tableName , e .getMessage ());
1227+ }
1228+
1229+ // 只有在删除文件时才从 map 中移除
1230+ if (deleteFile ) {
1231+ tempCacheFilesByTable .remove (tableName );
11161232 }
1117- tempCacheFilesByTable .remove (tableName );
11181233 }
11191234
11201235 // 清理相关状态
@@ -1128,6 +1243,13 @@ private void cleanupCacheFileForTable(String tableName) {
11281243 }
11291244 }
11301245
1246+ /**
1247+ * 清理指定表的缓存文件(默认删除文件)
1248+ */
1249+ private void cleanupCacheFileForTable (String tableName ) {
1250+ cleanupCacheFileForTable (tableName , true );
1251+ }
1252+
11311253 /**
11321254 * 清理所有表的缓存文件
11331255 */
@@ -1328,4 +1450,13 @@ public WriteListResult<TapRecordEvent> createResultList() {
13281450 return result ;
13291451 }
13301452 }
1453+
1454+ /**
1455+ * 设置 flush offset callback
1456+ *
1457+ * @param flushOffsetCallback 回调函数,在数据成功刷新后调用
1458+ */
1459+ public void setFlushOffsetCallback (Consumer <Object > flushOffsetCallback ) {
1460+ this .flushOffsetCallback = flushOffsetCallback ;
1461+ }
13311462}
0 commit comments