@@ -58,7 +58,7 @@ public interface PluginTask extends Task
58
58
59
59
@ Config ("num_partitions" ) // Approximate only: used as a divisor to derive a per-task partition size, not as an exact partition count.
60
60
@ ConfigDefault ("-1" ) // Default -1: any value <= 0 falls back to Runtime.getRuntime().availableProcessors()
61
- public int getApproximateNumPartitions ();
61
+ public long getApproximateNumPartitions ();
62
62
63
63
public List <HdfsPartialFile > getFiles (); // Getter paired with setFiles: the task's list of HdfsPartialFile entries (no @Config annotation is visible on it here).
64
64
public void setFiles (List <HdfsPartialFile > hdfsFiles ); // Setter paired with getFiles; presumably receives the HdfsPartialFile list built from the target paths — confirm against caller.
@@ -246,25 +246,25 @@ public Path apply(@Nullable String input)
246
246
}
247
247
});
248
248
249
- int totalFileLength = 0 ;
249
+ long totalFileLength = 0 ;
250
250
for (Path path : pathList ) {
251
251
totalFileLength += fs .getFileStatus (path ).getLen ();
252
252
}
253
253
254
254
// TODO: optimum allocation of resources
255
- int approximateNumPartitions =
255
+ long approximateNumPartitions =
256
256
(task .getApproximateNumPartitions () <= 0 ) ? Runtime .getRuntime ().availableProcessors () : task .getApproximateNumPartitions ();
257
- int partitionSizeByOneTask = totalFileLength / approximateNumPartitions ;
257
+ long partitionSizeByOneTask = totalFileLength / approximateNumPartitions ;
258
258
259
259
List <HdfsPartialFile > hdfsPartialFiles = new ArrayList <>();
260
260
for (Path path : pathList ) {
261
- int fileLength = ( int ) fs .getFileStatus (path ).getLen (); // declare `fileLength` here because this is used below.
261
+ long fileLength = fs .getFileStatus (path ).getLen (); // declare `fileLength` here because this is used below.
262
262
if (fileLength <= 0 ) {
263
263
logger .info ("embulk-input-hdfs: Skip the 0 byte target file: {}" , path );
264
264
continue ;
265
265
}
266
266
267
- int numPartitions ;
267
+ long numPartitions ;
268
268
if (path .toString ().endsWith (".gz" ) || path .toString ().endsWith (".bz2" ) || path .toString ().endsWith (".lzo" )) {
269
269
numPartitions = 1 ;
270
270
}
0 commit comments