-
Notifications
You must be signed in to change notification settings - Fork 19
Cache sticky endpoint channels #2993
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| /* | ||
| * (c) Copyright 2026 Palantir Technologies Inc. All rights reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.palantir.dialogue.core; | ||
|
|
||
| import com.github.benmanes.caffeine.cache.Caffeine; | ||
| import com.github.benmanes.caffeine.cache.LoadingCache; | ||
| import com.palantir.dialogue.Endpoint; | ||
| import com.palantir.dialogue.EndpointChannel; | ||
| import com.palantir.dialogue.EndpointChannelFactory; | ||
|
|
||
| final class CachingEndpointChannelFactory implements EndpointChannelFactory { | ||
|
|
||
| private final LoadingCache<Endpoint, EndpointChannel> cache; | ||
| private final EndpointChannelFactory delegate; | ||
|
|
||
| CachingEndpointChannelFactory(EndpointChannelFactory delegate, int maxEndpointCacheSize) { | ||
| this.delegate = delegate; | ||
| this.cache = Caffeine.newBuilder() | ||
| .maximumSize(maxEndpointCacheSize) | ||
| .weakValues() | ||
| .build(delegate::endpoint); | ||
| } | ||
|
|
||
| @Override | ||
| public EndpointChannel endpoint(Endpoint endpoint) { | ||
| return cache.get(endpoint); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "CachingEndpointChannelFactory{" + delegate + '}'; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,6 +79,12 @@ default int maxQueueSize() { | |
| return 100_000; | ||
| } | ||
|
|
||
| /** Maximum number of cached endpoint channels. Values {@code <= 0} disable caching. */ | ||
| @Value.Default | ||
| default int maxEndpointCacheSize() { | ||
|
Comment on lines
+82
to
+84
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to be specific to sticky channels, but the javadoc doesn't surface that at all. Additionally, I don't think users can modify this config directly, so we may want to pipe it through |
||
| return 1_000; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thoughts on starting with this at 0 to keep parity with existing logic, and testing a different value in some prod environment, before changing the default? I also wonder whether 1000 is enough/too much. |
||
| } | ||
|
|
||
| OptionalInt overrideSingleHostIndex(); | ||
|
|
||
| @Value.Default | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,28 +33,34 @@ | |
| import javax.annotation.concurrent.ThreadSafe; | ||
| import org.jspecify.annotations.Nullable; | ||
|
|
||
| final class StickyEndpointChannels2 implements Supplier<Channel> { | ||
| final class StickyEndpointChannels2 implements StickyEndpointChannelsFactory { | ||
|
|
||
| private final Supplier<EndpointChannelFactory> delegate; | ||
| private final Supplier<Channel> queueOverrideSupplier; | ||
| private final EndpointChannelFactory delegate; | ||
|
|
||
| StickyEndpointChannels2(Supplier<EndpointChannelFactory> endpointChannelFactory) { | ||
| StickyEndpointChannels2(Supplier<Channel> queueOverrideSupplier, EndpointChannelFactory endpointChannelFactory) { | ||
| this.queueOverrideSupplier = queueOverrideSupplier; | ||
| this.delegate = endpointChannelFactory; | ||
| } | ||
|
|
||
| @Override | ||
| public Channel get() { | ||
| return new StickyChannel2(delegate.get()); | ||
| public Channel stickyChannel() { | ||
| return new StickyChannel2(queueOverrideSupplier, delegate); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "StickyEndpointChannels2{" + delegate + "}"; | ||
| } | ||
|
|
||
| static Supplier<Channel> create(Config cf, LimitedChannel nodeSelectionChannel, EndpointChannelFactory delegate) { | ||
| static StickyEndpointChannelsFactory create( | ||
| Config cf, LimitedChannel nodeSelectionChannel, EndpointChannelFactory delegate) { | ||
| Supplier<Channel> queueOverrideSupplier = new QueueOverrideSupplier(cf, nodeSelectionChannel); | ||
| return new StickyEndpointChannels2( | ||
| new StickyEndpointChannels2EndpointFactorySupplier(queueOverrideSupplier, delegate)); | ||
| int maxEndpointCacheSize = cf.maxEndpointCacheSize(); | ||
| EndpointChannelFactory channelFactory = (maxEndpointCacheSize <= 0) | ||
| ? delegate | ||
| : new CachingEndpointChannelFactory(delegate, maxEndpointCacheSize); | ||
|
Comment on lines
+59
to
+62
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels a bit like overkill to me. If we want to keep this, I'd consider making the |
||
| return new StickyEndpointChannels2(queueOverrideSupplier, channelFactory); | ||
| } | ||
|
|
||
| private static final class QueueOverrideSupplier implements Supplier<Channel> { | ||
|
|
@@ -81,43 +87,25 @@ public Channel get() { | |
| } | ||
| } | ||
|
|
||
| private static final class StickyEndpointChannels2EndpointFactorySupplier | ||
| implements Supplier<EndpointChannelFactory> { | ||
|
|
||
| private final Supplier<Channel> queueOverrideSupplier; | ||
| private final EndpointChannelFactory delegate; | ||
|
|
||
| StickyEndpointChannels2EndpointFactorySupplier( | ||
| Supplier<Channel> queueOverrideSupplier, EndpointChannelFactory delegate) { | ||
| this.queueOverrideSupplier = queueOverrideSupplier; | ||
| this.delegate = delegate; | ||
| } | ||
|
|
||
| @Override | ||
| public EndpointChannelFactory get() { | ||
| Channel queueOverride = queueOverrideSupplier.get(); | ||
| return endpoint -> { | ||
| EndpointChannel endpointChannel = delegate.endpoint(endpoint); | ||
| return (EndpointChannel) request -> { | ||
| QueueAttachments.setQueueOverride(request, queueOverride); | ||
| return endpointChannel.execute(request); | ||
| }; | ||
| }; | ||
| } | ||
| } | ||
|
|
||
| private static final class StickyChannel2 implements EndpointChannelFactory, Channel { | ||
| private static final class StickyChannel2 implements Channel, EndpointChannelFactory { | ||
|
|
||
| private final Channel queueOverride; | ||
| private final EndpointChannelFactory channelFactory; | ||
| private final StickyRouter router = new StickyRouter(); | ||
|
|
||
| private StickyChannel2(EndpointChannelFactory channelFactory) { | ||
| private StickyChannel2(Supplier<Channel> queueOverrideSupplier, EndpointChannelFactory channelFactory) { | ||
| this.queueOverride = queueOverrideSupplier.get(); | ||
| this.channelFactory = channelFactory; | ||
| } | ||
|
|
||
| @Override | ||
| public EndpointChannel endpoint(Endpoint endpoint) { | ||
| return new StickyEndpointChannel(router, channelFactory.endpoint(endpoint)); | ||
| EndpointChannel endpointChannel = channelFactory.endpoint(endpoint); | ||
| EndpointChannel endpointWithQueueOverride = innerRequest -> { | ||
| QueueAttachments.setQueueOverride(innerRequest, queueOverride); | ||
| return endpointChannel.execute(innerRequest); | ||
| }; | ||
| return request -> router.execute(request, endpointWithQueueOverride); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -240,24 +228,4 @@ private static ListenableFuture<Response> executeWithStickyTarget( | |
| return endpointChannel.execute(request); | ||
| } | ||
| } | ||
|
|
||
| private static final class StickyEndpointChannel implements EndpointChannel { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Can we keep this? Having non-anonymous classes is helpful for telemetry/debugging because it has a nice name in stacktraces and has a |
||
| private final StickyRouter stickyRouter; | ||
| private final EndpointChannel delegate; | ||
|
|
||
| StickyEndpointChannel(StickyRouter stickyRouter, EndpointChannel delegate) { | ||
| this.stickyRouter = stickyRouter; | ||
| this.delegate = delegate; | ||
| } | ||
|
|
||
| @Override | ||
| public ListenableFuture<Response> execute(Request request) { | ||
| return stickyRouter.execute(request, delegate); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "StickyEndpointChannel{delegate=" + delegate + '}'; | ||
| } | ||
|
Comment on lines
-258
to
-261
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seeing this makes me wonder whether anybody may have been logging the channel, as a way to debug stuff, and whether the loss of this override may cause debugging pain (since you're now returning |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,23 @@ | ||||||||||||||||||||||
| /* | ||||||||||||||||||||||
| * (c) Copyright 2025 Palantir Technologies Inc. All rights reserved. | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||
| * you may not use this file except in compliance with the License. | ||||||||||||||||||||||
| * You may obtain a copy of the License at | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||
| */ | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| package com.palantir.dialogue.core; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import com.palantir.dialogue.Channel; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| public interface StickyEndpointChannelsFactory { | ||||||||||||||||||||||
|
Comment on lines
+17
to
+21
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change the package and make the name non-plural to be consistent with Also this returns
Suggested change
|
||||||||||||||||||||||
| Channel stickyChannel(); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side-note: it may be worth starting to use https://github.com/palantir/cache for new caches like this (which iirc gives us instrumentation by default, which could be interesting to have regardless, since that would help identify cases where the cache size may be inappropriate)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That library doesn't support weak values (and I'm not sure I want to support weak values there). Using Caffeine feels more appropriate here.