Skip to content

Commit a095be8

Browse files
authored
Merge pull request #861 from dakshina99/parallel-event-publishing
Introduce a ThreadPool to the EventJunction to Publish Events in Parallel
2 parents 85dfbb0 + 8145a7c commit a095be8

File tree

10 files changed

+329
-3
lines changed

10 files changed

+329
-3
lines changed

components/event-publisher/event-output-adapters/org.wso2.carbon.event.output.adapter.jms/src/main/java/org/wso2/carbon/event/output/adapter/jms/JMSEventAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class JMSEventAdapter implements OutputEventAdapter {
4545
private OutputEventAdapterConfiguration eventAdapterConfiguration;
4646
private Map<String, String> globalProperties;
4747
private PublisherDetails publisherDetails = null;
48-
private static ExecutorService executorService;
48+
private ExecutorService executorService;
4949
private int tenantId;
5050

5151
public JMSEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
3+
*
4+
* WSO2 LLC. licenses this file to you under the Apache License,
5+
* Version 2.0 (the "License"); you may not use this file except
6+
* in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*/
18+
package org.wso2.carbon.event.stream.core.internal;
19+
20+
import org.wso2.carbon.context.PrivilegedCarbonContext;
21+
import org.wso2.carbon.databridge.commons.Event;
22+
import org.wso2.carbon.event.stream.core.WSO2EventConsumer;
23+
24+
/**
25+
* This class is used to consume events from the event stream.
26+
*/
27+
public class EventConsumerThread implements Runnable {
28+
private WSO2EventConsumer consumer;
29+
private Event event;
30+
private int tenantId;
31+
32+
public EventConsumerThread(WSO2EventConsumer consumer, Event event, int tenantId) {
33+
this.consumer = consumer;
34+
this.event = event;
35+
this.tenantId = tenantId;
36+
}
37+
38+
@Override
39+
public void run() {
40+
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId);
41+
consumer.onEvent(event);
42+
}
43+
}

components/event-stream/org.wso2.carbon.event.stream.core/src/main/java/org/wso2/carbon/event/stream/core/internal/EventJunction.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,26 @@
1616

1717
import org.apache.commons.logging.Log;
1818
import org.apache.commons.logging.LogFactory;
19+
import org.wso2.carbon.context.PrivilegedCarbonContext;
1920
import org.wso2.carbon.databridge.commons.Event;
2021
import org.wso2.carbon.databridge.commons.StreamDefinition;
2122
import org.wso2.carbon.event.stream.core.EventProducer;
2223
import org.wso2.carbon.event.stream.core.EventProducerCallback;
2324
import org.wso2.carbon.event.stream.core.SiddhiEventConsumer;
2425
import org.wso2.carbon.event.stream.core.WSO2EventConsumer;
2526
import org.wso2.carbon.event.stream.core.WSO2EventListConsumer;
27+
import org.wso2.carbon.event.stream.core.internal.config.EventPublisherConfigs;
28+
import org.wso2.carbon.event.stream.core.internal.ds.EventStreamServiceValueHolder;
2629
import org.wso2.carbon.event.stream.core.internal.util.EventConverter;
30+
import org.wso2.carbon.event.stream.core.internal.util.EventStreamConstants;
2731

2832
import java.util.List;
33+
import java.util.Map;
34+
import java.util.concurrent.ArrayBlockingQueue;
2935
import java.util.concurrent.CopyOnWriteArrayList;
36+
import java.util.concurrent.ThreadFactory;
37+
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.atomic.AtomicInteger;
3039

