Skip to content

Commit e857382

Browse files
committed
[FLINK-39660][Connectors/Kinesis] Fix Netty event loop threads blocking behavior and concurrent activation race in FanOutKinesisShardSubscription
1 parent c662ddd commit e857382

5 files changed

Lines changed: 911 additions & 409 deletions

File tree

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
7373
import software.amazon.awssdk.services.kinesis.model.Record;
7474
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
75-
import software.amazon.awssdk.utils.AttributeMap;
7675

7776
import java.time.Duration;
7877
import java.util.Map;
@@ -271,7 +270,7 @@ private KinesisAsyncStreamProxy createKinesisAsyncStreamProxy(
271270

272271
SdkAsyncHttpClient asyncHttpClient =
273272
AWSGeneralUtil.createAsyncHttpClient(
274-
AttributeMap.builder().build(), NettyNioAsyncHttpClient.builder());
273+
kinesisClientProperties, NettyNioAsyncHttpClient.builder());
275274
KinesisAsyncClient kinesisAsyncClient =
276275
AWSClientUtil.createAwsAsyncClient(
277276
kinesisClientProperties,

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) {
9393

9494
@Override
9595
public void close() throws Exception {
96+
splitSubscriptions.values().forEach(FanOutKinesisShardSubscription::close);
9697
asyncStreamProxy.close();
9798
}
9899
}

0 commit comments

Comments
 (0)