Skip to content

Commit 8b29134

Browse files
leonardBangThorneANN
authored andcommitted
[FLINK-38726][fluss] Bump Fluss version to 0.9.0-incubating
This closes apache#4180.
1 parent c750824 commit 8b29134

26 files changed

Lines changed: 480 additions & 236 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ limitations under the License.
3333

3434

3535
<properties>
36-
<fluss.version>0.7.0</fluss.version>
36+
<fluss.version>0.9.0-incubating</fluss.version>
3737
</properties>
3838

3939
<dependencies>
4040
<dependency>
41-
<groupId>com.alibaba.fluss</groupId>
41+
<groupId>org.apache.fluss</groupId>
4242
<artifactId>fluss-client</artifactId>
4343
<version>${fluss.version}</version>
4444
</dependency>
@@ -66,34 +66,34 @@ limitations under the License.
6666
</dependency>
6767

6868
<dependency>
69-
<groupId>com.alibaba.fluss</groupId>
69+
<groupId>org.apache.fluss</groupId>
7070
<artifactId>fluss-server</artifactId>
7171
<version>${fluss.version}</version>
7272
<scope>test</scope>
7373
</dependency>
7474
<dependency>
75-
<groupId>com.alibaba.fluss</groupId>
75+
<groupId>org.apache.fluss</groupId>
7676
<artifactId>fluss-server</artifactId>
7777
<version>${fluss.version}</version>
7878
<type>test-jar</type>
7979
<scope>test</scope>
8080
</dependency>
8181
<dependency>
82-
<groupId>com.alibaba.fluss</groupId>
82+
<groupId>org.apache.fluss</groupId>
8383
<artifactId>fluss-test-utils</artifactId>
8484
<version>${fluss.version}</version>
8585
<scope>test</scope>
8686
</dependency>
8787
<!-- In Flink CDC project has Pipeline Sink Connector for Fluss. we import fluss-fink for Fluss Sink Connector just for test purpose -->
8888
<dependency>
89-
<groupId>com.alibaba.fluss</groupId>
89+
<groupId>org.apache.fluss</groupId>
9090
<artifactId>fluss-flink-common</artifactId>
9191
<version>${fluss.version}</version>
9292
<type>test-jar</type>
9393
<scope>test</scope>
9494
</dependency>
9595
<dependency>
96-
<groupId>com.alibaba.fluss</groupId>
96+
<groupId>org.apache.fluss</groupId>
9797
<artifactId>fluss-flink-1.20</artifactId>
9898
<version>${fluss.version}</version>
9999
<scope>test</scope>
@@ -129,7 +129,7 @@ limitations under the License.
129129
<shadeTestJar>false</shadeTestJar>
130130
<artifactSet>
131131
<includes>
132-
<include>com.alibaba.fluss:*</include>
132+
<include>org.apache.fluss:*</include>
133133
</includes>
134134
</artifactSet>
135135
</configuration>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.apache.flink.cdc.common.sink.DataSink;
2424
import org.apache.flink.cdc.connectors.fluss.sink.FlussDataSink;
2525

26-
import com.alibaba.fluss.config.ConfigOptions;
27-
import com.alibaba.fluss.config.Configuration;
26+
import org.apache.fluss.config.ConfigOptions;
27+
import org.apache.fluss.config.Configuration;
2828

2929
import java.util.HashMap;
3030
import java.util.HashSet;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.flink.cdc.common.sink.MetadataApplier;
2626
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussSink;
2727

28-
import com.alibaba.fluss.config.Configuration;
28+
import org.apache.fluss.config.Configuration;
2929

3030
import java.util.List;
3131
import java.util.Map;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@
2424
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2525
import org.apache.flink.cdc.common.event.TableId;
2626
import org.apache.flink.cdc.common.utils.Preconditions;
27+
import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
2728
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
2829
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer;
2930
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRowWithOp;
3031

31-
import com.alibaba.fluss.client.Connection;
32-
import com.alibaba.fluss.client.table.Table;
33-
import com.alibaba.fluss.metadata.TablePath;
34-
import com.alibaba.fluss.types.DataType;
32+
import org.apache.fluss.client.Connection;
33+
import org.apache.fluss.client.table.Table;
34+
import org.apache.fluss.metadata.TablePath;
35+
import org.apache.fluss.types.DataType;
3536

