@@ -445,26 +445,15 @@ class ReplicaManagerTest {
445445
446446 try {
447447 val brokerList = Seq [Integer ](0 , 1 ).asJava
448- val topicIds = Collections .singletonMap(topic, topicId)
449448
450- val partition = rm.createPartition(new TopicPartition (topic, 0 ))
449+ val topicPartition = new TopicPartition (topic, 0 )
450+ val partition = rm.createPartition(topicPartition)
451451 partition.createLogIfNotExists(isNew = false , isFutureReplica = false ,
452452 new LazyOffsetCheckpoints (rm.highWatermarkCheckpoints.asJava), None )
453453 // Make this replica the leader.
454- val leaderAndIsrRequest1 = new LeaderAndIsrRequest .Builder (0 , 0 , brokerEpoch,
455- Seq (new LeaderAndIsrRequest .PartitionState ()
456- .setTopicName(topic)
457- .setPartitionIndex(0 )
458- .setControllerEpoch(0 )
459- .setLeader(0 )
460- .setLeaderEpoch(0 )
461- .setIsr(brokerList)
462- .setPartitionEpoch(0 )
463- .setReplicas(brokerList)
464- .setIsNew(false )).asJava,
465- topicIds,
466- Set (new Node (0 , " host1" , 0 ), new Node (1 , " host2" , 1 )).asJava).build()
467- rm.becomeLeaderOrFollower(0 , leaderAndIsrRequest1, (_, _) => ())
454+ val delta = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(0 ), brokerList, brokerList)
455+ val leaderMetadataImage = imageFromTopics(delta.apply())
456+ rm.applyDelta(delta, leaderMetadataImage)
468457 rm.getPartitionOrException(new TopicPartition (topic, 0 ))
469458 .localLogOrException
470459
@@ -474,20 +463,9 @@ class ReplicaManagerTest {
474463 }
475464
476465 // Make this replica the follower
477- val leaderAndIsrRequest2 = new LeaderAndIsrRequest .Builder (0 , 0 , brokerEpoch,
478- Seq (new LeaderAndIsrRequest .PartitionState ()
479- .setTopicName(topic)
480- .setPartitionIndex(0 )
481- .setControllerEpoch(0 )
482- .setLeader(1 )
483- .setLeaderEpoch(1 )
484- .setIsr(brokerList)
485- .setPartitionEpoch(0 )
486- .setReplicas(brokerList)
487- .setIsNew(false )).asJava,
488- topicIds,
489- Set (new Node (0 , " host1" , 0 ), new Node (1 , " host2" , 1 )).asJava).build()
490- rm.becomeLeaderOrFollower(1 , leaderAndIsrRequest2, (_, _) => ())
466+ val delta1 = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(1 ), brokerList, brokerList, 1 )
467+ val followerMetadataImage = imageFromTopics(delta1.apply())
468+ rm.applyDelta(delta1, followerMetadataImage)
491469
492470 assertTrue(appendResult.hasFired)
493471 } finally {
@@ -945,20 +923,9 @@ class ReplicaManagerTest {
945923 new LazyOffsetCheckpoints (replicaManager.highWatermarkCheckpoints.asJava), None )
946924
947925 // Make this replica the leader.
948- val leaderAndIsrRequest1 = new LeaderAndIsrRequest .Builder (0 , 0 , brokerEpoch,
949- Seq (new LeaderAndIsrRequest .PartitionState ()
950- .setTopicName(topic)
951- .setPartitionIndex(0 )
952- .setControllerEpoch(0 )
953- .setLeader(0 )
954- .setLeaderEpoch(0 )
955- .setIsr(brokerList)
956- .setPartitionEpoch(0 )
957- .setReplicas(brokerList)
958- .setIsNew(true )).asJava,
959- topicIds.asJava,
960- Set (new Node (0 , " host1" , 0 ), new Node (1 , " host2" , 1 )).asJava).build()
961- replicaManager.becomeLeaderOrFollower(0 , leaderAndIsrRequest1, (_, _) => ())
926+ val delta = topicsCreateDelta(brokerList.get(0 ), isStartIdLeader = true , partitions = List (0 ), List .empty, topic, topicIds(topic))
927+ val leaderMetadataImage = imageFromTopics(delta.apply())
928+ replicaManager.applyDelta(delta, leaderMetadataImage)
962929 replicaManager.getPartitionOrException(new TopicPartition (topic, 0 ))
963930 .localLogOrException
964931
@@ -2548,8 +2515,9 @@ class ReplicaManagerTest {
25482515 val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List (tp), config = config)
25492516
25502517 try {
2551- val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq (0 , 1 ), new LeaderAndIsr (0 , List (0 , 1 ).map(Int .box).asJava))
2552- replicaManager.becomeLeaderOrFollower(1 , becomeLeaderRequest, (_, _) => ())
2518+ val delta = topicsCreateDelta(0 , isStartIdLeader = true , partitions = List (0 ), List .empty, topic, topicIds(topic))
2519+ val leaderMetadataImage = imageFromTopics(delta.apply())
2520+ replicaManager.applyDelta(delta, leaderMetadataImage)
25532521
25542522 val transactionalRecords = MemoryRecords .withTransactionalRecords(Compression .NONE , producerId, producerEpoch, sequence,
25552523 new SimpleRecord (s " message $sequence" .getBytes))
@@ -4110,23 +4078,6 @@ class ReplicaManagerTest {
41104078 try {
41114079 val offsetCheckpoints = new LazyOffsetCheckpoints (replicaManager.highWatermarkCheckpoints.asJava)
41124080 replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false , isFutureReplica = false , offsetCheckpoints, None )
4113- val partition0Replicas = Seq [Integer ](0 , 1 ).asJava
4114- val topicIds = Map (tp0.topic -> topicId).asJava
4115- val leaderAndIsrRequest = new LeaderAndIsrRequest .Builder (0 , 0 , brokerEpoch,
4116- Seq (
4117- new LeaderAndIsrRequest .PartitionState ()
4118- .setTopicName(tp0.topic)
4119- .setPartitionIndex(tp0.partition)
4120- .setControllerEpoch(0 )
4121- .setLeader(1 )
4122- .setLeaderEpoch(0 )
4123- .setIsr(partition0Replicas)
4124- .setPartitionEpoch(0 )
4125- .setReplicas(partition0Replicas)
4126- .setIsNew(true )
4127- ).asJava,
4128- topicIds,
4129- Set (new Node (0 , " host1" , 0 ), new Node (1 , " host2" , 1 )).asJava).build()
41304081
41314082 // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
41324083 assertEquals(0 , brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
@@ -4135,7 +4086,9 @@ class ReplicaManagerTest {
41354086 assertEquals(0 , brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
41364087 assertEquals(0 , brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
41374088
4138- replicaManager.becomeLeaderOrFollower(0 , leaderAndIsrRequest, (_, _) => ())
4089+ val delta = createLeaderDelta(topicIds(topic), new TopicPartition (topic, 0 ), 1 , util.List .of(0 , 1 ), util.List .of(0 , 1 ))
4090+ val leaderMetadataImage = imageFromTopics(delta.apply())
4091+ replicaManager.applyDelta(delta, leaderMetadataImage)
41394092
41404093 // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
41414094 // We expect failedBuildRemoteLogAuxStateRate to increase because there is no remoteLogSegmentMetadata
0 commit comments