4848
4949public class IndexWriterPool {
5050 private final static Logger LOG = LoggerFactory .getLogger (IndexWriterPool .class );
51- private final static ThreadMonitor threadMonitor = new ThreadMonitor ();
5251
5352 public static final String OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = "oak.indexer.parallelWriter.maxBatchSize" ;
5453 public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = 256 ;
@@ -63,23 +62,25 @@ public class IndexWriterPool {
6362 private final int queueSize = ConfigHelper .getSystemPropertyAsInt (OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE , DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE );
6463 private final int numberOfThreads = ConfigHelper .getSystemPropertyAsInt (OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS , DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS );
6564
65+ private final ThreadMonitor threadMonitor = new ThreadMonitor ();
6666 private final ArrayList <Operation > batch = new ArrayList <>(maxBatchSize );
6767 private final BlockingQueue <OperationBatch > queue = new ArrayBlockingQueue <>(queueSize );
68- private final List <Future <?>> futures ;
69- private final ExecutorService writerPool ;
68+ private final List <Future <?>> workerFutures ;
69+ private final ExecutorService writersPool ;
7070 private final ScheduledExecutorService scheduledExecutor = Executors .newSingleThreadScheduledExecutor ();
7171 private final AtomicBoolean closed = new AtomicBoolean (false );
7272
73- private long updateCount = 0 ;
74- private long deleteCount = 0 ;
75- private long totalEnqueueTimeNanos = 0 ;
76-
77- private final Object lock = new Object ();
7873 // Used to keep track of the sequence number of the batches that are currently being processed.
7974 // This is used to wait until all operations for a writer are processed before closing it.
75+ private final Object pendingBatchesLock = new Object ();
8076 private final HashSet <Long > pendingBatches = new HashSet <>();
8177 private long batchSequenceNumber = 0 ;
8278
79+ // Statistics
80+ private long updateCount = 0 ;
81+ private long deleteCount = 0 ;
82+ private long totalEnqueueTimeNanos = 0 ;
83+
8384 private static class OperationBatch {
8485 final long sequenceNumber ;
8586 final Operation [] operations ;
@@ -181,9 +182,9 @@ public void run() {
181182 }
182183 }
183184 LOG .info ("Batch processed: {}" , op .sequenceNumber );
184- synchronized (lock ) {
185+ synchronized (pendingBatchesLock ) {
185186 pendingBatches .remove (op .sequenceNumber );
186- lock .notifyAll ();
187+ pendingBatchesLock .notifyAll ();
187188 }
188189// maxSize = Math.max(maxSize, sumSize);
189190// LOG.info("Executed batch of size: {}. Total size: {}, Max: {}", op.length, sumSize, maxSize);
@@ -209,9 +210,9 @@ public IndexWriterPool() {
209210 .setNameFormat ("index-writer-%d" )
210211 .build ();
211212
212- this .writerPool = Executors .newFixedThreadPool (numberOfThreads , delegateThreadFactory );
213- this .futures = IntStream .range (0 , numberOfThreads )
214- .mapToObj (i -> writerPool .submit (new Worker ()))
213+ this .writersPool = Executors .newFixedThreadPool (numberOfThreads , delegateThreadFactory );
214+ this .workerFutures = IntStream .range (0 , numberOfThreads )
215+ .mapToObj (i -> writersPool .submit (new Worker ()))
215216 .collect (Collectors .toList ());
216217 threadMonitor .start ();
217218 scheduledExecutor .scheduleAtFixedRate (this ::printStatistics , 0 , 30 , TimeUnit .SECONDS );
@@ -240,14 +241,14 @@ public boolean closeWriter(LuceneIndexWriter writer, long timestamp) {
240241 // operations are for which writer.
241242 long seqNumber = flushBatch ();
242243 LOG .info ("All operations for writer: {} enqueued (highest batch sequence number: {}). Waiting for them to be processed" , writer , seqNumber );
243- synchronized (lock ) {
244+ synchronized (pendingBatchesLock ) {
244245 while (true ) {
245246 Long earliestPending = pendingBatches .isEmpty () ? null : pendingBatches .stream ().min (Long ::compareTo ).get ();
246247 LOG .info ("Earliest pending batch: {}. Waiting for seqNumber: {}" , earliestPending , seqNumber );
247248 if (earliestPending == null || earliestPending > seqNumber ) {
248249 break ;
249250 }
250- lock .wait ();
251+ pendingBatchesLock .wait ();
251252 }
252253 }
253254 LOG .info ("All operations for writer: {} processed. Enqueuing close operation" , writer );
@@ -268,15 +269,15 @@ public void close() {
268269 flushBatch ();
269270 queue .add (SHUTDOWN );
270271 LOG .info ("Shutting down PipelinedIndexWriter. Total enqueue time: {} ms" , totalEnqueueTimeNanos / 1_000_000 );
271- for (Future <?> f : futures ) {
272+ for (Future <?> f : workerFutures ) {
272273 LOG .info ("Waiting for future: {}" , f );
273274 try {
274275 f .get ();
275276 } catch (InterruptedException | ExecutionException e ) {
276277 LOG .info ("Error while waiting for future" , e );
277278 }
278279 }
279- new ExecutorCloser (writerPool , 1 , TimeUnit .SECONDS ).close ();
280+ new ExecutorCloser (writersPool , 1 , TimeUnit .SECONDS ).close ();
280281 new ExecutorCloser (scheduledExecutor , 1 , TimeUnit .SECONDS ).close ();
281282 threadMonitor .printStatistics ();
282283 } else {
@@ -301,7 +302,7 @@ private long flushBatch() {
301302 // Batches may be empty. This is necessary
302303 try {
303304 long seqNumber ;
304- synchronized (lock ) {
305+ synchronized (pendingBatchesLock ) {
305306 // Shared between producer and workers
306307 seqNumber = batchSequenceNumber ;
307308 batchSequenceNumber ++;
0 commit comments