Skip to content

Commit 17d16ea

Browse files
committed
Major Cloudup rework
* incremental -update operation to skip files which exist. * improved output * options to flush/hflush * standalone document * replaced "upload" with "copy"
1 parent b8df230 commit 17d16ea

17 files changed

Lines changed: 969 additions & 248 deletions

README.md

Lines changed: 1 addition & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -171,52 +171,8 @@ com.amazonaws.services.s3.model.AmazonS3Exception: The specified method is not a
171171

172172
## Command `cloudup` — upload and download files; optimised for cloud storage
173173

174-
Bulk download of everything from s3a://bucket/qelogs/ to the local dir localqelogs (assuming the default fs is file://)
174+
See [cloudup](src/main/site/cloudup.md)
175175

176-
Usage
177-
178-
```
179-
cloudup -s source -d dest [-o] [-i] [-l <largest>] [-t threads]
180-
181-
-s <uri> : source
182-
-d <uri> : dest
183-
-o: overwrite
184-
-i: ignore failures
185-
-t <n> : number of threads
186-
-l <n> : number of "largest" files to start uploading before just randomly picking files.
187-
188-
```
189-
190-
Algorithm
191-
192-
1. source files are listed.
193-
1. A pool of worker threads is created
194-
2. the largest N files are queued for upload first, where N is a default or the value set by `-l`.
195-
1. The remainder of the files are randomized to avoid throttling and then queued
196-
1. the program waits for everything to complete.
197-
1. Source and dest FS stats are printed.
198-
199-
This is not `distcp` run across a cluster; it's a single process with some threads.
200-
Works best for reading lots of small files from an object store or when you have a
201-
mix of large and small files to download or upload.
202-
203-
204-
205-
```
206-
bin/hadoop jar cloudstore-1.0.jar cloudup \
207-
-s s3a://bucket/qelogs/ \
208-
-d localqelogs \
209-
-t 32 -o
210-
```
211-
212-
and the other way
213-
214-
```
215-
bin/hadoop jar cloudstore-1.0.jar cloudup \
216-
-d localqelogs \
217-
-s s3a://bucket/qelogs/ \
218-
-t 32 -o -l 4
219-
```
220176

221177
## Command `committerinfo`
222178

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,18 @@
365365
</properties>
366366
</profile>
367367

368+
<profile>
369+
<id>cdh7.1</id>
370+
<activation>
371+
<property>
372+
<name>cdh7.1</name>
373+
</property>
374+
</activation>
375+
<properties>
376+
<hadoop.version>3.1.1.7.1.7.2000-1</hadoop.version>
377+
</properties>
378+
</profile>
379+
368380
<profile>
369381
<id>extra</id>
370382
<activation>

src/main/java/org/apache/hadoop/fs/store/CommonParameters.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,22 @@ public final class CommonParameters {
5454

5555
public static final String BFS = "bfs";
5656

57+
public static final String BLOCK = "block";
58+
59+
public static final String CSVFILE = "csv";
60+
61+
public static final String FLUSH = "flush";
62+
63+
public static final String HFLUSH = "hflush";
64+
65+
public static final String IGNORE = "ignore";
66+
67+
public static final String LARGEST = "largest";
68+
69+
public static final String OVERWRITE = "overwrite";
70+
71+
public static final String UPDATE = "update";
72+
5773
private CommonParameters() {
5874
}
5975
}

src/main/java/org/apache/hadoop/fs/store/StoreDurationInfo.java

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.PrintStream;
2222
import java.time.Duration;
23+
import java.util.function.Supplier;
2324

2425
import org.slf4j.Logger;
2526

@@ -40,10 +41,20 @@ public class StoreDurationInfo
4041

4142
private boolean isFinished;
4243

43-
private final String text;
44+
45+
private final Supplier<String> text;
46+
47+
public static final Supplier<String> EMPTY_TEXT = () -> "";
48+
49+
private String textStr;
4450

4551
private final Logger log;
4652

53+
/**
54+
* Should the log be at INFO rather than DEBUG.
55+
*/
56+
private final boolean logAtInfo;
57+
4758
private final PrintStream out;
4859

4960
/**
@@ -53,16 +64,40 @@ public class StoreDurationInfo
5364
* @param args list of arguments
5465
*/
5566
public StoreDurationInfo(Logger log, String format, Object... args) {
56-
started = time();
57-
finished = started;
58-
this.text = String.format(format, args);
67+
68+
this(log, true, format, args);
69+
}
70+
71+
/**
72+
* Create the duration text from a {@code String.format()} code call
73+
* and log either at info or debug.
74+
* @param log log to write to
75+
* @param logAtInfo should the log be at info, rather than debug
76+
* @param format format string
77+
* @param args list of arguments
78+
*/
79+
public StoreDurationInfo(Logger log,
80+
boolean logAtInfo,
81+
String format,
82+
Object... args) {
83+
this.started = time();
84+
this.finished = started;
85+
this.text = () -> String.format(format, args);
5986
this.log = log;
60-
out = null;
87+
this.logAtInfo = logAtInfo;
88+
this.out = null;
6189
if (log != null) {
62-
log.info("Starting: {}", text);
90+
if (logAtInfo) {
91+
log.info("Starting: {}", getFormattedText());
92+
} else {
93+
if (log.isDebugEnabled()) {
94+
log.debug("Starting: {}", getFormattedText());
95+
}
96+
}
6397
}
6498
}
6599

