Skip to content

Commit 2ee862e

Browse files
[FLINK-37298] Added Pluggable Components for BatchStrategy & BufferWrapper in AsyncSinkWriter. (#26274)
* [FLINK-37298] Add custom batch handling in AsyncSinkWriter. * License added for new files. * Review Comments Incorporated * Corrected comments based on review. * Comment correction * Spotless check done * Review comments incorporated for async custom batch - Minor fixes based on reviewer feedback - Cleaned up naming, comments, and Javadoc * Removed Builder Pattern and corrected Test case. * Marked Deque constructor as deprecated.
1 parent e98915a commit 2ee862e

File tree

11 files changed

+961
-52
lines changed

11 files changed

+961
-52
lines changed

Diff for: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java

+56-52
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,8 @@
3333

3434
import java.io.IOException;
3535
import java.io.Serializable;
36-
import java.util.ArrayDeque;
37-
import java.util.ArrayList;
3836
import java.util.Collection;
3937
import java.util.Collections;
40-
import java.util.Deque;
4138
import java.util.List;
4239
import java.util.ListIterator;
4340
import java.util.concurrent.ScheduledFuture;
@@ -83,7 +80,16 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
8380

8481
private final int maxBatchSize;
8582
private final int maxBufferedRequests;
86-
private final long maxBatchSizeInBytes;
83+
84+
/**
85+
* Threshold in bytes to trigger a flush from the buffer.
86+
*
87+
* <p>This is derived from {@code maxBatchSizeInBytes} in the configuration, but is only used
88+
* here to decide when the buffer should be flushed. The actual batch size limit is now enforced
89+
* by the {@link BatchCreator}.
90+
*/
91+
private final long flushThresholdBytes;
92+
8793
private final long maxTimeInBufferMS;
8894
private final long maxRecordSizeInBytes;
8995

@@ -112,8 +118,14 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
112118
* construct a new (retry) request entry from the response and add that back to the queue for
113119
* later retry.
114120
*/
115-
private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries =
116-
new ArrayDeque<>();
121+
private final RequestBuffer<RequestEntryT> bufferedRequestEntries;
122+
123+
/**
124+
* Batch component responsible for forming a batch of request entries from the buffer when the
125+
* sink is ready to flush. This determines the logic of including entries in a batch from the
126+
* buffered requests.
127+
*/
128+
private final BatchCreator<RequestEntryT> batchCreator;
117129

118130
/**
119131
* Tracks all pending async calls that have been executed since the last checkpoint. Calls that
@@ -126,12 +138,6 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
126138
*/
127139
private int inFlightRequestsCount;
128140

129-
/**
130-
* Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
131-
* the criterion for flushing after {@code maxBatchSizeInBytes} is reached.
132-
*/
133-
private double bufferedRequestEntriesTotalSizeInBytes;
134-
135141
private boolean existsActiveTimerCallback = false;
136142

137143
/**
@@ -213,11 +219,32 @@ protected void submitRequestEntries(
213219
*/
214220
protected abstract long getSizeInBytes(RequestEntryT requestEntry);
215221

222+
/**
223+
* This constructor is deprecated. Users should use {@link #AsyncSinkWriter(ElementConverter,
224+
* WriterInitContext, AsyncSinkWriterConfiguration, Collection, BatchCreator, RequestBuffer)}.
225+
*/
226+
@Deprecated
216227
public AsyncSinkWriter(
217228
ElementConverter<InputT, RequestEntryT> elementConverter,
218229
WriterInitContext context,
219230
AsyncSinkWriterConfiguration configuration,
220231
Collection<BufferedRequestState<RequestEntryT>> states) {
232+
this(
233+
elementConverter,
234+
context,
235+
configuration,
236+
states,
237+
new SimpleBatchCreator<>(configuration.getMaxBatchSizeInBytes()),
238+
new DequeRequestBuffer<>());
239+
}
240+
241+
public AsyncSinkWriter(
242+
ElementConverter<InputT, RequestEntryT> elementConverter,
243+
WriterInitContext context,
244+
AsyncSinkWriterConfiguration configuration,
245+
Collection<BufferedRequestState<RequestEntryT>> states,
246+
BatchCreator<RequestEntryT> batchCreator,
247+
RequestBuffer<RequestEntryT> bufferedRequestEntries) {
221248
this.elementConverter = elementConverter;
222249
this.mailboxExecutor = context.getMailboxExecutor();
223250
this.timeService = context.getProcessingTimeService();
@@ -237,23 +264,26 @@ public AsyncSinkWriter(
237264
"The maximum allowed size in bytes per flush must be greater than or equal to the"
238265
+ " maximum allowed size in bytes of a single record.");
239266
Preconditions.checkNotNull(configuration.getRateLimitingStrategy());
267+
Preconditions.checkNotNull(
268+
batchCreator, "batchCreator must not be null; required for creating batches.");
269+
Preconditions.checkNotNull(
270+
bufferedRequestEntries,
271+
"bufferedRequestEntries must not be null; holds pending request data.");
240272
this.maxBatchSize = configuration.getMaxBatchSize();
241273
this.maxBufferedRequests = configuration.getMaxBufferedRequests();
242-
this.maxBatchSizeInBytes = configuration.getMaxBatchSizeInBytes();
274+
this.flushThresholdBytes = configuration.getMaxBatchSizeInBytes();
243275
this.maxTimeInBufferMS = configuration.getMaxTimeInBufferMS();
244276
this.maxRecordSizeInBytes = configuration.getMaxRecordSizeInBytes();
245277
this.rateLimitingStrategy = configuration.getRateLimitingStrategy();
246278
this.requestTimeoutMS = configuration.getRequestTimeoutMS();
247279
this.failOnTimeout = configuration.isFailOnTimeout();
248-
249280
this.inFlightRequestsCount = 0;
250-
this.bufferedRequestEntriesTotalSizeInBytes = 0;
251-
252281
this.metrics = context.metricGroup();
253282
this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
254283
this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
255284
this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
256-
285+
this.batchCreator = batchCreator;
286+
this.bufferedRequestEntries = bufferedRequestEntries;
257287
this.fatalExceptionCons =
258288
exception ->
259289
mailboxExecutor.execute(
@@ -303,7 +333,7 @@ public void write(InputT element, Context context) throws IOException, Interrupt
303333
private void nonBlockingFlush() throws InterruptedException {
304334
while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
305335
&& (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
306-
|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
336+
|| bufferedRequestEntries.totalSizeInBytes() >= flushThresholdBytes)) {
307337
flush();
308338
}
309339
}
@@ -327,7 +357,12 @@ private void flush() throws InterruptedException {
327357
requestInfo = createRequestInfo();
328358
}
329359

330-
List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
360+
Batch<RequestEntryT> batchCreationResult =
361+
batchCreator.createNextBatch(requestInfo, bufferedRequestEntries);
362+
List<RequestEntryT> batch = batchCreationResult.getBatchEntries();
363+
numBytesOutCounter.inc(batchCreationResult.getSizeInBytes());
364+
numRecordsOutCounter.inc(batchCreationResult.getRecordCount());
365+
331366
if (batch.isEmpty()) {
332367
return;
333368
}
@@ -344,31 +379,6 @@ private int getNextBatchSize() {
344379
return Math.min(getNextBatchSizeLimit(), bufferedRequestEntries.size());
345380
}
346381

347-
/**
348-
* Creates the next batch of request entries while respecting the {@code maxBatchSize} and
349-
* {@code maxBatchSizeInBytes}. Also adds these to the metrics counters.
350-
*/
351-
private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
352-
List<RequestEntryT> batch = new ArrayList<>(requestInfo.getBatchSize());
353-
354-
long batchSizeBytes = 0;
355-
for (int i = 0; i < requestInfo.getBatchSize(); i++) {
356-
long requestEntrySize = bufferedRequestEntries.peek().getSize();
357-
if (batchSizeBytes + requestEntrySize > maxBatchSizeInBytes) {
358-
break;
359-
}
360-
RequestEntryWrapper<RequestEntryT> elem = bufferedRequestEntries.remove();
361-
batch.add(elem.getRequestEntry());
362-
bufferedRequestEntriesTotalSizeInBytes -= requestEntrySize;
363-
batchSizeBytes += requestEntrySize;
364-
}
365-
366-
numRecordsOutCounter.inc(batch.size());
367-
numBytesOutCounter.inc(batchSizeBytes);
368-
369-
return batch;
370-
}
371-
372382
/**
373383
* Marks an in-flight request as completed and prepends failed requestEntries back to the
374384
* internal requestEntry buffer for later retry.
@@ -409,13 +419,7 @@ private void addEntryToBuffer(RequestEntryWrapper<RequestEntryT> entry, boolean
409419
entry.getSize(), maxRecordSizeInBytes));
410420
}
411421

412-
if (insertAtHead) {
413-
bufferedRequestEntries.addFirst(entry);
414-
} else {
415-
bufferedRequestEntries.add(entry);
416-
}
417-
418-
bufferedRequestEntriesTotalSizeInBytes += entry.getSize();
422+
bufferedRequestEntries.add(entry, insertAtHead);
419423
}
420424

421425
/**
@@ -428,7 +432,7 @@ private void addEntryToBuffer(RequestEntryWrapper<RequestEntryT> entry, boolean
428432
*/
429433
@Override
430434
public void flush(boolean flush) throws InterruptedException {
431-
while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
435+
while (inFlightRequestsCount > 0 || (!bufferedRequestEntries.isEmpty() && flush)) {
432436
yieldIfThereExistsInFlightRequests();
433437
if (flush) {
434438
flush();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.base.sink.writer;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
22+
import java.io.Serializable;
23+
import java.util.List;
24+
25+
/**
26+
* A container for the result of creating a batch of request entries, including:
27+
*
28+
* <ul>
29+
* <li>The actual list of entries forming the batch
30+
* <li>The total size in bytes of those entries
31+
* <li>The total number of entries in the batch
32+
* </ul>
33+
*
34+
* <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which
35+
* entries have been selected for sending downstream and to provide any relevant metrics for
36+
* tracking, such as the byte size or the record count.
37+
*
38+
* @param <RequestEntryT> the type of request entry in this batch
39+
*/
40+
@PublicEvolving
41+
public class Batch<RequestEntryT extends Serializable> {
42+
43+
/** The list of request entries in this batch. */
44+
private final List<RequestEntryT> batchEntries;
45+
46+
/** The total size in bytes of the entire batch. */
47+
private final long sizeInBytes;
48+
49+
/** The total number of entries in the batch. */
50+
private final int recordCount;
51+
52+
/**
53+
* Creates a new {@code Batch} with the specified entries, total size, and record count.
54+
*
55+
* @param requestEntries the list of request entries that form the batch
56+
* @param sizeInBytes the total size in bytes of the entire batch
57+
*/
58+
public Batch(List<RequestEntryT> requestEntries, long sizeInBytes) {
59+
this.batchEntries = requestEntries;
60+
this.sizeInBytes = sizeInBytes;
61+
this.recordCount = requestEntries.size();
62+
}
63+
64+
/**
65+
* Returns the list of request entries in this batch.
66+
*
67+
* @return a list of request entries for the batch
68+
*/
69+
public List<RequestEntryT> getBatchEntries() {
70+
return batchEntries;
71+
}
72+
73+
/**
74+
* Returns the total size in bytes of the batch.
75+
*
76+
* @return the batch's cumulative byte size
77+
*/
78+
public long getSizeInBytes() {
79+
return sizeInBytes;
80+
}
81+
82+
/**
83+
* Returns the total number of entries in the batch.
84+
*
85+
* @return the record count in the batch
86+
*/
87+
public int getRecordCount() {
88+
return recordCount;
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.base.sink.writer;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
22+
23+
import java.io.Serializable;
24+
import java.util.Deque;
25+
26+
/**
27+
* A pluggable interface for forming batches of request entries from a buffer. Implementations
28+
* control how many entries are grouped together and in what manner before sending them downstream.
29+
*
30+
* <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link
31+
* #createNextBatch(RequestInfo, RequestBuffer)} (RequestInfo, Deque)} when it decides to flush or
32+
* otherwise gather a new batch of elements. For instance, a batch creator might limit the batch by
33+
* the number of elements, total payload size, or any custom partition-based strategy.
34+
*
35+
* @param <RequestEntryT> the type of the request entries to be batched
36+
*/
37+
@PublicEvolving
38+
public interface BatchCreator<RequestEntryT extends Serializable> {
39+
40+
/**
41+
* Creates the next batch of request entries based on the provided {@link RequestInfo} and the
42+
* currently buffered entries.
43+
*
44+
* <p>This method is expected to:
45+
*
46+
* <ul>
47+
* <li>Mutate the {@code bufferedRequestEntries} by polling/removing elements from it.
48+
* <li>Return a batch containing the selected entries.
49+
* </ul>
50+
*
51+
* <p><strong>Thread-safety note:</strong> This method is called from {@code flush()}, which is
52+
* executed on the Flink main thread. Implementations should assume single-threaded access and
53+
* must not be shared across subtasks.
54+
*
55+
* <p><strong>Contract:</strong> Implementations must ensure that any entry removed from {@code
56+
* bufferedRequestEntries} is either added to the returned batch or properly handled (e.g.,
57+
* retried or logged), and not silently dropped.
58+
*
59+
* @param requestInfo information about the desired request properties or constraints (e.g., an
60+
* allowed batch size or other relevant hints)
61+
* @param bufferedRequestEntries a collection ex: {@link Deque} of all currently buffered
62+
* entries waiting to be grouped into batches
63+
* @return a {@link Batch} containing the new batch of entries along with metadata about the
64+
* batch (e.g., total byte size, record count)
65+
*/
66+
Batch<RequestEntryT> createNextBatch(
67+
RequestInfo requestInfo, RequestBuffer<RequestEntryT> bufferedRequestEntries);
68+
}

Diff for: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class BufferedRequestState<RequestEntryT extends Serializable> implements
3939
private final List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries;
4040
private final long stateSize;
4141

42+
@Deprecated
4243
public BufferedRequestState(Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) {
4344
this.bufferedRequestEntries = new ArrayList<>(bufferedRequestEntries);
4445
this.stateSize = calculateStateSize();
@@ -49,6 +50,11 @@ public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedReq
4950
this.stateSize = calculateStateSize();
5051
}
5152

53+
public BufferedRequestState(RequestBuffer<RequestEntryT> requestBuffer) {
54+
this.bufferedRequestEntries = new ArrayList<>(requestBuffer.getBufferedState());
55+
this.stateSize = calculateStateSize();
56+
}
57+
5258
public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries() {
5359
return bufferedRequestEntries;
5460
}

0 commit comments

Comments
 (0)