Skip to content

Commit b560b94

Browse files
committed
print stream capabilities of input and output streams
...inc latest capabilities for iocontext and vectored io
1 parent 9111a36 commit b560b94

5 files changed

Lines changed: 154 additions & 40 deletions

File tree

src/main/java/org/apache/hadoop/fs/store/CapabilityChecker.java renamed to src/main/java/org/apache/hadoop/fs/store/PathCapabilityChecker.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,22 @@
2222
import java.lang.reflect.InvocationTargetException;
2323
import java.lang.reflect.Method;
2424

25-
import org.apache.hadoop.fs.FileSystem;
2625
import org.apache.hadoop.fs.Path;
2726
import org.apache.hadoop.util.VersionInfo;
2827

2928
import static org.apache.hadoop.fs.store.StoreExitCodes.E_EXCEPTION_THROWN;
3029
import static org.apache.hadoop.fs.store.StoreExitCodes.E_UNSUPPORTED_VERSION;
3130

32-
public class CapabilityChecker {
31+
public class PathCapabilityChecker {
3332

3433
private final Method hasPathCapability;
35-
private final FileSystem fileSystem;
34+
private final Object source;
3635

37-
public CapabilityChecker(FileSystem fileSystem) {
38-
this.fileSystem = fileSystem;
36+
public PathCapabilityChecker(Object source) {
37+
this.source = source;
3938
Method method;
4039
try {
41-
method = FileSystem.class.getMethod("hasPathCapability",
40+
method = source.getClass().getMethod("hasPathCapability",
4241
Path.class, String.class);
4342
} catch (NoSuchMethodException e) {
4443
method = null;
@@ -51,7 +50,7 @@ public boolean methodAvailable() {
5150
}
5251

5352
/**
54-
* Does a filesystem have a capability?
53+
* Does an object have a capability?
5554
* uses reflection so the jar can compile/run against
5655
* older hadoop releases.
5756
* throws StoreExitException(E_UNSUPPORTED_VERSION) if the api isn't found.
@@ -69,7 +68,7 @@ public boolean hasPathCapability(Path path, String capability)
6968
+ VersionInfo.getVersion());
7069
}
7170
try {
72-
return (Boolean) hasPathCapability.invoke(fileSystem, path, capability);
71+
return (Boolean) hasPathCapability.invoke(source, path, capability);
7372
} catch (IllegalAccessException e) {
7473
throw new StoreExitException(E_UNSUPPORTED_VERSION,
7574
"Hadoop version does not support PathCapabilities: "

src/main/java/org/apache/hadoop/fs/store/StoreDurationInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public String toString() {
112112
public void close() {
113113
finished();
114114
if (log != null) {
115-
log.info(text + ": duration " + this);
115+
log.info("Duration of {}: {}", text, this);
116116
}
117117
}
118118
}

src/main/java/org/apache/hadoop/fs/store/commands/PathCapability.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.hadoop.conf.Configuration;
2727
import org.apache.hadoop.fs.FileSystem;
2828
import org.apache.hadoop.fs.Path;
29-
import org.apache.hadoop.fs.store.CapabilityChecker;
29+
import org.apache.hadoop.fs.store.PathCapabilityChecker;
3030
import org.apache.hadoop.fs.store.StoreEntryPoint;
3131
import org.apache.hadoop.util.ToolRunner;
3232

@@ -79,7 +79,7 @@ public int run(String[] args) throws Exception {
7979
FileSystem fs = path.getFileSystem(conf);
8080
println("Using filesystem %s", fs.getUri());
8181
Path absPath = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
82-
if (new CapabilityChecker(fs).
82+
if (new PathCapabilityChecker(fs).
8383
hasPathCapability(absPath, capability)) {
8484

8585
println("Path %s has capability %s",

src/main/java/org/apache/hadoop/fs/store/diag/CapabilityKeys.java

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.hadoop.fs.store.diag;
2020

21-
import org.apache.hadoop.classification.InterfaceStability;
21+
2222

2323
/**
2424
* Common path capabilities.
@@ -129,7 +129,6 @@ private CapabilityKeys() {
129129
/**
130130
* Probe for support for BatchListingOperations.
131131
*/
132-
@InterfaceStability.Unstable
133132
public static final String FS_EXPERIMENTAL_BATCH_LISTING =
134133
"fs.capability.batch.listing";
135134

@@ -179,11 +178,13 @@ private CapabilityKeys() {
179178
*/
180179
public static final String STORE_CAPABILITY_MAGIC_COMMITTER
181180
= "fs.s3a.capability.magic.committer";
181+
182182
/**
183183
* Does the FS Support S3 Select?
184184
* Value: {@value}.
185185
*/
186-
public static final String S3_SELECT_CAPABILITY = "fs.s3a.capability.select.sql";
186+
public static final String S3_SELECT_CAPABILITY =
187+
"fs.s3a.capability.select.sql";
187188

188189
/**
189190
* {@code PathCapabilities} probe to indicate that the filesystem
@@ -241,7 +242,8 @@ private CapabilityKeys() {
241242
* {code createFile()} builder.
242243
* Value {@value}.
243244
*/
244-
public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance";
245+
public static final String FS_S3A_CREATE_PERFORMANCE =
246+
"fs.s3a.create.performance";
245247

246248
/**
247249
* Prefix for adding a header to the object when created.
@@ -252,4 +254,91 @@ private CapabilityKeys() {
252254
*/
253255
public static final String FS_S3A_CREATE_HEADER = "fs.s3a.create.header";
254256

257+
258+
/**
259+
* Streams that support IOStatistics context and capture thread-level
260+
* IOStatistics.
261+
*/
262+
public static String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported";
263+
264+
265+
/**
266+
* IOStatisticsSource API.
267+
*/
268+
public static String IOSTATISTICS = "iostatistics";
269+
270+
/**
271+
* Support for vectored IO api.
272+
* See {@code PositionedReadable#readVectored(List, IntFunction)}.
273+
*/
274+
public static String VECTOREDIO = "readvectored";
275+
276+
277+
/**
278+
* Stream setReadahead capability implemented by
279+
* {@code CanSetReadahead#setReadahead(Long)}.
280+
*/
281+
public static String READAHEAD = "in:readahead";
282+
283+
/**
284+
* Stream setDropBehind capability implemented by
285+
* {@code CanSetDropBehind#setDropBehind(Boolean)}.
286+
*/
287+
public static String DROPBEHIND = "dropbehind";
288+
289+
/**
290+
* Stream unbuffer capability implemented by {@code CanUnbuffer#unbuffer()}.
291+
*/
292+
public static String UNBUFFER = "in:unbuffer";
293+
294+
/**
295+
* Stream read(ByteBuffer) capability implemented by
296+
* {@code ByteBufferReadable#read(java.nio.ByteBuffer)}.
297+
*/
298+
public static String READBYTEBUFFER = "in:readbytebuffer";
299+
300+
/**
301+
* Stream read(long, ByteBuffer) capability implemented by
302+
* {@code ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
303+
*/
304+
public static String PREADBYTEBUFFER = "in:preadbytebuffer";
305+
306+
/**
307+
* Stream hflush capability implemented by {@code Syncable#hflush()}.
308+
*
309+
* Use the {@code #HSYNC} probe to check for the support of Syncable;
310+
* it's that presence of {@code hsync()} which matters.
311+
*/
312+
@Deprecated
313+
public static String HFLUSH = "hflush";
314+
315+
/**
316+
* Stream hsync capability implemented by {@code Syncable#hsync()}.
317+
*/
318+
public static String HSYNC = "hsync";
319+
320+
321+
/**
322+
* Set of input/output stream capabilities to scan for.
323+
*/
324+
public static String[] INPUTSTREAM_CAPABILITIES = {
325+
DROPBEHIND,
326+
IOSTATISTICS,
327+
IOSTATISTICS_CONTEXT,
328+
PREADBYTEBUFFER,
329+
READAHEAD,
330+
READBYTEBUFFER,
331+
UNBUFFER,
332+
VECTOREDIO,
333+
};
334+
/**
335+
* Set of input/output stream capabilities to scan for.
336+
*/
337+
public static String[] OUTPUTSTREAM_CAPABILITIES = {
338+
ABORTABLE_STREAM,
339+
HFLUSH,
340+
HSYNC,
341+
IOSTATISTICS,
342+
IOSTATISTICS_CONTEXT,
343+
};
255344
}

src/main/java/org/apache/hadoop/fs/store/diag/StoreDiag.java

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@
6262
import org.apache.hadoop.fs.LocatedFileStatus;
6363
import org.apache.hadoop.fs.Path;
6464
import org.apache.hadoop.fs.RemoteIterator;
65+
import org.apache.hadoop.fs.StreamCapabilities;
6566
import org.apache.hadoop.fs.UnsupportedFileSystemException;
66-
import org.apache.hadoop.fs.store.CapabilityChecker;
67+
import org.apache.hadoop.fs.store.PathCapabilityChecker;
6768
import org.apache.hadoop.fs.store.StoreDurationInfo;
6869
import org.apache.hadoop.fs.store.StoreExitException;
6970
import org.apache.hadoop.io.IOUtils;
@@ -705,7 +706,7 @@ public void executeFileSystemOperations(final Path baseDir,
705706

706707
final String[] pathCapabilites = storeInfo.getOptionalPathCapabilites();
707708
if (pathCapabilites.length > 0) {
708-
final CapabilityChecker checker = new CapabilityChecker(fs);
709+
final PathCapabilityChecker checker = new PathCapabilityChecker(fs);
709710
if (checker.methodAvailable()) {
710711
heading("Path Capabilities");
711712
for (String s : pathCapabilites) {
@@ -763,32 +764,17 @@ public void executeFileSystemOperations(final Path baseDir,
763764
println("Directory %s does not exist", baseDir);
764765
baseDirFound = false;
765766
}
767+
heading("Attempt to read a file");
766768

767769
if (firstFile != null) {
768770
// found a file to read
769-
Path firstFilePath = firstFile.getPath();
770-
heading("reading file %s", firstFilePath);
771-
FSDataInputStream in = null;
772-
try (StoreDurationInfo ignored = new StoreDurationInfo(LOG,
773-
"Reading file %s", firstFilePath)) {
774-
in = fs.open(firstFilePath);
775-
// read the first char or -1
776-
int c = in.read();
777-
println("First character of file %s is 0x%02x: '%s'",
778-
firstFilePath,
779-
c,
780-
(c > ' ') ? Character.toString((char) c) : "(n/a)");
781-
in.close();
782-
} catch(FileNotFoundException ex) {
783-
warn("file %s: not found/readable %s", firstFilePath, ex);
784-
} catch(AccessDeniedException ex) {
785-
warn("client lacks access to file %s: %s", firstFilePath, ex);
786-
accessDenied = true;
787-
} finally {
788-
IOUtils.closeStream(in);
789-
}
771+
accessDenied = readFile(fs, firstFile.getPath());
772+
} else {
773+
println("no file found to attempt to read");
790774
}
791775

776+
heading("listfiles(%s, true)", baseDir);
777+
792778
// now work with the full path
793779
limit = LIST_LIMIT;
794780
try(StoreDurationInfo ignored = new StoreDurationInfo(LOG,
@@ -920,9 +906,11 @@ public void executeFileSystemOperations(final Path baseDir,
920906
verifyPathNotFound(fs, file);
921907

922908
try (StoreDurationInfo ignored = new StoreDurationInfo(LOG,
923-
"creating a file %s", file)) {
909+
"Creating file %s", file)) {
924910
FSDataOutputStream data = fs.create(file, true);
925911
data.writeUTF(HELLO);
912+
printStreamCapabilities(data, CapabilityKeys.OUTPUTSTREAM_CAPABILITIES);
913+
926914
data.close();
927915
println("Output stream summary: %s", data);
928916
}
@@ -932,8 +920,10 @@ public void executeFileSystemOperations(final Path baseDir,
932920
}
933921
FSDataInputStream in = null;
934922
try (StoreDurationInfo ignored = new StoreDurationInfo(LOG,
935-
"Reading a file %s", file)) {
923+
"Reading file %s", file)) {
936924
in = fs.open(file);
925+
printStreamCapabilities(in, CapabilityKeys.INPUTSTREAM_CAPABILITIES);
926+
937927
String utf = in.readUTF();
938928
in.close();
939929
println("input stream summary: %s", in);
@@ -974,6 +964,42 @@ public void executeFileSystemOperations(final Path baseDir,
974964
}
975965
}
976966

967+
/**
968+
* Read a file.
969+
*/
970+
private boolean readFile(final FileSystem fs,
971+
final Path path) throws IOException {
972+
boolean accessWasDenied = false;
973+
try (StoreDurationInfo ignored = new StoreDurationInfo(LOG,
974+
"Reading file %s", path);
975+
FSDataInputStream in = fs.open(path)) {
976+
// read the first char or -1
977+
int c = in.read();
978+
println("First character of file %s is 0x%02x: '%s'",
979+
path,
980+
c,
981+
(c > ' ') ? Character.toString((char) c) : "(n/a)");
982+
printStreamCapabilities(in, CapabilityKeys.INPUTSTREAM_CAPABILITIES);
983+
println("Stream summary: %s", in);
984+
} catch(FileNotFoundException ex) {
985+
warn("file %s: not found/readable %s", path, ex);
986+
} catch(AccessDeniedException ex) {
987+
warn("client lacks access to file %s: %s", path, ex);
988+
accessWasDenied = true;
989+
}
990+
return accessWasDenied;
991+
}
992+
993+
private void printStreamCapabilities(final StreamCapabilities in,
994+
final String[] capabilities) {
995+
println("Capabilities:");
996+
for (String s : capabilities) {
997+
if (in.hasCapability(s)) {
998+
println(" %s", s);
999+
}
1000+
}
1001+
}
1002+
9771003
public void deleteDir(final FileSystem fs, final Path dir) {
9781004
try (StoreDurationInfo ignored = new StoreDurationInfo(LOG,
9791005
"delete directory %s", dir)) {

0 commit comments

Comments
 (0)