Skip to content

Commit cf23fc3

Browse files
committed
improve some code
1 parent c63180c commit cf23fc3

File tree

15 files changed

+247
-406
lines changed

15 files changed

+247
-406
lines changed

.idea/vcs.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.client.table.writer;
1919

20+
import org.apache.fluss.client.write.WriteFormat;
2021
import org.apache.fluss.client.write.WriteRecord;
2122
import org.apache.fluss.client.write.WriterClient;
2223
import org.apache.fluss.metadata.DataLakeFormat;
@@ -50,6 +51,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
5051
private final KeyEncoder bucketKeyEncoder;
5152

5253
private final KvFormat kvFormat;
54+
private final WriteFormat writeFormat;
5355
private final RowEncoder rowEncoder;
5456
private final FieldGetter[] fieldGetters;
5557
private final TableInfo tableInfo;
@@ -78,6 +80,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
7880
: KeyEncoder.of(rowType, tableInfo.getBucketKeys(), lakeFormat);
7981

8082
this.kvFormat = tableInfo.getTableConfig().getKvFormat();
83+
this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
8184
this.rowEncoder = RowEncoder.create(kvFormat, rowType);
8285
this.fieldGetters = InternalRow.createFieldGetters(rowType);
8386
this.tableInfo = tableInfo;
@@ -164,6 +167,7 @@ public CompletableFuture<UpsertResult> upsert(InternalRow row) {
164167
encodeRow(row),
165168
key,
166169
bucketKey,
170+
writeFormat,
167171
targetColumns);
168172
return send(record).thenApply(ignored -> UPSERT_SUCCESS);
169173
}
@@ -182,7 +186,12 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
182186
bucketKeyEncoder == primaryKeyEncoder ? key : bucketKeyEncoder.encodeKey(row);
183187
WriteRecord record =
184188
WriteRecord.forDelete(
185-
tableInfo, getPhysicalPath(row), key, bucketKey, targetColumns);
189+
tableInfo,
190+
getPhysicalPath(row),
191+
key,
192+
bucketKey,
193+
writeFormat,
194+
targetColumns);
186195
return send(record).thenApply(ignored -> DELETE_SUCCESS);
187196
}
188197

fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.memory.MemorySegment;
2323
import org.apache.fluss.metadata.PhysicalTablePath;
2424
import org.apache.fluss.record.ChangeType;
25+
import org.apache.fluss.record.MemoryLogRecordsRowBuilder;
2526
import org.apache.fluss.record.bytesview.BytesView;
2627
import org.apache.fluss.row.InternalRow;
2728

@@ -38,44 +39,20 @@
3839
*/
3940
abstract class AbstractRowLogWriteBatch<R> extends WriteBatch {
4041

41-
interface RecordsBuilderAdapter<T> {
42-
boolean hasRoomFor(T row);
43-
44-
void append(ChangeType changeType, T row) throws Exception;
45-
46-
BytesView build() throws IOException;
47-
48-
boolean isClosed();
49-
50-
void close() throws Exception;
51-
52-
void setWriterState(long writerId, int batchSequence);
53-
54-
long writerId();
55-
56-
int batchSequence();
57-
58-
void abort();
59-
60-
void resetWriterState(long writerId, int batchSequence);
61-
62-
int getSizeInBytes();
63-
}
64-
6542
private final AbstractPagedOutputView outputView;
66-
private final RecordsBuilderAdapter<R> builder;
43+
private final MemoryLogRecordsRowBuilder<R> recordsBuilder;
6744
private final String buildErrorMessage;
6845

6946
protected AbstractRowLogWriteBatch(
7047
int bucketId,
7148
PhysicalTablePath physicalTablePath,
7249
long createdMs,
7350
AbstractPagedOutputView outputView,
74-
RecordsBuilderAdapter<R> builder,
51+
MemoryLogRecordsRowBuilder<R> recordsBuilder,
7552
String buildErrorMessage) {
7653
super(bucketId, physicalTablePath, createdMs);
7754
this.outputView = outputView;
78-
this.builder = builder;
55+
this.recordsBuilder = recordsBuilder;
7956
this.buildErrorMessage = buildErrorMessage;
8057
}
8158

@@ -90,34 +67,39 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
9067
"target columns must be null for log record");
9168

