Commit 70258bb

HADOOP-18521. csv writing enhancements
1 parent 95facb2

4 files changed: 114 additions & 40 deletions

File tree

- src/main/java/org/apache/hadoop/fs/store/StoreUtils.java
- src/main/java/org/apache/hadoop/fs/store/commands/Bandwidth.java
- src/main/java/org/apache/hadoop/fs/tools/csv/MkCSV.java
- src/main/site/mkcsv.md

src/main/java/org/apache/hadoop/fs/store/StoreUtils.java

Lines changed: 31 additions & 0 deletions
```diff
@@ -20,10 +20,14 @@
 
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+
 public class StoreUtils {
 
   /** life without Guava. */
@@ -119,6 +123,33 @@ public static <T> T[] cat(T[] left, T[] right) {
     return dest;
   }
 
+  /**
+   * get the storage size from a string, uses M, G, T etc
+   * @param size data size
+   * @return size as a double.
+   */
+  public static double getDataSize(final String size) {
+    double uploadSize;
+
+    String s = size.trim().toUpperCase(Locale.ROOT);
+    try {
+      // look for a long value,
+      uploadSize = Long.parseLong(s);
+    } catch (NumberFormatException e) {
+      // parse the size values via Configuration
+      // this is only possible on hadoop 3.1+.
+      if (!s.endsWith("B")) {
+        s = s + "B";
+      }
+      final Configuration sizeConf = new Configuration(false);
+
+
+      // upload in MB.
+      uploadSize = sizeConf.getStorageSize("size", s, StorageUnit.MB);
+    }
+    return uploadSize;
+  }
+
   public static class StringPair implements Map.Entry<String, String>{
     private String key, value;
 
```
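The helper hoists the old Bandwidth parsing logic into a shared utility: a bare number is returned as-is, while suffixed values are normalized to megabytes through `Configuration.getStorageSize()`. A minimal sketch of the expected behaviour, assuming a Hadoop 3.1+ runtime; the demo class and the sample values are illustrative, not part of the commit:

```java
import org.apache.hadoop.fs.store.StoreUtils;

public class GetDataSizeDemo {
  public static void main(String[] args) {
    // a bare number parses as a long and is returned unchanged
    double plain = StoreUtils.getDataSize("256"); // 256.0

    // suffixed values are uppercased, get a trailing "B" appended
    // if missing ("2g" -> "2GB"), and are converted to megabytes
    // by Configuration.getStorageSize()
    double sized = StoreUtils.getDataSize("2g"); // 2048.0

    System.out.printf("plain=%.1f sized=%.1f%n", plain, sized);
  }
}
```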
src/main/java/org/apache/hadoop/fs/store/commands/Bandwidth.java

Lines changed: 2 additions & 14 deletions
```diff
@@ -27,13 +27,13 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.store.StoreDurationInfo;
 import org.apache.hadoop.fs.store.StoreEntryPoint;
+import org.apache.hadoop.fs.store.StoreUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -91,19 +91,7 @@ public int run(String[] args) throws Exception {
     FileSystem fs = path.getFileSystem(conf);
     println("Using filesystem %s", fs.getUri());
 
-    double uploadSize;
-
-    try {
-      // look for a long value,
-      uploadSize = Long.parseLong(size);
-    } catch (NumberFormatException e) {
-      // parse the size values via Configuration
-      // this is only possible on hadoop 3.1+.
-      final Configuration sizeConf = new Configuration(false);
-
-      // upload in MB.
-      uploadSize = sizeConf.getStorageSize("size", size, StorageUnit.MB);
-    }
+    double uploadSize = StoreUtils.getDataSize(size);
 
     long sizeMB = Math.round(uploadSize);
     if (sizeMB <= 0) {
```

src/main/java/org/apache/hadoop/fs/tools/csv/MkCSV.java

Lines changed: 51 additions & 13 deletions
```diff
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.tools.csv;
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Random;
@@ -38,18 +39,22 @@
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ToolRunner;
 
-import static java.lang.Math.max;
 import static org.apache.hadoop.fs.store.CommonParameters.DEFINE;
 import static org.apache.hadoop.fs.store.CommonParameters.TOKENFILE;
 import static org.apache.hadoop.fs.store.CommonParameters.VERBOSE;
 import static org.apache.hadoop.fs.store.CommonParameters.XMLFILE;
 import static org.apache.hadoop.fs.store.StoreExitCodes.E_USAGE;
+import static org.apache.hadoop.fs.store.StoreUtils.getDataSize;
 
+/**
+ * Create a large CSV file for validation.
+ */
 public class MkCSV extends StoreEntryPoint {
 
   private static final Logger LOG = LoggerFactory.getLogger(MkCSV.class);
 
   public static final String HEADER = "header";
+
   public static final String QUOTE = "quote";
 
   public static final String USAGE
@@ -80,6 +85,10 @@ public class MkCSV extends StoreEntryPoint {
 
   private static final String SEPARATOR = ",";
 
+  public static final String START = "start";
+
+  public static final String END = "end";
+
 
   public MkCSV() {
     createCommandFormat(2, 2, VERBOSE, HEADER, QUOTE);
@@ -100,7 +109,7 @@ public int run(String[] args) throws Exception {
     String size = argList.get(0).toLowerCase(Locale.ENGLISH);
     String pathString = argList.get(1);
     Path path = new Path(pathString);
-    long rows = Long.parseLong(size);
+    long rows = (long) getDataSize(size);
     if (rows < 0) {
       errorln("Invalid row count %s", size);
       errorln(USAGE);
@@ -122,6 +131,13 @@ public int run(String[] args) throws Exception {
 
     String block = sb.toString();
 
+    final List<String> blockData = new ArrayList<>();
+    blockRows(blockData, 'a', 'z', elements);
+    blockRows(blockData, 'A', 'Z', elements);
+    blockRows(blockData, '0', '9', elements);
+    final int blockCount = blockData.size();
+
+
     // progress callback counts #of invocations, and optionally prints a .
     AtomicLong progressCount = new AtomicLong();
     Progressable progress = () -> {
@@ -140,7 +156,7 @@ public int run(String[] args) throws Exception {
     // open the file. track duration
     FSDataOutputStream upload;
     try (StoreDurationInfo d = new StoreDurationInfo(LOG,
-        "Opening %s for upload", path)) {
+        "Opening %s for writing", path)) {
       upload = fs.createFile(path)
           .progress(progress)
           .recursive()
@@ -152,31 +168,33 @@ public int run(String[] args) throws Exception {
       StoreCsvWriter writer = new StoreCsvWriter(upload, SEPARATOR, EOL, quote);
       if (header) {
         writer
-            .columns("rowId", "dataCrc", "data", "rowId2", "rowCrc")
+            .columns(START, "rowId", "length", "dataCrc", "data", "rowId2", "rowCrc", END)
             .newline();
       }
 
       Random rand = new Random();
       for (int r = 1; r <= rows; r++) {
 
+        writer.column(START);
         String rowId = Long.toString(r);
         writer.column(rowId);
         // now collect a subset of the value
-        int firstElt = rand.nextInt(elements-1);
-        int lastElt = firstElt + 1 + rand.nextInt(elements - firstElt - 1);
-        int first = firstElt * 5;
-        // always 1 elt higher than first
-        int last = lastElt * 5 - 1;
-        String data = block.substring(first, last);
+        int lastElt = 2 + rand.nextInt(elements);
+        String dataRow = blockData.get(r % blockCount);
+        int length = Math.min(lastElt, elements);
+        String data = dataRow.substring(length);
+        writer.column(data.length());
         // data CRC
         CRC32 crc = new CRC32();
         crc.update(data.getBytes(StandardCharsets.UTF_8));
         writer.column(crc.getValue());
         writer.column(data);
         // repeat the row ID
-        writer.column(r);
+        writer.column(rowId);
         // full row checksum
         writer.column(writer.getRowCrc());
+        // end of row
+        writer.column(END);
         writer.newline();
       }
       // now close the file
@@ -187,7 +205,7 @@ public int run(String[] args) throws Exception {
       }
 
     } finally {
-      printIfVerbose("Upload Stream: %s", upload);
+      printIfVerbose("Write Stream: %s", upload);
     }
 
     println();
@@ -201,12 +219,32 @@ public int run(String[] args) throws Exception {
     printFSInfoInVerbose(fs);
 
     long sizeBytes = status.getLen();
-    summarize("Upload", uploadDurationTracker, sizeBytes);
+    summarize("CSV Generation", uploadDurationTracker, sizeBytes);
 
     return 0;
 
   }
 
+  /**
+   * Generate a row from a string
+   * @param s string to use
+   * @param elements number of elements
+   * @return string of s repeated elements times.
+   */
+  private String blockRow(String s, int elements) {
+    StringBuilder sb = new StringBuilder(elements);
+    for (int i = 1; i <= elements; i++) {
+      sb.append(s);
+    }
+    return sb.toString();
+  }
+
+  private void blockRows(List<String> rows, char start, char end, int elements) {
+    for (char i = start; i <= end; i++) {
+      rows.add(blockRow(Character.toString(i), elements));
+    }
+  }
+
   /**
    * Execute the command, return the result or throw an exception,
    * as appropriate.
```
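The start/end markers, the new length column, and the repeated row id give downstream readers cheap per-row invariants to assert. As a sketch of how a consumer might exploit them, here is a minimal, hypothetical checker in plain Java; `RowCheck` and its naive field handling are illustrative only (a real reader would use a proper CSV parser, and recomputing `rowCrc` is skipped here because it depends on the writer's quoting rules):

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class RowCheck {

  /**
   * Validate one already-unquoted row of
   * start,rowId,length,dataCrc,data,rowId2,rowCrc,end.
   * @return true if the per-row invariants hold.
   */
  static boolean valid(String[] f) {
    if (f.length != 8
        || !"start".equals(f[0])
        || !"end".equals(f[7])) {
      return false; // marker columns must bracket every row
    }
    if (!f[1].equals(f[5])) {
      return false; // rowId must be repeated as rowId2
    }
    String data = f[4];
    if (data.length() != Long.parseLong(f[2])) {
      return false; // length column describes the data field
    }
    CRC32 crc = new CRC32();
    crc.update(data.getBytes(StandardCharsets.UTF_8));
    return crc.getValue() == Long.parseLong(f[3]); // dataCrc matches
  }

  public static void main(String[] args) {
    // a hand-built example row; real input would come from a CSV parser
    String[] row = {"start", "1", "3", crcOf("bbb"), "bbb", "1", "0", "end"};
    System.out.println(valid(row)); // true
  }

  private static String crcOf(String s) {
    CRC32 c = new CRC32();
    c.update(s.getBytes(StandardCharsets.UTF_8));
    return Long.toString(c.getValue());
  }
}
```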

src/main/site/mkcsv.md

Lines changed: 30 additions & 13 deletions
````diff
@@ -14,29 +14,38 @@
 
 # Command `mkcsv`
 
-Creates a CSV file with a given path; useful
-for scale testing CSV processing .
+Creates a CSV file with a given path; useful for scale testing CSV processing.
 
-```
+```bash
 hadoop jar cloudstore-1.0.jar mkcsv -header -quote -verbose 10000 s3a://bucket/file.csv
 ```
 
-The format is a variable width sequence, with entries cross referencing each other for ease of validation.
+The format is a variable width sequence, with entries cross referencing each other for validation.
 ```csv
-"rowId","dataCrc","data","rowId2","rowCrc"
-"1","4098016739","0008-0009-000a-000b-000c-000d-000e-000f-0010-0011-0012-0013-0014-0015-0016-0017-0018-0019-001a-001b-001c-001d-001e-001f-0020-0021-0022-0023-0024-0025-0026-0027-0028-0029-002a-002b-002c-002d-002e-002f-0030-0031-0032-0033-0034-0035-0036-0037-0038-0039-003a-003b-003c-003d-003e-003f-0040-0041-0042-0043","1","2526808319"
-"2","4102619375","005b","2","3614304611"
-"3","2808119570","005e-005f-0060","3","3847878359"
+"start","rowId","length","dataCrc","data","rowId2","rowCrc","end"
+"start","1","87","691051183","bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb","1","2707924207","end"
+"start","2","40","2886466480","cccccccccccccccccccccccccccccccccccccccc","2","2141198053","end"
+"start","3","98","3320970725","dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd","3","4203069111","end"
+"start","4","8","1257926895","eeeeeeee","4","189792478","end"
+"start","5","25","1630497970","fffffffffffffffffffffffff","5","1034603103","end"
+"start","6","38","557554018","gggggggggggggggggggggggggggggggggggggg","6","1412646710","end"
+"start","7","86","951894681","hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh","7","2062289315","end"
+"start","8","45","3065088391","iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii","8","3774714774","end"
+"start","9","70","2839984696","jjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjj","9","303056462","end"
 ```
 
 ## Invariants
 
 For each row
 ```java
+start == "start"
 rowId == rowId2
-
+length == a random int >= 0
+data = string where data.length() == length
+elements of data == char c where c in "[a-z][A-Z][0-9]"
 dataCrc == new CRC32().update(data.getBytes(StandardCharsets.UTF_8))
 rowCrC == crc32 of all previous fields, including quotes, *excluding separators*
+end == "end"
 // and ignoring headers
 forall n: row[n].rowID == n
 ```
@@ -46,25 +55,33 @@ forall n: row[n].rowID == n
 ```scala
 
 /**
- * Dataset case class.
+ * Dataset class.
+ * Latest build is "start","rowId","length","dataCrc","data","rowId2","rowCrc","end"
  */
 case class CsvRecord(
+  start: String,
   rowId: Long,
+  length: Long,
   dataCrc: Long,
   data: String,
   rowId2: Long,
-  rowCrc: Long)
+  rowCrc: Long,
+  end: String)
 
 /**
  * The StructType of the CSV data.
+ * "start","rowId","length","dataCrc","data","rowId2","rowCrc","end"
  */
 val csvSchema: StructType = {
-    new StructType().
+  new StructType().
+    add("start", StringType).
     add("rowId", LongType).
+    add("length", LongType).
     add("dataCrc", LongType).
     add("data", StringType).
     add("rowId2", LongType).
-    add("rowCrc", LongType)
+    add("rowCrc", LongType).
+    add("end", StringType)
 }
 
 ```
````
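For completeness, a hedged sketch of consuming the generated file with the widened schema, written in Java rather than the doc's Scala; the Spark session setup, the `s3a://` path, and the final cross-reference filter are illustrative assumptions, not part of the commit:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ReadMkcsv {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("mkcsv-validation")
        .getOrCreate();

    // Java equivalent of the Scala csvSchema shown above
    StructType csvSchema = new StructType()
        .add("start", DataTypes.StringType)
        .add("rowId", DataTypes.LongType)
        .add("length", DataTypes.LongType)
        .add("dataCrc", DataTypes.LongType)
        .add("data", DataTypes.StringType)
        .add("rowId2", DataTypes.LongType)
        .add("rowCrc", DataTypes.LongType)
        .add("end", DataTypes.StringType);

    Dataset<Row> rows = spark.read()
        .option("header", "true")
        .schema(csvSchema)
        .csv("s3a://bucket/file.csv"); // path is illustrative

    // cheap cross-reference check: rowId must always equal rowId2
    long bad = rows.filter("rowId != rowId2").count();
    System.out.println("rows failing rowId == rowId2: " + bad);
  }
}
```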
