Skip to content

Commit 4fa3b39

Browse files
authored
chore: use window information from the request for metadata for reduce stream (#196)
Signed-off-by: Yashash H L <[email protected]>
1 parent d3ad5d0 commit 4fa3b39

File tree

6 files changed

+70
-50
lines changed

6 files changed

+70
-50
lines changed

src/main/java/io/numaproj/numaflow/reducestreamer/Service.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,6 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
5555
responseObserver);
5656
}
5757

58-
// get window start and end time from gPRC metadata
59-
String winSt = GrpcServerUtils.WINDOW_START_TIME.get();
60-
String winEt = GrpcServerUtils.WINDOW_END_TIME.get();
61-
62-
// convert the start and end time to Instant
63-
Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt));
64-
Instant endTime = Instant.ofEpochMilli(Long.parseLong(winEt));
65-
66-
// create metadata
67-
IntervalWindow iw = new IntervalWindowImpl(startTime, endTime);
68-
Metadata md = new MetadataImpl(iw);
69-
7058
CompletableFuture<Void> failureFuture = new CompletableFuture<>();
7159

7260
// create a shutdown actor that listens to exceptions.
@@ -88,7 +76,6 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
8876
ActorRef supervisorActor = reduceActorSystem
8977
.actorOf(SupervisorActor.props(
9078
reduceStreamerFactory,
91-
md,
9279
shutdownActorRef,
9380
outputActor));
9481

