Skip to content

Commit 8b9825a

Browse files
committed
Improve Cloudup support for multi-GB uploads
* progress * size printed with , * better readFully/write ops into MB buffers
1 parent cec5f86 commit 8b9825a

5 files changed

Lines changed: 118 additions & 40 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ the length of the subset increases with every row then wraps around.
632632
This is to give the file variable length rows, complicate split calculation
633633
etc.
634634

635-
635+
## Command `pathcapability`
636636

637637
Probes a filesystem for offering a specific named capability on the given path.
638638

src/main/java/org/apache/hadoop/fs/store/StoreEntryPoint.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.hadoop.fs.statistics.IOStatistics;
4646
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
4747
*/
48+
import org.apache.hadoop.fs.tools.cloudup.OptionSwitch;
4849
import org.apache.hadoop.io.IOUtils;
4950
import org.apache.hadoop.security.Credentials;
5051
import org.apache.hadoop.security.UserGroupInformation;

src/main/java/org/apache/hadoop/fs/tools/cloudup/Cloudup.java

Lines changed: 103 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Iterator;
2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.SortedMap;
3132
import java.util.TreeMap;
3233
import java.util.concurrent.Callable;
3334
import java.util.concurrent.CompletionService;
@@ -38,6 +39,7 @@
3839
import java.util.concurrent.ThreadPoolExecutor;
3940
import java.util.concurrent.TimeUnit;
4041
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.concurrent.atomic.AtomicLong;
4143

4244
import org.slf4j.Logger;
4345
import org.slf4j.LoggerFactory;
@@ -53,17 +55,20 @@
5355
import org.apache.hadoop.fs.FSDataOutputStream;
5456
import org.apache.hadoop.fs.FileStatus;
5557
import org.apache.hadoop.fs.FileSystem;
56-
import org.apache.hadoop.fs.LocalFileSystem;
5758
import org.apache.hadoop.fs.LocatedFileStatus;
5859
import org.apache.hadoop.fs.Path;
5960
import org.apache.hadoop.fs.RemoteIterator;
6061
import org.apache.hadoop.fs.StorageStatistics;
6162
import org.apache.hadoop.fs.store.StoreDurationInfo;
6263
import org.apache.hadoop.fs.store.StoreEntryPoint;
6364
import org.apache.hadoop.fs.store.StoreUtils;
64-
import org.apache.hadoop.io.IOUtils;
65+
import org.apache.hadoop.util.Progressable;
6566
import org.apache.hadoop.util.ToolRunner;
6667

68+
import static org.apache.hadoop.fs.store.CommonParameters.DEFINE;
69+
import static org.apache.hadoop.fs.store.CommonParameters.TOKENFILE;
70+
import static org.apache.hadoop.fs.store.CommonParameters.VERBOSE;
71+
import static org.apache.hadoop.fs.store.CommonParameters.XMLFILE;
6772
import static org.apache.hadoop.fs.store.StoreExitCodes.E_USAGE;
6873

6974
/**
@@ -76,11 +81,15 @@ public class Cloudup extends StoreEntryPoint {
7681
private static final Logger LOG = LoggerFactory.getLogger(Cloudup.class);
7782

7883
public static final String USAGE
79-
= "Usage: cloudup -s source -d dest [-o] [-i] [-l <largest>] [-t threads] ";
84+
=
85+
"Usage: cloudup -s source -d dest [-o] [-i] [-l <largest>] [-t threads] ";
8086

8187
private static final int DEFAULT_LARGEST = 4;
88+
8289
private static final int DEFAULT_THREADS = 16;
8390

91+
public static final int BUFFER_SIZE = 4_000_000;
92+
8493
private ExecutorService workers;
8594

8695
private FileSystem sourceFS;
@@ -96,16 +105,31 @@ public class Cloudup extends StoreEntryPoint {
96105
private boolean overwrite = true;
97106

98107
private boolean ignoreFailures = true;
108+
99109
// single element exception with sync access.
100110
private final Exception[] firstException = new Exception[1];
111+
101112
private CompletionService<Outcome> completion;
102113

103114
private FileStatus sourcePathStatus;
104115

105116
private FileStatus destPathStatus;
106117

118+
private boolean verbose;
119+
107120

108121
public Cloudup() {
122+
createCommandFormat(0, 0, VERBOSE);
123+
addValueOptions(TOKENFILE, XMLFILE, DEFINE);
124+
}
125+
126+
/**
127+
* convert a long to a string with commas inserted.
128+
* @param l long
129+
* @return string value
130+
*/
131+
static String commas(long l) {
132+
return String.format("%,d", l);
109133
}
110134

