Skip to content

Commit 062d91c

Browse files
committed
flink,spark 支持bulk_load_on_conflict, stream copy支持激进模式
1 parent 0007f1f commit 062d91c

12 files changed

Lines changed: 303 additions & 142 deletions

File tree

holo-client/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.alibaba.hologres</groupId>
88
<artifactId>holo-client</artifactId>
9-
<version>2.4.2</version>
9+
<version>2.5.4</version>
1010

1111
<url>https://www.hologres.io/</url>
1212
<name>holo-client</name>
@@ -46,7 +46,7 @@
4646
<dependency>
4747
<groupId>org.postgresql</groupId>
4848
<artifactId>postgresql</artifactId>
49-
<version>42.2.25</version>
49+
<version>42.3.10</version>
5050
</dependency>
5151

5252
<dependency>

hologres-connector-examples/hologres-connector-flink-examples/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<dependency>
3030
<groupId>com.alibaba.hologres</groupId>
3131
<artifactId>holo-client</artifactId>
32-
<version>2.5.2</version>
32+
<version>2.5.4</version>
3333
</dependency>
3434
<dependency>
3535
<groupId>org.apache.flink</groupId>

hologres-connector-examples/hologres-connector-spark-examples/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<dependency>
2626
<groupId>com.alibaba.hologres</groupId>
2727
<artifactId>holo-client</artifactId>
28-
<version>2.5.3</version>
28+
<version>2.5.4</version>
2929
</dependency>
3030
<dependency>
3131
<groupId>org.apache.spark</groupId>

hologres-connector-flink-1.15/README.md

Lines changed: 24 additions & 24 deletions
Large diffs are not rendered by default.

hologres-connector-flink-1.15/src/test/java/com/alibaba/ververica/connectors/hologres/sink/HologresSinkTableITTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ public void testSinkTable() throws Exception {
226226
+ "'fixedConnectionMode'='%s',"
227227
+ "%s"
228228
+ "'mutateType'='insertorignore',"
229+
+ "'aggressive.enabled'='true',"
229230
+ "'endpoint'='%s',"
230231
+ "'dbName'='%s',"
231232
+ "'tableName'='%s',"
@@ -252,6 +253,81 @@ public void testSinkTable() throws Exception {
252253
10000);
253254
}
254255

256+
@Test
257+
public void testSinkTableUpdate() throws Exception {
258+
if (copyMode == CopyMode.BULK_LOAD) {
259+
copyMode = CopyMode.BULK_LOAD_ON_CONFLICT;
260+
}
261+
Object[][] prepare =
262+
new Object[][] {
263+
new Object[] {0, "old_1"},
264+
new Object[] {2, "old_2"},
265+
new Object[] {11, "old_3"}
266+
};
267+
insertValues(sinkTable, prepare);
268+
tEnv.executeSql(
269+
String.format(
270+
"CREATE TABLE sinkTable ("
271+
+ "a INT NOT NULL,"
272+
+ "b STRING NOT NULL,"
273+
+ "c DOUBLE,"
274+
+ "d BOOLEAN,"
275+
+ "e BIGINT,"
276+
+ "f DATE,"
277+
+ "g VARCHAR,"
278+
+ "h TIMESTAMP,"
279+
+ "i FLOAT,"
280+
+ "j ARRAY<INT> NOT NULL,"
281+
+ "k ARRAY<BIGINT> NOT NULL,"
282+
+ "l ARRAY<FLOAT>,"
283+
+ "m ARRAY<DOUBLE>,"
284+
+ "n ARRAY<BOOLEAN>,"
285+
+ "o ARRAY<STRING>,"
286+
+ "p BOOLEAN,"
287+
+ "q NUMERIC(6,2),"
288+
+ "r TIMESTAMP,"
289+
+ "s SMALLINT,"
290+
+ "t VARCHAR,"
291+
+ "u VARCHAR"
292+
+ ") WITH ("
293+
+ "'connector'='hologres',"
294+
+ "'fixedConnectionMode'='%s',"
295+
+ "%s"
296+
+ "'mutateType'='insertOrUpdate',"
297+
+ "'endpoint'='%s',"
298+
+ "'dbName'='%s',"
299+
+ "'tableName'='%s',"
300+
+ "'userName'='%s',"
301+
+ "'password'='%s'"
302+
+ ")",
303+
fixedMode,
304+
(copyMode == null ? "" : "'jdbcCopyWriteMode'='" + copyMode + "',"),
305+
endpoint,
306+
database,
307+
sinkTable,
308+
username,
309+
password));
310+
311+
String insertAnotherRow =
312+
",("
313+
+ "11,'dim',cast(20.2007 as double),false,652482,cast('2020-07-08' as date),'source_test',cast('2020-07-10 16:28:07.737' as timestamp),"
314+
+ "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as array<int>),cast(ARRAY [8589934592,8589934593,8589934594] as array<bigint>),"
315+
+ "ARRAY[cast(8.58967 as float),cast(96.4667 as float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as double),cast(792343.646446 as double),cast(76.46464 as double)],"
316+
+ "cast(ARRAY [true,true,false,true] as array<boolean>),cast(ARRAY ['monday','saturday','sunday'] as array<STRING>),true,cast(8119.21 as numeric(6,2)), "
317+
+ "cast('2020-07-10 16:28:07.737' as timestamp), cast(2 as smallint), cast('{\"a\":\"bbbb\", \"c\":\"dddd\"}' as varchar), cast('{\"a\":\"bbbb\", \"c\":\"dddd\"}' as varchar)"
318+
+ ")";
319+
tEnv.executeSql(String.format(insertStatement + insertAnotherRow, "sinkTable")).await();
320+
321+
checkResultWithTimeout(
322+
expectedRowsToString(EXPECTED),
323+
"select * from " + sinkTable,
324+
FIELD_NAMES,
325+
JDBCUtils.getDbUrl(endpoint, database),
326+
username,
327+
password,
328+
10000);
329+
}
330+
255331
@Test
256332
public void testSinkTableWithSchema() throws Exception {
257333
tEnv.executeSql(

0 commit comments

Comments
 (0)