Skip to content

Commit fc3b8ed

Browse files
committed
add partiton logic.
1 parent 6fe6b56 commit fc3b8ed

File tree

1 file changed

+34
-0
lines changed

1 file changed

+34
-0
lines changed

Diff for: README.md

+34
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Read files on Hdfs.
1515
- **input_path** file path on Hdfs. you can use glob and Date format like `%Y%m%d/%s`.
1616
- **rewind_seconds** When you use Date format in input_path property, the format is executed by using the time which is Now minus this property.
1717
- **partition** when this is true, partition input files and increase task count. (default: `true`)
18+
- **num_partitions** number of partitions. (default: `Runtime.getRuntime().availableProcessors()`)
1819

1920
## Example
2021

@@ -32,6 +33,7 @@ in:
3233
input_path: /user/embulk/test/%Y-%m-%d/*
3334
rewind_seconds: 86400
3435
partition: true
36+
num_partitions: 30
3537
decoders:
3638
- {type: gzip}
3739
parser:
@@ -53,6 +55,8 @@ in:
5355
```
5456
5557
## Note
58+
- The parameter **num_partitions** is the approximate value. The actual num_partitions is larger than this parameter.
59+
- see: [The Partitioning Logic](#partition_logic)
5660
- the feature of the partition supports only 3 line terminators.
5761
- `\n`
5862
- `\r`
@@ -61,6 +65,36 @@ in:
6165
## The Reference Implementation
6266
- [hito4t/embulk-input-filesplit](https://github.com/hito4t/embulk-input-filesplit)
6367

68+
##<a id="partition_logic">The Partitioning Logic</a>
69+
70+
```
71+
int partitionSizeByOneTask = totalFileLength / approximateNumPartitions;
72+
73+
/*
74+
...
75+
*/
76+
77+
int numPartitions;
78+
if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) {
79+
// if the file is compressed, skip partitioning.
80+
numPartitions = 1;
81+
}
82+
else if (!task.getPartition()) {
83+
// if no partition mode, skip partitioning.
84+
numPartitions = 1;
85+
}
86+
else {
87+
// equalize the file size per task as much as possible.
88+
numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
89+
}
90+
91+
/*
92+
...
93+
*/
94+
95+
```
96+
97+
6498
## Build
6599
66100
```

0 commit comments

Comments
 (0)