Commit 804bdf3

[hotfix] Add bucket info for bucket related logs (#1750)
1 parent 9df7b72 commit 804bdf3
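
The change is mechanical across the touched files: every bucket-scoped log line gains a {} placeholder for the bucket and passes the TableBucket as one more argument, and remaining string-concatenation call sites are converted to SLF4J parameterized logging along the way. A minimal sketch of the before/after pattern, assuming an SLF4J logger; the class and method names are illustrative, only the tableBucket field name comes from the diff:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative stand-in for the log classes touched by this commit.
class BucketLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(BucketLoggingSketch.class);

    // The real classes hold a Fluss TableBucket; Object keeps the sketch self-contained.
    private final Object tableBucket;

    BucketLoggingSketch(Object tableBucket) {
        this.tableBucket = tableBucket;
    }

    void roll(long newOffset) {
        // Before: concatenation builds the string even if INFO is disabled,
        // and the reader cannot tell which bucket rolled.
        LOG.info("Rolled new log segment at offset " + newOffset);

        // After: parameterized message, formatted only when INFO is enabled,
        // with the bucket passed as an extra argument.
        LOG.info("Rolled new log segment for bucket {} at offset {}", tableBucket, newOffset);
    }
}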

File tree

7 files changed, +119 -63 lines

fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java

Lines changed: 20 additions & 18 deletions
@@ -362,10 +362,11 @@ public FetchDataInfo read(
            throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace(
-                    "Reading maximum {} bytes at offset {} from log with total length {} bytes",
+                    "Reading maximum {} bytes at offset {} from log with total length {} bytes for bucket {}",
                    maxLength,
                    readOffset,
-                    segments.sizeInBytes());
+                    segments.sizeInBytes(),
+                    tableBucket);
        }

        long startOffset = localLogStartOffset;
@@ -471,21 +472,22 @@ LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
            // true for an active segment of size zero because one of the indexes is
            // "full" (due to _maxEntries == 0).
            LOG.warn(
-                    "Trying to roll a new log segment with start offset "
-                            + newOffset
-                            + " =max(provided offset = "
-                            + expectedNextOffset
-                            + ", LEO = "
-                            + getLocalLogEndOffset()
-                            + ") while it already exists and is active with size 0."
-                            + ", size of offset index: "
-                            + activeSegment.offsetIndex().entries()
-                            + ".");
+                    "Trying to roll a new log segment for bucket {} with start offset {} "
+                            + "=max(provided offset = {}, LEO = {}) while it already exists "
+                            + "and is active with size 0, size of offset index: {}.",
+                    tableBucket,
+                    newOffset,
+                    expectedNextOffset,
+                    getLocalLogEndOffset(),
+                    activeSegment.offsetIndex().entries());
            LogSegment newSegment =
                    createAndDeleteSegment(
                            newOffset, activeSegment, SegmentDeletionReason.LOG_ROLL);
            updateLogEndOffset(getLocalLogEndOffset());
-            LOG.info("Rolled new log segment at offset " + newOffset);
+            LOG.info(
+                    "Rolled new log segment for bucket {} at offset {}",
+                    tableBucket,
+                    newOffset);
            return newSegment;
        } else {
            throw new FlussRuntimeException(
@@ -520,9 +522,9 @@ LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
        for (File file : Arrays.asList(logFile, offsetIdxFile, timeIndexFile)) {
            if (file.exists()) {
                LOG.warn(
-                        "Newly rolled segment file "
-                                + file.getAbsolutePath()
-                                + " already exists; deleting it first");
+                        "Newly rolled segment file {} for bucket {} already exists; deleting it first",
+                        file.getAbsolutePath(),
+                        tableBucket);
                Files.delete(file.toPath());
            }
        }
@@ -536,7 +538,7 @@ LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
        // metadata when log rolls.
        // The next offset should not change.
        updateLogEndOffset(getLocalLogEndOffset());
-        LOG.info("Rolled new log segment at offset " + newOffset);
+        LOG.info("Rolled new log segment for bucket {} at offset {}", tableBucket, newOffset);
        return newSegment;
    }

@@ -547,7 +549,7 @@ LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
     * @return the list of segments that were scheduled for deletion
     */
    List<LogSegment> truncateFullyAndStartAt(long newOffset) throws IOException {
-        LOG.debug("Truncate and start at offset " + newOffset);
+        LOG.debug("Truncate and start at offset {} for bucket {}", newOffset, tableBucket);

        checkIfMemoryMappedBufferClosed();
        List<LogSegment> segmentsToDelete = segments.values();
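
Note the LOG.isTraceEnabled() guard kept around the read() trace above: with parameterized logging the guard is no longer needed to avoid string formatting, but it still avoids evaluating the arguments themselves, and segments.sizeInBytes() is plausibly not free (an assumption; the diff does not show its cost). A sketch of the distinction, with illustrative names:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GuardSketch {
    private static final Logger LOG = LoggerFactory.getLogger(GuardSketch.class);

    // Stand-in for segments.sizeInBytes(): assumed to be non-trivial work.
    private long sizeInBytes() {
        return 42L;
    }

    void read(int maxLength, long readOffset) {
        // Parameterized logging defers only the *formatting*; without the
        // guard, sizeInBytes() would still run when TRACE is disabled.
        if (LOG.isTraceEnabled()) {
            LOG.trace(
                    "Reading maximum {} bytes at offset {} from log with total length {} bytes",
                    maxLength,
                    readOffset,
                    sizeInBytes());
        }
    }
}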

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 2 additions & 1 deletion
@@ -149,7 +149,8 @@ private void loadSegmentFiles() throws IOException {
                File logFile = FlussPaths.logFile(logTabletDir, offset);
                if (!logFile.exists()) {
                    LOG.warn(
-                            "Found an orphaned index file {}, with no corresponding log file.",
-                            file.getAbsolutePath());
+                            "Found an orphaned index file {} for bucket {}, with no corresponding log file.",
+                            file.getAbsolutePath(),
+                            logSegments.getTableBucket());
                    Files.deleteIfExists(file.toPath());
                }

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 27 additions & 15 deletions
@@ -430,14 +430,18 @@ private void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
        synchronized (lock) {
            if (newHighWatermark.getMessageOffset() < highWatermarkMetadata.getMessageOffset()) {
                LOG.warn(
-                        "Non-monotonic update of high watermark from {} to {}",
+                        "Non-monotonic update of high watermark from {} to {} for bucket {}",
                        highWatermarkMetadata,
-                        newHighWatermark);
+                        newHighWatermark,
+                        localLog.getTableBucket());
            }
            highWatermarkMetadata = newHighWatermark;
            // TODO log offset listener to update log offset.
        }
-        LOG.trace("Setting high watermark {}", newHighWatermark);
+        LOG.trace(
+                "Setting high watermark {} for bucket {}",
+                newHighWatermark,
+                localLog.getTableBucket());
    }

    /**
@@ -567,17 +571,19 @@ private void deleteSegments(long cleanUpToOffset) {
        long localLogStartOffset = localLog.getLocalLogStartOffset();
        if (cleanUpToOffset < localLogStartOffset) {
            LOG.debug(
-                    "Ignore the delete segments action while the input cleanUpToOffset {} "
+                    "Ignore the delete segments action for bucket {} while the input cleanUpToOffset {} "
                            + "is smaller than the current localLogStartOffset {}",
+                    getTableBucket(),
                    cleanUpToOffset,
                    localLogStartOffset);
            return;
        }

        if (cleanUpToOffset > getHighWatermark()) {
            LOG.warn(
-                    "Ignore the delete segments action while the input cleanUpToOffset {} "
+                    "Ignore the delete segments action for bucket {} while the input cleanUpToOffset {} "
                            + "is larger than the current highWatermark {}",
+                    getTableBucket(),
                    cleanUpToOffset,
                    getHighWatermark());
            return;
@@ -716,11 +722,13 @@ private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
        // todo update the first unstable offset (which is used to compute lso)

        LOG.trace(
-                "Appended message set with last offset: {}, first offset {}, next offset: {} and messages {}",
+                "Appended message set with last offset: {}, first offset {}, next offset: {} "
+                        + "and messages {} for bucket {}",
                appendInfo.lastOffset(),
                appendInfo.firstOffset(),
                localLog.getLocalLogEndOffset(),
-                validRecords);
+                validRecords,
+                getTableBucket());

        if (localLog.unflushedMessages() >= logFlushIntervalMessages) {
            flush(false);
@@ -787,11 +795,12 @@ private void flush(long offset, boolean includingOffset) throws IOException {
        if (flushOffset > localLog.getRecoveryPoint()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
-                        "Flushing log up to offset {} ({}) with recovery point {}, unflushed: {}",
+                        "Flushing log up to offset {} ({}) with recovery point {}, unflushed: {}, for bucket {}",
                        offset,
                        includingOffsetStr,
                        flushOffset,
-                        localLog.unflushedMessages());
+                        localLog.unflushedMessages(),
+                        getTableBucket());
            }

            localLog.flush(flushOffset);
@@ -810,7 +819,9 @@ private void maybeRoll(int messageSize, LogAppendInfo appendInfo) throws Exception {
                new RollParams(maxSegmentFileSize, appendInfo.lastOffset(), messageSize))) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
-                        "Rolling new log segment (log_size = {}/{}), offset_index_size = {}/{}, time_index_size = {}/{}",
+                        "Rolling new log segment for bucket {} (log_size = {}/{}), offset_index_size = {}/{}, "
+                                + "time_index_size = {}/{}",
+                        getTableBucket(),
                        segment.getSizeInBytes(),
                        maxSegmentFileSize,
                        segment.offsetIndex().entries(),
@@ -863,12 +874,13 @@ boolean truncateTo(long targetOffset) throws LogStorageException {

        if (targetOffset >= localLog.getLocalLogEndOffset()) {
            LOG.info(
-                    "Truncate to {} has no effect as the largest offset in the log is {}.",
+                    "Truncate to {} for bucket {} has no effect as the largest offset in the log is {}.",
                    targetOffset,
+                    getTableBucket(),
                    localLog.getLocalLogEndOffset() - 1);
            return false;
        } else {
-            LOG.info("Truncating to offset {}", targetOffset);
+            LOG.info("Truncating to offset {} for bucket {}", targetOffset, getTableBucket());
            synchronized (lock) {
                try {
                    localLog.checkIfMemoryMappedBufferClosed();
@@ -902,7 +914,7 @@ boolean truncateTo(long targetOffset) throws LogStorageException {

    /** Delete all data in the log and start at the new offset. */
    void truncateFullyAndStartAt(long newOffset) throws LogStorageException {
-        LOG.debug("Truncate and start at offset {}", newOffset);
+        LOG.debug("Truncate and start at offset {} for bucket {}", newOffset, getTableBucket());
        synchronized (lock) {
            try {
                localLog.truncateFullyAndStartAt(newOffset);
@@ -950,14 +962,14 @@ public List<LogSegment> logSegments() {
    }

    public void close() {
-        LOG.debug("close log tablet");
+        LOG.debug("close log tablet for bucket {}", getTableBucket());
        synchronized (lock) {
            localLog.checkIfMemoryMappedBufferClosed();
            writerExpireCheck.cancel(true);
            try {
                writerStateManager.takeSnapshot();
            } catch (IOException e) {
-                LOG.error("Error while taking writer snapshot.", e);
+                LOG.error("Error while taking writer snapshot for bucket {}.", getTableBucket(), e);
            }
            localLog.close();
        }
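
Several of the rewritten error calls above pass both a placeholder argument and the caught exception, e.g. LOG.error("Error while taking writer snapshot for bucket {}.", getTableBucket(), e). This works because SLF4J treats a trailing Throwable that has no matching {} placeholder as the exception to attach, stack trace included, rather than as a format argument. A short self-contained illustration (message text and bucket value are hypothetical):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThrowableLastSketch {
    private static final Logger LOG = LoggerFactory.getLogger(ThrowableLastSketch.class);

    public static void main(String[] args) {
        Exception e = new java.io.IOException("disk full");

        // One placeholder, two arguments: "bucket-1" fills {}, and the
        // trailing exception is logged together with its stack trace.
        LOG.error("Error while taking writer snapshot for bucket {}.", "bucket-1", e);

        // By contrast, passing a String consumes a placeholder and logs
        // no stack trace at all.
        LOG.error("Snapshot failed for bucket {}: {}", "bucket-1", e.toString());
    }
}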

fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java

Lines changed: 12 additions & 5 deletions
@@ -200,8 +200,9 @@ private List<EnrichedLogSegment> candidateToCopyLogSegments(LogTablet log) {
                long fromOffset = Math.max(copiedOffset + 1, log.localLogStartOffset());
                candidateLogSegments = candidateLogSegments(log, fromOffset, highWatermark);
                LOG.debug(
-                        "Candidate log segments: logLocalStartOffset: {}, copiedOffset: {}, "
+                        "Candidate log segments for bucket {}: logLocalStartOffset: {}, copiedOffset: {}, "
                                + "fromOffset: {}, highWatermark: {} and candidateLogSegments: {}",
+                        tableBucket,
                        log.localLogStartOffset(),
                        copiedOffset,
                        fromOffset,
@@ -216,7 +217,8 @@ private List<EnrichedLogSegment> candidateToCopyLogSegments(LogTablet log) {
            }
        } else {
            LOG.debug(
-                    "Skipping copying segments to remote, current read-offset:{}, and highWatermark:{}",
+                    "Skipping copying segments for bucket {} to remote, current read-offset:{}, and highWatermark:{}",
+                    tableBucket,
                    copiedOffset,
                    highWatermark);
        }
@@ -314,7 +316,10 @@ public boolean tryToCommitRemoteLogManifest(
            remoteLogManifestPath =
                    remoteLogStorage.writeRemoteLogManifestSnapshot(newRemoteLogManifest);
        } catch (Exception e) {
-            LOG.error("Write remote log manifest file to remote storage failed.", e);
+            LOG.error(
+                    "Write remote log manifest file to remote storage failed for bucket {}.",
+                    tableBucket,
+                    e);
            return false;
        }

@@ -364,8 +369,9 @@ public boolean tryToCommitRemoteLogManifest(
                    // the commit failed with unexpected exception, like network error, we will
                    // retry send.
                    LOG.error(
-                            "The {} time try to commit remote log manifest failed.",
+                            "The {} time try to commit remote log manifest failed for bucket {}.",
                            retrySendCommitTimes,
+                            tableBucket,
                            e);
                    retrySendCommitTimes++;
                }
@@ -458,8 +464,9 @@ private void deleteRemoteLogSegmentFiles(
            metricGroup.remoteLogDeleteRequests().inc();
        } catch (Exception e) {
            LOG.error(
-                    "Error occurred while deleting remote log segment files: {}, "
+                    "Error occurred while deleting remote log segment files: {} for bucket {}, "
                            + "the delete files operation will be skipped.",
                    remoteLogSegment,
+                    tableBucket,
                    e);
            metricGroup.remoteLogDeleteErrors().inc();

fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java

Lines changed: 4 additions & 1 deletion
@@ -289,7 +289,10 @@ private void doHandleLeaderReplica(
                            remoteLogStorage,
                            coordinatorGateway,
                            clock);
-            LOG.info("Created a new remote log task: {} and getting scheduled", task);
+            LOG.info(
+                    "Created a new remote log task for table-bucket {}: {} and getting scheduled",
+                    tableBucket,
+                    task);
            ScheduledFuture<?> future =
                    rlManagerScheduledThreadPool.scheduleWithFixedDelay(
                            task,
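
The task created above is handed to a scheduled thread pool with scheduleWithFixedDelay, which starts the next delay countdown only after the previous run completes, so a slow tiering pass cannot cause runs to pile up (unlike scheduleAtFixedRate). The call in the hunk is truncated, so the actual delay arguments are not visible; a sketch with made-up values:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        Runnable task = () -> System.out.println("tiering pass");

        // Fixed *delay*: the 30s countdown starts when each run finishes.
        ScheduledFuture<?> future =
                pool.scheduleWithFixedDelay(task, 0, 30, TimeUnit.SECONDS);

        Thread.sleep(1_000);
        future.cancel(true); // e.g. when the replica stops being leader
        pool.shutdown();
    }
}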
