Skip to content

Commit 4014ac7

Browse files
committed
Fix NPE in the Bandwidth command: the CSV writer is null when no CSV output file is requested, so guard all uses (null check before close/flush, @Nullable parameter on row()).
1 parent 342aa13 commit 4014ac7

2 files changed

Lines changed: 43 additions & 16 deletions

File tree

src/main/java/org/apache/hadoop/fs/store/commands/Bandwidth.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@
2020

2121
import java.io.IOException;
2222
import java.io.PrintStream;
23+
import java.security.MessageDigest;
2324
import java.time.Duration;
25+
import java.util.Arrays;
2426
import java.util.List;
2527
import java.util.Locale;
2628
import java.util.Optional;
2729
import java.util.Random;
2830
import java.util.concurrent.atomic.AtomicLong;
2931

32+
import javax.annotation.Nullable;
33+
3034
import org.slf4j.Logger;
3135
import org.slf4j.LoggerFactory;
3236

@@ -95,6 +99,8 @@ public class Bandwidth extends StoreEntryPoint {
9599

96100
public static final String DEFAULT_READ_POLICY = "whole-file";
97101

102+
public static final String DIGEST_ALGORITHM = "SHA-256";
103+
98104
private byte[] dataBuffer;
99105

100106
public Bandwidth() {
@@ -173,10 +179,13 @@ public int run(String[] args) throws Exception {
173179
println("Writing data as %,d blocks each of size %,d bytes", numberOfBuffersToUpload,
174180
blockSize);
175181

182+
MessageDigest uploadDigest = MessageDigest.getInstance(DIGEST_ALGORITHM);
183+
MessageDigest downloadDigest = MessageDigest.getInstance(DIGEST_ALGORITHM);
184+
176185
/*
177186
prepare the CSV output if requested
178187
*/
179-
CsvWriterWithCRC writer = null;
188+
CsvWriterWithCRC csvWriter = null;
180189
Path csvPath = null;
181190
if (csvFile != null) {
182191
csvPath = new Path(csvFile);
@@ -186,10 +195,10 @@ public int run(String[] args) throws Exception {
186195
.bufferSize(BUFFER_SIZE)
187196
.overwrite(true)
188197
.build();
189-
writer = new CsvWriterWithCRC(upload, SEPARATOR, EOL, true);
198+
csvWriter = new CsvWriterWithCRC(upload, SEPARATOR, EOL, true);
190199

191-
writer.columns("operation", "iteration", "bytes", "total", "duration");
192-
writer.newline();
200+
csvWriter.columns("operation", "iteration", "bytes", "total", "duration");
201+
csvWriter.newline();
193202
}
194203

195204
// buffer of randomness
@@ -218,7 +227,7 @@ public int run(String[] args) throws Exception {
218227
.overwrite(true)
219228
.build();
220229
d.finished();
221-
row(writer, "create-file", 1, 0, 0, d);
230+
row(csvWriter, "create-file", 1, 0, 0, d);
222231
}
223232
// set up
224233
if (!keep) {
@@ -249,6 +258,7 @@ public int run(String[] args) throws Exception {
249258
StoreDurationInfo duration = new StoreDurationInfo();
250259
print("Write block %,d", i);
251260
upload.write(dataBuffer);
261+
uploadDigest.update(dataBuffer);
252262
if (flush) {
253263
upload.flush();
254264
}
@@ -258,7 +268,7 @@ public int run(String[] args) throws Exception {
258268
duration.finished();
259269
blockUploads.add(duration.value());
260270
println(" in %.3f seconds", duration.value() / 1000.0);
261-
row(writer, "upload-block", i + 1, blockSize, total += blockSize, duration);
271+
row(csvWriter, "upload-block", i + 1, blockSize, total += blockSize, duration);
262272
}
263273
println();
264274

@@ -270,7 +280,7 @@ public int run(String[] args) throws Exception {
270280
} finally {
271281
closeDuration.close();
272282
}
273-
row(writer, "close-upload", 1, 0, fileSizeBytes, closeDuration);
283+
row(csvWriter, "close-upload", 1, 0, fileSizeBytes, closeDuration);
274284

275285
println();
276286
// print out progress info
@@ -282,7 +292,7 @@ public int run(String[] args) throws Exception {
282292
}
283293
// upload is done, print some statistics
284294
uploadDurationTracker.finished();
285-
row(writer, "upload", 1, blockSize, blockSize, uploadDurationTracker);
295+
row(csvWriter, "upload", 1, blockSize, blockSize, uploadDurationTracker);
286296

287297
// end of upload
288298
printFSInfoInVerbose(fs);
@@ -300,7 +310,7 @@ public int run(String[] args) throws Exception {
300310
fs.rename(uploadPath, downloadPath);
301311
}
302312
rd.finished();
303-
row(writer, "rename", 1, fileSizeBytes, 0, rd);
313+
row(csvWriter, "rename", 1, fileSizeBytes, 0, rd);
304314
}
305315

306316
/*
@@ -318,7 +328,7 @@ public int run(String[] args) throws Exception {
318328
.build().get();
319329
} finally {
320330
openDuration.finished();
321-
row(writer, "open-for-download", 1, 0, 0, openDuration);
331+
row(csvWriter, "open-for-download", 1, 0, 0, openDuration);
322332
}
323333
MinMeanMax blockDownload = new MinMeanMax("block read duration");
324334

@@ -330,20 +340,25 @@ public int run(String[] args) throws Exception {
330340
print("Read block %,d", i);
331341
StoreDurationInfo duration = new StoreDurationInfo();
332342
download.readFully(pos, dataBuffer);
343+
downloadDigest.update(dataBuffer);
344+
333345
pos += blockSize;
334346
duration.finished();
335347
blockDownload.add(duration.value());
336348
println(" in %.3f seconds", duration.value() / 1000.0);
337-
row(writer, "download-block", i + 1, blockSize, total += blockSize, duration);
349+
row(csvWriter, "download-block", i + 1, blockSize, total += blockSize, duration);
338350
}
339351
println();
340352
try (StoreDurationInfo d = new StoreDurationInfo(out, "download stream close()")) {
341353
download.close();
342354
}
343355
downloadDurationTracker.finished();
344-
row(writer, "download", 1, fileSizeBytes, fileSizeBytes, downloadDurationTracker);
356+
row(csvWriter, "download", 1, fileSizeBytes, fileSizeBytes, downloadDurationTracker);
345357
} finally {
346-
writer.close();
358+
if (csvWriter != null) {
359+
csvWriter.flush();
360+
csvWriter.close();
361+
}
347362
printIfVerbose("Download Stream: %s", download);
348363
}
349364

@@ -383,12 +398,25 @@ public int run(String[] args) throws Exception {
383398
summarize("Download", downloadDurationTracker, fileSizeBytes,
384399
"Blocks downloaded:", blockDownload);
385400

401+
int exitCode = 0;
402+
403+
final byte[] uploadHash = uploadDigest.digest();
404+
final byte[] downloadHash = downloadDigest.digest();
405+
if (!Arrays.equals(uploadHash, downloadHash)) {
406+
errorln("Upload hash does not match download hash: data corrupted!");
407+
exitCode = -1;
408+
} else {
409+
println("Data checksums match: the data has not been corrupted during the test");
410+
}
411+
412+
386413
if (csvPath != null) {
387414
print("CSV formatted data saved to %s", csvPath);
388415
}
416+
389417
println();
390418

391-
return 0;
419+
return exitCode;
392420

393421
}
394422

@@ -403,7 +431,7 @@ public int run(String[] args) throws Exception {
403431
* @throws IOException write failure
404432
*/
405433
private static void row(
406-
final CsvWriterWithCRC writer,
434+
@Nullable final CsvWriterWithCRC writer,
407435
final String a,
408436
final int iteration,
409437
final long opBytes,

src/main/java/org/apache/hadoop/fs/store/diag/StoreDiag.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@
8282
import static org.apache.hadoop.fs.store.CommonParameters.STANDARD_OPTS;
8383
import static org.apache.hadoop.fs.store.CommonParameters.SYSPROP;
8484
import static org.apache.hadoop.fs.store.CommonParameters.TOKENFILE;
85-
import static org.apache.hadoop.fs.store.CommonParameters.VERBOSE;
8685
import static org.apache.hadoop.fs.store.StoreExitCodes.E_ERROR;
8786
import static org.apache.hadoop.fs.store.StoreExitCodes.E_NOT_FOUND;
8887
import static org.apache.hadoop.fs.store.StoreExitCodes.E_NO_ACCESS;

0 commit comments

Comments (0)