|
24 | 24 | import java.util.HashMap;
|
25 | 25 | import java.util.List;
|
26 | 26 | import java.util.Map;
|
| 27 | +import java.util.concurrent.atomic.AtomicBoolean; |
27 | 28 |
|
28 | 29 | import static org.apache.hadoop.fs.CosNUtils.propagateBucketOptions;
|
29 | 30 |
|
@@ -54,6 +55,7 @@ public class CosFileSystem extends FileSystem {
|
54 | 55 | private boolean isPosixUseOFSRanger;
|
55 | 56 | private boolean isPosixImpl = false;
|
56 | 57 | private FileSystem actualImplFS = null;
|
| 58 | + private final AtomicBoolean closed = new AtomicBoolean(false); |
57 | 59 |
|
58 | 60 | private URI uri;
|
59 | 61 | private Path workingDir;
|
@@ -126,8 +128,9 @@ public void initialize(URI uri, Configuration originalConf) throws IOException {
|
126 | 128 | this.actualImplFS = getActualFileSystemByClassName(posixBucketFSImpl);
|
127 | 129 |
|
128 | 130 | // judge normal impl first, skip the class nodef error when only use normal bucket
|
| 131 | + // outside can use native store to tell whether is posix bucket, not need head bucket twice. can be used by flink cos. |
| 132 | + this.nativeStore.setPosixBucket(true); |
129 | 133 | if (this.actualImplFS instanceof CosNFileSystem) {
|
130 |
| - this.nativeStore.setPosixBucket(true); |
131 | 134 | ((CosNFileSystem) this.actualImplFS).withStore(this.nativeStore).withBucket(bucket)
|
132 | 135 | .withPosixBucket(isPosixFSStore).withRangerCredentialsClient(rangerCredentialsClient);
|
133 | 136 | } else if (this.actualImplFS instanceof CHDFSHadoopFileSystemAdapter) {
|
@@ -656,11 +659,19 @@ private void checkInitialized() throws IOException {
|
656 | 659 | @Override
|
657 | 660 | public void close() throws IOException {
|
658 | 661 | LOG.info("begin to close cos file system");
|
659 |
| - this.actualImplFS.close(); |
660 |
| - if (null != this.nativeStore && this.isDefaultNativeStore) { |
661 |
| - // close range client later, inner native store |
662 |
| - this.nativeStore.close(); |
| 662 | + if (this.closed.getAndSet(true)) { |
| 663 | + // already closed |
| 664 | + return; |
663 | 665 | }
|
664 | 666 | this.initialized = false;
|
| 667 | + try { |
| 668 | + super.close(); |
| 669 | + } finally { |
| 670 | + this.actualImplFS.close(); |
| 671 | + if (null != this.nativeStore && this.isDefaultNativeStore) { |
| 672 | + // close range client later, inner native store |
| 673 | + this.nativeStore.close(); |
| 674 | + } |
| 675 | + } |
665 | 676 | }
|
666 | 677 | }
|
0 commit comments