Skip to content
This repository was archived by the owner on Jun 24, 2019. It is now read-only.
This repository was archived by the owner on Jun 24, 2019. It is now read-only.

MultiThreaded Put case, iterator count is not correct #7

@davinash

Description

@davinash

I wrote following test
20 threads doing puts in the Store, each threads puts 100 keys.
When I iterate over using iterator I am getting less keys

package com.indeed.lsmtree.core;

import com.indeed.util.serialization.StringSerializer;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;


public class MultiThreadsOp extends TestCase {
  final class PutTask1 implements Runnable {
    private int taskId;
    private Store map;

    public PutTask1(int id, Store map) {
      this.taskId = id;
      this.map = map;
    }

    @Override
    public void run() {
      System.out.println("Task ID : " + this.taskId + " performed by "
          + Thread.currentThread().getName());
      for (int rowIdx = 0; rowIdx < 100; rowIdx++) {
        try {
          final String key = "Key-" + rowIdx + taskId;
          final String val = "Value-" + rowIdx + taskId;
          this.map.put( key, val );
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  @Test
  public void testLSMOpPutGetMultipleThreads() throws IOException {
    try {
      Store map = new StoreBuilder<String, String>(
          new File("/tmp/testLSMOpPutGetMultipleThreads"), new StringSerializer(),
          new StringSerializer()).setMaxVolatileGenerationSize(8 * 1024).setCodec(null)
          .setStorageType(StorageType.INLINE).build();

      final int numOfThreads = 20;
      ExecutorService taskExecutor = Executors.newFixedThreadPool(numOfThreads);
      IntStream.range(0, numOfThreads).forEach(i -> taskExecutor.submit(new PutTask1(i, map)));

      taskExecutor.shutdown();
      try {
        while(!taskExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
          Thread.sleep(60000);
        }
      } catch (InterruptedException e) {
      }


      Iterator iterator = map.iterator();
      int count = 0;
      while ( iterator.hasNext()) {
        count++;
        iterator.next();
      }
      assertEquals(100 * numOfThreads, count);
    } finally {
      FileUtils.deleteDirectory(new File("/tmp/testLSMOpPutGetMultipleThreads"));
    }
  }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions