|
37 | 37 | import lombok.NonNull; |
38 | 38 | import lombok.ToString; |
39 | 39 |
|
40 | | -/** Provides throttling on concurrent tasks per client, and on total number of clients serviced concurrently. */ |
| 40 | +/** |
| 41 | + * Provides throttling on concurrent tasks per client, and on total number of clients serviced |
| 42 | + * concurrently. |
| 43 | + */ |
41 | 44 | @ThreadSafe |
42 | 45 | @ToString |
43 | 46 | public final class Conottle implements ClientTaskExecutor, AutoCloseable { |
44 | | - private static final Logger logger = Logger.instance(); |
45 | | - private static final int DEFAULT_MAX = Math.max(16, Runtime.getRuntime().availableProcessors()); |
46 | | - private final ExecutorService adminExecutorService = |
47 | | - Executors.newCachedThreadPool(ThreadFactories.newPlatformThreadFactory("conottle-admin")); |
| 47 | + private static final Logger logger = Logger.instance(); |
| 48 | + private static final int DEFAULT_MAX = Math.max(16, Runtime.getRuntime().availableProcessors()); |
| 49 | + private final ExecutorService adminExecutorService = |
| 50 | + Executors.newCachedThreadPool(ThreadFactories.newPlatformThreadFactory("conottle-admin")); |
48 | 51 |
|
49 | | - private final ConcurrentMap<Object, SingleClientThrottlingTaskExecutor> activeThrottlingExecutors; |
50 | | - private final Semaphore clientTotalThrottle; |
51 | | - private final int maxParallelismPerClient; |
52 | | - private final ExecutorService workerExecutorService; |
| 52 | + private final ConcurrentMap<Object, SingleClientThrottlingTaskExecutor> activeThrottlingExecutors; |
| 53 | + private final Semaphore clientTotalThrottle; |
| 54 | + private final int maxParallelismPerClient; |
| 55 | + private final ExecutorService workerExecutorService; |
53 | 56 |
|
54 | | - @Builder |
55 | | - private Conottle(int maxClientsInParallel, int maxParallelismPerClient, ExecutorService workerExecutorService) { |
56 | | - this.activeThrottlingExecutors = new ConcurrentHashMap<>(requireNonNegativeOrThrow(maxClientsInParallel)); |
57 | | - this.clientTotalThrottle = new Semaphore(requireNonNegativeOrThrow(maxClientsInParallel)); |
58 | | - this.maxParallelismPerClient = requireNonNegativeOrThrow(maxParallelismPerClient); |
59 | | - this.workerExecutorService = |
60 | | - workerExecutorService == null ? Executors.newWorkStealingPool(DEFAULT_MAX) : workerExecutorService; |
61 | | - logger.log("Constructed: {}", this); |
62 | | - } |
| 57 | + @Builder |
| 58 | + private Conottle( |
| 59 | + int maxClientsInParallel, |
| 60 | + int maxParallelismPerClient, |
| 61 | + ExecutorService workerExecutorService) { |
| 62 | + this.activeThrottlingExecutors = |
| 63 | + new ConcurrentHashMap<>(requireNonNegativeOrThrow(maxClientsInParallel)); |
| 64 | + this.clientTotalThrottle = new Semaphore(requireNonNegativeOrThrow(maxClientsInParallel)); |
| 65 | + this.maxParallelismPerClient = requireNonNegativeOrThrow(maxParallelismPerClient); |
| 66 | + this.workerExecutorService = workerExecutorService == null |
| 67 | + ? Executors.newWorkStealingPool(DEFAULT_MAX) |
| 68 | + : workerExecutorService; |
| 69 | + logger.log("Constructed: {}", this); |
| 70 | + } |
63 | 71 |
|
64 | | - private static int requireNonNegativeOrThrow(int value) { |
65 | | - if (value < 0) { |
66 | | - throw new IllegalArgumentException("Negative value is not allowed: " + value); |
67 | | - } |
68 | | - if (value == 0) { |
69 | | - return DEFAULT_MAX; |
70 | | - } |
71 | | - return value; |
| 72 | + private static int requireNonNegativeOrThrow(int value) { |
| 73 | + if (value < 0) { |
| 74 | + throw new IllegalArgumentException("Negative value is not allowed: " + value); |
72 | 75 | } |
73 | | - |
74 | | - @Override |
75 | | - public <V> @NonNull Future<V> submit(@NonNull Callable<V> task, @NonNull Object clientId) { |
76 | | - CompletableFutureHolder<V> taskCompletableFutureHolder = new CompletableFutureHolder<>(); |
77 | | - activeThrottlingExecutors.compute(clientId, (k, valueExecutor) -> { |
78 | | - SingleClientThrottlingTaskExecutor executor = |
79 | | - (valueExecutor == null) ? acquireClientExecutor() : valueExecutor; |
80 | | - taskCompletableFutureHolder.setCompletableFuture(executor.submit(task)); |
81 | | - return executor; |
82 | | - }); |
83 | | - CompletableFuture<V> taskCompletableFuture = taskCompletableFutureHolder.getCompletableFuture(); |
84 | | - CompletableFuture<V> copy = taskCompletableFuture.thenApply(r -> r); |
85 | | - taskCompletableFuture.whenCompleteAsync( |
86 | | - (r, e) -> activeThrottlingExecutors.computeIfPresent(clientId, (k, valueExecutor) -> { |
87 | | - if (valueExecutor.hasMoreWork()) { |
88 | | - return valueExecutor; |
89 | | - } |
90 | | - releaseClientExecutor(); |
91 | | - return null; |
92 | | - }), |
93 | | - adminExecutorService); |
94 | | - return new DefensiveFuture<>(copy); |
| 76 | + if (value == 0) { |
| 77 | + return DEFAULT_MAX; |
95 | 78 | } |
| 79 | + return value; |
| 80 | + } |
96 | 81 |
|
97 | | - @Override |
98 | | - public void close() { |
99 | | - workerExecutorService.shutdown(); |
100 | | - await().forever().until(workerExecutorService::isTerminated); |
101 | | - adminExecutorService.shutdown(); |
102 | | - } |
| 82 | + @Override |
| 83 | + public <V> @NonNull Future<V> submit(@NonNull Callable<V> task, @NonNull Object clientId) { |
| 84 | + CompletableFutureHolder<V> taskCompletableFutureHolder = new CompletableFutureHolder<>(); |
| 85 | + activeThrottlingExecutors.compute(clientId, (k, valueExecutor) -> { |
| 86 | + SingleClientThrottlingTaskExecutor executor = |
| 87 | + (valueExecutor == null) ? acquireClientExecutor() : valueExecutor; |
| 88 | + taskCompletableFutureHolder.setCompletableFuture(executor.submit(task)); |
| 89 | + return executor; |
| 90 | + }); |
| 91 | + CompletableFuture<V> taskCompletableFuture = taskCompletableFutureHolder.getCompletableFuture(); |
| 92 | + CompletableFuture<V> copy = taskCompletableFuture.thenApply(r -> r); |
| 93 | + taskCompletableFuture.whenCompleteAsync( |
| 94 | + (r, e) -> activeThrottlingExecutors.computeIfPresent(clientId, (k, valueExecutor) -> { |
| 95 | + if (valueExecutor.hasMoreWork()) { |
| 96 | + return valueExecutor; |
| 97 | + } |
| 98 | + releaseClientExecutor(); |
| 99 | + return null; |
| 100 | + }), |
| 101 | + adminExecutorService); |
| 102 | + return new DefensiveFuture<>(copy); |
| 103 | + } |
103 | 104 |
|
104 | | - int countActiveExecutors() { |
105 | | - return activeThrottlingExecutors.size(); |
106 | | - } |
| 105 | + @Override |
| 106 | + public void close() { |
| 107 | + workerExecutorService.shutdown(); |
| 108 | + await().forever().until(workerExecutorService::isTerminated); |
| 109 | + adminExecutorService.shutdown(); |
| 110 | + } |
107 | 111 |
|
108 | | - private @NonNull SingleClientThrottlingTaskExecutor acquireClientExecutor() { |
109 | | - Semaphores.acquireInterruptiblyUnchecked(clientTotalThrottle); |
110 | | - return new SingleClientThrottlingTaskExecutor( |
111 | | - maxParallelismPerClient, workerExecutorService, adminExecutorService); |
112 | | - } |
| 112 | + int countActiveExecutors() { |
| 113 | + return activeThrottlingExecutors.size(); |
| 114 | + } |
113 | 115 |
|
114 | | - private void releaseClientExecutor() { |
115 | | - clientTotalThrottle.release(); |
116 | | - } |
| 116 | + private @NonNull SingleClientThrottlingTaskExecutor acquireClientExecutor() { |
| 117 | + Semaphores.acquireInterruptiblyUnchecked(clientTotalThrottle); |
| 118 | + return new SingleClientThrottlingTaskExecutor( |
| 119 | + maxParallelismPerClient, workerExecutorService, adminExecutorService); |
| 120 | + } |
| 121 | + |
| 122 | + private void releaseClientExecutor() { |
| 123 | + clientTotalThrottle.release(); |
| 124 | + } |
117 | 125 | } |
0 commit comments