Fluss is a streaming storage system built for real-time analytics that can serve as the real-time data layer for Lakehouse architectures. It bridges the gap between streaming data and the Lakehouse by enabling low-latency, high-throughput data ingestion and processing while integrating seamlessly with popular compute engines.
This is an unofficial, experimental sample Apache Flink DataStream/SQL application that demonstrates real-time data processing with Apache Fluss as both source and sink, enabling developers to explore Fluss within the Flink ecosystem.
This project shows how to:
- Read data from a Fluss table using DataStream/SQL API
- Process data in real-time with transformations
- Write processed results back to another Fluss table
- Handle both primary key tables and log tables in Fluss
- Java 11 or higher
- Maven 3.6+
- Apache Flink 1.18+ (for DataStream/SQL API compatibility)
- Git
# Clone and build Fluss
git clone https://github.com/alibaba/fluss.git
cd fluss
mvn clean install -DskipTests
# Extract the built distribution
cd fluss-dist/target
tar -xzf fluss-*-bin.tgz
# Set FLUSS_HOME environment variable
export FLUSS_HOME=$(pwd)/fluss-0.7-SNAPSHOT
# Verify installation
echo $FLUSS_HOME
ls -la $FLUSS_HOME/bin/

# Start local cluster (recommended for testing)
$FLUSS_HOME/bin/local-cluster.sh start
# Alternative: Start services individually
# $FLUSS_HOME/bin/coordinator-server.sh start
# sleep 5
# $FLUSS_HOME/bin/tablet-server.sh start
# Verify services are running
jps | grep -E "(CoordinatorServer|TabletServer)"

# Install Flink locally via SDKMAN!
curl -s "https://get.sdkman.io" | bash
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk install flink
# Check Flink version (must be 1.18+)
$FLINK_HOME/bin/flink --version
# If using older version, download Flink 1.18.1:
# wget https://downloads.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
# tar -xzf flink-1.18.1-bin-scala_2.12.tgz
# export FLINK_HOME=$(pwd)/flink-1.18.1

# Stop Flink if it is running
$FLINK_HOME/bin/stop-cluster.sh
# Copy Fluss connector JAR to Flink lib directory
# (path is relative to the fluss repo root; FLUSS_HOME points at fluss-dist/target/fluss-0.7-SNAPSHOT)
cp $FLUSS_HOME/../../../fluss-flink/fluss-flink-1.18/target/fluss-flink-1.18-0.7-SNAPSHOT.jar $FLINK_HOME/lib/
# Verify JAR installation
ls -la $FLINK_HOME/lib/ | grep fluss
# Start Flink cluster
$FLINK_HOME/bin/start-cluster.sh

# Launch the Flink SQL client
$FLINK_HOME/bin/sql-client.sh

-- Create Fluss catalog
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'localhost:9123'
);
-- Switch to Fluss catalog
USE CATALOG fluss_catalog;
-- Verify catalog
SHOW CURRENT CATALOG;
-- Create source table (Log Table)
CREATE TABLE Fluss_A (
id BIGINT,
name STRING,
age INT,
score DOUBLE
) WITH (
'bucket.num' = '4'
);
-- Create sink table (Log Table)
CREATE TABLE Fluss_B (
id BIGINT,
name STRING,
age INT,
score DOUBLE,
processed_time BIGINT
) WITH (
'bucket.num' = '4'
);
-- Insert test data
INSERT INTO Fluss_A VALUES
(1, 'Jark', 45, 95.5),
(2, 'Giannis', 52, 87.2),
(3, 'Mehul', 60, 92.8);
-- Switch to tableau mode for better formatting and column headers
SET 'sql-client.execution.result-mode' = 'tableau';
-- Verify data
SELECT * FROM Fluss_A;
-- Check the table schema
DESCRIBE Fluss_A;
## For Upsert/PK Mode
-- Create source PrimaryKey table Fluss_PK_A
CREATE TABLE Fluss_PK_A (
id BIGINT,
name STRING,
age INT,
score DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'bucket.num' = '4'
);
-- Create sink PrimaryKey table Fluss_PK_B
CREATE TABLE Fluss_PK_B (
id BIGINT,
name STRING,
age INT,
score DOUBLE,
processed_time BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'bucket.num' = '4'
);
-- Create the target table for the multi-table union stream sink
CREATE TABLE Fluss_PK_Target (
id BIGINT,
name STRING,
age INT,
score DOUBLE,
processed_time BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'bucket.num' = '4'
);
-- Insert test data with updates to test UPSERT behavior
INSERT INTO Fluss_PK_A VALUES
(1, 'Jark', 45, 95.5),
(2, 'Giannis', 52, 87.2),
(3, 'Mehul', 60, 92.8);
-- UPDATE statements only work in batch mode
SET 'execution.runtime-mode' = 'batch';
-- Update some records to generate changelogs
UPDATE Fluss_PK_A SET score = 99.0 WHERE id = 1;
UPDATE Fluss_PK_A SET age = 53 WHERE id = 2;
-- Switch back to streaming mode
SET 'execution.runtime-mode' = 'streaming';
git clone <your-repo-url>
cd Flink-Fluss-Quickstart-Examples

mvn clean compile

# Run from IDE (IntelliJ IDEA)
# Or run via Maven
mvn exec:java -Dexec.mainClass="com.example.FlussDataStreamApp"

src/main/java/com/
├── 📦 common/                    # Shared components
├── 📦 scanModes/                 # Offset initialization strategies
├── 📦 projection/                # Column pruning tests
├── 📦 exampleAppend/             # Log table operations
├── 📦 exampleUpsert/             # Primary key table operations
├── 📦 exampleDelete/             # Deletion scenarios
├── 📦 exampleRowData/            # Built-in schemas
├── 📦 differentClientConfigs/    # Client configuration tests
├── 📦 multiTable/                # Multi-source/sink processing
├── 📦 recoveryModes/             # Fault tolerance & recovery
└── 📦 errorHandling/             # Error handling & validation
| Job Class | Purpose | Fluss API | Use Case |
|---|---|---|---|
| `FlussFullModeTest` | Read snapshot + changelogs | `OffsetsInitializer.full()` | Initial load + real-time |
| `FlussLatestModeTest` | Read new changes only | `OffsetsInitializer.latest()` | Real-time processing |
| `FlussEarliestModeTest` | Read all historical changes | `OffsetsInitializer.earliest()` | Data replay/recovery |
Example Output Differences:
- Full Mode: Shows all existing data first, then new changes
- Latest Mode: Shows only changes after job starts
- Earliest Mode: Shows historical changelog events in order
- Timestamp Mode: Shows changes from specified timestamp
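For reference, here is a minimal Java sketch of switching between these modes on the DataStream source. Only `OffsetsInitializer.full()/latest()/earliest()` come from the table above; the `FlussSource` builder shape, package names, and the `RowDeserializationSchema` helper are assumptions, so verify them against the connector version you built.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
// Fluss connector imports are assumptions -- adjust to your build.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlussSource<Row> source = FlussSource.<Row>builder()   // builder shape assumed
        .setBootstrapServers("localhost:9123")
        .setDatabase("fluss")                          // assumed default database
        .setTable("Fluss_A")
        // Pick exactly one starting-offsets strategy:
        .setStartingOffsets(OffsetsInitializer.full())     // snapshot + changelog
        // .setStartingOffsets(OffsetsInitializer.latest())   // new changes only
        // .setStartingOffsets(OffsetsInitializer.earliest()) // replay all changes
        .setDeserializationSchema(new RowDeserializationSchema()) // hypothetical helper
        .build();

DataStream<Row> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Fluss_A source");
```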
| Job Class | Purpose | Fluss API | Benefit |
|---|---|---|---|
| `FlussProjectedFieldsTest` | Column pruning | `setProjectedFields()` | Reduced I/O, memory usage |
Key Feature: Only the specified columns are read; all other columns appear as `null`.
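A sketch of wiring this in; only `setProjectedFields()` is taken from the table above, while the surrounding builder is the same assumed shape as in the scan-mode sketch (the exact signature may take field names or indexes).

```java
// Sketch: column pruning at the source (builder shape assumed, see above).
FlussSource<Row> source = FlussSource.<Row>builder()
        .setBootstrapServers("localhost:9123")
        .setDatabase("fluss")
        .setTable("Fluss_A")
        .setProjectedFields("id", "score")   // only these columns are fetched
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializationSchema(new RowDeserializationSchema()) // hypothetical helper
        .build();
// Rows arrive with name/age as null because they were pruned at the source.
```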
| Job Class | Operation | Table Type | Use Case |
|---|---|---|---|
| `FlussDataStreamApp` | APPEND | Log Table | Event logging, metrics |
| Job Class | Operation | Table Type | Use Case |
|---|---|---|---|
| `FlussDataStreamPKApp` | UPSERT | PK Table | Business entities, dimensions |
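A sketch of the sink side: in Fluss the write semantics follow the target table type, so the same sink builder appends into a log table and upserts into a primary-key table. The `FlussSink` builder shape and the `RowSerializationSchema` helper are assumptions to check against your connector build.

```java
// Sketch: one sink builder, two behaviors depending on the target table
// (Fluss_B is a log table -> APPEND; Fluss_PK_B has a PK -> UPSERT).
FlussSink<Row> sink = FlussSink.<Row>builder()      // builder shape assumed
        .setBootstrapServers("localhost:9123")
        .setDatabase("fluss")
        .setTable("Fluss_B")                        // or "Fluss_PK_B" for upserts
        .setSerializationSchema(new RowSerializationSchema()) // hypothetical helper
        .build();

processedStream.sinkTo(sink);
```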
| Job Class | Operation | Scenario | Real-World Example |
|---|---|---|---|
| `FlussDeleteTest` | DELETE | General deletion | Data cleanup |
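Below is a minimal Java sketch of the deletion path. It mirrors the batch-mode `UPDATE` pattern from the SQL setup above and assumes Fluss accepts Flink's batch `DELETE` statement the same way; the catalog wiring matches the earlier SQL client session.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Call from a main() that declares `throws Exception`.
// DELETE, like UPDATE, runs in batch mode (assumed to behave like the
// batch UPDATE shown in the setup section).
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.executeSql("CREATE CATALOG fluss_catalog WITH ("
        + "'type' = 'fluss', 'bootstrap.servers' = 'localhost:9123')");
tEnv.executeSql("USE CATALOG fluss_catalog");
// Removes the row and emits a delete (-D) event on the table's changelog.
tEnv.executeSql("DELETE FROM Fluss_PK_A WHERE id = 3").await();
```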
| Job Class | Configuration Area | Options Tested |
|---|---|---|
| `FlussCustomOptionsTest` | Writer optimization | `batch-size`, `batch-timeout`, `buffer.memory-size` |
| `FlussShuffleControlTest` | Data distribution | `setShuffleByBucketId()` |
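A sketch of where these knobs would plug in. Only `setShuffleByBucketId()` and the option names come from the table above; the `setOption(...)` call and the exact config key prefixes are assumptions, so check `FlussCustomOptionsTest` for the real wiring.

```java
// Sketch: writer tuning (key names/prefixes are assumptions; see
// FlussCustomOptionsTest for the actual options used by this repo).
FlussSink<Row> tunedSink = FlussSink.<Row>builder()
        .setBootstrapServers("localhost:9123")
        .setDatabase("fluss")
        .setTable("Fluss_B")
        .setOption("client.writer.batch-size", "2mb")          // larger write batches
        .setOption("client.writer.batch-timeout", "100ms")     // flush latency bound
        .setOption("client.writer.buffer.memory-size", "64mb") // writer buffer pool
        .setShuffleByBucketId(false)  // keep upstream partitioning, skip bucket shuffle
        .setSerializationSchema(new RowSerializationSchema())  // hypothetical helper
        .build();
```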
| Job Class | Schema Type | Benefit |
|---|---|---|
| `FlussBuiltinSchemasTest` | `RowData` native schemas | Better performance |
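Using Flink's native `RowData` avoids a custom (de)serialization hop. A minimal sketch of building such records with standard Flink APIs (`GenericRowData`, `StringData`), shaped to match the `Fluss_B` schema:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

// One Fluss_B-shaped record as Flink-native RowData.
GenericRowData row = new GenericRowData(RowKind.INSERT, 5);
row.setField(0, 1L);                             // id BIGINT
row.setField(1, StringData.fromString("Jark"));  // name STRING
row.setField(2, 45);                             // age INT
row.setField(3, 95.5d);                          // score DOUBLE
row.setField(4, System.currentTimeMillis());     // processed_time BIGINT
```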
| Job Class | Pattern | Use Case |
|---|---|---|
| `FlussMultiTableTest` | Union → Route → Multiple sinks | Complex data pipelines |
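A sketch of the Union → Route → Sink shape using standard DataStream operators; `env`, `sourceA`, `sourceB`, and `targetSink` stand for the environment and the Fluss sources/sinks built as in the earlier sketches.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Union two PK-table streams and fan them into one target table
// (Fluss_PK_A + Fluss_PK_B -> Fluss_PK_Target).
DataStream<Row> streamA =
        env.fromSource(sourceA, WatermarkStrategy.noWatermarks(), "Fluss_PK_A");
DataStream<Row> streamB =
        env.fromSource(sourceB, WatermarkStrategy.noWatermarks(), "Fluss_PK_B");

streamA.union(streamB)
        // route/enrich per record, e.g. stamp the processing time
        .map(row -> Row.of(row.getField(0), row.getField(1), row.getField(2),
                row.getField(3), System.currentTimeMillis()))
        .returns(Types.ROW(Types.LONG, Types.STRING, Types.INT, Types.DOUBLE, Types.LONG))
        .sinkTo(targetSink);
```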
| Job Class | Focus Area | Testing Aspect |
|---|---|---|
| `FlussCheckpointRecoveryTest` | State management | Exactly-once processing |
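The checkpointing part is plain Flink and matches the 5-second exactly-once configuration used by the quickstart jobs:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Exactly-once checkpoints every 5 seconds, as used throughout this project.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // breathing room between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(60_000);       // fail slow checkpoints
```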
graph LR
A[Setup Tables] --> B[Run Scan Mode Tests]
B --> C[Test Write Operations]
C --> D[Verify Results]
graph TD
A[Basic Tests] --> B[Configuration Tests]
B --> C[Error Handling Tests]
C --> D[Performance Tests]
D --> E[Recovery Tests]
| Feature | Log Table | PK Table | Status |
|---|---|---|---|
| Append Operations | ✅ | ❌ | Complete |
| Upsert Operations | ❌ | ✅ | Complete |
| Delete Operations | ❌ | ✅ | Complete |
| Full Mode Reading | ✅ | ✅ | Complete |
| Latest Mode Reading | ✅ | ✅ | Complete |
| Projection | ✅ | ✅ | Complete |
| Custom Configs | ✅ | ✅ | Complete |
| Error Handling | ✅ | ✅ | Complete |
- Create a package for the new functionality area
- Follow the naming convention: `Fluss[Feature]Test`
- Include comprehensive logging for verification
- Add error handling for production readiness
- Update documentation with new examples
com.[functionality]/
├── Fluss[Feature]Test.java               # Main test class
├── [Feature]SerializationSchema.java     # Custom serializer if needed
├── [Feature]DeserializationSchema.java   # Custom deserializer if needed
└── [Feature]Helper.java                  # Utility classes if needed
This organization provides clear separation of concerns and makes it easy to find and run specific types of tests for different Fluss DataStream API features.
Access the Flink Web UI at http://localhost:8081 to:
- Monitor running jobs and their status
- View job execution graphs and metrics
- Check checkpoint status and performance
- Review task manager resources
- Access job logs and exception details
# Submit a job
$FLINK_HOME/bin/flink run \
-c <main-class> \
<jar-path>
# Example for submitting a job
$FLINK_HOME/bin/flink run \
-c com.example.FlussDataStreamApp \
target/Flink-Fluss-Quickstart-1.0-SNAPSHOT.jar
# List running jobs
$FLINK_HOME/bin/flink list
# Cancel a job (replace <job-id> with actual job ID)
$FLINK_HOME/bin/flink cancel <job-id>
# Stop a job with savepoint
$FLINK_HOME/bin/flink stop <job-id>
# Get job status
$FLINK_HOME/bin/flink info <job-id>
- Source: Reads from the `Fluss_A` table using `FlussSource` (DataStream API/SQL)
- Processing:
  - Adds a processing timestamp
  - Increases the score by 5 for people over 25
  - Logs processing details
- Sink: Writes processed data to the `Fluss_B` table using `FlussSink` (DataStream API/SQL)
- Checkpointing: Enabled with 5-second intervals for exactly-once semantics
- Parallelism: Set to 1 for simplicity
- Offset Strategy: Reads from earliest available data
- Watermark Strategy: No watermarks (for simplicity)
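A sketch of that processing step with standard DataStream operators; field positions follow the `Fluss_A` schema (id, name, age, score), the output row adds `processed_time` for `Fluss_B`, and `sourceStream` stands for the stream read from `Fluss_A`.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Add processed_time and boost the score by 5 for people over 25.
DataStream<Row> processed = sourceStream
        .map(row -> {
            long id = (long) row.getField(0);
            String name = (String) row.getField(1);
            int age = (int) row.getField(2);
            double score = (double) row.getField(3);
            if (age > 25) {
                score += 5.0; // the score boost described above
            }
            return Row.of(id, name, age, score, System.currentTimeMillis());
        })
        .returns(Types.ROW(Types.LONG, Types.STRING, Types.INT, Types.DOUBLE, Types.LONG));
```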
- NoClassDefFoundError: Ensure all Flink dependencies are included
- Connection refused: Verify Fluss cluster is running on localhost:9123
- No data processing: Check that test data exists in the `Fluss_A` table
- Check Fluss cluster status:
  `jps | grep -E "(CoordinatorServer|TabletServer)"`
- Verify table data:
  `SELECT * FROM fluss_catalog.default.Fluss_A;`
- Check application logs for detailed error messages
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests if applicable
- Submit a pull request
[Add your license here]
For issues and questions:
- Create an issue in this repository
- Check the Fluss project documentation
- Consult Flink community resources