100+
66101
/**
67102
* Create the duration text from a {@code String.format()} code call.
68103
* @param out log to write to
@@ -72,25 +107,33 @@ public StoreDurationInfo(Logger log, String format, Object... args) {
72107
public StoreDurationInfo(PrintStream out, String format, Object... args) {
73108
started = time();
74109
finished = started;
75-
this.text = String.format(format, args);
110+
this.text = () -> String.format(format, args);
76111
this.out = out;
77112
this.log = null;
113+
this.logAtInfo = false;
114+
78115
if (out != null) {
79-
out.printf("Starting: %s%n", text);
116+
out.printf("Starting: %s%n", getFormattedText());
80117
}
81118
}
82119

83120
/**
84121
* Create the duration with no output printed.
85122
*/
86123
public StoreDurationInfo() {
87-
this.text = "";
124+
this.text = EMPTY_TEXT;
88125
this.log = null;
89126
this.out = null;
127+
this.logAtInfo = false;
90128
started = time();
91129
finished = started;
92130
}
93131

132+
133+
private String getFormattedText() {
134+
return (textStr == null) ? (textStr = text.get()) : textStr;
135+
}
136+
94137
private long time() {
95138
return System.currentTimeMillis();
96139
}
@@ -112,7 +155,8 @@ public String getDurationString() {
112155
public static String humanTime(long time) {
113156
long seconds = (time / 1000);
114157
long minutes = (seconds / 60);
115-
return String.format("%d:%02d.%03d", minutes, seconds % 60, time % 1000);
158+
long hours = (minutes / 60);
159+
return String.format("%d:%02d:%02d.%03d", hours, minutes % 60, seconds % 60, time % 1000);
116160
}
117161

118162
/**
@@ -145,10 +189,16 @@ public String toString() {
145189
public void close() {
146190
finished();
147191
if (log != null) {
148-
log.info("Duration of {}: {}", text, this);
192+
if (logAtInfo) {
193+
log.info("Duration of {}: {}", getFormattedText(), this);
194+
;
195+
} else {
196+
log.debug("Duration of {}: {}", getFormattedText(), this);
197+
}
198+
149199
}
150200
if (out != null) {
151-
out.printf("Duration of %s: %s%n", text, this);
201+
out.printf("Duration of %s: %s%n", getFormattedText(), this);
152202
}
153203
}
154204
}

src/main/java/org/apache/hadoop/fs/store/StoreEntryPoint.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@
6666
import static org.apache.hadoop.fs.store.CommonParameters.XMLFILE;
6767
import static org.apache.hadoop.fs.store.StoreDiagConstants.IOSTATISTICS_LOGGING_LEVEL;
6868
import static org.apache.hadoop.fs.store.StoreUtils.split;
69+
import static org.apache.hadoop.fs.store.diag.S3ADiagnosticsInfo.DIRECTORY_MARKER_RETENTION;
70+
import static org.apache.hadoop.fs.store.diag.S3ADiagnosticsInfo.FS_S3A_CONNECTION_MAXIMUM;
71+
import static org.apache.hadoop.fs.store.diag.S3ADiagnosticsInfo.FS_S3A_THREADS_MAX;
72+
import static org.apache.hadoop.fs.store.diag.S3ADiagnosticsInfo.INPUT_FADVISE;
73+
import static org.apache.hadoop.fs.store.diag.S3ADiagnosticsInfo.INPUT_FADV_NORMAL;
6974

7075
/**
7176
* Entry point for store applications
@@ -523,6 +528,20 @@ protected Configuration createPreconfiguredConfig()
523528
return conf;
524529
}
525530

531+
/**
532+
* Patch the configuration for maximum S3A performance.
533+
* @param conf config
534+
* @return the now updated config
535+
*/
536+
protected Configuration patchForMaxS3APerformance(Configuration conf) {
537+
conf.setBoolean(DIRECTORY_MARKER_RETENTION, true);
538+
final int workers = 256;
539+
conf.setInt(FS_S3A_CONNECTION_MAXIMUM, workers * 2);
540+
conf.setInt(FS_S3A_THREADS_MAX, workers);
541+
conf.set(INPUT_FADVISE, INPUT_FADV_NORMAL);
542+
return conf;
543+
}
544+
526545
protected void printFSInfoInVerbose(FileSystem fs) {
527546
if (isVerbose()) {
528547
println();

src/main/java/org/apache/hadoop/fs/store/commands/Bandwidth.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hadoop.fs.FSDataOutputStream;
3636
import org.apache.hadoop.fs.FileSystem;
3737
import org.apache.hadoop.fs.Path;
38+
import org.apache.hadoop.fs.store.CommonParameters;
3839
import org.apache.hadoop.fs.store.MinMeanMax;
3940
import org.apache.hadoop.fs.store.StoreDurationInfo;
4041
import org.apache.hadoop.fs.store.StoreEntryPoint;
@@ -46,7 +47,11 @@
4647

4748
import static java.util.Optional.empty;
4849
import static java.util.Optional.of;
50+
import static org.apache.hadoop.fs.store.CommonParameters.BLOCK;
51+
import static org.apache.hadoop.fs.store.CommonParameters.CSVFILE;
4952
import static org.apache.hadoop.fs.store.CommonParameters.DEFINE;
53+
import static org.apache.hadoop.fs.store.CommonParameters.FLUSH;
54+
import static org.apache.hadoop.fs.store.CommonParameters.HFLUSH;
5055
import static org.apache.hadoop.fs.store.CommonParameters.TOKENFILE;
5156
import static org.apache.hadoop.fs.store.CommonParameters.VERBOSE;
5257
import static org.apache.hadoop.fs.store.CommonParameters.XMLFILE;
@@ -60,19 +65,15 @@ public class Bandwidth extends StoreEntryPoint {
6065

6166
private static final Logger LOG = LoggerFactory.getLogger(Bandwidth.class);
6267

63-
public static final String BLOCK = "block";
64-
public static final String CSVFILE = "csv";
65-
public static final String FLUSH = "flush";
66-
public static final String HFLUSH = "hflush";
6768
public static final String KEEP = "keep";
6869
public static final String RENAME = "rename";
6970
public static final String POLICY = "policy";
7071

7172

7273
public static final String USAGE
7374
= "Usage: bandwidth [options] size <path>\n"
75+
+ optusage(BLOCK, "size", "block size in megabytes")
7476
+ optusage(DEFINE, "key=value", "Define a property")
75-
+ optusage(BLOCK, "block-size", "block size in megabytes")
7677
+ optusage(CSVFILE, "file", "CSV file to log operation details")
7778
+ optusage(FLUSH, "flush the output after writing each block")
7879
+ optusage(HFLUSH, "hflush() the output after writing each block")
@@ -107,14 +108,16 @@ public Bandwidth() {
107108
HFLUSH,
108109
KEEP,
109110
RENAME,
110-
VERBOSE);
111+
VERBOSE
112+
);
111113
addValueOptions(
112-
TOKENFILE,
113-
XMLFILE,
114114
BLOCK,
115115
CSVFILE,
116116
DEFINE,
117-
POLICY);
117+
POLICY,
118+
TOKENFILE,
119+
XMLFILE
120+
);
118121
}
119122

120123
@Override

src/main/java/org/apache/hadoop/fs/store/diag/S3ADiagnosticsInfo.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33+
import org.apache.hadoop.classification.InterfaceStability;
3334
import org.apache.hadoop.conf.Configuration;
3435
import org.apache.hadoop.fs.FileStatus;
3536
import org.apache.hadoop.fs.FileSystem;
@@ -43,10 +44,6 @@
4344

4445
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
4546
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
46-
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
47-
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL;
48-
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
49-
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
5047
import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
5148
import static org.apache.hadoop.fs.s3a.S3AUtils.getAWSAccessKeys;
5249
import static org.apache.hadoop.fs.store.StoreUtils.cat;
@@ -128,6 +125,34 @@ public class S3ADiagnosticsInfo extends StoreDiagnosticsInfo {
128125
"fs.s3a.fast.upload.active.blocks";
129126

130127

128+
/**
129+
* Which input strategy to use for buffering, seeking and similar when
130+
* reading data.
131+
* Value: {@value}
132+
*/
133+
public static final String INPUT_FADVISE =
134+
"fs.s3a.experimental.input.fadvise";
135+
136+
/**
137+
* General input. Some seeks, some reads.
138+
* Value: {@value}
139+
*/
140+
public static final String INPUT_FADV_NORMAL = "normal";
141+
142+
/**
143+
* Optimized for sequential access.
144+
* Value: {@value}
145+
*/
146+
public static final String INPUT_FADV_SEQUENTIAL = "sequential";
147+
148+
/**
149+
* Optimized purely for random seek+read/positionedRead operations;
150+
* The performance of sequential IO may be reduced in exchange for
151+
* more efficient {@code seek()} operations.
152+
* Value: {@value}
153+
*/
154+
public static final String INPUT_FADV_RANDOM = "random";
155+
131156
public static final String DELEGATION_TOKEN_BINDING =
132157
"fs.s3a.delegation.token.binding";
133158

src/main/java/org/apache/hadoop/fs/store/shim/APIShim.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hadoop.fs.shim.api;
19+
package org.apache.hadoop.fs.store.shim;
2020

2121
/**
2222
* An API shim of type {@code TYPE}.

0 commit comments

Comments
 (0)