|
72 | 72 | import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
|
73 | 73 | import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
|
74 | 74 | import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
|
75 |
| -import org.apache.rocketmq.common.BoundaryType; |
76 | 75 | import org.apache.rocketmq.common.BrokerConfig;
|
77 | 76 | import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
|
78 | 77 | import org.apache.rocketmq.common.KeyBuilder;
|
79 | 78 | import org.apache.rocketmq.common.LockCallback;
|
80 | 79 | import org.apache.rocketmq.common.MQVersion;
|
81 | 80 | import org.apache.rocketmq.common.MixAll;
|
82 |
| -import org.apache.rocketmq.common.Pair; |
83 | 81 | import org.apache.rocketmq.common.PlainAccessConfig;
|
84 | 82 | import org.apache.rocketmq.common.TopicAttributes;
|
85 | 83 | import org.apache.rocketmq.common.TopicConfig;
|
|
223 | 221 | import org.apache.rocketmq.store.MessageStore;
|
224 | 222 | import org.apache.rocketmq.store.PutMessageResult;
|
225 | 223 | import org.apache.rocketmq.store.PutMessageStatus;
|
226 |
| -import org.apache.rocketmq.store.RocksDBMessageStore; |
227 | 224 | import org.apache.rocketmq.store.SelectMappedBufferResult;
|
228 | 225 | import org.apache.rocketmq.store.StoreType;
|
229 | 226 | import org.apache.rocketmq.store.config.BrokerRole;
|
230 | 227 | import org.apache.rocketmq.store.exception.ConsumeQueueException;
|
231 | 228 | import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
|
| 229 | +import org.apache.rocketmq.store.queue.CombineConsumeQueueStore; |
232 | 230 | import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
|
| 231 | +import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface; |
233 | 232 | import org.apache.rocketmq.store.queue.CqUnit;
|
234 | 233 | import org.apache.rocketmq.store.queue.ReferredIterator;
|
235 | 234 | import org.apache.rocketmq.store.timer.TimerCheckpoint;
|
@@ -3479,129 +3478,24 @@ private boolean validateBlackListConfigExist(Properties properties) {
|
3479 | 3478 | private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx,
|
3480 | 3479 | RemotingCommand request) throws RemotingCommandException {
|
3481 | 3480 | CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
|
3482 |
| - String requestTopic = requestHeader.getTopic(); |
3483 | 3481 | MessageStore messageStore = brokerController.getMessageStore();
|
3484 | 3482 | DefaultMessageStore defaultMessageStore;
|
3485 | 3483 | if (messageStore instanceof AbstractPluginMessageStore) {
|
3486 | 3484 | defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext();
|
3487 | 3485 | } else {
|
3488 | 3486 | defaultMessageStore = (DefaultMessageStore) messageStore;
|
3489 | 3487 | }
|
3490 |
| - RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore(); |
3491 |
| - CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); |
| 3488 | + ConsumeQueueStoreInterface consumeQueueStore = defaultMessageStore.getQueueStore(); |
3492 | 3489 |
|
3493 |
| - if (defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType())) { |
3494 |
| - result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need check"); |
| 3490 | + if (!(consumeQueueStore instanceof CombineConsumeQueueStore)) { |
| 3491 | + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); |
| 3492 | + result.setCheckResult("is not CombineConsumeQueueStore, no need check"); |
3495 | 3493 | result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
|
3496 | 3494 | return result;
|
3497 | 3495 | }
|
3498 | 3496 |
|
3499 |
| - if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { |
3500 |
| - result.setCheckResult("rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"); |
3501 |
| - result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3502 |
| - return result; |
3503 |
| - } |
3504 |
| - |
3505 |
| - ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable(); |
3506 |
| - StringBuilder diffResult = new StringBuilder(); |
3507 |
| - try { |
3508 |
| - if (StringUtils.isNotBlank(requestTopic)) { |
3509 |
| - boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime()); |
3510 |
| - result.setCheckResult(diffResult.toString()); |
3511 |
| - result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3512 |
| - return result; |
3513 |
| - } |
3514 |
| - int successNum = 0; |
3515 |
| - int checkSize = 0; |
3516 |
| - for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) { |
3517 |
| - boolean checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topicEntry.getKey(), rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime()); |
3518 |
| - successNum += checkResult ? 1 : 0; |
3519 |
| - checkSize++; |
3520 |
| - } |
3521 |
| - // check all topic finish, all topic is ready, checkSize: 100, currentQueueNum: 110 -> ready (The currentQueueNum means when we do checking, new topics are added.) |
3522 |
| - // check all topic finish, success/all : 89/100, currentQueueNum: 110 -> not ready |
3523 |
| - boolean checkReady = successNum == checkSize; |
3524 |
| - String checkResultString = checkReady ? String.format("all topic is ready, checkSize: %s, currentQueueNum: %s", checkSize, cqTable.size()) : |
3525 |
| - String.format("success/all : %s/%s, currentQueueNum: %s", successNum, checkSize, cqTable.size()); |
3526 |
| - diffResult.append("check all topic finish, ").append(checkResultString); |
3527 |
| - result.setCheckResult(diffResult.toString()); |
3528 |
| - result.setCheckStatus(checkReady ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3529 |
| - } catch (Exception e) { |
3530 |
| - LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e); |
3531 |
| - result.setCheckResult(e.getMessage() + Arrays.toString(e.getStackTrace())); |
3532 |
| - result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()); |
3533 |
| - } |
3534 |
| - return result; |
3535 |
| - } |
3536 |
| - |
3537 |
| - private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, |
3538 |
| - RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, |
3539 |
| - long checkpointByStoreTime) { |
3540 |
| - boolean processResult = true; |
3541 |
| - for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) { |
3542 |
| - Integer queueId = queueEntry.getKey(); |
3543 |
| - ConsumeQueueInterface jsonCq = queueEntry.getValue(); |
3544 |
| - ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); |
3545 |
| - if (printDetail) { |
3546 |
| - String format = String.format("[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ", |
3547 |
| - topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit()); |
3548 |
| - diffResult.append(format).append("\n"); |
3549 |
| - } |
3550 |
| - |
3551 |
| - long minOffsetByTime = 0L; |
3552 |
| - try { |
3553 |
| - minOffsetByTime = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER); |
3554 |
| - } catch (Exception e) { |
3555 |
| - // ignore |
3556 |
| - } |
3557 |
| - long minOffsetInQueue = kvCq.getMinOffsetInQueue(); |
3558 |
| - long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime); |
3559 |
| - long checkTo = jsonCq.getMaxOffsetInQueue() - 1; |
3560 |
| - /* |
3561 |
| - checkTo(maxOffsetInQueue - 1) |
3562 |
| - v |
3563 |
| - fileCq +------------------------------------------------------+ |
3564 |
| - kvCq +----------------------------------------------+ |
3565 |
| - ^ ^ |
3566 |
| - minOffsetInQueue minOffsetByTime |
3567 |
| - ^ |
3568 |
| - checkFrom = max(minOffsetInQueue, minOffsetByTime) |
3569 |
| - */ |
3570 |
| - // The latest message is earlier than the check time |
3571 |
| - Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(checkTo); |
3572 |
| - if (fileLatestCq != null) { |
3573 |
| - if (fileLatestCq.getObject2() < checkpointByStoreTime) { |
3574 |
| - continue; |
3575 |
| - } |
3576 |
| - } |
3577 |
| - for (long i = checkFrom; i <= checkTo; i++) { |
3578 |
| - Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); |
3579 |
| - Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i); |
3580 |
| - if (fileCqUnit == null || kvCqUnit == null || !checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) { |
3581 |
| - LOGGER.error(String.format("[topic: %s, queue: %s, offset: %s] \n file : %s \n kv : %s \n", |
3582 |
| - topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null")); |
3583 |
| - processResult = false; |
3584 |
| - break; |
3585 |
| - } |
3586 |
| - } |
3587 |
| - } |
3588 |
| - return processResult; |
3589 |
| - } |
3590 |
| - |
3591 |
| - private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) { |
3592 |
| - if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) { |
3593 |
| - return false; |
3594 |
| - } |
3595 |
| - if (cqUnit1.getSize() != cqUnit2.getSize()) { |
3596 |
| - return false; |
3597 |
| - } |
3598 |
| - if (cqUnit1.getPos() != cqUnit2.getPos()) { |
3599 |
| - return false; |
3600 |
| - } |
3601 |
| - if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) { |
3602 |
| - return false; |
3603 |
| - } |
3604 |
| - return cqUnit1.getTagsCode() == cqUnit2.getTagsCode(); |
| 3497 | + return ((CombineConsumeQueueStore) consumeQueueStore). |
| 3498 | + doCheckCqWriteProgress(requestHeader.getTopic(), requestHeader.getCheckStoreTime(), StoreType.DEFAULT, StoreType.DEFAULT_ROCKSDB); |
3605 | 3499 | }
|
3606 | 3500 |
|
3607 | 3501 | private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, RemotingCommand request) {
|
|
0 commit comments