Skip to content

Commit e616ba6

Browse files
vaibhavtiwari33vtiwari5
andauthored
feat: Support for on success sink (#207)
Signed-off-by: vtiwari5 <[email protected]> Co-authored-by: vtiwari5 <[email protected]>
1 parent 38734c0 commit e616ba6

File tree

18 files changed

+599
-20
lines changed

18 files changed

+599
-20
lines changed

examples/pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,28 @@
381381
</to>
382382
</configuration>
383383
</execution>
384+
<execution>
385+
<id>on-success-sink</id>
386+
<phase>package</phase>
387+
<goals>
388+
<goal>dockerBuild</goal>
389+
</goals>
390+
<configuration>
391+
<from>
392+
<image>amazoncorretto:11</image>
393+
</from>
394+
<container>
395+
<mainClass>
396+
io.numaproj.numaflow.examples.sink.onsuccess.OnSuccess
397+
</mainClass>
398+
</container>
399+
<to>
400+
<image>
401+
numaflow-java-examples/on-success-sink:${docker.tag}
402+
</image>
403+
</to>
404+
</configuration>
405+
</execution>
384406
</executions>
385407
</plugin>
386408
</plugins>
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package io.numaproj.numaflow.examples.sink.onsuccess;
2+
3+
import io.numaproj.numaflow.examples.sink.simple.SimpleSink;
4+
import io.numaproj.numaflow.sinker.Datum;
5+
import io.numaproj.numaflow.sinker.DatumIterator;
6+
import io.numaproj.numaflow.sinker.Message;
7+
import io.numaproj.numaflow.sinker.Response;
8+
import io.numaproj.numaflow.sinker.ResponseList;
9+
import io.numaproj.numaflow.sinker.Server;
10+
import io.numaproj.numaflow.sinker.Sinker;
11+
import lombok.extern.slf4j.Slf4j;
12+
13+
import java.util.Random;
14+
15+
@Slf4j
16+
public class OnSuccess extends Sinker {
17+
public static void main(String[] args) throws Exception {
18+
Server server = new Server(new OnSuccess());
19+
20+
// Start the server
21+
server.start();
22+
23+
// wait for the server to shut down
24+
server.awaitTermination();
25+
}
26+
27+
@Override
28+
public ResponseList processMessages(DatumIterator datumIterator) {
29+
ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
30+
while (true) {
31+
Datum datum = null;
32+
try {
33+
datum = datumIterator.next();
34+
} catch (InterruptedException e) {
35+
Thread.currentThread().interrupt();
36+
continue;
37+
}
38+
// null means the iterator is closed, so we break the loop
39+
if (datum == null) {
40+
break;
41+
}
42+
try {
43+
String msg = new String(datum.getValue());
44+
log.info("Received message: {}, id: {}, headers - {}", msg, datum.getId(), datum.getHeaders());
45+
if (writeToPrimarySink()) {
46+
log.info("Writing to onSuccess sink: {}", datum.getId());
47+
responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(),
48+
Message.builder()
49+
.value(String.format("Successfully wrote message with ID: %s",
50+
datum.getId()).getBytes())
51+
.build()));
52+
} else {
53+
log.info("Writing to fallback sink: {}", datum.getId());
54+
responseListBuilder.addResponse(Response.responseFallback(datum.getId()));
55+
}
56+
} catch (Exception e) {
57+
log.warn("Error while writing to any sink: ", e);
58+
responseListBuilder.addResponse(Response.responseFailure(
59+
datum.getId(),
60+
e.getMessage()));
61+
}
62+
}
63+
return responseListBuilder.build();
64+
}
65+
66+
/**
67+
* Example method to simulate write failures/success to primary sink.
68+
* Based on whether this returns true/false, we write to fallback sink / onSuccess sink
69+
* @return true if simulated write to primary sink is successful, false otherwise
70+
*/
71+
public boolean writeToPrimarySink() {
72+
Random random = new Random();
73+
return random.nextBoolean();
74+
}
75+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.numaproj.numaflow.examples.sink.onsuccess;
2+
3+
import io.numaproj.numaflow.examples.sink.simple.SimpleSink;
4+
import io.numaproj.numaflow.sinker.Response;
5+
import io.numaproj.numaflow.sinker.ResponseList;
6+
import io.numaproj.numaflow.sinker.SinkerTestKit;
7+
import org.junit.jupiter.api.Assertions;
8+
import org.junit.jupiter.api.Test;
9+
10+
public class OnSuccessTest {
11+
@Test
12+
public void testOnSuccessSink() {
13+
int datumCount = 10;
14+
OnSuccess onSuccessSink = new OnSuccess();
15+
// Create a test datum iterator with 10 messages
16+
SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator();
17+
for (int i = 0; i < datumCount; i++) {
18+
testListIterator.addDatum(
19+
SinkerTestKit.TestDatum
20+
.builder()
21+
.id("id-" + i)
22+
.value(("value-" + i).getBytes())
23+
.build());
24+
}
25+
ResponseList responseList = onSuccessSink.processMessages(testListIterator);
26+
Assertions.assertEquals(datumCount, responseList.getResponses().size());
27+
for (Response response : responseList.getResponses()) {
28+
Assertions.assertEquals(false, response.getSuccess());
29+
}
30+
// we can add the logic to verify if the messages were
31+
// successfully written to the sink(could be a file, database, etc.)
32+
}
33+
}

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@
310310
<exclude>io/numaproj/numaflow/source/v1/*</exclude>
311311
<exclude>io/numaproj/numaflow/serving/v1/*</exclude>
312312
<exclude>io/numaproj/numaflow/accumulator/v1/*</exclude>
313+
<exclude>common/*</exclude>
313314
<exclude>**/*TestKit*</exclude>
314315
</excludes>
315316
</configuration>

