|
37 | 37 | import org.junit.Before;
|
38 | 38 | import org.junit.Test;
|
39 | 39 |
|
| 40 | +import java.util.concurrent.CountDownLatch; |
| 41 | +import java.util.concurrent.atomic.AtomicInteger; |
| 42 | + |
40 | 43 | import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
41 | 44 | import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
42 | 45 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
|
|
61 | 64 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
|
62 | 65 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
|
63 | 66 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
|
| 67 | +import static org.junit.Assert.assertEquals; |
64 | 68 | import static org.junit.Assert.assertNull;
|
65 | 69 | import static org.mockito.Mockito.mock;
|
66 | 70 | import static org.mockito.Mockito.when;
|
@@ -794,6 +798,63 @@ public void testCollectAllMetrics() {
|
794 | 798 | .checkAgainst(queueSource, true);
|
795 | 799 | }
|
796 | 800 |
|
| 801 | + @Test |
| 802 | + public void testQueueMetricsRaceCondition() throws InterruptedException { |
| 803 | + final CountDownLatch latch = new CountDownLatch(2); |
| 804 | + final int numIterations = 100000; |
| 805 | + final AtomicInteger exceptionCount = new AtomicInteger(0); |
| 806 | + final AtomicInteger getCount = new AtomicInteger(0); |
| 807 | + |
| 808 | + // init a queue metrics for testing |
| 809 | + String queueName = "test"; |
| 810 | + QueueMetrics metrics = |
| 811 | + QueueMetrics.forQueue(ms, queueName, null, false, conf); |
| 812 | + QueueMetrics.getQueueMetrics().put(queueName, metrics); |
| 813 | + |
| 814 | + /* |
| 815 | + * simulate the concurrent calls for QueueMetrics#getQueueMetrics |
| 816 | + */ |
| 817 | + // thread A will keep querying the same queue metrics for a specified number of iterations |
| 818 | + Thread threadA = new Thread(() -> { |
| 819 | + try { |
| 820 | + for (int i = 0; i < numIterations; i++) { |
| 821 | + QueueMetrics qm = QueueMetrics.getQueueMetrics().get(queueName); |
| 822 | + if (qm != null) { |
| 823 | + getCount.incrementAndGet(); |
| 824 | + } |
| 825 | + } |
| 826 | + } catch (Exception e) { |
| 827 | + System.out.println("Exception: " + e.getMessage()); |
| 828 | + exceptionCount.incrementAndGet(); |
| 829 | + } finally { |
| 830 | + latch.countDown(); |
| 831 | + } |
| 832 | + }); |
| 833 | + // thread B will keep adding new queue metrics for a specified number of iterations |
| 834 | + Thread threadB = new Thread(() -> { |
| 835 | + try { |
| 836 | + for (int i = 0; i < numIterations; i++) { |
| 837 | + QueueMetrics.getQueueMetrics().put("q" + i, metrics); |
| 838 | + } |
| 839 | + } catch (Exception e) { |
| 840 | + exceptionCount.incrementAndGet(); |
| 841 | + } finally { |
| 842 | + latch.countDown(); |
| 843 | + } |
| 844 | + }); |
| 845 | + |
| 846 | + // start threads and wait for them to finish |
| 847 | + threadA.start(); |
| 848 | + threadB.start(); |
| 849 | + latch.await(); |
| 850 | + |
| 851 | + // check if all get operations are successful to |
| 852 | + // make sure there is no race condition |
| 853 | + assertEquals(numIterations, getCount.get()); |
| 854 | + // check if there is any exception |
| 855 | + assertEquals(0, exceptionCount.get()); |
| 856 | + } |
| 857 | + |
797 | 858 | private static void checkAggregatedNodeTypes(MetricsSource source,
|
798 | 859 | long nodeLocal, long rackLocal, long offSwitch) {
|
799 | 860 | MetricsRecordBuilder rb = getMetrics(source);
|
|
0 commit comments