Skip to content

Commit 058eccb

Browse files
authored
[FLINK-39224] Support JSON, JSONB types for PostgreSQL (#189)
1 parent 40e2843 commit 058eccb

5 files changed

Lines changed: 161 additions & 19 deletions

File tree

flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public class PostgresTypeMapper implements JdbcCatalogTypeMapper {
8585
private static final String PG_CHARACTER_VARYING = "varchar";
8686
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
8787
private static final String PG_UUID = "uuid";
88+
private static final String PG_JSON = "json";
89+
private static final String PG_JSONB = "jsonb";
8890

8991
@Override
9092
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
@@ -158,6 +160,8 @@ protected DataType getMapping(String pgType, int precision, int scale) {
158160
case PG_CHARACTER_VARYING_ARRAY:
159161
return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
160162
case PG_TEXT:
163+
case PG_JSON:
164+
case PG_JSONB:
161165
return DataTypes.STRING();
162166
case PG_TEXT_ARRAY:
163167
return DataTypes.ARRAY(DataTypes.STRING());

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import org.apache.flink.types.Row;
2424
import org.apache.flink.util.CollectionUtil;
2525

26+
import com.fasterxml.jackson.core.JsonProcessingException;
27+
import com.fasterxml.jackson.databind.JsonNode;
28+
import com.fasterxml.jackson.databind.ObjectMapper;
2629
import org.junit.jupiter.api.BeforeEach;
2730
import org.junit.jupiter.api.Test;
2831

@@ -213,4 +216,37 @@ void testNullUuidTypes() {
213216
.collect());
214217
assertThat(results).hasToString("[+I[1, null]]");
215218
}
219+
220+
@Test
221+
void testJsonTypes() throws JsonProcessingException {
222+
223+
List<Row> results =
224+
CollectionUtil.iteratorToList(
225+
tEnv.sqlQuery(String.format("select * from %s", TABLE_JSON_TYPE))
226+
.execute()
227+
.collect());
228+
229+
ObjectMapper mapper = new ObjectMapper();
230+
JsonNode expectedJson =
231+
mapper.readTree(
232+
"\"test1\":{\"test1-1\":\"testValue\",\"test1-2\":1,\"test1-3\":[\"test1-3-1\",\"test1-3-2\"]}, 2, \"test2\"");
233+
234+
assertThat(results).hasToString("[+I[" + expectedJson.toString() + "]]");
235+
}
236+
237+
@Test
238+
void testJsonbTypes() throws JsonProcessingException {
239+
List<Row> results =
240+
CollectionUtil.iteratorToList(
241+
tEnv.sqlQuery(String.format("select * from %s", TABLE_JSONB_TYPE))
242+
.execute()
243+
.collect());
244+
245+
ObjectMapper mapper = new ObjectMapper();
246+
JsonNode expectedJson =
247+
mapper.readTree(
248+
"\"test1\":{\"test1-1\":\"testValue\",\"test1-2\":1,\"test1-3\":[\"test1-3-1\",\"test1-3-2\"]}, 2, \"test2\"");
249+
250+
assertThat(results).hasToString("[+I[" + transformPGJsonb(expectedJson.toString()) + "]]");
251+
}
216252
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
2525
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
2626

27+
import com.fasterxml.jackson.core.JsonProcessingException;
2728
import org.junit.jupiter.api.Test;
2829

2930
import java.util.Arrays;
@@ -68,6 +69,8 @@ void testListTables() throws DatabaseNotExistException {
6869
.isEqualTo(
6970
Arrays.asList(
7071
"public.array_table",
72+
"public.json_table",
73+
"public.jsonb_table",
7174
"public.primitive_table",
7275
"public.primitive_table2",
7376
"public.serial_table",
@@ -203,4 +206,19 @@ void testNullUuidDataTypes() throws TableNotExistException {
203206
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE2));
204207
assertThat(table.getUnresolvedSchema()).isEqualTo(getNullUuidTable().schema);
205208
}
209+
210+
@Test
211+
void testJsonDataTypes() throws TableNotExistException, JsonProcessingException {
212+
CatalogBaseTable table =
213+
catalog.getTable(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_JSON_TYPE));
214+
assertThat(table.getUnresolvedSchema()).isEqualTo(getJsonTable().schema);
215+
}
216+
217+
@Test
218+
void testJsonbDataTypes() throws TableNotExistException, JsonProcessingException {
219+
CatalogBaseTable table =
220+
catalog.getTable(
221+
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_JSONB_TYPE));
222+
assertThat(table.getUnresolvedSchema()).isEqualTo(getJsonbTable().schema);
223+
}
206224
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import org.apache.flink.table.api.Schema;
2727
import org.apache.flink.table.types.logical.DecimalType;
2828

29+
import com.fasterxml.jackson.core.JsonProcessingException;
30+
import com.fasterxml.jackson.databind.JsonNode;
31+
import com.fasterxml.jackson.databind.ObjectMapper;
2932
import org.junit.jupiter.api.AfterAll;
3033
import org.junit.jupiter.api.BeforeAll;
3134

@@ -57,12 +60,14 @@ private static DatabaseMetadata getStaticMetadata() {
5760
protected static final String TABLE_SERIAL_TYPE = "serial_table";
5861
protected static final String TABLE_UUID_TYPE = "uuid_table";
5962
protected static final String TABLE_UUID_TYPE2 = "uuid_table2";
63+
protected static final String TABLE_JSON_TYPE = "json_table";
64+
protected static final String TABLE_JSONB_TYPE = "jsonb_table";
6065

6166
protected static String baseUrl;
6267
protected static PostgresCatalog catalog;
6368

6469
@BeforeAll
65-
static void init() throws SQLException {
70+
static void init() throws SQLException, JsonProcessingException {
6671
// jdbc:postgresql://localhost:50807/postgres?user=postgres
6772
String jdbcUrl = getStaticMetadata().getJdbcUrl();
6873
// jdbc:postgresql://localhost:50807/
@@ -115,6 +120,11 @@ static void init() throws SQLException {
115120
createTable(
116121
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2),
117122
getNullUuidTable().pgSchemaSql);
123+
createTable(
124+
PostgresTablePath.fromFlinkTableName(TABLE_JSON_TYPE), getJsonTable().pgSchemaSql);
125+
createTable(
126+
PostgresTablePath.fromFlinkTableName(TABLE_JSONB_TYPE),
127+
getJsonbTable().pgSchemaSql);
118128

119129
executeSQL(
120130
PostgresCatalog.DEFAULT_DATABASE,
@@ -142,6 +152,14 @@ static void init() throws SQLException {
142152
String.format(
143153
"insert into %s values (%s);",
144154
TABLE_UUID_TYPE2, getNullUuidTable().values));
155+
executeSQL(
156+
PostgresCatalog.DEFAULT_DATABASE,
157+
String.format(
158+
"insert into %s values (%s);", TABLE_JSON_TYPE, getJsonTable().values));
159+
executeSQL(
160+
PostgresCatalog.DEFAULT_DATABASE,
161+
String.format(
162+
"insert into %s values (%s);", TABLE_JSONB_TYPE, getJsonbTable().values));
145163
}
146164

147165
@AfterAll
@@ -186,6 +204,14 @@ static void afterAll() throws SQLException {
186204
PostgresCatalog.DEFAULT_DATABASE,
187205
String.format(
188206
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2)));
207+
executeSQL(
208+
PostgresCatalog.DEFAULT_DATABASE,
209+
String.format(
210+
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_JSON_TYPE)));
211+
executeSQL(
212+
PostgresCatalog.DEFAULT_DATABASE,
213+
String.format(
214+
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_JSONB_TYPE)));
189215
}
190216

