Skip to content

Commit 2a40fd1

Browse files
committed
feat: cda infrastructure worker
1 parent 0dd2a51 commit 2a40fd1

File tree

6 files changed

+175
-71
lines changed

6 files changed

+175
-71
lines changed

src/main/java/com/aws/greengrass/clientdevices/auth/ClientDevicesAuthService.java

+36-61
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import com.aws.greengrass.clientdevices.auth.configuration.CDAConfiguration;
1515
import com.aws.greengrass.clientdevices.auth.configuration.GroupConfiguration;
1616
import com.aws.greengrass.clientdevices.auth.configuration.GroupManager;
17+
import com.aws.greengrass.clientdevices.auth.configuration.InfrastructureConfiguration;
1718
import com.aws.greengrass.clientdevices.auth.connectivity.CISShadowMonitor;
19+
import com.aws.greengrass.clientdevices.auth.infra.CDAExecutor;
1820
import com.aws.greengrass.clientdevices.auth.infra.NetworkState;
1921
import com.aws.greengrass.clientdevices.auth.session.MqttSessionFactory;
2022
import com.aws.greengrass.clientdevices.auth.session.SessionConfig;
@@ -30,7 +32,6 @@
3032
import com.aws.greengrass.ipc.SubscribeToCertificateUpdatesOperationHandler;
3133
import com.aws.greengrass.ipc.VerifyClientDeviceIdentityOperationHandler;
3234
import com.aws.greengrass.lifecyclemanager.PluginService;
33-
import com.aws.greengrass.util.Coerce;
3435
import com.fasterxml.jackson.databind.MapperFeature;
3536
import com.fasterxml.jackson.databind.ObjectMapper;
3637
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCService;
@@ -45,6 +46,8 @@
4546
import java.util.concurrent.TimeUnit;
4647
import javax.inject.Inject;
4748