9269
R row = requireAndCastRow(rowObj);
93-
if (!builder.hasRoomFor(row) || isClosed()) {
70+
if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
9471
return false;
9572
}
96-
builder.append(ChangeType.APPEND_ONLY, row);
73+
recordsBuilder.append(ChangeType.APPEND_ONLY, row);
9774
recordCount++;
9875
callbacks.add(callback);
9976
return true;
10077
}
10178

10279
protected abstract R requireAndCastRow(InternalRow row);
10380

81+
@Override
82+
public boolean isLogBatch() {
83+
return true;
84+
}
85+
10486
@Override
10587
public BytesView build() {
10688
try {
107-
return builder.build();
89+
return recordsBuilder.build();
10890
} catch (IOException e) {
10991
throw new FlussRuntimeException(buildErrorMessage, e);
11092
}
11193
}
11294

11395
@Override
11496
public boolean isClosed() {
115-
return builder.isClosed();
97+
return recordsBuilder.isClosed();
11698
}
11799

118100
@Override
119101
public void close() throws Exception {
120-
builder.close();
102+
recordsBuilder.close();
121103
reopened = false;
122104
}
123105

@@ -128,32 +110,32 @@ public List<MemorySegment> pooledMemorySegments() {
128110

129111
@Override
130112
public void setWriterState(long writerId, int batchSequence) {
131-
builder.setWriterState(writerId, batchSequence);
113+
recordsBuilder.setWriterState(writerId, batchSequence);
132114
}
133115

134116
@Override
135117
public long writerId() {
136-
return builder.writerId();
118+
return recordsBuilder.writerId();
137119
}
138120

139121
@Override
140122
public int batchSequence() {
141-
return builder.batchSequence();
123+
return recordsBuilder.batchSequence();
142124
}
143125

144126
@Override
145127
public void abortRecordAppends() {
146-
builder.abort();
128+
recordsBuilder.abort();
147129
}
148130

149131
@Override
150132
public void resetWriterState(long writerId, int batchSequence) {
151133
super.resetWriterState(writerId, batchSequence);
152-
builder.resetWriterState(writerId, batchSequence);
134+
recordsBuilder.resetWriterState(writerId, batchSequence);
153135
}
154136

155137
@Override
156138
public int estimatedSizeInBytes() {
157-
return builder.getSizeInBytes();
139+
return recordsBuilder.getSizeInBytes();
158140
}
159141
}

fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java

Lines changed: 1 addition & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,13 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.memory.AbstractPagedOutputView;
2222
import org.apache.fluss.metadata.PhysicalTablePath;
23-
import org.apache.fluss.record.ChangeType;
2423
import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
25-
import org.apache.fluss.record.bytesview.BytesView;
2624
import org.apache.fluss.row.InternalRow;
2725
import org.apache.fluss.row.compacted.CompactedRow;
2826
import org.apache.fluss.rpc.messages.ProduceLogRequest;
2927

3028
import javax.annotation.concurrent.NotThreadSafe;
3129

32-
import java.io.IOException;
33-
3430
import static org.apache.fluss.utils.Preconditions.checkArgument;
3531

