Skip to content

KAFKA-14464: Make resource leaks for MM2 resources more difficult #12980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.TopicAdmin;

import java.time.Duration;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;

/**
 * Tracks background resources (clients, filters, metrics, schedulers) opened by MM2
 * connectors and tasks so they can all be released with a single {@link #close()},
 * making resource leaks harder to introduce.
 *
 * <p>Resources are closed in reverse order of opening, so resources that depend on
 * earlier ones (e.g. an {@link OffsetSyncStore} wrapping a consumer) are closed first.
 *
 * <p>Not thread-safe: callers are expected to open resources from a single thread
 * (typically {@code start()}) and close from the corresponding {@code stop()}.
 */
public class BackgroundResources implements AutoCloseable {

// Parallel lists record open order (and allow duplicate registrations); closed in
// reverse. The name at the same index is used when logging close failures.
private final List<AutoCloseable> closeables;
private final List<String> names;

public BackgroundResources() {
closeables = new ArrayList<>();
names = new ArrayList<>();
}

/**
 * Register a resource for closure and return it unchanged, for inline use.
 *
 * @param closeable resource to track
 * @param name      human-readable name used when logging close failures
 * @return the same {@code closeable} instance
 */
private <T extends AutoCloseable> T open(T closeable, String name) {
closeables.add(closeable);
names.add(name);
return closeable;
}

@Override
public void close() {
// Close in reverse-open order. closeQuietly logs and swallows exceptions so one
// failing resource cannot prevent the remaining ones from being released; it also
// tolerates resources that were already closed elsewhere.
for (int i = closeables.size() - 1; i >= 0; i--) {
Utils.closeQuietly(closeables.get(i), names.get(i));
}
closeables.clear();
names.clear();
}

/** Open and track a byte-array consumer built from {@code props}. */
public KafkaConsumer<byte[], byte[]> consumer(Map<String, Object> props, String name) {
return open(new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()), name);
}

/** Open and track a byte-array producer built from {@code props}. */
public KafkaProducer<byte[], byte[]> producer(Map<String, Object> props, String name) {
return open(new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()), name);
}

/**
 * Instantiate and track the configured forwarding admin class.
 *
 * @throws KafkaException if the configured class cannot be instantiated
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public Admin admin(MirrorConnectorConfig config, Map<String, Object> props, String name) {
try {
return open(Utils.newParameterizedInstance(
config.getClass(FORWARDING_ADMIN_CLASS).getName(), (Class<Map<String, Object>>) (Class) Map.class, props
), name);
} catch (ClassNotFoundException e) {
// Use getClass(...).getName() here: FORWARDING_ADMIN_CLASS is a CLASS-type config,
// so getString(...) would throw ClassCastException while building this message.
throw new KafkaException("Can't create instance of " + config.getClass(FORWARDING_ADMIN_CLASS).getName(), e);
}
}

/** Create and track the configured {@link TopicFilter}. */
public TopicFilter topicFilter(MirrorConnectorConfig config, String name) {
return open(config.getConfiguredInstance(MirrorConnectorConfig.TOPIC_FILTER_CLASS, TopicFilter.class), name);
}

/** Create and track the configured {@link GroupFilter}. */
public GroupFilter groupFilter(MirrorCheckpointConfig config, String name) {
return open(config.getConfiguredInstance(MirrorCheckpointConfig.GROUP_FILTER_CLASS, GroupFilter.class), name);
}

/**
 * Create and track an {@link OffsetSyncStore} along with the consumer and admin client
 * it uses to read the offset-syncs topic.
 */
public OffsetSyncStore offsetSyncStore(MirrorCheckpointTaskConfig config, String name) {
Consumer<byte[], byte[]> consumer = null;
TopicAdmin admin;
try {
consumer = consumer(config.offsetSyncsTopicConsumerConfig(), "offset syncs consumer");
admin = new TopicAdmin(
config.offsetSyncsTopicAdminConfig(),
admin(config, config.offsetSyncsTopicAdminConfig(), "offset syncs admin"));
} catch (Throwable t) {
// The consumer is already registered above; closing it here as well is safe
// because close() tolerates already-closed resources.
Utils.closeQuietly(consumer, "consumer for offset syncs");
throw t;
}
return open(new OffsetSyncStore(config, consumer, admin), name);
}

/** Create and track a {@link Scheduler} for background work. */
public Scheduler scheduler(Class<?> clazz, String role, Duration timeout, String name) {
return open(new Scheduler(clazz, role, timeout), name);
}

