Skip to content

Commit fc25436

Browse files
MINOR: Modify KafkaEventQueue VoidEvent to as singleton and use more proper function interface (#19356)
class `VoidEvent` provides singleton object , but nobody use it. I think we should private `VoidEvent` constructor and only use singleton. use `UnaryOperator<OptionalLong>` instead `Function<OptionalLong,OptionalLong>` Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 17ab374 commit fc25436

File tree

3 files changed

+20
-54
lines changed

3 files changed

+20
-54
lines changed

Diff for: server-common/src/main/java/org/apache/kafka/queue/EventQueue.java

+15-50
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717

1818
package org.apache.kafka.queue;
1919

20-
import org.slf4j.Logger;
2120

2221
import java.util.OptionalLong;
23-
import java.util.concurrent.RejectedExecutionException;
24-
import java.util.function.Function;
22+
import java.util.function.UnaryOperator;
2523

2624

2725
public interface EventQueue extends AutoCloseable {
@@ -44,38 +42,20 @@ interface Event {
4442
default void handleException(Throwable e) {}
4543
}
4644

47-
abstract class FailureLoggingEvent implements Event {
48-
private final Logger log;
49-
50-
public FailureLoggingEvent(Logger log) {
51-
this.log = log;
52-
}
53-
54-
@Override
55-
public void handleException(Throwable e) {
56-
if (e instanceof RejectedExecutionException) {
57-
log.info("Not processing {} because the event queue is closed.", this);
58-
} else {
59-
log.error("Unexpected error handling {}", this, e);
60-
}
61-
}
62-
63-
@Override
64-
public String toString() {
65-
return this.getClass().getSimpleName();
66-
}
67-
}
68-
69-
class NoDeadlineFunction implements Function<OptionalLong, OptionalLong> {
45+
class NoDeadlineFunction implements UnaryOperator<OptionalLong> {
7046
public static final NoDeadlineFunction INSTANCE = new NoDeadlineFunction();
47+
48+
private NoDeadlineFunction() {
49+
50+
}
7151

7252
@Override
7353
public OptionalLong apply(OptionalLong ignored) {
7454
return OptionalLong.empty();
7555
}
7656
}
7757

78-
class DeadlineFunction implements Function<OptionalLong, OptionalLong> {
58+
class DeadlineFunction implements UnaryOperator<OptionalLong> {
7959
private final long deadlineNs;
8060

8161
public DeadlineFunction(long deadlineNs) {
@@ -88,7 +68,7 @@ public OptionalLong apply(OptionalLong ignored) {
8868
}
8969
}
9070

91-
class EarliestDeadlineFunction implements Function<OptionalLong, OptionalLong> {
71+
class EarliestDeadlineFunction implements UnaryOperator<OptionalLong> {
9272
private final long newDeadlineNs;
9373

9474
public EarliestDeadlineFunction(long newDeadlineNs) {
@@ -107,28 +87,13 @@ public OptionalLong apply(OptionalLong prevDeadlineNs) {
10787
}
10888
}
10989

110-
class LatestDeadlineFunction implements Function<OptionalLong, OptionalLong> {
111-
private final long newDeadlineNs;
112-
113-
public LatestDeadlineFunction(long newDeadlineNs) {
114-
this.newDeadlineNs = newDeadlineNs;
115-
}
116-
117-
@Override
118-
public OptionalLong apply(OptionalLong prevDeadlineNs) {
119-
if (prevDeadlineNs.isEmpty()) {
120-
return OptionalLong.of(newDeadlineNs);
121-
} else if (prevDeadlineNs.getAsLong() > newDeadlineNs) {
122-
return prevDeadlineNs;
123-
} else {
124-
return OptionalLong.of(newDeadlineNs);
125-
}
126-
}
127-
}
128-
12990
class VoidEvent implements Event {
13091
public static final VoidEvent INSTANCE = new VoidEvent();
131-
92+
93+
private VoidEvent() {
94+
95+
}
96+
13297
@Override
13398
public void run() throws Exception {
13499
}
@@ -182,7 +147,7 @@ default void appendWithDeadline(long deadlineNs, Event event) {
182147
* @param event The event to schedule.
183148
*/
184149
default void scheduleDeferred(String tag,
185-
Function<OptionalLong, OptionalLong> deadlineNsCalculator,
150+
UnaryOperator<OptionalLong> deadlineNsCalculator,
186151
Event event) {
187152
enqueue(EventInsertionType.DEFERRED, tag, deadlineNsCalculator, event);
188153
}
@@ -224,7 +189,7 @@ enum EventInsertionType {
224189
*/
225190
void enqueue(EventInsertionType insertionType,
226191
String tag,
227-
Function<OptionalLong, OptionalLong> deadlineNsCalculator,
192+
UnaryOperator<OptionalLong> deadlineNsCalculator,
228193
Event event);
229194

230195
/**

Diff for: server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.locks.Condition;
3535
import java.util.concurrent.locks.ReentrantLock;
3636
import java.util.function.Function;
37+
import java.util.function.UnaryOperator;
3738

3839

3940
public final class KafkaEventQueue implements EventQueue {
@@ -444,7 +445,7 @@ public KafkaEventQueue(
444445
LogContext logContext,
445446
String threadNamePrefix
446447
) {
447-
this(time, logContext, threadNamePrefix, VoidEvent::new);
448+
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
448449
}
449450

450451
public KafkaEventQueue(
@@ -472,7 +473,7 @@ public Time time() {
472473
@Override
473474
public void enqueue(EventInsertionType insertionType,
474475
String tag,
475-
Function<OptionalLong, OptionalLong> deadlineNsCalculator,
476+
UnaryOperator<OptionalLong> deadlineNsCalculator,
476477
Event event) {
477478
EventContext eventContext = new EventContext(event, insertionType, tag);
478479
Exception e = eventHandler.enqueue(eventContext, deadlineNsCalculator);

Diff for: server/src/main/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunction.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
import org.apache.kafka.common.utils.ExponentialBackoff;
2121

2222
import java.util.OptionalLong;
23-
import java.util.function.Function;
23+
import java.util.function.UnaryOperator;
2424

2525
import static org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
2626

2727
/**
2828
* This class calculates when the MaybeSendAssignmentsEvent should run for AssignmentsManager.
2929
*/
30-
public class AssignmentsManagerDeadlineFunction implements Function<OptionalLong, OptionalLong> {
30+
public class AssignmentsManagerDeadlineFunction implements UnaryOperator<OptionalLong> {
3131

3232
/**
3333
* The exponential backoff to use.

0 commit comments

Comments
 (0)