Skip to content

Commit 3c3c675

Browse files
authored
Merge branch 'apache:trunk' into followupRemoteStorage
2 parents b5798fd + fa097fa commit 3c3c675

15 files changed

Lines changed: 309 additions & 345 deletions

File tree

.github/actions/setup-gradle/action.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ runs:
4242
distribution: temurin
4343
java-version: ${{ inputs.java-version }}
4444
- name: Setup Gradle
45-
uses: gradle/actions/setup-gradle@4d9f0ba0025fe599b4ebab900eb7f3a1d93ef4c2 # v5.0.0
45+
uses: gradle/actions/setup-gradle@0723195856401067f7a2779048b490ace7a47d7c # v5.0.2
4646
env:
4747
GRADLE_BUILD_ACTION_CACHE_DEBUG_ENABLED: true
4848
with:

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ class DynamicBrokerConfigTest {
249249

250250
val props3 = new Properties()
251251
props3.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "-1")
252-
val err3 = assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
252+
val err3 = assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props3, perBrokerConfig = true))
253253
assertTrue(err3.getMessage.contains("Value must be at least 1"))
254254
verifyNoMoreInteractions(remoteLogManager)
255255

@@ -771,35 +771,35 @@ class DynamicBrokerConfigTest {
771771
}
772772

773773
@Test
774-
def testDynamicRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
775-
// remote copy lag ms cannot exceed effective local retention ms
776-
verifyIncorrectRemoteCopyLagProps(
774+
def testDynamicLogRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
775+
// log remote copy lag ms cannot exceed effective log local retention ms
776+
verifyIncorrectLogRemoteCopyLagProps(
777777
retentionMs = 1000L,
778778
logLocalRetentionMs = -2L,
779-
remoteCopyLagMs = 1001L,
779+
logRemoteCopyLagMs = 1001L,
780780
retentionBytes = 1000L,
781781
logLocalRetentionBytes = -2L,
782-
remoteCopyLagBytes = 100L
782+
logRemoteCopyLagBytes = 100L
783783
)
784784

785-
// remote copy lag bytes cannot exceed effective local retention bytes
786-
verifyIncorrectRemoteCopyLagProps(
785+
// log remote copy lag bytes cannot exceed effective log local retention bytes
786+
verifyIncorrectLogRemoteCopyLagProps(
787787
retentionMs = 1000L,
788788
logLocalRetentionMs = -2L,
789-
remoteCopyLagMs = 100L,
789+
logRemoteCopyLagMs = 100L,
790790
retentionBytes = 1000L,
791791
logLocalRetentionBytes = -2L,
792-
remoteCopyLagBytes = 1001L
792+
logRemoteCopyLagBytes = 1001L
793793
)
794794

795795
}
796796

797-
def verifyIncorrectRemoteCopyLagProps(retentionMs: Long,
798-
logLocalRetentionMs: Long,
799-
remoteCopyLagMs: Long,
800-
retentionBytes: Long,
801-
logLocalRetentionBytes: Long,
802-
remoteCopyLagBytes: Long): Unit = {
797+
def verifyIncorrectLogRemoteCopyLagProps(retentionMs: Long,
798+
logLocalRetentionMs: Long,
799+
logRemoteCopyLagMs: Long,
800+
retentionBytes: Long,
801+
logLocalRetentionBytes: Long,
802+
logRemoteCopyLagBytes: Long): Unit = {
803803
val props = TestUtils.createBrokerConfig(0, port = 8181)
804804
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, retentionMs.toString)
805805
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
@@ -810,9 +810,9 @@ class DynamicBrokerConfigTest {
810810

811811
val newProps = new Properties()
812812
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs.toString)
813-
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, remoteCopyLagMs.toString)
813+
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs.toString)
814814
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes.toString)
815-
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, remoteCopyLagBytes.toString)
815+
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes.toString)
816816
// validate default config
817817
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
818818
// validate per broker config

