File tree 3 files changed +26
-18
lines changed
3 files changed +26
-18
lines changed Original file line number Diff line number Diff line change
1
+ 0.3.0 (2016-09-21)
2
+ ==================
3
+ - [ Incompatible Change] Not partitoning if files are compressed
4
+ - https://github.com/civitaspo/embulk-input-hdfs/pull/27
5
+
1
6
0.2.1 (2016-02-25)
2
7
==================
3
8
- [ Fix] does not work
Original file line number Diff line number Diff line change @@ -77,23 +77,26 @@ int partitionSizeByOneTask = totalFileLength / approximateNumPartitions;
77
77
/*
78
78
...
79
79
* /
80
-
81
- long numPartitions;
82
- if (task.getPartition()) {
83
- if (file.canDecompress()) {
84
- numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
85
- }
86
- else if (file.getCodec() != null) { // if not null, the file is compressed.
87
- numPartitions = 1;
88
- }
89
- else {
90
- numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
91
- }
92
- }
93
- else {
94
- numPartitions = 1;
95
- }
96
-
80
+ long numPartitions = 1; // default is no partition.
81
+ if (isPartitionable(task, conf, status)) { // partition: true and (decompression: false or CompressionCodec is null)
82
+ numPartitions = ((status.getLen() - 1) / partitionSizeByOneTask) + 1;
83
+ }
84
+
85
+ for (long i = 0; i < numPartitions; i++) {
86
+ long start = status.getLen() * i / numPartitions;
87
+ long end = status.getLen() * (i + 1) / numPartitions;
88
+ if (start < end) {
89
+ TargetFileInfo targetFileInfo = new TargetFileInfo.Builder()
90
+ .pathString(status.getPath().toString())
91
+ .start(start)
92
+ .end(end)
93
+ .isDecompressible(isDecompressible(task, conf, status))
94
+ .isPartitionable(isPartitionable(task, conf, status))
95
+ .numHeaderLines(task.getSkipHeaderLines())
96
+ .build();
97
+ builder.add(targetFileInfo);
98
+ }
99
+ }
97
100
/*
98
101
...
99
102
* /
Original file line number Diff line number Diff line change @@ -15,7 +15,7 @@ configurations {
15
15
provided
16
16
}
17
17
18
- version = " 0.2.1 "
18
+ version = " 0.3.0 "
19
19
20
20
sourceCompatibility = 1.7
21
21
targetCompatibility = 1.7
You can’t perform that action at this time.
0 commit comments