Skip to content

Commit dc014b5

Browse files
committed
fix
1 parent c5cd32a commit dc014b5

33 files changed

+1667
-1129
lines changed

Diff for: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

+9-12
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@
6060
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
6161
import org.apache.rocketmq.broker.coldctr.ColdDataCgCtrService;
6262
import org.apache.rocketmq.broker.coldctr.ColdDataPullRequestHoldService;
63+
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
64+
import org.apache.rocketmq.broker.config.v1.RocksDBLmqSubscriptionGroupManager;
65+
import org.apache.rocketmq.broker.config.v1.RocksDBLmqTopicConfigManager;
66+
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
67+
import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
68+
import org.apache.rocketmq.broker.config.v2.ConfigStorage;
69+
import org.apache.rocketmq.broker.config.v2.ConsumerOffsetManagerV2;
70+
import org.apache.rocketmq.broker.config.v2.SubscriptionGroupManagerV2;
71+
import org.apache.rocketmq.broker.config.v2.TopicConfigManagerV2;
6372
import org.apache.rocketmq.broker.controller.ReplicasManager;
6473
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
6574
import org.apache.rocketmq.broker.failover.EscapeBridge;
@@ -76,7 +85,6 @@
7685
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
7786
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
7887
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
79-
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
8088
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
8189
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
8290
import org.apache.rocketmq.broker.pop.PopConsumerService;
@@ -100,16 +108,8 @@
100108
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
101109
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
102110
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
103-
import org.apache.rocketmq.broker.config.v1.RocksDBLmqSubscriptionGroupManager;
104-
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
105111
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
106-
import org.apache.rocketmq.broker.config.v2.ConsumerOffsetManagerV2;
107-
import org.apache.rocketmq.broker.config.v2.SubscriptionGroupManagerV2;
108-
import org.apache.rocketmq.broker.config.v2.TopicConfigManagerV2;
109-
import org.apache.rocketmq.broker.config.v2.ConfigStorage;
110112
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
111-
import org.apache.rocketmq.broker.config.v1.RocksDBLmqTopicConfigManager;
112-
import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
113113
import org.apache.rocketmq.broker.topic.TopicConfigManager;
114114
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
115115
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
@@ -815,9 +815,6 @@ public boolean initializeMessageStore() {
815815
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
816816
} else {
817817
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
818-
if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
819-
defaultMessageStore.enableRocksdbCQWrite();
820-
}
821818
}
822819

