Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T>> {

private static final Logger log = LoggerFactory.getLogger(Reflector.class);
private static final String KUBERNETES_DISABLE_WATCH_BOOKMARKS = "kubernetes.disable.watch.bookmarks";

private static long MIN_TIMEOUT = TimeUnit.MINUTES.toSeconds(5);

Expand All @@ -56,6 +57,8 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
private final Executor executor;
private final boolean disableWatchBookmarks;

//default behavior - retry if started and it's not a watcherexception
private volatile ExceptionHandler handler = (b, t) -> b && !(t instanceof WatcherException);
private long minTimeout = MIN_TIMEOUT;
Expand All @@ -75,6 +78,14 @@ public Reflector(ListerWatcher<T, L> listerWatcher, ProcessorStore<T> store, Exe
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(),
ExponentialBackoffIntervalCalculator.UNLIMITED_RETRIES);
this.executor = executor;

if(Boolean.parseBoolean(Utils.getSystemPropertyOrEnvVar(KUBERNETES_DISABLE_WATCH_BOOKMARKS))) {
log.debug("Disabling watch bookmarks");
this.disableWatchBookmarks = true;
} else {
log.debug("Allowing watch bookmarks");
this.disableWatchBookmarks = false;
}
}

public CompletableFuture<Void> start() {
Expand Down Expand Up @@ -180,15 +191,17 @@ protected void reconnect() {
}

private CompletableFuture<L> processList(Set<String> nextKeys, String continueVal) {
CompletableFuture<L> futureResult = listerWatcher
.submitList(
new ListOptionsBuilder()
// if caching is allowed, start with 0 - meaning any cached version is fine for the initial listing
.withResourceVersion(isCachedListing(continueVal) ? "0" : null)
.withLimit(listerWatcher.getLimit())
.withContinue(continueVal)
.withAllowWatchBookmarks(false)
.build());
ListOptionsBuilder listOptionsBuilder = new ListOptionsBuilder()
// if caching is allowed, start with 0 - meaning any cached version is fine for the initial listing
.withResourceVersion(isCachedListing(continueVal) ? "0" : null)
.withLimit(listerWatcher.getLimit())
.withContinue(continueVal);

if(disableWatchBookmarks) {
listOptionsBuilder = listOptionsBuilder.withAllowWatchBookmarks(false);
}

CompletableFuture<L> futureResult = listerWatcher.submitList(listOptionsBuilder.build());

return futureResult.thenCompose(result -> {
result.getItems().forEach(i -> {
Expand Down Expand Up @@ -219,17 +232,23 @@ private synchronized CompletableFuture<? extends Watch> startWatcher(final Strin
if (isStopped()) {
return CompletableFuture.completedFuture(null);
}

ListOptionsBuilder listOptionsBuilder = new ListOptionsBuilder().withResourceVersion(latestResourceVersion)
// this would match the behavior of the go client, but requires changing a lot of mock expectations
// so instead we'll terminate below and set a fail-safe here
// .withTimeoutSeconds((long) ((Math.random() + 1) * minTimeout))
.withTimeoutSeconds(minTimeout * 2);

if(disableWatchBookmarks) {
listOptionsBuilder = listOptionsBuilder.withAllowWatchBookmarks(false);
}

log.debug("Starting watcher for {} at v{}", this, latestResourceVersion);
// there's no need to stop the old watch, that will happen automatically when this call completes
CompletableFuture<AbstractWatchManager<T>> future = listerWatcher.submitWatch(
new ListOptionsBuilder().withResourceVersion(latestResourceVersion)
// this would match the behavior of the go client, but requires changing a lot of mock expectations
// so instead we'll terminate below and set a fail-safe here
// .withTimeoutSeconds((long) ((Math.random() + 1) * minTimeout))
.withTimeoutSeconds(minTimeout * 2)
.withAllowWatchBookmarks(false)
.build(),
watcher);
listOptionsBuilder.build(),
watcher
);

// the alternative to this is to localize the logic in the AbstractWatchManager, however since
// we only need it for informers, it seems fine here
Expand Down