Skip to content

Commit da78405

Browse files
committed
fix
1 parent e6ac3af commit da78405

10 files changed

+152
-217
lines changed

Diff for: store/src/main/java/org/apache/rocketmq/store/CommitLog.java

+13-18
Original file line numberDiff line numberDiff line change
@@ -320,15 +320,15 @@ public boolean getLastMappedFile(final long startOffset) {
320320
*
321321
* @throws RocksDBException only in rocksdb mode
322322
*/
323-
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException {
323+
public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException {
324324
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
325325
boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
326326
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
327327
if (!mappedFiles.isEmpty()) {
328328
int index = mappedFiles.size() - 1;
329329
while (index > 0) {
330330
MappedFile mappedFile = mappedFiles.get(index);
331-
if (mappedFile.getFileFromOffset() <= maxPhyOffsetOfConsumeQueue) {
331+
if (isMappedFileMatchedRecover(mappedFile, true)) {
332332
// It's safe to recover from this mapped file
333333
break;
334334
}
@@ -344,7 +344,7 @@ public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExcep
344344
while (true) {
345345
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
346346
int size = dispatchRequest.getMsgSize();
347-
boolean doDispatch = dispatchRequest.getCommitLogOffset() > maxPhyOffsetOfConsumeQueue;
347+
boolean doDispatch = dispatchRequest.getCommitLogOffset() > dispatchFromPhyOffset;
348348
// Normal data
349349
if (dispatchRequest.isSuccess() && size > 0) {
350350
lastValidMsgPhyOffset = processOffset + mappedFileOffset;
@@ -394,10 +394,7 @@ else if (!dispatchRequest.isSuccess()) {
394394
}
395395

396396
// Clear ConsumeQueue redundant data
397-
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
398-
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
399-
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
400-
}
397+
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
401398

402399
this.mappedFileQueue.setFlushedWhere(processOffset);
403400
this.mappedFileQueue.setCommittedWhere(processOffset);
@@ -715,7 +712,7 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
715712
MappedFile mappedFile = null;
716713
for (; index >= 0; index--) {
717714
mappedFile = mappedFiles.get(index);
718-
if (this.isMappedFileMatchedRecover(mappedFile)) {
715+
if (this.isMappedFileMatchedRecover(mappedFile, false)) {
719716
log.info("recover from this mapped file " + mappedFile.getFileName());
720717
break;
721718
}
@@ -802,10 +799,7 @@ else if (size == 0) {
802799
}
803800

804801
// Clear ConsumeQueue redundant data
805-
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
806-
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
807-
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
808-
}
802+
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
809803

810804
this.mappedFileQueue.setFlushedWhere(processOffset);
811805
this.mappedFileQueue.setCommittedWhere(processOffset);
@@ -839,7 +833,8 @@ protected void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult
839833
this.getMessageStore().onCommitLogAppend(msg, result, commitLogFile);
840834
}
841835

842-
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) throws RocksDBException {
836+
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,
837+
boolean recoverNormally) throws RocksDBException {
843838
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
844839

845840
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
@@ -854,16 +849,16 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) throws R
854849
if (0 == storeTimestamp) {
855850
return false;
856851
}
852+
long phyOffset = byteBuffer.getLong(MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
857853

858-
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
859-
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()
860-
&& storeTimestamp > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {
854+
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
855+
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe() &&
856+
storeTimestamp > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {
861857
return false;
862858
}
863859

864-
long phyOffset = byteBuffer.getLong(MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
865860
return this.defaultMessageStore.getQueueStore()
866-
.isMappedFileMatchedRecover(phyOffset, storeTimestamp);
861+
.isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
867862
}
868863

869864
public boolean resetOffset(long offset) {

Diff for: store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -346,14 +346,14 @@ private void recover(final boolean lastExitOK) throws RocksDBException {
346346
// recover consume queue
347347
long recoverConsumeQueueStart = System.currentTimeMillis();
348348
this.consumeQueueStore.recover(this.brokerConfig.isRecoverConcurrently());
349-
long maxPhyOffsetOfConsumeQueue = this.consumeQueueStore.getMaxPhyOffsetInConsumeQueue();
349+
long dispatchFromPhyOffset = this.consumeQueueStore.getDispatchFromPhyOffset();
350350
long recoverConsumeQueueEnd = System.currentTimeMillis();
351351

352352
// recover commitlog
353353
if (lastExitOK) {
354-
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
354+
this.commitLog.recoverNormally(dispatchFromPhyOffset);
355355
} else {
356-
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
356+
this.commitLog.recoverAbnormally(dispatchFromPhyOffset);
357357
}
358358

359359
// recover consume offset table

Diff for: store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

+9
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ public class MessageStoreConfig {
436436
private String combineCQPreferCQType = StoreType.DEFAULT.getStoreType();
437437
private String combineAssignOffsetCQType = StoreType.DEFAULT.getStoreType();
438438
private boolean combineCQEnableCheckSelf = false;
439+
private int combineCQMaxExtraLookBackCommitLogFiles = 3;
439440

440441
/**
441442
* If ConsumeQueueStore is RocksDB based, this option is to configure bottom-most tier compression type.
@@ -1992,4 +1993,12 @@ public boolean isCombineCQEnableCheckSelf() {
19921993
public void setCombineCQEnableCheckSelf(boolean combineCQEnableCheckSelf) {
19931994
this.combineCQEnableCheckSelf = combineCQEnableCheckSelf;
19941995
}
1996+
1997+
public int getCombineCQMaxExtraLookBackCommitLogFiles() {
1998+
return combineCQMaxExtraLookBackCommitLogFiles;
1999+
}
2000+
2001+
public void setCombineCQMaxExtraLookBackCommitLogFiles(int combineCQMaxExtraLookBackCommitLogFiles) {
2002+
this.combineCQMaxExtraLookBackCommitLogFiles = combineCQMaxExtraLookBackCommitLogFiles;
2003+
}
19952004
}

Diff for: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ private void dledgerRecoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws Ro
330330
MmapFile mmapFile = null;
331331
for (; index >= 0; index--) {
332332
mmapFile = mmapFiles.get(index);
333-
if (isMmapFileMatchedRecover(mmapFile)) {
333+
if (isMmapFileMatchedRecover(mmapFile, false)) {
334334
log.info("dledger recover from this mappFile " + mmapFile.getFileName());
335335
break;
336336
}
@@ -426,7 +426,7 @@ private void setRecoverPosition() {
426426
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
427427
}
428428

429-
private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) throws RocksDBException {
429+
private boolean isMmapFileMatchedRecover(final MmapFile mmapFile, boolean recoverNormally) throws RocksDBException {
430430
ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
431431

432432
int magicCode = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
@@ -456,12 +456,12 @@ private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) throws RocksDB
456456
}
457457

458458
long phyOffset = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
459-
return this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp);
459+
return this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
460460
}
461461

462462
@Override
463-
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException {
464-
dledgerRecoverNormally(maxPhyOffsetOfConsumeQueue);
463+
public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException {
464+
dledgerRecoverNormally(dispatchFromPhyOffset);
465465
}
466466

467467
@Override

Diff for: store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java

+8
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ public long getStoreTime(CqUnit cqUnit) {
110110
return -1;
111111
}
112112

113+
/**
114+
* get max physic offset in consumeQueue
115+
*
116+
* @return the max physic offset in consumeQueue
117+
* @throws RocksDBException only in rocksdb mode
118+
*/
119+
public abstract long getMaxPhyOffsetInConsumeQueue() throws RocksDBException;
120+
113121
/**
114122
* destroy the specific consumeQueue
115123
*

0 commit comments

Comments
 (0)