823820
if (messageStoreConfig.isEnableDLegerCommitLog()) {

Diff for: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

+8-114
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,12 @@
7272
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
7373
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
7474
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
75-
import org.apache.rocketmq.common.BoundaryType;
7675
import org.apache.rocketmq.common.BrokerConfig;
7776
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
7877
import org.apache.rocketmq.common.KeyBuilder;
7978
import org.apache.rocketmq.common.LockCallback;
8079
import org.apache.rocketmq.common.MQVersion;
8180
import org.apache.rocketmq.common.MixAll;
82-
import org.apache.rocketmq.common.Pair;
8381
import org.apache.rocketmq.common.PlainAccessConfig;
8482
import org.apache.rocketmq.common.TopicAttributes;
8583
import org.apache.rocketmq.common.TopicConfig;
@@ -223,13 +221,14 @@
223221
import org.apache.rocketmq.store.MessageStore;
224222
import org.apache.rocketmq.store.PutMessageResult;
225223
import org.apache.rocketmq.store.PutMessageStatus;
226-
import org.apache.rocketmq.store.RocksDBMessageStore;
227224
import org.apache.rocketmq.store.SelectMappedBufferResult;
228225
import org.apache.rocketmq.store.StoreType;
229226
import org.apache.rocketmq.store.config.BrokerRole;
230227
import org.apache.rocketmq.store.exception.ConsumeQueueException;
231228
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
229+
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
232230
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
231+
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
233232
import org.apache.rocketmq.store.queue.CqUnit;
234233
import org.apache.rocketmq.store.queue.ReferredIterator;
235234
import org.apache.rocketmq.store.timer.TimerCheckpoint;
@@ -3479,129 +3478,24 @@ private boolean validateBlackListConfigExist(Properties properties) {
34793478
private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx,
34803479
RemotingCommand request) throws RemotingCommandException {
34813480
CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
3482-
String requestTopic = requestHeader.getTopic();
34833481
MessageStore messageStore = brokerController.getMessageStore();
34843482
DefaultMessageStore defaultMessageStore;
34853483
if (messageStore instanceof AbstractPluginMessageStore) {
34863484
defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext();
34873485
} else {
34883486
defaultMessageStore = (DefaultMessageStore) messageStore;
34893487
}
3490-
RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore();
3491-
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
3488+
ConsumeQueueStoreInterface consumeQueueStore = defaultMessageStore.getQueueStore();
34923489

3493-
if (defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType())) {
3494-
result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need check");
3490+
if (!(consumeQueueStore instanceof CombineConsumeQueueStore)) {
3491+
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
3492+
result.setCheckResult("is not CombineConsumeQueueStore, no need check");
34953493
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
34963494
return result;
34973495
}
34983496

3499-
if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
3500-
result.setCheckResult("rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid");
3501-
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
3502-
return result;
3503-
}
3504-
3505-
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable();
3506-
StringBuilder diffResult = new StringBuilder();
3507-
try {
3508-
if (StringUtils.isNotBlank(requestTopic)) {
3509-
boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime());
3510-
result.setCheckResult(diffResult.toString());
3511-
result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
3512-
return result;
3513-
}
3514-
int successNum = 0;
3515-
int checkSize = 0;
3516-
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
3517-
boolean checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topicEntry.getKey(), rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
3518-
successNum += checkResult ? 1 : 0;
3519-
checkSize++;
3520-
}
3521-
// check all topic finish, all topic is ready, checkSize: 100, currentQueueNum: 110 -> ready (The currentQueueNum means when we do checking, new topics are added.)
3522-
// check all topic finish, success/all : 89/100, currentQueueNum: 110 -> not ready
3523-
boolean checkReady = successNum == checkSize;
3524-
String checkResultString = checkReady ? String.format("all topic is ready, checkSize: %s, currentQueueNum: %s", checkSize, cqTable.size()) :
3525-
String.format("success/all : %s/%s, currentQueueNum: %s", successNum, checkSize, cqTable.size());
3526-
diffResult.append("check all topic finish, ").append(checkResultString);
3527-
result.setCheckResult(diffResult.toString());
3528-
result.setCheckStatus(checkReady ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
3529-
} catch (Exception e) {
3530-
LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
3531-
result.setCheckResult(e.getMessage() + Arrays.toString(e.getStackTrace()));
3532-
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue());
3533-
}
3534-
return result;
3535-
}
3536-
3537-
private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic,
3538-
RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail,
3539-
long checkpointByStoreTime) {
3540-
boolean processResult = true;
3541-
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) {
3542-
Integer queueId = queueEntry.getKey();
3543-
ConsumeQueueInterface jsonCq = queueEntry.getValue();
3544-
ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId);
3545-
if (printDetail) {
3546-
String format = String.format("[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
3547-
topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
3548-
diffResult.append(format).append("\n");
3549-
}
3550-
3551-
long minOffsetByTime = 0L;
3552-
try {
3553-
minOffsetByTime = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER);
3554-
} catch (Exception e) {
3555-
// ignore
3556-
}
3557-
long minOffsetInQueue = kvCq.getMinOffsetInQueue();
3558-
long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime);
3559-
long checkTo = jsonCq.getMaxOffsetInQueue() - 1;
3560-
/*
3561-
checkTo(maxOffsetInQueue - 1)
3562-
v
3563-
fileCq +------------------------------------------------------+
3564-
kvCq +----------------------------------------------+
3565-
^ ^
3566-
minOffsetInQueue minOffsetByTime
3567-
^
3568-
checkFrom = max(minOffsetInQueue, minOffsetByTime)
3569-
*/
3570-
// The latest message is earlier than the check time
3571-
Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(checkTo);
3572-
if (fileLatestCq != null) {
3573-
if (fileLatestCq.getObject2() < checkpointByStoreTime) {
3574-
continue;
3575-
}
3576-
}
3577-
for (long i = checkFrom; i <= checkTo; i++) {
3578-
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
3579-
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
3580-
if (fileCqUnit == null || kvCqUnit == null || !checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
3581-
LOGGER.error(String.format("[topic: %s, queue: %s, offset: %s] \n file : %s \n kv : %s \n",
3582-
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
3583-
processResult = false;
3584-
break;
3585-
}
3586-
}
3587-
}
3588-
return processResult;
3589-
}
3590-
3591-
private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
3592-
if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) {
3593-
return false;
3594-
}
3595-
if (cqUnit1.getSize() != cqUnit2.getSize()) {
3596-
return false;
3597-
}
3598-
if (cqUnit1.getPos() != cqUnit2.getPos()) {
3599-
return false;
3600-
}
3601-
if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) {
3602-
return false;
3603-
}
3604-
return cqUnit1.getTagsCode() == cqUnit2.getTagsCode();
3497+
return ((CombineConsumeQueueStore) consumeQueueStore).
3498+
doCheckCqWriteProgress(requestHeader.getTopic(), requestHeader.getCheckStoreTime(), StoreType.DEFAULT, StoreType.DEFAULT_ROCKSDB);
36053499
}
36063500

