11package io .tapdata .connector .paimon .service ;
22
3+ import com .google .gson .Gson ;
4+ import com .google .gson .GsonBuilder ;
35import io .tapdata .connector .paimon .config .PaimonConfig ;
46import io .tapdata .entity .event .dml .TapDeleteRecordEvent ;
57import io .tapdata .entity .event .dml .TapInsertRecordEvent ;
2931import org .apache .paimon .types .DataField ;
3032import org .apache .paimon .types .DataType ;
3133import org .apache .paimon .types .DataTypes ;
34+ import org .apache .paimon .types .RowKind ;
3235
3336import java .io .Closeable ;
3437import java .math .BigDecimal ;
@@ -474,6 +477,7 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
474477
475478 // Build schema
476479 Schema .Builder schemaBuilder = Schema .newBuilder ();
480+ Map <String , Object > schemaBuilderVariableMap = new HashMap <>();
477481
478482 // Add fields
479483 Map <String , TapField > fields = tapTable .getNameFieldMap ();
@@ -497,19 +501,23 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
497501 // Dynamic bucket mode: set bucket to -1
498502 // This mode provides better flexibility
499503 schemaBuilder .option ("bucket" , "-1" );
504+ schemaBuilderVariableMap .put ("bucket" , -1 );
500505 } else {
501506 // Fixed bucket mode: set specific bucket count
502507 Integer bucketCount = config .getBucketCount ();
503508 if (bucketCount == null || bucketCount <= 0 ) {
504509 bucketCount = 4 ; // Default to 4 buckets if not configured
505510 }
506511 schemaBuilder .option ("bucket" , String .valueOf (bucketCount ));
512+ schemaBuilderVariableMap .put ("bucket" , String .valueOf (bucketCount ));
507513 }
508514 if (EmptyKit .isNotBlank (config .getFileFormat ())) {
509515 schemaBuilder .option ("file.format" , config .getFileFormat ());
516+ schemaBuilderVariableMap .put ("file.format" , config .getFileFormat ());
510517 }
511518 if (EmptyKit .isNotBlank (config .getCompression ())) {
512519 schemaBuilder .option ("compression" , config .getCompression ());
520+ schemaBuilderVariableMap .put ("compression" , config .getCompression ());
513521 }
514522
515523 // ===== Performance Optimization Options =====
@@ -518,55 +526,74 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
518526 // Larger buffer = better performance but more memory usage
519527 if (config .getWriteBufferSize () != null && config .getWriteBufferSize () > 0 ) {
520528 schemaBuilder .option ("write-buffer-size" , config .getWriteBufferSize () + "mb" );
529+ schemaBuilderVariableMap .put ("write-buffer-size" , config .getWriteBufferSize () + "mb" );
521530 }
522531
523532 // 2. Target file size - Paimon will try to create files of this size
524533 // Larger files = fewer files but slower compaction
525534 if (config .getTargetFileSize () != null && config .getTargetFileSize () > 0 ) {
526535 schemaBuilder .option ("target-file-size" , config .getTargetFileSize () + "mb" );
536+ schemaBuilderVariableMap .put ("target-file-size" , config .getTargetFileSize () + "mb" );
527537 }
528538
529539 // 3. Compaction settings
530540 if (config .getEnableAutoCompaction () != null ) {
531541 if (config .getEnableAutoCompaction ()) {
532542 // Enable full compaction for better query performance
533- schemaBuilder .option ("compaction.optimization-interval" ,
534- config .getCompactionIntervalMinutes () + "min" );
543+ schemaBuilder .option ("compaction.async.enabled" , "true" );
544+ schemaBuilder .option ("compaction.optimization-interval" , config .getCompactionIntervalMinutes () + "min" );
545+ schemaBuilderVariableMap .put ("compaction.async.enabled" , "true" );
546+ schemaBuilderVariableMap .put ("compaction.optimization-interval" , config .getCompactionIntervalMinutes () + "min" );
535547
536548 // Set changelog producer
537- schemaBuilder .option ("changelog-producer" , "full-compaction" );
549+ schemaBuilder .option ("changelog-producer" , "input" );
550+ schemaBuilderVariableMap .put ("changelog-producer" , "input" );
538551
539552 // Compact small files more aggressively
540553 schemaBuilder .option ("num-sorted-run.compaction-trigger" , "3" );
554+ schemaBuilderVariableMap .put ("num-sorted-run.compaction-trigger" , "3" );
541555 schemaBuilder .option ("num-sorted-run.stop-trigger" , "5" );
556+ schemaBuilderVariableMap .put ("num-sorted-run.stop-trigger" , "5" );
542557 } else {
543558 // Disable auto compaction
544559 schemaBuilder .option ("compaction.optimization-interval" , "0" );
560+ schemaBuilderVariableMap .put ("compaction.optimization-interval" , "0" );
545561 }
546562 }
547563
548564 // 4. Snapshot settings for better performance
549565 // Keep more snapshots in memory for faster access
550- schemaBuilder .option ("snapshot.num-retained.min" , "10" );
551- schemaBuilder .option ("snapshot.num-retained.max" , "100" );
552- schemaBuilder .option ("snapshot.time-retained" , "1h" );
566+ schemaBuilder .option ("snapshot.num-retained.min" , "5" );
567+ schemaBuilder .option ("snapshot.num-retained.max" , "50" );
568+ schemaBuilder .option ("snapshot.time-retained" , "30min" );
569+ schemaBuilderVariableMap .put ("snapshot.num-retained.min" , "5" );
570+ schemaBuilderVariableMap .put ("snapshot.num-retained.max" , "50" );
571+ schemaBuilderVariableMap .put ("snapshot.time-retained" , "30min" );
553572
554573 // 5. Commit settings
555575 // Compaction is not forced on commit (commit.force-compact is set to false)
556575 schemaBuilder .option ("commit.force-compact" , "false" ); // Don't force compact on every commit
576+ schemaBuilderVariableMap .put ("commit.force-compact" , "false" );
557577
558578 // 6. Scan settings for better read performance
559579 schemaBuilder .option ("scan.plan-sort-partition" , "true" );
580+ schemaBuilderVariableMap .put ("scan.plan-sort-partition" , "true" );
560581
561582 // 7. Changelog settings for CDC scenarios
562583 schemaBuilder .option ("changelog-producer.lookup-wait" , "false" ); // Don't wait for lookup
584+ schemaBuilderVariableMap .put ("changelog-producer.lookup-wait" , "false" );
563585
564586 // 8. Sink parallelism (taken from the configured write threads)
565587 schemaBuilder .option ("sink.parallelism" , String .valueOf (config .getWriteThreads ()));
588+ schemaBuilderVariableMap .put ("sink.parallelism" , String .valueOf (config .getWriteThreads ()));
566589
567590 // Create table
568591 catalog .createTable (identifier , schemaBuilder .build (), false );
569592
593+ // log schema builder variables
594+ Gson gson = new GsonBuilder ().setPrettyPrinting ().create ();
595+ log .info ("Created table {} with schema: {}" , identifier .getFullName (), gson .toJson (schemaBuilder .build ()));
596+
570597 return true ;
571598 }
572599
@@ -1071,25 +1098,138 @@ private void handleStreamInsert(TapInsertRecordEvent event, StreamTableWrite wri
10711098 String database = config .getDatabase ();
10721099 Identifier identifier = Identifier .create (database , table .getName ());
10731100 GenericRow row = convertToGenericRow (after , table , identifier );
1074- int bucket = selectBucketForDynamic (row , table );
1075- writer .write (row , bucket );
1101+ if (config .getBucketMode ().equals ("fixed" )) {
1102+ writer .write (row );
1103+ } else {
1104+ int bucket = selectBucketForDynamic (row , table );
1105+ writer .write (row , bucket );
1106+ }
10761107 }
10771108
10781109 /**
10791110 * Handle update event with stream writer
1111+ * Uses RowKind.UPDATE_BEFORE (U-) and RowKind.UPDATE_AFTER (U+) to implement update
10801112 *
10811113 * @param event update event
10821114 * @param writer stream writer
10831115 * @param table table definition
10841116 * @throws Exception if update fails
10851117 */
10861118 private void handleStreamUpdate (TapUpdateRecordEvent event , StreamTableWrite writer , TapTable table ) throws Exception {
1087- Map <String , Object > after = event .getAfter ();
10881119 String database = config .getDatabase ();
10891120 Identifier identifier = Identifier .create (database , table .getName ());
1090- GenericRow row = convertToGenericRow (after , table , identifier );
1091- int bucket = selectBucketForDynamic (row , table );
1092- writer .write (row , bucket );
1121+
1122+ Map <String , Object > before = event .getBefore ();
1123+ Map <String , Object > after = event .getAfter ();
1124+
1125+ // Convert before and after data to GenericRow first to avoid duplicate conversion
1126+ GenericRow beforeRow = null ;
1127+ if (before != null && !before .isEmpty ()) {
1128+ beforeRow = convertToGenericRow (before , table , identifier );
1129+ }
1130+ GenericRow afterRow = convertToGenericRow (after , table , identifier );
1131+
1132+ // Check if primary key update detection is enabled
1133+ Boolean enablePkUpdate = config .getEnablePrimaryKeyUpdate ();
1134+ if (enablePkUpdate != null && enablePkUpdate ) {
1135+ // Validate that before data is available when primary key update detection is enabled
1136+ if (beforeRow == null ) {
1137+ throw new RuntimeException ("Primary key update detection is enabled but before data is not available. " +
1138+ "Please ensure the source database can provide before-update data or disable this feature." );
1139+ }
1140+
1141+ // Check if primary key has changed
1142+ if (isPrimaryKeyChanged (beforeRow , afterRow , table )) {
1143+ // Convert update to delete + insert
1144+ // First, write DELETE using before data
1145+ beforeRow .setRowKind (RowKind .DELETE );
1146+ if (config .getBucketMode ().equals ("fixed" )) {
1147+ writer .write (beforeRow );
1148+ } else {
1149+ int bucket = selectBucketForDynamic (beforeRow , table );
1150+ writer .write (beforeRow , bucket );
1151+ }
1152+
1153+ // Then, write INSERT using after data
1154+ afterRow .setRowKind (RowKind .INSERT );
1155+ if (config .getBucketMode ().equals ("fixed" )) {
1156+ writer .write (afterRow );
1157+ } else {
1158+ int bucket = selectBucketForDynamic (afterRow , table );
1159+ writer .write (afterRow , bucket );
1160+ }
1161+ return ;
1162+ }
1163+ }
1164+
1165+ // Normal update logic: Write U- (UPDATE_BEFORE) if before data exists
1166+ if (beforeRow != null ) {
1167+ beforeRow .setRowKind (RowKind .UPDATE_BEFORE );
1168+ if (config .getBucketMode ().equals ("fixed" )) {
1169+ writer .write (beforeRow );
1170+ } else {
1171+ int bucket = selectBucketForDynamic (beforeRow , table );
1172+ writer .write (beforeRow , bucket );
1173+ }
1174+ }
1175+
1176+ // Write U+ (UPDATE_AFTER) using after data
1177+ afterRow .setRowKind (RowKind .UPDATE_AFTER );
1178+ if (config .getBucketMode ().equals ("fixed" )) {
1179+ writer .write (afterRow );
1180+ } else {
1181+ int bucket = selectBucketForDynamic (afterRow , table );
1182+ writer .write (afterRow , bucket );
1183+ }
1184+ }
1185+
1186+ /**
1187+ * Check if primary key values have changed between before and after GenericRow
1188+ * Uses converted GenericRow values to ensure consistent comparison
1189+ *
1190+ * @param beforeRow before GenericRow (must not be null)
1191+ * @param afterRow after GenericRow (must not be null)
1192+ * @param table table definition
1193+ * @return true if primary key has changed, false otherwise
1194+ */
1195+ private boolean isPrimaryKeyChanged (GenericRow beforeRow , GenericRow afterRow , TapTable table ) {
1196+ // Get primary key fields
1197+ Collection <String > primaryKeys = table .primaryKeys (true );
1198+ if (primaryKeys == null || primaryKeys .isEmpty ()) {
1199+ // No primary key defined, no change detection needed
1200+ return false ;
1201+ }
1202+
1203+ // Get field index mapping
1204+ Map <String , TapField > fields = table .getNameFieldMap ();
1205+ String cacheKey = table .getId ();
1206+ Map <String , Integer > indexMap = getFieldIndexMap (cacheKey , fields );
1207+
1208+ // Build concatenated string of primary key values from before and after
1209+ // Use same order for comparison
1210+ List <String > pkList = new ArrayList <>(primaryKeys );
1211+ StringBuilder beforePkStr = new StringBuilder ();
1212+ StringBuilder afterPkStr = new StringBuilder ();
1213+
1214+ for (String pkField : pkList ) {
1215+ Integer fieldIndex = indexMap .get (pkField );
1216+ if (fieldIndex == null || fieldIndex < 0 || fieldIndex >= beforeRow .getFieldCount ()) {
1217+ continue ;
1218+ }
1219+
1220+ Object beforeValue = beforeRow .getField (fieldIndex );
1221+ Object afterValue = afterRow .getField (fieldIndex );
1222+
1223+ // Convert to string for comparison
1224+ String beforeStr = beforeValue == null ? "NULL" : String .valueOf (beforeValue );
1225+ String afterStr = afterValue == null ? "NULL" : String .valueOf (afterValue );
1226+
1227+ beforePkStr .append (beforeStr ).append ("|" );
1228+ afterPkStr .append (afterStr ).append ("|" );
1229+ }
1230+
1231+ // Compare concatenated primary key strings
1232+ return !beforePkStr .toString ().contentEquals (afterPkStr );
10931233 }
10941234
10951235 /**
@@ -1106,9 +1246,13 @@ private void handleStreamDelete(TapDeleteRecordEvent event, StreamTableWrite wri
11061246 Identifier identifier = Identifier .create (database , table .getName ());
11071247 GenericRow row = convertToGenericRow (before , table , identifier );
11081248 // Set row kind to DELETE
1109- row .setRowKind (org .apache .paimon .types .RowKind .DELETE );
1110- int bucket = selectBucketForDynamic (row , table );
1111- writer .write (row , bucket );
1249+ row .setRowKind (RowKind .DELETE );
1250+ if (config .getBucketMode ().equals ("fixed" )) {
1251+ writer .write (row );
1252+ } else {
1253+ int bucket = selectBucketForDynamic (row , table );
1254+ writer .write (row , bucket );
1255+ }
11121256 }
11131257
11141258 /**
0 commit comments