Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ limitations under the License.


<properties>
<fluss.version>0.7.0</fluss.version>
<fluss.version>0.9.0-incubating</fluss.version>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba.fluss</groupId>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-client</artifactId>
<version>${fluss.version}</version>
</dependency>
Expand Down Expand Up @@ -66,34 +66,34 @@ limitations under the License.
</dependency>

<dependency>
<groupId>com.alibaba.fluss</groupId>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-server</artifactId>
<version>${fluss.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fluss</groupId>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-server</artifactId>
<version>${fluss.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fluss</groupId>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-test-utils</artifactId>
<version>${fluss.version}</version>
<scope>test</scope>
</dependency>
<!-- In Flink CDC project has Pipeline Sink Connector for Fluss. we import fluss-fink for Fluss Sink Connector just for test purpose -->
<dependency>
<groupId>com.alibaba.fluss</groupId>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-flink-common</artifactId>
<version>${fluss.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fluss</groupId>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-flink-1.20</artifactId>
<version>${fluss.version}</version>
<scope>test</scope>
Expand Down Expand Up @@ -129,7 +129,7 @@ limitations under the License.
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
<include>com.alibaba.fluss:*</include>
<include>org.apache.fluss:*</include>
</includes>
</artifactSet>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.fluss.sink.FlussDataSink;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;

import java.util.HashMap;
import java.util.HashSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussSink;

import com.alibaba.fluss.config.Configuration;
import org.apache.fluss.config.Configuration;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRowWithOp;

import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.types.DataType;
import org.apache.fluss.client.Connection;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.DataType;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -129,12 +130,12 @@ private TablePath getTablePath(TableId tableId) {

private static class TableSchemaInfo {
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema;
com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema;
org.apache.fluss.metadata.Schema downStreamFlusstreamSchema;
Map<Integer, Integer> indexMapping;

private TableSchemaInfo(
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema,
com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema) {
org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) {
this.upstreamCdcSchema = upstreamCdcSchema;
this.downStreamFlusstreamSchema = downStreamFlusstreamSchema;
this.indexMapping =
Expand All @@ -144,8 +145,8 @@ private TableSchemaInfo(
}

static Map<Integer, Integer> sanityCheckAndGenerateIndexMapping(
com.alibaba.fluss.metadata.Schema inferredFlussSchema,
com.alibaba.fluss.metadata.Schema currentFlussNewSchema) {
org.apache.fluss.metadata.Schema inferredFlussSchema,
org.apache.fluss.metadata.Schema currentFlussNewSchema) {
List<String> inferredSchemaColumnNames = inferredFlussSchema.getColumnNames();
Map<String, Integer> reverseIndex = new HashMap<>();
for (int i = 0; i < inferredSchemaColumnNames.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.table.api.ValidationException;

import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.fluss.sink.row;

import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.TimestampData;

import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalMap;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;

import static org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow.fromFlinkDecimal;

/** Wraps a CDC {@link ArrayData} as a Fluss {@link InternalArray}. */
public class CdcAsFlussArray implements InternalArray {

private final ArrayData flussArray;

public CdcAsFlussArray(ArrayData flussArray) {
this.flussArray = flussArray;
}

@Override
public int size() {
return flussArray.size();
}

@Override
public boolean[] toBooleanArray() {
return flussArray.toBooleanArray();
}

@Override
public byte[] toByteArray() {
return flussArray.toByteArray();
}

@Override
public short[] toShortArray() {
return flussArray.toShortArray();
}

@Override
public int[] toIntArray() {
return flussArray.toIntArray();
}

@Override
public long[] toLongArray() {
return flussArray.toLongArray();
}

@Override
public float[] toFloatArray() {
return flussArray.toFloatArray();
}

@Override
public double[] toDoubleArray() {
return flussArray.toDoubleArray();
}

@Override
public boolean isNullAt(int pos) {
return flussArray.isNullAt(pos);
}

@Override
public boolean getBoolean(int pos) {
return flussArray.getBoolean(pos);
}

@Override
public byte getByte(int pos) {
return flussArray.getByte(pos);
}

@Override
public short getShort(int pos) {
return flussArray.getShort(pos);
}

@Override
public int getInt(int pos) {
return flussArray.getInt(pos);
}

@Override
public long getLong(int pos) {
return flussArray.getLong(pos);
}

@Override
public float getFloat(int pos) {
return flussArray.getFloat(pos);
}

@Override
public double getDouble(int pos) {
return flussArray.getDouble(pos);
}

@Override
public BinaryString getChar(int pos, int length) {
return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
}

@Override
public BinaryString getString(int pos) {
return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
}

@Override
public Decimal getDecimal(int pos, int precision, int scale) {
return fromFlinkDecimal(flussArray.getDecimal(pos, precision, scale));
}

@Override
public TimestampNtz getTimestampNtz(int pos, int precision) {
TimestampData timestamp = flussArray.getTimestamp(pos, precision);
return TimestampNtz.fromMillis(
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
}

@Override
public TimestampLtz getTimestampLtz(int pos, int precision) {
TimestampData timestamp = flussArray.getTimestamp(pos, precision);
return TimestampLtz.fromEpochMillis(
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
}

@Override
public byte[] getBinary(int pos, int length) {
return flussArray.getBinary(pos);
}

@Override
public byte[] getBytes(int pos) {
return flussArray.getBinary(pos);
}

@Override
public InternalArray getArray(int pos) {
return new CdcAsFlussArray(flussArray.getArray(pos));
}

@Override
public InternalMap getMap(int pos) {
return new CdcAsFlussMap(flussArray.getMap(pos));
}

@Override
public InternalRow getRow(int pos, int numFields) {
return CdcAsFlussRow.replace(flussArray.getRecord(pos, numFields));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.fluss.sink.row;

import org.apache.flink.cdc.common.data.MapData;

import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalMap;

/** Wraps a Cdc {@link MapData} as a Fluss {@link InternalMap}. */
public class CdcAsFlussMap implements InternalMap {

private final MapData cdcMap;

public CdcAsFlussMap(MapData cdcMap) {
this.cdcMap = cdcMap;
}

@Override
public int size() {
return cdcMap.size();
}

@Override
public InternalArray keyArray() {
return new CdcAsFlussArray(cdcMap.keyArray());
}

@Override
public InternalArray valueArray() {
return new CdcAsFlussArray(cdcMap.valueArray());
}
}
Loading
Loading