Skip to content

Commit ce7a84c

Browse files
committed
add stream sorter example for accumulator
Signed-off-by: Yashash H L <[email protected]>
1 parent 9804439 commit ce7a84c

File tree

3 files changed

+91
-2
lines changed

3 files changed

+91
-2
lines changed

examples/pom.xml

+22
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,28 @@
359359
</to>
360360
</configuration>
361361
</execution>
362+
<execution>
363+
<id>accumulator-stream-sorter</id>
364+
<phase>package</phase>
365+
<goals>
366+
<goal>dockerBuild</goal>
367+
</goals>
368+
<configuration>
369+
<from>
370+
<image>amazoncorretto:11</image>
371+
</from>
372+
<container>
373+
<mainClass>
374+
io.numaproj.numaflow.examples.accumulator.sorter.StreamSorterFactory
375+
</mainClass>
376+
</container>
377+
<to>
378+
<image>
379+
numaflow-java-examples/accumulator-stream-sorter:${docker.tag}
380+
</image>
381+
</to>
382+
</configuration>
383+
</execution>
362384
</executions>
363385
</plugin>
364386
</plugins>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package io.numaproj.numaflow.examples.accumulator.sorter;
2+
3+
import io.numaproj.numaflow.accumulator.Server;
4+
import io.numaproj.numaflow.accumulator.model.Accumulator;
5+
import io.numaproj.numaflow.accumulator.model.AccumulatorFactory;
6+
import io.numaproj.numaflow.accumulator.model.Datum;
7+
import io.numaproj.numaflow.accumulator.model.Message;
8+
import io.numaproj.numaflow.accumulator.model.OutputStreamObserver;
9+
import lombok.AllArgsConstructor;
10+
import lombok.extern.slf4j.Slf4j;
11+
12+
import java.time.Instant;
13+
import java.util.Comparator;
14+
import java.util.TreeSet;
15+
16+
@Slf4j
17+
@AllArgsConstructor
18+
public class StreamSorterFactory extends AccumulatorFactory<StreamSorterFactory.StreamSorter> {
19+
20+
public static void main(String[] args) throws Exception {
21+
log.info("Starting stream sorter server..");
22+
Server server = new Server(new StreamSorterFactory());
23+
24+
// Start the server
25+
server.start();
26+
27+
// wait for the server to shut down
28+
server.awaitTermination();
29+
log.info("Stream sorter server exited..");
30+
}
31+
32+
@Override
33+
public StreamSorter createAccumulator() {
34+
return new StreamSorter();
35+
}
36+
37+
public static class StreamSorter extends Accumulator {
38+
private Instant latestWm = Instant.ofEpochMilli(-1);
39+
private final TreeSet<Datum> sortedBuffer = new TreeSet<>(Comparator.comparing(Datum::getEventTime));
40+
41+
@Override
42+
public void processMessage(Datum datum, OutputStreamObserver outputStream) {
43+
log.info("Received datum with event time: {}", datum.toString());
44+
if (datum.getWatermark().isAfter(latestWm)) {
45+
latestWm = datum.getWatermark();
46+
flushBuffer(outputStream);
47+
}
48+
sortedBuffer.add(datum);
49+
}
50+
51+
@Override
52+
public void handleEndOfStream(OutputStreamObserver outputStreamObserver) {
53+
flushBuffer(outputStreamObserver);
54+
}
55+
56+
private void flushBuffer(OutputStreamObserver outputStream) {
57+
log.info("Watermark updated, flushing sortedBuffer: {}", latestWm.toEpochMilli());
58+
while (!sortedBuffer.isEmpty() && !sortedBuffer
59+
.first()
60+
.getEventTime()
61+
.isAfter(latestWm)) {
62+
Datum datum = sortedBuffer.pollFirst();
63+
assert datum != null;
64+
outputStream.send(new Message(datum));
65+
log.info("Sent datum with event time: {}", datum.getEventTime().toEpochMilli());
66+
}
67+
}
68+
}
69+
}

src/main/java/io/numaproj/numaflow/accumulator/Server.java

-2
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,7 @@ public void start() throws Exception {
102102
* waiting
103103
*/
104104
public void awaitTermination() throws InterruptedException {
105-
log.info("accumulagtor server is waiting for termination");
106105
this.server.awaitTermination();
107-
log.info("accumulagtor server terminated");
108106
}
109107

110108
/**

0 commit comments

Comments
 (0)