Skip to content

Commit f64f18b

Browse files
committed
fix comments
1 parent d9a7362 commit f64f18b

File tree

9 files changed

+17
-33
lines changed

9 files changed

+17
-33
lines changed

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,21 @@
2525
import org.apache.arrow.vector.FieldVector;
2626
import org.apache.arrow.vector.VectorSchemaRoot;
2727

28-
/** An Arrow writer for InternalRow. */
28+
/** An Arrow writer for {@link InternalRow}. */
2929
public class ArrowWriter {
30-
/**
31-
* An array of writers which are responsible for the serialization of each column of the rows.
32-
*/
30+
private final VectorSchemaRoot root;
31+
3332
private final ArrowFieldWriter<InternalRow>[] fieldWriters;
3433

3534
private int recordsCount;
36-
private VectorSchemaRoot root;
3735

36+
/**
37+
* Writer which serializes the Fluss rows to Arrow record batches.
38+
*
39+
* @param fieldWriters An array of writers which are responsible for the serialization of each
40+
* column of the rows
41+
* @param root Container that holds a set of vectors for the rows
42+
*/
3843
public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters, VectorSchemaRoot root) {
3944
this.fieldWriters = fieldWriters;
4045
this.root = root;

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public LanceCommittable deserialize(int version, byte[] serialized) throws IOExc
6161
try {
6262
return new LanceCommittable((List<FragmentMetadata>) ois.readObject());
6363
} catch (ClassNotFoundException e) {
64-
throw new RuntimeException(e);
64+
throw new IOException("Couldn't deserialize LanceCommittable", e);
6565
}
6666
}
6767
}

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,7 @@ public long commit(LanceCommittable committable, Map<String, String> snapshotPro
7474
}
7575

7676
@Override
77-
public void abort(LanceCommittable committable) throws IOException {
78-
throw new UnsupportedOperationException();
79-
}
77+
public void abort(LanceCommittable committable) throws IOException {}
8078

8179
@SuppressWarnings("checkstyle:LocalVariableName")
8280
@Nullable

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
/** Implementation of {@link LakeWriter} for Lance. */
3939
public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
4040
private final LanceArrowWriter arrowWriter;
41-
FutureTask<List<FragmentMetadata>> fragmentCreationTask;
41+
private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
4242

4343
public LanceLakeWriter(Configuration options, WriterInitContext writerInitContext)
4444
throws IOException {

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public LanceWriteResult deserialize(int version, byte[] serialized) throws IOExc
5050
try {
5151
return (LanceWriteResult) ois.readObject();
5252
} catch (ClassNotFoundException e) {
53-
throw new RuntimeException(e);
53+
throw new IOException("Couldn't deserialize LanceWriteResult", e);
5454
}
5555
}
5656
}

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriterTest.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,19 @@
2828
import org.apache.arrow.memory.BufferAllocator;
2929
import org.apache.arrow.memory.RootAllocator;
3030
import org.apache.arrow.vector.VectorSchemaRoot;
31-
import org.apache.arrow.vector.VectorUnloader;
32-
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
3331
import org.junit.jupiter.api.Test;
3432

3533
import java.util.Arrays;
3634
import java.util.List;
3735
import java.util.concurrent.atomic.AtomicInteger;
38-
import java.util.concurrent.atomic.AtomicLong;
3936

4037
import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
4138
import static org.assertj.core.api.Assertions.assertThat;
4239

4340
/** The UT for Lance Arrow Writer. */
4441
public class LanceArrowWriterTest {
4542
@Test
46-
public void test() throws Exception {
43+
public void testLanceArrowWriter() throws Exception {
4744
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
4845
List<DataField> fields = Arrays.asList(new DataField("column1", DataTypes.INT()));
4946

@@ -56,7 +53,6 @@ public void test() throws Exception {
5653
allocator, LanceArrowUtils.toArrowSchema(rowType), batchSize, rowType);
5754
AtomicInteger rowsWritten = new AtomicInteger(0);
5855
AtomicInteger rowsRead = new AtomicInteger(0);
59-
AtomicLong expectedBytesRead = new AtomicLong(0);
6056

6157
Thread writerThread =
6258
new Thread(
@@ -89,11 +85,6 @@ public void test() throws Exception {
8985
VectorSchemaRoot root = arrowWriter.getVectorSchemaRoot();
9086
int rowCount = root.getRowCount();
9187
rowsRead.addAndGet(rowCount);
92-
try (ArrowRecordBatch recordBatch =
93-
new VectorUnloader(root).getRecordBatch()) {
94-
expectedBytesRead.addAndGet(
95-
recordBatch.computeBodyLength());
96-
}
9788
for (int i = 0; i < rowCount; i++) {
9889
int value =
9990
(int) root.getVector("column1").getObject(i);

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,6 @@ private void checkDataInLanceAppendOnlyTable(LanceConfig config, List<InternalRo
124124
LanceConfig.genReadOptionFromConfig(config))) {
125125
ArrowReader reader = dataset.newScan().scanBatches();
126126
VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
127-
// while (reader.loadNextBatch()) {
128-
// System.out.print(readerRoot.contentToTSVString());
129-
// }
130127
reader.loadNextBatch();
131128
Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
132129
int rowCount = readerRoot.getRowCount();

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,6 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
171171
LanceConfig.genReadOptionFromConfig(config))) {
172172
ArrowReader reader = dataset.newScan().scanBatches();
173173
VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
174-
// while (reader.loadNextBatch()) {
175-
// System.out.print(readerRoot.contentToTSVString());
176-
// }
177174

178175
// then, check data
179176
for (int bucket = 0; bucket < 3; bucket++) {
@@ -311,13 +308,10 @@ private Schema createTable(LanceConfig config) throws Exception {
311308
columns.add(new Schema.Column("c3", DataTypes.STRING()));
312309
Schema.Builder schemaBuilder = Schema.newBuilder().fromColumns(columns);
313310
Schema schema = schemaBuilder.build();
314-
doCreateLanceTable(schema, config);
315-
return schema;
316-
}
317-
318-
private void doCreateLanceTable(Schema schema, LanceConfig config) throws Exception {
319311
WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
320312
LanceDatasetAdapter.createDataset(
321313
config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
314+
315+
return schema;
322316
}
323317
}

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ public void beforeEach() {
126126
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
127127
execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
128128
execEnv.setParallelism(2);
129-
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
130129
}
131130

132131
protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {

0 commit comments

Comments (0)