 */
package com.google.edwmigration.dumper.application.dumper.connector.hdfs;

+import static com.google.edwmigration.dumper.application.dumper.ConnectorArguments.OPT_HDFS_SCAN_ROOT_PATH;
+import static com.google.edwmigration.dumper.application.dumper.ConnectorArguments.OPT_THREAD_POOL_SIZE;
+import static com.google.edwmigration.dumper.application.dumper.connector.hdfs.SingleDirScanJob.trimExceptionMessage;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
public class HdfsExtractionTask extends AbstractTask<Void> implements HdfsExtractionDumpFormat {
  private static final Logger LOG = LoggerFactory.getLogger(HdfsExtractionTask.class);

-  private final int poolSize;
+  private final int threadPoolSize;
+  private final String hdfsScanRootPath;

  HdfsExtractionTask(@Nonnull ConnectorArguments args) {
    super(HdfsFormat.ZIP_ENTRY_NAME);
-    Preconditions.checkNotNull(args, "Arguments was null.");
-    poolSize = args.getThreadPoolSize();
+    threadPoolSize = args.getThreadPoolSize();
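+    // Fail fast: reject invalid argument values at construction time instead of mid-extraction.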
+    Preconditions.checkArgument(
+        threadPoolSize > 0, "Argument %s should be a positive number", OPT_THREAD_POOL_SIZE);
+    hdfsScanRootPath = args.getHdfsScanRootPath();
+    Preconditions.checkArgument(
+        !hdfsScanRootPath.isEmpty(), "Argument %s should be non-empty", OPT_HDFS_SCAN_ROOT_PATH);
  }

  @Override
@@ -74,21 +83,38 @@ protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Ha
    DistributedFileSystem fs = ((HdfsHandle) handle).getDfs();
    // Create a dedicated ExecutorService to use:
    ExecutorService execService =
-        ExecutorManager.newExecutorServiceWithBackpressure("hdfs-extraction", poolSize);
+        ExecutorManager.newExecutorServiceWithBackpressure("hdfs-extraction", threadPoolSize);
    try (Writer output = sink.asCharSink(UTF_8).openBufferedStream();
-        ScanContext scanCtx = new ScanContext(fs, output);
-        ExecutorManager execManager = new ExecutorManager(execService)) {
+        ExecutorManager execManager = new ExecutorManager(execService);
+        ScanContext scanCtx = new ScanContext(execManager, fs, output)) {

-      String hdfsPath = "/";
-      FileStatus rootDir = fs.getFileStatus(new Path(hdfsPath));
-      SingleDirScanJob rootJob = new SingleDirScanJob(scanCtx, execManager, rootDir);
-      execManager.execute(rootJob); // The root job executes immediately
+      LOG.info(
+          "Running HDFS extraction\n\t{}: {}\n\t{}: {}",
+          OPT_HDFS_SCAN_ROOT_PATH,
+          hdfsScanRootPath,
+          OPT_THREAD_POOL_SIZE,
+          threadPoolSize);
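+      // Resolve the scan root and kick off the recursive directory scan from there.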
+      FileStatus rootDir = fs.getFileStatus(new Path(hdfsScanRootPath));
+      scanCtx.submitRootDirScanJob(rootDir, getContentSummaryFor(fs, rootDir));
      execManager.await(); // Wait until all (recursive) tasks are done executing
-      LOG.info(scanCtx.getFormattedStats());
+      LOG.info("Final stats: \n{}", scanCtx.getDetailedStats());
    } finally {
      // Shutdown the dedicated ExecutorService:
      MoreExecutors.shutdownAndAwaitTermination(execService, 100, TimeUnit.MILLISECONDS);
    }
    return null;
  }
+
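+  // Fetches the root's ContentSummary (total size and file/directory counts), which the scan
+  // uses to report progress; if it cannot be fetched, the scan still runs, just without
+  // progress output.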
+  private ContentSummary getContentSummaryFor(DistributedFileSystem dfs, FileStatus file) {
+    try {
+      return dfs.getContentSummary(file.getPath());
+    } catch (org.apache.hadoop.security.AccessControlException exn) {
+      LOG.info(
+          "Progress for HDFS extraction won't be displayed due to AccessControlException: {}",
+          trimExceptionMessage(exn.getMessage()));
+    } catch (IOException exn) {
+      LOG.error("Progress for HDFS extraction won't be displayed due to IOException: ", exn);
+    }
+    return null;
+  }
}
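
For context, a minimal sketch of the new ScanContext.submitRootDirScanJob entry point this diff calls. The rootSummary field and the assumption that SingleDirScanJob kept its pre-change (scanCtx, execManager, dir) constructor are illustrative guesses, not the repository's actual ScanContext code:

  // Hypothetical sketch, based only on how this diff uses ScanContext: it now receives
  // the ExecutorManager in its constructor, so it can submit the root scan job itself
  // and keep the (possibly null) root ContentSummary around for progress reporting.
  void submitRootDirScanJob(FileStatus rootDir, ContentSummary rootSummary) {
    this.rootSummary = rootSummary; // null => progress is not displayed
    execManager.execute(new SingleDirScanJob(this, execManager, rootDir));
  }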