Skip to content

Commit f8d1151

Browse files
author
vtiwari5
committed
Add support for on success sink
Signed-off-by: vtiwari5 <[email protected]>
1 parent 38734c0 commit f8d1151

File tree

8 files changed

+201
-10
lines changed

8 files changed

+201
-10
lines changed

src/main/java/io/numaproj/numaflow/sinker/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
class Constants {
44
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sink.sock";
55
public static final String DEFAULT_FB_SINK_SOCKET_PATH = "/var/run/numaflow/fb-sink.sock";
6+
public static final String DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH = "/var/run/numaflow/ons-sink.sock";
67
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info";
78
public static final String DEFAULT_FB_SERVER_INFO_FILE_PATH =
89
"/var/run/numaflow/fb-sinker-server-info";
10+
public static final String DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH =
11+
"/var/run/numaflow/ons-sinker-server-info";
912
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
1013
public static final int DEFAULT_PORT = 50051;
1114
public static final String DEFAULT_HOST = "localhost";
1215
public static final String ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE";
1316
public static final String UD_CONTAINER_FALLBACK_SINK = "fb-udsink";
17+
public static final String UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink";
1418

1519
// Private constructor to prevent instantiation
1620
private Constants() {

src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ static GRPCConfig defaultGrpcConfig() {
3636
if (Constants.UD_CONTAINER_FALLBACK_SINK.equals(containerType)) {
3737
socketPath = Constants.DEFAULT_FB_SINK_SOCKET_PATH;
3838
infoFilePath = Constants.DEFAULT_FB_SERVER_INFO_FILE_PATH;
39+
} else if (Constants.UD_CONTAINER_ON_SUCCESS_SINK.equals(containerType)) {
40+
socketPath = Constants.DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH;
41+
infoFilePath = Constants.DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH;
3942
}
4043
return GRPCConfig.newBuilder()
4144
.infoFilePath(infoFilePath)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.numaproj.numaflow.sinker;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
import java.util.HashMap;
7+
8+
@Getter
9+
@Builder
10+
public class KeyValueGroup {
11+
private final HashMap<String, byte[]> keyValue;
12+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.numaproj.numaflow.sinker;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
import java.util.HashMap;
7+
8+
@Getter
9+
@Builder
10+
public class OnSuccessMessage {
11+
private final byte[] value;
12+
private final String key;
13+
private final HashMap<String, KeyValueGroup> userMetadata;
14+
}
15+
16+

src/main/java/io/numaproj/numaflow/sinker/Response.java

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
package io.numaproj.numaflow.sinker;
22

3+
import com.google.protobuf.ByteString;
4+
import common.MetadataOuterClass;
5+
import io.numaproj.numaflow.sink.v1.SinkOuterClass.SinkResponse.Result.Message;
36
import lombok.AccessLevel;
47
import lombok.AllArgsConstructor;
58
import lombok.Getter;
69

10+
import java.util.Map;
11+
import java.util.stream.Collectors;
12+
713
/**
814
* Response is used to send response from the user defined sinker. It contains the id of the
915
* message, success status, an optional error message and a fallback status. Various static factory
@@ -16,6 +22,12 @@ public class Response {
1622
private final Boolean success;
1723
private final String err;
1824
private final Boolean fallback;
25+
private final Boolean serve;
26+
private final byte[] serveResponse;
27+
private final Boolean onSuccess;
28+
// FIXME: Should this be OnSuccessMessage object from package? That would allow parity with other SDKs (specially Go)
29+
// Currently done this way to prevent conversion in buildResult method.
30+
private final Message onSuccessMessage;
1931

2032
/**
2133
* Static method to create response for successful message processing.
@@ -24,7 +36,7 @@ public class Response {
2436
* @return Response object with success status
2537
*/
2638
public static Response responseOK(String id) {
27-
return new Response(id, true, null, false);
39+
return new Response(id, true, null, false, false, null, false, null);
2840
}
2941

3042
/**
@@ -35,7 +47,7 @@ public static Response responseOK(String id) {
3547
* @return Response object with failure status and error message
3648
*/
3749
public static Response responseFailure(String id, String errMsg) {
38-
return new Response(id, false, errMsg, false);
50+
return new Response(id, false, errMsg, false, false, null, false, null);
3951
}
4052

4153
/**
@@ -46,6 +58,76 @@ public static Response responseFailure(String id, String errMsg) {
4658
* @return Response object with fallback status
4759
*/
4860
public static Response responseFallback(String id) {
49-
return new Response(id, false, null, true);
61+
return new Response(id, false, null, true, false, null, false, null);
62+
}
63+
64+
/**
65+
* Static method to create response for serve message which is raw bytes.
66+
* This indicates that the message should be sent to the serving store.
67+
* Allows creation of serve message from raw bytes.
68+
*
69+
* @param id id of the message
70+
* @param serveResponse Response object to be sent to the serving store
71+
* @return Response object with serve status and serve response
72+
*/
73+
public static Response responseServe(String id, byte[] serveResponse) {
74+
return new Response(id, false, null, false, true, serveResponse, false, null);
75+
}
76+
77+
/**
78+
* Static method to create response for onSuccess message. Allows creation of onSuccess message
79+
* from protobuf Message object.
80+
*
81+
* @param id id of the message
82+
* @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink
83+
* @return Response object with onSuccess status and onSuccess message
84+
*/
85+
public static Response responseOnSuccess(String id, Message onSuccessMessage) {
86+
return new Response(id, false, null, false, false, null, true, onSuccessMessage);
87+
}
88+
89+
/**
90+
* Overloaded static method to create response for onSuccess message. Allows creation of onSuccess message
91+
* from OnSuccessMessage object.
92+
*
93+
* @param id id of the message
94+
* @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink. Can be null
95+
* if original message needs to be written to onSuccess sink
96+
* @return Response object with onSuccess status and onSuccess message
97+
*/
98+
public static Response responseOnSuccess(String id, OnSuccessMessage onSuccessMessage) {
99+
if (onSuccessMessage == null) {
100+
return new Response(id, false, null, false, false, null, true, null);
101+
} else {
102+
Map<String, MetadataOuterClass.KeyValueGroup> pbUserMetadata = onSuccessMessage.getUserMetadata()
103+
.entrySet()
104+
.stream()
105+
.collect(Collectors.toMap(
106+
Map.Entry::getKey,
107+
e -> MetadataOuterClass.KeyValueGroup.newBuilder()
108+
.putAllKeyValue(e.getValue()
109+
.getKeyValue()
110+
.entrySet()
111+
.stream()
112+
.collect(Collectors.toMap(
113+
Map.Entry::getKey,
114+
kv -> ByteString.copyFrom(kv.getValue())
115+
))
116+
)
117+
.build()
118+
));
119+
120+
MetadataOuterClass.Metadata pbMetadata = MetadataOuterClass.Metadata.newBuilder()
121+
.putAllUserMetadata(pbUserMetadata)
122+
.build();
123+
124+
Message pbOnSuccessMessage = Message.newBuilder()
125+
.addKeys(onSuccessMessage.getKey())
126+
.setValue(ByteString.copyFrom(onSuccessMessage.getValue()))
127+
.setMetadata(pbMetadata)
128+
.build();
129+
130+
return new Response(id, false, null, false, false, null, true, pbOnSuccessMessage);
131+
}
50132
}
51133
}

src/main/java/io/numaproj/numaflow/sinker/Service.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.numaproj.numaflow.sinker;
22

33
import com.google.protobuf.Any;
4+
import com.google.protobuf.ByteString;
45
import com.google.protobuf.Empty;
56
import com.google.rpc.Code;
67
import com.google.rpc.DebugInfo;
@@ -131,13 +132,38 @@ public void onCompleted() {
131132
}
132133

133134
private SinkOuterClass.SinkResponse.Result buildResult(Response response) {
134-
SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK
135-
: response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
136-
return SinkOuterClass.SinkResponse.Result.newBuilder()
137-
.setId(response.getId() == null ? "" : response.getId())
138-
.setErrMsg(response.getErr() == null ? "" : response.getErr())
139-
.setStatus(status)
140-
.build();
135+
if (response.getFallback()) {
136+
return SinkOuterClass.SinkResponse.Result.newBuilder()
137+
.setId(response.getId() == null ? "" : response.getId())
138+
.setStatus(SinkOuterClass.Status.FALLBACK)
139+
.build();
140+
} else if (response.getSuccess()) {
141+
return SinkOuterClass.SinkResponse.Result.newBuilder()
142+
.setId(response.getId() == null ? "" : response.getId())
143+
.setStatus(SinkOuterClass.Status.SUCCESS)
144+
.build();
145+
} else if (response.getServe()) {
146+
// FIXME: Return error when serve response is not set?
147+
return SinkOuterClass.SinkResponse.Result.newBuilder()
148+
.setId(response.getId() == null ? "" : response.getId())
149+
.setStatus(SinkOuterClass.Status.SERVE)
150+
.setServeResponse(response.getServeResponse() == null ? null : ByteString.copyFrom(
151+
response.getServeResponse()))
152+
.build();
153+
} else if (response.getOnSuccess()) {
154+
return SinkOuterClass.SinkResponse.Result.newBuilder()
155+
.setId(response.getId() == null ? "" : response.getId())
156+
.setStatus(SinkOuterClass.Status.ON_SUCCESS)
157+
.setOnSuccessMsg(response.getOnSuccessMessage() == null ? null : response.getOnSuccessMessage())
158+
.build();
159+
} else {
160+
// FIXME: Return error when error message is not set?
161+
return SinkOuterClass.SinkResponse.Result.newBuilder()
162+
.setId(response.getId() == null ? "" : response.getId())
163+
.setStatus(SinkOuterClass.Status.FAILURE)
164+
.setErrMsg(response.getErr() == null ? "" : response.getErr())
165+
.build();
166+
}
141167
}
142168

143169
/**
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
Copyright 2022 The Numaproj Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
syntax = "proto3";
18+
option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/common";
19+
20+
package common;
21+
22+
// Metadata is the metadata of the message
23+
message Metadata {
24+
// PreviousVertex is the name of the previous vertex
25+
string previous_vertex = 1;
26+
// SystemMetadata is the system metadata of the message
27+
// Key of the map is the group name
28+
map<string, KeyValueGroup> sys_metadata = 2;
29+
// UserMetadata is the user metadata of the message
30+
// Key of the map is the group name
31+
map<string, KeyValueGroup> user_metadata = 3;
32+
}
33+
34+
// KeyValueGroup is a group of key-value pairs for a given group.
35+
message KeyValueGroup {
36+
map<string, bytes> key_value = 1;
37+
}

src/main/proto/sink/v1/sink.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ option java_package = "io.numaproj.numaflow.sink.v1";
44

55
import "google/protobuf/timestamp.proto";
66
import "google/protobuf/empty.proto";
7+
import "common/metadata.proto";
78

89
package sink.v1;
910

@@ -65,19 +66,29 @@ enum Status {
6566
SUCCESS = 0;
6667
FAILURE = 1;
6768
FALLBACK = 2;
69+
SERVE = 3;
70+
ON_SUCCESS = 4;
6871
}
6972

7073
/**
7174
* SinkResponse is the individual response of each message written to the sink.
7275
*/
7376
message SinkResponse {
7477
message Result {
78+
message Message {
79+
bytes value = 1;
80+
repeated string keys = 2;
81+
common.Metadata metadata = 3;
82+
}
7583
// id is the ID of the message, can be used to uniquely identify the message.
7684
string id = 1;
7785
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK.
7886
Status status = 2;
7987
// err_msg is the error message, set it if success is set to false.
8088
string err_msg = 3;
89+
optional bytes serve_response = 4;
90+
// on_success_msg is the message to be sent to on_success sink.
91+
optional Message on_success_msg = 5;
8192
}
8293
repeated Result results = 1;
8394
optional Handshake handshake = 2;

0 commit comments

Comments
 (0)