|
17 | 17 |
|
18 | 18 | package com.alibaba.fluss.record; |
19 | 19 |
|
| 20 | +import com.alibaba.fluss.row.InternalRow; |
20 | 21 | import com.alibaba.fluss.testutils.DataTestUtils; |
21 | 22 | import com.alibaba.fluss.testutils.LogRecordsAssert; |
| 23 | +import com.alibaba.fluss.utils.CloseableIterator; |
22 | 24 |
|
23 | 25 | import org.junit.jupiter.api.Test; |
24 | 26 |
|
25 | 27 | import java.nio.ByteBuffer; |
| 28 | +import java.util.Arrays; |
26 | 29 | import java.util.function.Function; |
27 | 30 |
|
28 | 31 | import static com.alibaba.fluss.record.TestData.DATA1; |
29 | 32 | import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; |
| 33 | +import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; |
30 | 34 |
|
31 | 35 | /** Tests for {@link MemoryLogRecords}. */ |
32 | 36 | class MemoryLogRecordsTest { |
@@ -81,4 +85,29 @@ void verifyPointToByteBuffer(Function<Integer, ByteBuffer> bufferSupplier) throw |
81 | 85 | .withSchema(DATA1_ROW_TYPE) |
82 | 86 | .isEqualTo(records); |
83 | 87 | } |
| 88 | + |
| 89 | + @Test |
| 90 | + void testReadMemoryLogRecords() throws Exception { |
| 91 | + MemoryLogRecords records = |
| 92 | + DataTestUtils.genMemoryLogRecordsByObject( |
| 93 | + Arrays.asList( |
| 94 | + new Object[] {1, "A"}, |
| 95 | + new Object[] {2, "B"}, |
| 96 | + new Object[] {3, "C"})); |
| 97 | + |
| 98 | + Iterable<LogRecordBatch> batches = records.batches(); |
| 99 | + for (LogRecordBatch batch : batches) { |
| 100 | + System.out.println(batch.getRecordCount()); |
| 101 | + System.out.println(batch.lastLogOffset()); |
| 102 | + CloseableIterator<LogRecord> it = |
| 103 | + batch.records( |
| 104 | + LogRecordReadContext.createArrowReadContext( |
| 105 | + DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID)); |
| 106 | + while (it.hasNext()) { |
| 107 | + LogRecord record = it.next(); |
| 108 | + InternalRow row = record.getRow(); |
| 109 | + System.out.printf("%d, %s\n", row.getInt(0), row.getString(1)); |
| 110 | + } |
| 111 | + } |
| 112 | + } |
84 | 113 | } |
0 commit comments