Skip to content

Commit f9e7526

Browse files
committed
fix rebase issues
1 parent 801d074 commit f9e7526

File tree

3 files changed

+31
-27
lines changed

3 files changed

+31
-27
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1565,10 +1565,27 @@ void testPkCompactedProject() throws Exception {
15651565
}
15661566
upsert.flush();
15671567

1568-
// Creating a projected log scanner for COMPACTED should fail
1569-
assertThatThrownBy(() -> createLogScanner(table, new int[] {0, 2}))
1570-
.isInstanceOf(IllegalArgumentException.class)
1571-
.hasMessageContaining("Projection is not supported for COMPACTED log format");
1568+
// Creating a projected log scanner for COMPACTED should work
1569+
try (LogScanner scanner = createLogScanner(table, new int[] {0, 2})) {
1570+
subscribeFromBeginning(scanner, table);
1571+
int seen = 0;
1572+
while (seen < 10) {
1573+
ScanRecords records = scanner.poll(Duration.ofSeconds(5));
1574+
for (ScanRecord record : records) {
1575+
InternalRow row = record.getRow();
1576+
assertThat(row.getFieldCount()).isEqualTo(2);
1577+
int a = row.getInt(0);
1578+
assertThat(a).isEqualTo(seen);
1579+
if (a % 2 == 0) {
1580+
assertThat(row.getString(1).toString()).isEqualTo("v" + a);
1581+
} else {
1582+
assertThat(row.isNullAt(1)).isTrue();
1583+
}
1584+
seen++;
1585+
}
1586+
}
1587+
assertThat(seen).isEqualTo(10);
1588+
}
15721589
}
15731590
}
15741591

fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,15 @@
3232
import java.io.IOException;
3333

3434
import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
35+
3536
/**
36-
* An immutable log record for @CompactedRow which can be directly persisted. The on-wire schema is
37-
* identical to IndexedLogRecord but the row payload uses the CompactedRow binary format:
37+
* An immutable log record for {@link CompactedRow} which can be directly persisted. The on-wire
38+
* schema is identical to IndexedLogRecord but the row payload uses the CompactedRow binary format:
3839
*
3940
* <ul>
40-
* <li>Length => int32 (total number of bytes following this length field)</li>
41-
* <li>Attributes => int8 (low 4 bits encode {@link ChangeType})</li>
42-
* <li>Value => {@link CompactedRow} (bytes in compacted row format)</li>
41+
* <li>Length => int32 (total number of bytes following this length field)
42+
* <li>Attributes => int8 (low 4 bits encode {@link ChangeType})
43+
* <li>Value => {@link CompactedRow} (bytes in compacted row format)
4344
* </ul>
4445
*
4546
* <p>Differences vs {@link IndexedLogRecord}: - Uses CompactedRow encoding which is space-optimized
@@ -48,12 +49,6 @@
4849
* in a CompactedRow with a {@link CompactedRowDeserializer} and only decode to object values when a
4950
* field is accessed. - The record header (Length + Attributes) layout and attribute semantics are
5051
* the same.
51-
* Differences vs {@link IndexedLogRecord}:
52-
* - Uses CompactedRow encoding which is space-optimized (VLQ for ints/longs, per-row null bitset) and
53-
* trades CPU for smaller storage; random access to fields is not supported without decoding.
54-
* - Deserialization is lazy: we wrap the underlying bytes in a CompactedRow with a
55-
* {@link CompactedRowDeserializer} and only decode to object values when a field is accessed.
56-
* - The record header (Length + Attributes) layout and attribute semantics are the same.
5752
*
5853
* <p>The offset computes the difference relative to the base offset of the batch containing this
5954
* record.
@@ -170,24 +165,22 @@ private static int calculateSizeInBytes(BinaryRow row) {
170165
int size = 1; // one byte for attributes
171166
size += row.getSizeInBytes();
172167
return size;
173-
int rowOffset = LENGTH_LENGTH + ATTRIBUTES_LENGTH;
174-
return deserializeCompactedRow(
175-
sizeInBytes - rowOffset, segment, offset + rowOffset, fieldTypes, LogFormat.COMPACTED);
176168
}
177169

178-
179170
private static InternalRow deserializeCompactedRow(
180171
int length,
181172
MemorySegment segment,
182173
int position,
183174
DataType[] fieldTypes,
184175
LogFormat logFormat) {
185176
if (logFormat == LogFormat.COMPACTED) {
186-
CompactedRow compactedRow = new CompactedRow(fieldTypes.length, compactedDeserializer);
177+
CompactedRow compactedRow =
178+
new CompactedRow(fieldTypes.length, new CompactedRowDeserializer(fieldTypes));
187179
compactedRow.pointTo(segment, position, length);
188180
return compactedRow;
189181
} else {
190-
throw new IllegalArgumentException("No such compacted row deserializer for: " + logFormat);
182+
throw new IllegalArgumentException(
183+
"No such compacted row deserializer for: " + logFormat);
191184
}
192185
}
193186
}

fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,6 @@ public static LogRecordReadContext createReadContext(
7979
Projection.of(
8080
IntStream.range(0, rowType.getFieldCount()).toArray(),
8181
tableInfo.getSchema());
82-
} else {
83-
// Explicitly forbid any projection for COMPACTED log format
84-
if (logFormat == LogFormat.COMPACTED) {
85-
throw new IllegalArgumentException(
86-
"Projection is not supported for COMPACTED log format. Please remove projection.");
87-
}
8882
}
8983

9084
if (logFormat == LogFormat.ARROW) {

0 commit comments

Comments
 (0)