Skip to content

Commit 99b0d06

Browse files
committed
address comments
1 parent 604e6f6 commit 99b0d06

File tree

8 files changed

+152
-71
lines changed

8 files changed

+152
-71
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public ScanRecords poll(Duration timeout) {
163163
}
164164
} while (System.nanoTime() - startNanos < timeoutNanos);
165165

166-
return ScanRecords.empty();
166+
return ScanRecords.EMPTY;
167167
} finally {
168168
release();
169169
scannerMetricGroup.recordPollEnd(System.currentTimeMillis());

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@
3737
*/
3838
@PublicEvolving
3939
public class ScanRecords implements Iterable<ScanRecord> {
40-
public static final ScanRecords empty() {
41-
return new ScanRecords(Collections.emptyMap());
42-
}
40+
public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap());
4341

4442
private final Map<TableBucket, List<ScanRecord>> records;
4543

fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ public AppendWriter createWriter() {
4141

4242
@Override
4343
public <T> TypedAppendWriter<T> createTypedWriter(Class<T> pojoClass) {
44-
AppendWriterImpl delegate = new AppendWriterImpl(tablePath, tableInfo, writerClient);
45-
return new TypedAppendWriterImpl<>(delegate, pojoClass, tableInfo);
44+
return new TypedAppendWriterImpl<>(createWriter(), pojoClass, tableInfo);
4645
}
4746
}

fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ public UpsertWriter createWriter() {
9898

9999
@Override
100100
public <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass) {
101-
UpsertWriterImpl delegate =
102-
new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient);
103-
return new TypedUpsertWriterImpl<>(delegate, pojoClass, tableInfo, targetColumns);
101+
return new TypedUpsertWriterImpl<>(createWriter(), pojoClass, tableInfo, targetColumns);
104102
}
105103
}

fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableWriter.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* @since 0.1
2929
*/
3030
@PublicEvolving
31-
public interface TableWriter extends AutoCloseable {
31+
public interface TableWriter {
3232

3333
/**
3434
* Flush data written that have not yet been sent to the server, forcing the client to send the
@@ -38,9 +38,4 @@ public interface TableWriter extends AutoCloseable {
3838
* results in an error.
3939
*/
4040
void flush();
41-
42-
@Override
43-
default void close() throws Exception {
44-
// by default do nothing
45-
}
4641
}

fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,12 @@
3030
*/
3131
class TypedAppendWriterImpl<T> implements TypedAppendWriter<T> {
3232

33-
private final AppendWriterImpl delegate;
34-
private final Class<T> pojoClass;
33+
private final AppendWriter delegate;
3534
private final RowType tableSchema;
3635
private final PojoToRowConverter<T> pojoToRowConverter;
3736

38-
TypedAppendWriterImpl(AppendWriterImpl delegate, Class<T> pojoClass, TableInfo tableInfo) {
37+
TypedAppendWriterImpl(AppendWriter delegate, Class<T> pojoClass, TableInfo tableInfo) {
3938
this.delegate = delegate;
40-
this.pojoClass = pojoClass;
4139
this.tableSchema = tableInfo.getRowType();
4240
this.pojoToRowConverter = PojoToRowConverter.of(pojoClass, tableSchema, tableSchema);
4341
}
@@ -47,11 +45,6 @@ public void flush() {
4745
delegate.flush();
4846
}
4947

50-
@Override
51-
public void close() throws Exception {
52-
delegate.close();
53-
}
54-
5548
@Override
5649
public CompletableFuture<AppendResult> append(T record) {
5750
if (record instanceof InternalRow) {

fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,10 @@
3333
*/
3434
class TypedUpsertWriterImpl<T> implements TypedUpsertWriter<T> {
3535

36-
@Override
37-
public void flush() {
38-
delegate.flush();
39-
}
40-
41-
private final UpsertWriterImpl delegate;
42-
43-
@Override
44-
public void close() throws Exception {
45-
delegate.close();
46-
}
47-
48-
private final Class<T> pojoClass;
36+
private final UpsertWriter delegate;
4937
private final TableInfo tableInfo;
5038
private final RowType tableSchema;
51-
private final int[] targetColumns; // may be null
39+
@Nullable private final int[] targetColumns;
5240

5341
private final RowType pkProjection;
5442
@Nullable private final RowType targetProjection;
@@ -58,12 +46,8 @@ public void close() throws Exception {
5846
@Nullable private final PojoToRowConverter<T> targetConverter;
5947

6048
TypedUpsertWriterImpl(
61-
UpsertWriterImpl delegate,
62-
Class<T> pojoClass,
63-
TableInfo tableInfo,
64-
int[] targetColumns) {
49+
UpsertWriter delegate, Class<T> pojoClass, TableInfo tableInfo, int[] targetColumns) {
6550
this.delegate = delegate;
66-
this.pojoClass = pojoClass;
6751
this.tableInfo = tableInfo;
6852
this.tableSchema = tableInfo.getRowType();
6953
this.targetColumns = targetColumns;
@@ -82,6 +66,11 @@ public void close() throws Exception {
8266
: PojoToRowConverter.of(pojoClass, tableSchema, targetProjection);
8367
}
8468

69+
@Override
70+
public void flush() {
71+
delegate.flush();
72+
}
73+
8574
@Override
8675
public CompletableFuture<UpsertResult> upsert(T record) {
8776
if (record instanceof InternalRow) {
@@ -124,6 +113,9 @@ private InternalRow convertPojo(T pojo, boolean forDelete) {
124113
// set PK fields, others null
125114
for (String pk : tableInfo.getPhysicalPrimaryKeys()) {
126115
int projIndex = projection.getFieldIndex(pk);
116+
117+
// TODO: this can be optimized by pre-computing
118+
// the index mapping in the constructor?
127119
int fullIndex = tableSchema.getFieldIndex(pk);
128120
full.setField(fullIndex, projected.getField(projIndex));
129121
}

0 commit comments

Comments
 (0)