src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import akka.japi.pf.ReceiveBuilder;
1010
import com.google.common.base.Preconditions;
1111
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
12+
import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
1213
import io.numaproj.numaflow.reducestreamer.model.Metadata;
1314
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
1415
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory;
@@ -27,31 +28,26 @@
2728
@Slf4j
2829
class SupervisorActor extends AbstractActor {
2930
private final ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory;
30-
private final Metadata md;
3131
private final ActorRef shutdownActor;
3232
private final ActorRef outputActor;
3333
private final Map<String, ActorRef> actorsMap = new HashMap<>();
3434

3535
public SupervisorActor(
3636
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
37-
Metadata md,
3837
ActorRef shutdownActor,
3938
ActorRef outputActor) {
4039
this.reduceStreamerFactory = reduceStreamerFactory;
41-
this.md = md;
4240
this.shutdownActor = shutdownActor;
4341
this.outputActor = outputActor;
4442
}
4543

4644
public static Props props(
4745
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
48-
Metadata md,
4946
ActorRef shutdownActor,
5047
ActorRef outputActor) {
5148
return Props.create(
5249
SupervisorActor.class,
5350
reduceStreamerFactory,
54-
md,
5551
shutdownActor,
5652
outputActor);
5753
}
@@ -94,12 +90,22 @@ public Receive createReceive() {
9490
private void invokeActor(ActorRequest actorRequest) {
9591
String[] keys = actorRequest.getKeySet();
9692
String uniqueId = actorRequest.getUniqueIdentifier();
93+
ReduceOuterClass.Window window = actorRequest.getRequest().getOperation().getWindows(0);
9794
if (!actorsMap.containsKey(uniqueId)) {
9895
ReduceStreamer reduceStreamerHandler = reduceStreamerFactory.createReduceStreamer();
96+
// create metadata
97+
IntervalWindow iw = new IntervalWindowImpl(
98+
Instant.ofEpochSecond(
99+
window.getStart().getSeconds(),
100+
window.getStart().getNanos()),
101+
Instant.ofEpochSecond(
102+
window.getEnd().getSeconds(),
103+
window.getEnd().getNanos()));
104+
Metadata md = new MetadataImpl(iw);
99105
ActorRef actorRef = getContext()
100106
.actorOf(ReduceStreamerActor.props(
101107
keys,
102-
this.md,
108+
md,
103109
reduceStreamerHandler,
104110
this.outputActor));
105111
actorsMap.put(uniqueId, actorRef);

src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.numaproj.numaflow.reducestreamer;
22

33
import com.google.protobuf.ByteString;
4+
import com.google.protobuf.Timestamp;
45
import io.grpc.Context;
56
import io.grpc.Contexts;
67
import io.grpc.ManagedChannel;
@@ -91,10 +92,6 @@ public void tearDown() throws Exception {
9192

9293
@Test
9394
public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowable() {
94-
Metadata metadata = new Metadata();
95-
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
96-
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");
97-
9895
// create an output stream observer
9996
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
10097
// we need to maintain a reference to any exceptions thrown inside the thread, otherwise even if the assertion failed in the thread,
@@ -121,7 +118,6 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
121118

122119
StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
123120
.newStub(inProcessChannel)
124-
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
125121
.reduceFn(outputStreamObserver);
126122

127123
for (int i = 1; i <= 10; i++) {
@@ -132,6 +128,15 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
132128
.addKeys("reduce-key")
133129
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
134130
.build())
131+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
132+
.newBuilder()
133+
.addWindows(
134+
ReduceOuterClass.Window
135+
.newBuilder()
136+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
137+
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
138+
.build()
139+
))
135140
.build();
136141
inputStreamObserver.onNext(reduceRequest);
137142
}

src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.numaproj.numaflow.reducestreamer;
22

33
import com.google.protobuf.ByteString;
4+
import com.google.protobuf.Timestamp;
45
import io.grpc.Context;
56
import io.grpc.Contexts;
67
import io.grpc.ManagedChannel;
@@ -97,16 +98,11 @@ public void tearDown() throws Exception {
9798
public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequestsGetAggregatedToOneResponse() {
9899
String reduceKey = "reduce-key";
99100

100-
Metadata metadata = new Metadata();
101-
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
102-
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");
103-
104101
// create an output stream observer
105102
io.numaproj.numaflow.reducer.ReduceOutputStreamObserver outputStreamObserver = new io.numaproj.numaflow.reducer.ReduceOutputStreamObserver();
106103

107104
StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
108105
.newStub(inProcessChannel)
109-
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
110106
.reduceFn(outputStreamObserver);
111107

112108
for (int i = 1; i <= 11; i++) {
@@ -116,6 +112,15 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
116112
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
117113
.addAllKeys(List.of(reduceKey))
118114
.build())
115+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
116+
.newBuilder()
117+
.addWindows(
118+
ReduceOuterClass.Window
119+
.newBuilder()
120+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
121+
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
122+
.build()
123+
))
119124
.build();
120125
inputStreamObserver.onNext(request);
121126
}
@@ -169,16 +174,11 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
169174
String reduceKey = "reduce-key";
170175
int keyCount = 10;
171176

172-
Metadata metadata = new Metadata();
173-
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
174-
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");
175-
176177
// create an output stream observer
177178
io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
178179

179180
StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
180181
.newStub(inProcessChannel)
181-
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
182182
.reduceFn(outputStreamObserver);
183183

184184
// send messages with keyCount different keys
@@ -190,6 +190,15 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
190190
.addAllKeys(List.of(reduceKey + j))
191191
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
192192
.build())
193+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
194+
.newBuilder()
195+
.addWindows(
196+
ReduceOuterClass.Window
197+
.newBuilder()
198+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
199+
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
200+
.build()
201+
))
193202
.build();
194203
inputStreamObserver.onNext(request);
195204
}

src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import akka.actor.AllDeadLetters;
66
import akka.actor.DeadLetter;
77
import com.google.protobuf.ByteString;
8+
import com.google.protobuf.Timestamp;
89
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
910
import io.numaproj.numaflow.reducestreamer.model.Datum;
1011
import io.numaproj.numaflow.reducestreamer.model.Message;
@@ -36,9 +37,6 @@ public void testFailure() {
3637
.actorOf(ShutdownActor
3738
.props(completableFuture));
3839

39-
Metadata md = new MetadataImpl(
40-
new IntervalWindowImpl(Instant.now(), Instant.now()));
41-
4240
io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver();
4341

4442
ActorRef outputActor = actorSystem.actorOf(OutputActor
@@ -48,7 +46,6 @@ public void testFailure() {
4846
.actorOf(SupervisorActor
4947
.props(
5048
new TestExceptionFactory(),
51-
md,
5249
shutdownActor,
5350
outputActor));
5451

@@ -58,6 +55,15 @@ public void testFailure() {
5855
.addKeys("reduce-test")
5956
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
6057
.build())
58+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
59+
.newBuilder()
60+
.addWindows(
61+
ReduceOuterClass.Window
62+
.newBuilder()
63+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
64+
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
65+
.build()
66+
))
6167
.build());
6268
supervisorActor.tell(reduceRequest, ActorRef.noSender());
6369

@@ -80,9 +86,6 @@ public void testDeadLetterHandling() {
8086

8187
actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class);
8288

83-
Metadata md = new MetadataImpl(
84-
new IntervalWindowImpl(Instant.now(), Instant.now()));
85-
8689
io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver();
8790

8891
ActorRef outputActor = actorSystem.actorOf(OutputActor
@@ -92,7 +95,6 @@ public void testDeadLetterHandling() {
9295
.actorOf(SupervisorActor
9396
.props(
9497
new TestExceptionFactory(),
95-
md,
9698
shutdownActor,
9799
outputActor));
98100

src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import akka.actor.ActorRef;
44
import akka.actor.ActorSystem;
55
import com.google.protobuf.ByteString;
6+
import com.google.protobuf.Timestamp;
67
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
78
import io.numaproj.numaflow.reducestreamer.model.Datum;
89
import io.numaproj.numaflow.reducestreamer.model.Message;
@@ -32,9 +33,6 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
3233
.actorOf(ShutdownActor
3334
.props(completableFuture));
3435

35-
Metadata md = new MetadataImpl(
36-
new IntervalWindowImpl(Instant.now(), Instant.now()));
37-
3836
io.numaproj.numaflow.reducer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducer.ReduceOutputStreamObserver();
3937

4038
ActorRef outputActor = actorSystem.actorOf(OutputActor
@@ -44,7 +42,6 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
4442
.actorOf(SupervisorActor
4543
.props(
4644
new TestReduceStreamerFactory(),
47-
md,
4845
shutdownActor,
4946
outputActor));
5047

@@ -58,6 +55,15 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
5855
.addAllKeys(Arrays.asList("key-1", "key-2"))
5956
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
6057
.build())
58+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
59+
.newBuilder()
60+
.addWindows(
61+
ReduceOuterClass.Window
62+
.newBuilder()
63+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
64+
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
65+
.build()
66+
))
6167
.build());
6268
supervisorActor.tell(reduceRequest, ActorRef.noSender());
6369
}
@@ -94,17 +100,13 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
94100
.actorOf(ShutdownActor
95101
.props(completableFuture));
96102

97-
Metadata md = new MetadataImpl(
98-
new IntervalWindowImpl(Instant.now(), Instant.now()));
99-
100103
io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new ReduceOutputStreamObserver();
101104
ActorRef outputActor = actorSystem.actorOf(OutputActor
102105
.props(reduceOutputStreamObserver));
103106
ActorRef supervisorActor = actorSystem
104107
.actorOf(SupervisorActor
105108
.props(
106109
new TestReduceStreamerFactory(),
107-
md,
108110
shutdownActor,
109111
outputActor)
110112
);
@@ -119,6 +121,15 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
119121
.addAllKeys(Arrays.asList("shared-key", "unique-key-" + i))
120122
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
121123
.build())
124+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
125+
.newBuilder()
126+
.addWindows(
127+
ReduceOuterClass.Window
128+
.newBuilder()
129+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
130+
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
131+
.build()
132+
))
122133
.build());
123134
supervisorActor.tell(reduceRequest, ActorRef.noSender());
124135
}

0 commit comments

Comments
 (0)