Skip to content

Commit eb133b9

Browse files
committed
fix
1 parent 3629a45 commit eb133b9

File tree

7 files changed

+47
-30
lines changed

7 files changed

+47
-30
lines changed

Diff for: store/src/main/java/org/apache/rocketmq/store/CommitLog.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.apache.rocketmq.store;
1818

1919
import com.google.common.base.Strings;
20+
import com.sun.jna.NativeLong;
21+
import com.sun.jna.Pointer;
2022
import java.net.Inet6Address;
2123
import java.net.InetSocketAddress;
2224
import java.nio.ByteBuffer;
@@ -34,8 +36,6 @@
3436
import java.util.concurrent.TimeoutException;
3537
import java.util.function.Supplier;
3638
import java.util.stream.Collectors;
37-
import com.sun.jna.NativeLong;
38-
import com.sun.jna.Pointer;
3939
import org.apache.rocketmq.common.MixAll;
4040
import org.apache.rocketmq.common.ServiceThread;
4141
import org.apache.rocketmq.common.SystemClock;
@@ -66,7 +66,6 @@
6666
import org.apache.rocketmq.store.logfile.MappedFile;
6767
import org.apache.rocketmq.store.util.LibC;
6868
import org.rocksdb.RocksDBException;
69-
7069
import sun.nio.ch.DirectBuffer;
7170

