-
Notifications
You must be signed in to change notification settings - Fork 11.8k
[ISSUE #9254] Refactor CQ-related in DefaultMessageStorage #9256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
[ISSUE #9254] Refactor CQ-related in DefaultMessageStorage #9256
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## develop #9256 +/- ##
=============================================
+ Coverage 48.11% 48.22% +0.10%
- Complexity 12110 12229 +119
=============================================
Files 1322 1324 +2
Lines 93126 93576 +450
Branches 11940 12023 +83
=============================================
+ Hits 44812 45126 +314
- Misses 42775 42872 +97
- Partials 5539 5578 +39 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
ad190f2
to
61ee2f4
Compare
dc014b5
to
a8996fa
Compare
da78405
to
59ee17a
Compare
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
Show resolved
Hide resolved
return this.consumeQueueStore.getMinOffsetInQueue(topic, queueId); | ||
} catch (RocksDBException e) { | ||
ERROR_LOG.error("getMinOffsetInQueue Failed. topic: {}, queueId: {}", topic, queueId, e); | ||
return -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make sure here the default value is -1, but not 0.
@@ -480,4 +486,21 @@ public CqUnit nextAndRelease() { | |||
} | |||
} | |||
} | |||
|
|||
public void initializeWithOffset(long offset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used for mount --> main in container mode? If so, add some java docs here.
|
||
// update the max and min offset | ||
if (offset > 0) { | ||
this.consumeQueueStore.updateCqOffset(topic, queueId, 0L, offset - 1, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about use two methods for updating CQ offset for min and max offsets.
Which Issue(s) This PR Fixes
Fixes #9254
Brief Description
How Did You Test This Change?