Skip to content

Commit da61b19

Browse files
authored
KAFKA-19981: Handle retriable remote storage exception in RemoteLogManager (apache#21150)
This PR distinguishes between `RemoteStorageException` and `RetriableRemoteStorageException` in `RemoteLogManager` to handle temporary storage degradations gracefully: 1. Copy path: Avoids deleting partially uploaded segments when `RetriableRemoteStorageException` is thrown; 2. Delete path: Skips incrementing failedRemoteDeleteRequestRate metric for retriable exceptions; 3. Documentation: Updates `RemoteStorageManager` Javadoc to clarify exception usage; 4. Testing: Adds UT for retriable scenarios. Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Luke Chen <showuon@gmail.com>
1 parent 29ec918 commit da61b19

3 files changed

Lines changed: 160 additions & 8 deletions

File tree

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@
4343
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics.
4444
* The following tags are automatically added to all metrics registered: <code>config</code> set to
4545
* <code>remote.log.storage.manager.class.name</code>, and <code>class</code> set to the RemoteStorageManager class name.
46+
* <p>
47+
* Plugin implementors of {@link RemoteStorageManager} should throw {@link RetriableRemoteStorageException}
48+
* for transient errors that can be recovered by retrying. For non-recoverable errors,
49+
* {@link RemoteStorageException} should be thrown. This distinction allows RemoteLogManager to
50+
* handle retries gracefully and report metrics accurately.
4651
*/
4752
public interface RemoteStorageManager extends Configurable, Closeable {
4853

@@ -90,11 +95,11 @@ enum IndexType {
9095
* @param remoteLogSegmentMetadata metadata about the remote log segment.
9196
* @param logSegmentData data to be copied to tiered storage.
9297
* @return custom metadata to be added to the segment metadata after copying.
93-
* @throws RemoteStorageException if there are any errors in storing the data of the segment.
98+
* @throws RemoteStorageException if there are any errors in storing the data of the segment.
99+
* @throws RetriableRemoteStorageException if the error is transient and the operation can be retried.
94100
*/
95101
Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
96-
LogSegmentData logSegmentData)
97-
throws RemoteStorageException;
102+
LogSegmentData logSegmentData) throws RemoteStorageException;
98103

99104
/**
100105
* Returns the remote log segment data file/object as InputStream for the given {@link RemoteLogSegmentMetadata}
@@ -150,6 +155,7 @@ InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
150155
*
151156
* @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
152157
* @throws RemoteStorageException if there are any storage related errors occurred.
158+
* @throws RetriableRemoteStorageException if the error is transient and the operation can be retried.
153159
*/
154160
void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
155161
}

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,7 @@ public void run() {
836836
if (!isCancelled()) {
837837
logger.warn("Current thread for partition {} is interrupted", topicIdPartition, ex);
838838
}
839-
} catch (RetriableException ex) {
839+
} catch (RetriableException | RetriableRemoteStorageException ex) {
840840
logger.debug("Encountered a retryable error while executing current task for partition {}", topicIdPartition, ex);
841841
} catch (Exception ex) {
842842
if (!isCancelled()) {
@@ -869,7 +869,7 @@ public RLMCopyTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimi
869869
}
870870

871871
@Override
872-
protected void execute(UnifiedLog log) throws InterruptedException {
872+
protected void execute(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
873873
// In the first run after completing altering logDir within broker, we should make sure the state is reset. (KAFKA-16711)
874874
if (!log.parentDir().equals(logDirectory.orElse(null))) {
875875
copiedOffsetOption = Optional.empty();
@@ -928,7 +928,7 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, L
928928
return candidateLogSegments;
929929
}
930930

931-
public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException {
931+
public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
932932
if (isCancelled())
933933
return;
934934

@@ -1001,7 +1001,7 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
10011001
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
10021002
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
10031003
this.cancel();
1004-
} catch (InterruptedException | RetriableException ex) {
1004+
} catch (InterruptedException | RetriableException | RetriableRemoteStorageException ex) {
10051005
throw ex;
10061006
} catch (Exception ex) {
10071007
if (!isCancelled()) {
@@ -1044,6 +1044,9 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
10441044

10451045
try {
10461046
customMetadata = remoteStorageManagerPlugin.get().copyLogSegmentData(copySegmentStartedRlsm, segmentData);
1047+
} catch (RetriableRemoteStorageException e) {
1048+
logger.info("Copy failed with retriable error for segment {}", copySegmentStartedRlsm.remoteLogSegmentId());
1049+
throw e;
10471050
} catch (RemoteStorageException e) {
10481051
logger.info("Copy failed, cleaning segment {}", copySegmentStartedRlsm.remoteLogSegmentId());
10491052
try {
@@ -1513,6 +1516,8 @@ private boolean deleteRemoteLogSegment(
15131516
// Delete the segment in remote storage.
15141517
try {
15151518
remoteStorageManagerPlugin.get().deleteLogSegmentData(segmentMetadata);
1519+
} catch (RetriableRemoteStorageException e) {
1520+
throw e;
15161521
} catch (RemoteStorageException e) {
15171522
brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().mark();
15181523
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark();

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,92 @@ void testFailedCopyShouldDeleteTheDanglingSegment() throws Exception {
774774
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
775775
}
776776

777+
@Test
778+
void testFailedCopyWithRetriableExceptionShouldNotDeleteTheDanglingSegment() throws Exception {
779+
long oldSegmentStartOffset = 0L;
780+
long nextSegmentStartOffset = 150L;
781+
long lastStableOffset = 150L;
782+
long logEndOffset = 150L;
783+
784+
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(12L);
785+
when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L);
786+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
787+
788+
// leader epoch preparation
789+
checkpoint.write(totalEpochEntries);
790+
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
791+
when(mockLog.leaderEpochCache()).thenReturn(cache);
792+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
793+
794+
File tempFile = TestUtils.tempFile();
795+
File mockProducerSnapshotIndex = TestUtils.tempFile();
796+
File tempDir = TestUtils.tempDirectory();
797+
// create 2 log segments, with 0 and 150 as log start offset
798+
LogSegment oldSegment = mock(LogSegment.class);
799+
LogSegment activeSegment = mock(LogSegment.class);
800+
801+
when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
802+
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
803+
when(activeSegment.size()).thenReturn(2);
804+
verify(oldSegment, times(0)).readNextOffset();
805+
verify(activeSegment, times(0)).readNextOffset();
806+
807+
FileRecords fileRecords = mock(FileRecords.class);
808+
when(oldSegment.log()).thenReturn(fileRecords);
809+
when(fileRecords.file()).thenReturn(tempFile);
810+
when(fileRecords.sizeInBytes()).thenReturn(10);
811+
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
812+
813+
when(mockLog.activeSegment()).thenReturn(activeSegment);
814+
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
815+
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(List.of(oldSegment, activeSegment));
816+
817+
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
818+
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
819+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
820+
when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
821+
when(mockLog.logEndOffset()).thenReturn(logEndOffset);
822+
823+
OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
824+
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
825+
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
826+
txnFile.createNewFile();
827+
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
828+
when(oldSegment.timeIndex()).thenReturn(timeIdx);
829+
when(oldSegment.offsetIndex()).thenReturn(idx);
830+
when(oldSegment.txnIndex()).thenReturn(txnIndex);
831+
832+
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
833+
dummyFuture.complete(null);
834+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
835+
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
836+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
837+
838+
// throw retriable exception when copyLogSegmentData
839+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
840+
.thenThrow(new RetriableRemoteStorageException("test-retriable"));
841+
RemoteLogManager.RLMCopyTask task = remoteLogManager.new RLMCopyTask(leaderTopicIdPartition, 128);
842+
assertThrows(RetriableRemoteStorageException.class, () -> task.copyLogSegmentsToRemote(mockLog));
843+
844+
ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
845+
verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
846+
// verify the segment is not deleted for retriable exception
847+
verify(remoteStorageManager, never()).deleteLogSegmentData(eq(remoteLogSegmentMetadataArg.getValue()));
848+
verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
849+
850+
// Verify the metrics
851+
// Retriable exceptions should not count as failures for copy
852+
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
853+
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
854+
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
855+
// Verify aggregate metrics
856+
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
857+
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
858+
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
859+
assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyLagBytesAggrMetric().value());
860+
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
861+
}
862+
777863
@Test
778864
void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exception {
779865
long oldSegmentStartOffset = 0L;
@@ -2401,7 +2487,7 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
24012487
Thread copyThread = new Thread(() -> {
24022488
try {
24032489
copyTask.copyLogSegmentsToRemote(mockLog);
2404-
} catch (InterruptedException e) {
2490+
} catch (InterruptedException | RetriableRemoteStorageException e) {
24052491
throw new RuntimeException(e);
24062492
}
24072493
});
@@ -2840,6 +2926,61 @@ public void testFailedDeleteExpiredSegments(long retentionSize,
28402926
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
28412927
}
28422928

2929+
@ParameterizedTest(name = "testDeleteSegmentFailureWithRetriableExceptionShouldNotUpdateMetrics retentionSize={0} retentionMs={1}")
2930+
@CsvSource(value = {"0, -1", "-1, 0"})
2931+
public void testDeleteSegmentFailureWithRetriableExceptionShouldNotUpdateMetrics(long retentionSize,
2932+
long retentionMs) throws RemoteStorageException, ExecutionException, InterruptedException {
2933+
Map<String, Long> logProps = new HashMap<>();
2934+
logProps.put("retention.bytes", retentionSize);
2935+
logProps.put("retention.ms", retentionMs);
2936+
LogConfig mockLogConfig = new LogConfig(logProps);
2937+
when(mockLog.config()).thenReturn(mockLogConfig);
2938+
2939+
List<EpochEntry> epochEntries = List.of(epochEntry0);
2940+
checkpoint.write(epochEntries);
2941+
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
2942+
when(mockLog.leaderEpochCache()).thenReturn(cache);
2943+
2944+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
2945+
when(mockLog.logEndOffset()).thenReturn(200L);
2946+
2947+
List<RemoteLogSegmentMetadata> metadataList =
2948+
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 1, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
2949+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
2950+
.thenReturn(metadataList.iterator());
2951+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
2952+
.thenAnswer(ans -> metadataList.iterator());
2953+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
2954+
.thenReturn(CompletableFuture.runAsync(() -> { }));
2955+
2956+
// Verify the metrics for remote deletes and for failures is zero before attempt to delete segments
2957+
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
2958+
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
2959+
// Verify aggregate metrics
2960+
assertEquals(0, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
2961+
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
2962+
2963+
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
2964+
doThrow(new RetriableRemoteStorageException("Failed to delete segment with retriable exception")).when(remoteStorageManager).deleteLogSegmentData(any());
2965+
assertThrows(RetriableRemoteStorageException.class, task::cleanupExpiredRemoteLogSegments);
2966+
2967+
assertEquals(100L, currentLogStartOffset.get());
2968+
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
2969+
2970+
// Verify the metric for remote delete is updated correctly
2971+
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
2972+
// Verify we did not report failure for remote deletes with retriable exception
2973+
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
2974+
// Verify aggregate metrics
2975+
assertEquals(1, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
2976+
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
2977+
2978+
// make sure we'll retry the deletion in next run
2979+
doNothing().when(remoteStorageManager).deleteLogSegmentData(any());
2980+
task.cleanupExpiredRemoteLogSegments();
2981+
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
2982+
}
2983+
28432984
@ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
28442985
@CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
28452986
public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount,

0 commit comments

Comments
 (0)