Commit 409ae64

[flink/datastream] Introduce converter for Fluss Row to JSON (#770)
1 parent 1d2409b commit 409ae64

5 files changed: +528 -10 lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/deserializer/JsonStringDeserializationSchema.java

Lines changed: 36 additions & 5 deletions

@@ -17,14 +17,17 @@
 package com.alibaba.fluss.flink.source.deserializer;

 import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.flink.utils.FlussRowToJsonConverters;
+import com.alibaba.fluss.flink.utils.TimestampFormat;
 import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import com.alibaba.fluss.types.RowType;

 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

 import java.util.LinkedHashMap;
 import java.util.Map;

@@ -68,13 +71,33 @@ public class JsonStringDeserializationSchema implements FlussDeserializationSche
      */
     private transient ObjectMapper objectMapper = new ObjectMapper();

+    /** Reusable object node. */
+    private transient ObjectNode node;
+
     /**
      * Reusable map for building the record representation before serializing to JSON. This avoids
      * creating a new Map for each record. Using LinkedHashMap to ensure a stable order of fields in
      * the JSON output.
      */
     private final Map<String, Object> recordMap = new LinkedHashMap<>(4);

+    /**
+     * Converter responsible for transforming Fluss row data into json. Initialized during {@link
+     * #open(InitializationContext)}.
+     */
+    private transient FlussRowToJsonConverters.FlussRowToJsonConverter runtimeConverter;
+
+    /** Timestamp format specification which is used to parse timestamp. */
+    private final TimestampFormat timestampFormat;
+
+    public JsonStringDeserializationSchema() {
+        this(TimestampFormat.ISO_8601);
+    }
+
+    private JsonStringDeserializationSchema(TimestampFormat timestampFormat) {
+        this.timestampFormat = timestampFormat;
+    }
+
     /**
      * Initializes the JSON serialization mechanism.
      *

@@ -90,6 +113,11 @@ public class JsonStringDeserializationSchema implements FlussDeserializationSche
      */
     @Override
     public void open(InitializationContext context) throws Exception {
+        if (runtimeConverter == null) {
+            this.runtimeConverter =
+                    new FlussRowToJsonConverters(timestampFormat)
+                            .createNullableConverter(context.getRowSchema());
+        }
         objectMapper = new ObjectMapper();

         objectMapper.registerModule(new JavaTimeModule());

@@ -111,8 +139,11 @@ public String deserialize(LogRecord record) throws Exception {
         recordMap.put("offset", record.logOffset());
         recordMap.put("timestamp", record.timestamp());
         recordMap.put("change_type", record.getChangeType().toString());
-        // TODO: convert row into JSON https://github.com/alibaba/fluss/issues/678
-        recordMap.put("row", record.getRow().toString());
+
+        if (node == null) {
+            node = objectMapper.createObjectNode();
+        }
+        recordMap.put("row", runtimeConverter.convert(objectMapper, node, record.getRow()));

         return objectMapper.writeValueAsString(recordMap);
     }
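
With this change, deserialize() reuses a single ObjectNode per schema instance and lets the runtime converter populate it, instead of falling back to record.getRow().toString(). The standalone sketch below illustrates that reuse pattern with plain (unshaded) Jackson only; the envelope keys offset, timestamp, change_type and row mirror the diff above, while the change-type string and the row fields are made-up placeholder values, not Fluss output.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.LinkedHashMap;
import java.util.Map;

public class ReusableNodeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // LinkedHashMap keeps a stable field order in the emitted JSON.
        Map<String, Object> recordMap = new LinkedHashMap<>(4);
        // Created once and repopulated per record, which is what the
        // null-check on `node` in deserialize() achieves.
        ObjectNode rowNode = mapper.createObjectNode();

        for (long offset = 0; offset < 2; offset++) {
            recordMap.put("offset", offset);
            recordMap.put("timestamp", System.currentTimeMillis());
            recordMap.put("change_type", "INSERT"); // placeholder value
            rowNode.removeAll();                    // clear fields from the previous record
            rowNode.put("id", offset);              // placeholder row fields
            rowNode.put("name", "user-" + offset);
            recordMap.put("row", rowNode);
            System.out.println(mapper.writeValueAsString(recordMap));
        }
    }
}
```

Each iteration prints one JSON line roughly shaped like {"offset":0,"timestamp":...,"change_type":"INSERT","row":{"id":0,"name":"user-0"}}.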
FlussRowToJsonConverters.java (new file, package com.alibaba.fluss.flink.utils)

Lines changed: 251 additions & 0 deletions

/*
 * Copyright (c) 2025 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.flink.utils;

import com.alibaba.fluss.row.Decimal;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.TimestampLtz;
import com.alibaba.fluss.row.TimestampNtz;
import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import com.alibaba.fluss.types.ArrayType;
import com.alibaba.fluss.types.DataField;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.MapType;
import com.alibaba.fluss.types.RowType;

import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Arrays;

import static com.alibaba.fluss.flink.utils.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
import static com.alibaba.fluss.flink.utils.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static com.alibaba.fluss.flink.utils.TimeFormats.SQL_TIMESTAMP_FORMAT;
import static com.alibaba.fluss.flink.utils.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static com.alibaba.fluss.flink.utils.TimeFormats.SQL_TIME_FORMAT;
import static com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;

/** A converter to convert Fluss's {@link com.alibaba.fluss.row.InternalRow} to {@link JsonNode}. */
public class FlussRowToJsonConverters {

    /** Timestamp format specification which is used to parse timestamp. */
    private final TimestampFormat timestampFormat;

    public FlussRowToJsonConverters(TimestampFormat timestampFormat) {
        this.timestampFormat = timestampFormat;
    }

    /**
     * Runtime converter that converts objects of Fluss data structures to corresponding {@link
     * JsonNode}s.
     */
    @FunctionalInterface
    public interface FlussRowToJsonConverter extends Serializable {
        JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value);
    }

    public FlussRowToJsonConverter createNullableConverter(DataType flussDataType) {
        return wrapIntoNullableConverter(createNotNullConverter(flussDataType));
    }

    private FlussRowToJsonConverter createNotNullConverter(DataType type) {
        switch (type.getTypeRoot()) {
            case CHAR:
            case STRING:
                return ((mapper, reuse, value) ->
                        mapper.getNodeFactory().textNode(value.toString()));
            case BOOLEAN:
                return (mapper, reuse, value) ->
                        mapper.getNodeFactory().booleanNode((boolean) value);
            case BINARY:
            case BYTES:
                return ((mapper, reuse, value) ->
                        mapper.getNodeFactory().binaryNode((byte[]) value));
            case DECIMAL:
                return createDecimalConverter();
            case TINYINT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((byte) value);
            case SMALLINT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((short) value);
            case INTEGER:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((int) value);
            case BIGINT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((long) value);
            case FLOAT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((float) value);
            case DOUBLE:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((double) value);
            case DATE:
                return createDateConverter();
            case TIME_WITHOUT_TIME_ZONE:
                return createTimeConverter();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return createTimestampConverter();
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return createTimestampLtzConverter();
            case ARRAY:
                return createArrayConverter((ArrayType) type);
            case MAP:
                MapType mapType = (MapType) type;
                return createMapConverter(
                        mapType.asSummaryString(), mapType.getKeyType(), mapType.getValueType());
            case ROW:
                return createRowConverter((RowType) type);
            default:
                throw new UnsupportedOperationException("Not support to parse type: " + type);
        }
    }

    private FlussRowToJsonConverter createDecimalConverter() {
        return (mapper, reuse, value) -> {
            BigDecimal bd = ((Decimal) value).toBigDecimal();
            return mapper.getNodeFactory()
                    .numberNode(
                            mapper.isEnabled(WRITE_BIGDECIMAL_AS_PLAIN)
                                    ? bd
                                    : bd.stripTrailingZeros());
        };
    }

    private FlussRowToJsonConverter createDateConverter() {
        return ((mapper, reuse, value) -> {
            int days = (int) value;
            LocalDate date = LocalDate.ofEpochDay(days);
            return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format(date));
        });
    }

    private FlussRowToJsonConverter createTimeConverter() {
        return (mapper, reuse, value) -> {
            int milliseconds = (int) value;
            LocalTime time = LocalTime.ofSecondOfDay(milliseconds / 1000L);
            return mapper.getNodeFactory().textNode(SQL_TIME_FORMAT.format(time));
        };
    }

    private FlussRowToJsonConverter createTimestampConverter() {
        switch (timestampFormat) {
            case ISO_8601:
                return (mapper, reuse, value) -> {
                    TimestampNtz timestamp = (TimestampNtz) value;
                    return mapper.getNodeFactory()
                            .textNode(ISO8601_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
                };
            case SQL:
                return (mapper, reuse, value) -> {
                    TimestampNtz timestamp = (TimestampNtz) value;
                    return mapper.getNodeFactory()
                            .textNode(SQL_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
                };
            default:
                throw new UnsupportedOperationException(
                        String.format("Unsupported timestamp format %s", timestampFormat));
        }
    }

    private FlussRowToJsonConverter createTimestampLtzConverter() {
        switch (timestampFormat) {
            case ISO_8601:
                return (mapper, reuse, value) -> {
                    TimestampLtz timestampWithLocalZone = (TimestampLtz) value;
                    return mapper.getNodeFactory()
                            .textNode(
                                    ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.format(
                                            timestampWithLocalZone
                                                    .toInstant()
                                                    .atOffset(ZoneOffset.UTC)));
                };
            case SQL:
                return (mapper, reuse, value) -> {
                    TimestampLtz timestampWithLocalZone = (TimestampLtz) value;
                    return mapper.getNodeFactory()
                            .textNode(
                                    SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.format(
                                            timestampWithLocalZone
                                                    .toInstant()
                                                    .atOffset(ZoneOffset.UTC)));
                };
            default:
                throw new UnsupportedOperationException(
                        String.format("Unsupported timestamp format %s", timestampFormat));
        }
    }

    private FlussRowToJsonConverter createArrayConverter(ArrayType type) {
        // TODO
        return null;
    }

    private FlussRowToJsonConverter createMapConverter(
            String typeSummary, DataType keyType, DataType valueType) {
        // TODO
        return null;
    }

    private FlussRowToJsonConverter createRowConverter(RowType type) {
        final String[] fieldNames = type.getFieldNames().toArray(new String[0]);
        final DataType[] fieldTypes =
                type.getFields().stream().map(DataField::getType).toArray(DataType[]::new);
        final FlussRowToJsonConverter[] fieldConverters =
                Arrays.stream(fieldTypes)
                        .map(this::createNullableConverter)
                        .toArray(FlussRowToJsonConverter[]::new);
        final int fieldCount = type.getFieldCount();
        final InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
            fieldGetters[i] = InternalRow.createFieldGetter(fieldTypes[i], i);
        }
        return ((mapper, reuse, value) -> {
            ObjectNode node;
            if (reuse == null || reuse.isNull()) {
                node = mapper.createObjectNode();
            } else {
                node = (ObjectNode) reuse;
            }
            InternalRow row = (InternalRow) value;
            for (int i = 0; i < fieldCount; i++) {
                String fieldName = fieldNames[i];
                try {
                    Object field = fieldGetters[i].getFieldOrNull(row);
                    node.set(
                            fieldName,
                            fieldConverters[i].convert(mapper, node.get(fieldName), field));
                } catch (Throwable t) {
                    throw new RuntimeException(
                            String.format("Fail to convert to json at field: %s.", fieldName), t);
                }
            }
            return node;
        });
    }

    private FlussRowToJsonConverter wrapIntoNullableConverter(
            FlussRowToJsonConverter flussRowToJsonConverter) {
        return ((mapper, reuse, value) -> {
            if (value == null) {
                return mapper.getNodeFactory().nullNode();
            } else {
                return flussRowToJsonConverter.convert(mapper, reuse, value);
            }
        });
    }
}
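
The class boils down to a serializable functional interface, a per-type factory (createNotNullConverter), and a single null-handling wrapper (wrapIntoNullableConverter). The sketch below reproduces that composition with plain (unshaded) Jackson and a locally defined stand-in interface, so it compiles without Fluss on the classpath; none of the names in it are part of the Fluss API.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullableConverterSketch {

    /** Local stand-in for FlussRowToJsonConverters.FlussRowToJsonConverter. */
    @FunctionalInterface
    interface ToJsonConverter {
        JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value);
    }

    /** Leaf converter for strings, mirroring the CHAR/STRING branch above. */
    static ToJsonConverter stringConverter() {
        return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(value.toString());
    }

    /** Adds null handling once, mirroring wrapIntoNullableConverter. */
    static ToJsonConverter nullable(ToJsonConverter inner) {
        return (mapper, reuse, value) ->
                value == null
                        ? mapper.getNodeFactory().nullNode()
                        : inner.convert(mapper, reuse, value);
    }

    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        ToJsonConverter converter = nullable(stringConverter());
        System.out.println(converter.convert(mapper, null, "hello")); // "hello"
        System.out.println(converter.convert(mapper, null, null));    // null
    }
}
```

Building the null check into the wrapper keeps every leaf converter free of null handling, which is the same design the commit uses for the row, decimal, date, time, and timestamp converters.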
