Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

[fix] Use async back pressure to limit RIP #1621

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
import org.eclipse.jetty.util.ssl.SslContextFactory;

Expand Down Expand Up @@ -66,7 +65,6 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final SslContextFactory.Server sslContextFactory;
@Getter
private final RequestStats requestStats;
private final OrderedScheduler sendResponseScheduler;

private final LengthFieldPrepender lengthFieldPrepender;

Expand All @@ -82,7 +80,6 @@ public KafkaChannelInitializer(PulsarService pulsarService,
EndPoint advertisedEndPoint,
boolean skipMessagesWithoutIndex,
RequestStats requestStats,
OrderedScheduler sendResponseScheduler,
KafkaTopicManagerSharedState kafkaTopicManagerSharedState) {
super();
this.pulsarService = pulsarService;
Expand All @@ -102,7 +99,6 @@ public KafkaChannelInitializer(PulsarService pulsarService,
} else {
sslContextFactory = null;
}
this.sendResponseScheduler = sendResponseScheduler;
this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState;
this.lengthFieldPrepender = new LengthFieldPrepender(4);
}
Expand All @@ -129,7 +125,7 @@ public KafkaRequestHandler newCnx() throws Exception {
return new KafkaRequestHandler(pulsarService, kafkaConfig,
tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager,
producePurgatory, fetchPurgatory,
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler,
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats,
kafkaTopicManagerSharedState);
}

Expand All @@ -142,7 +138,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
producePurgatory, fetchPurgatory,
enableTls, advertisedEndPoint, skipMessagesWithoutIndex,
new RequestStats(rootStatsLogger.scope(SERVER_SCOPE)),
sendResponseScheduler,
kafkaTopicManagerSharedState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -60,28 +59,28 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
protected AtomicBoolean isActive = new AtomicBoolean(false);
// Queue to make response get responseFuture in order and limit the max request size
private final LinkedBlockingQueue<ResponseAndRequest> requestQueue;
private int numQueuedRequestsInProgress;
private final int maxQueuedRequests;
@Getter
@Setter
protected volatile RequestStats requestStats;
@Getter
protected final KafkaServiceConfiguration kafkaConfig;

private final OrderedScheduler sendResponseScheduler;

public KafkaCommandDecoder(RequestStats requestStats,
KafkaServiceConfiguration kafkaConfig,
OrderedScheduler sendResponseScheduler) {
KafkaServiceConfiguration kafkaConfig) {
this.requestStats = requestStats;
this.kafkaConfig = kafkaConfig;
this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests());
this.sendResponseScheduler = sendResponseScheduler;
this.maxQueuedRequests = kafkaConfig.getMaxQueuedRequests();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.remoteAddress = ctx.channel().remoteAddress();
this.ctx = ctx;
this.numQueuedRequestsInProgress = 0;
isActive.set(true);
}

Expand Down Expand Up @@ -230,7 +229,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

final long timeBeforeParse = MathUtils.nowInNano();
KafkaHeaderAndRequest kafkaHeaderAndRequest = byteBufToRequest(buffer, remoteAddress);
// potentially blocking until there is room in the queue for the request.
registerRequestParseLatency.accept(timeBeforeParse, null);

try {
Expand All @@ -242,6 +240,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
final long startProcessRequestTimestamp = MathUtils.nowInNano();
// this callback is just meant to make sure that we roll through things, but it feels racy
responseFuture.whenComplete((response, e) -> {
if (e instanceof CancellationException) {
if (log.isDebugEnabled()) {
Expand All @@ -256,13 +255,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
registerRequestLatency.accept(kafkaHeaderAndRequest.getHeader().apiKey(),
startProcessRequestTimestamp);

sendResponseScheduler.executeOrdered(channel.remoteAddress().hashCode(), () -> {
ctx.channel().eventLoop().execute(() -> {
writeAndFlushResponseToClient(channel);
});
});
// potentially blocking until there is room in the queue for the request.
requestQueue.put(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
numQueuedRequestsInProgress++;
RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.incrementAndGet();
if (numQueuedRequestsInProgress == maxQueuedRequests) {
channel.config().setAutoRead(false);
}

if (!isActive.get()) {
handleInactive(kafkaHeaderAndRequest, responseFuture);
Expand Down Expand Up @@ -397,7 +399,12 @@ protected void writeAndFlushResponseToClient(Channel channel) {
} else {
if (requestQueue.remove(responseAndRequest)) {
RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.decrementAndGet();
} else { // it has been removed by another thread, skip this element
if (numQueuedRequestsInProgress == maxQueuedRequests) {
channel.config().setAutoRead(true);
}
numQueuedRequestsInProgress--;
} else {
log.error("Request was removed from queue, but that shouldn't be possible.");
continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag

@Getter
private KopEventManager kopEventManager;
private OrderedScheduler sendResponseScheduler;
private NamespaceBundleOwnershipListenerImpl bundleListener;
private SchemaRegistryManager schemaRegistryManager;
private MigrationManager migrationManager;
Expand Down Expand Up @@ -168,10 +167,6 @@ public void initialize(ServiceConfiguration conf) throws Exception {
statsProvider = new PrometheusMetricsProvider();
StatsLogger rootStatsLogger = statsProvider.getStatsLogger("");
requestStats = new RequestStats(rootStatsLogger.scope(SERVER_SCOPE));
sendResponseScheduler = OrderedScheduler.newSchedulerBuilder()
.name("send-response")
.numThreads(kafkaConfig.getNumSendKafkaResponseThreads())
.build();
}

// This method is called after initialize
Expand Down Expand Up @@ -401,7 +396,6 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
endPoint,
kafkaConfig.isSkipMessagesWithoutIndex(),
requestStats,
sendResponseScheduler,
kafkaTopicManagerSharedState);
}

Expand Down Expand Up @@ -482,7 +476,6 @@ public void close() {
kafkaTopicManagerSharedState.close();
kopBrokerLookupManager.close();
statsProvider.stop();
sendResponseScheduler.shutdown();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import java.util.stream.Stream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -312,9 +311,8 @@ public KafkaRequestHandler(PulsarService pulsarService,
EndPoint advertisedEndPoint,
boolean skipMessagesWithoutIndex,
RequestStats requestStats,
OrderedScheduler sendResponseScheduler,
KafkaTopicManagerSharedState kafkaTopicManagerSharedState) throws Exception {
super(requestStats, kafkaConfig, sendResponseScheduler);
super(requestStats, kafkaConfig);
this.pulsarService = pulsarService;
this.tenantContextManager = tenantContextManager;
this.replicaManager = replicaManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
//
// --- Kafka on Pulsar Broker configuration ---
//
@FieldContext(
category = CATEGORY_KOP,
doc = "The number of threads used to respond to the response."
)
private int numSendKafkaResponseThreads = 4;

@FieldContext(
required = true,
Expand Down