Skip to content

Commit 131ace4

Browse files
committed
fix: Fixed the error of running the paimon: java.lang.IllegalThreadStateException
(cherry picked from commit 735d24b)
1 parent 874594c commit 131ace4

File tree

1 file changed

+87
-6
lines changed
  • connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service

1 file changed

+87
-6
lines changed

connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
import io.tapdata.pdk.apis.context.TapConnectorContext;
1717
import io.tapdata.pdk.apis.entity.WriteListResult;
1818
import org.apache.hadoop.conf.Configuration;
19-
import org.apache.paimon.catalog.Catalog;
20-
import org.apache.paimon.catalog.CatalogContext;
21-
import org.apache.paimon.catalog.CatalogFactory;
22-
import org.apache.paimon.catalog.Identifier;
19+
import org.apache.hadoop.fs.FileSystem;
20+
import org.apache.paimon.catalog.*;
2321
import org.apache.paimon.data.BinaryString;
2422
import org.apache.paimon.data.Decimal;
2523
import org.apache.paimon.data.GenericRow;
2624
import org.apache.paimon.data.Timestamp;
25+
import org.apache.paimon.fs.FileIO;
26+
import org.apache.paimon.fs.hadoop.HadoopFileIO;
2727
import org.apache.paimon.options.Options;
2828
import org.apache.paimon.schema.Schema;
2929
import org.apache.paimon.table.Table;
@@ -34,6 +34,7 @@
3434
import org.apache.paimon.types.RowKind;
3535

3636
import java.io.Closeable;
37+
import java.lang.reflect.Field;
3738
import java.math.BigDecimal;
3839
import java.util.*;
3940
import java.util.concurrent.*;
@@ -255,6 +256,7 @@ private Configuration buildHadoopConfiguration() {
255256
// NoClassDefFoundError due to classloader/version conflicts.
256257
// Ensure S3A filesystem is used when scheme is s3a
257258
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
259+
conf.set("fs.s3a.impl.disable.cache", "true");
258260
conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A");
259261

260262
}
@@ -946,11 +948,30 @@ private void cleanupAllResources() {
946948
// Close old catalog if exists
947949
if (catalog != null) {
948950
try {
951+
if (catalog instanceof CachingCatalog) {
952+
CachingCatalog cachingCatalog = (CachingCatalog) catalog;
953+
Catalog wrapped = cachingCatalog.wrapped();
954+
if (wrapped instanceof FileSystemCatalog) {
955+
FileSystemCatalog fileSystemCatalog = (FileSystemCatalog) wrapped;
956+
FileIO fileIO = null;
957+
try {
958+
fileIO = fileSystemCatalog.fileIO();
959+
} catch (Throwable ignore) {
960+
// Ignore fileIO lookup errors
961+
}
962+
963+
// Best-effort close: proactively close FileSystem instances cached by HadoopFileIO
964+
closeHadoopFileIOCachedFileSystems(fileIO);
965+
closeQuietly(fileIO);
966+
}
967+
}
968+
949969
catalog.close();
950-
} catch (Exception e) {
970+
} catch (Throwable e) {
951971
// Ignore close errors
972+
} finally {
973+
catalog = null;
952974
}
953-
catalog = null;
954975
}
955976

956977
// Wait a bit to ensure all internal threads are cleaned up
@@ -962,6 +983,66 @@ private void cleanupAllResources() {
962983
}
963984
}
964985

986+
private void closeQuietly(Closeable closeable) {
987+
if (closeable == null) {
988+
return;
989+
}
990+
try {
991+
closeable.close();
992+
} catch (Exception ignore) {
993+
// Ignore close errors
994+
}
995+
}
996+
997+
/**
998+
* Best-effort close for cached Hadoop FileSystem instances inside Paimon HadoopFileIO.
999+
* <p>
1000+
* HadoopFileIO may cache FileSystem instances (e.g., in a field named "fsMap"). Even if
1001+
* Hadoop global FileSystem cache is disabled, this internal cache can still keep an S3A
1002+
* FileSystem whose thread factory captured a Task ThreadGroup that will be destroyed later.
1003+
*/
1004+
private void closeHadoopFileIOCachedFileSystems(Object fileIO) {
1005+
if (!(fileIO instanceof HadoopFileIO)) {
1006+
return;
1007+
}
1008+
1009+
try {
1010+
Field fsMapField = fileIO.getClass().getDeclaredField("fsMap");
1011+
fsMapField.setAccessible(true);
1012+
Object fsMapObject = fsMapField.get(fileIO);
1013+
if (!(fsMapObject instanceof Map)) {
1014+
return;
1015+
}
1016+
1017+
Map<?, ?> fsMap = (Map<?, ?>) fsMapObject;
1018+
if (fsMap.isEmpty()) {
1019+
return;
1020+
}
1021+
1022+
// Copy values first to avoid ConcurrentModificationException in case close triggers internal updates.
1023+
List<Object> fileSystems = new ArrayList<>(fsMap.values());
1024+
for (Object fs : fileSystems) {
1025+
if (fs instanceof FileSystem) {
1026+
try {
1027+
((FileSystem) fs).close();
1028+
} catch (Exception ignore) {
1029+
// Ignore close errors
1030+
}
1031+
}
1032+
}
1033+
1034+
try {
1035+
fsMap.clear();
1036+
} catch (Exception ignore) {
1037+
// Ignore clear errors
1038+
}
1039+
} catch (NoSuchFieldException ignore) {
1040+
// HadoopFileIO implementation differs; ignore.
1041+
} catch (Throwable ignore) {
1042+
// Best-effort only
1043+
}
1044+
}
1045+
9651046
/**
9661047
* Check if the exception is caused by ThreadGroup being destroyed.
9671048
* This typically happens when the classloader that created Paimon's thread factory

0 commit comments

Comments
 (0)