Commit 5909608

Enhancement: Major update for direct-load connector with unified multi-node implementation (#131)
1 parent 4b4f333 commit 5909608

17 files changed: +544 −750 lines changed

docs/sink/flink-connector-oceanbase-directload.md

Lines changed: 48 additions & 171 deletions
@@ -2,9 +2,18 @@
 English | [简体中文](flink-connector-oceanbase-directload_cn.md)
 
-This Flink Connector based on the direct-load feature of OceanBase. It can write data to OceanBase through direct-load in Flink.
+This Flink Connector is based on the direct-load feature of OceanBase, enabling high-performance bulk data loading from Flink to OceanBase.
 
-For OceanBase's direct-load feature, see the [direct-load document](https://en.oceanbase.com/docs/common-oceanbase-database-10000000001375568).
+## ⚠️ Important Notes
+
+**This connector is specifically designed for batch processing scenarios with the following characteristics:**
+
+- **Bounded Streams Only**: Data sources must be bounded; unbounded streams are not supported. Flink Batch mode is recommended for better performance.
+- **High Throughput**: Ideal for large-scale data import with multi-node parallel writing capability.
+- ⚠️ **Table Locking During Import**: The target table will be locked during the direct-load process, allowing only SELECT queries; INSERT/UPDATE/DELETE operations are not permitted.
+- ⚠️ **Not for Real-time**: If you need real-time/streaming writes over unbounded streams, use [flink-connector-oceanbase](flink-connector-oceanbase.md) instead.
+
+For more details on OceanBase's direct-load feature, see the [direct-load document](https://en.oceanbase.com/docs/common-oceanbase-database-10000000001375568).
 
 ## Getting Started

@@ -54,19 +63,34 @@ To use this connector through Flink SQL directly, you need to download the shade
 - Release versions: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload
 - Snapshot versions: https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-oceanbase-directload
 
-### Instructions for use:
+### Prerequisites
+
+**Data sources must be bounded streams.** The direct-load connector does not support unbounded streams.
+
+**Flink Batch mode is recommended** for better performance:
+
+- **Table API / Flink SQL**:
+
+  ```sql
+  SET 'execution.runtime-mode' = 'BATCH';
+  ```
+- **DataStream API**:
+
+  ```java
+  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+  ```
 
-- Currently, the direct-load Flink Connector only supports running in Flink Batch execution mode. Refer to the following method to enable Flink Batch execution mode.
-  - Table-API/Flink-SQL: `SET 'execution.runtime-mode' = 'BATCH';`
-  - DataStream API:
+> **Note**: While you can use bounded data sources in Streaming mode, Batch mode typically provides better performance and resource utilization.
 
-    ```
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-    ```
-- Currently, the direct-load Flink Connector supports two modes: single-node write and multi-node write:
-  - Single-node write: In this mode, the Flink Task has only one parallelism for writing. It is suitable for small and medium-sized data import. This method is simple and easy to use and is recommended.
-  - Multi-node write: In this mode, the parallelism of the Flink Task can be freely adjusted according to the amount of data to be imported to improve the write throughput.
+### Performance Tuning
+
+- **Parallelism Adjustment**: Supports multi-node parallel writing; increase Flink task parallelism to improve throughput.
+
+  ```sql
+  SET 'parallelism.default' = '8'; -- Adjust based on data volume
+  ```
+- **Server-side Parallelism**: Use the `parallel` parameter to configure the CPU resources on the OceanBase server for processing the import task.
 
 ### Demo
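As a back-of-envelope aid for the parallelism advice above, a task count can be derived from the data volume and a measured per-task write rate. The helper below is purely illustrative and not part of the connector; the throughput figure is a placeholder you would replace with a rate measured on your own cluster.

```java
// Hypothetical sizing helper -- not from the connector or its docs.
public class ParallelismEstimate {

    // Ceiling division: number of parallel sink tasks needed so that
    // totalRows finish within targetSeconds at rowsPerSecPerTask per task.
    static int suggestedParallelism(long totalRows, long rowsPerSecPerTask, long targetSeconds) {
        long perTaskCapacity = rowsPerSecPerTask * targetSeconds;
        long needed = (totalRows + perTaskCapacity - 1) / perTaskCapacity;
        return (int) Math.max(1, needed);
    }

    public static void main(String[] args) {
        // e.g. 1 billion rows, an assumed 500k rows/s per task, target ~250 s
        System.out.println(suggestedParallelism(1_000_000_000L, 500_000L, 250L));
    }
}
```

The resulting number is what you would put in `SET 'parallelism.default' = '...'`.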

@@ -85,97 +109,24 @@ CREATE TABLE `t_sink`
 );
 ```
 
-#### single-node write
-
 #### Flink SQL Demo
 
-Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client.
+Put the JAR files of dependencies into the 'lib' directory of Flink, then create the destination table using Flink SQL through the SQL client.
 
 ```sql
+-- Recommended to set BATCH mode for better performance
 SET 'execution.runtime-mode' = 'BATCH';
 
-CREATE TABLE t_sink
-(
-    id INT,
-    username VARCHAR,
-    score INT,
-    PRIMARY KEY (id) NOT ENFORCED
-) with (
-    'connector' = 'oceanbase-directload',
-    'host' = '127.0.0.1',
-    'port' = '2882',
-    'schema-name' = 'test',
-    'table-name' = 't_sink',
-    'username' = 'root',
-    'tenant-name' = 'test',
-    'password' = 'password'
-);
-```
-
-Insert records by Flink SQL.
-
-```sql
-INSERT INTO t_sink
-VALUES (1, 'Tom', 99),
-       (2, 'Jerry', 88),
-       (1, 'Tom', 89);
-```
-
-Once executed, the records should have been written to OceanBase.
-
-#### Multi-node write
-
-##### 1. Create a direct-load task in the code and obtain the execution id
-
-- Create a Java Maven project with the following POM file:
-
-  ```xml
-  <?xml version="1.0" encoding="UTF-8"?>
-  <project xmlns="http://maven.apache.org/POM/4.0.0"
-           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-      <modelVersion>4.0.0</modelVersion>
-
-      <groupId>com.oceanbase.directload</groupId>
-      <artifactId>multi-node-write-demo</artifactId>
-      <version>1.0-SNAPSHOT</version>
-
-      <dependencies>
-          <dependency>
-              <groupId>com.oceanbase</groupId>
-              <artifactId>obkv-table-client</artifactId>
-              <version>1.2.13</version>
-          </dependency>
-          <dependency>
-              <groupId>com.alibaba.fastjson2</groupId>
-              <artifactId>fastjson2</artifactId>
-              <version>2.0.53</version>
-          </dependency>
-      </dependencies>
-  </project>
-  ```
-
-- Create a direct-load task in the code and obtain the execution id.
-
-  For code examples, see the complete sample code below.
-
-#### 2. After obtaining the execution id in the above steps, submit the Flink task
-
-Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client.
-
-Note, set `enable-multi-node-write` to true and set `execution-id` to the execution id obtained in the above steps.
-
-```sql
-SET 'execution.runtime-mode' = 'BATCH';
-SET 'parallelism.default' = '3';
+-- Optional: Adjust parallelism based on data volume to improve throughput
+SET 'parallelism.default' = '8';
 
 CREATE TABLE t_sink
 (
     id INT,
     username VARCHAR,
     score INT,
     PRIMARY KEY (id) NOT ENFORCED
-) with (
+) WITH (
     'connector' = 'oceanbase-directload',
     'host' = '127.0.0.1',
     'port' = '2882',
@@ -184,81 +135,23 @@ CREATE TABLE t_sink
     'username' = 'root',
     'tenant-name' = 'test',
     'password' = 'password',
-    'enable-multi-node-write' = 'true',
-    'execution-id' = '5cIeLwELBIWAxOAKAAAAwhY='
-);
+    'parallel' = '8' -- OceanBase server-side parallelism
+);
 ```
 
-Insert records by Flink SQL.
+Insert records using Flink SQL:
 
 ```sql
 INSERT INTO t_sink
 VALUES (1, 'Tom', 99),
        (2, 'Jerry', 88),
-       (1, 'Tom', 89);
-```
-
-#### 3. Wait for the execution of the Flink task submitted above to be completed, and finally perform the final submission action of the direct-load task in the code.
-
-For code examples, see the complete sample code below.
-
-#### Complete sample code
-
-```java
-public class MultiNode {
-    private static String host = "127.0.0.1";
-    private static int port = 2882;
-
-    private static String userName = "root";
-    private static String tenantName = "test";
-    private static String password = "password";
-    private static String dbName = "test";
-    private static String tableName = "t_sink";
-
-    public static void main(String[] args) throws ObDirectLoadException, IOException, InterruptedException {
-        // 1. Create a direct-load task and obtain the execution id.
-        ObDirectLoadConnection connection = ObDirectLoadManager.getConnectionBuilder()
-                .setServerInfo(host, port)
-                .setLoginInfo(tenantName, userName, password, dbName)
-                .build();
-        ObDirectLoadStatement statement = connection.getStatementBuilder()
-                .setTableName(tableName)
-                .build();
-        statement.begin();
-        ObDirectLoadStatementExecutionId statementExecutionId =
-                statement.getExecutionId();
-        byte[] executionIdBytes = statementExecutionId.encode();
-        // Convert the execution id in byte[] form to string form so that it can be passed to the Flink-SQL job as a parameter.
-        String executionId = java.util.Base64.getEncoder().encodeToString(executionIdBytes);
-        System.out.println(executionId);
-
-        // 2. After obtaining the executionId, submit the Flink SQL job.
-
-        // 3. Enter the id of the Flink job submitted in the second step on the command line and wait for the Flink job to be completed.
-        Scanner scanner = new Scanner((System.in));
-        String flinkJobId = scanner.nextLine();
-
-        while (true) {
-            // Loop to check the running status of Flink jobs, see: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/rest_api/
-            JSONObject jsonObject = JSON.parseObject(new URL("http://localhost:8081/jobs/" + flinkJobId));
-            String status = jsonObject.getString("state");
-            if ("FINISHED".equals(status)) {
-                break;
-            }
-            Thread.sleep(3_000);
-        }
-
-        // 4. After waiting for the Flink job execution to FINISHED, perform the final submission action of the direct-load task.
-        statement.commit();
-
-        statement.close();
-        connection.close();
-    }
-}
+       (3, 'Alice', 95);
 ```
 
 Once executed, the records should have been written to OceanBase.
 
+**Note**: During the execution of the `INSERT` statement (while direct-load is in progress), the target table `t_sink` will be locked. Only SELECT queries are allowed; INSERT/UPDATE/DELETE operations are not permitted.
+
 ## Configuration
 
 <div class="highlight">
@@ -400,22 +293,6 @@ Once executed, the records should have been written to OceanBase.
       </ul>
     </td>
   </tr>
-  <tr>
-    <td>enable-multi-node-write</td>
-    <td>No</td>
-    <td>No</td>
-    <td>false</td>
-    <td>Boolean</td>
-    <td>Whether to enable direct-load that supports multi-node writing. Not enabled by default.</td>
-  </tr>
-  <tr>
-    <td>execution-id</td>
-    <td>No</td>
-    <td>No</td>
-    <td></td>
-    <td>String</td>
-    <td>The execution id of the direct-load task. This parameter is only valid when the <code>enable-multi-node-write</code> parameter is true.</td>
-  </tr>
   </tbody>
 </table>
 </div>
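For context on the removed `execution-id` option: in the old multi-node flow, the id produced by `statementExecutionId.encode()` was passed between the Java driver and the Flink SQL job as a Base64 string. That round-trip is plain JDK, sketched below; the class name is illustrative, not part of the connector.

```java
import java.util.Base64;

// Illustrative helper mirroring how the old multi-node flow serialized the
// direct-load execution id for use as a Flink SQL connector option.
public class ExecutionIdCodec {

    // byte[] -> string form suitable for passing as a job parameter
    static String encode(byte[] executionIdBytes) {
        return Base64.getEncoder().encodeToString(executionIdBytes);
    }

    // string form back to the raw bytes
    static byte[] decode(String executionId) {
        return Base64.getDecoder().decode(executionId);
    }

    public static void main(String[] args) {
        String id = encode(new byte[] {1, 2, 3, 4});
        System.out.println(id);
    }
}
```

With the unified implementation this hand-off is no longer needed; the connector manages the direct-load task internally.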