3637
import java.io.IOException;
3738
import java.util.Collections;
@@ -129,12 +130,12 @@ private TablePath getTablePath(TableId tableId) {
129130

130131
private static class TableSchemaInfo {
131132
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema;
132-
com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema;
133+
org.apache.fluss.metadata.Schema downStreamFlusstreamSchema;
133134
Map<Integer, Integer> indexMapping;
134135

135136
private TableSchemaInfo(
136137
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema,
137-
com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema) {
138+
org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) {
138139
this.upstreamCdcSchema = upstreamCdcSchema;
139140
this.downStreamFlusstreamSchema = downStreamFlusstreamSchema;
140141
this.indexMapping =
@@ -144,8 +145,8 @@ private TableSchemaInfo(
144145
}
145146

146147
static Map<Integer, Integer> sanityCheckAndGenerateIndexMapping(
147-
com.alibaba.fluss.metadata.Schema inferredFlussSchema,
148-
com.alibaba.fluss.metadata.Schema currentFlussNewSchema) {
148+
org.apache.fluss.metadata.Schema inferredFlussSchema,
149+
org.apache.fluss.metadata.Schema currentFlussNewSchema) {
149150
List<String> inferredSchemaColumnNames = inferredFlussSchema.getColumnNames();
150151
Map<String, Integer> reverseIndex = new HashMap<>();
151152
for (int i = 0; i < inferredSchemaColumnNames.size(); i++) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@
2626
import org.apache.flink.cdc.common.sink.MetadataApplier;
2727
import org.apache.flink.table.api.ValidationException;
2828

29-
import com.alibaba.fluss.client.Connection;
30-
import com.alibaba.fluss.client.ConnectionFactory;
31-
import com.alibaba.fluss.client.admin.Admin;
32-
import com.alibaba.fluss.config.Configuration;
33-
import com.alibaba.fluss.metadata.DatabaseDescriptor;
34-
import com.alibaba.fluss.metadata.TableDescriptor;
35-
import com.alibaba.fluss.metadata.TableInfo;
36-
import com.alibaba.fluss.metadata.TablePath;
29+
import org.apache.fluss.client.Connection;
30+
import org.apache.fluss.client.ConnectionFactory;
31+
import org.apache.fluss.client.admin.Admin;
32+
import org.apache.fluss.config.Configuration;
33+
import org.apache.fluss.metadata.DatabaseDescriptor;
34+
import org.apache.fluss.metadata.TableDescriptor;
35+
import org.apache.fluss.metadata.TableInfo;
36+
import org.apache.fluss.metadata.TablePath;
3737
import org.slf4j.Logger;
3838
import org.slf4j.LoggerFactory;
3939

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.fluss.sink.row;
19+
20+
import org.apache.flink.cdc.common.data.ArrayData;
21+
import org.apache.flink.cdc.common.data.TimestampData;
22+
23+
import org.apache.fluss.row.BinaryString;
24+
import org.apache.fluss.row.Decimal;
25+
import org.apache.fluss.row.InternalArray;
26+
import org.apache.fluss.row.InternalMap;
27+
import org.apache.fluss.row.InternalRow;
28+
import org.apache.fluss.row.TimestampLtz;
29+
import org.apache.fluss.row.TimestampNtz;
30+
31+
import static org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow.fromFlinkDecimal;
32+
33+
/** Wraps a CDC {@link ArrayData} as a Fluss {@link InternalArray}. */
34+
public class CdcAsFlussArray implements InternalArray {
35+
36+
private final ArrayData flussArray;
37+
38+
public CdcAsFlussArray(ArrayData flussArray) {
39+
this.flussArray = flussArray;
40+
}
41+
42+
@Override
43+
public int size() {
44+
return flussArray.size();
45+
}
46+
47+
@Override
48+
public boolean[] toBooleanArray() {
49+
return flussArray.toBooleanArray();
50+
}
51+
52+
@Override
53+
public byte[] toByteArray() {
54+
return flussArray.toByteArray();
55+
}
56+
57+
@Override
58+
public short[] toShortArray() {
59+
return flussArray.toShortArray();
60+
}
61+
62+
@Override
63+
public int[] toIntArray() {
64+
return flussArray.toIntArray();
65+
}
66+
67+
@Override
68+
public long[] toLongArray() {
69+
return flussArray.toLongArray();
70+
}
71+
72+
@Override
73+
public float[] toFloatArray() {
74+
return flussArray.toFloatArray();
75+
}
76+
77+
@Override
78+
public double[] toDoubleArray() {
79+
return flussArray.toDoubleArray();
80+
}
81+
82+
@Override
83+
public boolean isNullAt(int pos) {
84+
return flussArray.isNullAt(pos);
85+
}
86+
87+
@Override
88+
public boolean getBoolean(int pos) {
89+
return flussArray.getBoolean(pos);
90+
}
91+
92+
@Override
93+
public byte getByte(int pos) {
94+
return flussArray.getByte(pos);
95+
}
96+
97+
@Override
98+
public short getShort(int pos) {
99+
return flussArray.getShort(pos);
100+
}
101+
102+
@Override
103+
public int getInt(int pos) {
104+
return flussArray.getInt(pos);
105+
}
106+
107+
@Override
108+
public long getLong(int pos) {
109+
return flussArray.getLong(pos);
110+
}
111+
112+
@Override
113+
public float getFloat(int pos) {
114+
return flussArray.getFloat(pos);
115+
}
116+
117+
@Override
118+
public double getDouble(int pos) {
119+
return flussArray.getDouble(pos);
120+
}
121+
122+
@Override
123+
public BinaryString getChar(int pos, int length) {
124+
return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
125+
}
126+
127+
@Override
128+
public BinaryString getString(int pos) {
129+
return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
130+
}
131+
132+
@Override
133+
public Decimal getDecimal(int pos, int precision, int scale) {
134+
return fromFlinkDecimal(flussArray.getDecimal(pos, precision, scale));
135+
}
136+
137+
@Override
138+
public TimestampNtz getTimestampNtz(int pos, int precision) {
139+
TimestampData timestamp = flussArray.getTimestamp(pos, precision);
140+
return TimestampNtz.fromMillis(
141+
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
142+
}
143+
144+
@Override
145+
public TimestampLtz getTimestampLtz(int pos, int precision) {
146+
TimestampData timestamp = flussArray.getTimestamp(pos, precision);
147+
return TimestampLtz.fromEpochMillis(
148+
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
149+
}
150+
151+
@Override
152+
public byte[] getBinary(int pos, int length) {
153+
return flussArray.getBinary(pos);
154+
}
155+
156+
@Override
157+
public byte[] getBytes(int pos) {
158+
return flussArray.getBinary(pos);
159+
}
160+
161+
@Override
162+
public InternalArray getArray(int pos) {
163+
return new CdcAsFlussArray(flussArray.getArray(pos));
164+
}
165+
166+
@Override
167+
public InternalMap getMap(int pos) {
168+
return new CdcAsFlussMap(flussArray.getMap(pos));
169+
}
170+
171+
@Override
172+
public InternalRow getRow(int pos, int numFields) {
173+
return CdcAsFlussRow.replace(flussArray.getRecord(pos, numFields));
174+
}
175+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.fluss.sink.row;
19+
20+
import org.apache.flink.cdc.common.data.MapData;
21+
22+
import org.apache.fluss.row.InternalArray;
23+
import org.apache.fluss.row.InternalMap;
24+
25+
/** Wraps a Cdc {@link MapData} as a Fluss {@link InternalMap}. */
26+
public class CdcAsFlussMap implements InternalMap {
27+
28+
private final MapData cdcMap;
29+
30+
public CdcAsFlussMap(MapData cdcMap) {
31+
this.cdcMap = cdcMap;
32+
}
33+
34+
@Override
35+
public int size() {
36+
return cdcMap.size();
37+
}
38+
39+
@Override
40+
public InternalArray keyArray() {
41+
return new CdcAsFlussArray(cdcMap.keyArray());
42+
}
43+
44+
@Override
45+
public InternalArray valueArray() {
46+
return new CdcAsFlussArray(cdcMap.valueArray());
47+
}
48+
}

0 commit comments

Comments
 (0)