
Commit 6e36520

Document the Fluss Datastream Source
1 parent 2afb839 commit 6e36520

File tree

2 files changed: +281 -1 lines changed

website/docs/apis/datastream.md

Lines changed: 280 additions & 0 deletions
@@ -0,0 +1,280 @@
---
title: "Datastream API"
sidebar_position: 1
---

<!--
Copyright (c) 2025 Alibaba Group Holding Ltd.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Fluss Datastream

## Overview

The Fluss Datastream API provides a Flink DataStream source implementation for reading data from Fluss tables. It lets you integrate Fluss tables seamlessly with Flink's DataStream API, so you can process data from Fluss in your Flink applications.

Key features of the Fluss Datastream API include:

* Reading from both primary key tables and log tables
* Projection pushdown to select specific fields
* Flexible offset initialization strategies
* Custom deserialization schemas for converting Fluss records to your own data types
* Automatic handling of updates for primary key tables

## Dependency

To use the Fluss Datastream API, add the following dependency to your `pom.xml` file:

```xml
<!-- https://mvnrepository.com/artifact/com.alibaba.fluss/fluss-datastream -->
<dependency>
    <groupId>com.alibaba.fluss</groupId>
    <artifactId>fluss-datastream</artifactId>
    <version>0.7.0</version>
</dependency>
```

## Datastream Source

### Initialization

The main entry point for the Fluss Datastream API is the `FlussSource` class. You create a `FlussSource` instance using the builder pattern, which allows step-by-step configuration of the source connector.

```java
// Create a FlussSource using the builder pattern
FlussSource<Order> flussSource = FlussSource.<Order>builder()
    .setBootstrapServers("localhost:9092")
    .setDatabase("mydb")
    .setTable("orders")
    .setProjectedFields("orderId", "amount")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setScanPartitionDiscoveryIntervalMs(1000L)
    .setDeserializationSchema(new OrderDeserializationSchema())
    .build();
```

### Configuration Options

The `FlussSourceBuilder` provides several methods for configuring the source connector:

#### Required Parameters

* **setBootstrapServers(String bootstrapServers):** Sets the bootstrap servers for the Fluss source connection
* **setDatabase(String database):** Sets the database name for the Fluss source
* **setTable(String table):** Sets the table name for the Fluss source
* **setDeserializationSchema(FlussDeserializationSchema&lt;T&gt; schema):** Sets the deserialization schema for converting Fluss records to output records

#### Optional Parameters

* **setProjectedFields(String... projectedFieldNames):** Sets the fields to project from the table (if not specified, all fields are included)
* **setScanPartitionDiscoveryIntervalMs(long intervalMs):** Sets the interval for discovering new partitions (default: from configuration)
* **setStartingOffsets(OffsetsInitializer initializer):** Sets the strategy for determining starting offsets (default: `OffsetsInitializer.full()`)
* **setFlussConfig(Configuration flussConf):** Sets custom Fluss configuration properties (see the sketch after this list)
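
For illustration, here is a minimal sketch of passing custom properties via `setFlussConfig`. It assumes Fluss's `Configuration` exposes a `setString(key, value)` method, and the property key shown is hypothetical, not a documented option:

```java
// Sketch only: the property key below is illustrative, not a documented option.
Configuration flussConf = new Configuration();
flussConf.setString("client.connect-timeout", "30s"); // hypothetical key

FlussSource<Order> source = FlussSource.<Order>builder()
    .setBootstrapServers("localhost:9092")
    .setDatabase("mydb")
    .setTable("orders")
    .setDeserializationSchema(new OrderDeserializationSchema())
    .setFlussConfig(flussConf)
    .build();
```
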
### Offset Initializers

The `OffsetsInitializer` interface provides several factory methods for creating different types of initializers:

* **OffsetsInitializer.earliest():** Initializes offsets to the earliest available offset of each bucket
* **OffsetsInitializer.latest():** Initializes offsets to the latest offset of each bucket
* **OffsetsInitializer.full():** Performs a full snapshot of the table upon first startup:
  * For log tables: reads from the earliest log offset (equivalent to `earliest()`)
  * For primary key tables: reads the latest snapshot, which materializes all changes to the table
* **OffsetsInitializer.timestamp(long timestamp):** Initializes offsets based on a given timestamp

Example:

```java
// Start reading from the earliest available offsets
FlussSource<Order> earliestSource = FlussSource.<Order>builder()
    .setStartingOffsets(OffsetsInitializer.earliest())
    // other configuration...
    .build();

// Start reading from the latest offsets
FlussSource<Order> latestSource = FlussSource.<Order>builder()
    .setStartingOffsets(OffsetsInitializer.latest())
    // other configuration...
    .build();

// Start reading from a specific timestamp (here: one hour ago)
FlussSource<Order> timestampSource = FlussSource.<Order>builder()
    .setStartingOffsets(OffsetsInitializer.timestamp(System.currentTimeMillis() - 3600 * 1000))
    // other configuration...
    .build();
```

### Deserialization Schemas

The `FlussDeserializationSchema` interface converts Fluss records to your desired output type. Fluss provides some built-in implementations:

* **RowDataDeserializationSchema** - Converts Fluss records to Flink's `RowData` objects
* **JsonStringDeserializationSchema** - Converts Fluss records to JSON strings

You can also implement your own deserialization schema by implementing the `FlussDeserializationSchema` interface:

```java
public class OrderDeserializationSchema implements FlussDeserializationSchema<Order> {

    @Override
    public void open(InitializationContext context) throws Exception {
        // Initialization code if needed
    }

    @Override
    public Order deserialize(LogRecord record) throws Exception {
        InternalRow row = record.getRow();

        // Extract fields from the row
        long orderId = row.getLong(0);
        long itemId = row.getLong(1);
        int amount = row.getInt(2);
        String address = row.getString(3).toString();

        // Create and return your custom object
        return new Order(orderId, itemId, amount, address);
    }

    @Override
    public TypeInformation<Order> getProducedType(RowType rowSchema) {
        return TypeInformation.of(Order.class);
    }
}
```
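
For reference, a minimal `Order` POJO consistent with the fields read above might look as follows; this class is not part of Fluss, and its field names and types are taken directly from the example:

```java
// Plain data holder used by the examples on this page; not part of Fluss.
public class Order {
    private long orderId;
    private long itemId;
    private int amount;
    private String address;

    // No-arg constructor plus getters; add setters if you rely on Flink's POJO serializer
    public Order() {}

    public Order(long orderId, long itemId, int amount, String address) {
        this.orderId = orderId;
        this.itemId = itemId;
        this.amount = amount;
        this.address = address;
    }

    public long getOrderId() { return orderId; }
    public long getItemId() { return itemId; }
    public int getAmount() { return amount; }
    public String getAddress() { return address; }
}
```
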
### Examples

#### Reading from a Primary Key Table

When reading from a primary key table, the Fluss Datastream API automatically handles updates to the data. For each update, it emits both the before and after versions of the record with the appropriate `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).

```java
// Create a FlussSource for a primary key table
FlussSource<RowData> flussSource = FlussSource.<RowData>builder()
    .setBootstrapServers("localhost:9092")
    .setDatabase("mydb")
    .setTable("orders_pk")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializationSchema(new RowDataDeserializationSchema())
    .build();

// Create a DataStream from the FlussSource
DataStreamSource<RowData> stream = env.fromSource(
    flussSource,
    WatermarkStrategy.noWatermarks(),
    "Fluss PK Source"
);

// Process the stream to handle different row kinds
// (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE events)
```
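
As one sketch of that processing, the snippet below uses Flink's standard `RowData.getRowKind()` to keep only the rows that represent the current state of each key; the filtering choice itself is just an example:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

// Keep inserts and post-update images; drop retractions and deletes
DataStream<RowData> currentState = stream.filter(row -> {
    RowKind kind = row.getRowKind();
    return kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER;
});
```
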
#### Reading from a Log Table

When reading from a log table, all records are emitted with `RowKind.INSERT`, since log tables only support appends.

```java
// Create a FlussSource for a log table
FlussSource<RowData> flussSource = FlussSource.<RowData>builder()
    .setBootstrapServers("localhost:9092")
    .setDatabase("mydb")
    .setTable("orders_log")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializationSchema(new RowDataDeserializationSchema())
    .build();

// Create a DataStream from the FlussSource
DataStreamSource<RowData> stream = env.fromSource(
    flussSource,
    WatermarkStrategy.noWatermarks(),
    "Fluss Log Source"
);
```
#### Using Projection Pushdown

Projection pushdown allows you to select only the fields you need, which can improve performance by reducing the amount of data transferred.

```java
// Create a FlussSource with projection pushdown
FlussSource<OrderPartial> flussSource = FlussSource.<OrderPartial>builder()
    .setBootstrapServers("localhost:9092")
    .setDatabase("mydb")
    .setTable("orders")
    .setProjectedFields("orderId", "amount") // Only select these fields
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializationSchema(new OrderPartialDeserializationSchema())
    .build();

// Create a DataStream from the FlussSource
DataStreamSource<OrderPartial> stream = env.fromSource(
    flussSource,
    WatermarkStrategy.noWatermarks(),
    "Fluss Source with Projection"
);
```

In this example, `OrderPartial` is a class that contains only the `orderId` and `amount` fields, and `OrderPartialDeserializationSchema` is a deserialization schema that knows how to convert the projected fields to `OrderPartial` objects.
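
Neither class is shown in this page's source, but a minimal sketch could look like the following, assuming the projected row exposes the selected fields in the order they were passed to `setProjectedFields` (so `orderId` is at index 0 and `amount` at index 1):

```java
// Holds just the projected fields; not part of Fluss itself.
public class OrderPartial {
    private final long orderId;
    private final int amount;

    public OrderPartial(long orderId, int amount) {
        this.orderId = orderId;
        this.amount = amount;
    }

    public long getOrderId() { return orderId; }
    public int getAmount() { return amount; }
}

public class OrderPartialDeserializationSchema
        implements FlussDeserializationSchema<OrderPartial> {

    @Override
    public void open(InitializationContext context) throws Exception {
        // No initialization needed
    }

    @Override
    public OrderPartial deserialize(LogRecord record) throws Exception {
        InternalRow row = record.getRow();
        // Assumption: projected rows contain only the projected fields,
        // in the order they were passed to setProjectedFields(...)
        long orderId = row.getLong(0);
        int amount = row.getInt(1);
        return new OrderPartial(orderId, amount);
    }

    @Override
    public TypeInformation<OrderPartial> getProducedType(RowType rowSchema) {
        return TypeInformation.of(OrderPartial.class);
    }
}
```
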
## Datastream Sink

### Serialization Schemas

The `FlussSerializationSchema` interface is used to convert your data objects to Fluss's internal row format for writing to Fluss tables. Fluss provides built-in implementations:

* **RowDataSerializationSchema** - Converts Flink's `RowData` objects to Fluss rows
* **JsonStringSerializationSchema** - Converts JSON strings to Fluss rows

When configuring a Fluss sink, you provide a serialization schema that performs this conversion; it is set using the `setSerializer()` method on the sink builder.
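
The full sink builder is not shown here; as a rough sketch, assuming a `FlussSink` builder that mirrors the `FlussSource` builder above (every method except `setSerializer()` is an assumption):

```java
// Sketch only: builder methods other than setSerializer() are assumed
// to mirror the FlussSource builder shown earlier on this page.
FlussSink<Order> flussSink = FlussSink.<Order>builder()
    .setBootstrapServers("localhost:9092") // assumed
    .setDatabase("mydb")                   // assumed
    .setTable("orders")                    // assumed
    .setSerializer(new OrderSerializationSchema())
    .build();

// Write an existing DataStream<Order> to the Fluss table
orderStream.sinkTo(flussSink);
```
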
You can implement your own serialization schema by implementing the `FlussSerializationSchema` interface:

```java
public class OrderSerializationSchema implements FlussSerializationSchema<Order> {
    private RowType rowType;

    @Override
    public void open(InitializationContext context) throws Exception {
        this.rowType = context.getRowSchema();

        // Validate schema compatibility with the Order class
        if (rowType.getFieldCount() < 4) {
            throw new IllegalStateException(
                    "Schema must have at least 4 fields to serialize Order objects");
        }
    }

    @Override
    public RowWithOp serialize(Order order) throws Exception {
        // Create a new row with the same number of fields as the schema
        GenericRow row = new GenericRow(rowType.getFieldCount());

        // Set order fields directly
        row.setField(0, order.getOrderId());
        row.setField(1, order.getItemId());
        row.setField(2, order.getAmount());

        // Convert String to BinaryString for Fluss's internal representation
        String address = order.getAddress();
        if (address != null) {
            row.setField(3, BinaryString.fromString(address));
        } else {
            row.setField(3, null);
        }

        // Return the row with an operation type (APPEND, UPSERT, DELETE)
        return new RowWithOp(row, OperationType.APPEND);
    }
}
```

The `RowDataSerializationSchema` constructor provides additional configuration options:

* **isAppendOnly** - Whether the schema operates in append-only mode (only INSERT operations)
* **ignoreDelete** - Whether to ignore DELETE and UPDATE_BEFORE operations

```java
// Append-only operations
RowDataSerializationSchema appendOnlySchema = new RowDataSerializationSchema(true, false);

// Handle all operation types
RowDataSerializationSchema allOpsSchema = new RowDataSerializationSchema(false, false);

// Ignore DELETE and UPDATE_BEFORE operations
RowDataSerializationSchema ignoreDeleteSchema = new RowDataSerializationSchema(false, true);
```

website/docs/apis/java-client.md

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 ---
 title: "Java Client"
-sidebar_position: 1
+sidebar_position: 2
 ---

 <!--
