From d4a3e67760819e5e97172672920493bbd3a144fe Mon Sep 17 00:00:00 2001 From: FCAgent Date: Sat, 30 May 2026 11:37:17 +0200 Subject: [PATCH] Add lineage support to Kinesis Streams connector Implements LineageVertexProvider interface on KinesisStreamsSource and KinesisStreamsSink to enable automatic lineage extraction in Flink 2.x. The LineageGraph API in Flink 2.0+ allows the table planner and OpenLineage integration to automatically discover input/output datasets with their metadata. This change enables lineage for both DataStream and SQL/Table API usage of the Kinesis connector. New classes: - KinesisLineageUtil: namespace/name extraction from stream ARN - KinesisDatasetFacet: stream ARN, name, region metadata - TypeDatasetFacet: schema information via TypeInformation Namespace format uses full ARN prefix for governance specificity: arn:{partition}:kinesis:{region}:{account}:stream Dataset name is the stream name extracted from the ARN. Covers DataStream API (direct), SQL/Table API (via KinesisDynamicSource which internally creates KinesisStreamsSource), and the sink path. --- .../kinesis/lineage/KinesisDatasetFacet.java | 81 ++++++++++ .../kinesis/lineage/KinesisLineageUtil.java | 97 ++++++++++++ .../kinesis/lineage/TypeDatasetFacet.java | 69 +++++++++ .../kinesis/sink/KinesisStreamsSink.java | 23 ++- .../kinesis/source/KinesisStreamsSource.java | 13 +- .../kinesis/lineage/KinesisLineageTest.java | 146 ++++++++++++++++++ 6 files changed, 427 insertions(+), 2 deletions(-) create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/KinesisDatasetFacet.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/KinesisLineageUtil.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/TypeDatasetFacet.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/lineage/KinesisLineageTest.java diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/KinesisDatasetFacet.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/KinesisDatasetFacet.java new file mode 100644 index 00000000..ee2ae862 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/KinesisDatasetFacet.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Objects; + +/** + * Dataset facet containing Kinesis-specific metadata for lineage reporting. + * + *

Includes the stream ARN, region, and stream name for full identification. + */ +@PublicEvolving +public class KinesisDatasetFacet implements LineageDatasetFacet { + + public static final String KINESIS_FACET_NAME = "kinesis"; + + private final String streamArn; + private final String streamName; + private final String region; + + public KinesisDatasetFacet(String streamArn, String streamName, String region) { + this.streamArn = streamArn; + this.streamName = streamName; + this.region = region; + } + + public String getStreamArn() { + return streamArn; + } + + public String getStreamName() { + return streamName; + } + + public String getRegion() { + return region; + } + + @Override + public String name() { + return KINESIS_FACET_NAME; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KinesisDatasetFacet that = (KinesisDatasetFacet) o; + return Objects.equals(streamArn, that.streamArn) + && Objects.equals(streamName, that.streamName) + && Objects.equals(region, that.region); + } + + @Override + public int hashCode() { + return Objects.hash(streamArn, streamName, region); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/KinesisLineageUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/KinesisLineageUtil.java new file mode 100644 index 00000000..f74b8085 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/KinesisLineageUtil.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.lineage.DefaultLineageDataset; +import org.apache.flink.streaming.api.lineage.DefaultLineageVertex; +import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import software.amazon.awssdk.arns.Arn; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** Utility class for constructing lineage datasets and vertices for Kinesis Streams. */ +@Internal +public class KinesisLineageUtil { + + /** Constructs the namespace for a Kinesis stream from its ARN. */ + public static String namespaceOf(String streamArn) { + // Use the ARN prefix up to the resource as namespace + // e.g., arn:aws:kinesis:us-west-2:123456789012:stream + Arn arn = Arn.fromString(streamArn); + return String.format( + "arn:%s:kinesis:%s:%s:stream", + arn.partition(), arn.region().orElse(""), arn.accountId().orElse("")); + } + + /** Extracts the stream name from a Kinesis stream ARN. */ + public static String streamNameOf(String streamArn) { + Arn arn = Arn.fromString(streamArn); + return arn.resource().resource(); + } + + public static LineageDataset datasetOf(String streamArn) { + return new DefaultLineageDataset( + streamNameOf(streamArn), namespaceOf(streamArn), Collections.emptyMap()); + } + + public static LineageDataset datasetOf( + String streamArn, Map facets) { + return new DefaultLineageDataset(streamNameOf(streamArn), namespaceOf(streamArn), facets); + } + + public static LineageDataset datasetOf( + String streamArn, KinesisDatasetFacet kinesisDatasetFacet) { + Map facets = new HashMap<>(); + facets.put(KinesisDatasetFacet.KINESIS_FACET_NAME, kinesisDatasetFacet); + return new DefaultLineageDataset(streamNameOf(streamArn), namespaceOf(streamArn), facets); + } + + public static LineageDataset datasetOf( + String streamArn, + KinesisDatasetFacet kinesisDatasetFacet, + TypeDatasetFacet typeDatasetFacet) { + Map facets = new HashMap<>(); + facets.put(KinesisDatasetFacet.KINESIS_FACET_NAME, kinesisDatasetFacet); + facets.put(TypeDatasetFacet.TYPE_FACET_NAME, typeDatasetFacet); + return new DefaultLineageDataset(streamNameOf(streamArn), namespaceOf(streamArn), facets); + } + + public static SourceLineageVertex sourceLineageVertexOf(Collection datasets) { + DefaultSourceLineageVertex vertex = + new DefaultSourceLineageVertex( + org.apache.flink.api.connector.source.Boundedness.CONTINUOUS_UNBOUNDED); + datasets.forEach(vertex::addDataset); + return vertex; + } + + public static LineageVertex sinkLineageVertexOf(Collection datasets) { + DefaultLineageVertex vertex = new DefaultLineageVertex(); + datasets.forEach(vertex::addLineageDataset); + return vertex; + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/TypeDatasetFacet.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/TypeDatasetFacet.java new file mode 100644 index 00000000..1753aaaf --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/lineage/TypeDatasetFacet.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Objects; + +/** + * Dataset facet containing type/schema information for lineage reporting. + * + *

Wraps the Flink {@link TypeInformation} which contains column names and types when available + * (e.g., from RowType schemas). + */ +@PublicEvolving +public class TypeDatasetFacet implements LineageDatasetFacet { + + public static final String TYPE_FACET_NAME = "type"; + + private final TypeInformation typeInformation; + + public TypeDatasetFacet(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + public TypeInformation getTypeInformation() { + return typeInformation; + } + + @Override + public String name() { + return TYPE_FACET_NAME; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TypeDatasetFacet that = (TypeDatasetFacet) o; + return Objects.equals(typeInformation, that.typeInformation); + } + + @Override + public int hashCode() { + return Objects.hash(typeInformation); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java index 22ccb9d0..90bf38b5 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java @@ -67,7 +67,8 @@ * @param Type of the elements handled by this sink */ @PublicEvolving -public class KinesisStreamsSink extends AsyncSinkBase { +public class KinesisStreamsSink extends AsyncSinkBase + implements org.apache.flink.streaming.api.lineage.LineageVertexProvider { private final boolean failOnError; private final String streamName; @@ -174,4 +175,24 @@ public StatefulSinkWriter> kinesisClientProperties, recoveredState); } + + // ---- Lineage support ---- + + @Override + public org.apache.flink.streaming.api.lineage.LineageVertex getLineageVertex() { + if (streamArn != null) { + return org.apache.flink.connector.kinesis.lineage.KinesisLineageUtil + .sinkLineageVertexOf( + java.util.Collections.singletonList( + org.apache.flink.connector.kinesis.lineage.KinesisLineageUtil + .datasetOf(streamArn))); + } + // Fallback when only streamName is provided (no ARN) + return org.apache.flink.connector.kinesis.lineage.KinesisLineageUtil.sinkLineageVertexOf( + java.util.Collections.singletonList( + new org.apache.flink.streaming.api.lineage.DefaultLineageDataset( + streamName, + "kinesis://unknown-region", + java.util.Collections.emptyMap()))); + } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 69c80653..d4b510b9 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -105,7 +105,8 @@ */ @Experimental public class KinesisStreamsSource - implements Source { + implements Source, + org.apache.flink.streaming.api.lineage.LineageVertexProvider { private final String streamArn; private final Configuration sourceConfig; @@ -354,4 +355,14 @@ private StandardRetryStrategy.Builder createExpBackoffRetryStrategyBuilder( .retryOnExceptionOrCauseInstanceOf(LimitExceededException.class) .maxAttempts(maxAttempts); } + + // ---- Lineage support ---- + + @Override + public org.apache.flink.streaming.api.lineage.SourceLineageVertex getLineageVertex() { + return org.apache.flink.connector.kinesis.lineage.KinesisLineageUtil.sourceLineageVertexOf( + java.util.Collections.singletonList( + org.apache.flink.connector.kinesis.lineage.KinesisLineageUtil.datasetOf( + streamArn))); + } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/lineage/KinesisLineageTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/lineage/KinesisLineageTest.java new file mode 100644 index 00000000..a1222367 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/lineage/KinesisLineageTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.lineage; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; +import org.apache.flink.connector.kinesis.source.KinesisStreamsSource; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for Kinesis Streams lineage support. */ +class KinesisLineageTest { + + private static final String TEST_STREAM_ARN = + "arn:aws:kinesis:us-east-1:123456789012:stream/my-test-stream"; + + @Test + void testSourceLineageVertex() { + KinesisStreamsSource source = + KinesisStreamsSource.builder() + .setStreamArn(TEST_STREAM_ARN) + .setDeserializationSchema(new SimpleStringSchema()) + .build(); + + assertThat(source).isInstanceOf(LineageVertexProvider.class); + + LineageVertex vertex = ((LineageVertexProvider) source).getLineageVertex(); + assertThat(vertex).isNotNull(); + + List datasets = vertex.datasets(); + assertThat(datasets).hasSize(1); + + LineageDataset dataset = datasets.get(0); + assertThat(dataset.name()).isEqualTo("my-test-stream"); + assertThat(dataset.namespace()).isEqualTo("arn:aws:kinesis:us-east-1:123456789012:stream"); + } + + @Test + void testSourceLineageDatasetHasEmptyFacets() { + KinesisStreamsSource source = + KinesisStreamsSource.builder() + .setStreamArn(TEST_STREAM_ARN) + .setDeserializationSchema(new SimpleStringSchema()) + .build(); + + LineageVertex vertex = ((LineageVertexProvider) source).getLineageVertex(); + LineageDataset dataset = vertex.datasets().get(0); + + // Facets are added by the table planner / OL module, not the connector + assertThat(dataset.facets()).isEmpty(); + } + + @Test + void testSinkLineageVertexWithArn() { + KinesisStreamsSink sink = + KinesisStreamsSink.builder() + .setStreamArn(TEST_STREAM_ARN) + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> "key") + .build(); + + assertThat(sink).isInstanceOf(LineageVertexProvider.class); + + LineageVertex vertex = ((LineageVertexProvider) sink).getLineageVertex(); + assertThat(vertex).isNotNull(); + + List datasets = vertex.datasets(); + assertThat(datasets).hasSize(1); + + LineageDataset dataset = datasets.get(0); + assertThat(dataset.name()).isEqualTo("my-test-stream"); + assertThat(dataset.namespace()).isEqualTo("arn:aws:kinesis:us-east-1:123456789012:stream"); + } + + @Test + void testSinkLineageVertexWithNameOnly() { + KinesisStreamsSink sink = + KinesisStreamsSink.builder() + .setStreamName("name-only-stream") + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> "key") + .build(); + + LineageVertex vertex = ((LineageVertexProvider) sink).getLineageVertex(); + List datasets = vertex.datasets(); + + assertThat(datasets).hasSize(1); + assertThat(datasets.get(0).name()).isEqualTo("name-only-stream"); + assertThat(datasets.get(0).namespace()).isEqualTo("kinesis://unknown-region"); + } + + @Test + void testLineageUtilNamespaceExtraction() { + String namespace = KinesisLineageUtil.namespaceOf(TEST_STREAM_ARN); + assertThat(namespace).isEqualTo("arn:aws:kinesis:us-east-1:123456789012:stream"); + } + + @Test + void testLineageUtilStreamNameExtraction() { + String streamName = KinesisLineageUtil.streamNameOf(TEST_STREAM_ARN); + assertThat(streamName).isEqualTo("my-test-stream"); + } + + @Test + void testLineageUtilDifferentRegions() { + String euArn = "arn:aws:kinesis:eu-west-1:999888777666:stream/orders"; + assertThat(KinesisLineageUtil.namespaceOf(euArn)) + .isEqualTo("arn:aws:kinesis:eu-west-1:999888777666:stream"); + assertThat(KinesisLineageUtil.streamNameOf(euArn)).isEqualTo("orders"); + } + + @Test + void testKinesisDatasetFacetEquality() { + KinesisDatasetFacet facet1 = new KinesisDatasetFacet(TEST_STREAM_ARN, "stream", "us-east-1"); + KinesisDatasetFacet facet2 = new KinesisDatasetFacet(TEST_STREAM_ARN, "stream", "us-east-1"); + KinesisDatasetFacet facet3 = + new KinesisDatasetFacet(TEST_STREAM_ARN, "other", "us-west-2"); + + assertThat(facet1).isEqualTo(facet2); + assertThat(facet1).isNotEqualTo(facet3); + assertThat(facet1.name()).isEqualTo(KinesisDatasetFacet.KINESIS_FACET_NAME); + } +}