Skip to content

Commit a79f1fd

Browse files
authored
fix: avoid NullPointerExceptions if array element is null (#194)
1 parent 641639c commit a79f1fd

File tree

13 files changed

+27
-27
lines changed

13 files changed

+27
-27
lines changed

src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.time.Instant;
1111
import java.util.ArrayList;
1212
import java.util.Arrays;
13-
import java.util.List;
1413

1514
class OutputStreamObserverImpl implements OutputStreamObserver {
1615
private final ActorRef outputActor;
@@ -61,7 +60,7 @@ private AccumulatorOuterClass.AccumulatorResponse buildResponse(Message message)
6160
.setId(message.getId())
6261
.build())
6362
.addAllTags(
64-
message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
63+
message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()))
6564
.build();
6665
}
6766

src/main/java/io/numaproj/numaflow/batchmapper/Service.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import java.time.Instant;
1818
import java.util.ArrayList;
19+
import java.util.Arrays;
1920
import java.util.List;
2021
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.ExecutorService;
@@ -150,9 +151,9 @@ private void buildAndStreamResponse(
150151
== null ? ByteString.EMPTY : ByteString.copyFrom(
151152
res.getValue()))
152153
.addAllKeys(res.getKeys()
153-
== null ? new ArrayList<>() : List.of(res.getKeys()))
154+
== null ? new ArrayList<>() : Arrays.asList(res.getKeys()))
154155
.addAllTags(res.getTags()
155-
== null ? new ArrayList<>() : List.of(res.getTags()))
156+
== null ? new ArrayList<>() : Arrays.asList(res.getTags()))
156157
.build()
157158
);
158159
});

src/main/java/io/numaproj/numaflow/mapper/MapperActor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import java.time.Instant;
1111
import java.util.ArrayList;
12-
import java.util.List;
12+
import java.util.Arrays;
1313

1414
/**
1515
* Mapper actor that processes the map request. It invokes the mapper to process the request and
@@ -86,9 +86,9 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String
8686
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
8787
message.getValue()))
8888
.addAllKeys(message.getKeys()
89-
== null ? new ArrayList<>() : List.of(message.getKeys()))
89+
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
9090
.addAllTags(message.getTags()
91-
== null ? new ArrayList<>() : List.of(message.getTags()))
91+
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
9292
.build());
9393
});
9494
return responseBuilder.setId(ID).build();

src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.time.Instant;
1515
import java.util.ArrayList;
16+
import java.util.Arrays;
1617
import java.util.HashMap;
1718
import java.util.List;
1819
import java.util.Map;
@@ -160,7 +161,7 @@ private MapOuterClass.MapRequest createRequest(
160161
String requestId) {
161162
return MapOuterClass.MapRequest.newBuilder().setRequest(
162163
MapOuterClass.MapRequest.Request.newBuilder()
163-
.addAllKeys(keys == null ? new ArrayList<>() : List.of(keys))
164+
.addAllKeys(keys == null ? new ArrayList<>() : Arrays.asList(keys))
164165
.setValue(data.getValue()
165166
== null ? ByteString.EMPTY : ByteString.copyFrom(data.getValue()))
166167
.setEventTime(

src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import lombok.AllArgsConstructor;
77

88
import java.util.ArrayList;
9-
import java.util.List;
9+
import java.util.Arrays;
1010

1111
/**
1212
* Implementation of the OutputObserver interface.
@@ -36,9 +36,9 @@ public void send(Message message) {
3636
message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
3737
message.getValue()))
3838
.addAllKeys(message.getKeys()
39-
== null ? new ArrayList<>() : List.of(message.getKeys()))
39+
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
4040
.addAllTags(message.getTags()
41-
== null ? new ArrayList<>() : List.of(message.getTags()))
41+
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
4242
.build()).build();
4343
supervisorActor.tell(response, ActorRef.noSender());
4444
}

src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import java.util.ArrayList;
1313
import java.util.Arrays;
14-
import java.util.List;
1514

1615
/**
1716
* Reduce actor invokes the reducer and returns the result.
@@ -70,7 +69,7 @@ private ActorResponse buildActorResponse(Message message) {
7069
.addAllKeys(message.getKeys()
7170
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
7271
.addAllTags(
73-
message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
72+
message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()))
7473
.build());
7574
return new ActorResponse(responseBuilder.build());
7675
}
@@ -89,7 +88,7 @@ private ActorResponse buildEOFActorResponse() {
8988
// set a dummy result with the keys.
9089
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
9190
.newBuilder()
92-
.addAllKeys(List.of(this.keys))
91+
.addAllKeys(Arrays.asList(this.keys))
9392
.build());
9493
return new ActorResponse(responseBuilder.build());
9594
}

src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import java.util.ArrayList;
1313
import java.util.Arrays;
14-
import java.util.List;
1514

1615
@AllArgsConstructor
1716
class OutputStreamObserverImpl implements OutputStreamObserver {
@@ -42,7 +41,7 @@ private ActorResponse buildResponse(Message message) {
4241
.addAllKeys(message.getKeys()
4342
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
4443
.addAllTags(
45-
message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
44+
message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()))
4645
.build());
4746
return new ActorResponse(responseBuilder.build());
4847
}

src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
1212
import lombok.AllArgsConstructor;
1313

14-
import java.util.List;
14+
import java.util.Arrays;
1515

1616
/**
1717
* Reduce streamer actor invokes user defined functions to handle reduce requests.
@@ -71,7 +71,7 @@ private ActorResponse buildEOFResponse() {
7171
// set a dummy result with the keys.
7272
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
7373
.newBuilder()
74-
.addAllKeys(List.of(this.keys))
74+
.addAllKeys(Arrays.asList(this.keys))
7575
.build());
7676
return new ActorResponse(responseBuilder.build());
7777
}

src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import java.util.ArrayList;
1212
import java.util.Arrays;
13-
import java.util.List;
1413

1514
/**
1615
* OutputStreamObserverImpl transforms a message to an ActorResponse.
@@ -42,7 +41,7 @@ private ActorResponse buildResponse(Message message, Sessionreduce.KeyedWindow k
4241
.addAllKeys(message.getKeys()
4342
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
4443
.addAllTags(
45-
message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
44+
message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()))
4645
.build());
4746
return ActorResponse.builder()
4847
.response(responseBuilder.build())

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.IOException;
1717
import java.time.Instant;
1818
import java.util.ArrayList;
19+
import java.util.Arrays;
1920
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
@@ -157,7 +158,7 @@ public void onCompleted() {
157158
.newBuilder()
158159
.addAllKeys(
159160
datum.getKeys()
160-
== null ? new ArrayList<>() : List.of(datum.getKeys()))
161+
== null ? new ArrayList<>() : Arrays.asList(datum.getKeys()))
161162
.setValue(datum.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
162163
datum.getValue()))
163164
.setId(datum.getId())

0 commit comments

Comments
 (0)