49+
import static com.aws.greengrass.clientdevices.auth.configuration.InfrastructureConfiguration.DEFAULT_THREAD_POOL_SIZE;
50+
import static com.aws.greengrass.clientdevices.auth.configuration.InfrastructureConfiguration.DEFAULT_WORK_QUEUE_DEPTH;
4851
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
4952
import static software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCService.AUTHORIZE_CLIENT_DEVICE_ACTION;
5053
import static software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCService.GET_CLIENT_DEVICE_AUTH_TOKEN;
@@ -55,22 +58,17 @@
5558
public class ClientDevicesAuthService extends PluginService {
5659
public static final String CLIENT_DEVICES_AUTH_SERVICE_NAME = "aws.greengrass.clientdevices.Auth";
5760

61+
private CDAConfiguration cdaConfiguration;
62+
private InfrastructureConfiguration infrastructureConfig;
63+
5864
// TODO: Move configuration related constants to appropriate configuration class
5965
public static final String DEVICE_GROUPS_TOPICS = "deviceGroups";
6066
public static final String PERFORMANCE_TOPIC = "performance";
6167
public static final String MAX_ACTIVE_AUTH_TOKENS_TOPIC = "maxActiveAuthTokens";
6268
public static final String CLOUD_REQUEST_QUEUE_SIZE_TOPIC = "cloudRequestQueueSize";
6369
public static final String MAX_CONCURRENT_CLOUD_REQUESTS_TOPIC = "maxConcurrentCloudRequests";
64-
// Limit the queue size before we start rejecting requests
65-
private static final int DEFAULT_CLOUD_CALL_QUEUE_SIZE = 100;
66-
private static final int DEFAULT_THREAD_POOL_SIZE = 1;
6770
public static final int DEFAULT_MAX_ACTIVE_AUTH_TOKENS = 2500;
6871

69-
// Create a threadpool for calling the cloud. Single thread will be used by default.
70-
private ThreadPoolExecutor cloudCallThreadPool;
71-
private int cloudCallQueueSize;
72-
private CDAConfiguration cdaConfiguration;
73-
7472

7573
/**
7674
* Constructor.
@@ -88,32 +86,10 @@ protected void install() throws InterruptedException {
8886

8987
context.get(UseCases.class).init(context);
9088
context.get(CertificateManager.class).updateCertificatesConfiguration(new CertificatesConfig(getConfig()));
91-
initializeInfrastructure();
9289
initializeHandlers();
9390
subscribeToConfigChanges();
9491
}
9592

96-
private int getValidCloudCallQueueSize(Topics topics) {
97-
int newSize = Coerce.toInt(
98-
topics.findOrDefault(DEFAULT_CLOUD_CALL_QUEUE_SIZE,
99-
CONFIGURATION_CONFIG_KEY, PERFORMANCE_TOPIC, CLOUD_REQUEST_QUEUE_SIZE_TOPIC));
100-
if (newSize <= 0) {
101-
logger.atWarn().log("{} illegal size, will not change the queue size from {}",
102-
CLOUD_REQUEST_QUEUE_SIZE_TOPIC, cloudCallQueueSize);
103-
return cloudCallQueueSize; // existing size
104-
}
105-
return newSize;
106-
}
107-
108-
private void initializeInfrastructure() {
109-
cloudCallQueueSize = DEFAULT_CLOUD_CALL_QUEUE_SIZE;
110-
cloudCallQueueSize = getValidCloudCallQueueSize(config);
111-
cloudCallThreadPool = new ThreadPoolExecutor(1,
112-
DEFAULT_THREAD_POOL_SIZE, 60, TimeUnit.SECONDS,
113-
new ResizableLinkedBlockingQueue<>(cloudCallQueueSize));
114-
cloudCallThreadPool.allowCoreThreadTimeOut(true); // act as a cached threadpool
115-
}
116-
11793
private void initializeHandlers() {
11894
// Register auth session handlers
11995
context.get(SessionManager.class).setSessionConfig(new SessionConfig(getConfig()));
@@ -146,34 +122,20 @@ private void configChangeHandler(WhatHappened whatHappened, Node node) {
146122
return;
147123
}
148124
logger.atDebug().kv("why", whatHappened).kv("node", node).log();
125+
149126
// NOTE: This should not live here. The service doesn't have to have knowledge about where/how
150127
// keys are stored
151-
Topics deviceGroupTopics = this.config.lookupTopics(CONFIGURATION_CONFIG_KEY, DEVICE_GROUPS_TOPICS);
152-
153-
try {
154-
// NOTE: Extract this to a method these are infrastructure concerns.
155-
int threadPoolSize = Coerce.toInt(this.config.findOrDefault(DEFAULT_THREAD_POOL_SIZE,
156-
CONFIGURATION_CONFIG_KEY, PERFORMANCE_TOPIC, MAX_CONCURRENT_CLOUD_REQUESTS_TOPIC));
157-
if (threadPoolSize >= cloudCallThreadPool.getCorePoolSize()) {
158-
cloudCallThreadPool.setMaximumPoolSize(threadPoolSize);
159-
}
160-
} catch (IllegalArgumentException e) {
161-
logger.atWarn().log("Unable to update CDA threadpool size due to {}", e.getMessage());
162-
}
163-
164-
if (whatHappened != WhatHappened.initialized && node != null && node.childOf(CLOUD_REQUEST_QUEUE_SIZE_TOPIC)) {
165-
// NOTE: Extract this to a method these are infrastructure concerns.
166-
BlockingQueue<Runnable> q = cloudCallThreadPool.getQueue();
167-
if (q instanceof ResizableLinkedBlockingQueue) {
168-
cloudCallQueueSize = getValidCloudCallQueueSize(this.config);
169-
((ResizableLinkedBlockingQueue) q).resize(cloudCallQueueSize);
170-
}
171-
}
172-
173128
if (whatHappened == WhatHappened.initialized || node == null || node.childOf(DEVICE_GROUPS_TOPICS)) {
129+
Topics deviceGroupTopics = this.config.lookupTopics(CONFIGURATION_CONFIG_KEY, DEVICE_GROUPS_TOPICS);
174130
updateDeviceGroups(whatHappened, deviceGroupTopics);
175131
}
176132

133+
InfrastructureConfiguration newInfraConfig = InfrastructureConfiguration.from(getConfig());
134+
if (infrastructureConfig == null || !newInfraConfig.equals(infrastructureConfig)) {
135+
updateInfrastructure(newInfraConfig);
136+
infrastructureConfig = newInfraConfig;
137+
}
138+
177139
onConfigurationChanged();
178140
}
179141

@@ -189,10 +151,20 @@ protected void shutdown() throws InterruptedException {
189151
context.get(CertificateManager.class).stopMonitors();
190152
}
191153

192-
@Override
193-
public void postInject() {
194-
super.postInject();
154+
private void updateInfrastructure(InfrastructureConfiguration infraConfig) {
155+
context.get(CDAExecutor.class).accept(infraConfig);
156+
}
157+
158+
private void initializeInfrastructure() {
159+
BlockingQueue<Runnable> queue = new ResizableLinkedBlockingQueue<>(DEFAULT_WORK_QUEUE_DEPTH);
160+
ThreadPoolExecutor executor = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE,
161+
DEFAULT_THREAD_POOL_SIZE, 60, TimeUnit.SECONDS, queue);
162+
context.put(CDAExecutor.class, new CDAExecutor(executor));
163+
}
164+
165+
private void initializeIPC() {
195166
AuthorizationHandler authorizationHandler = context.get(AuthorizationHandler.class);
167+
196168
try {
197169
authorizationHandler.registerComponent(this.getName(),
198170
new HashSet<>(Arrays.asList(SUBSCRIBE_TO_CERTIFICATE_UPDATES,
@@ -212,17 +184,20 @@ public void postInject() {
212184
new SubscribeToCertificateUpdatesOperationHandler(context, certificateManager, authorizationHandler));
213185
greengrassCoreIPCService.setVerifyClientDeviceIdentityHandler(context ->
214186
new VerifyClientDeviceIdentityOperationHandler(context, serviceApi,
215-
authorizationHandler, cloudCallThreadPool));
187+
authorizationHandler, this.context.get(CDAExecutor.class)));
216188
greengrassCoreIPCService.setGetClientDeviceAuthTokenHandler(context ->
217189
new GetClientDeviceAuthTokenOperationHandler(context, serviceApi, authorizationHandler,
218-
cloudCallThreadPool));
190+
this.context.get(CDAExecutor.class)));
219191
greengrassCoreIPCService.setAuthorizeClientDeviceActionHandler(context ->
220192
new AuthorizeClientDeviceActionOperationHandler(context, serviceApi,
221193
authorizationHandler));
222194
}
223195

224-
public CertificateManager getCertificateManager() {
225-
return context.get(CertificateManager.class);
196+
@Override
197+
public void postInject() {
198+
super.postInject();
199+
initializeInfrastructure();
200+
initializeIPC();
226201
}
227202

228203
private void updateDeviceGroups(WhatHappened whatHappened, Topics deviceGroupsTopics) {
@@ -250,7 +225,7 @@ void updateCACertificateConfig(List<String> caCerts) {
250225
protected CompletableFuture<Void> close(boolean waitForDependers) {
251226
// shutdown the threadpool in close, not in shutdown() because it is created
252227
// and injected in the constructor and we won't be able to restart it after it stops.
253-
cloudCallThreadPool.shutdown();
228+
context.get(CDAExecutor.class).shutdown();
254229
return super.close(waitForDependers);
255230
}
256231
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.aws.greengrass.clientdevices.auth.configuration;
7+
8+
import com.aws.greengrass.config.Topics;
9+
import com.aws.greengrass.logging.api.Logger;
10+
import com.aws.greengrass.logging.impl.LogManager;
11+
import com.aws.greengrass.util.Coerce;
12+
import lombok.Value;
13+
14+
/**
15+
* Represents client device infrastructure configuration.
16+
* </p>
17+
* NOTE: currently we're shoving some unrelated things under the `performance` key.
18+
* Things like maxActiveAuthTokens and refresh periods should be grouped separately.
19+
* <p>
20+
* |---- configuration
21+
* | |---- performance:
22+
* | |---- cloudRequestQueueSize: "..."
23+
* | |---- maxConcurrentCloudRequests: [...]
24+
* </p>
25+
*/
26+
@Value
27+
public final class InfrastructureConfiguration {
28+
private static final Logger logger = LogManager.getLogger(InfrastructureConfiguration.class);
29+
30+
public static final int DEFAULT_WORK_QUEUE_DEPTH = 100;
31+
public static final int DEFAULT_THREAD_POOL_SIZE = 1;
32+
33+
public static final String PERFORMANCE_TOPIC = "performance";
34+
// TODO: Need to determine if this is useful. We may want different numbers for internal
35+
// vs external usage - e.g. IPC work queue throttling vs internal cert refreshes
36+
public static final String WORK_QUEUE_DEPTH = "cloudRequestQueueSize"; // Deprecate?
37+
public static final String THREAD_POOL_SIZE = "maxConcurrentCloudRequests"; // Deprecate?
38+
39+
int workQueueDepth;
40+
int threadPoolSize;
41+
42+
private InfrastructureConfiguration(int workQueueDepth, int threadPoolSize) {
43+
this.workQueueDepth = workQueueDepth;
44+
this.threadPoolSize = threadPoolSize;
45+
}
46+
47+
/**
48+
* Factory method for creating an immutable InfrastructureConfiguration from the service configuration.
49+
*
50+
* @param configurationTopics the configuration key of the service configuration
51+
*/
52+
public static InfrastructureConfiguration from(Topics configurationTopics) {
53+
Topics infraTopics = configurationTopics.lookupTopics(PERFORMANCE_TOPIC);
54+
55+
return new InfrastructureConfiguration(
56+
getWorkQueueDepthFromConfiguration(infraTopics),
57+
getThreadPoolSizeFromConfiguration(infraTopics)
58+
);
59+
}
60+
61+
private static int getWorkQueueDepthFromConfiguration(Topics infraTopics) {
62+
return Coerce.toInt(infraTopics.findOrDefault(DEFAULT_WORK_QUEUE_DEPTH, WORK_QUEUE_DEPTH));
63+
}
64+
65+
private static int getThreadPoolSizeFromConfiguration(Topics infraTopics) {
66+
return Coerce.toInt(infraTopics.findOrDefault(DEFAULT_THREAD_POOL_SIZE, THREAD_POOL_SIZE));
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.aws.greengrass.clientdevices.auth.infra;
7+
8+
import com.aws.greengrass.clientdevices.auth.configuration.InfrastructureConfiguration;
9+
import com.aws.greengrass.clientdevices.auth.util.ResizableLinkedBlockingQueue;
10+
11+
import java.util.concurrent.BlockingQueue;
12+
import java.util.concurrent.Callable;
13+
import java.util.concurrent.Executor;
14+
import java.util.concurrent.Future;
15+
import java.util.concurrent.ThreadPoolExecutor;
16+
import java.util.function.Consumer;
17+
18+
public final class CDAExecutor implements Executor, Consumer<InfrastructureConfiguration> {
19+
private final ThreadPoolExecutor executor;
20+
private final BlockingQueue<Runnable> executorQueue;
21+
22+
/**
23+
* Creates a new CDAExecutor with the given underlying ThreadPoolExecutor.
24+
* The underlying work queue should be resizeable in order to respond
25+
* to component configuration updates.
26+
*
27+
* @param executor Thread pool executor to be used as the underlying work thread pool.
28+
*/
29+
public CDAExecutor(ThreadPoolExecutor executor) {
30+
this.executor = executor;
31+
this.executorQueue = executor.getQueue();
32+
}
33+
34+
public void shutdown() {
35+
executor.shutdown();
36+
}
37+
38+
@Override
39+
public void execute(Runnable command) {
40+
executor.execute(command);
41+
}
42+
43+
public <T> Future<T> execute(Callable<T> callable) {
44+
return executor.submit(callable);
45+
}
46+
47+
@Override
48+
public void accept(InfrastructureConfiguration infrastructureConfiguration) {
49+
// Thread pool size may not shrink below core pool size, so reduce this accordingly
50+
int maxPoolSize = infrastructureConfiguration.getThreadPoolSize();
51+
52+
if (maxPoolSize > executor.getCorePoolSize()) {
53+
maxPoolSize = executor.getCorePoolSize();
54+
}
55+
executor.setMaximumPoolSize(maxPoolSize);
56+
57+
// Only attempt to resize the underlying work queue if it is a ResizeableLinkedBlockingQueue
58+
if (executorQueue instanceof ResizableLinkedBlockingQueue) {
59+
int queueDepth = infrastructureConfiguration.getWorkQueueDepth();
60+
((ResizableLinkedBlockingQueue) executorQueue).resize(queueDepth);
61+
}
62+
}
63+
}

src/main/java/com/aws/greengrass/ipc/GetClientDeviceAuthTokenOperationHandler.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.util.HashMap;
3030
import java.util.Map;
3131
import java.util.concurrent.CompletableFuture;
32-
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executor;
3333
import java.util.concurrent.RejectedExecutionException;
3434

3535
import static com.aws.greengrass.ipc.common.ExceptionUtil.translateExceptions;
@@ -46,7 +46,7 @@ public class GetClientDeviceAuthTokenOperationHandler
4646
private final AuthorizationHandler authorizationHandler;
4747
private final ClientDevicesAuthServiceApi clientDevicesAuthServiceApi;
4848
private final Map<String, String> credentialMap = new HashMap<>();
49-
private final ExecutorService cloudCallThreadPool;
49+
private final Executor cloudCallThreadPool;
5050

5151
/**
5252
* Constructor.
@@ -60,7 +60,7 @@ public GetClientDeviceAuthTokenOperationHandler(
6060
OperationContinuationHandlerContext context,
6161
ClientDevicesAuthServiceApi clientDevicesAuthServiceApi,
6262
AuthorizationHandler authorizationHandler,
63-
ExecutorService cloudCallThreadPool
63+
Executor cloudCallThreadPool
6464
) {
6565

6666
super(context);

src/main/java/com/aws/greengrass/ipc/VerifyClientDeviceIdentityOperationHandler.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.aws.greengrass.authorization.exceptions.AuthorizationException;
1212
import com.aws.greengrass.clientdevices.auth.ClientDevicesAuthService;
1313
import com.aws.greengrass.clientdevices.auth.api.ClientDevicesAuthServiceApi;
14+
import com.aws.greengrass.clientdevices.auth.infra.CDAExecutor;
1415
import com.aws.greengrass.logging.api.Logger;
1516
import com.aws.greengrass.logging.impl.LogManager;
1617
import com.aws.greengrass.util.Utils;
@@ -25,7 +26,6 @@
2526
import software.amazon.awssdk.eventstreamrpc.model.EventStreamJsonMessage;
2627

2728
import java.util.concurrent.CompletableFuture;
28-
import java.util.concurrent.ExecutorService;
2929
import java.util.concurrent.RejectedExecutionException;
3030

3131
import static com.aws.greengrass.ipc.common.ExceptionUtil.translateExceptions;
@@ -41,7 +41,7 @@ public class VerifyClientDeviceIdentityOperationHandler
4141
private final ClientDevicesAuthServiceApi clientDevicesAuthServiceApi;
4242
private final String serviceName;
4343
private final AuthorizationHandler authorizationHandler;
44-
private final ExecutorService cloudCallThreadPool;
44+
private final CDAExecutor cloudCallThreadPool;
4545

4646
/**
4747
* Constructor.
@@ -53,7 +53,7 @@ public class VerifyClientDeviceIdentityOperationHandler
5353
*/
5454
public VerifyClientDeviceIdentityOperationHandler(
5555
OperationContinuationHandlerContext context, ClientDevicesAuthServiceApi clientDevicesAuthServiceApi,
56-
AuthorizationHandler authorizationHandler, ExecutorService cloudCallThreadPool) {
56+
AuthorizationHandler authorizationHandler, CDAExecutor cloudCallThreadPool) {
5757

5858
super(context);
5959
this.clientDevicesAuthServiceApi = clientDevicesAuthServiceApi;

src/test/java/com/aws/greengrass/clientdevices/auth/ClientDevicesAuthServiceTest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,8 @@ private String getCaPassphrase() {
305305
}
306306

307307
private List<String> getCaCertificates()
308-
throws ServiceLoadException, CertificateEncodingException, KeyStoreException, IOException {
309-
ClientDevicesAuthService clientDevicesAuthService =
310-
(ClientDevicesAuthService) kernel.locate(ClientDevicesAuthService.CLIENT_DEVICES_AUTH_SERVICE_NAME);
311-
return clientDevicesAuthService.getCertificateManager().getCACertificates();
308+
throws CertificateEncodingException, KeyStoreException, IOException {
309+
return kernel.getContext().get(CertificateManager.class).getCACertificates();
312310
}
313311

314312

0 commit comments

Comments
 (0)