Skip to content

Commit e7d12a1

Browse files
committed
flink read, write and lookup
1 parent 46a6db5 commit e7d12a1

22 files changed

+366
-79
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,34 @@
2222
import org.apache.fluss.row.InternalRow;
2323
import org.apache.fluss.row.TimestampLtz;
2424
import org.apache.fluss.row.TimestampNtz;
25+
import org.apache.fluss.types.RowType;
2526

2627
import org.apache.flink.table.data.DecimalData;
2728
import org.apache.flink.table.data.RowData;
2829
import org.apache.flink.table.data.TimestampData;
2930

31+
import javax.annotation.Nullable;
32+
33+
import java.util.List;
34+
3035
/** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
3136
public class FlinkAsFlussRow implements InternalRow {
3237

3338
private RowData flinkRow;
3439

35-
public FlinkAsFlussRow() {}
40+
/**
41+
* Creates a FlinkAsFlussRow with a mapping from Fluss field index to Flink field index. If the
42+
* indexMapping is null, flink and flink share same field name.
43+
*/
44+
@Nullable private final int[] indexMapping;
45+
46+
public FlinkAsFlussRow() {
47+
this(null);
48+
}
49+
50+
public FlinkAsFlussRow(int[] indexMapping) {
51+
this.indexMapping = indexMapping;
52+
}
3653

