@@ -41,7 +41,7 @@ public class SparkBigQueryReadSessionMetrics extends SparkListener
   private final LongAccumulator scanTimeAccumulator;
   private final LongAccumulator parseTimeAccumulator;
   private final String sessionId;
-  private final SparkSession sparkSession;
+  private final transient SparkSession sparkSession;
   private final long timestamp;
   private final DataFormat readDataFormat;
   private final DataOrigin dataOrigin;
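
The only change in this hunk is the transient modifier on the sparkSession field, so Java serialization skips that field when the metrics object is written out. A minimal, self-contained sketch of that mechanism (the class and field names below are illustrative stand-ins, not connector code): a Serializable holder with a transient, non-serializable field serializes cleanly and the field simply comes back as null after deserialization, whereas without transient the writeObject call would fail.

// Illustrative sketch only; RuntimeHandle stands in for a field that must not be serialized.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

  static class RuntimeHandle {} // not Serializable, like a live session object

  static class MetricsHolder implements Serializable {
    // Without 'transient', out.writeObject(holder) below would throw NotSerializableException.
    final transient RuntimeHandle handle;
    final long timestamp;

    MetricsHolder(RuntimeHandle handle, long timestamp) {
      this.handle = handle;
      this.timestamp = timestamp;
    }
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(new MetricsHolder(new RuntimeHandle(), 42L));
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      MetricsHolder copy = (MetricsHolder) in.readObject();
      // The transient field is skipped during serialization and is null after deserialization;
      // non-transient fields such as timestamp survive the round trip.
      System.out.println(copy.handle == null);   // true
      System.out.println(copy.timestamp == 42L); // true
    }
  }
}
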
@@ -63,4 +63,30 @@ public void testReadSessionMetricsAccumulator() {
     metrics.incrementScanTimeAccumulator(5000);
     assertThat(metrics.getScanTime()).isEqualTo(6000);
   }
+
+  @Test
+  public void testSerialization() throws Exception {
+    String sessionName = "projects/test-project/locations/us/sessions/testSession";
+    SparkBigQueryReadSessionMetrics metrics =
+        SparkBigQueryReadSessionMetrics.from(
+            spark,
+            ReadSession.newBuilder().setName(sessionName).build(),
+            10L,
+            DataFormat.ARROW,
+            DataOrigin.QUERY,
+            10L);
+
+    java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
+    java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(bos);
+    out.writeObject(metrics);
+    out.close();

Reviewer comment on lines +80 to +82 (Contributor, severity: medium):

To ensure resources are properly managed and to make the code more robust, it's recommended to use a try-with-resources statement for the ObjectOutputStream. This guarantees that the stream is closed even if an exception is thrown.

Suggested change:
-    java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(bos);
-    out.writeObject(metrics);
-    out.close();
+    try (java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(bos)) {
+      out.writeObject(metrics);
+    }

+
+    java.io.ByteArrayInputStream bis = new java.io.ByteArrayInputStream(bos.toByteArray());
+    java.io.ObjectInputStream in = new java.io.ObjectInputStream(bis);
+    SparkBigQueryReadSessionMetrics deserializedMetrics =
+        (SparkBigQueryReadSessionMetrics) in.readObject();

Reviewer comment on lines +84 to +87 (Contributor, severity: medium):

The ObjectInputStream is not being closed, which can lead to resource leaks. It's best practice to use a try-with-resources statement to manage the stream's lifecycle automatically, ensuring it is always closed.

Suggested change:
-    java.io.ByteArrayInputStream bis = new java.io.ByteArrayInputStream(bos.toByteArray());
-    java.io.ObjectInputStream in = new java.io.ObjectInputStream(bis);
-    SparkBigQueryReadSessionMetrics deserializedMetrics =
-        (SparkBigQueryReadSessionMetrics) in.readObject();
+    SparkBigQueryReadSessionMetrics deserializedMetrics;
+    try (java.io.ByteArrayInputStream bis = new java.io.ByteArrayInputStream(bos.toByteArray());
+        java.io.ObjectInputStream in = new java.io.ObjectInputStream(bis)) {
+      deserializedMetrics = (SparkBigQueryReadSessionMetrics) in.readObject();
+    }

+
+    assertThat(deserializedMetrics.getNumReadStreams()).isEqualTo(10L);
+    assertThat(deserializedMetrics.getBytesRead()).isEqualTo(0);

Reviewer comment (Contributor, severity: medium):

To make the test more thorough, it would be good to also assert that the other metric accumulators (rowsRead, parseTime, scanTime) are correctly initialized to 0 after deserialization. This will provide stronger guarantees about the serialization behavior.

Suggested change:
     assertThat(deserializedMetrics.getBytesRead()).isEqualTo(0);
+    assertThat(deserializedMetrics.getRowsRead()).isEqualTo(0);
+    assertThat(deserializedMetrics.getParseTime()).isEqualTo(0);
+    assertThat(deserializedMetrics.getScanTime()).isEqualTo(0);

+  }
 }
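
Taken together, the two stream-handling suggestions above amount to doing the serialize/deserialize round trip with try-with-resources on both ends. A small sketch of what that could look like as a test helper (the SerializationTestUtil class and the roundTrip method are hypothetical, not part of this PR):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical test helper, not part of the PR: Java-serializes a value and reads it back,
// closing both streams via try-with-resources as the review comments recommend.
final class SerializationTestUtil {

  static <T extends Serializable> T roundTrip(T value) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }

  private SerializationTestUtil() {}
}

With such a helper, the serialization portion of testSerialization would reduce to a single call like SerializationTestUtil.roundTrip(metrics), followed by the assertions, including the extra zero-value checks suggested in the last comment.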