/** Create and track the configured {@link ConfigPropertyFilter}. */
public ConfigPropertyFilter configPropertyFilter(MirrorSourceConfig config, String name) {
return open(config.getConfiguredInstance(MirrorSourceConfig.CONFIG_PROPERTY_FILTER_CLASS, ConfigPropertyFilter.class), name);
}

/** Create and track source metrics with the configured reporters attached. */
public MirrorSourceMetrics sourceMetrics(MirrorSourceTaskConfig config, String name) {
MirrorSourceMetrics metrics = new MirrorSourceMetrics(config);
config.metricsReporters().forEach(metrics::addReporter);
return open(metrics, name);
}

/** Create and track checkpoint metrics with the configured reporters attached. */
public MirrorCheckpointMetrics checkpointMetrics(MirrorCheckpointTaskConfig config, String name) {
MirrorCheckpointMetrics metrics = new MirrorCheckpointMetrics(config);
config.metricsReporters().forEach(metrics::addReporter);
return open(metrics, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,6 @@ short checkpointsTopicReplicationFactor() {
return getShort(CHECKPOINTS_TOPIC_REPLICATION_FACTOR);
}

GroupFilter groupFilter() {
return getConfiguredInstance(GROUP_FILTER_CLASS, GroupFilter.class);
}

TopicFilter topicFilter() {
return getConfiguredInstance(TOPIC_FILTER_CLASS, TopicFilter.class);
}

Duration syncGroupOffsetsInterval() {
if (getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
return Duration.ofSeconds(getLong(SYNC_GROUP_OFFSETS_INTERVAL_SECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
Expand Down Expand Up @@ -60,6 +59,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
private Admin targetAdminClient;
private SourceAndTarget sourceAndTarget;
private Set<String> knownConsumerGroups = Collections.emptySet();
private BackgroundResources backgroundResources;

public MirrorCheckpointConnector() {
// nop
Expand All @@ -69,6 +69,7 @@ public MirrorCheckpointConnector() {
// Package-private constructor taking a pre-built config and known groups —
// presumably for tests; TODO confirm against test usages.
MirrorCheckpointConnector(Set<String> knownConsumerGroups, MirrorCheckpointConfig config) {
this.knownConsumerGroups = knownConsumerGroups;
this.config = config;
// Eagerly create the resource tracker so stop() can safely close() it even when
// start() is never invoked on this instance.
this.backgroundResources = new BackgroundResources();
}

@Override
Expand All @@ -77,13 +78,14 @@ public void start(Map<String, String> props) {
if (!config.enabled()) {
return;
}
backgroundResources = new BackgroundResources();
String connectorName = config.connectorName();
sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
topicFilter = config.topicFilter();
groupFilter = config.groupFilter();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin"));
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
topicFilter = backgroundResources.topicFilter(config, "topic filter");
groupFilter = backgroundResources.groupFilter(config, "group filter");
sourceAdminClient = backgroundResources.admin(config, config.sourceAdminConfig("checkpoint-source-admin"), "source admin client");
targetAdminClient = backgroundResources.admin(config, config.targetAdminConfig("checkpoint-target-admin"), "target admin client");
scheduler = backgroundResources.scheduler(getClass(), config.entityLabel(), config.adminTimeout(), "scheduler");
scheduler.execute(this::createInternalTopics, "creating internal topics");
scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, config.refreshGroupsInterval(),
Expand All @@ -97,11 +99,7 @@ public void stop() {
if (!config.enabled()) {
return;
}
Utils.closeQuietly(scheduler, "scheduler");
Utils.closeQuietly(topicFilter, "topic filter");
Utils.closeQuietly(groupFilter, "group filter");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
backgroundResources.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;

Expand All @@ -53,6 +52,7 @@ public class MirrorCheckpointTask extends SourceTask {

private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class);

private BackgroundResources backgroundResources;
private Admin sourceAdminClient;
private Admin targetAdminClient;
private String sourceClusterAlias;
Expand Down Expand Up @@ -83,27 +83,29 @@ public MirrorCheckpointTask() {}
this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
this.topicFilter = topic -> true;
this.backgroundResources = new BackgroundResources();
}

@Override
public void start(Map<String, String> props) {
MirrorCheckpointTaskConfig config = new MirrorCheckpointTaskConfig(props);
stopping = false;
backgroundResources = new BackgroundResources();
sourceClusterAlias = config.sourceClusterAlias();
targetClusterAlias = config.targetClusterAlias();
consumerGroups = config.taskConsumerGroups();
checkpointsTopic = config.checkpointsTopic();
topicFilter = config.topicFilter();
topicFilter = backgroundResources.topicFilter(config, "topic filter");
replicationPolicy = config.replicationPolicy();
interval = config.emitCheckpointsInterval();
pollTimeout = config.consumerPollTimeout();
offsetSyncStore = new OffsetSyncStore(config);
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin"));
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
metrics = config.metrics();
offsetSyncStore = backgroundResources.offsetSyncStore(config, "offset sync store");
sourceAdminClient = backgroundResources.admin(config, config.sourceAdminConfig("checkpoint-source-admin"), "source admin client");
targetAdminClient = backgroundResources.admin(config, config.targetAdminConfig("checkpoint-target-admin"), "target admin client");
metrics = backgroundResources.checkpointMetrics(config, "checkpoint metrics");
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler = backgroundResources.scheduler(getClass(), config.entityLabel(), config.adminTimeout(), "scheduler");
scheduler.execute(() -> {
offsetSyncStore.start();
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
Expand All @@ -124,12 +126,7 @@ public void commit() {
/** Signals the poll loop to stop and releases all tracked background resources. */
public void stop() {
long shutdownStartMs = System.currentTimeMillis();
stopping = true;
// One call releases everything registered during start() (filters, offset sync
// store, admin clients, metrics, scheduler).
backgroundResources.close();
long elapsedMs = System.currentTimeMillis() - shutdownStartMs;
log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), elapsedMs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ Set<String> taskConsumerGroups() {
return new HashSet<>(fields);
}

MirrorCheckpointMetrics metrics() {
MirrorCheckpointMetrics metrics = new MirrorCheckpointMetrics(this);
metricsReporters().forEach(metrics::addReporter);
return metrics;
}

@Override
String entityLabel() {
return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" : getInt(TASK_INDEX));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.admin.ForwardingAdmin;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
Expand Down Expand Up @@ -226,17 +224,6 @@ List<MetricsReporter> metricsReporters() {
return reporters;
}

@SuppressWarnings({"unchecked", "rawtypes"})
ForwardingAdmin forwardingAdmin(Map<String, Object> config) {
try {
return Utils.newParameterizedInstance(
getClass(FORWARDING_ADMIN_CLASS).getName(), (Class<Map<String, Object>>) (Class) Map.class, config
);
} catch (ClassNotFoundException e) {
throw new KafkaException("Can't create instance of " + get(FORWARDING_ADMIN_CLASS), e);
}
}

void addClientId(Map<String, Object> props, String role) {
String clientId = entityLabel() + (role == null ? "" : "|" + role);
props.compute(CommonClientConfigs.CLIENT_ID_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* @see MirrorHeartbeatConfig for supported config properties.
*/
public class MirrorHeartbeatConnector extends SourceConnector {
private BackgroundResources backgroundResources;
private MirrorHeartbeatConfig config;
private Scheduler scheduler;
private Admin targetAdminClient;
Expand All @@ -47,12 +48,14 @@ public MirrorHeartbeatConnector() {
// visible for testing
// Package-private constructor taking a pre-built config — presumably for tests;
// TODO confirm against test usages.
MirrorHeartbeatConnector(MirrorHeartbeatConfig config) {
this.config = config;
// Eagerly create the resource tracker so stop() can safely close() it even when
// start() is never invoked on this instance.
this.backgroundResources = new BackgroundResources();
}

@Override
public void start(Map<String, String> props) {
config = new MirrorHeartbeatConfig(props);
this.backgroundResources = new BackgroundResources();
targetAdminClient = backgroundResources.admin(config, config.targetAdminConfig("heartbeats-target-admin"), "target admin client");
// Register the scheduler with backgroundResources too, matching the checkpoint
// connector/task; previously it was created bare here and would not be closed by
// backgroundResources.close(). NOTE(review): confirm stop() does not also close
// the scheduler directly, to avoid a redundant (though harmless) double close.
scheduler = backgroundResources.scheduler(getClass(), config.entityLabel(), config.adminTimeout(), "scheduler");
scheduler.execute(this::createInternalTopics, "creating internal topics");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,6 @@ short offsetSyncsTopicReplicationFactor() {
return getShort(OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR);
}

TopicFilter topicFilter() {
return getConfiguredInstance(TOPIC_FILTER_CLASS, TopicFilter.class);
}

ConfigPropertyFilter configPropertyFilter() {
return getConfiguredInstance(CONFIG_PROPERTY_FILTER_CLASS, ConfigPropertyFilter.class);
}

Duration consumerPollTimeout() {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}
Expand Down
Loading