Skip to content

Commit 36b3c96

Browse files
authored
[test](example) add example for datagen source (#589)
1 parent 2466b2b commit 36b3c96

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.flink.example;
19+
20+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
21+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
22+
23+
/** This is an example of using Doris sink with DataGen. */
24+
public class DataGen2DorisExample {
25+
public static void main(String[] args) {
26+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
27+
env.setParallelism(1);
28+
env.enableCheckpointing(30000);
29+
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
30+
31+
tEnv.executeSql(
32+
"CREATE TABLE student_source ("
33+
+ " id INT,"
34+
+ " name STRING,"
35+
+ " age INT"
36+
+ ") WITH ("
37+
+ " 'connector' = 'datagen',"
38+
+ " 'rows-per-second' = '1',"
39+
+ " 'fields.name.length' = '20',"
40+
+ " 'fields.id.min' = '1',"
41+
+ " 'fields.id.max' = '100000',"
42+
+ " 'fields.age.min' = '3',"
43+
+ " 'fields.age.max' = '30'"
44+
+ ");");
45+
46+
tEnv.executeSql(
47+
"CREATE TABLE student_sink ("
48+
+ " id INT,"
49+
+ " name STRING,"
50+
+ " age INT"
51+
+ " ) "
52+
+ " WITH ("
53+
+ " 'connector' = 'doris',"
54+
+ " 'fenodes' = '127.0.0.1:8030',"
55+
+ " 'table.identifier' = 'test.student',"
56+
+ " 'username' = 'root',"
57+
+ " 'password' = '',"
58+
+ " 'sink.label-prefix' = 'doris_label'"
59+
+ ")");
60+
tEnv.executeSql("INSERT INTO student_sink select * from student_source");
61+
}
62+
}

0 commit comments

Comments
 (0)