Skip to content

Commit 3e7c8b3

Browse files
committed
test: fix ipc test
1 parent 2a40fd1 commit 3e7c8b3

File tree

2 files changed

+21
-32
lines changed

2 files changed

+21
-32
lines changed

src/integrationtests/java/com/aws/greengrass/integrationtests/ipc/GetClientDeviceAuthTokenTest.java

+13-28
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package com.aws.greengrass.integrationtests.ipc;
77

8+
import com.aws.greengrass.clientdevices.auth.infra.CDAExecutor;
89
import com.aws.greengrass.dependency.State;
910
import com.aws.greengrass.clientdevices.auth.ClientDevicesAuthService;
1011
import com.aws.greengrass.clientdevices.auth.exception.AuthenticationException;
@@ -27,6 +28,7 @@
2728
import org.junit.jupiter.api.extension.ExtensionContext;
2829
import org.junit.jupiter.api.io.TempDir;
2930
import org.mockito.Mock;
31+
import org.mockito.Mockito;
3032
import org.mockito.junit.jupiter.MockitoExtension;
3133
import software.amazon.awssdk.aws.greengrass.GetClientDeviceAuthTokenResponseHandler;
3234
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
@@ -47,21 +49,19 @@
4749
import java.util.Optional;
4850
import java.util.concurrent.CompletableFuture;
4951
import java.util.concurrent.CountDownLatch;
52+
import java.util.concurrent.LinkedBlockingQueue;
53+
import java.util.concurrent.ThreadPoolExecutor;
5054
import java.util.concurrent.TimeUnit;
5155
import java.util.function.Consumer;
5256

53-
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
54-
import static com.aws.greengrass.clientdevices.auth.ClientDevicesAuthService.CLOUD_REQUEST_QUEUE_SIZE_TOPIC;
55-
import static com.aws.greengrass.clientdevices.auth.ClientDevicesAuthService.PERFORMANCE_TOPIC;
5657
import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType;
5758
import static com.aws.greengrass.testcommons.testutilities.TestUtils.asyncAssertOnConsumer;
5859
import static org.hamcrest.MatcherAssert.assertThat;
5960
import static org.hamcrest.Matchers.containsString;
6061
import static org.hamcrest.Matchers.is;
6162
import static org.junit.jupiter.api.Assertions.assertEquals;
6263
import static org.junit.jupiter.api.Assertions.assertThrows;
63-
import static org.mockito.ArgumentMatchers.anyMap;
64-
import static org.mockito.ArgumentMatchers.anyString;
64+
import static org.mockito.ArgumentMatchers.any;
6565
import static org.mockito.Mockito.when;
6666

6767
@ExtendWith({GGExtension.class, UniqueRootPathExtension.class, MockitoExtension.class})
@@ -159,8 +159,15 @@ void GIVEN_brokerWithValidCredentials_WHEN_GetClientDeviceAuthToken_THEN_returns
159159
@Test
160160
void GIVEN_brokerWithInvalidCredentials_WHEN_GetClientDeviceAuthToken_THEN_throwsInvalidArgumentsError_and_WHEN_queueIsFull_THEN_throwsServiceError()
161161
throws Exception {
162+
// Inject work queue which will reject any work added
163+
LinkedBlockingQueue<Runnable> mockQueue = Mockito.mock(LinkedBlockingQueue.class);
164+
when(mockQueue.offer(any())).thenReturn(false);
165+
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, mockQueue);
166+
kernel.getContext().put(CDAExecutor.class, new CDAExecutor(executor));
162167
kernel.getContext().put(SessionManager.class, sessionManager);
168+
163169
startNucleusWithConfig("cda.yaml");
170+
164171
try (EventStreamRPCConnection connection = IPCTestUtils.getEventStreamRpcConnection(kernel,
165172
"BrokerWithGetClientDeviceAuthTokenPermission")) {
166173
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(connection);
@@ -174,11 +181,6 @@ void GIVEN_brokerWithInvalidCredentials_WHEN_GetClientDeviceAuthToken_THEN_throw
174181
assertThat(err.getCause().getMessage(), containsString("Invalid client device credentials"));
175182
}
176183

