Skip to content

Commit e9c0a6d

Browse files
committed
rebase branch
1 parent ee6eea7 commit e9c0a6d

File tree

3 files changed

+33
-29
lines changed

3 files changed

+33
-29
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public CompletableFuture<AppendResult> append(InternalRow row) {
9090
record = WriteRecord.forIndexedAppend(tableInfo, physicalPath, indexedRow, bucketKey);
9191
} else if (logFormat == LogFormat.COMPACTED) {
9292
CompactedRow compactedRow = encodeCompactedRow(row);
93-
record = WriteRecord.forCompactedAppend(tableInfo, physicalPath, compactedRow, bucketKey);
93+
record =
94+
WriteRecord.forCompactedAppend(
95+
tableInfo, physicalPath, compactedRow, bucketKey);
9496
} else {
9597
// ARROW format supports general internal row
9698
record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, bucketKey);

fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,35 @@ public IndexedLogWriteBatch(
5959
MemoryLogRecordsIndexedBuilder.builder(
6060
schemaId, writeLimit, outputView, true);
6161

62-
@Override
63-
public boolean isLogBatch() {
64-
return true;
65-
}
62+
@Override
63+
public boolean isLogBatch() {
64+
return true;
65+
}
66+
67+
@Override
68+
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback)
69+
throws Exception {
70+
checkNotNull(callback, "write callback must be not null");
71+
checkNotNull(writeRecord.getRow(), "row must not be null for log record");
72+
checkArgument(
73+
writeRecord.getKey() == null, "key must be null for log record");
74+
checkArgument(
75+
writeRecord.getTargetColumns() == null,
76+
"target columns must be null for log record");
77+
checkArgument(
78+
writeRecord.getRow() instanceof IndexedRow,
79+
"row must not be IndexRow for indexed log table");
80+
IndexedRow row = (IndexedRow) writeRecord.getRow();
81+
if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
82+
return false;
83+
} else {
84+
recordsBuilder.append(ChangeType.APPEND_ONLY, row);
85+
recordCount++;
86+
callbacks.add(callback);
87+
return true;
88+
}
89+
}
6690

67-
@Override
68-
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
69-
checkNotNull(callback, "write callback must be not null");
70-
checkNotNull(writeRecord.getRow(), "row must not be null for log record");
71-
checkArgument(writeRecord.getKey() == null, "key must be null for log record");
72-
checkArgument(
73-
writeRecord.getTargetColumns() == null,
74-
"target columns must be null for log record");
75-
checkArgument(
76-
writeRecord.getRow() instanceof IndexedRow,
77-
"row must not be IndexRow for indexed log table");
78-
IndexedRow row = (IndexedRow) writeRecord.getRow();
79-
if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
80-
return false;
81-
} else {
82-
recordsBuilder.append(ChangeType.APPEND_ONLY, row);
83-
recordCount++;
84-
callbacks.add(callback);
85-
return true;
86-
}
87-
}
8891
@Override
8992
public boolean hasRoomFor(IndexedRow row) {
9093
return delegate.hasRoomFor(row);

fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.metadata.PhysicalTablePath;
22-
import org.apache.fluss.record.CompactedLogRecord;
2322
import org.apache.fluss.metadata.TableInfo;
23+
import org.apache.fluss.record.CompactedLogRecord;
2424
import org.apache.fluss.record.DefaultKvRecord;
2525
import org.apache.fluss.record.IndexedLogRecord;
2626
import org.apache.fluss.row.BinaryRow;
@@ -53,8 +53,7 @@ public static WriteRecord forUpsert(
5353
checkNotNull(row, "row must not be null");
5454
checkNotNull(key, "key must not be null");
5555
checkNotNull(bucketKey, "bucketKey must not be null");
56-
int estimatedSizeInBytes =
57-
DefaultKvRecord.sizeOf(key, row) + RECORD_BATCH_HEADER_SIZE;
56+
int estimatedSizeInBytes = DefaultKvRecord.sizeOf(key, row) + RECORD_BATCH_HEADER_SIZE;
5857
return new WriteRecord(
5958
tableInfo,
6059
tablePath,

0 commit comments

Comments
 (0)