7271
/**
@@ -408,8 +407,7 @@ else if (!dispatchRequest.isSuccess()) {
408407
log.warn("The commitlog files are deleted, and delete the consume queue files");
409408
this.mappedFileQueue.setFlushedWhere(0);
410409
this.mappedFileQueue.setCommittedWhere(0);
411-
this.defaultMessageStore.getQueueStore().destroy();
412-
this.defaultMessageStore.getQueueStore().loadAfterDestroy();
410+
this.defaultMessageStore.destroyConsumeQueueStore(true);
413411
}
414412
}
415413

@@ -818,8 +816,7 @@ else if (size == 0) {
818816
log.warn("The commitlog files are deleted, and delete the consume queue files");
819817
this.mappedFileQueue.setFlushedWhere(0);
820818
this.mappedFileQueue.setCommittedWhere(0);
821-
this.defaultMessageStore.getQueueStore().destroy();
822-
this.defaultMessageStore.getQueueStore().loadAfterDestroy();
819+
this.defaultMessageStore.destroyConsumeQueueStore(true);
823820
}
824821
}
825822

Diff for: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.rocketmq.store.config.StorePathConfigHelper;
3636
import org.apache.rocketmq.store.logfile.MappedFile;
3737
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
38+
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
3839
import org.apache.rocketmq.store.queue.CqUnit;
3940
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
4041
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
@@ -61,6 +62,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
6162
private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
6263

6364
private final MessageStore messageStore;
65+
private final ConsumeQueueStoreInterface consumeQueueStore;
6466

6567
private final MappedFileQueue mappedFileQueue;
6668
private final String topic;
@@ -83,9 +85,20 @@ public ConsumeQueue(
8385
final String storePath,
8486
final int mappedFileSize,
8587
final MessageStore messageStore) {
88+
this(topic, queueId, storePath, mappedFileSize, messageStore, messageStore.getQueueStore());
89+
}
90+
91+
public ConsumeQueue(
92+
final String topic,
93+
final int queueId,
94+
final String storePath,
95+
final int mappedFileSize,
96+
final MessageStore messageStore,
97+
final ConsumeQueueStoreInterface consumeQueueStore) {
8698
this.storePath = storePath;
8799
this.mappedFileSize = mappedFileSize;
88100
this.messageStore = messageStore;
101+
this.consumeQueueStore = consumeQueueStore;
89102

90103
this.topic = topic;
91104
this.queueId = queueId;
@@ -899,14 +912,14 @@ public CqUnit get(long offset) {
899912
@Override
900913
public Pair<CqUnit, Long> getCqUnitAndStoreTime(long index) {
901914
CqUnit cqUnit = get(index);
902-
Long messageStoreTime = this.messageStore.getQueueStore().getStoreTime(cqUnit);
915+
Long messageStoreTime = this.consumeQueueStore.getStoreTime(cqUnit);
903916
return new Pair<>(cqUnit, messageStoreTime);
904917
}
905918

906919
@Override
907920
public Pair<CqUnit, Long> getEarliestUnitAndStoreTime() {
908921
CqUnit cqUnit = getEarliestUnit();
909-
Long messageStoreTime = this.messageStore.getQueueStore().getStoreTime(cqUnit);
922+
Long messageStoreTime = this.consumeQueueStore.getStoreTime(cqUnit);
910923
return new Pair<>(cqUnit, messageStoreTime);
911924
}
912925

Diff for: store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

+7
Original file line numberDiff line numberDiff line change
@@ -3099,4 +3099,11 @@ public IndexService getIndexService() {
30993099
public ScheduledExecutorService getScheduledCleanQueueExecutorService() {
31003100
return scheduledCleanQueueExecutorService;
31013101
}
3102+
3103+
public void destroyConsumeQueueStore(boolean loadAfterDestroy) {
3104+
consumeQueueStore.destroy();
3105+
if (loadAfterDestroy) {
3106+
consumeQueueStore.loadAfterDestroy();
3107+
}
3108+
}
31023109
}

Diff for: store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,8 @@ private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String top
259259
queueId,
260260
storePath,
261261
this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
262-
this.messageStore);
262+
this.messageStore,
263+
this);
263264
} else if (Objects.equals(CQType.BatchCQ, cqType)) {
264265
return new BatchConsumeQueue(
265266
topic,
@@ -433,7 +434,7 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId)
433434
queueId,
434435
getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
435436
this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
436-
this.messageStore);
437+
this.messageStore, this);
437438
}
438439

439440
ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);

Diff for: store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java

+15-15
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,20 @@ public class RocksDBConsumeQueue implements ConsumeQueueInterface {
3636
private static final Logger ERROR_LOG = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
3737

3838
private final MessageStore messageStore;
39+
private final ConsumeQueueStoreInterface consumeQueueStore;
3940
private final String topic;
4041
private final int queueId;
4142

42-
public RocksDBConsumeQueue(final MessageStore messageStore, final String topic, final int queueId) {
43+
public RocksDBConsumeQueue(final MessageStore messageStore, final ConsumeQueueStoreInterface consumeQueueStore,
44+
final String topic, final int queueId) {
4345
this.messageStore = messageStore;
46+
this.consumeQueueStore = consumeQueueStore;
4447
this.topic = topic;
4548
this.queueId = queueId;
4649
}
4750

4851
public RocksDBConsumeQueue(final String topic, final int queueId) {
49-
this.messageStore = null;
50-
this.topic = topic;
51-
this.queueId = queueId;
52+
this(null, null, topic, queueId);
5253
}
5354

5455
@Override
@@ -114,7 +115,7 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
114115
@Override
115116
public long getMaxOffsetInQueue() {
116117
try {
117-
return this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId);
118+
return this.consumeQueueStore.getMaxOffsetInQueue(topic, queueId);
118119
} catch (RocksDBException e) {
119120
ERROR_LOG.error("getMaxOffsetInQueue Failed. topic: {}, queueId: {}", topic, queueId, e);
120121
return 0;
@@ -124,8 +125,8 @@ public long getMaxOffsetInQueue() {
124125
@Override
125126
public long getMessageTotalInQueue() {
126127
try {
127-
long maxOffsetInQueue = this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId);
128-
long minOffsetInQueue = this.messageStore.getQueueStore().getMinOffsetInQueue(topic, queueId);
128+
long maxOffsetInQueue = this.consumeQueueStore.getMaxOffsetInQueue(topic, queueId);
129+
long minOffsetInQueue = this.consumeQueueStore.getMinOffsetInQueue(topic, queueId);
129130
return maxOffsetInQueue - minOffsetInQueue;
130131
} catch (RocksDBException e) {
131132
ERROR_LOG.error("getMessageTotalInQueue Failed. topic: {}, queueId: {}, {}", topic, queueId, e);
@@ -158,7 +159,7 @@ public long getOffsetInQueueByTime(long timestamp, BoundaryType boundaryType) {
158159

159160
@Override
160161
public long getMaxPhysicOffset() {
161-
Long maxPhyOffset = this.messageStore.getQueueStore().getMaxPhyOffsetInConsumeQueue(topic, queueId);
162+
Long maxPhyOffset = this.consumeQueueStore.getMaxPhyOffsetInConsumeQueue(topic, queueId);
162163
return maxPhyOffset == null ? -1 : maxPhyOffset;
163164
}
164165

@@ -195,7 +196,6 @@ public void correctMinOffset(long minCommitLogOffset) {
195196

196197
/**
197198
* Ignored, in rocksdb mode, we build cq in RocksDBConsumeQueueStore
198-
* @see org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore#putMessagePosition()
199199
*/
200200
@Override
201201
public void putMessagePositionInfoWrapper(DispatchRequest request) {
@@ -208,7 +208,7 @@ public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageEx
208208
Long queueOffset = queueOffsetOperator.getTopicQueueNextOffset(topicQueueKey);
209209
if (queueOffset == null) {
210210
// we will recover topic queue table from rocksdb when we use it.
211-
queueOffset = this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId);
211+
queueOffset = this.consumeQueueStore.getMaxOffsetInQueue(topic, queueId);
212212
queueOffsetOperator.updateQueueOffset(topicQueueKey, queueOffset);
213213
}
214214
msg.setQueueOffset(queueOffset);
@@ -306,7 +306,7 @@ public CqUnit get(long index) {
306306
public Pair<CqUnit, Long> getCqUnitAndStoreTime(long index) {
307307
ByteBuffer byteBuffer;
308308
try {
309-
byteBuffer = this.messageStore.getQueueStore().get(topic, queueId, index);
309+
byteBuffer = this.consumeQueueStore.get(topic, queueId, index);
310310
} catch (RocksDBException e) {
311311
ERROR_LOG.error("getUnitAndStoreTime Failed. topic: {}, queueId: {}", topic, queueId, e);
312312
return null;
@@ -324,7 +324,7 @@ public Pair<CqUnit, Long> getCqUnitAndStoreTime(long index) {
324324
@Override
325325
public Pair<CqUnit, Long> getEarliestUnitAndStoreTime() {
326326
try {
327-
long minOffset = this.messageStore.getQueueStore().getMinOffsetInQueue(topic, queueId);
327+
long minOffset = this.consumeQueueStore.getMinOffsetInQueue(topic, queueId);
328328
return getCqUnitAndStoreTime(minOffset);
329329
} catch (RocksDBException e) {
330330
ERROR_LOG.error("getEarliestUnitAndStoreTime Failed. topic: {}, queueId: {}", topic, queueId, e);
@@ -341,7 +341,7 @@ public CqUnit getEarliestUnit() {
341341
@Override
342342
public CqUnit getLatestUnit() {
343343
try {
344-
long maxOffset = this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId);
344+
long maxOffset = this.consumeQueueStore.getMaxOffsetInQueue(topic, queueId);
345345
return get(maxOffset > 0 ? maxOffset - 1 : maxOffset);
346346
} catch (RocksDBException e) {
347347
ERROR_LOG.error("getLatestUnit Failed. topic: {}, queueId: {}, {}", topic, queueId, e.getMessage());
@@ -355,7 +355,7 @@ public long getLastOffset() {
355355
}
356356

357357
private ReferredIterator<CqUnit> iterateFrom0(final long startIndex, final int count) throws RocksDBException {
358-
List<ByteBuffer> byteBufferList = this.messageStore.getQueueStore().rangeQuery(topic, queueId, startIndex, count);
358+
List<ByteBuffer> byteBufferList = this.consumeQueueStore.rangeQuery(topic, queueId, startIndex, count);
359359
if (byteBufferList == null || byteBufferList.isEmpty()) {
360360
if (this.messageStore.getMessageStoreConfig().isEnableRocksDBLog()) {
361361
log.warn("iterateFrom0 - find nothing, startIndex:{}, count:{}", startIndex, count);
@@ -449,7 +449,7 @@ public CqUnit next() {
449449

450450
final ByteBuffer byteBuffer;
451451
try {
452-
byteBuffer = messageStore.getQueueStore().get(topic, queueId, startIndex + currentIndex);
452+
byteBuffer = consumeQueueStore.get(topic, queueId, startIndex + currentIndex);
453453
} catch (RocksDBException e) {
454454
ERROR_LOG.error("get cq from rocksdb failed. topic: {}, queueId: {}", topic, queueId, e);
455455
return null;

Diff for: store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId)
521521
return logic;
522522
}
523523

524-
ConsumeQueueInterface newLogic = new RocksDBConsumeQueue(this.messageStore, topic, queueId);
524+
ConsumeQueueInterface newLogic = new RocksDBConsumeQueue(this.messageStore, this, topic, queueId);
525525
ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
526526

527527
return oldLogic != null ? oldLogic : newLogic;

Diff for: store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
*/
1717
package org.apache.rocketmq.store.queue;
1818

19+
import java.nio.ByteBuffer;
1920
import org.apache.rocketmq.common.MixAll;
2021
import org.apache.rocketmq.store.DefaultMessageStore;
2122
import org.junit.Test;
2223
import org.mockito.invocation.InvocationOnMock;
2324
import org.mockito.stubbing.Answer;
2425

25-
import java.nio.ByteBuffer;
26-
2726
import static org.apache.rocketmq.store.queue.RocksDBConsumeQueueTable.CQ_UNIT_SIZE;
2827
import static org.junit.Assert.assertEquals;
2928
import static org.junit.Assert.assertFalse;
@@ -60,7 +59,7 @@ public ByteBuffer answer(InvocationOnMock mock) throws Throwable {
6059
}
6160
});
6261

63-
RocksDBConsumeQueue consumeQueue = new RocksDBConsumeQueue(messageStore, "topic", 0);
62+
RocksDBConsumeQueue consumeQueue = new RocksDBConsumeQueue(messageStore, rocksDBConsumeQueueStore, "topic", 0);
6463
ReferredIterator<CqUnit> it = consumeQueue.iterateFrom(9000);
6564
for (int i = 0; i < 1000; i++) {
6665
assertTrue(it.hasNext());

0 commit comments

Comments
 (0)