Skip to content

Commit 1d74fbb

Browse files
authored
[client] Fix the IllegalStateException error that ArrowBuf refCount <= 0 (#1068)
1 parent 4b46a08 commit 1d74fbb

File tree

6 files changed

+14
-7
lines changed

6 files changed

+14
-7
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/ArrowLogWriteBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
7070
checkArgument(writeRecord.getKey() == null, "key must be null for log record");
7171
checkNotNull(row != null, "row must not be null for log record");
7272
checkNotNull(callback, "write callback must be not null");
73-
if (recordsBuilder.isFull() || recordsBuilder.isClosed()) {
73+
if (recordsBuilder.isClosed() || recordsBuilder.isFull()) {
7474
return false;
7575
} else {
7676
recordsBuilder.append(ChangeType.APPEND_ONLY, row);

fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,11 @@ private RecordAppendResult tryAppend(
639639
if (last != null) {
640640
boolean success = last.tryAppend(writeRecord, callback);
641641
if (!success) {
642+
// TODO For ArrowLogWriteBatch, close here is a heavy operation (including build
643+
// logic), we need to avoid do that in an lock which locked dq. However, why we not
644+
// remove build logic out of close for ArrowLogWriteBatch is that we want to release
645+
// non-heap memory hold by arrowWriter as soon as possible to avoid OOM. Maybe we
646+
// need to introduce a more reasonable way to solve these two problems.
642647
last.close();
643648
} else {
644649
return new RecordAppendResult(deque.size() > 1 || last.isClosed(), false, false);

fluss-common/src/main/java/com/alibaba/fluss/record/KvRecordBatchBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class KvRecordBatchBuilder implements AutoCloseable {
5353
private int batchSequence;
5454
private int currentRecordNumber;
5555
private int sizeInBytes;
56-
private boolean isClosed;
56+
private volatile boolean isClosed;
5757
private final KvFormat kvFormat;
5858
private boolean aborted = false;
5959

fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,13 @@ public class MemoryLogRecordsArrowBuilder implements AutoCloseable {
5454
private final AbstractPagedOutputView pagedOutputView;
5555
private final boolean appendOnly;
5656

57-
private MultiBytesView bytesView = null;
57+
private volatile MultiBytesView bytesView = null;
58+
5859
private long writerId;
5960
private int batchSequence;
6061
private int estimatedSizeInBytes;
6162
private int recordCount;
62-
private boolean isClosed;
63+
private volatile boolean isClosed;
6364
private boolean reCalculateSizeInBytes = false;
6465
private boolean resetBatchHeader = false;
6566
private boolean aborted = false;
@@ -216,10 +217,10 @@ public void close() throws Exception {
216217
return;
217218
}
218219

220+
isClosed = true;
221+
219222
// Build arrowBatch when batch close to recycle arrow writer.
220223
build();
221-
222-
isClosed = true;
223224
}
224225

225226
public void recycleArrowWriter() {

fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
5959
private int batchSequence;
6060
private int currentRecordNumber;
6161
private int sizeInBytes;
62-
private boolean isClosed;
62+
private volatile boolean isClosed;
6363
private boolean aborted = false;
6464

6565
private MemoryLogRecordsIndexedBuilder(

fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ public void increaseEpoch() {
284284
public void recycle(long epoch) {
285285
if (this.epoch == epoch) {
286286
root.clear();
287+
recordsCount = 0;
287288
provider.recycleWriter(this);
288289
}
289290
}

0 commit comments

Comments
 (0)