1616import io .tapdata .pdk .apis .context .TapConnectorContext ;
1717import io .tapdata .pdk .apis .entity .WriteListResult ;
1818import org .apache .hadoop .conf .Configuration ;
19- import org .apache .paimon .catalog .Catalog ;
20- import org .apache .paimon .catalog .CatalogContext ;
21- import org .apache .paimon .catalog .CatalogFactory ;
22- import org .apache .paimon .catalog .Identifier ;
19+ import org .apache .hadoop .fs .FileSystem ;
20+ import org .apache .paimon .catalog .*;
2321import org .apache .paimon .data .BinaryString ;
2422import org .apache .paimon .data .Decimal ;
2523import org .apache .paimon .data .GenericRow ;
2624import org .apache .paimon .data .Timestamp ;
25+ import org .apache .paimon .fs .FileIO ;
26+ import org .apache .paimon .fs .hadoop .HadoopFileIO ;
2727import org .apache .paimon .options .Options ;
2828import org .apache .paimon .schema .Schema ;
2929import org .apache .paimon .table .Table ;
3434import org .apache .paimon .types .RowKind ;
3535
3636import java .io .Closeable ;
37+ import java .lang .reflect .Field ;
3738import java .math .BigDecimal ;
3839import java .util .*;
3940import java .util .concurrent .*;
@@ -255,6 +256,7 @@ private Configuration buildHadoopConfiguration() {
255256 // NoClassDefFoundError due to classloader/version conflicts.
256257 // Ensure S3A filesystem is used when scheme is s3a
257258 conf .set ("fs.s3a.impl" , "org.apache.hadoop.fs.s3a.S3AFileSystem" );
259+ conf .set ("fs.s3a.impl.disable.cache" , "true" );
258260 conf .set ("fs.AbstractFileSystem.s3a.impl" , "org.apache.hadoop.fs.s3a.S3A" );
259261
260262 }
@@ -946,11 +948,30 @@ private void cleanupAllResources() {
946948 // Close old catalog if exists
947949 if (catalog != null ) {
948950 try {
951+ if (catalog instanceof CachingCatalog ) {
952+ CachingCatalog cachingCatalog = (CachingCatalog ) catalog ;
953+ Catalog wrapped = cachingCatalog .wrapped ();
954+ if (wrapped instanceof FileSystemCatalog ) {
955+ FileSystemCatalog fileSystemCatalog = (FileSystemCatalog ) wrapped ;
956+ FileIO fileIO = null ;
957+ try {
958+ fileIO = fileSystemCatalog .fileIO ();
959+ } catch (Throwable ignore ) {
960+ // Ignore fileIO lookup errors
961+ }
962+
963+ // Best-effort close: proactively close FileSystem instances cached by HadoopFileIO
964+ closeHadoopFileIOCachedFileSystems (fileIO );
965+ closeQuietly (fileIO );
966+ }
967+ }
968+
949969 catalog .close ();
950- } catch (Exception e ) {
970+ } catch (Throwable e ) {
951971 // Ignore close errors
972+ } finally {
973+ catalog = null ;
952974 }
953- catalog = null ;
954975 }
955976
956977 // Wait a bit to ensure all internal threads are cleaned up
@@ -962,6 +983,66 @@ private void cleanupAllResources() {
962983 }
963984 }
964985
986+ private void closeQuietly (Closeable closeable ) {
987+ if (closeable == null ) {
988+ return ;
989+ }
990+ try {
991+ closeable .close ();
992+ } catch (Exception ignore ) {
993+ // Ignore close errors
994+ }
995+ }
996+
997+ /**
998+ * Best-effort close for cached Hadoop FileSystem instances inside Paimon HadoopFileIO.
999+ * <p>
1000+ * HadoopFileIO may cache FileSystem instances (e.g., in a field named "fsMap"). Even if
1001+ * Hadoop global FileSystem cache is disabled, this internal cache can still keep an S3A
1002+ * FileSystem whose thread factory captured a Task ThreadGroup that will be destroyed later.
1003+ */
1004+ private void closeHadoopFileIOCachedFileSystems (Object fileIO ) {
1005+ if (!(fileIO instanceof HadoopFileIO )) {
1006+ return ;
1007+ }
1008+
1009+ try {
1010+ Field fsMapField = fileIO .getClass ().getDeclaredField ("fsMap" );
1011+ fsMapField .setAccessible (true );
1012+ Object fsMapObject = fsMapField .get (fileIO );
1013+ if (!(fsMapObject instanceof Map )) {
1014+ return ;
1015+ }
1016+
1017+ Map <?, ?> fsMap = (Map <?, ?>) fsMapObject ;
1018+ if (fsMap .isEmpty ()) {
1019+ return ;
1020+ }
1021+
1022+ // Copy values first to avoid ConcurrentModificationException in case close triggers internal updates.
1023+ List <Object > fileSystems = new ArrayList <>(fsMap .values ());
1024+ for (Object fs : fileSystems ) {
1025+ if (fs instanceof FileSystem ) {
1026+ try {
1027+ ((FileSystem ) fs ).close ();
1028+ } catch (Exception ignore ) {
1029+ // Ignore close errors
1030+ }
1031+ }
1032+ }
1033+
1034+ try {
1035+ fsMap .clear ();
1036+ } catch (Exception ignore ) {
1037+ // Ignore clear errors
1038+ }
1039+ } catch (NoSuchFieldException ignore ) {
1040+ // HadoopFileIO implementation differs; ignore.
1041+ } catch (Throwable ignore ) {
1042+ // Best-effort only
1043+ }
1044+ }
1045+
9651046 /**
9661047 * Check if the exception is caused by ThreadGroup being destroyed.
9671048 * This typically happens when the classloader that created Paimon's thread factory
0 commit comments