Skip to content

Commit 382830c

Browse files
committed
rebase branch
1 parent cc1d2c8 commit 382830c

File tree

5 files changed

+35
-32
lines changed

5 files changed

+35
-32
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public CompletableFuture<AppendResult> append(InternalRow row) {
9090
record = WriteRecord.forIndexedAppend(tableInfo, physicalPath, indexedRow, bucketKey);
9191
} else if (logFormat == LogFormat.COMPACTED) {
9292
CompactedRow compactedRow = encodeCompactedRow(row);
93-
record = WriteRecord.forCompactedAppend(tableInfo, physicalPath, compactedRow, bucketKey);
93+
record =
94+
WriteRecord.forCompactedAppend(
95+
tableInfo, physicalPath, compactedRow, bucketKey);
9496
} else {
9597
// ARROW format supports general internal row
9698
record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, bucketKey);

fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,35 @@ public IndexedLogWriteBatch(
5959
MemoryLogRecordsIndexedBuilder.builder(
6060
schemaId, writeLimit, outputView, true);
6161

62-
@Override
63-
public boolean isLogBatch() {
64-
return true;
65-
}
62+
@Override
63+
public boolean isLogBatch() {
64+
return true;
65+
}
66+
67+
@Override
68+
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback)
69+
throws Exception {
70+
checkNotNull(callback, "write callback must be not null");
71+
checkNotNull(writeRecord.getRow(), "row must not be null for log record");
72+
checkArgument(
73+
writeRecord.getKey() == null, "key must be null for log record");
74+
checkArgument(
75+
writeRecord.getTargetColumns() == null,
76+
"target columns must be null for log record");
77+
checkArgument(
78+
writeRecord.getRow() instanceof IndexedRow,
79+
"row must not be IndexRow for indexed log table");
80+
IndexedRow row = (IndexedRow) writeRecord.getRow();
81+
if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
82+
return false;
83+
} else {
84+
recordsBuilder.append(ChangeType.APPEND_ONLY, row);
85+
recordCount++;
86+
callbacks.add(callback);
87+
return true;
88+
}
89+
}
6690

67-
@Override
68-
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
69-
checkNotNull(callback, "write callback must be not null");
70-
checkNotNull(writeRecord.getRow(), "row must not be null for log record");
71-
checkArgument(writeRecord.getKey() == null, "key must be null for log record");
72-
checkArgument(
73-
writeRecord.getTargetColumns() == null,
74-
"target columns must be null for log record");
75-
checkArgument(
76-
writeRecord.getRow() instanceof IndexedRow,
77-
"row must not be IndexRow for indexed log table");
78-
IndexedRow row = (IndexedRow) writeRecord.getRow();
79-
if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
80-
return false;
81-
} else {
82-
recordsBuilder.append(ChangeType.APPEND_ONLY, row);
83-
recordCount++;
84-
callbacks.add(callback);
85-
return true;
86-
}
87-
}
8891
@Override
8992
public boolean hasRoomFor(IndexedRow row) {
9093
return delegate.hasRoomFor(row);

fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.metadata.PhysicalTablePath;
22-
import org.apache.fluss.record.CompactedLogRecord;
2322
import org.apache.fluss.metadata.TableInfo;
23+
import org.apache.fluss.record.CompactedLogRecord;
2424
import org.apache.fluss.record.DefaultKvRecord;
2525
import org.apache.fluss.record.IndexedLogRecord;
2626
import org.apache.fluss.row.BinaryRow;
@@ -53,8 +53,7 @@ public static WriteRecord forUpsert(
5353
checkNotNull(row, "row must not be null");
5454
checkNotNull(key, "key must not be null");
5555
checkNotNull(bucketKey, "bucketKey must not be null");
56-
int estimatedSizeInBytes =
57-
DefaultKvRecord.sizeOf(key, row) + RECORD_BATCH_HEADER_SIZE;
56+
int estimatedSizeInBytes = DefaultKvRecord.sizeOf(key, row) + RECORD_BATCH_HEADER_SIZE;
5857
return new WriteRecord(
5958
tableInfo,
6059
tablePath,

fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,6 @@ public CloseableIterator<LogRecord> records(ReadContext context) {
233233
case INDEXED:
234234
return rowRecordIterator(
235235
rowType, context.getOutputProjectedRow(schemaId), timestamp);
236-
return rowRecordIterator(rowType, timestamp);
237236
case COMPACTED:
238237
return compactedRowRecordIterator(rowType, timestamp);
239238
default:

fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public static LogRecordReadContext createReadContext(
110110
int[] selectedFields = projection.getProjectionPositions();
111111
return createIndexedReadContext(rowType, schemaId, selectedFields, schemaGetter);
112112
} else if (logFormat == LogFormat.COMPACTED) {
113-
int[] selectedFields = projection.getProjection();
113+
int[] selectedFields = projection.getProjectionPositions();
114114
return createCompactedRowReadContext(rowType, schemaId, selectedFields);
115115
} else {
116116
throw new IllegalArgumentException("Unsupported log format: " + logFormat);
@@ -200,7 +200,7 @@ public static LogRecordReadContext createCompactedRowReadContext(
200200
FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields);
201201
// for COMPACTED log format, the projection is NEVER push downed to the server side
202202
return new LogRecordReadContext(
203-
LogFormat.COMPACTED, rowType, schemaId, null, null, fieldGetters, false);
203+
LogFormat.COMPACTED, rowType, schemaId, null, fieldGetters, false, null);
204204
}
205205

206206
private LogRecordReadContext(

0 commit comments

Comments
 (0)