Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public SharePartitionManager(
long remoteFetchMaxWaitMs,
Persister persister,
ShareGroupConfigProvider configProvider,
ShareGroupMetrics shareGroupMetrics,
BrokerTopicStats brokerTopicStats,
Supplier<Boolean> shareGroupDlqEnableSupplier,
ShareGroupDLQManager shareGroupDLQManager
Expand All @@ -184,7 +185,7 @@ public SharePartitionManager(
remoteFetchMaxWaitMs,
persister,
configProvider,
new ShareGroupMetrics(time),
shareGroupMetrics,
brokerTopicStats,
shareGroupDlqEnableSupplier,
shareGroupDLQManager
Expand Down Expand Up @@ -634,7 +635,6 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, List<Dela
@Override
public void close() throws Exception {
this.timer.close();
this.shareGroupMetrics.close();
}

private ShareSessionKey shareSessionKey(String groupId, String memberId) {
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, LogManager
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.apache.kafka.server.partition.{AlterPartitionManager, DefaultAlterPartitionManager}
import org.apache.kafka.server.share.dlq.{DefaultShareGroupDLQManager, NoOpShareGroupDLQManager, ShareGroupDLQManager}
import org.apache.kafka.server.share.metrics.ShareGroupMetrics

import java.time.Duration
import java.util
Expand Down Expand Up @@ -165,6 +166,8 @@ class BrokerServer(

var clientMetricsManager: ClientMetricsManager = _

var shareGroupMetrics: ShareGroupMetrics = _

var sharePartitionManager: SharePartitionManager = _

var persister: Persister = _
Expand Down Expand Up @@ -389,6 +392,9 @@ class BrokerServer(
/* create persister */
persister = createShareStatePersister()

/* create metrics object to be shared by the share group DLQ manager and the share partition manager */
shareGroupMetrics = new ShareGroupMetrics(time)

/* create share group DLQ manager */
shareGroupDLQManager = createShareGroupDLQManager()

Expand Down Expand Up @@ -468,6 +474,7 @@ class BrokerServer(
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
persister,
new ShareGroupConfigProvider(groupConfigManager),
shareGroupMetrics,
brokerTopicStats,
() => ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)).supportsShareGroupDLQ(),
shareGroupDLQManager
Expand Down Expand Up @@ -765,7 +772,8 @@ class BrokerServer(
NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config, metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager broker=${config.brokerId}]")),
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.partitionFor(key), config.interBrokerListenerName, groupConfigManager),
Time.SYSTEM,
shareGroupTimer
shareGroupTimer,
shareGroupMetrics
)
} else if (klass.getName.equals(classOf[NoOpShareGroupDLQManager].getName)) {
info("Using no-op share group DLQ manager")
Expand Down Expand Up @@ -917,6 +925,9 @@ class BrokerServer(
if (shareGroupDLQManager != null)
Utils.swallow(this.logger.underlying, () => shareGroupDLQManager.stop())

if (shareGroupMetrics != null)
Utils.swallow(this.logger.underlying, () => shareGroupMetrics.close())

Utils.closeQuietly(shareGroupTimer, "share group timer")

if (lifecycleManager != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,6 @@ public void testCloseSharePartitionManager() throws Exception {
sharePartitionManager.close();
// Verify that the timer object in sharePartitionManager is closed by checking the calls to timer.close() and shareGroupMetrics.close().
Mockito.verify(timer, times(1)).close();
Mockito.verify(shareGroupMetrics, times(1)).close();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.util.timer.Timer;

import org.slf4j.Logger;
Expand All @@ -39,14 +40,26 @@ public class DefaultShareGroupDLQManager implements ShareGroupDLQManager {

private static final Logger log = LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);

public static ShareGroupDLQManager instance(KafkaClient client, ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
DefaultShareGroupDLQManager instance = new DefaultShareGroupDLQManager(client, cacheHelper, time, timer);
public static ShareGroupDLQManager instance(
KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper,
Time time,
Timer timer,
ShareGroupMetrics metrics
) {
DefaultShareGroupDLQManager instance = new DefaultShareGroupDLQManager(client, cacheHelper, time, timer, metrics);
instance.start();
return instance;
}

private DefaultShareGroupDLQManager(KafkaClient client, ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper, time, timer);
private DefaultShareGroupDLQManager(
KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper,
Time time,
Timer timer,
ShareGroupMetrics shareGroupMetrics
) {
this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper, time, timer, shareGroupMetrics);
}

private void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.apache.kafka.server.util.timer.Timer;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class ShareGroupDLQStateManager {
private final Time time;
private final Timer timer;
private final ShareGroupDLQMetadataCacheHelper cacheHelper;
private final ShareGroupMetrics shareGroupMetrics;
public static final long REQUEST_BACKOFF_MS = 1_000L;
public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
private static final int MAX_REQUEST_ATTEMPTS = 5;
Expand All @@ -91,7 +93,18 @@ public class ShareGroupDLQStateManager {
private final Map<Node, List<ProduceRequestHandler>> nodeRPCMap = new HashMap<>();
private final Object nodeMapLock = new Object();

public ShareGroupDLQStateManager(KafkaClient client, ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
// Invoked each time the generateRequests method is executed by InterBrokerSendThread.
// Mainly for testing and introspection purposes, to inspect the state of the nodeRPCMap
// at the moment generateRequests is called.
private Runnable generateCallback;

public ShareGroupDLQStateManager(
KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper,
Time time,
Timer timer,
ShareGroupMetrics shareGroupMetrics
) {
if (client == null) {
throw new IllegalArgumentException("Kafkaclient must not be null.");
}
Expand All @@ -108,9 +121,14 @@ public ShareGroupDLQStateManager(KafkaClient client, ShareGroupDLQMetadataCacheH
throw new IllegalArgumentException("Timer must not be null.");
}

if (shareGroupMetrics == null) {
throw new IllegalArgumentException("ShareGroupMetrics must not be null.");
}

this.time = time;
this.timer = timer;
this.cacheHelper = cacheHelper;
this.shareGroupMetrics = shareGroupMetrics;
this.sender = new SendThread(
"ShareGroupDLQSendThread",
client,
Expand Down Expand Up @@ -154,6 +172,16 @@ CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param, long requestBack
return future;
}

// Visibility for tests
// Installs the hook that the send thread's generateRequests() runs at the start of each
// invocation when the hook is non-null. Passing null removes the hook.
// NOTE(review): generateCallback is a plain (non-volatile) field and generateRequests()
// presumably runs on a different thread than the caller of this setter - confirm the hook
// is installed before the sender starts, or that tests synchronize by other means.
void setGenerateCallback(Runnable generateCallback) {
this.generateCallback = generateCallback;
}

// Visibility for tests
// Returns the manager's own nodeRPCMap instance rather than a copy, so the caller sees
// (and is able to modify) the live map of per-node produce request handlers.
// NOTE(review): a nodeMapLock object is declared alongside this map; presumably it guards
// access to it - confirm whether callers must hold that lock while reading the result.
Map<Node, List<ShareGroupDLQStateManager.ProduceRequestHandler>> nodeRPCMap() {
return nodeRPCMap;
}

// Hands the given produce request handler to the sender (the SendThread created in the
// constructor) by delegating to its enqueue method.
private void enqueue(ProduceRequestHandler requestHandler) {
sender.enqueue(requestHandler);
}
Expand Down Expand Up @@ -333,6 +361,10 @@ public ProduceRequestData.TopicProduceData topicProduceData() {
simpleRecords.toArray(new SimpleRecord[]{})
);

// Update the metric to record that a new request has been created to be sent. This might not be the
// actual RPC count as we coalesce the requests before sending.
shareGroupMetrics.recordDLQProduce(param.groupId());

return new ProduceRequestData.TopicProduceData()
.setName(dlqTopicPartitionData.topicName())
.setTopicId(dlqTopicPartitionData.topicId().get())
Expand Down Expand Up @@ -580,6 +612,7 @@ private void handleProduceResponse(ClientResponse response) {
switch (error) {
case NONE:
LOG.debug("Successfully produced records {} to dlq topic node {}.", this, dlqPartitionLeaderNode());
shareGroupMetrics.recordDLQRecordWrite(param.groupId(), (int) (param.lastOffset() - param.firstOffset() + 1));
produceRequestBackoff.resetAttempts();
this.result.complete(null);
break;
Expand All @@ -588,6 +621,7 @@ private void handleProduceResponse(ClientResponse response) {
LOG.debug("Received retriable error produce response for {} to dlq topic node {} - {}.", this, dlqPartitionLeaderNode(), errorMessage);
if (!produceRequestBackoff.canAttempt()) {
LOG.error("Exhausted max retries to produce {} to DLQ topic node {}.", this, dlqPartitionLeaderNode());
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
requestErrorResponse(new Exception("Exhausted max retries to produce to DLQ topic without success."));
break;
}
Expand All @@ -598,6 +632,7 @@ private void handleProduceResponse(ClientResponse response) {
LOG.error("Unable to produce {} to DLQ topic node {} - {}.", this, dlqPartitionLeaderNode(), errorMessage);
partitionResponse.recordErrors().forEach(recordError ->
LOG.error("Records with errors {} - {}.", recordError.batchIndex(), recordError.batchIndexErrorMessage()));
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
requestErrorResponse(error.exception());
}
break;
Expand All @@ -609,6 +644,7 @@ private void handleProduceResponse(ClientResponse response) {
if (!produceRequestBackoff.canAttempt()) {
LOG.error("Exhausted max retries to produce {} to DLQ topic node {} due to client response error {}.",
param, dlqPartitionLeaderNode(), clientResponseErrorMessage);
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
requestErrorResponse(clientResponseError.exception());
break;
}
Expand All @@ -618,6 +654,7 @@ private void handleProduceResponse(ClientResponse response) {
default:
LOG.error("Unable to produce {} to DLQ topic node {} due to client response error {}.",
param, dlqPartitionLeaderNode(), clientResponseErrorMessage);
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
requestErrorResponse(clientResponseError.exception());
}
}
Expand All @@ -634,6 +671,11 @@ private class SendThread extends InterBrokerSendThread {

@Override
public Collection<RequestAndCompletionHandler> generateRequests() {
// Introspection for testing - will be null in prod
if (generateCallback != null) {
generateCallback.run();
}

List<RequestAndCompletionHandler> requests = new ArrayList<>();

if (!queue.isEmpty()) {
Expand Down Expand Up @@ -772,13 +814,19 @@ public void run() {
}
}

private record CoalesceResults(
// Visibility for tests
record CoalesceResults(
AbstractRequest.Builder<? extends AbstractRequest> request,
List<ProduceRequestHandler> liveHandlers
) {
}

private static CoalesceResults coalesceProduceRequests(List<ProduceRequestHandler> handlers) {
// Visibility for tests
static CoalesceResults coalesceProduceRequests(List<ProduceRequestHandler> handlers) {
// The handlers passed in are all destined for the same broker node, although they may target
// different DLQ topics and partitions. A produce request requires each topic data entry to be
// scoped to a specific topic/topicId, while the partition data carries the record information
// and the destination DLQ partition. To accomplish this, we map handlers by DLQ topic id.
Map<Uuid, ProduceRequestData.TopicProduceData> produceHandlerMap = new HashMap<>();
List<ProduceRequestHandler> liveHandlers = new ArrayList<>(handlers.size());
handlers.forEach(handler -> {
Expand Down
Loading
Loading