
Commit de08719: a simple back pressure mechanism

Author: Mike Skells (committed)
1 parent 5d3450b

File tree: 7 files changed, +66 -19 lines

azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfig.java

Lines changed: 1 addition & 0 deletions
@@ -80,6 +80,7 @@ public static ConfigDef configDef() {
         addKafkaBackoffPolicy(configDef);
         addAzureRetryPolicies(configDef);
         addUserAgentConfig(configDef);
+        addOverloadConfigGroup(configDef);
         return configDef;
     }
 

azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkTask.java

Lines changed: 10 additions & 0 deletions
@@ -57,6 +57,9 @@ public final class AzureBlobSinkTask extends SinkTask {
     private BlobContainerClient containerClient;
     private final Map<String, BlockBlobClient> blobClientMap = new ConcurrentHashMap<>();
 
+    private long backPressureHardLimit;
+    private long backPressureCurrentBuffer;
+
     // required by Connect
     public AzureBlobSinkTask() {
         super();
@@ -104,6 +107,7 @@ public void start(final Map<String, String> props) {
         if (Objects.nonNull(config.getKafkaRetryBackoffMs())) {
             context.timeout(config.getKafkaRetryBackoffMs());
         }
+        backPressureHardLimit = config.getBackPressureHardLimit();
     }
 
     private void initRecordGrouper() {
@@ -122,6 +126,11 @@ public void put(final Collection<SinkRecord> records) {
         for (final SinkRecord record : records) {
             recordGrouper.put(record);
         }
+        backPressureCurrentBuffer += records.size();
+        if (backPressureCurrentBuffer >= backPressureHardLimit) {
+            LOG.warn("Back pressure limit reached, requesting flush");
+            context.requestCommit();
+        }
     }
 
     @Override
@@ -131,6 +140,7 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
         } finally {
             recordGrouper.clear();
         }
+        backPressureCurrentBuffer = 0;
     }
 
     private void flushFile(final String filename, final List<SinkRecord> records) {
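
The same three-step pattern is repeated in GcsSinkTask and S3SinkTask below: count the records accepted by put(), ask Connect for an offset commit once the count reaches the configured hard limit, and reset the count after flush() has drained the record grouper. Note that context.requestCommit() is an asynchronous hint to the framework, which commits offsets (and calls flush() on the way) as soon as it can. A minimal standalone sketch of the pattern, with illustrative names (BackPressureSinkTask and the hand-rolled property parsing are not part of this commit):

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical task showing the back-pressure pattern from this commit:
// count buffered records in put(), request a commit at the hard limit,
// reset the counter once flush() has drained the buffer.
public class BackPressureSinkTask extends SinkTask {

    private long hardLimit;       // from "overload.hard.record.limit"
    private long bufferedRecords; // records accepted since the last flush

    @Override
    public void start(final Map<String, String> props) {
        // Parsed by hand here; the commit reads it via AivenCommonConfig#getBackPressureHardLimit().
        hardLimit = Long.parseLong(props.getOrDefault("overload.hard.record.limit", "1000000"));
    }

    @Override
    public void put(final Collection<SinkRecord> records) {
        // ... buffer the records (the real tasks hand them to a RecordGrouper) ...
        bufferedRecords += records.size();
        if (bufferedRecords >= hardLimit) {
            // requestCommit() is only a hint: Connect commits offsets as soon as
            // possible, calling flush() (and hence emptying the buffer) on the way.
            context.requestCommit();
        }
    }

    @Override
    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // ... write the buffered records out ...
        bufferedRecords = 0;
    }

    @Override
    public void stop() {
        // nothing to release in this sketch
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}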

commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java

Lines changed: 34 additions & 19 deletions
@@ -53,6 +53,9 @@ public class AivenCommonConfig extends AbstractConfig {
     private static final String GROUP_RETRY_BACKOFF_POLICY = "Retry backoff policy";
     public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms";
 
+    private static final String GROUP_OVERLOAD = "Overload Control";
+    private static final String OVERLOAD_MAX_RECORDS_HARD_LIMIT = "overload.hard.record.limit";
+
     protected AivenCommonConfig(final ConfigDef definition, final Map<?, ?> originals) {
         super(definition, originals);
         // TODO: calls getOutputFields, can be overridden in subclasses.
@@ -74,23 +77,23 @@ private void validate() {
     protected static void addKafkaBackoffPolicy(final ConfigDef configDef) {
         configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, null, new ConfigDef.Validator() {
 
-                final long maximumBackoffPolicy = TimeUnit.HOURS.toMillis(24);
-
-                @Override
-                public void ensureValid(final String name, final Object value) {
-                    if (Objects.isNull(value)) {
-                        return;
-                    }
-                    assert value instanceof Long;
-                    final var longValue = (Long) value;
-                    if (longValue < 0) {
-                        throw new ConfigException(name, value, "Value must be at least 0");
-                    } else if (longValue > maximumBackoffPolicy) {
-                        throw new ConfigException(name, value,
-                                "Value must be no more than " + maximumBackoffPolicy + " (24 hours)");
-                    }
-                }
-            }, ConfigDef.Importance.MEDIUM,
+            final long maximumBackoffPolicy = TimeUnit.HOURS.toMillis(24);
+
+            @Override
+            public void ensureValid(final String name, final Object value) {
+                if (Objects.isNull(value)) {
+                    return;
+                }
+                assert value instanceof Long;
+                final var longValue = (Long) value;
+                if (longValue < 0) {
+                    throw new ConfigException(name, value, "Value must be at least 0");
+                } else if (longValue > maximumBackoffPolicy) {
+                    throw new ConfigException(name, value,
+                            "Value must be no more than " + maximumBackoffPolicy + " (24 hours)");
+                }
+            }
+        }, ConfigDef.Importance.MEDIUM,
                 "The retry backoff in milliseconds. "
                         + "This config is used to notify Kafka Connect to retry delivering a message batch or "
                         + "performing recovery in case of transient exceptions. Maximum value is "
@@ -103,7 +106,7 @@ public Long getKafkaRetryBackoffMs() {
     }
 
     protected static void addOutputFieldsFormatConfigGroup(final ConfigDef configDef,
-                                                           final OutputFieldType defaultFieldType) {
+            final OutputFieldType defaultFieldType) {
         int formatGroupCounter = 0;
 
         addFormatTypeConfig(configDef, formatGroupCounter);
@@ -145,7 +148,7 @@ public FormatType getFormatType() {
     }
 
     protected static void addCompressionTypeConfig(final ConfigDef configDef,
-                                                   final CompressionType defaultCompressionType) {
+            final CompressionType defaultCompressionType) {
         configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING,
                 Objects.isNull(defaultCompressionType) ? null : defaultCompressionType.name, // NOPMD NullAssignment
                 new FileCompressionTypeValidator(), ConfigDef.Importance.MEDIUM,
@@ -229,4 +232,16 @@ private Boolean isKeyBased(final String groupType) {
         return RecordGrouperFactory.KEY_RECORD.equals(groupType)
                 || RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD.equals(groupType);
     }
+
+    public long getBackPressureHardLimit() {
+        return getLong(OVERLOAD_MAX_RECORDS_HARD_LIMIT);
+    }
+
+    protected static void addOverloadConfigGroup(final ConfigDef configDef) {
+        int groupCounter = 0;
+
+        configDef.define(OVERLOAD_MAX_RECORDS_HARD_LIMIT, ConfigDef.Type.LONG, 1000000L, ConfigDef.Importance.MEDIUM,
+                "The maximum number of records to buffer before requesting a flush.", GROUP_OVERLOAD, groupCounter++,
+                ConfigDef.Width.NONE, OVERLOAD_MAX_RECORDS_HARD_LIMIT);
+    }
 }
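
The new overload.hard.record.limit key is a plain LONG with a default of 1,000,000 and no validator, so zero or negative values are accepted and would make every put() call request a commit. A cut-down sketch of how the key is defined and read, using only the standard Kafka ConfigDef/AbstractConfig API (the OverloadConfigExample class is illustrative, not from the commit):

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

// Hypothetical, cut-down mirror of the new config group: one LONG key with a
// one-million-record default, registered under its own "Overload Control" group.
public class OverloadConfigExample extends AbstractConfig {

    static final String OVERLOAD_MAX_RECORDS_HARD_LIMIT = "overload.hard.record.limit";

    static ConfigDef configDef() {
        return new ConfigDef().define(OVERLOAD_MAX_RECORDS_HARD_LIMIT, ConfigDef.Type.LONG, 1_000_000L,
                ConfigDef.Importance.MEDIUM, "The maximum number of records to buffer before requesting a flush.",
                "Overload Control", 0, ConfigDef.Width.NONE, OVERLOAD_MAX_RECORDS_HARD_LIMIT);
    }

    OverloadConfigExample(final Map<String, String> props) {
        super(configDef(), props);
    }

    long getBackPressureHardLimit() {
        return getLong(OVERLOAD_MAX_RECORDS_HARD_LIMIT);
    }

    public static void main(final String[] args) {
        // Connector properties override the default, e.g. a 500k hard limit:
        final var config = new OverloadConfigExample(Map.of(OVERLOAD_MAX_RECORDS_HARD_LIMIT, "500000"));
        System.out.println(config.getBackPressureHardLimit()); // prints 500000
    }
}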

gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java

Lines changed: 1 addition & 0 deletions
@@ -107,6 +107,7 @@ public static ConfigDef configDef() {
         addKafkaBackoffPolicy(configDef);
         addGcsRetryPolicies(configDef);
         addUserAgentConfig(configDef);
+        addOverloadConfigGroup(configDef);
         return configDef;
     }
 

gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java

Lines changed: 9 additions & 0 deletions
@@ -49,6 +49,8 @@ public final class GcsSinkTask extends SinkTask {
     private GcsSinkConfig config;
 
     private Storage storage;
+    private long backPressureHardLimit;
+    private long backPressureCurrentBuffer;
 
     // required by Connect
     public GcsSinkTask() {
@@ -88,6 +90,7 @@ public void start(final Map<String, String> props) {
         if (Objects.nonNull(config.getKafkaRetryBackoffMs())) {
             context.timeout(config.getKafkaRetryBackoffMs());
         }
+        backPressureHardLimit = config.getBackPressureHardLimit();
     }
 
     private void initRest() {
@@ -106,6 +109,11 @@ public void put(final Collection<SinkRecord> records) {
         for (final SinkRecord record : records) {
             recordGrouper.put(record);
         }
+        backPressureCurrentBuffer += records.size();
+        if (backPressureCurrentBuffer >= backPressureHardLimit) {
+            LOG.warn("Back pressure limit reached, requesting flush");
+            context.requestCommit();
+        }
     }
 
     @Override
@@ -115,6 +123,7 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
         } finally {
             recordGrouper.clear();
         }
+        backPressureCurrentBuffer = 0;
     }
 
     private void flushFile(final String filename, final List<SinkRecord> records) {

s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java

Lines changed: 10 additions & 0 deletions
@@ -66,6 +66,9 @@ public final class S3SinkTask extends SinkTask {
 
     AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();
 
+    private long backPressureHardLimit;
+    private long backPressureCurrentBuffer;
+
     @SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect
     public S3SinkTask() {
         super();
@@ -84,6 +87,7 @@ public void start(final Map<String, String> props) {
         if (Objects.nonNull(config.getKafkaRetryBackoffMs())) {
             context.timeout(config.getKafkaRetryBackoffMs());
         }
+        backPressureHardLimit = config.getBackPressureHardLimit();
     }
 
     private AmazonS3 createAmazonS3Client(final S3SinkConfig config) {
@@ -110,6 +114,11 @@ public void put(final Collection<SinkRecord> records) {
         Objects.requireNonNull(records, "records cannot be null");
         LOGGER.info("Processing {} records", records.size());
         records.forEach(recordGrouper::put);
+        backPressureCurrentBuffer += records.size();
+        if (backPressureCurrentBuffer >= backPressureHardLimit) {
+            LOGGER.warn("Back pressure limit reached, requesting flush");
+            context.requestCommit();
+        }
     }
 
     @Override
@@ -119,6 +128,7 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         } finally {
             recordGrouper.clear();
         }
+        backPressureCurrentBuffer = 0;
     }
 
     private void flushFile(final String filename, final List<SinkRecord> records) {

s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java

Lines changed: 1 addition & 0 deletions
@@ -189,6 +189,7 @@ public static ConfigDef configDef() {
         addDeprecatedConfiguration(configDef);
         addKafkaBackoffPolicy(configDef);
         addS3RetryPolicies(configDef);
+        addOverloadConfigGroup(configDef);
         return configDef;
     }
 
