This repository contains the official Apache Flink Cloudberry connector.
While the standard Apache Flink JDBC Connector provides general-purpose database connectivity, this dedicated Cloudberry connector offers significant performance advantages for Cloudberry Database workloads:
PostgreSQL COPY Protocol for High-Performance Ingestion
In append mode, this connector leverages PostgreSQL's native COPY protocol for bulk data loading, delivering substantially better throughput compared to traditional JDBC batch processing.
This connector currently supports only Sink (data write) operations. Source (data read) operations are not supported.
This connector does not support Flink's exactly-once delivery guarantee.
In Flink's architecture, achieving exactly-once semantics typically requires the external database to support XA transactions. Cloudberry Database does not currently support XA transactions (two-phase commit coordination of database transactions with external transactions), so this connector cannot currently provide exactly-once semantics.
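Jobs that use this sink and need fault tolerance therefore typically rely on Flink checkpointing with at-least-once semantics. A minimal sketch (the 30-second interval is illustrative, and the at-least-once framing is an assumption following from the limitation above, not a documented guarantee):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Periodic checkpoints every 30 seconds; AT_LEAST_ONCE is used because the sink
// cannot take part in an exactly-once two-phase commit anyway (see note above)
env.enableCheckpointing(30_000L, CheckpointingMode.AT_LEAST_ONCE);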
Prerequisites:
- Unix-like environment (we use Linux, Mac OS)
- Git
- Java 8 (the Gradle wrapper downloads Gradle automatically)
git clone https://github.com/cloudberry-contrib/flink-connector-cloudberry.git
cd flink-connector-cloudberry
./gradlew clean build

To run only unit tests (fast):

./gradlew test

To run all tests (unit + integration):

./gradlew check

To skip integration tests:

./gradlew build -x integrationTest

The resulting jars can be found in the build/libs directory.
Important: This is a core connector module. Users need to provide the PostgreSQL JDBC driver separately.

For Maven:
<dependencies>
<!-- Flink Cloudberry Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cloudberry</artifactId>
<version>0.5.0</version>
</dependency>
<!-- PostgreSQL JDBC Driver (Required!) -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.3</version>
<scope>runtime</scope>
</dependency>
</dependencies>

Or, with Gradle:

dependencies {
    implementation 'org.apache.flink:flink-connector-cloudberry:0.5.0'
    runtimeOnly 'org.postgresql:postgresql:42.7.3' // Required!
}

-- Register Cloudberry sink table
CREATE TABLE cloudberry_sink (
id BIGINT,
name STRING,
age INT
) WITH (
'connector' = 'cloudberry',
'url' = 'jdbc:postgresql://localhost:5432/testdb',
'table-name' = 'users_output',
'username' = 'your_username',
'password' = 'your_password'
);

Note: Make sure the PostgreSQL JDBC driver is in your classpath!
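Once the sink table is registered, writing data is a standard INSERT. A minimal sketch, assuming an illustrative source table users_source with matching columns (the source table is not part of this connector):

-- Stream rows from a source table into the Cloudberry sink
INSERT INTO cloudberry_sink
SELECT id, name, age FROM users_source;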
Use the DataStream API when you need fine-grained control over data processing.
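The statement builder below references fields of a Book object; a minimal sketch of such a POJO (illustrative, inferred from the example, not part of the connector) might be:

// Simple value class holding one book record
public class Book {
    public int id;
    public String title;
    public String author;
    public double price;
    public int quantity;

    public Book() {}
}

With an array or collection of such objects (books below), the pipeline looks like this: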
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create data stream and write to Cloudberry
env.fromElements(books)
.addSink(
JdbcSink.sink(
// SQL insert statement
"INSERT INTO books (id, title, author, price, quantity) VALUES (?, ?, ?, ?, ?)",
// Statement builder - binds Java objects to SQL parameters
(ps, book) -> {
ps.setInt(1, book.id);
ps.setString(2, book.title);
ps.setString(3, book.author);
ps.setDouble(4, book.price);
ps.setInt(5, book.quantity);
},
// Connection options
JdbcConnectionOptions.builder()
.setJdbcUrl("jdbc:postgresql://localhost:5432/testdb")
.setUsername("username")
.setPassword("password")
.setTable("books")
.build()));
env.execute("Insert Books");Use append mode for write-only scenarios without updates:
-- Create sink table
CREATE TABLE order_sink (
order_id INT,
user_name STRING,
amount DECIMAL(10,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'cloudberry',
'url' = 'jdbc:postgresql://localhost:5432/testdb',
'username' = 'username',
'password' = 'password',
'table-name' = 'user_orders'
);
Use upsert mode for aggregations and scenarios requiring updates:
-- Create sink table with primary key - enables upsert mode automatically
CREATE TABLE stats_sink (
user_id INT,
total_orders BIGINT,
total_amount DECIMAL(18,2),
last_update TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED -- Primary key enables upsert!
) WITH (
'connector' = 'cloudberry',
'url' = 'jdbc:postgresql://localhost:5432/testdb',
'username' = 'username',
'password' = 'password',
'table-name' = 'user_statistics'
);
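A continuous aggregation is the typical producer of updates for such a table. A minimal sketch, assuming an illustrative orders source table with user_id, amount, and order_time columns:

-- Each new order updates that user's row in Cloudberry through the primary key
INSERT INTO stats_sink
SELECT
    user_id,
    COUNT(*) AS total_orders,
    CAST(SUM(amount) AS DECIMAL(18, 2)) AS total_amount,
    MAX(order_time) AS last_update
FROM orders
GROUP BY user_id;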
Use COPY protocol for high-performance bulk data loading (much faster than standard INSERT):
-- Enable COPY protocol for faster bulk inserts
CREATE TABLE bulk_orders_sink (
order_id INT,
product_name STRING,
quantity INT,
total_price DECIMAL(10,2),
order_date TIMESTAMP(3)
) WITH (
'connector' = 'cloudberry',
'url' = 'jdbc:postgresql://localhost:5432/testdb',
'username' = 'username',
'password' = 'password',
'table-name' = 'bulk_orders',
'sink.use-copy-protocol' = 'true' -- Enable COPY protocol
);
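For a quick throughput check of the COPY path, a throwaway bounded source built with Flink's datagen connector can feed the sink. A minimal sketch (the table name and row count are illustrative):

-- Bounded random source: generates 1,000,000 rows, then the job finishes
CREATE TABLE orders_generator (
    order_id INT,
    product_name STRING,
    quantity INT,
    total_price DECIMAL(10,2),
    order_date TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '1000000'
);

INSERT INTO bulk_orders_sink SELECT * FROM orders_generator;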
Use batch mode for offline analytics and ETL jobs:
-- Create sink table
CREATE TABLE sales_sink (
product_name STRING,
sales_count BIGINT
) WITH (
'connector' = 'cloudberry',
'url' = 'jdbc:postgresql://localhost:5432/testdb',
'username' = 'username',
'password' = 'password',
'table-name' = 'product_sales'
);
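Batch execution is selected on the TableEnvironment rather than on the sink table. A minimal sketch (source_orders is an illustrative source table, not part of this connector):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Create a TableEnvironment that runs in batch execution mode
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Register sales_sink with the DDL shown above, then run the batch insert
tableEnv.executeSql("INSERT INTO sales_sink "
        + "SELECT product_name, COUNT(*) FROM source_orders GROUP BY product_name");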
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | Yes | - | String | Must be 'cloudberry' |
| url | Yes | - | String | JDBC connection URL (e.g., jdbc:postgresql://host:port/database) |
| table-name | Yes | - | String | Target table name in the database |
| username | No | - | String | Database username |
| password | No | - | String | Database password |
| sink.use-copy-protocol | No | false | Boolean | Enable PostgreSQL COPY protocol for high-performance bulk loading |
| sink.buffer-flush.max-rows | No | 1000 | Integer | Maximum number of rows to buffer before flushing |
| sink.buffer-flush.interval | No | 5s | Duration | Flush interval (e.g., '1s', '500ms') |
| sink.max-retries | No | 3 | Integer | Maximum number of retry attempts on failure |
| sink.retry.interval | No | 1s | Duration | Time interval between retry attempts |
- Use COPY protocol (sink.use-copy-protocol = 'true') for bulk data loading - it's significantly faster than standard INSERT statements
- Tune buffer settings - adjust sink.buffer-flush.max-rows and sink.buffer-flush.interval based on your throughput requirements; see the sketch after this list
- Use upsert mode wisely - only define a PRIMARY KEY when you need update semantics, as it adds overhead
- Batch mode for analytics - use batch mode (EnvironmentSettings.inBatchMode()) for offline ETL jobs
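As a concrete illustration of the buffer settings, a sink tuned for larger, less frequent flushes could be declared like this (the table name and values are placeholders, not recommendations):

CREATE TABLE tuned_sink (
    id BIGINT,
    payload STRING
) WITH (
    'connector' = 'cloudberry',
    'url' = 'jdbc:postgresql://localhost:5432/testdb',
    'table-name' = 'tuned_target',
    'username' = 'username',
    'password' = 'password',
    'sink.use-copy-protocol' = 'true',
    -- larger batches, flushed at least every 2 seconds
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '5'
);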
The documentation of Apache Flink is located on the website: https://flink.apache.org
or in the docs/ directory of the source code.
This is an active open-source project. We are always open to people who want to use the system or contribute to it.
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
This project is based on Apache Flink JDBC Connector. We thank the Apache Flink community for their excellent work.