Commit 4cf486d

[flink] Union read decouple with paimon for log table (#1527)
1 parent 1c358ef commit 4cf486d

26 files changed, +993 -55 lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.FlussRuntimeException;
 import com.alibaba.fluss.exception.InvalidTableException;
-import com.alibaba.fluss.flink.lakehouse.LakeCatalog;
+import com.alibaba.fluss.flink.lake.LakeCatalog;
 import com.alibaba.fluss.flink.procedure.ProcedureManager;
 import com.alibaba.fluss.flink.utils.CatalogExceptionUtils;
 import com.alibaba.fluss.flink.utils.DataLakeUtils;

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 3 additions & 2 deletions
@@ -20,7 +20,7 @@
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.flink.FlinkConnectorOptions;
-import com.alibaba.fluss.flink.lakehouse.LakeTableFactory;
+import com.alibaba.fluss.flink.lake.LakeTableFactory;
 import com.alibaba.fluss.flink.sink.FlinkTableSink;
 import com.alibaba.fluss.flink.source.FlinkTableSource;
 import com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -143,7 +143,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 cache,
                 partitionDiscoveryIntervalMs,
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
-                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
+                context.getCatalogTable().getOptions());
     }

     @Override
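
The new constructor argument threads the raw catalog table options into the source, so lake-specific details no longer have to be resolved eagerly in the factory. A minimal, self-contained sketch of that idea (the class, field, and option key below are illustrative, not the actual FlinkTableSource API):

import java.util.Collections;
import java.util.Map;

/** Illustrative stand-in for a source that keeps the raw catalog options for later union reads. */
class UnionReadSourceSketch {
    private final boolean datalakeEnabled;
    private final Map<String, String> catalogTableOptions;

    UnionReadSourceSketch(boolean datalakeEnabled, Map<String, String> catalogTableOptions) {
        this.datalakeEnabled = datalakeEnabled;
        // Keep the full option map so the lake format can be resolved lazily,
        // only when a union (lake snapshot + log) read is actually planned.
        this.catalogTableOptions = Collections.unmodifiableMap(catalogTableOptions);
    }

    String lakeFormat() {
        if (!datalakeEnabled) {
            return "none";
        }
        // Hypothetical option key, used only for this sketch.
        return catalogTableOptions.getOrDefault("table.datalake.format", "unknown");
    }
}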

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java renamed to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;

 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectPath;

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java renamed to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java

Lines changed: 5 additions & 1 deletion
@@ -15,9 +15,10 @@
  * limitations under the License.
  */

-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;

 import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
 import com.alibaba.fluss.flink.source.reader.RecordAndPos;
@@ -48,6 +49,9 @@ public void emitRecord(
             ((PaimonSnapshotAndFlussLogSplitState) splitState)
                     .setRecordsToSkip(recordAndPos.readRecordsCount());
             sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
+        } else if (splitState instanceof LakeSnapshotSplitState) {
+            ((LakeSnapshotSplitState) splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
+            sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
         } else {
             throw new UnsupportedOperationException(
                     "Unknown split state type: " + splitState.getClass());
