Skip to content

Commit 74c63f8

Browse files
authored
[#6860] feat: add file viewer support for filesets (#7215)
### What changes were proposed in this pull request? Enables web pages to view the directories and files in a `Fileset`. ### Why are the changes needed? Fix: #6860 ### Does this PR introduce _any_ user-facing change? (Please list the user-facing changes introduced by your change, including 1. Change in user-facing APIs. 2. Addition or removal of property keys.) ### How was this patch tested? (Please test your changes, and provide instructions on how to test it: 1. If you add a feature or fix a bug, add a test to cover your changes. 2. If you fix a flaky test, repeat it for many times to prove it works.)
1 parent 711d276 commit 74c63f8

File tree

16 files changed

+874
-9
lines changed

16 files changed

+874
-9
lines changed

catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.io.FileNotFoundException;
4040
import java.io.IOException;
4141
import java.time.Instant;
42+
import java.util.Arrays;
4243
import java.util.Collections;
4344
import java.util.HashMap;
4445
import java.util.HashSet;
@@ -65,12 +66,14 @@
6566
import org.apache.gravitino.audit.CallerContext;
6667
import org.apache.gravitino.audit.FilesetAuditConstants;
6768
import org.apache.gravitino.audit.FilesetDataOperation;
69+
import org.apache.gravitino.catalog.FilesetFileOps;
6870
import org.apache.gravitino.catalog.ManagedSchemaOperations;
6971
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
7072
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
7173
import org.apache.gravitino.connector.CatalogInfo;
7274
import org.apache.gravitino.connector.CatalogOperations;
7375
import org.apache.gravitino.connector.HasPropertyMetadata;
76+
import org.apache.gravitino.dto.file.FileInfoDTO;
7477
import org.apache.gravitino.exceptions.AlreadyExistsException;
7578
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
7679
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
@@ -81,6 +84,7 @@
8184
import org.apache.gravitino.exceptions.NoSuchSchemaException;
8285
import org.apache.gravitino.exceptions.NonEmptySchemaException;
8386
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
87+
import org.apache.gravitino.file.FileInfo;
8488
import org.apache.gravitino.file.Fileset;
8589
import org.apache.gravitino.file.FilesetCatalog;
8690
import org.apache.gravitino.file.FilesetChange;
@@ -99,7 +103,7 @@
99103
import org.slf4j.LoggerFactory;
100104

101105
public class HadoopCatalogOperations extends ManagedSchemaOperations
102-
implements CatalogOperations, FilesetCatalog {
106+
implements CatalogOperations, FilesetCatalog, FilesetFileOps {
103107
private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
104108
private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
105109
private static final String SLASH = "/";
@@ -237,6 +241,46 @@ public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException {
237241
});
238242
}
239243

244+
@Override
245+
public FileInfo[] listFiles(NameIdentifier filesetIdent, String locationName, String subPath)
246+
throws NoSuchFilesetException, IOException {
247+
if (disableFSOps) {
248+
LOG.warn("Filesystem operations disabled, rejecting listFiles for {}", filesetIdent);
249+
throw new UnsupportedOperationException("Filesystem operations are disabled on this server");
250+
}
251+
252+
String actualPath = getFileLocation(filesetIdent, subPath, locationName);
253+
Path formalizedPath = formalizePath(new Path(actualPath), conf);
254+
255+
FileSystem fs = getFileSystem(formalizedPath, conf);
256+
if (!fs.exists(formalizedPath)) {
257+
throw new IllegalArgumentException(
258+
String.format(
259+
"Path %s does not exist in fileset %s", formalizedPath.toString(), filesetIdent));
260+
}
261+
262+
String catalogName = filesetIdent.namespace().level(1);
263+
String schemaName = filesetIdent.namespace().level(2);
264+
String filesetName = filesetIdent.name();
265+
266+
try {
267+
return Arrays.stream(fs.listStatus(formalizedPath))
268+
.map(
269+
status ->
270+
FileInfoDTO.builder()
271+
.name(status.getPath().getName())
272+
.isDir(status.isDirectory())
273+
.size(status.isDirectory() ? 0L : status.getLen())
274+
.lastModified(status.getModificationTime())
275+
.path(buildGVFSFilePath(catalogName, schemaName, filesetName, subPath))
276+
.build())
277+
.toArray(FileInfo[]::new);
278+
279+
} catch (IOException e) {
280+
throw new RuntimeException("Failed to list files in fileset" + filesetIdent, e);
281+
}
282+
}
283+
240284
@Override
241285
public Fileset createMultipleLocationFileset(
242286
NameIdentifier ident,
@@ -1049,6 +1093,10 @@ private String removeTrailingSlash(String path) {
10491093
return path.endsWith(SLASH) ? path.substring(0, path.length() - 1) : path;
10501094
}
10511095

1096+
private String ensureLeadingSlash(String path) {
1097+
return path.startsWith(SLASH) ? path : SLASH + path;
1098+
}
1099+
10521100
private FilesetEntity updateFilesetEntity(
10531101
NameIdentifier ident, FilesetEntity filesetEntity, FilesetChange... changes) {
10541102
Map<String, String> props =
@@ -1144,13 +1192,13 @@ private Map<String, Path> calculateFilesetPaths(
11441192
Path schemaPath = schemaPaths.get(locationName);
11451193
filesetPaths.put(
11461194
locationName,
1147-
caculateFilesetPath(
1195+
calculateFilesetPath(
11481196
schemaName, filesetName, storageLocation, schemaPath, properties));
11491197
});
11501198
return filesetPaths.build();
11511199
}
11521200

1153-
private Path caculateFilesetPath(
1201+
private Path calculateFilesetPath(
11541202
String schemaName,
11551203
String filesetName,
11561204
String storageLocation,
@@ -1248,6 +1296,14 @@ private boolean checkSingleFile(Fileset fileset, String locationName) {
12481296
}
12491297
}
12501298

1299+
private String buildGVFSFilePath(
1300+
String catalogName, String schemaName, String filesetName, String subPath) {
1301+
String prefix = String.join(SLASH, "/fileset", catalogName, schemaName, filesetName);
1302+
return StringUtils.isBlank(subPath)
1303+
? prefix
1304+
: prefix + ensureLeadingSlash(removeTrailingSlash(subPath));
1305+
}
1306+
12511307
FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
12521308
if (path == null) {
12531309
throw new IllegalArgumentException("Path should not be null");

catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.gravitino.SchemaChange;
4242
import org.apache.gravitino.UserPrincipal;
4343
import org.apache.gravitino.audit.CallerContext;
44+
import org.apache.gravitino.catalog.FilesetFileOps;
4445
import org.apache.gravitino.catalog.hadoop.authentication.UserContext;
4546
import org.apache.gravitino.connector.CatalogInfo;
4647
import org.apache.gravitino.connector.CatalogOperations;
@@ -58,6 +59,7 @@
5859
import org.apache.gravitino.exceptions.NoSuchSchemaException;
5960
import org.apache.gravitino.exceptions.NonEmptySchemaException;
6061
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
62+
import org.apache.gravitino.file.FileInfo;
6163
import org.apache.gravitino.file.Fileset;
6264
import org.apache.gravitino.file.FilesetCatalog;
6365
import org.apache.gravitino.file.FilesetChange;
@@ -70,7 +72,11 @@
7072

7173
@SuppressWarnings("removal")
7274
public class SecureHadoopCatalogOperations
73-
implements CatalogOperations, SupportsSchemas, FilesetCatalog, SupportsPathBasedCredentials {
75+
implements CatalogOperations,
76+
SupportsSchemas,
77+
FilesetCatalog,
78+
FilesetFileOps,
79+
SupportsPathBasedCredentials {
7480

7581
public static final Logger LOG = LoggerFactory.getLogger(SecureHadoopCatalogOperations.class);
7682

@@ -232,6 +238,12 @@ public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaExc
232238
return hadoopCatalogOperations.listFilesets(namespace);
233239
}
234240

241+
@Override
242+
public FileInfo[] listFiles(NameIdentifier ident, String locationName, String subPath)
243+
throws NoSuchFilesetException, IOException {
244+
return hadoopCatalogOperations.listFiles(ident, locationName, subPath);
245+
}
246+
235247
@Override
236248
public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException {
237249
return hadoopCatalogOperations.loadFileset(ident);

catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.gravitino.exceptions.NoSuchSchemaException;
8080
import org.apache.gravitino.exceptions.NonEmptySchemaException;
8181
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
82+
import org.apache.gravitino.file.FileInfo;
8283
import org.apache.gravitino.file.Fileset;
8384
import org.apache.gravitino.file.FilesetChange;
8485
import org.apache.gravitino.storage.IdGenerator;
@@ -241,7 +242,7 @@ public static void setUp() {
241242
.getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
242243
Mockito.anyString(), Mockito.anyString(), Mockito.eq("schema11"));
243244

244-
for (int i = 10; i < 30; i++) {
245+
for (int i = 10; i < 33; i++) {
245246
doReturn(new SchemaIds(1L, 1L, (long) i))
246247
.when(spySchemaMetaService)
247248
.getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
@@ -899,6 +900,110 @@ public void testListFilesets() throws IOException {
899900
}
900901
}
901902

903+
@Test
904+
public void testListFilesetFiles() throws IOException {
905+
String schemaName = "schema30";
906+
String comment = "comment30";
907+
String filesetName = "fileset30";
908+
String schemaPath = TEST_ROOT_PATH + "/" + schemaName;
909+
910+
createSchema(schemaName, comment, null, schemaPath);
911+
createFileset(filesetName, schemaName, comment, Fileset.Type.MANAGED, null, null);
912+
913+
try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
914+
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
915+
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName);
916+
917+
Path testDir = new Path(schemaPath + "/" + filesetName);
918+
FileSystem fs = testDir.getFileSystem(new Configuration());
919+
fs.mkdirs(testDir);
920+
fs.create(new Path(testDir, "test_file1.txt")).close();
921+
fs.create(new Path(testDir, "test_file2.txt")).close();
922+
fs.mkdirs(new Path(testDir, "test_subdir"));
923+
924+
FileInfo[] files = ops.listFiles(filesetIdent, null, "/");
925+
926+
Assertions.assertNotNull(files);
927+
Assertions.assertTrue(files.length >= 3);
928+
929+
Set<String> fileNames = Arrays.stream(files).map(FileInfo::name).collect(Collectors.toSet());
930+
931+
Assertions.assertTrue(fileNames.contains("test_file1.txt"));
932+
Assertions.assertTrue(fileNames.contains("test_file2.txt"));
933+
Assertions.assertTrue(fileNames.contains("test_subdir"));
934+
935+
for (FileInfo file : files) {
936+
// verify file type related properties
937+
if (file.name().equals("test_file1.txt") || file.name().equals("test_file2.txt")) {
938+
Assertions.assertFalse(file.isDir(), "File should not be directory: " + file.name());
939+
Assertions.assertTrue(file.size() >= 0, "File size should be non-negative");
940+
} else if (file.name().equals("test_subdir")) {
941+
Assertions.assertTrue(file.isDir(), "Directory should be marked as directory");
942+
Assertions.assertEquals(0, file.size(), "Directory size should be 0");
943+
}
944+
// verify other properties
945+
Assertions.assertNotNull(file.name(), "File name should not be null");
946+
Assertions.assertNotNull(file.path(), "File path should not be null");
947+
Assertions.assertTrue(file.lastModified() > 0, "Last modified time should be positive");
948+
}
949+
}
950+
}
951+
952+
@Test
953+
public void testListFilesetFilesWithFSOpsDisabled() throws Exception {
954+
String schemaName = "schema31";
955+
String comment = "comment31";
956+
String filesetName = "fileset31";
957+
String schemaPath = TEST_ROOT_PATH + "/" + schemaName;
958+
959+
createSchema(schemaName, comment, null, schemaPath);
960+
createFileset(filesetName, schemaName, comment, Fileset.Type.MANAGED, null, null);
961+
962+
Map<String, String> catalogProps = Maps.newHashMap();
963+
catalogProps.put(DISABLE_FILESYSTEM_OPS, "true");
964+
965+
try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
966+
ops.initialize(catalogProps, randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
967+
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName);
968+
969+
UnsupportedOperationException ex =
970+
Assertions.assertThrows(
971+
UnsupportedOperationException.class,
972+
() -> ops.listFiles(filesetIdent, null, "/"),
973+
"Expected listFiles to throw UnsupportedOperationException when disableFSOps is true");
974+
Assertions.assertTrue(
975+
ex.getMessage().contains("Filesystem operations are disabled on this server"),
976+
"Exception message should mention 'Filesystem operations are disabled on this server'");
977+
}
978+
}
979+
980+
@Test
981+
public void testListFilesetFilesWithNonExistentPath() throws IOException {
982+
String schemaName = "schema32";
983+
String comment = "comment32";
984+
String filesetName = "fileset32";
985+
String schemaPath = TEST_ROOT_PATH + "/" + schemaName;
986+
987+
createSchema(schemaName, comment, null, schemaPath);
988+
createFileset(filesetName, schemaName, comment, Fileset.Type.MANAGED, null, null);
989+
990+
try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
991+
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
992+
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName);
993+
994+
String nonExistentSubPath = "/non_existent_file.txt";
995+
IllegalArgumentException ex =
996+
Assertions.assertThrows(
997+
IllegalArgumentException.class,
998+
() -> ops.listFiles(filesetIdent, null, nonExistentSubPath),
999+
"Listing a non-existent fileset directory should throw IllegalArgumentException");
1000+
1001+
Assertions.assertTrue(
1002+
ex.getMessage().contains("does not exist"),
1003+
"Exception message should mention that the path does not exist");
1004+
}
1005+
}
1006+
9021007
@ParameterizedTest
9031008
@MethodSource("testRenameArguments")
9041009
public void testRenameFileset(

0 commit comments

Comments
 (0)