@@ -3213,7 +3213,7 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInfo
         subscriptions.seek(tp0, 0);
         subscriptions.seek(tp1, 0);
 
-        // Setup preferred read replica to node=1 by doing a fetch for both partitions.
+        // Setup preferred read replica to node=0 by doing a fetch for both partitions.
         assertEquals(2, sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
         client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
@@ -3310,7 +3310,7 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInfo
         assertEquals(Optional.of(nodeId1.id()), subscriptions.preferredReadReplica(tp0, time.milliseconds()));
         assertEquals(Optional.of(nodeId1.id()), subscriptions.preferredReadReplica(tp1, time.milliseconds()));
 
-        // Validate subscription is valid & fetch-able, and points to the new leader.
+        // Validate subscription is valid & fetch-able, and points to the new leader info.
         assertTrue(subscriptions.isFetchable(tp0));
         currentLeader = subscriptions.position(tp0).currentLeader;
         assertEquals(tp0Leader.id(), currentLeader.leader.get().id());
@@ -3347,7 +3347,7 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform
         subscriptions.seek(tp0, 0);
         subscriptions.seek(tp1, 0);
 
-        // Setup preferred read replica to node=1 by doing a fetch for both partitions.
+        // Setup preferred read replica to node=0 by doing a fetch for both partitions.
         assertEquals(2, sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
         client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
@@ -3428,9 +3428,9 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform
     @ParameterizedTest
     @EnumSource(value = Errors.class, names = {"KAFKA_STORAGE_ERROR", "OFFSET_NOT_AVAILABLE",
         "UNKNOWN_TOPIC_OR_PARTITION", "UNKNOWN_TOPIC_ID", "INCONSISTENT_TOPIC_ID"})
-    public void testWhenFetchResponseReturnsAErrorCausingRequestingMetadata(Errors error) {
+    public void testWhenFetchResponseReturnsAErrorCausingAwaitUpdate(Errors error) {
         // The test runs with 2 partitions where 1 partition is fetched without errors, and
-        // 2nd partition faces errors due to leadership changes.
+        // 2nd partition faces errors.
         buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(),
                 new BytesDeserializer(),
                 Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
@@ -3449,7 +3449,7 @@ public void testWhenFetchResponseReturnsAErrorCausingRequestingMetadata(Errors e
         subscriptions.seek(tp0, 0);
         subscriptions.seek(tp1, 0);
 
-        // Setup preferred read replica to node=1 by doing a fetch for both partitions.
+        // Setup preferred read replica to node=0 by doing a fetch for both partitions.
         assertEquals(2, sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
         client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
@@ -3473,8 +3473,7 @@ public void testWhenFetchResponseReturnsAErrorCausingRequestingMetadata(Errors e
         // Verify that metadata-update isn't requested as metadata is considered upto-date.
         assertFalse(metadata.updateRequested());
 
-        // TEST that next fetch returns an error(due to leadership change) but new leader info is not returned
-        // in the FetchResponse. This is the behaviour prior to KIP-951, should keep on working.
+        // TEST that next fetch returns an error.
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
         partitions.put(tidp0,
             new FetchResponseData.PartitionData()
@@ -3494,10 +3493,10 @@ public void testWhenFetchResponseReturnsAErrorCausingRequestingMetadata(Errors e
         assertFalse(partitionRecords.containsKey(tp0));
         assertTrue(partitionRecords.containsKey(tp1));
 
-        // Validate metadata is unchanged, as FetchResponse didn't have new leader information.
+        // Validate metadata is unchanged.
         assertEquals(startingClusterMetadata, metadata.fetch());
 
-        // Validate metadata-update is requested due to the leadership-error on tp0.
+        // Validate metadata-update is requested due to the error on tp0.
         assertTrue(metadata.updateRequested());
 
         // Validate preferred-read-replica is cleared for tp0 due to the error.
@@ -3546,7 +3545,7 @@ public void testWhenFetchResponseReturnsAErrorCausingRequestingMetadata(Errors e
         assertEquals(Optional.of(nodeId1.id()), subscriptions.preferredReadReplica(tp0, time.milliseconds()));
         assertEquals(Optional.of(nodeId1.id()), subscriptions.preferredReadReplica(tp1, time.milliseconds()));
 
-        // Validate subscription is valid & fetch-able, and points to the new leader.
+        // Validate subscription is valid & fetch-able, and points to the new leader info.
         assertTrue(subscriptions.isFetchable(tp0));
         currentLeader = subscriptions.position(tp0).currentLeader;
         assertEquals(tp0Leader.id(), currentLeader.leader.get().id());
@@ -3556,6 +3555,97 @@ public void testWhenFetchResponseReturnsAErrorCausingRequestingMetadata(Errors e
         assertTrue(subscriptions.isFetchable(tp1));
     }
 
+    @Test
+    public void testWhenFetchResponseReturnsAReplicaNotAvailableError() {
+        Errors error = Errors.REPLICA_NOT_AVAILABLE;
+
+        // The test runs with 2 partitions where 1 partition is fetched without errors, and
+        // 2nd partition faces errors due to replica not available.
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(),
+                new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
+                Duration.ofMinutes(5).toMillis());
+
+        // Setup so that tp0 & tp1 are subscribed and will be fetched from.
+        // Also, setup client's metadata for tp0 & tp1.
+        subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4),
+                tp -> validLeaderEpoch, topicIds, false));
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Cluster startingClusterMetadata = metadata.fetch();
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        // Setup preferred read replica to node=0 by doing a fetch for both partitions.
+        assertEquals(2, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp0Leader);
+        client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+        // Validate setup of preferred read replica for tp0 & tp1 is done correctly.
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+        selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+
+        // Send next fetch request.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // Verify that metadata-update isn't requested as metadata is considered upto-date.
+        assertFalse(metadata.updateRequested());
+
+        // TEST that next fetch returns an error.
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
+        partitions.put(tidp0,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp0.topicPartition().partition())
+                .setErrorCode(error.code()));
+        partitions.put(tidp1,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp1.topicPartition().partition())
+                .setErrorCode(Errors.NONE.code())
+                .setHighWatermark(100L)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(0)
+                .setRecords(nextRecords));
+        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+        partitionRecords = fetchRecords();
+        assertFalse(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Validate metadata is unchanged.
+        assertEquals(startingClusterMetadata, metadata.fetch());
+
+        // Validate metadata-update is requested due to the replica not available error on tp0.
+        assertTrue(metadata.updateRequested());
+
+        // Validate preferred-read-replica is cleared for tp0 due to the error.
+        assertEquals(Optional.empty(),
+            subscriptions.preferredReadReplica(tp0, time.milliseconds()));
+        // Validate preferred-read-replica is still set for tp1 as previous fetch for it was ok.
+        assertEquals(Optional.of(nodeId0.id()),
+            subscriptions.preferredReadReplica(tp1, time.milliseconds()));
+
+        // Validate subscription is still valid & fetch-able for tp0 & tp1.
+        // And tp0 points to original leader.
+        assertTrue(subscriptions.isFetchable(tp0));
+        Metadata.LeaderAndEpoch currentLeader = subscriptions.position(tp0).currentLeader;
+        assertEquals(tp0Leader.id(), currentLeader.leader.get().id());
+        assertEquals(validLeaderEpoch, currentLeader.epoch.get());
+
+        assertTrue(subscriptions.isFetchable(tp1));
+    }
+
     private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
         TopicPartition topicPartition,
         Errors error,