Skip to content

Commit 4adc44b

Browse files
authored
chore: adds concurrent sink (#201)
Signed-off-by: Ankit Raj <[email protected]>
1 parent 958d5d5 commit 4adc44b

File tree

2 files changed

+373
-0
lines changed

2 files changed

+373
-0
lines changed
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
package io.numaproj.numaflow.examples.sink.forkjoin;
2+
3+
import io.numaproj.numaflow.sinker.Datum;
4+
import io.numaproj.numaflow.sinker.DatumIterator;
5+
import io.numaproj.numaflow.sinker.Response;
6+
import io.numaproj.numaflow.sinker.ResponseList;
7+
import io.numaproj.numaflow.sinker.Server;
8+
import io.numaproj.numaflow.sinker.Sinker;
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.concurrent.*;
14+
15+
/**
16+
* ConcurrentSink demonstrates concurrent processing of incoming messages using ThreadPoolExecutor.
17+
* This example shows how to process messages in parallel using a thread pool for
18+
* CPU-intensive operations where parallel processing can improve performance.
19+
*
20+
* Key features:
21+
* - Uses ThreadPoolExecutor for parallel execution
22+
* - Processes each message independently in parallel
23+
* - Demonstrates concurrent data transformation
24+
* - Handles exceptions gracefully in parallel processing
25+
* - Shows how to aggregate results from multiple threads
26+
*/
27+
@Slf4j
28+
public class ConcurrentSink extends Sinker {
29+
30+
private static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
31+
32+
private final ThreadPoolExecutor threadPool;
33+
34+
public ConcurrentSink() {
35+
this(DEFAULT_THREAD_POOL_SIZE);
36+
}
37+
38+
public ConcurrentSink(int threadPoolSize) {
39+
this.threadPool = new ThreadPoolExecutor(
40+
threadPoolSize,
41+
threadPoolSize,
42+
60L,
43+
TimeUnit.SECONDS,
44+
new LinkedBlockingQueue<>(),
45+
new ThreadFactory() {
46+
private int counter = 0;
47+
@Override
48+
public Thread newThread(Runnable r) {
49+
return new Thread(r, "ConcurrentSink-Worker-" + (++counter));
50+
}
51+
}
52+
);
53+
}
54+
55+
public static void main(String[] args) throws Exception {
56+
ConcurrentSink concurrentSink = new ConcurrentSink();
57+
58+
Server server = new Server(concurrentSink);
59+
server.start();
60+
server.awaitTermination();
61+
server.stop();
62+
63+
concurrentSink.shutdown();
64+
}
65+
66+
@Override
67+
public ResponseList processMessages(DatumIterator datumIterator) {
68+
log.info("Starting concurrent processing with thread pool size: {}",
69+
threadPool.getCorePoolSize());
70+
71+
List<Datum> messages = new ArrayList<>();
72+
while (true) {
73+
Datum datum;
74+
try {
75+
datum = datumIterator.next();
76+
} catch (InterruptedException e) {
77+
Thread.currentThread().interrupt();
78+
continue;
79+
}
80+
if (datum == null) {
81+
break;
82+
}
83+
messages.add(datum);
84+
}
85+
86+
log.info("Collected {} messages for concurrent processing", messages.size());
87+
88+
if (messages.isEmpty()) {
89+
return ResponseList.newBuilder().build();
90+
}
91+
92+
List<Response> allResponses = processInParallel(messages);
93+
94+
log.info("Completed concurrent processing, generated {} responses", allResponses.size());
95+
96+
ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
97+
for (Response response : allResponses) {
98+
responseListBuilder.addResponse(response);
99+
}
100+
101+
return responseListBuilder.build();
102+
}
103+
104+
/**
105+
* Processes messages in parallel using ThreadPoolExecutor.
106+
* Each message is processed independently in a separate thread.
107+
*/
108+
private List<Response> processInParallel(List<Datum> messages) {
109+
List<Future<Response>> futures = new ArrayList<>();
110+
111+
for (Datum message : messages) {
112+
Future<Response> future = threadPool.submit(new MessageProcessingTask(message));
113+
futures.add(future);
114+
}
115+
116+
List<Response> allResponses = new ArrayList<>();
117+
for (Future<Response> future : futures) {
118+
try {
119+
Response response = future.get(30, TimeUnit.SECONDS);
120+
allResponses.add(response);
121+
} catch (InterruptedException e) {
122+
Thread.currentThread().interrupt();
123+
log.error("Interrupted while waiting for message processing", e);
124+
} catch (ExecutionException e) {
125+
log.error("Error during message processing", e.getCause());
126+
} catch (TimeoutException e) {
127+
log.error("Timeout waiting for message processing", e);
128+
future.cancel(true);
129+
}
130+
}
131+
132+
return allResponses;
133+
}
134+
135+
/**
136+
* Task that processes a single message in a thread.
137+
* This is where the actual CPU-intensive work would be done.
138+
*/
139+
private static class MessageProcessingTask implements Callable<Response> {
140+
private final Datum datum;
141+
142+
public MessageProcessingTask(Datum datum) {
143+
this.datum = datum;
144+
}
145+
146+
@Override
147+
public Response call() {
148+
try {
149+
String message = new String(datum.getValue());
150+
String processedMessage = processMessage(message);
151+
152+
log.debug("Processed message {} -> {}", message, processedMessage);
153+
return Response.responseOK(datum.getId());
154+
155+
} catch (Exception e) {
156+
log.error("Error processing message with ID: {}", datum.getId(), e);
157+
return Response.responseFailure(datum.getId(), e.getMessage());
158+
}
159+
}
160+
161+
/**
162+
* Simulates CPU-intensive message processing.
163+
* In a real-world scenario, this could be data transformation, validation,
164+
* encryption, compression, or any other compute-intensive operation.
165+
*/
166+
private String processMessage(String message) {
167+
StringBuilder processed = new StringBuilder();
168+
processed.append("PROCESSED[")
169+
.append(new StringBuilder(message).reverse())
170+
.append("]-")
171+
.append(Thread.currentThread().getName())
172+
.append("-")
173+
.append(System.currentTimeMillis() % 1000);
174+
175+
for (int i = 0; i < 100; i++) {
176+
Math.sqrt(i * message.hashCode());
177+
}
178+
179+
return processed.toString();
180+
}
181+
}
182+
183+
/**
184+
* Shutdown the thread pool gracefully.
185+
* This should be called when the sink is no longer needed.
186+
*/
187+
public void shutdown() {
188+
log.info("Shutting down concurrent sink thread pool");
189+
threadPool.shutdown();
190+
try {
191+
if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
192+
threadPool.shutdownNow();
193+
}
194+
} catch (InterruptedException e) {
195+
threadPool.shutdownNow();
196+
Thread.currentThread().interrupt();
197+
}
198+
}
199+
200+
/**
201+
* Get current thread pool statistics for monitoring.
202+
*/
203+
public String getThreadPoolStats() {
204+
return String.format("ThreadPool[active=%d, completed=%d, queued=%d, pool=%d/%d]",
205+
threadPool.getActiveCount(),
206+
threadPool.getCompletedTaskCount(),
207+
threadPool.getQueue().size(),
208+
threadPool.getPoolSize(),
209+
threadPool.getMaximumPoolSize());
210+
}
211+
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package io.numaproj.numaflow.examples.sink.forkjoin;
2+
3+
import io.numaproj.numaflow.sinker.Response;
4+
import io.numaproj.numaflow.sinker.ResponseList;
5+
import io.numaproj.numaflow.sinker.SinkerTestKit;
6+
import io.numaproj.numaflow.sinker.Datum;
7+
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.AfterEach;
11+
import org.junit.jupiter.api.DisplayName;
12+
import org.junit.runner.RunWith;
13+
import org.mockito.junit.MockitoJUnitRunner;
14+
15+
import static org.mockito.Mockito.mock;
16+
17+
18+
import static org.junit.jupiter.api.Assertions.*;
19+
import static org.mockito.Mockito.when;
20+
21+
/**
22+
* Comprehensive test suite for ConcurrentSink to verify concurrent processing functionality.
23+
*/
24+
@RunWith(MockitoJUnitRunner.class)
25+
public class ConcurrentSinkTest {
26+
27+
private ConcurrentSink concurrentSink;
28+
private SinkerTestKit.TestListIterator testIterator;
29+
30+
@BeforeEach
31+
void setUp() {
32+
concurrentSink = new ConcurrentSink();
33+
testIterator = new SinkerTestKit.TestListIterator();
34+
}
35+
36+
@AfterEach
37+
void tearDown() {
38+
if (concurrentSink != null) {
39+
concurrentSink.shutdown();
40+
}
41+
}
42+
43+
@Test
44+
@DisplayName("Should process empty message list")
45+
void testEmptyMessageList() {
46+
ResponseList responseList = concurrentSink.processMessages(testIterator);
47+
48+
assertNotNull(responseList);
49+
assertEquals(0, responseList.getResponses().size());
50+
}
51+
52+
@Test
53+
@DisplayName("Should process single message")
54+
void testSingleMessage() {
55+
testIterator.addDatum(createTestDatum("id-1", "test-message"));
56+
57+
ResponseList responseList = concurrentSink.processMessages(testIterator);
58+
59+
assertNotNull(responseList);
60+
assertEquals(1, responseList.getResponses().size());
61+
62+
Response response = responseList.getResponses().get(0);
63+
assertTrue(response.getSuccess());
64+
assertEquals("id-1", response.getId());
65+
}
66+
67+
@Test
68+
@DisplayName("Should process multiple messages")
69+
void testMultipleMessages() {
70+
int messageCount = 15;
71+
72+
for (int i = 0; i < messageCount; i++) {
73+
testIterator.addDatum(createTestDatum("id-" + i, "message-" + i));
74+
}
75+
76+
ResponseList responseList = concurrentSink.processMessages(testIterator);
77+
78+
assertNotNull(responseList);
79+
assertEquals(messageCount, responseList.getResponses().size());
80+
81+
for (Response response : responseList.getResponses()) {
82+
assertTrue(response.getSuccess());
83+
}
84+
}
85+
86+
@Test
87+
@DisplayName("Should handle concurrent processing with custom configuration")
88+
void testCustomConfiguration() {
89+
int threadPoolSize = 2;
90+
ConcurrentSink customSink = new ConcurrentSink(threadPoolSize);
91+
92+
try {
93+
int messageCount = 15;
94+
SinkerTestKit.TestListIterator iterator = new SinkerTestKit.TestListIterator();
95+
96+
for (int i = 0; i < messageCount; i++) {
97+
iterator.addDatum(createTestDatum("id-" + i, "message-" + i));
98+
}
99+
100+
ResponseList responseList = customSink.processMessages(iterator);
101+
102+
assertNotNull(responseList);
103+
assertEquals(messageCount, responseList.getResponses().size());
104+
105+
for (Response response : responseList.getResponses()) {
106+
assertTrue(response.getSuccess());
107+
}
108+
109+
// Verify thread pool stats
110+
String stats = customSink.getThreadPoolStats();
111+
assertNotNull(stats);
112+
assertTrue(stats.contains("ThreadPool"));
113+
114+
} finally {
115+
customSink.shutdown();
116+
}
117+
}
118+
119+
@Test
120+
@DisplayName("Should handle null values gracefully")
121+
void testNullValues() {
122+
testIterator.addDatum(createTestDatum("id-1", null));
123+
testIterator.addDatum(createTestDatum("id-2", "valid-message"));
124+
125+
ResponseList responseList = concurrentSink.processMessages(testIterator);
126+
127+
assertNotNull(responseList);
128+
assertEquals(2, responseList.getResponses().size());
129+
130+
long successCount = responseList.getResponses().stream()
131+
.mapToLong(response -> response.getSuccess() ? 1 : 0)
132+
.sum();
133+
assertEquals(2, successCount);
134+
}
135+
136+
@Test
137+
@DisplayName("Should handle errors gracefully")
138+
void testErrors() {
139+
Datum mockDatum = mock(Datum.class);
140+
testIterator.addDatum(mockDatum);
141+
testIterator.addDatum(mockDatum);
142+
143+
when(mockDatum.getValue()).thenThrow(new RuntimeException("some exception happened"));
144+
145+
ResponseList responseList = concurrentSink.processMessages(testIterator);
146+
147+
assertNotNull(responseList);
148+
assertEquals(2, responseList.getResponses().size());
149+
150+
long errorCount = responseList.getResponses().stream()
151+
.mapToLong(response -> response.getSuccess() ? 0 : 1)
152+
.sum();
153+
assertEquals(2, errorCount);
154+
}
155+
156+
private SinkerTestKit.TestDatum createTestDatum(String id, String value) {
157+
return SinkerTestKit.TestDatum.builder()
158+
.id(id)
159+
.value(value != null ? value.getBytes() : new byte[0])
160+
.build();
161+
}
162+
}

0 commit comments

Comments
 (0)