Flink: Dynamic Iceberg Sink: Add HashKeyGenerator / RowDataEvolver / TableUpdateOperator #13277

Merged: 14 commits into apache:main on Jun 11, 2025

Conversation

@mxm (Contributor) commented Jun 8, 2025:

This change adds the following components for the Flink Dynamic Iceberg Sink:

HashKeyGenerator

A hash key generator that will be used in the DynamicIcebergSink class (next PR) to implement one of Iceberg's DistributionModes (NONE, HASH, RANGE).

The HashKeyGenerator is responsible for creating the appropriate hash key for Flink's keyBy operation. The hash key is generated based on the user-provided DynamicRecord and the table metadata. Under the hood, we maintain a set of Flink KeySelectors that implement the appropriate Iceberg DistributionMode. For every table, we randomly select a consistent subset of writer subtasks which receive data via their associated keys, depending on the chosen DistributionMode.

Caching ensures that a new key selector is created whenever the table metadata (e.g. schema, spec) or the user-provided metadata (e.g. distribution mode, write parallelism) changes.
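
To make the routing idea concrete, here is a minimal, hypothetical sketch of a key selector that sends one table's records to a stable, randomly chosen subset of writer subtasks. The class name, fields, and the generic record type are illustrative placeholders, not the merged HashKeyGenerator implementation:

```java
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.java.functions.KeySelector;

// Illustrative only: routes records of a single table to a consistent subset of writer
// subtasks. The real HashKeyGenerator operates on DynamicRecord and the table metadata.
class SubsetRoutingKeySelector<T> implements KeySelector<T, Integer> {

  private final int writeParallelism; // number of subtasks reserved for this table
  private final int offset;           // random but fixed per selector, so routing stays stable

  SubsetRoutingKeySelector(int maxWriteParallelism, int writeParallelism) {
    this.writeParallelism = writeParallelism;
    this.offset = ThreadLocalRandom.current().nextInt(maxWriteParallelism);
  }

  @Override
  public Integer getKey(T record) {
    // The returned key feeds Flink's keyBy; records of this table only ever map to
    // writeParallelism distinct keys, starting at the table's random offset.
    return offset + Math.floorMod(record.hashCode(), writeParallelism);
  }
}
```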

RowDataEvolver

RowDataEvolver is responsible for changing the input RowData to make it compatible with the target schema. This is done when:

  1. The input schema has fewer fields than the target schema.
  2. The table's field types are wider than the corresponding input types.
  3. The field order differs between the source and target schemas.

The resolution is as follows:

In the first case, we add a null value for each missing field (if the field is optional). In the second case, we convert the data for the input field to the wider type, e.g. int (input type) => long (table type). In the third case, we rearrange the input data to match the target table.
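
As an illustration of these three resolutions, here is a small hypothetical sketch; the input layout (id INT, name STRING), the target layout (name STRING, id BIGINT, note STRING nullable), and the hard-coded positions are made up, whereas the real RowDataEvolver derives the mapping from the source and target schemas:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

// Illustrative only: adapt an input row (id INT, name STRING) to a hypothetical
// target layout (name STRING, id BIGINT, note STRING nullable).
final class RowDataEvolverSketch {
  private RowDataEvolverSketch() {}

  static RowData evolve(RowData input) {
    GenericRowData output = new GenericRowData(3);
    output.setField(0, input.getString(1));      // case 3: reorder, name moves to position 0
    output.setField(1, (long) input.getInt(0));  // case 2: widen int -> long
    output.setField(2, null);                    // case 1: missing optional field -> null
    return output;
  }
}
```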

DynamicUpdateOperator

A dedicated operator for updating the schema / spec of the table associated with a DynamicRecord.
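
A rough sketch of the idea, not the actual operator: a map-style function that makes sure the target table's schema covers the incoming record's schema before the record reaches the writers. The DynamicRecordLike interface, its accessors, and the per-record commit are simplifications introduced for this example; the real operator goes through a TableMetadataCache rather than committing on every record.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;

/** Hypothetical stand-in for the sink's dynamic record type. */
interface DynamicRecordLike {
  String tableName();

  Schema schema();
}

// Illustrative only: align the target table's schema with the record's schema.
class SchemaSyncFunction<T extends DynamicRecordLike> extends RichMapFunction<T, T> {

  private final CatalogLoader catalogLoader;
  private transient Catalog catalog;

  SchemaSyncFunction(CatalogLoader catalogLoader) {
    this.catalogLoader = catalogLoader;
  }

  @Override
  public T map(T record) {
    if (catalog == null) {
      // lazy init on the task manager; the serializable CatalogLoader travels with the job
      catalog = catalogLoader.loadCatalog();
    }

    Table table = catalog.loadTable(TableIdentifier.parse(record.tableName()));
    // union-by-name adds columns present in the record's schema but missing in the table;
    // the real operator uses cached table metadata instead of committing unconditionally
    table.updateSchema().unionByNameWith(record.schema()).commit();
    return record;
  }
}
```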


private DynamicSinkUtil() {}

static List<Integer> getEqualityFieldIds(List<String> equalityFields, Schema schema) {
Contributor: It is a bit strange to me to ignore the empty equalityFields. Maybe add a javadoc to highlight this?

Contributor Author (@mxm): What do you mean by ignoring empty equalityFields? There may be none defined.

Contributor: If I understand correctly, if equalityFields is empty, then we fall back to what the Schema defines. Falling back to the Schema when equalityFields == null feels natural, but handling an empty list as "not set" is a bit strange. At a minimum, we need to document it.

Contributor: Can we have at least a javadoc for this?
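
For illustration, a hedged sketch of what the requested javadoc and fallback could look like; the merged code may differ, and the fallback target is assumed here to be the schema's identifier fields:

```java
import java.util.List;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
 * Resolves the equality field ids for a write.
 *
 * <p>A null or empty {@code equalityFields} list is treated as "not set": in that case the
 * identifier field ids defined on the {@link Schema} are used as the equality fields.
 */
static List<Integer> getEqualityFieldIds(List<String> equalityFields, Schema schema) {
  if (equalityFields == null || equalityFields.isEmpty()) {
    Set<Integer> identifierFieldIds = schema.identifierFieldIds();
    return Lists.newArrayList(identifierFieldIds);
  }

  List<Integer> fieldIds = Lists.newArrayList();
  for (String fieldName : equalityFields) {
    // the real code should also validate that the field exists in the schema
    fieldIds.add(schema.findField(fieldName).fieldId());
  }

  return fieldIds;
}
```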

Catalog catalog = catalogLoader.loadCatalog();
this.updater =
    new TableUpdater(
        new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs), catalog);
Contributor: Could we share the TableMetadataCache in a static way, so it is shared between the operator instances?

Contributor Author (@mxm): Yes, possible. Let me look into it.

Contributor Author (@mxm): I would like to defer this change to a follow-up. There are some challenges with regard to concurrent get / put operations on the cache.

Contributor: Sure.
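
If the follow-up goes the static route, one possible shape is sketched below. It is entirely hypothetical (the class, key, and factory are placeholders), and, as noted above, the hard part is making concurrent access to the cached entries safe:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper: one shared cache instance per catalog name within a task manager.
final class SharedCaches {
  private static final ConcurrentMap<String, Object> CACHES = new ConcurrentHashMap<>();

  private SharedCaches() {}

  @SuppressWarnings("unchecked")
  static <T> T getOrCreate(String catalogName, Supplier<T> cacheFactory) {
    // computeIfAbsent guarantees that all operator instances see the same cache object
    return (T) CACHES.computeIfAbsent(catalogName, key -> cacheFactory.get());
  }
}
```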


case RANGE:
Contributor: Will the RANGE distribution mode work with the dynamic table sink? RANGE needed statistics collection and complicated infrastructure to work; it relied on Partitioners to direct the records to the correct writer.

Contributor Author (@mxm): For now we fall back to HASH. Adding support for RANGE in the Dynamic Sink will be a follow-up.

Contributor: Sure - leave a comment/TODO.
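
A minimal sketch of the agreed fallback (the method name is hypothetical, not the merged code):

```java
import org.apache.iceberg.DistributionMode;

// Illustrative only: resolve the effective distribution mode for the Dynamic Sink.
static DistributionMode resolveDistributionMode(DistributionMode requested) {
  if (requested == DistributionMode.RANGE) {
    // TODO: support RANGE; it needs statistics collection and a dedicated partitioner.
    return DistributionMode.HASH;
  }

  return requested;
}
```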

private final Integer specId;
private final Schema schema;
private final PartitionSpec spec;
private final List<String> equalityFields;
Contributor: Should this be a Set?

Contributor Author (@mxm): Perhaps. Need to double-check.

Contributor Author (@mxm): So far, we have always used List for equality fields (IcebergSink V1 / V2). It doesn't seem that this is required, though. Updated everywhere. See 0412697.

Contributor: There is only a single place where this touches a public API. Maybe for backward compatibility we could use Collection there, or we just deprecate the old methods and add new ones (only the ones which are currently used). Seems like a good change to me, but I would ask @stevenzwu. He might know more about why the equalityFields are stored in a List instead of a Set.

@@ -60,7 +60,7 @@ public RowDataTaskWriterFactory(
     long targetFileSizeBytes,
     FileFormat format,
     Map<String, String> writeProperties,
-    List<Integer> equalityFieldIds,
+    Set<Integer> equalityFieldIds,
Contributor: This is public API, since we don't have an annotation on this.

Contributor Author (@mxm): Ok. I've reverted this change in favor of an internal conversion. See 05fd580.
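
The internal conversion presumably looks along these lines (a hedged sketch, not the commit's exact code): the public RowDataTaskWriterFactory keeps its List<Integer> parameter while the dynamic sink, which works with a Set internally, converts at the call site. The helper name is hypothetical.

```java
import java.util.List;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Hypothetical helper: keep Set<Integer> internally, hand a List to the public factory API.
static List<Integer> toEqualityFieldIdList(Set<Integer> equalityFieldIds) {
  return equalityFieldIds == null ? Lists.newArrayList() : Lists.newArrayList(equalityFieldIds);
}
```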

@pvary (Contributor) left a comment: +1 pending tests

@pvary merged commit 76972ef into apache:main on Jun 11, 2025. 18 checks passed.
@pvary (Contributor) commented Jun 11, 2025:

Merged to main. Thanks for the PR @mxm!

@mxm (Contributor Author) commented Jun 11, 2025:

Thanks for reviewing / merging @pvary 🙏

@mxm (Contributor Author) commented Jun 11, 2025:

I'll prepare the backports.

mxm added a commit to mxm/iceberg that referenced this pull request on Jun 12, 2025
mxm added a commit to mxm/iceberg that referenced this pull request on Jun 12, 2025
pvary pushed a commit that referenced this pull request on Jun 12, 2025:
…volver / TableUpdateOperator to Flink 1.19 / 1.20 (#13303)

backports #13277