191217
public static void createTable(PostgresTablePath tablePath, String tableSchemaSql)
@@ -439,4 +465,33 @@ public static TestTable getNullUuidTable() {
439465
"id INT, " + "uid_col UUID",
440466
"1, NULL");
441467
}
468+
469+
static String transformPGJsonb(String json) {
470+
return json.replaceAll(":", ": ").replaceAll(",", ", ");
471+
}
472+
473+
static TestTable getJsonTable() throws JsonProcessingException {
474+
ObjectMapper mapper = new ObjectMapper();
475+
JsonNode jsonNode =
476+
mapper.readTree(
477+
"\"test1\":{\"test1-1\":\"testValue\",\"test1-2\":1,\"test1-3\":[\"test1-3-1\",\"test1-3-2\"]}, 2, \"test2\"");
478+
479+
return new TestTable(
480+
Schema.newBuilder().column("json_col", DataTypes.STRING()).build(),
481+
"json_col JSON",
482+
String.format("'%s'", jsonNode.toString()));
483+
}
484+
485+
static TestTable getJsonbTable() throws JsonProcessingException {
486+
487+
ObjectMapper mapper = new ObjectMapper();
488+
JsonNode jsonNode =
489+
mapper.readTree(
490+
"\"test1\":{\"test1-1\":\"testValue\",\"test1-2\":1,\"test1-3\":[\"test1-3-1\",\"test1-3-2\"]}, 2, \"test2\"");
491+
492+
return new TestTable(
493+
Schema.newBuilder().column("jsonb_col", DataTypes.STRING()).build(),
494+
"jsonb_col JSONB",
495+
String.format("'%s'", jsonNode.toString()));
496+
}
442497
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,15 @@
2525
import org.apache.flink.table.api.DataTypes;
2626
import org.apache.flink.types.Row;
2727

