Search before asking
- I searched in the issues and found nothing similar.
Fluss version
0.8.0 (latest release)
Please describe the bug 🐞
I'm trying to create a simple PoC that takes data from a PostgreSQL table via Flink CDC into a Fluss table and streams it onward into Paimon. This is all local for now.
Paimon uses Garage (S3-compatible API) for the warehouse and PostgreSQL as the catalog.
This is my initial job in Flink:
Flink SQL job for CDC into the Fluss table:

```sql
-- Run streaming job + checkpointing for EOS
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';

-- 1) Fluss catalog (points to coordinator + tablet server;
--    Fluss supports comma-separated bootstrap servers)
CREATE CATALOG fluss_catalog WITH (
  'type' = 'fluss',
  'bootstrap.servers' = '192.168.1.4:9123,192.168.1.4:9124'
);
USE CATALOG fluss_catalog;

CREATE DATABASE IF NOT EXISTS osb_staging;
USE osb_staging;

-- DROP TABLE IF EXISTS tickets_staging;

-- 2) Fluss staging table (primary-key table, datalake-enabled)
CREATE TABLE IF NOT EXISTS tickets_staging (
  ticket_id BIGINT,
  movie_id BIGINT,
  user_id BIGINT,
  cost DECIMAL(10, 2),
  purchased_at TIMESTAMP(3),
  PRIMARY KEY (ticket_id) NOT ENFORCED
)
WITH (
  'bucket.num' = '4',
  'table.datalake.enabled' = 'true',
  'table.datalake.freshness' = '30s'
);

-- 3) Postgres CDC source (Flink CDC SQL connector)
-- The connector options shown here are the documented ones.
CREATE TEMPORARY TABLE pg_osb_tickets (
  ticket_id BIGINT,
  movie_id BIGINT,
  user_id BIGINT,
  cost DECIMAL(10, 2),
  purchased_at TIMESTAMP(3),
  PRIMARY KEY (ticket_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '192.168.1.4',
  'port' = '5432',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'source_db',
  'schema-name' = 'osb',
  'table-name' = 'tickets',
  'slot.name' = 'cdc_osb_tickets_to_fluss',
  'decoding.plugin.name' = 'pgoutput',
  'scan.incremental.snapshot.enabled' = 'true'
);

-- 4) Start the replication stream into Fluss
INSERT INTO tickets_staging
SELECT
  ticket_id,
  movie_id,
  user_id,
  cost,
  purchased_at
FROM
  pg_osb_tickets;
```
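As a sanity check before looking at the lake side, I query the Fluss table itself (not the `$lake` view). A minimal sketch; as far as I understand, batch reads on a Fluss primary-key table support limit scans, so I use `LIMIT` here:

```sql
-- Switch to batch mode and read a few rows straight from Fluss,
-- to confirm the CDC pipeline is actually delivering data.
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM fluss_catalog.osb_staging.tickets_staging LIMIT 10;
```

This does return the expected rows, so the CDC leg of the pipeline looks healthy.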
Then, I start the tiering service like this:
```shell
docker exec flink-jobmanager /opt/flink/bin/flink run \
  -Dpipeline.name="Fluss Tiering Service" \
  -Dparallelism.default=2 \
  -Dstate.checkpoints.dir="s3://warehouse/checkpoints/tiering" \
  -Ds3.multiobjectdelete.enable=false \
  /opt/flink/lib/fluss-flink-tiering-0.8.0-incubating.jar \
  --fluss.bootstrap.servers 192.168.1.4:9123 \
  --datalake.format paimon \
  --datalake.paimon.metastore jdbc \
  --datalake.paimon.uri "jdbc:postgresql://192.168.1.4:5433/paimon_catalog" \
  --datalake.paimon.jdbc.user root \
  --datalake.paimon.jdbc.password root \
  --datalake.paimon.catalog-key paimon_catalog \
  --datalake.paimon.warehouse "s3://warehouse/paimon" \
  --datalake.paimon.s3.endpoint "http://192.168.1.4:3900" \
  --datalake.paimon.s3.access-key "${GARAGE_ACCESS_KEY}" \
  --datalake.paimon.s3.secret-key "${GARAGE_SECRET_KEY}" \
  --datalake.paimon.s3.path.style.access true
```
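To rule out a silently failed submission, I also check that the tiering pipeline actually shows up as a running Flink job (container name as in the command above):

```shell
# List only the running jobs on the JobManager; the tiering pipeline
# submitted above should appear here with state RUNNING.
docker exec flink-jobmanager /opt/flink/bin/flink list -r
```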
So far, it almost works. I can see data in real time, and when I go to the bucket I see three kinds of directories for the table: bucket directories (3 or 4), one schema directory, and one manifest directory. But no snapshot directory! Because of this, no matter how long I wait, `SELECT * FROM tickets_staging$lake` returns no data.
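For the record, this is how I inspect the table layout in Garage (path-style S3). The `osb_staging.db/tickets_staging` path is my assumption based on Paimon's usual `<warehouse>/<db>.db/<table>` layout:

```shell
# Recursively list the table directory; with working tiering I would
# expect bucket-*, schema/, manifest/ AND a snapshot/ directory here.
aws s3 ls "s3://warehouse/paimon/osb_staging.db/tickets_staging/" \
  --recursive --endpoint-url http://192.168.1.4:3900
```

The listing confirms that data and manifest files are being written but no snapshot is ever committed.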
Solution
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!