Skip to content

Fix FileSystem closed after pr 23 #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions src/main/java/org/apache/hadoop/fs/FileSystemCleaner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

/**
* Manage FileSystemHolders that removed from PrestoFileSystemCache, and close them after PRESTO_HDFS_EXPIRED_FS_DELAY_CLOSE_TIME.
* To avoid Filesystem closed Exception caused by closing FileSystem in use.
*/
public class FileSystemCleaner
{
public static final Log log = LogFactory.getLog(FileSystemCleaner.class);
public static final String PRESTO_HDFS_EXPIRED_FS_DELAY_CLOSE_TIME = "presto.hdfs.expired.fs.delay.close.time";
public static final String PRESTO_HDFS_EXPIRED_FS_CHECK_INTERVAL = "presto.hdfs.expired.fs.check.interval";
public static final long DEFAULT_PRESTO_HDFS_FS_CACHE_DELAY_CLOSE_TIME = 5 * 60 * 1000; // five minutes
public static final long DEFAULT_PRESTO_HDFS_EXPIRED_FS_CHECK_INTERVAL = 60 * 1000; // one minute

private static final FileSystemCleaner manager = new FileSystemCleaner();
private static final List<PrestoFileSystemCache.FileSystemHolder> fileSystemHolderList = new LinkedList<>();

public FileSystemCleaner()
{
Thread fileSystemCleanerTask = new FileSystemCleanerTask();
fileSystemCleanerTask.setDaemon(true);
fileSystemCleanerTask.setName("FileSystemCleanerTask");
fileSystemCleanerTask.start();
}

public static FileSystemCleaner getInstance()
{
return manager;
}

public void addExpiredFileSystem(PrestoFileSystemCache.FileSystemHolder fileSystemHolder)
{
fileSystemHolder.setExpireTimestamp(System.currentTimeMillis());
fileSystemHolderList.add(fileSystemHolder);
}

private static class FileSystemCleanerTask
extends Thread
{
@Override
public void run()
{
while (true) {
for (PrestoFileSystemCache.FileSystemHolder holder : fileSystemHolderList) {
long delayCloseTime = holder.getFileSystem().getConf().getLong(PRESTO_HDFS_EXPIRED_FS_DELAY_CLOSE_TIME,
DEFAULT_PRESTO_HDFS_FS_CACHE_DELAY_CLOSE_TIME);
if (System.currentTimeMillis() - holder.getExpireTimestamp() >= delayCloseTime) {
try {
log.info(String.format("Closing expired FileSystem{expireTimestamp: %s, delayCloseTime: %s ms}",
holder.getExpireTimestamp(), delayCloseTime));
holder.getFileSystem().close();
fileSystemHolderList.remove(holder);
}
catch (IOException e) {
log.error("Close expired file system fail ", e);
}
}
else {
break;
}
}

try {
Thread.sleep(DEFAULT_PRESTO_HDFS_EXPIRED_FS_CHECK_INTERVAL);
}
catch (InterruptedException e) {
log.error(e);
}
}
}
}
}
37 changes: 19 additions & 18 deletions src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ private synchronized FileSystem getInternal(URI uri, Configuration conf, long un
// Kerberos re-login occurs, re-create the file system and cache it using
// the same key.
if (fileSystemRefresh(uri, conf, privateCredentials, fileSystemHolder)) {
map.remove(key);
FileSystemHolder expiredFSHolder = map.remove(key);
FileSystemCleaner.getInstance().addExpiredFileSystem(expiredFSHolder);
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
Expand All @@ -111,7 +112,9 @@ private synchronized FileSystem getInternal(URI uri, Configuration conf, long un
private boolean fileSystemRefresh(URI uri, Configuration conf, Set<?> privateCredentials, FileSystemHolder fileSystemHolder)
{
if (isHdfs(uri)) {
return !fileSystemHolder.getPrivateCredentials().equals(privateCredentials);
// privateCredentials size() will be more than fileSystemHolder.getPrivateCredentials(),
// but privateCredentials contains fileSystemHolder.getPrivateCredentials()
return !privateCredentials.containsAll(fileSystemHolder.getPrivateCredentials());
}
if ("gs".equals(uri.getScheme())) {
String existingGcsToken = fileSystemHolder.getFileSystem().getConf().get(PRESTO_GCS_OAUTH_ACCESS_TOKEN_KEY);
Expand Down Expand Up @@ -147,21 +150,7 @@ private FileSystem createFileSystem(URI uri, Configuration conf)
}
final FileSystem original = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
original.initialize(uri, conf);
FileSystem wrapper = createPrestoFileSystemWrapper(original);
FinalizerService.getInstance().addFinalizer(wrapper, new Runnable()
{
@Override
public void run()
{
try {
original.close();
}
catch (IOException e) {
log.error("Error occurred when finalizing file system", e);
}
}
});
return wrapper;
return createPrestoFileSystemWrapper(original);
}

protected FileSystem createPrestoFileSystemWrapper(FileSystem original)
Expand Down Expand Up @@ -305,10 +294,11 @@ public String toString()
}
}

private static class FileSystemHolder
protected static class FileSystemHolder
{
private final FileSystem fileSystem;
private final Set<?> privateCredentials;
private long expireTimestamp; // timestamp of remove from cache

public FileSystemHolder(FileSystem fileSystem, Set<?> privateCredentials)
{
Expand All @@ -332,7 +322,18 @@ public String toString()
return toStringHelper(this)
.add("fileSystem", fileSystem)
.add("privateCredentials", privateCredentials)
.add("expireTimestamp", expireTimestamp)
.toString();
}

public void setExpireTimestamp(long expireTimestamp)
{
this.expireTimestamp = expireTimestamp;
}

public long getExpireTimestamp()
{
return expireTimestamp;
}
}
}