KAFKA-19893: Reduce tiered storage redundancy with delayed upload (KIP-1241) #20913
Conversation
Signed-off-by: stroller <fujian1115@gmail.com>
… for remote storage. Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
Hi, @kamalcph |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
cc @kamalcph here due to community's email don't allow to attach the image. We can discuss the content in email about the KIP. Thanks |
|
The already uploaded segments are eligible for deletion from broker. So, when remote storage is down, then those segments can be deleted as per the local retention settings and new segments can occupy those space. This provides more time for the Admin to act when remote storage is down for a longer time. |
|
@kamalcph I think I understand what you mean now. I’ve updated the picture above. Could you help double-check whether we’ve reached the same understanding? |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
There was a problem hiding this comment.
Thanks @jiafu1115 for addressing the review comments. We may have to further simplify the logic.
- The current logic has multiple negative conditions, which makes it harder to follow. The
delayCopymethod internally callsnotExceededCopyLagTimeandnotExceededCopyLagSize. Both of thesenot...methods check whether the threshold is exceeded and then invert the result again. Can we update this logic to use a positive flow instead?
(eg)
List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
...
...
if (isEligibleForUpload(log.config(), previousSeg, currentTimeMs, totalLogSize, cumulativeSize)) {
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
} else {
break;
}and update the delay logic to:
private boolean isEligibleForUpload(LogConfig logConfig,
LogSegment previousSeg,
long currentTimeMs,
long totalLogSize,
long cumulativeSize) {
long effectiveCopyLagMs = logConfig.remoteCopyLagMs();
// derive the value from local retention time when copyLagMs configured to -1
if (effectiveCopyLagMs == -1L) {
effectiveCopyLagMs = logConfig.localRetentionMs();
// if the local retention time is configured to infinite, then configure copyLagMs as infinite
if (effectiveCopyLagMs == -1L) {
effectiveCopyLagMs = Long.MAX_VALUE;
}
}
long effectiveCopyLagBytes = logConfig.remoteCopyLagBytes();
// derive the value from local retention size when copyLagBytes configured to -1
if (effectiveCopyLagBytes == -1L) {
effectiveCopyLagBytes = logConfig.localRetentionBytes();
// if the local retention size is configured to infinite, then configure copyLagBytes as infinite
if (effectiveCopyLagBytes == -1L) {
effectiveCopyLagBytes = Long.MAX_VALUE;
}
}
try {
long segmentAgeMs = currentTimeMs - previousSeg.largestTimestamp();
// If the segment's largestTimestamp is higher than the current time, then allow the segment to upload.
boolean shouldUploadNow = segmentAgeMs < 0;
// Upload when either of remote-copy lag time/size breaches the threshold.
if (!shouldUploadNow) {
shouldUploadNow = segmentAgeMs >= effectiveCopyLagMs;
}
if (!shouldUploadNow) {
long sizeLagBytes = totalLogSize - cumulativeSize;
shouldUploadNow = sizeLagBytes >= effectiveCopyLagBytes;
}
return shouldUploadNow;
} catch (IOException e) {
// in case of any error, allow the segment to upload. Should not block the upload that might hinder the
// deletion logic
LOGGER.warn("Failed to get largest timestamp for segment {}, marking it as eligible for upload based on time", previousSeg, e);
return true;
}
}I ran the newly added unit tests and it is passing.
- Also, move the unit tests from RemoteLogManagerTest to a new RemoteLagCopyTest since the RemoteLogManagerTest is huge with 4k+ lines.
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
kamalcph
left a comment
There was a problem hiding this comment.
LGTM, thanks for addressing the review comments! Left few minor comments.
| } | ||
|
|
||
| @Test | ||
| def testDynamicRemoteCopyLagThrowsOnIncorrectConfig(): Unit = { |
There was a problem hiding this comment.
could you also add one test for valid dynamic broker config change? Thanks!
There was a problem hiding this comment.
Also, move the testDynamicRemoteCopyLagThrowsOnIncorrectConfig test to DynamicBrokerConfigTest.java instead of DynamicBrokerConfigTest.scala.
There was a problem hiding this comment.
This can be taken up later, we already have one dynamic broker config change test in KafkaConfigTest.
There was a problem hiding this comment.
Got it. thanks @kamalcph
Thank you very much for your patient and thorough review.
chia7712
left a comment
There was a problem hiding this comment.
@jiafu1115 thanks for this patch. It is cooool
| private boolean eligibleUploadByTime(LogSegment segment, long currentTimeMs, long copyLagMs) { | ||
| try { | ||
| long segmentAgeMs = currentTimeMs - segment.largestTimestamp(); | ||
| boolean eligibleUpload = segmentAgeMs < 0 || segmentAgeMs >= copyLagMs; |
There was a problem hiding this comment.
This is a lingering issue. The local segment with a future timestamp is still NOT deleted, right? Should we allow the deletion of these local files once they have been successfully uploaded to remote storage, even if they contain future timestamps?
There was a problem hiding this comment.
Should we allow the deletion of these local files once they have been successfully uploaded to remote storage, even if they contain future timestamps?
Currently, it gets deleted based on the local retention bytes limit since the local retention deletion logic follow the similar methods of full deletion logic.
There was a problem hiding this comment.
The current behavior looks like a bug to me, as local segments shouldn't be blocked from deletion once they are already uploaded. Perhaps we could fall back to checking the lastModifiedTime when a segment contains future records. Alternatively, we could introduce a new configuration flag to provide alternative logic for handling future records.
There was a problem hiding this comment.
shouldn't be blocked from deletion once they are already uploaded. Perhaps we could fall back to checking the lastModifiedTime when a segment contains future records.
Yeah, we can allow this behavior by introducing a config. The segment exist in the remote storage but the user might face slowness in reading the data from remote if they don't have prefetching feature implemented in the Remote Storage Manager. So, better to gate the change in behavior via config and the change applies only when remote storage is enabled on the topic.
There was a problem hiding this comment.
If we all hate -2, there is another approach: reverse the configuration logic.
For example, we could introduce remote.copy.before.deletion.ms instead of remote.copy.lag.ms. If a user sets remote.copy.before.deletion.ms=2hr, it means they want to upload log segments 2 hours before they are scheduled to be deleted from local storage.
The default value would be -1, which dynamically resolves to local.retention.ms. This fully maintains backward compatibility (resulting in immediate upload). Setting it to 0 means the user wants to delay the upload as much as possible, triggering it right before local deletion.
The only side effect is that this kind of "reverse" or "countdown" time calculation is quite rare in the existing Kafka codebase.
There was a problem hiding this comment.
The only side effect is that this kind of "reverse"
yeah, this is not intuitive when compared with the other configs like retention time/bytes. My suggestion is to:
- Improve the config documentation about configuring both the values and
- Add validation to throw an error when
remote-copy-lag-time = 0 and remote-copy-lag-bytes != 0and vice-versa.
There was a problem hiding this comment.
I just realized that I replied to the wrong thread earlier, sorry about that 😢
How about this solution? if we found it contain future record. we use LogSegment#lastModified to compare the time?
Yes, that is an acceptable approach. I've opened https://issues.apache.org/jira/browse/KAFKA-20609 so we can keep discussing there.
yeah, this is not intuitive when compared with the other configs like retention time/bytes. My suggestion is to:
While documentation is the final line of defense for users, it's better to have an intuitive design out of the box. After all, me proposed reverse configuration was rejected precisely because it wasn't intuitive enough
Another way is to align the logic with retention.ms and retention.bytes. Since most users care more about time than size, we could set the default value of the time lag to 0, and the size lag to -1. This way, most users can just adjust the time setting without having to touch the size configuration. WDYT?
There was a problem hiding this comment.
- Thanks for filing KAFKA-20609 ticket. Fixing this is out of scope of the KIP-1241 as the bug exist before too.
Since most users care more about time than size, we could set the default value of the time lag to 0, and the size lag to -1.
I like the idea to provide out-of-box default values that works in majority of use-cases and retain the eager upload logic. The new default values for remote-copy lag align with the log.retention.hours = 7 days and log.retention.bytes = infinite (-1). It lgtm.
| } | ||
| } | ||
|
|
||
| private static void validateRemoteCopyLagTime(Map<?, ?> props) { |
There was a problem hiding this comment.
duplicated with another one. propose the fix in #22363 . thanks.
| } | ||
| } | ||
|
|
||
| private static void validateRemoteCopyLagSize(Map<?, ?> props) { |
| previousSeg, copyLagMs, copyLagBytes, currentTimeMs, totalLogSize, cumulativeSize, totalLogSize - cumulativeSize); | ||
| } | ||
|
|
||
| if (copyLagMs == 0 || copyLagBytes == 0) { |
There was a problem hiding this comment.
Should we log a warning or info message when a user configures one lag property but leaves the other at its default value of 0?
There was a problem hiding this comment.
@chia7712 Thanks for your view.
How about describing this case in the documentation? If we only log it as a warning, users may ignore it. WDYT?
There was a problem hiding this comment.
Sounds good. We can expand upgrade.md as well.
This is a follow-up PR for #20913 (comment) Thanks. cc @kamalcph Reviewers: Murali Basani <muralidhar.basani@aiven.io>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>






JIRA:19893
KIP:1241
Currently, Kafka uploads all non-active local log segments to remote
storage even when they are still within the local retention period,
resulting in redundant storage of the same data in both tiers.
This wastes storage capacity (cost) without providing immediate
benefits,since reads during the retention window prioritize local data.
However, some users/topics do real-time analytics based on remote
storage directly and need the latest data to be available as soon as
possible (In fact, it only tries to stay as up-to-date as possible, but
it still can’t include the latest data because the active segment
hasn’t been uploaded yet.). Therefore, this optimization is offered as
a topic's optional configuration rather than the default behavior.
Here are some additional thoughts/considerations.
remote storage, so this change is very safe—you don’t need to worry
about files being cleaned up before they be upload to the remote.
won’t be set too short. For example, in our production environment, we
keep 1 day of local data alongside 3-7 days in remote storage, so
there’s still 1 day of redundancy.
Example for the goal:
Reviewers: Kamal Chandraprakash kamal.chandraprakash@gmail.com