Skip to content

Commit b1dfa93

Browse files
Merge pull request #176 from rundeck-plugins/RUN-3100
RUN-3100 allow get node instances data for each region in parallel
2 parents 9f645f0 + 2da40bd commit b1dfa93

File tree

4 files changed

+82
-19
lines changed

4 files changed

+82
-19
lines changed

src/main/java/com/dtolabs/rundeck/plugin/resources/ec2/EC2ResourceModelSource.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class EC2ResourceModelSource implements ResourceModelSource {
8888
boolean useDefaultMapping = true;
8989
boolean runningOnly = false;
9090
boolean queryAsync = true;
91+
boolean queryNodeInstancesInParallel = false;
9192
Future<INodeSet> futureResult = null;
9293
final Properties mapping = new Properties();
9394
final String assumeRoleArn;
@@ -219,6 +220,8 @@ public EC2ResourceModelSource(final Properties configuration, final Services ser
219220

220221
queryAsync = !("true".equals(configuration.getProperty(SYNCHRONOUS_LOAD)) || refreshInterval <= 0);
221222

223+
this.queryNodeInstancesInParallel = Boolean.parseBoolean(configuration.getProperty(EC2ResourceModelSourceFactory.QUERY_NODE_INSTANCES_IN_PARALLEL, "false"));
224+
222225
final ArrayList<String> params = new ArrayList<String>();
223226
if (null != filterParams) {
224227
Collections.addAll(params, filterParams.split(";"));
@@ -313,12 +316,12 @@ public synchronized INodeSet getNodes() throws ResourceModelSourceException {
313316
*/
314317
if (lastRefresh > 0 && queryAsync && null == futureResult) {
315318
futureResult = executor.submit(() -> {
316-
return mapper.performQuery();
319+
return mapper.performQuery(queryNodeInstancesInParallel);
317320
});
318321
lastRefresh = System.currentTimeMillis();
319322
} else if (!queryAsync || lastRefresh < 1) {
320323
//always perform synchronous query the first time
321-
iNodeSet = mapper.performQuery();
324+
iNodeSet = mapper.performQuery(queryNodeInstancesInParallel);
322325
lastRefresh = System.currentTimeMillis();
323326
}
324327

src/main/java/com/dtolabs/rundeck/plugin/resources/ec2/EC2ResourceModelSourceFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class EC2ResourceModelSourceFactory implements ResourceModelSourceFactory
7171
public static final String REFRESH_INTERVAL = "refreshInterval";
7272
public static final String SYNCHRONOUS_LOAD = "synchronousLoad";
7373
public static final String USE_DEFAULT_MAPPING = "useDefaultMapping";
74+
public static final String QUERY_NODE_INSTANCES_IN_PARALLEL = "queryNodeInstancesInParallel";
7475
public static final String HTTP_PROXY_HOST = "httpProxyHost";
7576
public static final String HTTP_PROXY_PORT = "httpProxyPort";
7677
public static final String HTTP_PROXY_USER = "httpProxyUser";
@@ -220,6 +221,9 @@ public ResourceModelSource createResourceModelSource(Properties configuration) t
220221
.property(PropertyUtil.integer(MAX_RESULTS, "Max API Results",
221222
"Max number of reservations returned per AWS API call.",
222223
false, "100"))
224+
.property(PropertyUtil.bool(QUERY_NODE_INSTANCES_IN_PARALLEL, "Query Node Instances in Parallel",
225+
"Query node instances in parallel. If false, instances will be queried one at a time.",
226+
false, "false"))
223227

224228
.build();
225229

src/main/java/com/dtolabs/rundeck/plugin/resources/ec2/InstanceToNodeMapper.java

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
package com.dtolabs.rundeck.plugin.resources.ec2;
2525

2626
import com.amazonaws.services.ec2.AmazonEC2;
27+
import com.amazonaws.services.ec2.AmazonEC2Client;
2728
import com.amazonaws.services.ec2.model.*;
2829
import com.dtolabs.rundeck.core.common.INodeEntry;
2930
import com.dtolabs.rundeck.core.common.NodeEntryImpl;
@@ -33,6 +34,7 @@
3334
import org.slf4j.LoggerFactory;
3435

3536
import java.util.*;
37+
import java.util.concurrent.*;
3638
import java.util.regex.Matcher;
3739
import java.util.regex.Pattern;
3840
import java.util.stream.Collectors;
@@ -69,28 +71,67 @@ class InstanceToNodeMapper {
6971
* Perform the query and return the set of instances
7072
*
7173
*/
72-
public NodeSetImpl performQuery() {
74+
public NodeSetImpl performQuery(boolean queryNodeInstancesInParallel) {
7375
final NodeSetImpl nodeSet = new NodeSetImpl();
7476

7577
Set<Instance> instances = new HashSet<>();
7678

7779
DescribeInstancesRequest request = new DescribeInstancesRequest().withFilters(buildFilters()).withMaxResults(maxResults);
7880

7981
if(getEndpoint() != null) {
80-
for (String endpoint : determineEndpoints()) {
81-
AmazonEC2 ec2 = ec2Supplier.getEC2ForEndpoint(endpoint);
82-
zones = ec2.describeAvailabilityZones();
83-
84-
final Set<Instance> newInstances = addExtraMappingAttribute(ec2, query(ec2, request));
85-
86-
if (newInstances != null && !newInstances.isEmpty()) {
87-
instances.addAll(newInstances);
82+
ExecutorService executor = null;
83+
Collection<Future<Set<Instance>>> futures = new LinkedList<Future<Set<Instance>>>();
84+
Set<Callable<Set<Instance>>> tasks = new HashSet<>();
85+
List<String> endpoints = determineEndpoints();
86+
for (String endpoint : endpoints) {
87+
if(queryNodeInstancesInParallel) {
88+
if(executor == null){
89+
logger.info("Creating thread pool for {} regions", endpoints.size() );
90+
executor = Executors.newFixedThreadPool(endpoints.size());
91+
}
92+
tasks.add(new Callable<Set<Instance>>() {
93+
@Override
94+
public Set<Instance> call() throws Exception {
95+
return getInstancesByRegion(endpoint);
96+
};
97+
});
98+
}else{
99+
instances.addAll(getInstancesByRegion(endpoint));
100+
}
101+
}
102+
if(queryNodeInstancesInParallel) {
103+
try {
104+
logger.info("Querying {} regions in parallel", endpoints.size() );
105+
futures = executor.invokeAll(tasks);
106+
} catch (InterruptedException e) {
107+
throw new RuntimeException(e);
108+
} finally {
109+
try {
110+
for (Future<Set<Instance>> future : futures) {
111+
if (future != null) {
112+
instances.addAll(future.get());
113+
}
114+
}
115+
logger.info("Finished querying {} regions in parallel", endpoints.size() );
116+
executor.shutdown();
117+
} catch (InterruptedException e) {
118+
throw new RuntimeException(e);
119+
} catch (ExecutionException e) {
120+
throw new RuntimeException(e);
121+
}
88122
}
89-
90123
try {
91-
Thread.sleep(100);
92-
} catch (InterruptedException ex) {
124+
// Wait for 90 seconds for all tasks to finish
125+
logger.info("Waiting for {} seconds for all tasks to finish", 90);
126+
executor.awaitTermination(90, TimeUnit.SECONDS);
127+
} catch (InterruptedException ignored) {
128+
// Restore interrupted status
129+
logger.warn("Thread interrupted while waiting for tasks to finish", ignored);
93130
Thread.currentThread().interrupt();
131+
} finally {
132+
// Force shutdown if not already done
133+
logger.warn("Forcing shutdown of thread pool");
134+
executor.shutdownNow();
94135
}
95136
}
96137
}
@@ -137,6 +178,21 @@ private List<String> determineEndpoints() {
137178
return endpoints;
138179
}
139180

181+
private Set<Instance> getInstancesByRegion(String endpoint) {
182+
Set<Instance> allInstances = new HashSet<>();
183+
AmazonEC2 ec2 = ec2Supplier.getEC2ForEndpoint(endpoint);
184+
zones = ec2.describeAvailabilityZones();
185+
final ArrayList<Filter> filters = buildFilters();
186+
187+
final Set<Instance> newInstances = addExtraMappingAttribute(ec2, query(ec2, new DescribeInstancesRequest().withFilters(filters).withMaxResults(maxResults)));
188+
189+
if (!newInstances.isEmpty() && newInstances != null) {
190+
allInstances.addAll(newInstances);
191+
}
192+
193+
return allInstances;
194+
}
195+
140196
private Set<Instance> query(final AmazonEC2 ec2, final DescribeInstancesRequest request) {
141197
//create "running" filter
142198
final Set<Instance> instances = new HashSet<>();

src/test/groovy/com/dtolabs/rundeck/plugin/resources/ec2/InstanceToNodeMapperSpec.groovy

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class InstanceToNodeMapperSpec extends Specification {
193193
mapper.setRegion("us-west-1")
194194

195195
when:
196-
def instances = mapper.performQuery()
196+
def instances = mapper.performQuery(false)
197197
then:
198198
instances!=null
199199
instances.getNode("aninstanceId").getAttributes().containsKey(expected)
@@ -234,7 +234,7 @@ class InstanceToNodeMapperSpec extends Specification {
234234
def mapper = new InstanceToNodeMapper(supplier, mapping, pageResults);
235235
mapper.setRegion("us-west-1")
236236
when:
237-
def instances = mapper.performQuery()
237+
def instances = mapper.performQuery(false)
238238
then:
239239
instances!=null
240240
0*ec2.describeImages(_)
@@ -277,7 +277,7 @@ class InstanceToNodeMapperSpec extends Specification {
277277
def mapper = new InstanceToNodeMapper(supplier, mapping, pageResults);
278278
mapper.setRegion(region)
279279
when:
280-
def instances = mapper.performQuery()
280+
def instances = mapper.performQuery(false)
281281
then:
282282
instances!=null
283283
instances.getNode("aninstanceId").getAttributes().containsKey("region")
@@ -325,7 +325,7 @@ class InstanceToNodeMapperSpec extends Specification {
325325
def mapper = new InstanceToNodeMapper(supplier, mapping, pageResults);
326326
mapper.setEndpoint(endpoints.join(', '))
327327
when:
328-
def instances = mapper.performQuery()
328+
def instances = mapper.performQuery(false)
329329
then:
330330
instances!=null
331331
instances.getNode("aninstanceId").getAttributes().containsKey("region")
@@ -384,7 +384,7 @@ class InstanceToNodeMapperSpec extends Specification {
384384
def mapper = new InstanceToNodeMapper(supplier, mapping, pageResults);
385385
mapper.setEndpoint(endpoint)
386386
when:
387-
def instances = mapper.performQuery()
387+
def instances = mapper.performQuery(false)
388388
then:
389389
instances!=null
390390
instances.getNode("aninstanceId").getAttributes().containsKey("region")

0 commit comments

Comments
 (0)