server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,20 +349,24 @@ private static String parseListenerName(String connectionString) {
349349
* <p>
350350
* This method exists to support migration from kafka.server.KafkaConfig (Scala/core) to AbstractKafkaConfig (Java/server).
351351
* When migrating code, replace KafkaConfig references with AbstractKafkaConfig.
352+
* Subclasses should override this method to integrate with their dynamic configuration mechanism;
353+
* the default implementation is a no-op.
352354
*
353355
* @param reconfigurable the component to register for configuration updates
354356
*/
355-
public abstract void addReconfigurable(Reconfigurable reconfigurable);
357+
public void addReconfigurable(Reconfigurable reconfigurable) { }
356358

357359
/**
358360
* Unregisters a component from dynamic reconfiguration notifications.
359361
* <p>
360362
* This method exists to support migration from kafka.server.KafkaConfig (Scala/core) to AbstractKafkaConfig (Java/server).
361363
* When migrating code, replace KafkaConfig references with AbstractKafkaConfig.
364+
* Subclasses should override this method to integrate with their dynamic configuration mechanism;
365+
* the default implementation is a no-op.
362366
*
363367
* @param reconfigurable the component to unregister
364368
*/
365-
public abstract void removeReconfigurable(Reconfigurable reconfigurable);
369+
public void removeReconfigurable(Reconfigurable reconfigurable) { }
366370

367371
/**
368372
* Determines whether a config entry might be sensitive based on its type.

server/src/test/java/org/apache/kafka/server/DefaultAutoTopicCreationManagerTest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,7 @@ public void setup() {
100100
props.setProperty(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1");
101101
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1");
102102

103-
config = new AbstractKafkaConfig(AbstractKafkaConfig.CONFIG_DEF, props, Map.of(), false) {
104-
@Override
105-
public void addReconfigurable(org.apache.kafka.common.Reconfigurable reconfigurable) {
106-
}
107-
@Override
108-
public void removeReconfigurable(org.apache.kafka.common.Reconfigurable reconfigurable) {
109-
}
110-
};
103+
config = new AbstractKafkaConfig(AbstractKafkaConfig.CONFIG_DEF, props, Map.of(), false) { };
111104
topicCreator.reset();
112105
}
113106

server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.kafka.server.config;
1818

19-
import org.apache.kafka.common.Reconfigurable;
2019
import org.apache.kafka.common.config.ConfigDef;
2120
import org.apache.kafka.coordinator.group.GroupConfig;
2221
import org.apache.kafka.raft.KRaftConfigs;
@@ -101,13 +100,7 @@ private static Map<String, Object> extractGroupConfigMap(Map<String, Object> bro
101100
ConfigDef configDef = new ConfigDef().define(TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM, ConfigDef.Type.STRING,
102101
"default-value", ConfigDef.Importance.LOW, "test broker synonym");
103102

104-
AbstractKafkaConfig kafkaConfig = new AbstractKafkaConfig(configDef, new HashMap<>(brokerProps), Map.of(), false) {
105-
@Override
106-
public void addReconfigurable(Reconfigurable reconfigurable) { }
107-
108-
@Override
109-
public void removeReconfigurable(Reconfigurable reconfigurable) { }
110-
};
103+
AbstractKafkaConfig kafkaConfig = new AbstractKafkaConfig(configDef, new HashMap<>(brokerProps), Map.of(), false) { };
111104

112105
return kafkaConfig.extractGroupConfigMap();
113106
}

server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,7 @@ public class AddPartitionsToTxnManagerTest {
9090
KRaftConfigs.NODE_ID_CONFIG, "1",
9191
KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"),
9292
Map.of(),
93-
false) {
94-
@Override
95-
public void addReconfigurable(org.apache.kafka.common.Reconfigurable reconfigurable) {
96-
// No-op for test
97-
}
98-
99-
@Override
100-
public void removeReconfigurable(org.apache.kafka.common.Reconfigurable reconfigurable) {
101-
// No-op for test
102-
}
103-
};
93+
false) { };
10494
private final AddPartitionsToTxnManager addPartitionsToTxnManager =
10595
new AddPartitionsToTxnManager(config, networkClient, metadataCache, partitionFor, time);
10696

storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java

Lines changed: 0 additions & 74 deletions
This file was deleted.

storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java

Lines changed: 0 additions & 101 deletions
This file was deleted.

storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)