3632
/**
@@ -55,74 +51,10 @@ public CompactedLogWriteBatch(
5551
physicalTablePath,
5652
createdMs,
5753
outputView,
58-
new RecordsBuilderAdapter<CompactedRow>() {
59-
private final MemoryLogRecordsCompactedBuilder delegate =
60-
MemoryLogRecordsCompactedBuilder.builder(
61-
schemaId, writeLimit, outputView, true);
62-
63-
@Override
64-
public boolean hasRoomFor(CompactedRow row) {
65-
return delegate.hasRoomFor(row);
66-
}
67-
68-
@Override
69-
public void append(ChangeType changeType, CompactedRow row) throws Exception {
70-
delegate.append(changeType, row);
71-
}
72-
73-
@Override
74-
public BytesView build() throws IOException {
75-
return delegate.build();
76-
}
77-
78-
@Override
79-
public boolean isClosed() {
80-
return delegate.isClosed();
81-
}
82-
83-
@Override
84-
public void close() throws Exception {
85-
delegate.close();
86-
}
87-
88-
@Override
89-
public void setWriterState(long writerId, int batchSequence) {
90-
delegate.setWriterState(writerId, batchSequence);
91-
}
92-
93-
@Override
94-
public long writerId() {
95-
return delegate.writerId();
96-
}
97-
98-
@Override
99-
public int batchSequence() {
100-
return delegate.batchSequence();
101-
}
102-
103-
@Override
104-
public void abort() {
105-
delegate.abort();
106-
}
107-
108-
@Override
109-
public void resetWriterState(long writerId, int batchSequence) {
110-
delegate.resetWriterState(writerId, batchSequence);
111-
}
112-
113-
@Override
114-
public int getSizeInBytes() {
115-
return delegate.getSizeInBytes();
116-
}
117-
},
54+
MemoryLogRecordsCompactedBuilder.builder(schemaId, writeLimit, outputView, true),
11855
"Failed to build compacted log record batch.");
11956
}
12057

121-
@Override
122-
public boolean isLogBatch() {
123-
return true;
124-
}
125-
12658
@Override
12759
protected CompactedRow requireAndCastRow(InternalRow row) {
12860
checkArgument(

fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java

Lines changed: 1 addition & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,12 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.memory.AbstractPagedOutputView;
2222
import org.apache.fluss.metadata.PhysicalTablePath;
23-
import org.apache.fluss.record.ChangeType;
2423
import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
25-
import org.apache.fluss.record.bytesview.BytesView;
2624
import org.apache.fluss.row.indexed.IndexedRow;
2725
import org.apache.fluss.rpc.messages.ProduceLogRequest;
2826

2927
import javax.annotation.concurrent.NotThreadSafe;
3028

31-
import java.io.IOException;
32-
3329
import static org.apache.fluss.utils.Preconditions.checkArgument;
3430

3531
/**
@@ -54,74 +50,10 @@ public IndexedLogWriteBatch(
5450
physicalTablePath,
5551
createdMs,
5652
outputView,
57-
new RecordsBuilderAdapter<IndexedRow>() {
58-
private final MemoryLogRecordsIndexedBuilder delegate =
59-
MemoryLogRecordsIndexedBuilder.builder(
60-
schemaId, writeLimit, outputView, true);
61-
62-
@Override
63-
public boolean hasRoomFor(IndexedRow row) {
64-
return delegate.hasRoomFor(row);
65-
}
66-
67-
@Override
68-
public void append(ChangeType changeType, IndexedRow row) throws Exception {
69-
delegate.append(changeType, row);
70-
}
71-
72-
@Override
73-
public BytesView build() throws IOException {
74-
return delegate.build();
75-
}
76-
77-
@Override
78-
public boolean isClosed() {
79-
return delegate.isClosed();
80-
}
81-
82-
@Override
83-
public void close() throws Exception {
84-
delegate.close();
85-
}
86-
87-
@Override
88-
public void setWriterState(long writerId, int batchSequence) {
89-
delegate.setWriterState(writerId, batchSequence);
90-
}
91-
92-
@Override
93-
public long writerId() {
94-
return delegate.writerId();
95-
}
96-
97-
@Override
98-
public int batchSequence() {
99-
return delegate.batchSequence();
100-
}
101-
102-
@Override
103-
public void abort() {
104-
delegate.abort();
105-
}
106-
107-
@Override
108-
public void resetWriterState(long writerId, int batchSequence) {
109-
delegate.resetWriterState(writerId, batchSequence);
110-
}
111-
112-
@Override
113-
public int getSizeInBytes() {
114-
return delegate.getSizeInBytes();
115-
}
116-
},
53+
MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, outputView, true),
11754
"Failed to build indexed log record batch.");
11855
}
11956

120-
@Override
121-
public boolean isLogBatch() {
122-
return true;
123-
}
124-
12557
@Override
12658
protected IndexedRow requireAndCastRow(org.apache.fluss.row.InternalRow row) {
12759
checkArgument(row instanceof IndexedRow, "row must be IndexRow for indexed log table");

0 commit comments

Comments
 (0)