28+
import com.fasterxml.jackson.core.JsonProcessingException;
29+
import com.fasterxml.jackson.databind.JsonNode;
30+
import com.fasterxml.jackson.databind.ObjectMapper;
31+
2832
import java.math.BigDecimal;
2933
import java.time.LocalDateTime;
3034
import java.time.LocalTime;
3135
import java.util.Arrays;
36+
import java.util.Collections;
3237
import java.util.List;
3338

3439
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
@@ -51,30 +56,54 @@ protected TableRow createInputTable() {
5156
// other fields
5257
field("real_col", dbType("REAL"), DataTypes.FLOAT()),
5358
field("double_col", dbType("DOUBLE PRECISION"), DataTypes.DOUBLE()),
54-
field("time_col", dbType("TIME"), DataTypes.TIME()));
59+
field("time_col", dbType("TIME"), DataTypes.TIME()),
60+
field("json_col", dbType("JSON"), DataTypes.STRING()),
61+
field("jsonb_col", dbType("JSONB"), DataTypes.STRING()));
62+
}
63+
64+
private JsonNode toJson(String json) throws JsonProcessingException {
65+
ObjectMapper mapper = new ObjectMapper();
66+
return mapper.readTree(json);
67+
}
68+
69+
private String transformPGJsonb(String json) {
70+
return json.replaceAll(":", ": ").replaceAll(",", ", ");
5571
}
5672

5773
protected List<Row> getTestData() {
5874

5975
String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
6076
String uuid2 = "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11";
77+
String json =
78+
"\"test1\":{\"test1-1\":\"testValue\",\"test1-2\":1,\"test1-3\":[\"test1-3-1\",\"test1-3-2\"]}, 2, \"test2\"";
79+
80+
try {
81+
JsonNode jsonNode = toJson(json);
6182

62-
return Arrays.asList(
63-
Row.of(
64-
1L,
65-
uuid1,
66-
BigDecimal.valueOf(100.1234),
67-
LocalDateTime.parse("2020-01-01T15:35:00.123456"),
68-
1.175E-37F,
69-
1.79769E308D,
70-
LocalTime.parse("15:35")),
71-
Row.of(
72-
2L,
73-
uuid2,
74-
BigDecimal.valueOf(101.1234),
75-
LocalDateTime.parse("2020-01-01T15:36:01.123456"),
76-
-1.175E-37F,
77-
-1.79769E308,
78-
LocalTime.parse("15:36:01")));
83+
return Arrays.asList(
84+
Row.of(
85+
1L,
86+
uuid1,
87+
BigDecimal.valueOf(100.1234),
88+
LocalDateTime.parse("2020-01-01T15:35:00.123456"),
89+
1.175E-37F,
90+
1.79769E308D,
91+
LocalTime.parse("15:35"),
92+
jsonNode.toString(),
93+
transformPGJsonb(jsonNode.toString())),
94+
Row.of(
95+
2L,
96+
uuid2,
97+
BigDecimal.valueOf(101.1234),
98+
LocalDateTime.parse("2020-01-01T15:36:01.123456"),
99+
-1.175E-37F,
100+
-1.79769E308,
101+
LocalTime.parse("15:36:01"),
102+
jsonNode.toString(),
103+
transformPGJsonb(jsonNode.toString())));
104+
} catch (JsonProcessingException e) {
105+
e.printStackTrace();
106+
}
107+
return Collections.emptyList();
79108
}
80109
}

0 commit comments

Comments
 (0)