
Commit 2582ab9

fix comments
1 parent 3b34734 commit 2582ab9

5 files changed (+44 additions, −51 deletions)


fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java

Lines changed: 14 additions & 23 deletions
@@ -31,6 +31,7 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -102,31 +103,21 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
             for (DataFile dataFile : committable.getDataFiles()) {
                 appendFiles.appendFile(dataFile);
             }
-            if (!committable.getDeleteFiles().isEmpty()) {
-                throw new IllegalStateException(
-                        "Delete files are not supported in append-only mode. "
-                                + "Found "
-                                + committable.getDeleteFiles().size()
-                                + " delete files.");
-            }
 
             addFlussProperties(appendFiles, snapshotProperties);
 
             appendFiles.commit();
         } else {
-            // Row delta validations are not needed for streaming changes that write equality
-            // deletes.
-            // Equality deletes are applied to data in all previous sequence numbers, so retries
-            // may
-            // push deletes further in the future, but do not affect correctness. Position
-            // deletes
-            // committed to the table in this path are used only to delete rows from data files
-            // that are
-            // being added in this commit. There is no way for data files added along with the
-            // delete
-            // files to be concurrently removed, so there is no need to validate the files
-            // referenced by
-            // the position delete files that are being committed.
+            /**
+             * Row delta validations are not needed for streaming changes that write equality
+             * deletes. Equality deletes are applied to data in all previous sequence numbers,
+             * so retries may push deletes further in the future, but do not affect correctness.
+             * Position deletes committed to the table in this path are used only to delete rows
+             * from data files that are being added in this commit. There is no way for data
+             * files added along with the delete files to be concurrently removed, so there is
+             * no need to validate the files referenced by the position delete files that are
+             * being committed.
+             */
             RowDelta rowDelta = icebergTable.newRowDelta();
             Arrays.stream(committable.getDataFiles().stream().toArray(DataFile[]::new))
                     .forEach(rowDelta::addRows);
@@ -147,10 +138,10 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
     }
 
     private void addFlussProperties(
-            AppendFiles appendFiles, Map<String, String> snapshotProperties) {
-        appendFiles.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+            SnapshotUpdate<?> operation, Map<String, String> snapshotProperties) {
+        operation.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
         for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
-            appendFiles.set(entry.getKey(), entry.getValue());
+            operation.set(entry.getKey(), entry.getValue());
         }
     }
 
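The switch from AppendFiles to SnapshotUpdate<?> in addFlussProperties works because both AppendFiles (the append-only commit path) and RowDelta (the primary-key commit path) extend Iceberg's SnapshotUpdate, which exposes set(String, String) for snapshot summary properties. A minimal sketch of that idea outside the Fluss classes; the property values below are placeholders, not the project's constants:

    import java.util.Map;

    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.SnapshotUpdate;
    import org.apache.iceberg.Table;

    class SnapshotPropertiesSketch {

        // One helper serves both snapshot-producing operations.
        static void addProperties(SnapshotUpdate<?> operation, Map<String, String> props) {
            operation.set("commit-user", "fluss-lake-tiering"); // placeholder value
            for (Map.Entry<String, String> entry : props.entrySet()) {
                operation.set(entry.getKey(), entry.getValue());
            }
        }

        static void commitAppend(Table table, Map<String, String> props) {
            AppendFiles append = table.newAppend(); // AppendFiles extends SnapshotUpdate<AppendFiles>
            addProperties(append, props);
            append.commit();
        }

        static void commitRowDelta(Table table, Map<String, String> props) {
            RowDelta rowDelta = table.newRowDelta(); // RowDelta extends SnapshotUpdate<RowDelta>
            addProperties(rowDelta, props);
            rowDelta.commit();
        }
    }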

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java

Lines changed: 6 additions & 3 deletions
@@ -23,7 +23,6 @@
 import com.alibaba.fluss.lake.writer.WriterInitContext;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Lists;
 
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -37,6 +36,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -64,7 +64,7 @@ public IcebergLakeWriter(
 
     private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
         Schema schema = icebergTable.schema();
-        List<Integer> equalityFieldIds = Lists.newArrayList(schema.identifierFieldIds());
+        List<Integer> equalityFieldIds = new ArrayList<>(schema.identifierFieldIds());
         PartitionSpec spec = icebergTable.spec();
 
         // Get target file size from table properties
@@ -90,6 +90,8 @@ private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
                         format,
                         outputFileFactory,
                         targetFileSize);
+            } else {
+                return null;
             }
         } else {
             if (spec.isUnpartitioned()) {
@@ -102,9 +104,10 @@ private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
                         format,
                         outputFileFactory,
                         targetFileSize);
+            } else {
+                return null;
             }
         }
-        return null;
     }
 
     @Override
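In IcebergLakeWriter, the single trailing return null is replaced by explicit else branches, so each arm of the nested if/else returns on its own. A stripped-down sketch of that control-flow shape only; the branch conditions and return values are placeholders, not the real writer construction:

    class BranchReturnSketch {

        // Before: unsupported cases fell through to one trailing "return null".
        // After: each branch returns explicitly, as sketched here.
        static String createWriter(boolean firstPath, boolean unpartitioned) {
            if (firstPath) {
                if (unpartitioned) {
                    return "writer A"; // placeholder for the real task writer
                } else {
                    return null; // unsupported combination
                }
            } else {
                if (unpartitioned) {
                    return "writer B"; // placeholder for the real task writer
                } else {
                    return null; // unsupported combination
                }
            }
        }
    }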

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java

Lines changed: 8 additions & 8 deletions
@@ -20,7 +20,6 @@
 import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Lists;
 import com.alibaba.fluss.types.RowType;
 
 import org.apache.iceberg.FileFormat;
@@ -31,10 +30,10 @@
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.util.ArrayUtil;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /** A {@link RecordWriter} to write to Iceberg's primary-key table. */
@@ -63,17 +62,19 @@ private static TaskWriter<Record> createTaskWriter(
             FileFormat format,
             OutputFileFactory outputFileFactory,
             long targetFileSize) {
-
+        int[] equalityFieldIds =
+                icebergTable.schema().identifierFieldIds().stream()
+                        .mapToInt(Integer::intValue)
+                        .toArray();
         FileAppenderFactory<Record> appenderFactory =
                 new GenericAppenderFactory(
                         icebergTable.schema(),
                         icebergTable.spec(),
-                        ArrayUtil.toIntArray(
-                                Lists.newArrayList(icebergTable.schema().identifierFieldIds())),
+                        equalityFieldIds,
                         icebergTable.schema(),
                         null);
 
-        List<String> columns = Lists.newArrayList();
+        List<String> columns = new ArrayList<>();
         for (Integer fieldId : icebergTable.schema().identifierFieldIds()) {
             columns.add(icebergTable.schema().findField(fieldId).name());
         }
@@ -99,9 +100,8 @@ public void write(LogRecord record) throws Exception {
                 deltaWriter.write(flussRecordAsIcebergRecord);
                 break;
             case UPDATE_BEFORE:
-                deltaWriter.delete(flussRecordAsIcebergRecord);
-                break;
             case DELETE:
+                // TODO we can project the record and only write the equality delete fields
                 deltaWriter.delete(flussRecordAsIcebergRecord);
                 break;
             default:
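DeltaTaskWriter now derives the equality field IDs with a plain stream instead of the shaded-Guava Lists.newArrayList plus ArrayUtil.toIntArray round trip. Schema#identifierFieldIds() returns a Set<Integer>, so the conversion is just mapToInt/toArray. A self-contained sketch of the same conversion with made-up field IDs:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    class EqualityFieldIdsSketch {

        public static void main(String[] args) {
            // Stand-in for icebergTable.schema().identifierFieldIds().
            Set<Integer> identifierFieldIds = new LinkedHashSet<>(Arrays.asList(1, 3));

            // Same expression as the patched createTaskWriter, minus the Iceberg table.
            int[] equalityFieldIds =
                    identifierFieldIds.stream().mapToInt(Integer::intValue).toArray();

            System.out.println(Arrays.toString(equalityFieldIds)); // prints [1, 3]
        }
    }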

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java

Lines changed: 4 additions & 9 deletions
@@ -56,11 +56,6 @@ public void delete(Record row) throws IOException {
         deltaWriter.delete(row);
     }
 
-    // The caller of this function is responsible for passing in a record with only the key fields
-    public void deleteKey(Record key) throws IOException {
-        deltaWriter.deleteKey(key);
-    }
-
     @Override
     public void close() throws IOException {
         deltaWriter.close();
@@ -73,13 +68,13 @@ private GenericEqualityDeltaWriter(
         }
 
         @Override
-        protected StructLike asStructLike(Record row) {
-            return row;
+        protected StructLike asStructLike(Record record) {
+            return record;
         }
 
         @Override
-        protected StructLike asStructLikeKey(Record data) {
-            return data;
+        protected StructLike asStructLikeKey(Record record) {
+            return record;
         }
     }
 }

fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java

Lines changed: 12 additions & 8 deletions
@@ -105,13 +105,17 @@ private static Stream<Arguments> tieringWriteArgs() {
     @ParameterizedTest
     @MethodSource("tieringWriteArgs")
     void testTieringWriteTable(boolean isPrimaryKeyTable) throws Exception {
-        TablePath tablePath = TablePath.of("iceberg", "test_table");
+        int bucketNum = 3;
+        TablePath tablePath =
+                TablePath.of(
+                        "iceberg",
+                        String.format(
+                                "test_tiering_table_%s",
+                                isPrimaryKeyTable ? "primary_key" : "log"));
         createTable(tablePath, isPrimaryKeyTable);
 
         Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
 
-        int bucketNum = 3;
-
         Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
 
         List<IcebergWriteResult> icebergWriteResults = new ArrayList<>();
@@ -126,14 +130,14 @@ void testTieringWriteTable(boolean isPrimaryKeyTable) throws Exception {
             Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                     isPrimaryKeyTable
                             ? genPrimaryKeyTableRecords(bucket)
-                            : genLogTableRecords(bucket, 5);
+                            : genLogTableRecords(bucket, 10);
 
             List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
             List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+            recordsByBucket.put(bucket, expectRecords);
             for (LogRecord record : writtenRecords) {
                 writer.write(record);
             }
-            recordsByBucket.put(bucket, expectRecords);
             IcebergWriteResult result = writer.complete();
             byte[] serialized = writeResultSerializer.serialize(result);
             icebergWriteResults.add(
@@ -167,7 +171,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable) throws Exception {
             if (isPrimaryKeyTable) {
                 verifyPrimaryKeyTableRecord(actualRecords, expectRecords, bucket);
             } else {
-                verifyLogTableRecords(actualRecords, bucket, expectRecords);
+                verifyLogTableRecords(actualRecords, expectRecords, bucket);
             }
         }
     }
@@ -329,8 +333,8 @@ private CloseableIterator<Record> getIcebergRows(Table table, int bucket) {
 
     private void verifyLogTableRecords(
             CloseableIterator<Record> actualRecords,
-            int expectBucket,
-            List<LogRecord> expectRecords) {
+            List<LogRecord> expectRecords,
+            int expectBucket) {
         for (LogRecord expectRecord : expectRecords) {
             Record actualRecord = actualRecords.next();
             // check business columns:
