Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,82 @@ sidebar_position: 3

# Versioned Merge Engine

TODO: Fill me #459
The **Versioned Merge Engine** enables data updates based on version numbers or event timestamps. It ensures that only the row with the highest version number (or event timestamp) for a given primary key is retained. This mechanism is particularly useful for deduplicating or merging out-of-order data while guaranteeing eventual consistency with the upstream source.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to suggest to add a statement to explain how the feature is useful just like first-row merge page does, maybe something like:

This feature is particularly valuable for replacing [Deduplication](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/deduplication/) transformations in streaming computations, reducing complexity and improving overall efficiency.

By setting `'table.merge-engine' = 'versioned'`, users can update data based on a configured version column. Updates are performed when the latest value of the specified field is greater than or equal to the stored value. If the incoming value is less than the stored value or is null, no update will occur.

This feature is especially valuable as a replacement for [Deduplication](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/deduplication/) transformations in streaming computations. It simplifies workflows, reduces complexity, and improves overall efficiency.


:::note
When using the `versioned` merge engine, keep the following limitations in mind:
- **`UPDATE` and `DELETE` statements are not supported.**
- **Partial updates are not supported.**
- **`UPDATE_BEFORE` and `DELETE` changelog events are ignored automatically.**
:::

### Version Column

The version column is a column in the table that stores the version number (or event timestamp) of the data record.
When enabling the versioned merge engine, the version column must be explicitly specified using the property:

```sql
'table.merge-engine' = 'versioned',
'table.merge-engine.versioned.ver-column' = '<column_name>'
```

The version column can be of the following data types:
- `INT`
- `BIGINT`
- `TIMESTAMP`
- `TIMESTAMP(p)` (with precision)
- `TIMESTAMP_LTZ` (timestamp with local time zone)
- `TIMESTAMP_LTZ(p)` (timestamp with local time zone and precision)


## Example:

```sql title="Flink SQL"

CREATE TABLE VERSIONED (
a INT NOT NULL PRIMARY KEY NOT ENFORCED,
b STRING,
ts BIGINT
) WITH (
'table.merge-engine' = 'versioned',
'table.merge-engine.versioned.ver-column' = 'ts'
);
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v1', 1000);

-- insert data with ts < 1000, no update will be made
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v2', 999);
SELECT * FROM VERSIONED;
-- Output
-- +---+-----+------+
-- | a | b | ts |
-- +---+-----+------+
-- | 1 | v1 | 1000 |
-- +---+-----+------+


-- insert data with ts > 1000, update will be made
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v3', 2000);
SELECT * FROM VERSIONED;
-- Output
-- +---+-----+------+
-- | a | b | ts |
-- +---+-----+------+
-- | 1 | v3 | 2000 |
-- +---+-----+------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add -- +---+-----+------+ in table end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add the bottom line to align with the style of above content

-- +---+-----+------+
-- | a | b   | ts   |
-- +---+-----+------+
-- | 1 | v2  | 2000 |
-- +---+-----+------+

-- insert data with ts = null, no update will be made
INSERT INTO VERSIONED (a, b, ts) VALUES (1, 'v4', null);
SELECT * FROM VERSIONED;
-- Output
-- +---+-----+------+
-- | a | b | ts |
-- +---+-----+------+
-- | 1 | v3 | 2000 |
-- +---+-----+------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dito

```