Skip to content

Commit a71d8fa

Browse files
committed
[log] Fix the crc checksum error cause by The MemoryLogRecordsArrowBuilder#getSizeInBytes() maybe modify the memory data after closed
1 parent 00fff20 commit a71d8fa

File tree

3 files changed

+45
-8
lines changed

3 files changed

+45
-8
lines changed

fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,12 @@ public void close() throws Exception {
236236
}
237237

238238
// update sizeInBytes and recordCount if needed
239-
getSizeInBytes();
239+
if (recordCount != arrowWriter.getRecordsCount()) {
240+
// make size in bytes up-to-date
241+
sizeInBytes =
242+
ARROW_ROWKIND_OFFSET + rowKindWriter.sizeInBytes() + arrowWriter.sizeInBytes();
243+
recordCount = arrowWriter.getRecordsCount();
244+
}
240245
isClosed = true;
241246
}
242247

@@ -246,12 +251,6 @@ public void deallocate() {
246251
}
247252

248253
public int getSizeInBytes() {
249-
if (recordCount != arrowWriter.getRecordsCount()) {
250-
// make size in bytes up-to-date
251-
sizeInBytes =
252-
ARROW_ROWKIND_OFFSET + rowKindWriter.sizeInBytes() + arrowWriter.sizeInBytes();
253-
recordCount = arrowWriter.getRecordsCount();
254-
}
255254
return sizeInBytes;
256255
}
257256

fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void close() {
120120
}
121121

122122
@VisibleForTesting
123-
Map<String, Deque<ArrowWriter>> freeWriters() {
123+
public Map<String, Deque<ArrowWriter>> freeWriters() {
124124
return freeWriters;
125125
}
126126
}

fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,44 @@ void testIllegalArgument() {
141141
"The size of first segment of pagedOutputView is too small, need at least 44 bytes.");
142142
}
143143

144+
@Test
145+
void testClose() throws Exception {
146+
int maxSizeInBytes = 1024;
147+
ArrowWriter writer =
148+
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE);
149+
MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
150+
List<RowKind> rowKinds =
151+
DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
152+
List<InternalRow> rows =
153+
DATA1.stream()
154+
.map(object -> row(DATA1_ROW_TYPE, object))
155+
.collect(Collectors.toList());
156+
while (!builder.isFull()) {
157+
int rndIndex = RandomUtils.nextInt(0, DATA1.size());
158+
builder.append(rowKinds.get(rndIndex), rows.get(rndIndex));
159+
}
160+
assertThat(builder.isFull()).isTrue();
161+
162+
String tableSchemaId = 1L + "-" + 1;
163+
assertThat(provider.freeWriters().size()).isEqualTo(0);
164+
builder.close();
165+
builder.serialize();
166+
builder.setWriterState(1L, 0);
167+
MemoryLogRecords.pointToByteBuffer(builder.build().getByteBuf().nioBuffer());
168+
assertThat(provider.freeWriters().get(tableSchemaId).size()).isEqualTo(1);
169+
int sizeInBytes = builder.getSizeInBytes();
170+
// get writer again, writer will be initial.
171+
ArrowWriter writer1 =
172+
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
173+
assertThat(provider.freeWriters().get(tableSchemaId).size()).isEqualTo(0);
174+
175+
// Even if the writer has re-initialized, the sizeInBytes should be the same.
176+
assertThat(builder.getSizeInBytes()).isEqualTo(sizeInBytes);
177+
178+
writer.close();
179+
writer1.close();
180+
}
181+
144182
private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
145183
ArrowWriter writer, int maxPages, int pageSizeInBytes) {
146184
conf.set(

0 commit comments

Comments
 (0)