src/main/java/io/numaproj/numaflow/sinker/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
class Constants {
44
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sink.sock";
55
public static final String DEFAULT_FB_SINK_SOCKET_PATH = "/var/run/numaflow/fb-sink.sock";
6+
public static final String DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH = "/var/run/numaflow/ons-sink.sock";
67
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info";
78
public static final String DEFAULT_FB_SERVER_INFO_FILE_PATH =
89
"/var/run/numaflow/fb-sinker-server-info";
10+
public static final String DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH =
11+
"/var/run/numaflow/ons-sinker-server-info";
912
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
1013
public static final int DEFAULT_PORT = 50051;
1114
public static final String DEFAULT_HOST = "localhost";
1215
public static final String ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE";
1316
public static final String UD_CONTAINER_FALLBACK_SINK = "fb-udsink";
17+
public static final String UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink";
1418

1519
// Private constructor to prevent instantiation
1620
private Constants() {

src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ static GRPCConfig defaultGrpcConfig() {
3636
if (Constants.UD_CONTAINER_FALLBACK_SINK.equals(containerType)) {
3737
socketPath = Constants.DEFAULT_FB_SINK_SOCKET_PATH;
3838
infoFilePath = Constants.DEFAULT_FB_SERVER_INFO_FILE_PATH;
39+
} else if (Constants.UD_CONTAINER_ON_SUCCESS_SINK.equals(containerType)) {
40+
socketPath = Constants.DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH;
41+
infoFilePath = Constants.DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH;
3942
}
4043
return GRPCConfig.newBuilder()
4144
.infoFilePath(infoFilePath)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.numaproj.numaflow.sinker;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
import java.util.HashMap;
7+
8+
/**
9+
* KeyValueGroup is a map of key-value pairs for a given group.
10+
* Used as part of {@link io.numaproj.numaflow.sinker.Message}.
11+
*/
12+
@Getter
13+
@Builder
14+
public class KeyValueGroup {
15+
private final HashMap<String, byte[]> keyValue;
16+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.numaproj.numaflow.sinker;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
import java.util.HashMap;
7+
8+
/**
9+
* Message contains information that needs to be sent to the OnSuccess sink.
10+
* The message can be different from the original message that was sent to primary sink.
11+
*/
12+
@Getter
13+
@Builder
14+
public class Message {
15+
private final byte[] value;
16+
private final String key;
17+
private final HashMap<String, KeyValueGroup> userMetadata;
18+
}
19+
20+

src/main/java/io/numaproj/numaflow/sinker/Response.java

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
11
package io.numaproj.numaflow.sinker;
22

3+
import com.google.protobuf.ByteString;
4+
import common.MetadataOuterClass;
5+
import io.numaproj.numaflow.sink.v1.SinkOuterClass.SinkResponse;
36
import lombok.AccessLevel;
47
import lombok.AllArgsConstructor;
58
import lombok.Getter;
69

10+
import java.util.Collections;
11+
import java.util.Map;
12+
import java.util.stream.Collectors;
13+
714
/**
815
* Response is used to send response from the user defined sinker. It contains the id of the
916
* message, success status, an optional error message and a fallback status. Various static factory
@@ -16,15 +23,21 @@ public class Response {
1623
private final Boolean success;
1724
private final String err;
1825
private final Boolean fallback;
26+
private final Boolean serve;
27+
private final byte[] serveResponse;
28+
private final Boolean onSuccess;
29+
// FIXME: Should this be Message object from this package? That would allow parity with other SDKs (specially Go)
30+
// Currently done this way to prevent conversion in buildResult method.
31+
private final SinkResponse.Result.Message onSuccessMessage;
1932

20-
/**
33+
/**
2134
* Static method to create response for successful message processing.
2235
*
2336
* @param id id of the message
2437
* @return Response object with success status
2538
*/
2639
public static Response responseOK(String id) {
27-
return new Response(id, true, null, false);
40+
return new Response(id, true, null, false, false, null, false, null);
2841
}
2942

3043
/**
@@ -35,7 +48,7 @@ public static Response responseOK(String id) {
3548
* @return Response object with failure status and error message
3649
*/
3750
public static Response responseFailure(String id, String errMsg) {
38-
return new Response(id, false, errMsg, false);
51+
return new Response(id, false, errMsg, false, false, null, false, null);
3952
}
4053

4154
/**
@@ -46,6 +59,89 @@ public static Response responseFailure(String id, String errMsg) {
4659
* @return Response object with fallback status
4760
*/
4861
public static Response responseFallback(String id) {
49-
return new Response(id, false, null, true);
62+
return new Response(id, false, null, true, false, null, false, null);
63+
}
64+
65+
/**
66+
* Static method to create response for serve message which is raw bytes.
67+
* This indicates that the message should be sent to the serving store.
68+
* Allows creation of serve message from raw bytes.
69+
*
70+
* @param id id of the message
71+
* @param serveResponse Response object to be sent to the serving store
72+
* @return Response object with serve status and serve response
73+
*/
74+
public static Response responseServe(String id, byte[] serveResponse) {
75+
return new Response(id, false, null, false, true, serveResponse, false, null);
76+
}
77+
78+
/**
79+
* Static method to create response for onSuccess message. Allows creation of onSuccess message
80+
* from protobuf Message object.
81+
*
82+
* @param id id of the message
83+
* @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink
84+
* @return Response object with onSuccess status and onSuccess message
85+
*/
86+
public static Response responseOnSuccess(String id, SinkResponse.Result.Message onSuccessMessage) {
87+
return new Response(id, false, null, false, false, null, true, onSuccessMessage);
88+
}
89+
90+
/**
91+
* Overloaded static method to create response for onSuccess message. Allows creation of onSuccess message
92+
* from OnSuccessMessage object.
93+
*
94+
* @param id id of the message
95+
* @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink. Can be null
96+
* if original message needs to be written to onSuccess sink
97+
* @return Response object with onSuccess status and onSuccess message
98+
*/
99+
public static Response responseOnSuccess(String id, Message onSuccessMessage) {
100+
if (onSuccessMessage == null) {
101+
return new Response(id, false, null, false, false, null, true, null);
102+
} else {
103+
104+
Map<String, MetadataOuterClass.KeyValueGroup> pbUserMetadata = MetadataOuterClass.Metadata
105+
.getDefaultInstance()
106+
.getUserMetadataMap();
107+
108+
if (onSuccessMessage.getUserMetadata() != null) {
109+
pbUserMetadata =
110+
onSuccessMessage.getUserMetadata()
111+
.entrySet()
112+
.stream()
113+
.filter(e -> e.getKey() != null && e.getValue() != null)
114+
.collect(Collectors.toMap(
115+
Map.Entry::getKey,
116+
e -> MetadataOuterClass.KeyValueGroup.newBuilder()
117+
.putAllKeyValue(e.getValue().getKeyValue() == null
118+
? Collections.emptyMap()
119+
: e.getValue()
120+
.getKeyValue()
121+
.entrySet()
122+
.stream()
123+
.filter(kv -> kv.getKey() != null
124+
&& kv.getValue() != null)
125+
.collect(Collectors.toMap(
126+
Map.Entry::getKey,
127+
kv -> ByteString.copyFrom(kv.getValue())
128+
))
129+
)
130+
.build()
131+
));
132+
}
133+
134+
MetadataOuterClass.Metadata pbMetadata = MetadataOuterClass.Metadata.newBuilder()
135+
.putAllUserMetadata(pbUserMetadata)
136+
.build();
137+
138+
SinkResponse.Result.Message pbOnSuccessMessage = SinkResponse.Result.Message.newBuilder()
139+
.addKeys(onSuccessMessage.getKey() == null ? "" : onSuccessMessage.getKey())
140+
.setValue(onSuccessMessage.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(onSuccessMessage.getValue()))
141+
.setMetadata(pbMetadata)
142+
.build();
143+
144+
return new Response(id, false, null, false, false, null, true, pbOnSuccessMessage);
145+
}
50146
}
51147
}

0 commit comments

Comments
 (0)