Skip to content

Commit c3737df

Browse files
yhl25KeranYangvigith
authored
feat: user defined serving store (#169)
Signed-off-by: Yashash H L <[email protected]> Signed-off-by: Keran Yang <[email protected]> Co-authored-by: Keran Yang <[email protected]> Co-authored-by: Vigith Maurice <[email protected]>
1 parent d1cdb8f commit c3737df

File tree

23 files changed

+815
-19
lines changed

23 files changed

+815
-19
lines changed

examples/pom.xml

+22
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,28 @@
337337
</to>
338338
</configuration>
339339
</execution>
340+
<execution>
341+
<id>serving-inmem-store</id>
342+
<phase>package</phase>
343+
<goals>
344+
<goal>dockerBuild</goal>
345+
</goals>
346+
<configuration>
347+
<from>
348+
<image>amazoncorretto:11</image>
349+
</from>
350+
<container>
351+
<mainClass>
352+
io.numaproj.numaflow.examples.servingstore.memory.ServingInMemoryStore
353+
</mainClass>
354+
</container>
355+
<to>
356+
<image>
357+
numaflow-java-examples/serving-inmem-store:${docker.tag}
358+
</image>
359+
</to>
360+
</configuration>
361+
</execution>
340362
</executions>
341363
</plugin>
342364
</plugins>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.numaproj.numaflow.examples.servingstore.memory;
2+
3+
import io.numaproj.numaflow.servingstore.GetDatum;
4+
import io.numaproj.numaflow.servingstore.Payload;
5+
import io.numaproj.numaflow.servingstore.PutDatum;
6+
import io.numaproj.numaflow.servingstore.Server;
7+
import io.numaproj.numaflow.servingstore.ServingStorer;
8+
import io.numaproj.numaflow.servingstore.StoredResult;
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
import java.util.Collections;
12+
import java.util.HashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
16+
@Slf4j
17+
public class ServingInMemoryStore extends ServingStorer {
18+
private final Map<String, List<Payload>> store = new HashMap<>();
19+
20+
public void put(PutDatum putDatum) {
21+
log.info("Putting data into the store with ID: {}", putDatum.ID());
22+
store.put(putDatum.ID(), putDatum.Payloads());
23+
}
24+
25+
public StoredResult get(GetDatum getDatum) {
26+
log.info("Getting data from the store with ID: {}", getDatum.ID());
27+
List<Payload> payloads = store.getOrDefault(getDatum.ID(), Collections.emptyList());
28+
return new StoredResult(getDatum.ID(), payloads);
29+
}
30+
31+
public static void main(String[] args) throws Exception {
32+
Server server = new Server(new ServingInMemoryStore());
33+
34+
// Start the server
35+
server.start();
36+
37+
// Wait for the server to shutdown
38+
server.awaitTermination();
39+
}
40+
}

pom.xml

+3-1
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@
318318
<exclude>io/numaproj/numaflow/shared/*</exclude>
319319
<exclude>io/numaproj/numaflow/sideinput/v1/*</exclude>
320320
<exclude>io/numaproj/numaflow/source/v1/*</exclude>
321+
<exclude>io/numaproj/numaflow/serving/v1/*</exclude>
321322
<exclude>**/*TestKit*</exclude>
322323
</excludes>
323324
</configuration>
@@ -355,7 +356,8 @@
355356
io.numaproj.numaflow.shared,
356357
io.numaproj.numaflow.sideinput.v1,
357358
io.numaproj.numaflow.source.v1,
358-
io.numaproj.numaflow.sessionreduce.v1
359+
io.numaproj.numaflow.sessionreduce.v1,
360+
io.numaproj.numaflow.serving.v1
359361
</excludePackageNames>
360362
</configuration>
361363
<executions>

src/main/java/io/numaproj/numaflow/info/ContainerType.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ public enum ContainerType {
1111
REDUCE_STREAMER("reducestreamer"),
1212
SESSION_REDUCER("sessionreducer"),
1313
SIDEINPUT("sideinput"),
14-
FBSINKER("fb-sinker");
14+
FBSINKER("fb-sinker"),
15+
SERVING("serving");
1516

1617
private final String name;
1718

src/main/java/io/numaproj/numaflow/info/ServerInfo.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ public class ServerInfo {
3535
entry(ContainerType.REDUCE_STREAMER, "1.4.0-z"),
3636
entry(ContainerType.SESSION_REDUCER, "1.4.0-z"),
3737
entry(ContainerType.SIDEINPUT, "1.4.0-z"),
38-
entry(ContainerType.FBSINKER, "1.4.0-z")
38+
entry(ContainerType.FBSINKER, "1.4.0-z"),
39+
entry(ContainerType.SERVING, "1.5.0-z")
3940
);
4041
@JsonProperty("protocol")
4142
private Protocol protocol;

src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ static GRPCConfig defaultGrpcConfig() {
3131
return GRPCConfig.newBuilder()
3232
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
3333
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
34-
.isLocal(System.getenv("NUMAFLOW_POD")
35-
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
34+
.isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running
35+
// using numaflow
3636
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
3737
}
3838
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.numaproj.numaflow.servingstore;
2+
3+
class Constants {
4+
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
5+
6+
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/serving.sock";
7+
8+
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/serving-server-info";
9+
10+
public static final int DEFAULT_PORT = 50051;
11+
12+
public static final String DEFAULT_HOST = "localhost";
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.numaproj.numaflow.servingstore;
2+
3+
import io.numaproj.numaflow.shared.GrpcConfigRetriever;
4+
import lombok.Builder;
5+
import lombok.Getter;
6+
7+
/**
8+
* GRPCConfig is used to provide configurations for gRPC server.
9+
*/
10+
@Getter
11+
@Builder(builderMethodName = "newBuilder")
12+
public class GRPCConfig implements GrpcConfigRetriever {
13+
@Builder.Default
14+
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
15+
16+
@Builder.Default
17+
private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE;
18+
19+
@Builder.Default
20+
private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH;
21+
22+
@Builder.Default
23+
private int port = Constants.DEFAULT_PORT;
24+
25+
private boolean isLocal;
26+
27+
/**
28+
* Static method to create default GRPCConfig.
29+
*/
30+
static GRPCConfig defaultGrpcConfig() {
31+
return GRPCConfig.newBuilder()
32+
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
33+
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
34+
.isLocal(System.getenv("NUMAFLOW_POD")
35+
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
36+
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.numaproj.numaflow.servingstore;
2+
3+
/**
4+
* GetDatum is the interface to expose methods to retrieve data from the Get RPC.
5+
*/
6+
public interface GetDatum {
7+
/**
8+
* Returns the ID.
9+
*
10+
* @return the ID
11+
*/
12+
String ID();
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.numaproj.numaflow.servingstore;
2+
3+
import lombok.AllArgsConstructor;
4+
5+
@AllArgsConstructor
6+
public class GetDatumImpl implements GetDatum {
7+
private final String id;
8+
9+
/**
10+
* Returns the ID.
11+
*
12+
* @return the ID
13+
*/
14+
@Override
15+
public String ID() {
16+
return id;
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.numaproj.numaflow.servingstore;
2+
3+
import lombok.Getter;
4+
import lombok.AllArgsConstructor;
5+
6+
/**
7+
* Payload is each independent result stored in the Store per origin for a given ID.
8+
*/
9+
@Getter
10+
@AllArgsConstructor
11+
public class Payload {
12+
// origin is the name of the vertex that produced the payload.
13+
private String origin;
14+
// value is the byte array of the payload.
15+
private byte[] value;
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.numaproj.numaflow.servingstore;
2+
3+
import java.util.List;
4+
5+
/**
6+
* PutDatum interface exposes methods to retrieve data from the Put RPC.
7+
*/
8+
public interface PutDatum {
9+
/**
10+
* Returns the ID.
11+
*
12+
* @return the ID
13+
*/
14+
String ID();
15+
16+
/**
17+
* Returns the list of payloads.
18+
*
19+
* @return the list of payloads
20+
*/
21+
List<Payload> Payloads();
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.numaproj.numaflow.servingstore;
2+
3+
import java.util.List;
4+
5+
import lombok.AllArgsConstructor;
6+
7+
@AllArgsConstructor
8+
public class PutDatumImpl implements PutDatum {
9+
private final String id;
10+
private final List<Payload> payloads;
11+
12+
/**
13+
* Returns the ID.
14+
*
15+
* @return the ID
16+
*/
17+
@Override
18+
public String ID() {
19+
return id;
20+
}
21+
22+
/**
23+
* Returns the list of payloads.
24+
*
25+
* @return the list of payloads
26+
*/
27+
@Override
28+
public List<Payload> Payloads() {
29+
return payloads;
30+
}
31+
}

0 commit comments

Comments
 (0)