3140
/**
3241
* Acts as the pass through point for a given stream. Does not distinguish between input and output streams.
@@ -59,6 +68,7 @@ public class EventJunction implements EventProducerCallback {
5968
private CopyOnWriteArrayList<SiddhiEventConsumer> siddhiEventConsumers;
6069
private CopyOnWriteArrayList<WSO2EventConsumer> wso2EventConsumers;
6170
private CopyOnWriteArrayList<WSO2EventListConsumer> wso2EventListConsumers;
71+
private ThreadPoolExecutor eventConsumerThreadPoolExecutor;
6272

6373
public EventJunction(StreamDefinition streamDefinition) {
6474
this.streamDefinition = streamDefinition;
@@ -67,6 +77,21 @@ public EventJunction(StreamDefinition streamDefinition) {
6777
this.wso2EventConsumers = new CopyOnWriteArrayList<WSO2EventConsumer>();
6878
this.wso2EventListConsumers = new CopyOnWriteArrayList<WSO2EventListConsumer>();
6979
populateEventTemplate(streamDefinition);
80+
81+
Map<String, Integer> publisherConfigs = getPublisherConfigs();
82+
this.eventConsumerThreadPoolExecutor = new ThreadPoolExecutor(
83+
publisherConfigs.get(EventStreamConstants.EVENT_PUBLISHER_MIN_THREAD_POOL_SIZE),
84+
publisherConfigs.get(EventStreamConstants.EVENT_PUBLISHER_MAX_THREAD_POOL_SIZE),
85+
publisherConfigs.get(EventStreamConstants.EVENT_PUBLISHER_KEEP_ALIVE_TIME)
86+
, java.util.concurrent.TimeUnit.MILLISECONDS,
87+
new ArrayBlockingQueue<>(publisherConfigs.get(EventStreamConstants.EVENT_PUBLISHER_JOB_QUEUE_SIZE)),
88+
new ThreadFactory() {
89+
private final AtomicInteger count = new AtomicInteger(1);
90+
@Override
91+
public Thread newThread(Runnable eventConsumer) {
92+
return new Thread(eventConsumer, "EventConsumerThread-" + count.getAndIncrement());
93+
}
94+
});
7095
}
7196

7297
public void addConsumer(SiddhiEventConsumer consumer) {
@@ -154,7 +179,8 @@ public void sendEvent(Event event) {
154179
if (!wso2EventConsumers.isEmpty()) {
155180
for (WSO2EventConsumer consumer : wso2EventConsumers) {
156181
try {
157-
consumer.onEvent(event);
182+
eventConsumerThreadPoolExecutor.submit(new EventConsumerThread(consumer, event, PrivilegedCarbonContext.getThreadLocalCarbonContext()
183+
.getTenantId()));
158184
} catch (Exception e) {
159185
log.error("Error while dispatching events: " + e.getMessage(), e);
160186
}
@@ -229,4 +255,12 @@ private void populateEventTemplate(StreamDefinition definition) {
229255

230256
this.attributesCount = attributesCount;
231257
}
258+
259+
private static Map<String, Integer> getPublisherConfigs() {
260+
EventPublisherConfigs publisherConfigs = EventStreamServiceValueHolder.getEventPublisherConfigs();
261+
if (publisherConfigs != null) {
262+
return publisherConfigs.getEventPublisherThreadPoolConfigs();
263+
}
264+
return null;
265+
}
232266
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy
6+
* of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed
11+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
12+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package org.wso2.carbon.event.stream.core.internal.config;
17+
18+
import org.wso2.carbon.event.stream.core.internal.util.EventStreamConstants;
19+
20+
import javax.xml.bind.annotation.XmlElement;
21+
import javax.xml.bind.annotation.XmlRootElement;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
/**
26+
* AdapterConfigs class is used to represent the configuration of the output event adapters.
27+
*/
28+
@XmlRootElement(name="outputEventAdaptersConfig")
29+
public class EventPublisherConfigs {
30+
31+
private int minThread;
32+
private int maxThread;
33+
private int keepAliveTime;
34+
private int jobQueueSize;
35+
36+
@XmlElement(name="minThread")
37+
public void setMinThread(int minThread) {
38+
this.minThread = minThread;
39+
}
40+
41+
@XmlElement(name="maxThread")
42+
public void setMaxThread(int maxThread) {
43+
this.maxThread = maxThread;
44+
}
45+
46+
@XmlElement(name="keepAliveTime")
47+
public void setKeepAliveTime(int keepAliveTime) {
48+
this.keepAliveTime = keepAliveTime;
49+
}
50+
51+
@XmlElement(name="jobQueueSize")
52+
public void setJobQueueSize(int jobQueueSize) {
53+
this.jobQueueSize = jobQueueSize;
54+
}
55+
56+
public Map<String, Integer> getEventPublisherThreadPoolConfigs() {
57+
Map<String, Integer> eventPublisherConfigs = new HashMap<>();
58+
eventPublisherConfigs.put(EventStreamConstants.EVENT_PUBLISHER_MIN_THREAD_POOL_SIZE, minThread);
59+
eventPublisherConfigs.put(EventStreamConstants.EVENT_PUBLISHER_MAX_THREAD_POOL_SIZE, maxThread);
60+
eventPublisherConfigs.put(EventStreamConstants.EVENT_PUBLISHER_KEEP_ALIVE_TIME, keepAliveTime);
61+
eventPublisherConfigs.put(EventStreamConstants.EVENT_PUBLISHER_JOB_QUEUE_SIZE, jobQueueSize);
62+
return eventPublisherConfigs;
63+
}
64+
}

