Skip to content

Commit 81a8e0c

Browse files
committed
Merge pull request #7 from civitaspo/v0.1.1
V0.1.1
2 parents 2b5247b + 915482f commit 81a8e0c

File tree

4 files changed

+57
-24
lines changed

4 files changed

+57
-24
lines changed

Diff for: README.md

+34
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Read files on Hdfs.
1515
- **input_path** file path on Hdfs. you can use glob and Date format like `%Y%m%d/%s`.
1616
- **rewind_seconds** When you use Date format in input_path property, the format is executed by using the time which is Now minus this property.
1717
- **partition** when this is true, partition input files and increase task count. (default: `true`)
18+
- **num_partitions** number of partitions. (default: `Runtime.getRuntime().availableProcessors()`)
1819

1920
## Example
2021

@@ -32,6 +33,7 @@ in:
3233
input_path: /user/embulk/test/%Y-%m-%d/*
3334
rewind_seconds: 86400
3435
partition: true
36+
num_partitions: 30
3537
decoders:
3638
- {type: gzip}
3739
parser:
@@ -53,6 +55,8 @@ in:
5355
```
5456
5557
## Note
58+
- The parameter **num_partitions** is the approximate value. The actual num_partitions is larger than this parameter.
59+
- see: [The Partitioning Logic](#partition_logic)
5660
- the feature of the partition supports only 3 line terminators.
5761
- `\n`
5862
- `\r`
@@ -61,6 +65,36 @@ in:
6165
## The Reference Implementation
6266
- [hito4t/embulk-input-filesplit](https://github.com/hito4t/embulk-input-filesplit)
6367

68+
##<a id="partition_logic">The Partitioning Logic</a>
69+
70+
```
71+
int partitionSizeByOneTask = totalFileLength / approximateNumPartitions;
72+
73+
/*
74+
...
75+
*/
76+
77+
int numPartitions;
78+
if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) {
79+
// if the file is compressed, skip partitioning.
80+
numPartitions = 1;
81+
}
82+
else if (!task.getPartition()) {
83+
// if no partition mode, skip partitioning.
84+
numPartitions = 1;
85+
}
86+
else {
87+
// equalize the file size per task as much as possible.
88+
numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
89+
}
90+
91+
/*
92+
...
93+
*/
94+
95+
```
96+
97+
6498
## Build
6599
66100
```

Diff for: build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ configurations {
1212
provided
1313
}
1414

15-
version = "0.1.0"
15+
version = "0.1.1"
1616

1717
sourceCompatibility = 1.7
1818
targetCompatibility = 1.7

Diff for: src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java

+16-17
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,9 @@ public interface PluginTask extends Task
5555
@ConfigDefault("true")
5656
public boolean getPartition();
5757

58-
// this parameter is experimental.
59-
@Config("partition_level")
60-
@ConfigDefault("3")
61-
public int getPartitonLevel();
58+
@Config("num_partitions") // this parameter is the approximate value.
59+
@ConfigDefault("-1") // Default: Runtime.getRuntime().availableProcessors()
60+
public int getApproximateNumPartitions();
6261

6362
public List<HdfsPartialFile> getFiles();
6463
public void setFiles(List<HdfsPartialFile> hdfsFiles);
@@ -235,30 +234,30 @@ public Path apply(@Nullable String input)
235234
}
236235

237236
// TODO: optimum allocation of resources
238-
int partitionCountParameter = task.getPartitonLevel();
239-
int partitionSizeByOneTask = totalFileLength / (Runtime.getRuntime().availableProcessors() * partitionCountParameter);
237+
int approximateNumPartitions =
238+
(task.getApproximateNumPartitions() <= 0) ? Runtime.getRuntime().availableProcessors() : task.getApproximateNumPartitions();
239+
int partitionSizeByOneTask = totalFileLength / approximateNumPartitions;
240240

241241
List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
242242
for (Path path : pathList) {
243-
int partitionCount;
243+
int fileLength = (int) fs.getFileStatus(path).getLen(); // declare `fileLength` here because this is used below.
244+
if (fileLength <= 0) {
245+
logger.info("Skip the 0 byte target file: {}", path);
246+
continue;
247+
}
244248

249+
int numPartitions;
245250
if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) {
246-
partitionCount = 1;
251+
numPartitions = 1;
247252
}
248253
else if (!task.getPartition()) {
249-
partitionCount = 1;
254+
numPartitions = 1;
250255
}
251256
else {
252-
int fileLength = (int) fs.getFileStatus(path).getLen();
253-
partitionCount = fileLength / partitionSizeByOneTask;
254-
int remainder = fileLength % partitionSizeByOneTask;
255-
256-
if (remainder > 0) {
257-
partitionCount++;
258-
}
257+
numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
259258
}
260259

261-
HdfsFilePartitioner partitioner = new HdfsFilePartitioner(fs, path, partitionCount);
260+
HdfsFilePartitioner partitioner = new HdfsFilePartitioner(fs, path, numPartitions);
262261
hdfsPartialFiles.addAll(partitioner.getHdfsPartialFiles());
263262
}
264263

Diff for: src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,22 @@ public class HdfsFilePartitioner
1414
{
1515
private FileSystem fs;
1616
private Path path;
17-
private int partitionCount;
17+
private int numPartitions;
1818

19-
public HdfsFilePartitioner(FileSystem fs, Path path, int partitionCount)
19+
public HdfsFilePartitioner(FileSystem fs, Path path, int numPartitions)
2020
{
2121
this.fs = fs;
2222
this.path = path;
23-
this.partitionCount = partitionCount;
23+
this.numPartitions = numPartitions;
2424
}
2525

2626
public List<HdfsPartialFile> getHdfsPartialFiles() throws IOException
2727
{
2828
List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
2929
long size = fs.getFileStatus(path).getLen();
30-
for (int i = 0; i < partitionCount; i++) {
31-
long start = size * i / partitionCount;
32-
long end = size * (i + 1) / partitionCount;
30+
for (int i = 0; i < numPartitions; i++) {
31+
long start = size * i / numPartitions;
32+
long end = size * (i + 1) / numPartitions;
3333
if (start < end) {
3434
hdfsPartialFiles.add(new HdfsPartialFile(path.toString(), start, end));
3535
}

0 commit comments

Comments
 (0)