146146import java .util .Map ;
147147import java .util .Set ;
148148import java .util .concurrent .CompletableFuture ;
149+ import java .util .concurrent .CyclicBarrier ;
149150import java .util .concurrent .ExecutionException ;
150151import java .util .concurrent .ExecutorService ;
151152import java .util .concurrent .Semaphore ;
@@ -230,6 +231,9 @@ public class ApiKeyServiceTests extends ESTestCase {
230231 "search": [ {"names": ["logs"]} ],
231232 "replication": [ {"names": ["archive"]} ]
232233 }""" );
234+
235+ private static final int TEST_THREADPOOL_QUEUE_SIZE = 1000 ;
236+
233237 private ThreadPool threadPool ;
234238 private Client client ;
235239 private SecurityIndexManager securityIndex ;
@@ -245,7 +249,7 @@ public void createThreadPool() {
245249 Settings .EMPTY ,
246250 SECURITY_CRYPTO_THREAD_POOL_NAME ,
247251 1 ,
248- 1000 ,
252+ TEST_THREADPOOL_QUEUE_SIZE ,
249253 "xpack.security.crypto.thread_pool" ,
250254 EsExecutors .TaskTrackingConfig .DO_NOT_TRACK
251255 )
@@ -268,6 +272,90 @@ public void setupMocks() {
268272 doAnswer (invocation -> Instant .now ()).when (clock ).instant ();
269273 }
270274
275+ public void testFloodThreadpool () throws Exception {
276+ // We're going to be blocking the security-crypto threadpool so we need a new one for the client
277+ ThreadPool clientThreadpool = new TestThreadPool (
278+ this .getTestName (),
279+ new FixedExecutorBuilder (
280+ Settings .EMPTY ,
281+ this .getTestName (),
282+ 1 ,
283+ 100 ,
284+ "no_settings_used" ,
285+ EsExecutors .TaskTrackingConfig .DO_NOT_TRACK
286+ )
287+ );
288+ try {
289+ when (client .threadPool ()).thenReturn (clientThreadpool );
290+
291+ // setup copied from testAuthenticateWithApiKey
292+ final Settings settings = Settings .builder ().put (XPackSettings .API_KEY_SERVICE_ENABLED_SETTING .getKey (), true ).build ();
293+ final ApiKeyService service = createApiKeyService (settings );
294+
295+ final String id = randomAlphaOfLength (12 );
296+ final String key = randomAlphaOfLength (16 );
297+
298+ final User user , authUser ;
299+ if (randomBoolean ()) {
300+ user =
new User (
"hulk" ,
new String [] {
"superuser" },
"Bruce Banner" ,
"[email protected] " ,
Map .
of (),
true );
301+ authUser = new User ("authenticated_user" , "other" );
302+ } else {
303+ user =
new User (
"hulk" ,
new String [] {
"superuser" },
"Bruce Banner" ,
"[email protected] " ,
Map .
of (),
true );
304+ authUser = null ;
305+ }
306+ final ApiKey .Type type = randomFrom (ApiKey .Type .values ());
307+ final Map <String , Object > metadata = mockKeyDocument (id , key , user , authUser , false , Duration .ofSeconds (3600 ), null , type );
308+
309+ // Block the security crypto threadpool
310+ CyclicBarrier barrier = new CyclicBarrier (2 );
311+ threadPool .executor (SECURITY_CRYPTO_THREAD_POOL_NAME ).execute (() -> safeAwait (barrier ));
312+ // Now fill it up while the one thread is blocked
313+ for (int i = 0 ; i < TEST_THREADPOOL_QUEUE_SIZE ; i ++) {
314+ threadPool .executor (SECURITY_CRYPTO_THREAD_POOL_NAME ).execute (() -> {});
315+ }
316+
317+ // Check that it's full
318+ for (var stat : threadPool .stats ().stats ()) {
319+ if (stat .name ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
320+ assertThat (stat .queue (), equalTo (TEST_THREADPOOL_QUEUE_SIZE ));
321+ assertThat (stat .rejected (), equalTo (0L ));
322+ }
323+ }
324+
325+ // now try to auth with an API key
326+ final AuthenticationResult <User > auth = tryAuthenticate (service , id , key , type );
327+ assertThat (auth .getStatus (), is (AuthenticationResult .Status .TERMINATE ));
328+
329+ // Make sure one was rejected and the queue is still full
330+ for (var stat : threadPool .stats ().stats ()) {
331+ if (stat .name ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
332+ assertThat (stat .queue (), equalTo (TEST_THREADPOOL_QUEUE_SIZE ));
333+ assertThat (stat .rejected (), equalTo (1L ));
334+ }
335+ }
336+ ListenableFuture <CachedApiKeyHashResult > cachedValue = service .getApiKeyAuthCache ().get (id );
337+ assertThat ("since the request was rejected, there should be no cache entry for this key" , cachedValue , nullValue ());
338+
339+ // unblock the threadpool
340+ safeAwait (barrier );
341+
342+ // wait for the threadpool queue to drain & check that the stats as as expected
343+ flushThreadPoolExecutor (threadPool , SECURITY_CRYPTO_THREAD_POOL_NAME );
344+ for (var stat : threadPool .stats ().stats ()) {
345+ if (stat .name ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
346+ assertThat (stat .rejected (), equalTo (1L ));
347+ assertThat (stat .queue (), equalTo (0 ));
348+ }
349+ }
350+
351+ // try to authenticate again with the same key - if this hangs, check the future caching
352+ final AuthenticationResult <User > shouldSucceed = tryAuthenticate (service , id , key , type );
353+ assertThat (shouldSucceed .getStatus (), is (AuthenticationResult .Status .SUCCESS ));
354+ } finally {
355+ terminate (clientThreadpool );
356+ }
357+ }
358+
271359 public void testCreateApiKeyUsesBulkIndexAction () throws Exception {
272360 final Settings settings = Settings .builder ().put (XPackSettings .API_KEY_SERVICE_ENABLED_SETTING .getKey (), true ).build ();
273361 final ApiKeyService service = createApiKeyService (settings );
0 commit comments