Skip to content

Commit e096170

Browse files
authored
[client] Recycle ArrowWriter when MemoryLogRecordsArrowBuilder closed instead of built (#973)
1 parent 8cacbca commit e096170

File tree

3 files changed

+6
-8
lines changed

3 files changed

+6
-8
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/ArrowLogWriteBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void setWriterState(long writerId, int batchSequence) {
119119
@Override
120120
public void resetWriterState(long writerId, int batchSequence) {
121121
super.resetWriterState(writerId, batchSequence);
122-
recordsBuilder.resetWriterState(writerId, batchSequence);
122+
recordsBuilder.setWriterState(writerId, batchSequence);
123123
}
124124

125125
@Override

fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,15 +190,10 @@ public int batchSequence() {
190190
}
191191

192192
public void setWriterState(long writerId, int batchBaseSequence) {
193-
this.writerId = writerId;
194-
this.batchSequence = batchBaseSequence;
195-
}
196-
197-
public void resetWriterState(long writerId, int batchSequence) {
198193
// trigger to rewrite batch header when next build.
199194
this.resetBatchHeader = true;
200195
this.writerId = writerId;
201-
this.batchSequence = batchSequence;
196+
this.batchSequence = batchBaseSequence;
202197
}
203198

204199
public void abort() {
@@ -221,6 +216,9 @@ public void close() throws Exception {
221216
return;
222217
}
223218

219+
// Build arrowBatch when batch close to recycle arrow writer.
220+
build();
221+
224222
isClosed = true;
225223
}
226224

fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ void testResetWriterState() throws Exception {
327327

328328
// test reset writer state and build (This situation will happen when the produceLog request
329329
// failed and the batch is re-enqueue to send with different write state).
330-
builder.resetWriterState(1L, 1);
330+
builder.setWriterState(1L, 1);
331331
records = MemoryLogRecords.pointToBytesView(builder.build());
332332
assertLogRecordsEquals(DATA1_ROW_TYPE, records, expectedResult);
333333
recordBatch = records.batches().iterator().next();

0 commit comments

Comments
 (0)