111135
private long now() {
@@ -150,6 +174,7 @@ public int run(String[] args) throws Exception {
150174
sourcePath = sourceFS.makeQualified(src);
151175
destPath = new Path(OptionSwitch.DEST.required(command));
152176
destFS = destPath.getFileSystem(conf);
177+
verbose = OptionSwitch.VERBOSE.hasOption(command);
153178

154179
LOG.info("Uploading from {} to {};"
155180
+ " threads={}; large files={}"
@@ -225,7 +250,8 @@ public int run(String[] args) throws Exception {
225250
for (int i = 0; i < sortUploadCount; i++) {
226251
UploadEntry upload = uploadList.get(i);
227252
LOG.info("Large file {}: size = {}: {}",
228-
i + 1, upload.getSize(),
253+
i + 1,
254+
upload.sizeStr(),
229255
upload.getSource());
230256
long submitSize = submit(upload);
231257
if (submitSize >= 0) {
@@ -234,7 +260,8 @@ public int run(String[] args) throws Exception {
234260
}
235261
}
236262
LOG.info("Largest {} uploads commenced, total size = {}",
237-
uploadCount, sortUploadSize);
263+
uploadCount,
264+
commas(sortUploadSize));
238265

239266

240267
// shuffle and submit remainder
@@ -283,7 +310,9 @@ public int run(String[] args) throws Exception {
283310
}
284311

285312
LOG.info("\n\nUploads attempted: {}, size {}, StoreDurationInfo: {}",
286-
uploadCount, uploadSize, uploadDuration);
313+
uploadCount,
314+
commas(uploadSize),
315+
uploadDuration);
287316
LOG.info("Bandwidth {} MB/s",
288317
uploadTimer.bandwidthDescription(uploadSize));
289318
LOG.info(String.format("Seconds per file %.3fs",
@@ -302,7 +331,7 @@ public int run(String[] args) throws Exception {
302331
finalUploadedSize += result.getBytesUploaded();
303332
} catch (InterruptedException ignored) {
304333
// ignored
305-
} catch (Exception e) {
334+
} catch (Exception e) {
306335
errors++;
307336
if (exception == null) {
308337
exception = e;
@@ -313,7 +342,8 @@ public int run(String[] args) throws Exception {
313342
if (exception != null) {
314343
LOG.warn("Upload failed due to an error");
315344
LOG.warn("Number of errors: {} actual bytes uploaded = {}",
316-
errors, finalUploadedSize);
345+
errors,
346+
commas(finalUploadedSize));
317347
if (!ignoreFailures) {
318348
throw exception;
319349
}
@@ -332,7 +362,6 @@ private Callable<Outcome> createUploadOperation(final UploadEntry upload) {
332362
}
333363

334364
/**
335-
*
336365
* Submit an upload; does nothing if the upload is already queued.
337366
* @param upload upload to submit
338367
* @return size to upload; -1 for no upload
@@ -419,12 +448,12 @@ private Outcome uploadOneFile(final UploadEntry upload) {
419448
final Path source = upload.getSource();
420449
final Path dest = destFS.makeQualified(upload.getDest());
421450
try {
422-
LOG.info("Uploading {} to {} (size: {}",
423-
source, dest, upload.getSize());
424-
uploadOneFile(source, dest);
451+
LOG.info("Uploading {} to {} (size: {})",
452+
source, dest, upload.sizeStr());
453+
uploadOneFile(upload, dest);
425454
upload.setState(UploadEntry.State.succeeded);
426455
upload.setEndTime(now());
427-
LOG.info("Successful upload of {} tpo {} in {} s",
456+
LOG.info("Successful upload of {} to {} in {} s",
428457
source,
429458
dest,
430459
StoreDurationInfo.humanTime(upload.getDuration()));
@@ -433,28 +462,68 @@ private Outcome uploadOneFile(final UploadEntry upload) {
433462
upload.setState(UploadEntry.State.failed);
434463
upload.setException(e);
435464
upload.setEndTime(now());
436-
LOG.warn("Failed to upload {} : {}", source, e.toString());
465+
LOG.warn("Failed to upload {} : {}", source, e.toString());
437466
LOG.debug("Upload to {} failed", dest, e);
438467
noteException(e);
439468
return Outcome.failed(upload, e);
440469
}
441470
}
442471

443-
private void uploadOneFile(final Path source, final Path dest)
472+
/**
473+
* Upload one file; uses readFully, fails if the stream
474+
* is shorter than expected, and logs close time.
475+
* @param upload upload entry
476+
* @param dest test path
477+
* @throws IOException failure
478+
*/
479+
private void uploadOneFile(final UploadEntry upload, final Path dest)
444480
throws IOException {
445-
if (sourceFS instanceof LocalFileSystem) {
446-
// source is local, use the local operation
447-
destFS.copyFromLocalFile(false, overwrite, source, dest);
448-
} else {
449-
try(FSDataInputStream in = sourceFS.open(source);
450-
FSDataOutputStream out = destFS.create(dest, overwrite)) {
451-
IOUtils.copyBytes(in, out, getConf(), true);
452-
LOG.info("In: {}", in);
453-
LOG.info("Out: {}", out);
481+
final Path source = upload.getSource();
482+
long remaining = upload.getSize();
483+
484+
int bufferSize = (int) Math.min(BUFFER_SIZE, remaining);
485+
try (FSDataInputStream in = sourceFS.open(source);
486+
FSDataOutputStream out = destFS.createFile(dest)
487+
.overwrite(overwrite)
488+
.progress(progress)
489+
.recursive()
490+
.bufferSize(bufferSize)
491+
.build()) {
492+
byte[] buffer = new byte[bufferSize];
493+
while (remaining > 0) {
494+
int blockSize = (int) Math.min(bufferSize, remaining);
495+
in.readFully(buffer,0, blockSize);
496+
out.write(buffer,0, blockSize);
497+
498+
remaining -= blockSize;
499+
if (verbose) {
500+
print(".");
501+
}
454502
}
503+
504+
in.close();
505+
506+
try (StoreDurationInfo d = new StoreDurationInfo(LOG,
507+
"close(%s)", dest)) {
508+
out.flush();
509+
out.close();
510+
}
511+
LOG.info("In: {}", in);
512+
LOG.info("Out: {}", out);
455513
}
514+
456515
}
457516

517+
final AtomicLong progressCount = new AtomicLong();
518+
519+
final Progressable progress = () -> {
520+
progressCount.incrementAndGet();
521+
if (verbose) {
522+
print("^");
523+
}
524+
};
525+
526+
458527
/**
459528
* Note the exception.
460529
* If this is the first exception, it's recorded, and,
@@ -502,16 +571,17 @@ private Path getFinalPath(Path srcFile) throws IOException {
502571
* @param fs filesystem.
503572
*/
504573
private void dumpStats(FileSystem fs, String header) {
505-
// Not supported on Hadoop 2.7
506-
LOG.info("\n" + header +": " + fs.getUri());
574+
// Not supported on Hadoop 2.7
575+
LOG.info("\n" + header + ": " + fs.getUri());
507576

508577
// hope to see FS IOStats
509578
LOG.info("Filesystem {}", fs);
510579

511580
Iterator<StorageStatistics.LongStatistic> iterator
512581
= fs.getStorageStatistics().getLongStatistics();
513582
// convert to a (sorted) treemap
514-
TreeMap<String, Long> results = new TreeMap<>();
583+
SortedMap<String, Long> results = new TreeMap<>();
584+
515585
while (iterator.hasNext()) {
516586
StorageStatistics.LongStatistic stat = iterator.next();
517587
results.put(stat.getName(), stat.getValue());
@@ -527,9 +597,11 @@ private void dumpStats(FileSystem fs, String header) {
527597
* Outcome of an upload: A count of uploaded data, outcome and error.
528598
*/
529599
private static final class Outcome {
600+
530601
private final boolean executed;
531602

532603
private final UploadEntry upload;
604+
533605
private final long bytesUploaded;
534606

535607
private final Exception exception;
@@ -545,16 +617,16 @@ private Outcome(
545617
this.exception = exception;
546618
}
547619

548-
private static Outcome notExecuted(final UploadEntry upload) {
620+
private static Outcome notExecuted(final UploadEntry upload) {
549621
return new Outcome(false, upload, 0, null);
550622
}
551623

552-
private static Outcome succeeded(final UploadEntry upload) {
624+
private static Outcome succeeded(final UploadEntry upload) {
553625
return new Outcome(true, upload, upload.getSize(), null);
554626
}
555627

556628
private static Outcome failed(final UploadEntry upload,
557-
final Exception exception) {
629+
final Exception exception) {
558630
return new Outcome(true, upload, 0, exception);
559631
}
560632

@@ -589,7 +661,7 @@ private void maybeThrowException() throws Exception {
589661
* @throws Exception failure
590662
*/
591663
public static int exec(String... args) throws Exception {
592-
try(final Cloudup tool = new Cloudup()) {
664+
try (final Cloudup tool = new Cloudup()) {
593665
return ToolRunner.run(tool, args);
594666
}
595667
}

src/main/java/org/apache/hadoop/fs/tools/cloudup/OptionSwitch.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,13 @@ public enum OptionSwitch {
5151
* Overwrite target-files unconditionally.
5252
*/
5353
OVERWRITE(new Option("o", "overwrite", false,
54-
"Overwrite target files even if they exist.")),
54+
"Overwrite target files even if they exist.")),
5555

5656
SOURCE(new Option("s", "source", true, "source path")),
5757

58-
DEST(new Option("d", "dest", true, "destination path"));
58+
DEST(new Option("d", "dest", true, "destination path")),
59+
60+
VERBOSE(new Option("v", "verbose", false, "verbose"));
5961

6062
private final Option option;
6163

@@ -86,7 +88,7 @@ public String getOptionName() {
8688

8789
@Override
8890
public String toString() {
89-
return super.name() + " {" +
91+
return super.name() + " {" +
9092
"option=" + option + '}';
9193
}
9294

@@ -107,7 +109,7 @@ public void add(Options cliOptions) {
107109
*/
108110
public String required(CommandLine command) {
109111
String r = eval(command, null);
110-
checkArgument(r != null && !r.isEmpty(),
112+
checkArgument(r != null && !r.isEmpty(),
111113
"Unset option: " + getOptionName());
112114
return r;
113115
}
@@ -128,7 +130,7 @@ public boolean hasOption(CommandLine command) {
128130
}
129131

130132
public int eval(CommandLine command, int defVal) {
131-
return Integer.valueOf(eval(command, Integer.toString(defVal)));
133+
return Integer.parseInt(eval(command, Integer.toString(defVal)));
132134
}
133135

134136
/**

0 commit comments

Comments
 (0)