11package io .tapdata .connector .paimon .service ;
22
3+ import com .google .gson .Gson ;
4+ import com .google .gson .GsonBuilder ;
35import io .tapdata .connector .paimon .config .PaimonConfig ;
46import io .tapdata .entity .event .dml .TapDeleteRecordEvent ;
57import io .tapdata .entity .event .dml .TapInsertRecordEvent ;
@@ -474,6 +476,7 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
474476
475477 // Build schema
476478 Schema .Builder schemaBuilder = Schema .newBuilder ();
479+ Map <String , Object > schemaBuilderVariableMap = new HashMap <>();
477480
478481 // Add fields
479482 Map <String , TapField > fields = tapTable .getNameFieldMap ();
@@ -497,19 +500,23 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
497500 // Dynamic bucket mode: set bucket to -1
498501 // This mode provides better flexibility
499502 schemaBuilder .option ("bucket" , "-1" );
503+ schemaBuilderVariableMap .put ("bucket" , -1 );
500504 } else {
501505 // Fixed bucket mode: set specific bucket count
502506 Integer bucketCount = config .getBucketCount ();
503507 if (bucketCount == null || bucketCount <= 0 ) {
504508 bucketCount = 4 ; // Default to 4 buckets if not configured
505509 }
506510 schemaBuilder .option ("bucket" , String .valueOf (bucketCount ));
511+ schemaBuilderVariableMap .put ("bucket" , String .valueOf (bucketCount ));
507512 }
508513 if (EmptyKit .isNotBlank (config .getFileFormat ())) {
509514 schemaBuilder .option ("file.format" , config .getFileFormat ());
510- }
515+ schemaBuilderVariableMap .put ("file.format" , config .getFileFormat ());
516+ }
511517 if (EmptyKit .isNotBlank (config .getCompression ())) {
512518 schemaBuilder .option ("compression" , config .getCompression ());
519+ schemaBuilderVariableMap .put ("compression" , config .getCompression ());
513520 }
514521
515522 // ===== Performance Optimization Options =====
@@ -518,55 +525,74 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
518525 // Larger buffer = better performance but more memory usage
519526 if (config .getWriteBufferSize () != null && config .getWriteBufferSize () > 0 ) {
520527 schemaBuilder .option ("write-buffer-size" , config .getWriteBufferSize () + "mb" );
528+ schemaBuilderVariableMap .put ("write-buffer-size" , config .getWriteBufferSize () + "mb" );
521529 }
522530
523531 // 2. Target file size - Paimon will try to create files of this size
524532 // Larger files = fewer files but slower compaction
525533 if (config .getTargetFileSize () != null && config .getTargetFileSize () > 0 ) {
526534 schemaBuilder .option ("target-file-size" , config .getTargetFileSize () + "mb" );
535+ schemaBuilderVariableMap .put ("target-file-size" , config .getTargetFileSize () + "mb" );
527536 }
528537
529538 // 3. Compaction settings
530539 if (config .getEnableAutoCompaction () != null ) {
531540 if (config .getEnableAutoCompaction ()) {
532541 // Enable full compaction for better query performance
533- schemaBuilder .option ("compaction.optimization-interval" ,
534- config .getCompactionIntervalMinutes () + "min" );
542+ schemaBuilder .option ("compaction.async.enabled" , "true" );
543+ schemaBuilder .option ("compaction.optimization-interval" , config .getCompactionIntervalMinutes () + "min" );
544+ schemaBuilderVariableMap .put ("compaction.async.enabled" , "true" );
545+ schemaBuilderVariableMap .put ("compaction.optimization-interval" , config .getCompactionIntervalMinutes () + "min" );
535546
536547 // Set compaction strategy
537- schemaBuilder .option ("changelog-producer" , "full-compaction" );
548+ schemaBuilder .option ("changelog-producer" , "input" );
549+ schemaBuilderVariableMap .put ("changelog-producer" , "input" );
538550
539551 // Compact small files more aggressively
540552 schemaBuilder .option ("num-sorted-run.compaction-trigger" , "3" );
553+ schemaBuilderVariableMap .put ("num-sorted-run.compaction-trigger" , "3" );
541554 schemaBuilder .option ("num-sorted-run.stop-trigger" , "5" );
555+ schemaBuilderVariableMap .put ("num-sorted-run.stop-trigger" , "5" );
542556 } else {
543557 // Disable auto compaction
544558 schemaBuilder .option ("compaction.optimization-interval" , "0" );
559+ schemaBuilderVariableMap .put ("compaction.optimization-interval" , "0" );
545560 }
546561 }
547562
548563 // 4. Snapshot settings for better performance
549564 // Keep more snapshots in memory for faster access
550- schemaBuilder .option ("snapshot.num-retained.min" , "10" );
551- schemaBuilder .option ("snapshot.num-retained.max" , "100" );
552- schemaBuilder .option ("snapshot.time-retained" , "1h" );
565+ schemaBuilder .option ("snapshot.num-retained.min" , "5" );
566+ schemaBuilder .option ("snapshot.num-retained.max" , "50" );
567+ schemaBuilder .option ("snapshot.time-retained" , "30min" );
568+ schemaBuilderVariableMap .put ("snapshot.num-retained.min" , "5" );
569+ schemaBuilderVariableMap .put ("snapshot.num-retained.max" , "50" );
570+ schemaBuilderVariableMap .put ("snapshot.time-retained" , "30min" );
553571
554572 // 5. Commit settings
555573 // Force compact on commit for better read performance
556574 schemaBuilder .option ("commit.force-compact" , "false" ); // Don't force compact on every commit
575+ schemaBuilderVariableMap .put ("commit.force-compact" , "false" );
557576
558577 // 6. Scan settings for better read performance
559578 schemaBuilder .option ("scan.plan-sort-partition" , "true" );
579+ schemaBuilderVariableMap .put ("scan.plan-sort-partition" , "true" );
560580
561581 // 7. Changelog settings for CDC scenarios
562582 schemaBuilder .option ("changelog-producer.lookup-wait" , "false" ); // Don't wait for lookup
583+ schemaBuilderVariableMap .put ("changelog-producer.lookup-wait" , "false" );
563584
564585 // 8. Memory settings
565586 schemaBuilder .option ("sink.parallelism" , String .valueOf (config .getWriteThreads ()));
587+ schemaBuilderVariableMap .put ("sink.parallelism" , String .valueOf (config .getWriteThreads ()));
566588
567589 // Create table
568590 catalog .createTable (identifier , schemaBuilder .build (), false );
569591
592+ // log schema builder variables
593+ Gson gson = new GsonBuilder ().setPrettyPrinting ().create ();
594+ log .info ("Created table {} with schema: {}" , identifier .getFullName (), gson .toJson (schemaBuilder .build ()));
595+
570596 return true ;
571597 }
572598
@@ -1071,8 +1097,12 @@ private void handleStreamInsert(TapInsertRecordEvent event, StreamTableWrite wri
10711097 String database = config .getDatabase ();
10721098 Identifier identifier = Identifier .create (database , table .getName ());
10731099 GenericRow row = convertToGenericRow (after , table , identifier );
1074- int bucket = selectBucketForDynamic (row , table );
1075- writer .write (row , bucket );
1100+ if (config .getBucketMode ().equals ("fixed" )) {
1101+ writer .write (row );
1102+ } else {
1103+ int bucket = selectBucketForDynamic (row , table );
1104+ writer .write (row , bucket );
1105+ }
10761106 }
10771107
10781108 /**
@@ -1088,8 +1118,12 @@ private void handleStreamUpdate(TapUpdateRecordEvent event, StreamTableWrite wri
10881118 String database = config .getDatabase ();
10891119 Identifier identifier = Identifier .create (database , table .getName ());
10901120 GenericRow row = convertToGenericRow (after , table , identifier );
1091- int bucket = selectBucketForDynamic (row , table );
1092- writer .write (row , bucket );
1121+ if (config .getBucketMode ().equals ("fixed" )) {
1122+ writer .write (row );
1123+ } else {
1124+ int bucket = selectBucketForDynamic (row , table );
1125+ writer .write (row , bucket );
1126+ }
10931127 }
10941128
10951129 /**
@@ -1107,8 +1141,12 @@ private void handleStreamDelete(TapDeleteRecordEvent event, StreamTableWrite wri
11071141 GenericRow row = convertToGenericRow (before , table , identifier );
11081142 // Set row kind to DELETE
11091143 row .setRowKind (org .apache .paimon .types .RowKind .DELETE );
1110- int bucket = selectBucketForDynamic (row , table );
1111- writer .write (row , bucket );
1144+ if (config .getBucketMode ().equals ("fixed" )) {
1145+ writer .write (row );
1146+ } else {
1147+ int bucket = selectBucketForDynamic (row , table );
1148+ writer .write (row , bucket );
1149+ }
11121150 }
11131151
11141152 /**
0 commit comments