Skip to content

Commit f277b5f

Browse files
authored
Add null check for writer (#5550)
Signed-off-by: Hai Yan <[email protected]>
1 parent f0a00fb commit f277b5f

File tree

2 files changed

+15
-1
lines changed

2 files changed

+15
-1
lines changed

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ public void writeEvent(final Event event, final OutputStream outputStream) throw
116116
@Override
117117
public synchronized void complete(final OutputStream outputStream) throws IOException {
118118
isClosed = true;
119-
writer.close();
119+
if (writer != null) {
120+
writer.close();
121+
}
120122
}
121123

122124
@Override

data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java

+12
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,18 @@ void writeEvent_throws_exception_when_field_does_not_exist_in_auto_schema() thro
243243
assertThat(actualException.getMessage(), containsString(invalidFieldName));
244244
}
245245

246+
@Test
247+
void exception_in_start_should_not_cause_null_pointer_exception_when_complete() throws IOException {
248+
final ParquetOutputCodec objectUnderTest = createObjectUnderTest();
249+
250+
final File tempFile = new File(tempDirectory, FILE_NAME);
251+
LocalFilePositionOutputStream outputStream = LocalFilePositionOutputStream.create(tempFile);
252+
assertThrows(RuntimeException.class, () -> objectUnderTest.start(null, createEventRecord(generateRecords(1).get(0)), codecContext));
253+
254+
// Calling complete now should not throw
255+
objectUnderTest.complete(outputStream);
256+
}
257+
246258
@Test
247259
void getSize_returns_0_after_construction() {
248260
config.setSchema(createStandardSchema().toString());

0 commit comments

Comments
 (0)