Skip to content

Commit f23e99c

Browse files
StrikeW and WillyKidd
authored and committed
feat(sink): add date, time, interval, bytea and jsonb data types for PG/MySQL sink of stream_chunk/json payload (#9957)
Co-authored-by: weili <[email protected]> Co-authored-by: WillyKidd <[email protected]>
1 parent fd246bd commit f23e99c

File tree

18 files changed

+749
-88
lines changed

18 files changed

+749
-88
lines changed

ci/scripts/e2e-sink-test.sh

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node
3737
mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXISTS test;"
3838
# grant access to `test` for ci test user
3939
mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test.* TO 'mysqluser'@'%';"
40-
# create a table named t_remote
40+
# create two tables named t_remote_0 and t_remote_1
4141
mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql
4242

4343
echo "--- preparing postgresql"
@@ -92,10 +92,19 @@ sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
9292
sleep 1
9393

9494
# check sink destination mysql using shell
95-
diff -u ./e2e_test/sink/remote/mysql_expected_result.tsv \
96-
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote ORDER BY id")
95+
diff -u ./e2e_test/sink/remote/mysql_expected_result_0.tsv \
96+
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote_0 ORDER BY id")
9797
if [ $? -eq 0 ]; then
98-
echo "mysql sink check passed"
98+
echo "mysql sink check 0 passed"
99+
else
100+
echo "The output is not as expected."
101+
exit 1
102+
fi
103+
104+
diff -u ./e2e_test/sink/remote/mysql_expected_result_1.tsv \
105+
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT id, v_varchar, v_text, v_integer, v_smallint, v_bigint, v_decimal, v_real, v_double, v_boolean, v_date, v_time, v_timestamp, v_jsonb, TO_BASE64(v_bytea) FROM test.t_remote_1 ORDER BY id")
106+
if [ $? -eq 0 ]; then
107+
echo "mysql sink check 1 passed"
99108
else
100109
echo "The output is not as expected."
101110
exit 1
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
# this check runs against the sink destination PostgreSQL database
22

33
query I
4-
select * from t_remote order by id;
4+
select * from t_remote_0 order by id;
55
----
66
1 Alex 28208 281620391 4986480304337356659 28162.0391 2.03 28162.0391 2023-03-20 10:18:30
77
3 Carl 18300 1702307129 7878292368468104216 17023.07129 23.07 17023.07129 2023-03-20 10:18:32
88
4 Doris 17250 151951802 3946135584462581863 1519518.02 18.02 1519518.02 2023-03-21 10:18:30
99
5 Eve 9725 698160808 524334216698825611 69.8160808 69.81 69.8160808 2023-03-21 10:18:31
1010
6 Frank 28131 1233587627 8492820454814063326 123358.7627 58.76 123358.7627 2023-03-21 10:18:32
11+
12+
query II
13+
select * from t_remote_1 order by id;
14+
----
15+
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 {"key": "value"} \xdeadbeef
16+
3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 t 2023-05-24 12:34:56 2023-05-24 12:34:56 {"key": "value3"} \xcafebabe
17+
4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 f 2023-05-25 23:45:01 2023-05-25 23:45:01 {"key": "value4"} \xbabec0de
18+
5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 {"key": "value5"} \xdeadbabe
19+
6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 {"key": "value6"} \xdeadbabe

e2e_test/sink/remote/jdbc.load.slt

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
statement ok
2-
create table t_remote (
2+
create table t_remote_0 (
33
id integer primary key,
44
v_varchar varchar,
55
v_smallint smallint,
@@ -12,59 +12,127 @@ create table t_remote (
1212
);
1313

1414
statement ok
15-
create materialized view mv_remote as select * from t_remote;
15+
CREATE TABLE t_remote_1 (
16+
id BIGINT PRIMARY KEY,
17+
v_varchar VARCHAR,
18+
v_text TEXT,
19+
v_integer INTEGER,
20+
v_smallint SMALLINT,
21+
v_bigint BIGINT,
22+
v_decimal DECIMAL,
23+
v_real REAL,
24+
v_double DOUBLE PRECISION,
25+
v_boolean BOOLEAN,
26+
v_date DATE,
27+
v_time TIME,
28+
v_timestamp TIMESTAMP,
29+
v_jsonb JSONB,
30+
v_bytea BYTEA
31+
);
32+
33+
statement ok
34+
create materialized view mv_remote_0 as select * from t_remote_0;
35+
36+
statement ok
37+
create materialized view mv_remote_1 as select * from t_remote_1;
1638

1739
statement ok
18-
CREATE SINK s_postgres FROM mv_remote WITH (
40+
CREATE SINK s_postgres_0 FROM mv_remote_0 WITH (
1941
connector='jdbc',
2042
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
21-
table.name='t_remote',
43+
table.name='t_remote_0',
44+
type='upsert'
45+
);
46+
47+
statement ok
48+
CREATE SINK s_postgres_1 FROM mv_remote_1 WITH (
49+
connector='jdbc',
50+
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
51+
table.name='t_remote_1',
52+
type='upsert'
53+
);
54+
55+
statement ok
56+
CREATE SINK s_mysql_0 FROM mv_remote_0 WITH (
57+
connector='jdbc',
58+
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
59+
table.name='t_remote_0',
2260
type='upsert'
2361
);
2462

2563
statement ok
26-
CREATE SINK s_mysql FROM mv_remote WITH (
64+
CREATE SINK s_mysql_1 FROM mv_remote_1 WITH (
2765
connector='jdbc',
2866
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
29-
table.name='t_remote',
67+
table.name='t_remote_1',
3068
type='upsert'
3169
);
3270

3371
statement ok
34-
INSERT INTO t_remote VALUES
72+
INSERT INTO t_remote_0 VALUES
3573
(1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'),
3674
(2, 'Bob', 10580, 2131030003, 3074255027698877876, 21310.30003, 10.3, 21310.30003, '2023-03-20 10:18:31'),
3775
(3, 'Carl', 18300, 1702307129, 7878292368468104216, 17023.07129, 23.07, 17023.07129, '2023-03-20 10:18:32');
3876

3977
statement ok
40-
INSERT INTO t_remote VALUES
78+
INSERT INTO t_remote_0 VALUES
4179
(4, 'Doris', 17250, 151951802, 3946135584462581863, 1519518.02, 18.02, 1519518.02, '2023-03-21 10:18:30'),
4280
(5, 'Eve', 9725, 698160808, 524334216698825611, 69.8160808, 69.81, 69.8160808, '2023-03-21 10:18:31'),
4381
(6, 'Frank', 28131, 1233587627, 8492820454814063326, 123358.7627, 58.76, 123358.7627, '2023-03-21 10:18:32');
4482

83+
statement ok
84+
INSERT INTO t_remote_1 VALUES
85+
(1, 'Varchar value 1', 'Text value 1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '{"key": "value"}', E'\\xDEADBEEF'),
86+
(2, 'Varchar value 2', 'Text value 2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '{"key": "value2"}', E'\\xFEEDBEEF'),
87+
(3, 'Varchar value 3', 'Text value 3', 345, 678, 901, 34.56, 78.90, 12.34, TRUE, '2023-05-24', '12:34:56', '2023-05-24 12:34:56', '{"key": "value3"}', E'\\xCAFEBABE');
88+
89+
statement ok
90+
INSERT INTO t_remote_1 VALUES
91+
(4, 'Varchar value 4', 'Text value 4', 456, 789, 012, 45.67, 89.01, 23.45, FALSE, '2023-05-25', '23:45:01', '2023-05-25 23:45:01', '{"key": "value4"}', E'\\xBABEC0DE'),
92+
(5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '{"key": "value5"}', E'\\xDEADBABE'),
93+
(6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '{"key": "value6"}', E'\\xDEADBABE');
94+
4595
statement ok
4696
FLUSH;
4797

4898
statement ok
49-
UPDATE t_remote SET v_varchar = 'Alex' WHERE id = 1;
99+
UPDATE t_remote_0 SET v_varchar = 'Alex' WHERE id = 1;
100+
101+
statement ok
102+
UPDATE t_remote_1 SET v_varchar = 'Alex' WHERE id = 1;
103+
104+
statement ok
105+
DELETE FROM t_remote_0 WHERE id = 2;
50106

51107
statement ok
52-
DELETE FROM t_remote WHERE id = 2;
108+
DELETE FROM t_remote_1 WHERE id = 2;
53109

54110
statement ok
55111
FLUSH;
56112

57113
statement ok
58-
DROP SINK s_postgres;
114+
DROP SINK s_postgres_0;
115+
116+
statement ok
117+
DROP SINK s_postgres_1;
118+
119+
statement ok
120+
DROP SINK s_mysql_0;
121+
122+
statement ok
123+
DROP SINK s_mysql_1;
124+
125+
statement ok
126+
DROP MATERIALIZED VIEW mv_remote_0;
59127

60128
statement ok
61-
DROP SINK s_mysql
129+
DROP MATERIALIZED VIEW mv_remote_1;
62130

63131
statement ok
64-
DROP MATERIALIZED VIEW mv_remote;
132+
DROP TABLE t_remote_0;
65133

66134
statement ok
67-
DROP TABLE t_remote;
135+
DROP TABLE t_remote_1;
68136

69137
statement ok
70138
FLUSH;
Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CREATE TABLE t_remote (
1+
CREATE TABLE t_remote_0 (
22
id integer PRIMARY KEY,
33
v_varchar varchar(100),
44
v_smallint smallint,
@@ -8,4 +8,22 @@ CREATE TABLE t_remote (
88
v_float float,
99
v_double double,
1010
v_timestamp timestamp
11-
);
11+
);
12+
13+
CREATE TABLE t_remote_1 (
14+
id BIGINT PRIMARY KEY,
15+
v_varchar VARCHAR(255),
16+
v_text TEXT,
17+
v_integer INT,
18+
v_smallint SMALLINT,
19+
v_bigint BIGINT,
20+
v_decimal DECIMAL(10,2),
21+
v_real FLOAT,
22+
v_double DOUBLE,
23+
v_boolean BOOLEAN,
24+
v_date DATE,
25+
v_time TIME,
26+
v_timestamp TIMESTAMP,
27+
v_jsonb JSON,
28+
v_bytea BLOB
29+
);
File renamed without changes.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 {"key": "value"} 3q2+7w==
2+
3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 1 2023-05-24 12:34:56 2023-05-24 12:34:56 {"key": "value3"} yv66vg==
3+
4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 0 2023-05-25 23:45:01 2023-05-25 23:45:01 {"key": "value4"} ur7A3g==
4+
5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 1 2023-05-26 12:34:56 2023-05-26 12:34:56 {"key": "value5"} 3q26vg==
5+
6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 0 2023-05-27 23:45:01 2023-05-27 23:45:01 {"key": "value6"} 3q26vg==
Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CREATE TABLE t_remote (
1+
CREATE TABLE t_remote_0 (
22
id integer PRIMARY KEY,
33
v_varchar varchar(100),
44
v_smallint smallint,
@@ -8,4 +8,22 @@ CREATE TABLE t_remote (
88
v_float real,
99
v_double double precision,
1010
v_timestamp timestamp
11-
);
11+
);
12+
13+
CREATE TABLE t_remote_1 (
14+
id BIGINT PRIMARY KEY,
15+
v_varchar VARCHAR(255),
16+
v_text TEXT,
17+
v_integer INTEGER,
18+
v_smallint SMALLINT,
19+
v_bigint BIGINT,
20+
v_decimal DECIMAL(10,2),
21+
v_real REAL,
22+
v_double DOUBLE PRECISION,
23+
v_boolean BOOLEAN,
24+
v_date DATE,
25+
v_time TIME,
26+
v_timestamp TIMESTAMP,
27+
v_jsonb JSONB,
28+
v_bytea BYTEA
29+
);

integration_tests/postgres-cdc/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ services:
4242
- POSTGRES_PASSWORD=123456
4343
- POSTGRES_DB=mydb
4444
ports:
45-
- 5432:5432
45+
- 8432:5432
4646
healthcheck:
4747
test: [ "CMD-SHELL", "pg_isready --username=myuser --dbname=mydb" ]
4848
interval: 5s

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222
import com.risingwave.proto.ConnectorServiceProto;
2323
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
2424
import com.risingwave.proto.Data;
25+
import java.io.ByteArrayInputStream;
2526
import java.math.BigDecimal;
27+
import java.sql.Date;
28+
import java.sql.Time;
2629
import java.sql.Timestamp;
30+
import java.time.LocalDate;
31+
import java.util.Base64;
2732
import java.util.Map;
2833

2934
public class JsonDeserializer implements Deserializer {
@@ -130,6 +135,28 @@ private static BigDecimal castDecimal(Object value) {
130135
}
131136
}
132137

138+
private static Time castTime(Object value) {
139+
try {
140+
Long milli = castLong(value);
141+
return new Time(milli);
142+
} catch (RuntimeException e) {
143+
throw io.grpc.Status.INVALID_ARGUMENT
144+
.withDescription("unable to cast into time from " + value.getClass())
145+
.asRuntimeException();
146+
}
147+
}
148+
149+
private static Date castDate(Object value) {
150+
try {
151+
Long days = castLong(value) - 1;
152+
return Date.valueOf(LocalDate.of(1, 1, 1).plusDays(days));
153+
} catch (RuntimeException e) {
154+
throw io.grpc.Status.INVALID_ARGUMENT
155+
.withDescription("unable to cast into date from " + value.getClass())
156+
.asRuntimeException();
157+
}
158+
}
159+
133160
private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
134161
// value might be null
135162
if (value == null) {
@@ -171,6 +198,32 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj
171198
.asRuntimeException();
172199
}
173200
return Timestamp.valueOf((String) value);
201+
case TIME:
202+
return castTime(value);
203+
case DATE:
204+
return castDate(value);
205+
case INTERVAL:
206+
if (!(value instanceof String)) {
207+
throw io.grpc.Status.INVALID_ARGUMENT
208+
.withDescription("Expected interval, got " + value.getClass())
209+
.asRuntimeException();
210+
}
211+
return value;
212+
case JSONB:
213+
if (!(value instanceof String)) {
214+
throw io.grpc.Status.INVALID_ARGUMENT
215+
.withDescription("Expected jsonb, got " + value.getClass())
216+
.asRuntimeException();
217+
}
218+
return value;
219+
case BYTEA:
220+
if (!(value instanceof String)) {
221+
throw io.grpc.Status.INVALID_ARGUMENT
222+
.withDescription("Expected bytea, got " + value.getClass())
223+
.asRuntimeException();
224+
}
225+
byte[] bytes = Base64.getDecoder().decode((String) value);
226+
return new ByteArrayInputStream(bytes);
174227
default:
175228
throw io.grpc.Status.INVALID_ARGUMENT
176229
.withDescription("unsupported type " + typeName)

0 commit comments

Comments
 (0)