@@ -135,7 +135,7 @@ public class VerifyReplication extends Configured implements Tool {
135
135
public static class Verifier extends TableMapper <ImmutableBytesWritable , Put > {
136
136
137
137
private ThreadPoolExecutor reCompareExecutor = null ;
138
- private final LinkedBlockingQueue <Future <?>> reCompareTasks = new LinkedBlockingQueue <>( 1_000 ) ;
138
+ private LinkedBlockingQueue <Future <?>> reCompareTasks = null ;
139
139
140
140
public enum Counters {
141
141
GOODROWS ,
@@ -202,6 +202,7 @@ private ResultScanner initializeMapperOnFirstValue(Context context, Result value
202
202
}
203
203
int reCompareThreads = conf .getInt (NAME + ".recompareThreads" , 0 );
204
204
reCompareExecutor = buildReCompareExecutor (reCompareThreads , context );
205
+ reCompareTasks = buildReCompareTasksQueue (reCompareThreads );
205
206
TableName tableName = TableName .valueOf (conf .get (NAME + ".tableName" ));
206
207
sourceConnection = ConnectionFactory .createConnection (conf );
207
208
sourceTable = sourceConnection .getTable (tableName );
@@ -330,6 +331,15 @@ private Future<?> logFailRowAndIncreaseCounter(Context context, Counters counter
330
331
331
332
private void addSubmittedTaskToQueue (Future <?> task , Context context , Result value ,
332
333
String method ) {
334
+ if (reCompareTasks == null ) {
335
+ try {
336
+ task .get ();
337
+ } catch (Exception e ) {
338
+ throw new RuntimeException (e );
339
+ }
340
+ return ;
341
+ }
342
+
333
343
while (true ) {
334
344
if (reCompareTasks .offer (task )) {
335
345
break ;
@@ -459,6 +469,10 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
459
469
}
460
470
};
461
471
}
472
+
473
+ private static LinkedBlockingQueue <Future <?>> buildReCompareTasksQueue (int maxThreads ) {
474
+ return maxThreads <= 0 ? null : new LinkedBlockingQueue <>(maxThreads * 10 );
475
+ }
462
476
}
463
477
464
478
private static Pair <ReplicationPeerConfig , Configuration >
0 commit comments