Commit edcada2

jiafu1115 and stroller.fu authored
KAFKA-19893: Reduce tiered storage redundancy with delayed upload (KIP-1241) (#20913)
[JIRA:19893](https://issues.apache.org/jira/browse/KAFKA-19893) [KIP:1241](https://cwiki.apache.org/confluence/x/A4LMFw)

Currently, Kafka uploads all non-active local log segments to remote storage even when they are still within the local retention period, resulting in redundant storage of the same data in both tiers.

<img width="1503" height="772" alt="image" src="https://github.com/user-attachments/assets/55e95e2e-4ab0-4ab9-b28b-871760f331fa" />

This wastes storage capacity (cost) without providing immediate benefits, since reads during the retention window prioritize local data. However, some users/topics run real-time analytics directly against remote storage and need the latest data to be available as soon as possible. (In fact, remote storage can only stay as up to date as possible; it still cannot include the latest data, because the active segment has not been uploaded yet.) Therefore, this optimization is offered as an **optional topic-level configuration** rather than the default behavior.

Here are some additional thoughts/considerations.

1. Local files won't be deleted until they have been uploaded to remote storage, so this change is safe: you don't need to worry about files being cleaned up before they are uploaded.
2. Considering the latency of remote storage, the local retention period won't be set too short. For example, in our production environment, we keep 1 day of local data alongside 3-7 days in remote storage, so there's still 1 day of redundancy.

Example for the goal:

<img width="797" height="520" alt="image" src="https://github.com/user-attachments/assets/be6725f1-02e7-4b09-aea9-7ce3bbb5e227" />

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

---------

Signed-off-by: stroller <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Co-authored-by: stroller.fu <stroller.fu@zoom.us>
1 parent c5f16d2 commit edcada2

12 files changed

Lines changed: 1135 additions & 2 deletions

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,22 @@ public class TopicConfig {
101101
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
102102
"(i.e. retention.ms/bytes).";
103103

104+
public static final String REMOTE_COPY_LAG_MS_CONFIG = "remote.copy.lag.ms";
105+
public static final String REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
106+
"When set to 0, immediate upload without any delay check. " +
107+
"When set to a positive value (ms), a segment can't become eligible for upload until the time since the latest record in the segment reaches the value. " +
108+
"The value should not exceed the real local retention ms except the latter is retained indefinitely (-1). " +
109+
"When set to -1, resolves to the real local retention ms as maximum delay. " +
110+
"For how the real local retention time is computed, see <code>local.retention.ms</code>.";
111+
112+
public static final String REMOTE_COPY_LAG_BYTES_CONFIG = "remote.copy.lag.bytes";
113+
public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
114+
"When set to 0, immediate upload without any delay check. " +
115+
"When set to a positive value (bytes), a segment can't become eligible for upload until the total bytes of log data after the segment reach the value. " +
116+
"The value should not exceed the real local retention bytes except the latter is retained indefinitely (-1). " +
117+
"When set to -1, resolves to the real local retention bytes as maximum delay. " +
118+
"For how the real local retention size is computed, see <code>local.retention.bytes</code>.";
119+
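The sentinel values described in the two doc strings above can be summarized in a small sketch. This is an illustration only, not Kafka code: the class and method names are hypothetical, and the fallback of `-2` to the topic-wide retention is taken from the validation logic elsewhere in this commit.

```java
/** Hypothetical helper; these names are illustrative and are not Kafka APIs. */
final class CopyLagResolution {
    private CopyLagResolution() { }

    /** A local retention of -2 falls back to the topic-wide retention value. */
    static long effectiveLocalRetention(long localRetention, long retention) {
        return localRetention == -2L ? retention : localRetention;
    }

    /**
     * 0 means no delay, a positive value is used as-is, and -1 resolves to the
     * effective local retention (which may itself be -1, meaning unlimited).
     */
    static long resolveCopyLag(long copyLag, long localRetention, long retention) {
        if (copyLag == -1L) {
            return effectiveLocalRetention(localRetention, retention);
        }
        return copyLag;
    }

    public static void main(String[] args) {
        // Lag of -1 with local retention of -2 and retention of one day.
        System.out.println(resolveCopyLag(-1L, -2L, 86_400_000L)); // prints 86400000
        // Lag of -1 with an explicit local retention of 900 ms.
        System.out.println(resolveCopyLag(-1L, 900L, 1_000L));     // prints 900
    }
}
```

The same shape would apply to the bytes variant, with `local.retention.bytes` and `retention.bytes` in place of the time-based settings.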
104120
public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
105121
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
106122
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 24 additions & 0 deletions
@@ -579,6 +579,28 @@ class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryE
579579
}
580580
}
581581

582+
def validateLogRemoteCopyLagMs(): Unit = {
583+
val logRetentionMs: Long = newConfig.logRetentionTimeMillis
584+
val logLocalRetentionMs = newConfig.remoteLogManagerConfig.logLocalRetentionMs
585+
val effectiveLocalRetentionMs = if (logLocalRetentionMs == -2L) logRetentionMs else logLocalRetentionMs
586+
val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs
587+
if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) {
588+
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs,
589+
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP} (effective value: $effectiveLocalRetentionMs)")
590+
}
591+
}
592+
593+
def validateLogRemoteCopyLagBytes(): Unit = {
594+
val logRetentionBytes: Long = newConfig.logRetentionBytes
595+
val logLocalRetentionBytes = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
596+
val effectiveLocalRetentionBytes = if (logLocalRetentionBytes == -2L) logRetentionBytes else logLocalRetentionBytes
597+
val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes
598+
if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) {
599+
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes,
600+
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP} (effective value: $effectiveLocalRetentionBytes)")
601+
}
602+
}
603+
582604
def validateCordonedLogDirs(): Unit = {
583605
val logDirs = newConfig.logDirs()
584606
val cordonedLogDirs = newConfig.cordonedLogDirs()
@@ -592,6 +614,8 @@ class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryE
592614

593615
validateLogLocalRetentionMs()
594616
validateLogLocalRetentionBytes()
617+
validateLogRemoteCopyLagMs()
618+
validateLogRemoteCopyLagBytes()
595619
validateCordonedLogDirs()
596620
}
597621
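The rule enforced by `validateLogRemoteCopyLagMs` and `validateLogRemoteCopyLagBytes` can be restated as a standalone predicate. The sketch below is a hedged Java paraphrase of the Scala condition in this hunk; the class name is hypothetical and `IllegalArgumentException` stands in for Kafka's `ConfigException`.

```java
/** Hypothetical paraphrase of the validation rule; not part of the commit. */
final class CopyLagValidation {
    private CopyLagValidation() { }

    /** A local retention of -2 falls back to the topic-wide retention value. */
    static long effectiveLocalRetention(long localRetention, long retention) {
        return localRetention == -2L ? retention : localRetention;
    }

    /**
     * True only when the lag is a positive value, the effective local retention
     * is bounded (>= 0), and the lag is larger than that bound.
     */
    static boolean exceedsLocalRetention(long copyLag, long localRetention, long retention) {
        long effective = effectiveLocalRetention(localRetention, retention);
        return copyLag > 0L && effective >= 0L && copyLag > effective;
    }

    /** Throws when the rule is violated; the exception type is a stand-in. */
    static void validate(String name, long copyLag, long localRetention, long retention) {
        if (exceedsLocalRetention(copyLag, localRetention, retention)) {
            throw new IllegalArgumentException(name + "=" + copyLag
                + " must not exceed the effective local retention "
                + effectiveLocalRetention(localRetention, retention));
        }
    }
}
```

Note that the sentinels `0` and `-1` for the lag, and an unlimited local retention of `-1`, all pass this check, which matches the valid cases exercised in the tests of this commit.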

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 2 additions & 0 deletions
@@ -628,6 +628,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
628628
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
629629
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
630630
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
631+
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, remoteLogManagerConfig.logRemoteCopyLagMs: java.lang.Long)
632+
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, remoteLogManagerConfig.logRemoteCopyLagBytes: java.lang.Long)
631633
logProps
632634
}
633635
}

core/src/test/scala/unit/kafka/log/LogConfigTest.scala

Lines changed: 61 additions & 0 deletions
@@ -73,6 +73,8 @@ class LogConfigTest {
7373
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")
7474
case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
7575
case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
76+
case TopicConfig.REMOTE_COPY_LAG_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
77+
case TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
7678
case TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
7779
case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op
7880

@@ -258,6 +260,65 @@ class LogConfigTest {
258260
doTestInvalidLocalLogRetentionProps(2000L, -1, 100, 1000L)
259261
}
260262

263+
@Test
264+
def testInvalidRemoteCopyLagMsWhenGreaterThanEffectiveLocalRetentionMs(): Unit = {
265+
val props = new util.HashMap[String, String]()
266+
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
267+
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
268+
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
269+
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "1001")
270+
271+
val exception = assertThrows(classOf[ConfigException], () => validateTopicLogConfig(props))
272+
assertTrue(exception.getMessage.contains(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG))
273+
}
274+
275+
@Test
276+
def testInvalidRemoteCopyLagBytesWhenGreaterThanEffectiveLocalRetentionBytes(): Unit = {
277+
val props = new util.HashMap[String, String]()
278+
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
279+
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
280+
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
281+
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "1001")
282+
283+
val exception = assertThrows(classOf[ConfigException], () => validateTopicLogConfig(props))
284+
assertTrue(exception.getMessage.contains(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG))
285+
}
286+
287+
@Test
288+
def testValidRemoteCopyLagWhenBothLagChecksAreDisabled(): Unit = {
289+
val props = new util.HashMap[String, String]()
290+
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
291+
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
292+
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
293+
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
294+
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
295+
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "0")
296+
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "0")
297+
298+
validateTopicLogConfig(props)
299+
}
300+
301+
@Test
302+
def testValidRemoteCopyLagMinusOneResolvesToLocalRetention(): Unit = {
303+
val props = new util.HashMap[String, String]()
304+
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
305+
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
306+
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "900")
307+
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "-1")
308+
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "2000")
309+
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1800")
310+
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "-1")
311+
312+
validateTopicLogConfig(props)
313+
}
314+
315+
private def validateTopicLogConfig(props: util.Map[String, String]): Unit = {
316+
val kafkaProps = TestUtils.createDummyBrokerConfig()
317+
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
318+
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
319+
LogConfig.validate(util.Map.of, props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
320+
}
321+
261322
private def doTestInvalidLocalLogRetentionProps(localRetentionMs: Long,
262323
localRetentionBytes: Int,
263324
retentionBytes: Int,

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,55 @@ class DynamicBrokerConfigTest {
770770
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
771771
}
772772

773+
@Test
774+
def testDynamicRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
775+
// remote copy lag ms cannot exceed effective local retention ms
776+
verifyIncorrectRemoteCopyLagProps(
777+
retentionMs = 1000L,
778+
logLocalRetentionMs = -2L,
779+
remoteCopyLagMs = 1001L,
780+
retentionBytes = 1000L,
781+
logLocalRetentionBytes = -2L,
782+
remoteCopyLagBytes = 100L
783+
)
784+
785+
// remote copy lag bytes cannot exceed effective local retention bytes
786+
verifyIncorrectRemoteCopyLagProps(
787+
retentionMs = 1000L,
788+
logLocalRetentionMs = -2L,
789+
remoteCopyLagMs = 100L,
790+
retentionBytes = 1000L,
791+
logLocalRetentionBytes = -2L,
792+
remoteCopyLagBytes = 1001L
793+
)
794+
795+
}
796+
797+
def verifyIncorrectRemoteCopyLagProps(retentionMs: Long,
798+
logLocalRetentionMs: Long,
799+
remoteCopyLagMs: Long,
800+
retentionBytes: Long,
801+
logLocalRetentionBytes: Long,
802+
remoteCopyLagBytes: Long): Unit = {
803+
val props = TestUtils.createBrokerConfig(0, port = 8181)
804+
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, retentionMs.toString)
805+
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
806+
val config = KafkaConfig(props)
807+
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[DirectoryEventHandler]))
808+
config.dynamicConfig.initialize(None)
809+
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
810+
811+
val newProps = new Properties()
812+
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs.toString)
813+
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, remoteCopyLagMs.toString)
814+
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes.toString)
815+
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, remoteCopyLagBytes.toString)
816+
// validate default config
817+
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
818+
// validate per broker config
819+
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
820+
}
821+
773822
@Test
774823
def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
775824
val props = TestUtils.createBrokerConfig(0, port = 8181)

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 6 additions & 0 deletions
@@ -1047,6 +1047,8 @@ class KafkaConfigTest {
10471047
case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10481048
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
10491049
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
1050+
case RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
1051+
case RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
10501052

10511053
/** New group coordinator configs */
10521054
case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
@@ -1200,6 +1202,10 @@ class KafkaConfigTest {
12001202
assertDynamic(kafkaConfigProp, 10015L, () => config.remoteLogManagerConfig.logLocalRetentionMs)
12011203
case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG =>
12021204
assertDynamic(kafkaConfigProp, 10016L, () => config.remoteLogManagerConfig.logLocalRetentionBytes)
1205+
case TopicConfig.REMOTE_COPY_LAG_MS_CONFIG =>
1206+
assertDynamic(kafkaConfigProp, 10017L, () => config.remoteLogManagerConfig.logRemoteCopyLagMs)
1207+
case TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG =>
1208+
assertDynamic(kafkaConfigProp, 10018L, () => config.remoteLogManagerConfig.logRemoteCopyLagBytes)
12031209
// not dynamically updatable
12041210
case QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG =>
12051211
// topic only config

server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java

Lines changed: 3 additions & 1 deletion
@@ -84,7 +84,9 @@ public final class ServerTopicConfigSynonyms {
8484
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG),
8585
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG),
8686
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG),
87-
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)
87+
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG),
88+
sameNameWithLogPrefix(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG),
89+
sameNameWithLogPrefix(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG)
8890
);
8991

9092
/**
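For orientation, the naming relationship that this registration sets up can be sketched as follows. This assumes that `sameNameWithLogPrefix` pairs a topic-level name with a broker-level synonym formed by prepending `log.`, as it does for `local.retention.ms`; the helper below is hypothetical, and the literal broker property values are not shown in this diff.

```java
/** Hypothetical illustration of the assumed topic-to-broker naming convention. */
final class LogPrefixSynonym {
    private LogPrefixSynonym() { }

    /** Assumption: the broker-level synonym is the topic config name prefixed with "log.". */
    static String brokerSynonym(String topicConfigName) {
        return "log." + topicConfigName;
    }

    public static void main(String[] args) {
        System.out.println(brokerSynonym("remote.copy.lag.ms"));    // prints log.remote.copy.lag.ms
        System.out.println(brokerSynonym("remote.copy.lag.bytes")); // prints log.remote.copy.lag.bytes
    }
}
```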

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java

Lines changed: 58 additions & 1 deletion
@@ -66,6 +66,7 @@
6666
import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
6767
import org.apache.kafka.storage.internals.log.EpochEntry;
6868
import org.apache.kafka.storage.internals.log.FetchDataInfo;
69+
import org.apache.kafka.storage.internals.log.LogConfig;
6970
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
7071
import org.apache.kafka.storage.internals.log.LogSegment;
7172
import org.apache.kafka.storage.internals.log.OffsetIndex;
@@ -916,6 +917,7 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
916917
* 1) Segment is not the active segment and
917918
* 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
918919
* committed/acked messages
920+
* 3) Segment has exceeded copy lag by time or size when configured (remote.copy.lag.ms, remote.copy.lag.bytes)
919921
* @param log The log from which the segments are to be copied
920922
* @param fromOffset The offset from which the segments are to be copied
921923
* @param lastStableOffset The last stable offset of the log
@@ -925,18 +927,73 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, L
925927
List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
926928
List<LogSegment> segments = log.logSegments(fromOffset, Long.MAX_VALUE);
927929
if (!segments.isEmpty()) {
930+
long currentTimeMs = time.milliseconds();
931+
long totalLogSize = UnifiedLog.sizeInBytes(segments);
932+
long cumulativeSize = 0;
928933
for (int idx = 1; idx < segments.size(); idx++) {
929934
LogSegment previousSeg = segments.get(idx - 1);
930935
LogSegment currentSeg = segments.get(idx);
931936
if (currentSeg.baseOffset() <= lastStableOffset) {
932-
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
937+
cumulativeSize += previousSeg.size();
938+
if (isEligibleForUpload(log.config(), previousSeg, currentTimeMs, totalLogSize, cumulativeSize)) {
939+
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
940+
} else {
941+
break;
942+
}
933943
}
934944
}
935945
// Discard the last active segment
936946
}
937947
return candidateLogSegments;
938948
}
939949

950+
private boolean isEligibleForUpload(LogConfig logConfig, LogSegment previousSeg, long currentTimeMs, long totalLogSize, long cumulativeSize) {
951+
long copyLagMs = logConfig.remoteCopyLagMs();
952+
long copyLagBytes = logConfig.remoteCopyLagBytes();
953+
if (logger.isTraceEnabled()) {
954+
logger.trace("delayCopy check for segment {}: copyLagMs={}, copyLagBytes={}, currentTimeMs={}, totalLogSize={}, cumulativeSize={}, sizeLagBytes={}",
955+
previousSeg, copyLagMs, copyLagBytes, currentTimeMs, totalLogSize, cumulativeSize, totalLogSize - cumulativeSize);
956+
}
957+
958+
if (copyLagMs == 0 || copyLagBytes == 0) {
959+
return true;
960+
}
961+
962+
boolean limitedCopyLagMsCheck = copyLagMs > 0;
963+
boolean limitedCopyLagSizeCheck = copyLagBytes > 0;
964+
965+
if (limitedCopyLagMsCheck && eligibleUploadByTime(previousSeg, currentTimeMs, copyLagMs)) {
966+
return true;
967+
}
968+
969+
return limitedCopyLagSizeCheck && eligibleUploadBySize(previousSeg, totalLogSize, cumulativeSize, copyLagBytes);
970+
}
971+
972+
private boolean eligibleUploadByTime(LogSegment segment, long currentTimeMs, long copyLagMs) {
973+
try {
974+
long segmentAgeMs = currentTimeMs - segment.largestTimestamp();
975+
boolean eligibleUpload = segmentAgeMs < 0 || segmentAgeMs >= copyLagMs;
976+
if (logger.isTraceEnabled()) {
977+
logger.trace("{} eligible for upload by time? {} (segment age {} ms, copy lag {} ms)",
978+
segment, eligibleUpload, segmentAgeMs, copyLagMs);
979+
}
980+
return eligibleUpload;
981+
} catch (IOException e) {
982+
logger.warn("Failed to get largest timestamp for segment {}, take it as eligible for upload based on time", segment, e);
983+
return true;
984+
}
985+
}
986+
987+
private boolean eligibleUploadBySize(LogSegment segment, long totalLogSize, long cumulativeSize, long copyLagBytes) {
988+
long sizeLagBytes = totalLogSize - cumulativeSize;
989+
boolean eligibleUpload = sizeLagBytes >= copyLagBytes;
990+
if (logger.isTraceEnabled()) {
991+
logger.trace("{} eligible for upload by size? {} (size lag {} bytes, copy lag {} bytes, totalLogSize={}, cumulativeSize={})",
992+
segment, eligibleUpload, sizeLagBytes, copyLagBytes, totalLogSize, cumulativeSize);
993+
}
994+
return eligibleUpload;
995+
}
996+
940997
public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
941998
if (isCancelled())
942999
return;
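The selection logic added to `candidateLogSegments` can be exercised in isolation with a simplified model. The sketch below is not the commit's code: `Segment` is a hypothetical stand-in for `LogSegment`, the last-stable-offset check is omitted, and the lag values are assumed to be already resolved, with negative values meaning that the corresponding check is disabled.

```java
import java.util.ArrayList;
import java.util.List;

final class DelayedUploadSketch {
    /** Simplified stand-in for a log segment (not Kafka's LogSegment). */
    static final class Segment {
        final String name;
        final long sizeBytes;
        final long largestTimestampMs;

        Segment(String name, long sizeBytes, long largestTimestampMs) {
            this.name = name;
            this.sizeBytes = sizeBytes;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    /** Eligible if either lag is 0, or the time lag or the size lag has been reached. */
    static boolean isEligible(long copyLagMs, long copyLagBytes, Segment segment,
                              long nowMs, long totalLogSize, long cumulativeSize) {
        if (copyLagMs == 0 || copyLagBytes == 0) {
            return true;
        }
        long ageMs = nowMs - segment.largestTimestampMs;
        if (copyLagMs > 0 && (ageMs < 0 || ageMs >= copyLagMs)) {
            return true;
        }
        long bytesAfterSegment = totalLogSize - cumulativeSize;
        return copyLagBytes > 0 && bytesAfterSegment >= copyLagBytes;
    }

    /** Walks segments oldest first, skips the last (active) one, stops at the first ineligible one. */
    static List<String> candidates(List<Segment> segments, long copyLagMs, long copyLagBytes, long nowMs) {
        long totalLogSize = 0;
        for (Segment s : segments) {
            totalLogSize += s.sizeBytes;
        }
        List<String> result = new ArrayList<>();
        long cumulativeSize = 0;
        for (int idx = 1; idx < segments.size(); idx++) {
            Segment previous = segments.get(idx - 1);
            cumulativeSize += previous.sizeBytes;
            if (!isEligible(copyLagMs, copyLagBytes, previous, nowMs, totalLogSize, cumulativeSize)) {
                break;
            }
            result.add(previous.name);
        }
        return result;
    }

    /** Four 100-byte segments; the last one plays the role of the active segment. */
    static List<Segment> sampleSegments() {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment("a", 100, 0));
        segments.add(new Segment("b", 100, 1000));
        segments.add(new Segment("c", 100, 2000));
        segments.add(new Segment("active", 100, 3000));
        return segments;
    }

    public static void main(String[] args) {
        System.out.println(candidates(sampleSegments(), -1, 250, 3000));  // prints [a]
        System.out.println(candidates(sampleSegments(), 1500, -1, 3000)); // prints [a, b]
        System.out.println(candidates(sampleSegments(), 0, -1, 3000));    // prints [a, b, c]
    }
}
```

Because the loop breaks at the first ineligible segment, uploads stay contiguous from the oldest segment forward, which mirrors the `break` in the hunk above.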