177-
// Update the cloud queue size to 1 so that we'll just reject the second request
178-
kernel.findServiceTopic(ClientDevicesAuthService.CLIENT_DEVICES_AUTH_SERVICE_NAME)
179-
.lookup(CONFIGURATION_CONFIG_KEY, PERFORMANCE_TOPIC, CLOUD_REQUEST_QUEUE_SIZE_TOPIC).withValue(1);
180-
kernel.getContext().waitForPublishQueueToClear();
181-
182184
// Verify that we get a good error that the request couldn't be queued
183185
try (EventStreamRPCConnection connection = IPCTestUtils.getEventStreamRpcConnection(kernel,
184186
"BrokerWithGetClientDeviceAuthTokenPermission")) {
@@ -187,27 +189,10 @@ void GIVEN_brokerWithInvalidCredentials_WHEN_GetClientDeviceAuthToken_THEN_throw
187189
new CredentialDocument().withMqttCredential(
188190
new MQTTCredential().withClientId("some-client-id").withCertificatePem("VALID PEM")));
189191

190-
CountDownLatch cdl = new CountDownLatch(1);
191-
when(sessionManager.createSession(anyString(), anyMap())).thenAnswer((a) -> {
192-
cdl.countDown();
193-
Thread.sleep(1_000); // slow down the first request so that the second will be rejected
194-
return "uuid";
195-
});
196-
// Request 1 (immediately runs)
197-
CompletableFuture<GetClientDeviceAuthTokenResponse> fut1 =
198-
ipcClient.getClientDeviceAuthToken(request, Optional.empty()).getResponse();
199-
// Ensure the threadpool is actively blocked before we send the next requests to fill the queue and then
200-
// overflow the queue.
201-
cdl.await(2, TimeUnit.SECONDS);
202-
// Request 2 (queued so that queue size is 1)
203-
CompletableFuture<GetClientDeviceAuthTokenResponse> fut2 =
204-
ipcClient.getClientDeviceAuthToken(request, Optional.empty()).getResponse();
205-
// Request 3 (expect rejection)
192+
// Expect rejection
206193
Exception err = assertThrows(Exception.class, () -> clientDeviceAuthToken(ipcClient, request, (r) -> {}));
207194
assertThat(err.getCause().getMessage(), containsString("Unable to queue request"));
208195
assertEquals(ServiceError.class, err.getCause().getClass());
209-
fut1.get(2, TimeUnit.SECONDS);
210-
fut2.get(2, TimeUnit.SECONDS);
211196
}
212197
}
213198

src/main/java/com/aws/greengrass/clientdevices/auth/ClientDevicesAuthService.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,14 @@ private void updateInfrastructure(InfrastructureConfiguration infraConfig) {
156156
}
157157

158158
private void initializeInfrastructure() {
159-
BlockingQueue<Runnable> queue = new ResizableLinkedBlockingQueue<>(DEFAULT_WORK_QUEUE_DEPTH);
160-
ThreadPoolExecutor executor = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE,
161-
DEFAULT_THREAD_POOL_SIZE, 60, TimeUnit.SECONDS, queue);
162-
context.put(CDAExecutor.class, new CDAExecutor(executor));
159+
// Don't re-inject this if it is already present
160+
CDAExecutor cdaExecutor = context.getIfExists(CDAExecutor.class, null);
161+
if (cdaExecutor == null) {
162+
BlockingQueue<Runnable> queue = new ResizableLinkedBlockingQueue<>(DEFAULT_WORK_QUEUE_DEPTH);
163+
ThreadPoolExecutor executor = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE,
164+
DEFAULT_THREAD_POOL_SIZE, 60, TimeUnit.SECONDS, queue);
165+
context.put(CDAExecutor.class, new CDAExecutor(executor));
166+
}
163167
}
164168

165169
private void initializeIPC() {

0 commit comments

Comments
 (0)