Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@ public class CompressionUtils {

private static final Logger LOG = LoggerFactory.getLogger(CompressionUtils.class);

/**
* Checks if the given file name has a supported compressed file extension.
*
* @param fileName the file name to check
* @return true if the file is a compressed file (zip, jar, tar, tar.gz, tgz)
*/
public static boolean isCompressedFile(String fileName) {
String lowerCaseFileName = fileName.toLowerCase();
return lowerCaseFileName.endsWith(".zip")
|| lowerCaseFileName.endsWith(".jar")
|| lowerCaseFileName.endsWith(".tar")
|| lowerCaseFileName.endsWith(".tar.gz")
|| lowerCaseFileName.endsWith(".tgz");
}

/**
* Gets the base name of the file without the extension. Handles compound extensions like
* .tar.gz.
*
* @param fileName the file name to process
* @return the base name without extension
*/
public static String getBaseNameWithoutExtension(String fileName) {
// .tar.gz is a compound extension and needs special handling to remove the entire suffix
if (fileName.toLowerCase().endsWith(".tar.gz")) {
return fileName.substring(0, fileName.length() - ".tar.gz".length());
}
// For other formats (.zip, .jar, .tar, .tgz, etc.), use lastIndexOf to handle uniformly
int lastDotIndex = fileName.lastIndexOf('.');
if (lastDotIndex > 0) {
return fileName.substring(0, lastDotIndex);
}
return fileName;
}

public static void extractFile(
String srcFilePath, String targetDirPath, String originalFileName) throws IOException {
if (hasOneOfSuffixes(originalFileName, ".zip", ".jar")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,15 @@ private static void addToPythonPath(PythonEnvironment env, List<Path> pythonFile
// add the parent directory of .py file itself to PYTHONPATH
pythonPathList.add(targetPath.getParent().toString());
} else if (Files.isRegularFile(Paths.get(targetPath.toString()).toRealPath())
&& sourceFileName.endsWith(".zip")) {
// expand the zip file and add the root directory to PYTHONPATH
// as not all zip files are importable
&& CompressionUtils.isCompressedFile(sourceFileName)) {
// expand the compressed file and add the root directory to PYTHONPATH
// as not all compressed files are importable
Path targetDirectory =
new Path(
targetPath.getParent(),
sourceFileName.substring(0, sourceFileName.lastIndexOf(".")));
FileUtils.expandDirectory(targetPath, targetDirectory);
CompressionUtils.getBaseNameWithoutExtension(sourceFileName));
CompressionUtils.extractFile(
targetPath.toString(), targetDirectory.toString(), sourceFileName);
pythonPathList.add(targetDirectory.toString());
} else {
pythonPathList.add(targetPath.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,20 +309,19 @@ private void constructFilesDirectory(Map<String, String> env, String baseDirecto
// If the python file is file with suffix .py, add the parent directory to
// PYTHONPATH.
pythonPath = String.join(File.separator, filesDirectory, distributedCacheFileName);
} else if (pythonFile.isFile() && originFileName.endsWith(".zip")) {
// Expand the zip file and add the root directory to PYTHONPATH
// as not all zip files are importable
} else if (pythonFile.isFile() && CompressionUtils.isCompressedFile(originFileName)) {
// Expand the compressed file and add the root directory to PYTHONPATH
// as not all compressed files are importable
org.apache.flink.core.fs.Path targetDirectory =
new org.apache.flink.core.fs.Path(
filesDirectory,
String.join(
File.separator,
distributedCacheFileName,
originFileName.substring(
0, originFileName.lastIndexOf("."))));
FileUtils.expandDirectory(
new org.apache.flink.core.fs.Path(pythonFile.getAbsolutePath()),
targetDirectory);
CompressionUtils.getBaseNameWithoutExtension(
originFileName)));
CompressionUtils.extractFile(
pythonFile.getAbsolutePath(), targetDirectory.toString(), originFileName);
pythonPath = targetDirectory.toString();
} else {
pythonPath =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,50 @@ private int toUnixMode(Set<PosixFilePermission> permission) {
}
return mode;
}

@Test
void testIsCompressedFile() {
// Supported extensions (lowercase)
assertThat(CompressionUtils.isCompressedFile("a.zip")).isTrue();
assertThat(CompressionUtils.isCompressedFile("a.jar")).isTrue();
assertThat(CompressionUtils.isCompressedFile("a.tar")).isTrue();
assertThat(CompressionUtils.isCompressedFile("a.tar.gz")).isTrue();
assertThat(CompressionUtils.isCompressedFile("a.tgz")).isTrue();

// Case-insensitive
assertThat(CompressionUtils.isCompressedFile("A.ZIP")).isTrue();
assertThat(CompressionUtils.isCompressedFile("A.Tar.Gz")).isTrue();

// With path prefix
assertThat(CompressionUtils.isCompressedFile("/tmp/a.zip")).isTrue();

// Unsupported extensions / no extension
assertThat(CompressionUtils.isCompressedFile("a.txt")).isFalse();
assertThat(CompressionUtils.isCompressedFile("a.gz")).isFalse();
assertThat(CompressionUtils.isCompressedFile("a.rar")).isFalse();
assertThat(CompressionUtils.isCompressedFile("archive")).isFalse();
}

@Test
void testGetBaseNameWithoutExtension() {
// Common extensions
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.zip")).isEqualTo("a");
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.jar")).isEqualTo("a");
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tar")).isEqualTo("a");
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tgz")).isEqualTo("a");

// .tar.gz compound extension (case-insensitive)
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tar.gz")).isEqualTo("a");
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.TAR.GZ")).isEqualTo("a");

// Filenames with multiple dots
assertThat(CompressionUtils.getBaseNameWithoutExtension("x.y.zip")).isEqualTo("x.y");
assertThat(CompressionUtils.getBaseNameWithoutExtension("x.y.tar.gz")).isEqualTo("x.y");

// No extension / dot-prefixed hidden files
assertThat(CompressionUtils.getBaseNameWithoutExtension("archive")).isEqualTo("archive");
assertThat(CompressionUtils.getBaseNameWithoutExtension(".hidden")).isEqualTo(".hidden");
assertThat(CompressionUtils.getBaseNameWithoutExtension(".hidden.zip"))
.isEqualTo(".hidden");
}
}