Skip to content

Commit bf2d9d9

Browse files
author
Mike Skells
committed
a simple back pressure mechanism
1 parent 5d3450b commit bf2d9d9

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public final class GcsSinkTask extends SinkTask {
4949
private GcsSinkConfig config;
5050

5151
private Storage storage;
52+
private int backPressureHardLimit;
53+
private int backPressureCurrentBuffer;
5254

5355
// required by Connect
5456
public GcsSinkTask() {
@@ -88,6 +90,7 @@ public void start(final Map<String, String> props) {
8890
if (Objects.nonNull(config.getKafkaRetryBackoffMs())) {
8991
context.timeout(config.getKafkaRetryBackoffMs());
9092
}
93+
backPressureHardLimit = 100_000;// TODO : this shoul dbe from some config - config.getBackPressureHardLimit();
9194
}
9295

9396
private void initRest() {
@@ -106,6 +109,11 @@ public void put(final Collection<SinkRecord> records) {
106109
for (final SinkRecord record : records) {
107110
recordGrouper.put(record);
108111
}
112+
backPressureCurrentBuffer += records.size();
113+
if (backPressureCurrentBuffer >= backPressureHardLimit) {
114+
LOG.warn("Back pressure limit reached, requesting flush");
115+
context.requestCommit();
116+
}
109117
}
110118

111119
@Override
@@ -115,6 +123,7 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
115123
} finally {
116124
recordGrouper.clear();
117125
}
126+
backPressureCurrentBuffer = 0;
118127
}
119128

120129
private void flushFile(final String filename, final List<SinkRecord> records) {

0 commit comments

Comments
 (0)