3754
public FlinkAsFlussRow replace(RowData flinkRow) {
3855
this.flinkRow = flinkRow;
@@ -41,62 +58,76 @@ public FlinkAsFlussRow replace(RowData flinkRow) {
4158

4259
@Override
4360
public int getFieldCount() {
44-
return flinkRow.getArity();
61+
if (indexMapping == null) {
62+
return flinkRow.getArity();
63+
} else {
64+
return indexMapping.length;
65+
}
4566
}
4667

4768
@Override
4869
public boolean isNullAt(int pos) {
49-
return flinkRow.isNullAt(pos);
70+
return (indexMapping != null && indexMapping[pos] == -1) || flinkRow.isNullAt(pos);
5071
}
5172

5273
@Override
5374
public boolean getBoolean(int pos) {
54-
return flinkRow.getBoolean(pos);
75+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
76+
return flinkRow.getBoolean(flussPos);
5577
}
5678

5779
@Override
5880
public byte getByte(int pos) {
59-
return flinkRow.getByte(pos);
81+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
82+
return flinkRow.getByte(flussPos);
6083
}
6184

6285
@Override
6386
public short getShort(int pos) {
64-
return flinkRow.getShort(pos);
87+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
88+
return flinkRow.getShort(flussPos);
6589
}
6690

6791
@Override
6892
public int getInt(int pos) {
69-
return flinkRow.getInt(pos);
93+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
94+
return flinkRow.getInt(flussPos);
7095
}
7196

7297
@Override
7398
public long getLong(int pos) {
74-
return flinkRow.getLong(pos);
99+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
100+
return flinkRow.getLong(flussPos);
75101
}
76102

77103
@Override
78104
public float getFloat(int pos) {
79-
return flinkRow.getFloat(pos);
105+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
106+
return flinkRow.getFloat(flussPos);
80107
}
81108

82109
@Override
83110
public double getDouble(int pos) {
84-
return flinkRow.getDouble(pos);
111+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
112+
return flinkRow.getDouble(flussPos);
85113
}
86114

87115
@Override
88116
public BinaryString getChar(int pos, int length) {
89-
return BinaryString.fromBytes(flinkRow.getString(pos).toBytes());
117+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
118+
return BinaryString.fromBytes(flinkRow.getString(flussPos).toBytes());
90119
}
91120

92121
@Override
93122
public BinaryString getString(int pos) {
94-
return BinaryString.fromBytes(flinkRow.getString(pos).toBytes());
123+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
124+
return BinaryString.fromBytes(flinkRow.getString(flussPos).toBytes());
95125
}
96126

97127
@Override
98128
public Decimal getDecimal(int pos, int precision, int scale) {
99-
return fromFlinkDecimal(flinkRow.getDecimal(pos, precision, scale));
129+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
130+
return fromFlinkDecimal(flinkRow.getDecimal(flussPos, precision, scale));
100131
}
101132

102133
public static Decimal fromFlinkDecimal(DecimalData decimal) {
@@ -111,25 +142,42 @@ public static Decimal fromFlinkDecimal(DecimalData decimal) {
111142

112143
@Override
113144
public TimestampNtz getTimestampNtz(int pos, int precision) {
114-
TimestampData timestamp = flinkRow.getTimestamp(pos, precision);
145+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
146+
TimestampData timestamp = flinkRow.getTimestamp(flussPos, precision);
115147
return TimestampNtz.fromMillis(
116148
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
117149
}
118150

119151
@Override
120152
public TimestampLtz getTimestampLtz(int pos, int precision) {
121-
TimestampData timestamp = flinkRow.getTimestamp(pos, precision);
153+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
154+
TimestampData timestamp = flinkRow.getTimestamp(flussPos, precision);
122155
return TimestampLtz.fromEpochMillis(
123156
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
124157
}
125158

126159
@Override
127160
public byte[] getBinary(int pos, int length) {
128-
return flinkRow.getBinary(pos);
161+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
162+
return flinkRow.getBinary(flussPos);
129163
}
130164

131165
@Override
132166
public byte[] getBytes(int pos) {
133-
return flinkRow.getBinary(pos);
167+
int flussPos = indexMapping == null ? pos : indexMapping[pos];
168+
return flinkRow.getBinary(flussPos);
169+
}
170+
171+
/** Converts a Flink {@link RowType} to a Fluss {@link RowType}, based on the field name. */
172+
public static FlinkAsFlussRow from(
173+
org.apache.flink.table.types.logical.RowType finkRowType, RowType flussRowType) {
174+
int[] indexMapping = new int[flussRowType.getFieldCount()];
175+
List<String> fieldNames = flussRowType.getFieldNames();
176+
for (int i = 0; i < flussRowType.getFieldCount(); i++) {
177+
String fieldName = fieldNames.get(i);
178+
int fieldIndex = finkRowType.getFieldIndex(fieldName);
179+
indexMapping[i] = fieldIndex;
180+
}
181+
return new FlinkAsFlussRow(indexMapping);
134182
}
135183
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public void setup(int numChannels) {
9393
partitionGetter != null && numBucket % numChannels != 0;
9494

9595
try {
96+
// no need to read real database, thus assume to deserialize the fluss row as same as
97+
// flink table type.
9698
this.serializationSchema.open(new SerializerInitContextImpl(flussRowType));
9799
} catch (Exception e) {
98100
throw new FlussRuntimeException(e);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,17 @@ public interface FlussSerializationSchema<T> extends Serializable {
6262
@PublicEvolving
6363
interface InitializationContext {
6464
/**
65-
* Returns the target row schema.
65+
* Returns the Fluss physical row schema.
6666
*
6767
* @return The schema of the target row.
6868
*/
6969
RowType getRowSchema();
70+
71+
/**
72+
* Returns the Flink table row schema.
73+
*
74+
* @return The schema of the Flink table row.
75+
*/
76+
org.apache.flink.table.types.logical.RowType getTableRowType();
7077
}
7178
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public RowDataSerializationSchema(boolean isAppendOnly, boolean ignoreDelete) {
6363
*/
6464
@Override
6565
public void open(InitializationContext context) throws Exception {
66-
this.converter = new FlinkAsFlussRow();
66+
this.converter = FlinkAsFlussRow.from(context.getTableRowType(), context.getRowSchema());
6767
}
6868

6969
/**

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.types.RowType;
2121

22+
import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType;
2223
import static org.apache.fluss.utils.Preconditions.checkNotNull;
2324

2425
/**
@@ -27,14 +28,26 @@
2728
*/
2829
public class SerializerInitContextImpl implements FlussSerializationSchema.InitializationContext {
2930

30-
private final RowType rowSchema;
31+
private final RowType flussRowSchema;
32+
private final org.apache.flink.table.types.logical.RowType flinkRowType;
3133

3234
public SerializerInitContextImpl(RowType rowSchema) {
33-
this.rowSchema = checkNotNull(rowSchema, "rowSchema");
35+
this(rowSchema, toFlinkRowType(rowSchema));
36+
}
37+
38+
public SerializerInitContextImpl(
39+
RowType rowSchema, org.apache.flink.table.types.logical.RowType flinkRowType) {
40+
this.flussRowSchema = checkNotNull(rowSchema, "flussRowSchema");
41+
this.flinkRowType = checkNotNull(flinkRowType, "flinkRowType");
3442
}
3543

3644
@Override
3745
public RowType getRowSchema() {
38-
return rowSchema;
46+
return flussRowSchema;
47+
}
48+
49+
@Override
50+
public org.apache.flink.table.types.logical.RowType getTableRowType() {
51+
return flinkRowType;
3952
}
4053
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,15 @@ public void initialize(SinkWriterMetricGroup metricGroup) {
110110
metricGroup, Collections.singleton(MetricNames.WRITER_SEND_LATENCY_MS));
111111
connection = ConnectionFactory.createConnection(flussConfig, flinkMetricRegistry);
112112
table = connection.getTable(tablePath);
113+
LOG.info(
114+
"Current Fluss Schema is {}, Table RowType is {}",
115+
table.getTableInfo().getSchema(),
116+
tableRowType);
113117
sanityCheck(table.getTableInfo());
114118

115119
try {
116120
this.serializationSchema.open(
117-
new SerializerInitContextImpl(table.getTableInfo().getRowType()));
121+
new SerializerInitContextImpl(table.getTableInfo().getRowType(), tableRowType));
118122
} catch (Exception e) {
119123
throw new FlussRuntimeException(e);
120124
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.fluss.flink.source;
1919

20+
import org.apache.fluss.client.Connection;
21+
import org.apache.fluss.client.ConnectionFactory;
22+
import org.apache.fluss.client.table.Table;
2023
import org.apache.fluss.config.Configuration;
2124
import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl;
2225
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
@@ -32,6 +35,9 @@
3235
import org.apache.fluss.flink.source.state.SourceEnumeratorState;
3336
import org.apache.fluss.lake.source.LakeSource;
3437
import org.apache.fluss.lake.source.LakeSplit;
38+
import org.apache.fluss.metadata.Schema;
39+
import org.apache.fluss.metadata.SchemaInfo;
40+
import org.apache.fluss.metadata.TableInfo;
3541
import org.apache.fluss.metadata.TablePath;
3642
import org.apache.fluss.predicate.Predicate;
3743
import org.apache.fluss.types.RowType;
@@ -181,18 +187,28 @@ public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext conte
181187
FlinkSourceReaderMetrics flinkSourceReaderMetrics =
182188
new FlinkSourceReaderMetrics(context.metricGroup());
183189

190+
TableInfo tableInfo;
191+
try (Connection connection = ConnectionFactory.createConnection(flussConf);
192+
Table table = connection.getTable(tablePath)) {
193+
tableInfo = table.getTableInfo();
194+
}
195+
196+
Schema schema = tableInfo.getSchema();
197+
184198
deserializationSchema.open(
185199
new DeserializerInitContextImpl(
186200
context.metricGroup().addGroup("deserializer"),
187201
context.getUserCodeClassLoader(),
188-
sourceOutputType));
202+
sourceOutputType,
203+
schema.getRowType()));
189204
FlinkRecordEmitter<OUT> recordEmitter = new FlinkRecordEmitter<>(deserializationSchema);
190205

191206
return new FlinkSourceReader<>(
192207
elementsQueue,
193208
flussConf,
194209
tablePath,
195210
sourceOutputType,
211+
new SchemaInfo(schema, tableInfo.getSchemaId()),
196212
context,
197213
projectedFields,
198214
flinkSourceReaderMetrics,

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/DeserializerInitContextImpl.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,26 @@ public class DeserializerInitContextImpl
3131

3232
private final MetricGroup metricGroup;
3333
private final UserCodeClassLoader userCodeClassLoader;
34-
private final RowType rowSchema;
34+
private final RowType tableOutputType;
35+
private final RowType flussRowType;
3536

3637
public DeserializerInitContextImpl(
37-
MetricGroup metricGroup, UserCodeClassLoader userCodeClassLoader, RowType rowSchema) {
38+
MetricGroup metricGroup,
39+
UserCodeClassLoader userCodeClassLoader,
40+
RowType tableOutputType) {
41+
// todo: remove it
42+
this(metricGroup, userCodeClassLoader, tableOutputType, tableOutputType);
43+
}
44+
45+
public DeserializerInitContextImpl(
46+
MetricGroup metricGroup,
47+
UserCodeClassLoader userCodeClassLoader,
48+
RowType tableOutputType,
49+
RowType flussRowType) {
3850
this.metricGroup = metricGroup;
3951
this.userCodeClassLoader = userCodeClassLoader;
40-
this.rowSchema = rowSchema;
52+
this.tableOutputType = tableOutputType;
53+
this.flussRowType = flussRowType;
4154
}
4255

4356
@Override
@@ -52,6 +65,11 @@ public UserCodeClassLoader getUserCodeClassLoader() {
5265

5366
@Override
5467
public RowType getRowSchema() {
55-
return rowSchema;
68+
return flussRowType;
69+
}
70+
71+
@Override
72+
public RowType getOutPutRowType() {
73+
return null;
5674
}
5775
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/FlussDeserializationSchema.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,10 @@ interface InitializationContext {
103103
* @return The schema of the {@link LogRecord#getRow()}.
104104
*/
105105
RowType getRowSchema();
106+
107+
/*
108+
* Returns the schema of user need.
109+
*/
110+
RowType getOutPutRowType();
106111
}
107112
}

0 commit comments

Comments
 (0)