components/event-stream/org.wso2.carbon.event.stream.core/src/main/java/org/wso2/carbon/event/stream/core/internal/ds/EventStreamServiceDS.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.wso2.carbon.event.stream.core.EventStreamService;
2727
import org.wso2.carbon.event.stream.core.internal.CarbonEventStreamService;
2828
import org.wso2.carbon.event.stream.core.internal.EventStreamRuntime;
29+
import org.wso2.carbon.event.stream.core.internal.util.EventPublisherConfigHelper;
30+
import org.wso2.carbon.securevault.SecretCallbackHandlerService;
2931
import org.wso2.carbon.utils.ConfigurationContextService;
3032

3133
@Component(
@@ -39,6 +41,7 @@ public class EventStreamServiceDS {
3941
protected void activate(ComponentContext context) {
4042

4143
try {
44+
EventStreamServiceValueHolder.setEventPublisherConfigs(EventPublisherConfigHelper.loadGlobalConfigs());
4245
EventStreamServiceValueHolder.registerEventStreamRuntime(new EventStreamRuntime());
4346
CarbonEventStreamService carbonEventStreamService = new CarbonEventStreamService();
4447
EventStreamServiceValueHolder.setCarbonEventStreamService(carbonEventStreamService);
@@ -86,4 +89,20 @@ protected void unsetEventStreamListener(EventStreamListener eventStreamListener)
8689

8790
EventStreamServiceValueHolder.unregisterEventStreamListener(eventStreamListener);
8891
}
92+
93+
@Reference(
94+
name = "secret.callback.handler.service",
95+
service = org.wso2.carbon.securevault.SecretCallbackHandlerService.class,
96+
cardinality = ReferenceCardinality.MANDATORY,
97+
policy = ReferencePolicy.DYNAMIC,
98+
unbind = "unsetSecretCallbackHandlerService")
99+
protected void setSecretCallbackHandlerService(SecretCallbackHandlerService secretCallbackHandlerService) {
100+
101+
EventStreamServiceValueHolder.setSecretCallbackHandlerService(secretCallbackHandlerService);
102+
}
103+
104+
protected void unsetSecretCallbackHandlerService(SecretCallbackHandlerService secretCallbackHandlerService) {
105+
106+
EventStreamServiceValueHolder.setSecretCallbackHandlerService(null);
107+
}
89108
}

components/event-stream/org.wso2.carbon.event.stream.core/src/main/java/org/wso2/carbon/event/stream/core/internal/ds/EventStreamServiceValueHolder.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.wso2.carbon.event.stream.core.EventStreamListener;
1818
import org.wso2.carbon.event.stream.core.internal.CarbonEventStreamService;
1919
import org.wso2.carbon.event.stream.core.internal.EventStreamRuntime;
20+
import org.wso2.carbon.event.stream.core.internal.config.EventPublisherConfigs;
21+
import org.wso2.carbon.securevault.SecretCallbackHandlerService;
2022
import org.wso2.carbon.utils.ConfigurationContextService;
2123

2224
import java.util.List;
@@ -28,6 +30,8 @@ public class EventStreamServiceValueHolder {
2830
private static ConfigurationContextService configurationContextService;
2931
private static List<EventStreamListener> eventStreamListenerList = new CopyOnWriteArrayList<EventStreamListener>();
3032
private static EventStreamRuntime eventStreamRuntime;
33+
private static EventPublisherConfigs eventPublisherConfigs;
34+
private static SecretCallbackHandlerService secretCallbackHandlerService;
3135

3236
private EventStreamServiceValueHolder() {
3337

@@ -69,4 +73,20 @@ public static EventStreamRuntime getEventStreamRuntime() {
6973
public static void registerEventStreamRuntime(EventStreamRuntime eventStreamRuntime) {
7074
EventStreamServiceValueHolder.eventStreamRuntime = eventStreamRuntime;
7175
}
76+
77+
public static void setEventPublisherConfigs(EventPublisherConfigs globalAdapterConfigs) {
78+
EventStreamServiceValueHolder.eventPublisherConfigs = globalAdapterConfigs;
79+
}
80+
81+
public static EventPublisherConfigs getEventPublisherConfigs() {
82+
return eventPublisherConfigs;
83+
}
84+
85+
public static void setSecretCallbackHandlerService(SecretCallbackHandlerService secretCallbackHandlerService) {
86+
EventStreamServiceValueHolder.secretCallbackHandlerService = secretCallbackHandlerService;
87+
}
88+
89+
public static SecretCallbackHandlerService getSecretCallbackHandlerService() {
90+
return secretCallbackHandlerService;
91+
}
7292
}

0 commit comments

Comments
 (0)