Skip to content

Commit 282d7a1

Browse files
authored
RATIS-2290. Simply the EventQueue in leader (apache#1258)
1 parent b0f5330 commit 282d7a1

File tree

1 file changed

+16
-21
lines changed

1 file changed

+16
-21
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,10 @@ private enum Type {
112112
}
113113

114114
private final Type type;
115-
private final long newTerm;
116115
private final Runnable handler;
117116

118-
StateUpdateEvent(Type type, long newTerm, Runnable handler) {
117+
StateUpdateEvent(Type type, Runnable handler) {
119118
this.type = type;
120-
this.newTerm = newTerm;
121119
this.handler = handler;
122120
}
123121

@@ -133,25 +131,30 @@ public boolean equals(Object obj) {
133131
return false;
134132
}
135133
final StateUpdateEvent that = (StateUpdateEvent)obj;
136-
return this.type == that.type && this.newTerm == that.newTerm;
134+
return this.type == that.type;
137135
}
138136

139137
@Override
140138
public int hashCode() {
141-
return Objects.hash(type, newTerm);
139+
return type.hashCode();
142140
}
143141

144142
@Override
145143
public String toString() {
146-
return type + (newTerm >= 0? ":" + newTerm: "");
144+
return type.name();
147145
}
148146
}
149147

150148
private class EventQueue {
151149
private final String name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
152-
private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(4096);
150+
private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(
151+
StateUpdateEvent.Type.values().length);;
153152

154-
void submit(StateUpdateEvent event) {
153+
// submit can be invoked by different threads -- need to be synchronized
154+
synchronized void submit(StateUpdateEvent event) {
155+
if (queue.contains(event)) { // avoid duplicated events
156+
return;
157+
}
155158
try {
156159
queue.put(event);
157160
} catch (InterruptedException e) {
@@ -160,10 +163,10 @@ void submit(StateUpdateEvent event) {
160163
}
161164
}
162165

166+
// poll is invoked only by the EventProcessor thread -- synchronized is not needed
163167
StateUpdateEvent poll() {
164-
final StateUpdateEvent e;
165168
try {
166-
e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
169+
return queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
167170
} catch (InterruptedException ie) {
168171
Thread.currentThread().interrupt();
169172
String s = this + ": poll() is interrupted";
@@ -174,14 +177,6 @@ StateUpdateEvent poll() {
174177
throw new IllegalStateException(s + " UNEXPECTEDLY", ie);
175178
}
176179
}
177-
178-
if (e != null) {
179-
// remove duplicated events from the head.
180-
while(e.equals(queue.peek())) {
181-
queue.poll();
182-
}
183-
}
184-
return e;
185180
}
186181

187182
@Override
@@ -323,9 +318,9 @@ boolean isApplied() {
323318
}
324319

325320
private final StateUpdateEvent updateCommitEvent =
326-
new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit);
321+
new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, this::updateCommit);
327322
private final StateUpdateEvent checkStagingEvent =
328-
new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, this::checkStaging);
323+
new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, this::checkStaging);
329324

330325
private final String name;
331326
private final RaftServerImpl server;
@@ -704,7 +699,7 @@ void submitStepDownEvent(StepDownReason reason) {
704699
}
705700

706701
void submitStepDownEvent(long term, StepDownReason reason) {
707-
eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term, reason)));
702+
eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, () -> stepDown(term, reason)));
708703
}
709704

710705
private void stepDown(long term, StepDownReason reason) {

0 commit comments

Comments
 (0)