Skip to content

Commit d0b466f

Browse files
committed
[client] Unify Memory Allocation with MemorySegmentPool for Code Consistency (alibaba#290)
- Unified Memory Allocation: Replaced `WriterBufferPool` with `MemorySegmentPool` to ensure code consistency across the project. - Removed `trySerialize()` from `WriteBatch`: Eliminated the `trySerialize()`` method on `WriteBatch`. The fallback logic for handling memory request timeouts is no longer necessary because all batching memories are pre-allocated before the WriteBatch is created. In the rare case where pre-allocated memory is insufficient, we allocate from the heap directly. - Removed `serialize()` from `WriteBatch`: Moved the serialization logic from `serialize()` to `build()` in `WriteBatch`. This simplification addresses potential concurrency issues and streamlines the process. The original purpose of serialize() was to release the ArrowWriter as soon as possible, which occurred when `WriteBatch#close()`. Now, this release happens in `WriteBatch#build()`, which is called when sending RPC. This change should not introduce significant problems, as the ArrowWriter is reused. - Added `buffer_usage_ratio` for `ArrowWriter`: Introduced a `buffer_usage_ratio` for `ArrowWriter` to prevent excessive memory buffer usage. This reduces the likelihood of heap allocation, thereby minimizing GC overhead. If heap allocation does occur, it is still manageable and does not pose a major issue.
1 parent 71f97f1 commit d0b466f

File tree

57 files changed

+967
-980
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+967
-980
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/ArrowLogWriteBatch.java

+4-35
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@
2020
import com.alibaba.fluss.exception.FlussRuntimeException;
2121
import com.alibaba.fluss.memory.AbstractPagedOutputView;
2222
import com.alibaba.fluss.memory.MemorySegment;
23-
import com.alibaba.fluss.memory.MemorySegmentPool;
24-
import com.alibaba.fluss.memory.PreAllocatedManagedPagedOutputView;
2523
import com.alibaba.fluss.metadata.PhysicalTablePath;
2624
import com.alibaba.fluss.metadata.TableBucket;
2725
import com.alibaba.fluss.record.MemoryLogRecordsArrowBuilder;
2826
import com.alibaba.fluss.record.RowKind;
2927
import com.alibaba.fluss.record.bytesview.BytesView;
30-
import com.alibaba.fluss.record.bytesview.MemorySegmentBytesView;
3128
import com.alibaba.fluss.row.InternalRow;
3229
import com.alibaba.fluss.row.arrow.ArrowWriter;
3330
import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
@@ -37,7 +34,6 @@
3734

3835
import java.io.IOException;
3936
import java.util.List;
40-
import java.util.stream.Collectors;
4137

4238
/**
4339
* A batch of log records managed in ARROW format that is or will be sent to server by {@link
@@ -50,22 +46,17 @@
5046
public class ArrowLogWriteBatch extends WriteBatch {
5147
private final MemoryLogRecordsArrowBuilder recordsBuilder;
5248
private final AbstractPagedOutputView outputView;
53-
private final List<MemorySegment> preAllocatedMemorySegments;
5449

5550
public ArrowLogWriteBatch(
5651
TableBucket tableBucket,
5752
PhysicalTablePath physicalTablePath,
5853
int schemaId,
5954
ArrowWriter arrowWriter,
60-
List<MemorySegment> preAllocatedMemorySegments,
61-
MemorySegmentPool memorySegmentSource) {
55+
AbstractPagedOutputView outputView) {
6256
super(tableBucket, physicalTablePath);
63-
this.outputView =
64-
new PreAllocatedManagedPagedOutputView(
65-
preAllocatedMemorySegments, memorySegmentSource);
57+
this.outputView = outputView;
6658
this.recordsBuilder =
6759
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView);
68-
this.preAllocatedMemorySegments = preAllocatedMemorySegments;
6960
}
7061

7162
@Override
@@ -88,20 +79,6 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
8879
}
8980
}
9081

91-
@Override
92-
public void serialize() {
93-
try {
94-
recordsBuilder.serialize();
95-
} catch (IOException e) {
96-
throw new FlussRuntimeException("Failed to serialize Arrow batch to memory buffer.", e);
97-
}
98-
}
99-
100-
@Override
101-
public boolean trySerialize() {
102-
return recordsBuilder.trySerialize();
103-
}
104-
10582
@Override
10683
public BytesView build() {
10784
try {
@@ -128,16 +105,8 @@ public int sizeInBytes() {
128105
}
129106

130107
@Override
131-
public List<MemorySegment> memorySegments() {
132-
List<MemorySegment> usedMemorySegments =
133-
outputView.getSegmentBytesViewList().stream()
134-
.map(MemorySegmentBytesView::getMemorySegment)
135-
.collect(Collectors.toList());
136-
if (usedMemorySegments.size() > preAllocatedMemorySegments.size()) {
137-
return usedMemorySegments;
138-
} else {
139-
return preAllocatedMemorySegments;
140-
}
108+
public List<MemorySegment> pooledMemorySegments() {
109+
return outputView.allocatedPooledSegments();
141110
}
142111

143112
@Override

fluss-client/src/main/java/com/alibaba/fluss/client/write/IndexedLogWriteBatch.java

+12-30
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,21 @@
1717
package com.alibaba.fluss.client.write;
1818

1919
import com.alibaba.fluss.annotation.Internal;
20-
import com.alibaba.fluss.annotation.VisibleForTesting;
2120
import com.alibaba.fluss.exception.FlussRuntimeException;
21+
import com.alibaba.fluss.memory.AbstractPagedOutputView;
2222
import com.alibaba.fluss.memory.MemorySegment;
2323
import com.alibaba.fluss.metadata.PhysicalTablePath;
2424
import com.alibaba.fluss.metadata.TableBucket;
25-
import com.alibaba.fluss.record.MemoryLogRecords;
2625
import com.alibaba.fluss.record.MemoryLogRecordsIndexedBuilder;
2726
import com.alibaba.fluss.record.RowKind;
2827
import com.alibaba.fluss.record.bytesview.BytesView;
29-
import com.alibaba.fluss.record.bytesview.MemorySegmentBytesView;
3028
import com.alibaba.fluss.row.InternalRow;
3129
import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
3230
import com.alibaba.fluss.utils.Preconditions;
3331

3432
import javax.annotation.concurrent.NotThreadSafe;
3533

3634
import java.io.IOException;
37-
import java.util.Collections;
3835
import java.util.List;
3936

4037
/**
@@ -46,14 +43,19 @@
4643
@NotThreadSafe
4744
@Internal
4845
public final class IndexedLogWriteBatch extends WriteBatch {
46+
private final AbstractPagedOutputView outputView;
4947
private final MemoryLogRecordsIndexedBuilder recordsBuilder;
5048

5149
public IndexedLogWriteBatch(
5250
TableBucket tableBucket,
5351
PhysicalTablePath physicalTablePath,
54-
MemoryLogRecordsIndexedBuilder recordsBuilder) {
52+
int schemaId,
53+
int writeLimit,
54+
AbstractPagedOutputView outputView) {
5555
super(tableBucket, physicalTablePath);
56-
this.recordsBuilder = recordsBuilder;
56+
this.outputView = outputView;
57+
this.recordsBuilder =
58+
MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, outputView);
5759
}
5860

5961
@Override
@@ -77,34 +79,14 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
7779
}
7880

7981
@Override
80-
public void serialize() {
81-
// do nothing, records are serialized into memory buffer when appending
82-
}
83-
84-
@Override
85-
public boolean trySerialize() {
86-
// records have been serialized.
87-
return true;
88-
}
89-
90-
@VisibleForTesting
91-
public MemoryLogRecords records() {
82+
public BytesView build() {
9283
try {
9384
return recordsBuilder.build();
9485
} catch (IOException e) {
95-
throw new FlussRuntimeException("build memory log records failed", e);
86+
throw new FlussRuntimeException("Failed to build indexed log record batch.", e);
9687
}
9788
}
9889

99-
@Override
100-
public BytesView build() {
101-
MemoryLogRecords memoryLogRecords = records();
102-
return new MemorySegmentBytesView(
103-
memoryLogRecords.getMemorySegment(),
104-
memoryLogRecords.getPosition(),
105-
memoryLogRecords.sizeInBytes());
106-
}
107-
10890
@Override
10991
public boolean isClosed() {
11092
return recordsBuilder.isClosed();
@@ -117,8 +99,8 @@ public void close() throws Exception {
11799
}
118100

119101
@Override
120-
public List<MemorySegment> memorySegments() {
121-
return Collections.singletonList(recordsBuilder.getMemorySegment());
102+
public List<MemorySegment> pooledMemorySegments() {
103+
return outputView.allocatedPooledSegments();
122104
}
123105

124106
@Override

fluss-client/src/main/java/com/alibaba/fluss/client/write/KvWriteBatch.java

+22-36
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
package com.alibaba.fluss.client.write;
1818

1919
import com.alibaba.fluss.annotation.Internal;
20-
import com.alibaba.fluss.annotation.VisibleForTesting;
2120
import com.alibaba.fluss.exception.FlussRuntimeException;
21+
import com.alibaba.fluss.memory.AbstractPagedOutputView;
2222
import com.alibaba.fluss.memory.MemorySegment;
23+
import com.alibaba.fluss.metadata.KvFormat;
2324
import com.alibaba.fluss.metadata.PhysicalTablePath;
2425
import com.alibaba.fluss.metadata.TableBucket;
25-
import com.alibaba.fluss.record.DefaultKvRecordBatch;
26+
import com.alibaba.fluss.record.KvRecordBatchBuilder;
2627
import com.alibaba.fluss.record.bytesview.BytesView;
27-
import com.alibaba.fluss.record.bytesview.MemorySegmentBytesView;
2828
import com.alibaba.fluss.row.InternalRow;
2929
import com.alibaba.fluss.rpc.messages.PutKvRequest;
3030
import com.alibaba.fluss.utils.Preconditions;
@@ -34,7 +34,6 @@
3434

3535
import java.io.IOException;
3636
import java.util.Arrays;
37-
import java.util.Collections;
3837
import java.util.List;
3938

4039
/**
@@ -45,23 +44,30 @@
4544
@NotThreadSafe
4645
@Internal
4746
public class KvWriteBatch extends WriteBatch {
48-
private final DefaultKvRecordBatch.Builder recordsBuilder;
47+
private final AbstractPagedOutputView outputView;
48+
private final KvRecordBatchBuilder recordsBuilder;
4949
private final @Nullable int[] targetColumns;
5050

5151
public KvWriteBatch(
5252
TableBucket tableBucket,
5353
PhysicalTablePath physicalTablePath,
54-
DefaultKvRecordBatch.Builder recordsBuilder,
55-
int[] targetColumns) {
54+
int schemaId,
55+
KvFormat kvFormat,
56+
int writeLimit,
57+
AbstractPagedOutputView outputView,
58+
@Nullable int[] targetColumns) {
5659
super(tableBucket, physicalTablePath);
57-
this.recordsBuilder = recordsBuilder;
60+
this.outputView = outputView;
61+
this.recordsBuilder =
62+
KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat);
5863
this.targetColumns = targetColumns;
5964
}
6065

6166
@Override
6267
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
6368
// currently, we throw exception directly when the target columns of the write record is
64-
// not the same as the current target columns in the batch
69+
// not the same as the current target columns in the batch.
70+
// this should be quite fast as they should be the same objects.
6571
if (!Arrays.equals(targetColumns, writeRecord.getTargetColumns())) {
6672
throw new IllegalStateException(
6773
String.format(
@@ -84,38 +90,18 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
8490
}
8591
}
8692

87-
@Override
88-
public void serialize() {
89-
// do nothing, records are serialized into memory buffer when appending
90-
}
91-
92-
@Override
93-
public boolean trySerialize() {
94-
// records have been serialized.
95-
return true;
96-
}
97-
98-
@VisibleForTesting
99-
public DefaultKvRecordBatch records() {
100-
try {
101-
return recordsBuilder.build();
102-
} catch (IOException e) {
103-
throw new FlussRuntimeException("Failed to build record batch.", e);
104-
}
105-
}
106-
10793
@Nullable
10894
public int[] getTargetColumns() {
10995
return targetColumns;
11096
}
11197

11298
@Override
11399
public BytesView build() {
114-
DefaultKvRecordBatch recordBatch = records();
115-
return new MemorySegmentBytesView(
116-
recordBatch.getMemorySegment(),
117-
recordBatch.getPosition(),
118-
recordBatch.sizeInBytes());
100+
try {
101+
return recordsBuilder.build();
102+
} catch (IOException e) {
103+
throw new FlussRuntimeException("Failed to build kv record batch.", e);
104+
}
119105
}
120106

121107
@Override
@@ -135,8 +121,8 @@ public int sizeInBytes() {
135121
}
136122

137123
@Override
138-
public List<MemorySegment> memorySegments() {
139-
return Collections.singletonList(recordsBuilder.getMemorySegment());
124+
public List<MemorySegment> pooledMemorySegments() {
125+
return outputView.allocatedPooledSegments();
140126
}
141127

142128
@Override

0 commit comments

Comments
 (0)