@@ -635,7 +635,7 @@ public void testCandidateLogSegmentsDelayUploadWhenRemoteCopyLagBytesUsesLocalRe
635635 }
636636
637637 @ Test
638- public void testCandidateLogSegmentsUploadWhenEitherLagConditionExceeded () throws IOException {
638+ public void testCandidateLogSegmentsUploadWhenTimeLagExceededAndSizeLagNotExceeded () throws IOException {
639639 UnifiedLog log = mock (UnifiedLog .class );
640640 LogSegment segment1 = mock (LogSegment .class );
641641 LogSegment segment2 = mock (LogSegment .class );
@@ -669,6 +669,41 @@ public void testCandidateLogSegmentsUploadWhenEitherLagConditionExceeded() throw
669669 assertEquals (expected , actual );
670670 }
671671
672+ @ Test
673+ public void testCandidateLogSegmentsUploadWhenSizeLagExceededAndTimeLagNotExceeded () throws IOException {
674+ UnifiedLog log = mock (UnifiedLog .class );
675+ LogSegment segment1 = mock (LogSegment .class );
676+ LogSegment segment2 = mock (LogSegment .class );
677+ LogSegment activeSegment = mock (LogSegment .class );
678+
679+ when (segment1 .baseOffset ()).thenReturn (5L );
680+ when (segment2 .baseOffset ()).thenReturn (10L );
681+ when (activeSegment .baseOffset ()).thenReturn (15L );
682+ when (segment1 .size ()).thenReturn (40 );
683+ when (segment2 .size ()).thenReturn (30 );
684+ when (activeSegment .size ()).thenReturn (20 );
685+
686+ Map <String , Long > logProps = new HashMap <>();
687+ logProps .put (TopicConfig .RETENTION_MS_CONFIG , 10_000L );
688+ logProps .put (TopicConfig .REMOTE_COPY_LAG_MS_CONFIG , 100L );
689+ logProps .put (TopicConfig .RETENTION_BYTES_CONFIG , 10_000L );
690+ logProps .put (TopicConfig .REMOTE_COPY_LAG_BYTES_CONFIG , 50L );
691+ LogConfig logConfig = new LogConfig (logProps );
692+ when (log .config ()).thenReturn (logConfig );
693+ when (log .logSegments (5L , Long .MAX_VALUE )).thenReturn (List .of (segment1 , segment2 , activeSegment ));
694+
695+ time .sleep (1000L );
696+ when (segment1 .largestTimestamp ()).thenReturn (time .milliseconds () - 50L );
697+ when (segment2 .largestTimestamp ()).thenReturn (time .milliseconds () - 20L );
698+ RemoteLogManager .RLMCopyTask task = remoteLogManager .new RLMCopyTask (
699+ leaderTopicIdPartition , RemoteLogManagerConfig .DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES );
700+ List <RemoteLogManager .EnrichedLogSegment > expected = List .of (
701+ new RemoteLogManager .EnrichedLogSegment (segment1 , 10L )
702+ );
703+ List <RemoteLogManager .EnrichedLogSegment > actual = task .candidateLogSegments (log , 5L , 20L );
704+ assertEquals (expected , actual );
705+ }
706+
672707 @ Test
673708 public void testCandidateLogSegmentsDelayUploadWhenBothLagConditionsNotExceeded () throws IOException {
674709 UnifiedLog log = mock (UnifiedLog .class );
0 commit comments