Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 15 additions & 22 deletions docs/content/docs/connectors/datastream/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ Please consult your database documentation on how to add the corresponding drive

## JDBC Source

Configuration goes as follow (see also {{< javadoc file="org/apache/flink/connector/jdbc/source/JdbcSource.html" name="JdbcSource javadoc" >}}
and {{< javadoc file="org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.html" name="JdbcSourceBuilder javadoc" >}}).

### Usage

{{< tabs "4ab65f13-607a-411a-8d24-e709f701cd41" >}}
Expand Down Expand Up @@ -128,7 +125,6 @@ ResultExtractor resultExtractor = new ResultExtractor() {
### JdbcParameterValuesProvider

A provider to provide parameters in sql to fulfill actual value in the corresponding placeholders, which is in the form of two-dimension array.
See {{< javadoc file="org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.html" name="JdbcParameterValuesProvider javadoc" >}} for more details.

```java

Expand Down Expand Up @@ -178,8 +174,6 @@ jdbcSourceBuilder =
JdbcSource source = jdbcSourceBuilder.build();
```

See {{< javadoc file="org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.html" name="ContinuousUnBoundingSettings javadoc" >}} for more details.

### Full example

{{< tabs "4ab65f13-608a-411a-8d24-e303f348ds81" >}}
Expand Down Expand Up @@ -233,7 +227,7 @@ Still not supported in Python API.

The JDBC sink provides at-least-once guarantee.
Effectively though, exactly-once can be achieved by crafting upsert SQL statements or idempotent SQL updates.
Configuration goes as follow (see also {{< javadoc file="org/apache/flink/connector/jdbc/JdbcSink.html" name="JdbcSink javadoc" >}}).
Configuration goes as follows:

{{< tabs "4ab65f13-607a-411a-8d24-e709f701cd4c" >}}
{{< tab "Java" >}}
Expand Down Expand Up @@ -274,7 +268,7 @@ It then repeatedly calls a user-provided function to update that prepared statem

### JDBC execution options

The SQL DML statements are executed in batches, which can optionally be configured with the following instance (see also {{< javadoc name="JdbcExecutionOptions javadoc" file="org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}})
The SQL DML statements are executed in batches, which can optionally be configured with the following instance:

{{< tabs "4ab65f13-607a-411a-8d24-e709f512ed6k" >}}
{{< tab "Java" >}}
Expand All @@ -300,13 +294,12 @@ JdbcExecutionOptions.builder() \
A JDBC batch is executed as soon as one of the following conditions is true:

* the configured batch interval time is elapsed
* the maximum batch size is reached
* the maximum batch size is reached
* a Flink checkpoint has started

### JDBC connection parameters

The connection to the database is configured with a `JdbcConnectionOptions` instance.
Please see {{< javadoc name="JdbcConnectionOptions javadoc" file="org/apache/flink/connector/jdbc/JdbcConnectionOptions.html" >}} for details
The connection to the database is configured with a `JdbcConnectionOptions` instance.

### Full example

Expand Down Expand Up @@ -397,14 +390,14 @@ env.execute()

## `JdbcSink.exactlyOnceSink`

Since 1.13, Flink JDBC sink supports exactly-once mode.
The implementation relies on the JDBC driver support of XA
Since 1.13, Flink JDBC sink supports exactly-once mode.
The implementation relies on the JDBC driver support of XA
[standard](https://pubs.opengroup.org/onlinepubs/009680699/toc.pdf).
Most drivers support XA if the database also supports XA (so the driver is usually the same).

To use it, create a sink using `exactlyOnceSink()` method as above and additionally provide:
- {{< javadoc name="exactly-once options" file="org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.html" >}}
- {{< javadoc name="execution options" file="org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}}
- `JdbcExactlyOnceOptions`
- `JdbcExecutionOptions`
- [XA DataSource](https://docs.oracle.com/javase/8/docs/api/javax/sql/XADataSource.html) Supplier

For example:
Expand All @@ -416,26 +409,26 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
env
.fromElements(...)
.addSink(JdbcSink.exactlyOnceSink(
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
(ps, t) -> {
ps.setInt(1, t.id);
ps.setInt(1, t.id);
ps.setString(2, t.title);
ps.setString(3, t.author);
ps.setDouble(4, t.price);
ps.setInt(5, t.qty);
},
JdbcExecutionOptions.builder()
JdbcExecutionOptions.builder()
.withMaxRetries(0)
.build(),
JdbcExactlyOnceOptions.defaults(),
() -> {
// create a driver-specific XA DataSource
// The following example is for derby
EmbeddedXADataSource ds = new EmbeddedXADataSource();
// create a driver-specific XA DataSource
// The following example is for derby
EmbeddedXADataSource ds = new EmbeddedXADataSource();
ds.setDatabaseName("my_db");
return ds;
});
env.execute();
env.execute();
```
{{< /tab >}}
{{< tab "Python" >}}
Expand Down