-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-20297: move Scheduler from client common to trogdor #21753
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cf6b0f9
f2be142
b17f21a
4391eac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,6 @@ | |
| import org.apache.kafka.common.config.SaslConfigs; | ||
| import org.apache.kafka.common.internals.KafkaFutureImpl; | ||
| import org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.LoginContextFactory; | ||
| import org.apache.kafka.common.utils.MockScheduler; | ||
| import org.apache.kafka.common.utils.MockTime; | ||
| import org.apache.kafka.common.utils.Time; | ||
|
|
||
|
|
@@ -35,6 +34,7 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.TreeMap; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
|
|
@@ -284,6 +284,40 @@ public Future<?> refresherThreadDoneFuture() { | |
| } | ||
| } | ||
|
|
||
| /* | ||
| * */ | ||
| private static class MockScheduler implements MockTime.Listener { | ||
|
|
||
| private final MockTime time; | ||
| private final Map<Long, List<KafkaFutureImpl<Long>>> waiters = new TreeMap<>(); | ||
|
|
||
| public MockScheduler(MockTime time) { | ||
| this.time = time; | ||
| time.addListener(this); | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized void onTimeUpdated() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Override
public synchronized void onTimeUpdated() {
long timeMs = time.milliseconds();
var iterator = waiters.entrySet().iterator();
while (iterator.hasNext()) {
var entry = iterator.next();
if (entry.getKey() > timeMs) break;
entry.getValue().forEach(future -> future.complete(timeMs));
iterator.remove();
}
} |
||
| long timeMs = time.milliseconds(); | ||
| var iterator = waiters.entrySet().iterator(); | ||
| while (iterator.hasNext()) { | ||
| var entry = iterator.next(); | ||
| if (entry.getKey() > timeMs) break; | ||
| entry.getValue().forEach(future -> future.complete(timeMs)); | ||
| iterator.remove(); | ||
| } | ||
| } | ||
|
|
||
| public synchronized void addWaiter(long delayMs, KafkaFutureImpl<Long> waiter) { | ||
| long timeMs = time.milliseconds(); | ||
| if (delayMs <= 0) { | ||
| waiter.complete(timeMs); | ||
| } else { | ||
| waiters.computeIfAbsent(timeMs + delayMs, k -> new ArrayList<>()).add(waiter); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testRefresh() throws Exception { | ||
| for (int numExpectedRefreshes : new int[] {0, 1, 2}) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,6 @@ | |
|
|
||
| package org.apache.kafka.trogdor.common; | ||
|
|
||
| import org.apache.kafka.common.utils.Scheduler; | ||
| import org.apache.kafka.common.utils.Utils; | ||
|
|
||
| import com.fasterxml.jackson.databind.JsonNode; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
@@ -34,11 +33,6 @@ class Config { | |
|
|
||
| public static final String TROGDOR_COORDINATOR_PORT = "trogdor.coordinator.port"; | ||
|
|
||
| public static final String TROGDOR_COORDINATOR_HEARTBEAT_MS = | ||
| "trogdor.coordinator.heartbeat.ms"; | ||
|
|
||
| public static final int TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT = 60000; | ||
|
|
||
| public static Platform parse(String curNodeName, String path) throws Exception { | ||
| JsonNode root = JsonUtil.JSON_SERDE.readTree(new File(path)); | ||
| JsonNode platformNode = root.get("platform"); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you remove unused comment?