36073501
private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, RemotingCommand request) {

Diff for: broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public boolean correctDelayOffset() {
236236
try {
237237
for (int delayLevel : delayLevelTable.keySet()) {
238238
ConsumeQueueInterface cq =
239-
brokerController.getMessageStore().getQueueStore().findOrCreateConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
239+
brokerController.getMessageStore().findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
240240
delayLevel2QueueId(delayLevel));
241241
Long currentDelayOffset = offsetTable.get(delayLevel);
242242
if (currentDelayOffset == null || cq == null) {

Diff for: broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java

+26-12
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,18 @@
2828
import org.apache.rocketmq.broker.BrokerController;
2929
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
3030
import org.apache.rocketmq.common.BrokerConfig;
31+
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
3132
import org.apache.rocketmq.common.Pair;
32-
import org.apache.rocketmq.common.TopicConfig;
3333
import org.apache.rocketmq.store.DefaultMessageStore;
3434
import org.apache.rocketmq.store.DispatchRequest;
35-
import org.apache.rocketmq.store.RocksDBMessageStore;
35+
import org.apache.rocketmq.store.StoreType;
3636
import org.apache.rocketmq.store.config.MessageStoreConfig;
37+
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
3738
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
39+
import org.apache.rocketmq.store.queue.ConsumeQueueStore;
3840
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
3941
import org.apache.rocketmq.store.queue.CqUnit;
42+
import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore;
4043
import org.apache.rocketmq.store.stats.BrokerStatsManager;
4144
import org.awaitility.Awaitility;
4245
import org.junit.Assert;
@@ -81,9 +84,8 @@ public void init() throws IOException {
8184
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
8285
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
8386

84-
defaultMessageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("aaa", true), null,
85-
brokerConfig, new ConcurrentHashMap<String, TopicConfig>());
86-
defaultMessageStore.enableRocksdbCQWrite();
87+
defaultMessageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("for-test", true), null,
88+
brokerConfig, new ConcurrentHashMap<>());
8789
defaultMessageStore.loadCheckPoint();
8890

8991
consumerOffsetManager = new ConsumerOffsetManager(brokerController);
@@ -134,23 +136,35 @@ public void testRocksdbCqWrite() throws RocksDBException {
134136
if (notToBeExecuted()) {
135137
return;
136138
}
137-
RocksDBMessageStore kvStore = defaultMessageStore.getRocksDBMessageStore();
138-
ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore();
139-
store.start();
140-
ConsumeQueueInterface rocksdbCq = defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId);
141-
ConsumeQueueInterface fileCq = defaultMessageStore.findConsumeQueue(topic, queueId);
139+
long startTimestamp = System.currentTimeMillis();
140+
141+
ConsumeQueueStoreInterface combineConsumeQueueStore = defaultMessageStore.getQueueStore();
142+
Assert.assertTrue(combineConsumeQueueStore instanceof CombineConsumeQueueStore);
143+
combineConsumeQueueStore.load();
144+
combineConsumeQueueStore.recover(false);
145+
combineConsumeQueueStore.start();
146+
147+
RocksDBConsumeQueueStore rocksDBConsumeQueueStore = ((CombineConsumeQueueStore) combineConsumeQueueStore).getRocksDBConsumeQueueStore();
148+
ConsumeQueueStore consumeQueueStore = ((CombineConsumeQueueStore) combineConsumeQueueStore).getConsumeQueueStore();
149+
142150
for (int i = 0; i < 200; i++) {
143151
DispatchRequest request = new DispatchRequest(topic, queueId, i, 200, 0, System.currentTimeMillis(), i, "", "", 0, 0, new HashMap<>());
144-
fileCq.putMessagePositionInfoWrapper(request);
145-
store.putMessagePositionInfoWrapper(request);
152+
combineConsumeQueueStore.putMessagePositionInfoWrapper(request);
146153
}
154+
155+
ConsumeQueueInterface rocksdbCq = rocksDBConsumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
156+
ConsumeQueueInterface fileCq = consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
157+
147158
Awaitility.await()
148159
.pollInterval(100, TimeUnit.MILLISECONDS)
149160
.atMost(3, TimeUnit.SECONDS)
150161
.until(() -> rocksdbCq.getMaxOffsetInQueue() == 200);
151162
Pair<CqUnit, Long> unit = rocksdbCq.getCqUnitAndStoreTime(100);
152163
Pair<CqUnit, Long> unit1 = fileCq.getCqUnitAndStoreTime(100);
153164
Assert.assertEquals(unit.getObject1().getPos(), unit1.getObject1().getPos());
165+
166+
CheckRocksdbCqWriteResult result = ((CombineConsumeQueueStore) combineConsumeQueueStore).doCheckCqWriteProgress(topic, startTimestamp, StoreType.DEFAULT, StoreType.DEFAULT_ROCKSDB);
167+
Assert.assertEquals(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue(), result.getCheckStatus());
154168
}
155169

156170
/**

Diff for: client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
*/
1717
package org.apache.rocketmq.client.impl.consumer;
1818

19+
import java.lang.reflect.Field;
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.TreeMap;
1924
import org.apache.commons.lang3.reflect.FieldUtils;
2025
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
2126
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -29,12 +34,6 @@
2934
import org.junit.runner.RunWith;
3035
import org.mockito.junit.MockitoJUnitRunner;
3136

32-
import java.lang.reflect.Field;
33-
import java.util.ArrayList;
34-
import java.util.Collections;
35-
import java.util.List;
36-
import java.util.TreeMap;
37-
3837
import static org.assertj.core.api.Assertions.assertThat;
3938
import static org.junit.Assert.assertEquals;
4039
import static org.junit.Assert.assertFalse;
@@ -158,7 +157,6 @@ public void testProcessQueue() {
158157
ProcessQueue processQueue2 = createProcessQueue();
159158
assertEquals(processQueue1.getMsgAccCnt(), processQueue2.getMsgAccCnt());
160159
assertEquals(processQueue1.getTryUnlockTimes(), processQueue2.getTryUnlockTimes());
161-
assertEquals(processQueue1.getLastLockTimestamp(), processQueue2.getLastLockTimestamp());
162160
assertEquals(processQueue1.getLastPullTimestamp(), processQueue2.getLastPullTimestamp());
163161
}
164162

0 commit comments

Comments
 (0)