[do-not-merge] Queue timeout #2926

Open
mpritham wants to merge 14 commits into develop from pmarupaka/queue-timeouts-proto

Conversation

@mpritham
Contributor

@mpritham mpritham commented Mar 27, 2026

Current state

When a Dialogue client submits a request, it may be queued waiting for a concurrency permit without a timeout. On the blocking path, this manifests as callers stuck with no way to time out. On the async path, the ListenableFuture returned to the caller doesn’t complete.

There are 2 related concepts in Dialogue:

  1. read timeout - applies only to requests on the wire, after they've completed queueing and acquired a permit from concurrency limiters
  2. deadlines - propagated from upstream callers, checked in the DeadlineAdvertisementChannel which sits below the host-level concurrency limiter in the per-host pipeline.

Neither bounds the time a request spends waiting in the queue. A request waiting for a concurrency permit has not yet reached the wire (so read timeout hasn't started) and has not yet reached DeadlineAdvertisementChannel (so deadline expiry isn't checked).

Under sustained load where all hosts' AIMD limits are saturated, requests can queue for a long time.

Queue timeouts

It would be useful for requests in the queues (channel and endpoint) to time out after a configured duration, so that clients have a predictable upper bound on the time spent waiting for a response.

To be more precise:

  • Per-client, disabled by default via Optional<Duration> queueTimeout() in Config
  • Proactive eviction using per-request ScheduledFuture tasks on a dedicated single-threaded ScheduledExecutorService
  • Shared timeout budget across both queue layers (channel-level and endpoint-level) via a request attachment (QueueTimeoutAttachments)
  • Timeout cancelled on dispatch (when a request leaves the queue, its ScheduledFuture is cancelled to avoid wasting work on already-dispatched requests)
  • Span and timer eagerly closed on timeout via IdempotentTimerContext wrapper (makes timer.stop() safe to call multiple times)
  • Not retried: SafeRuntimeException is not IOException, so RetryingChannel won't retry
    • if a request is retried (for some other reason: e.g. 429), the request will be re-queued and a fresh queue timeout will be applied
  • New metric: dialogue.client.request.queue.timeout (counter, tagged by channel-name)
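The eviction and cancel-on-dispatch bullets above can be sketched with JDK types only. This is a minimal illustration, not the PR's code: `QueuedEntrySketch` is a hypothetical stand-in for the queued request, `CompletableFuture` replaces Guava's `SettableFuture`, and a plain `RuntimeException` stands in for `SafeRuntimeException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a queued request; not Dialogue's DeferredCall. */
final class QueuedEntrySketch {
    final CompletableFuture<String> response = new CompletableFuture<>();
    private final ScheduledFuture<?> timeoutTask;

    QueuedEntrySketch(ScheduledExecutorService scheduler, long timeoutNanos) {
        // One scheduled task per enqueued request: fail the caller's future when the budget runs out.
        this.timeoutTask = scheduler.schedule(
                () -> response.completeExceptionally(new RuntimeException("queue timeout")),
                timeoutNanos,
                TimeUnit.NANOSECONDS);
    }

    /** Called when the request leaves the queue; the pending timeout no longer applies. */
    void onDispatch() {
        timeoutTask.cancel(false);
    }

    boolean timeoutCancelled() {
        return timeoutTask.isCancelled();
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            QueuedEntrySketch dispatched = new QueuedEntrySketch(scheduler, TimeUnit.HOURS.toNanos(1));
            dispatched.onDispatch();
            System.out.println("dispatched entry failed: " + dispatched.response.isDone());

            QueuedEntrySketch stuck = new QueuedEntrySketch(scheduler, TimeUnit.MILLISECONDS.toNanos(50));
            try {
                stuck.response.join(); // blocks until the timeout task fails the future
            } catch (CompletionException e) {
                System.out.println("stuck entry failed with: " + e.getCause().getMessage());
            }
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

One JDK detail that bears on the memory cost: by default a `ScheduledThreadPoolExecutor` keeps a cancelled task in its work queue until the delay elapses, unless `setRemoveOnCancelPolicy(true)` is set. Whether the PR configures this is not stated here.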

Which queues does this apply to?

Dialogue has two layers of queues:

  1. Channel queue (created here). One per DialogueChannel. Requests wait for any host to have capacity. When a request is dequeued, it goes through node selection afresh and can land on any host.
  2. Endpoint queue (created here when concurrency limiting is enabled). One per (host, endpoint) pair. It sits below the host-level concurrency limiter. Requests have already been assigned to a specific host and are waiting for that endpoint's concurrency permit on that host. A request in this queue holds a host-level concurrency permit.

A request is never in both queues simultaneously. Once a host-level concurrency permit is acquired, the request can transition to the endpoint queue (if there are no permits available for the endpoint on the host).

The same queue timeout is shared between both queues.
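A rough sketch of what sharing one budget means: the first queue a request enters stamps an absolute expiration, and any later queue reads it and gets only the remainder. The map, key name, and method below are hypothetical stand-ins for the request attachment and QueueTimeoutAttachments, not the PR's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SharedQueueBudgetSketch {
    // Hypothetical attachment key; the real one lives in QueueTimeoutAttachments.
    static final String QUEUE_EXPIRATION_NANOS = "queue-expiration-nanos";

    /** Returns the nanos left for this request, stamping the expiration on first entry only. */
    static long remainingNanos(Map<String, Long> attachments, long nowNanos, long timeoutNanos) {
        long expiration =
                attachments.computeIfAbsent(QUEUE_EXPIRATION_NANOS, key -> nowNanos + timeoutNanos);
        // Subtract instead of comparing the two values directly, as recommended for System.nanoTime().
        return Math.max(0L, expiration - nowNanos);
    }

    public static void main(String[] args) {
        Map<String, Long> attachments = new ConcurrentHashMap<>();
        long enteredChannelQueue = System.nanoTime();
        long timeout = 30_000_000_000L; // 30s, matching the example value used elsewhere in this PR
        System.out.println("channel queue budget: "
                + remainingNanos(attachments, enteredChannelQueue, timeout));
        // Later, on entering the endpoint queue, only what is left of the same budget applies.
        System.out.println("endpoint queue budget: "
                + remainingNanos(attachments, System.nanoTime(), timeout));
    }
}
```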

Downsides

There is added memory usage: one new ScheduledFuture is created per enqueued request. (todo: use JOL to find how many bytes exactly).

Test plan

I would like to cut an RC and experiment on a real service to see the behavior in production end-to-end before merging.

==CHANGELOG_MSG==
Add configurable queue timeout to QueuedChannel that proactively fails requests waiting for concurrency permits after a configured duration, preventing unbounded queueing under load.
==CHANGELOG_MSG==

@changelog-app

changelog-app Bot commented Mar 27, 2026

Generate changelog in changelog/@unreleased

Type (Select exactly one)

  • Feature (Adding new functionality)
  • Improvement (Improving existing functionality)
  • Fix (Fixing an issue with existing functionality)
  • Break (Creating a new major version by breaking public APIs)
  • Deprecation (Removing functionality in a non-breaking way)
  • Migration (Automatically moving data/functionality to a new system)

Description

Add configurable queue timeout to QueuedChannel that proactively fails requests waiting for concurrency permits after a configured duration, preventing unbounded queueing under load.

Check the box to generate changelog(s)

  • Generate changelog entry

@mpritham mpritham changed the title from "[do-not-merge] [discussion] Queue timeouts" to "[do-not-merge] [discussion] Queue timeouts - scheduled timeout approach" Apr 2, 2026
@mpritham mpritham force-pushed the pmarupaka/queue-timeouts-proto branch 8 times, most recently from 6c2176c to 8e4e7f4 April 13, 2026 21:08

SettableFuture<Response> responseFuture = SettableFuture.create();
DetachedSpan span = DetachedSpan.start("Dialogue-request-enqueued");
IdempotentTimerContext timer = new IdempotentTimerContext(queuedTime.time());
Contributor Author

The purpose of this thin wrapper around the timer is to let us call .stop() both when the timeout task runs and when isDone() is checked in scheduleNextTask. The alternatives I considered would have timer.stop() run only once, by either (1) adding a direct callback to the request future that closes the span and stops the timer, and removing the logic in isDone that does the same thing, or (2) attempting to read the exception on the future in isDone before calling stop() on the timer.

(1) does not work: the time measured will include the time on the wire which we do not want.
(2) seems clunky: we will need to call get() on the future and catch an exception which we probably do not want to do on the hot path here.
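For readers skimming the thread, the idea behind the wrapper can be sketched as a compare-and-set guard. This is an illustration under assumptions: `IdempotentStopSketch` and its `Runnable` delegate are hypothetical, whereas the real IdempotentTimerContext wraps the timer context returned by queuedTime.time().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of an idempotent stop(); not the PR's IdempotentTimerContext. */
final class IdempotentStopSketch {
    private final Runnable delegateStop;
    private final AtomicBoolean stopped = new AtomicBoolean();

    IdempotentStopSketch(Runnable delegateStop) {
        this.delegateStop = delegateStop;
    }

    /** Safe to call from both the timeout task and the dequeue path; only the first call records. */
    void stop() {
        if (stopped.compareAndSet(false, true)) {
            delegateStop.run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger recorded = new AtomicInteger();
        IdempotentStopSketch timer = new IdempotentStopSketch(recorded::incrementAndGet);
        timer.stop(); // e.g. the timeout task fires
        timer.stop(); // e.g. scheduleNextTask later sees the entry is done
        System.out.println("recorded stops: " + recorded.get());
    }
}
```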

@mpritham mpritham changed the title from "[do-not-merge] [discussion] Queue timeouts - scheduled timeout approach" to "[do-not-merge] Queue timeout" Apr 14, 2026
@mpritham mpritham marked this pull request as ready for review April 14, 2026 03:10
}

@Value.Default
default ScheduledExecutorService queueTimeoutScheduler() {

Why's this in Config.java? Could it instead be in QueuedChannel.java?

Then we could only instantiate it if queueTimeout is non-empty

Contributor Author

Yeah it could be in QueuedChannel.java. I was following the same pattern used for the RetryingChannel.sharedScheduler. In theory, putting it in the config allows tests to pass in a custom scheduler but I didn't add any such tests that use a custom scheduler.

No strong feeling here. Will update.

return expirationNanos;
}

@com.google.common.annotations.VisibleForTesting

Can we import these 2 instead of fully qualifying

SettableFuture<Response> responseFuture = SettableFuture.create();
DetachedSpan span = DetachedSpan.start("Dialogue-request-enqueued");
IdempotentTimerContext timer = new IdempotentTimerContext(queuedTime.time());
@Nullable ScheduledFuture<?> timeoutFuture = scheduleQueueTimeout(request, responseFuture, span, timer);

Should we add a

    if (responseFuture.isDone()) {
        return Optional.of(responseFuture);
    }

immediately after running scheduleQueueTimeout? Would prevent it from creating a DeferredCall if the request has already timed out (eg due to having already gone through the first queue)

Contributor Author

Yes, good idea.

Comment on lines +395 to +396
// Clear the queue timeout expiration so the retry gets a fresh queue timeout.
QueueTimeoutAttachments.clearExpiration(request);
Contributor Author

I don't love that RetryingChannel needs to know about queue timeouts. The alternative approach I thought of was worse:

  • when reading the timeout attachment in QueuedChannel, consider the queueType to decide whether to overwrite or read the existing expiration.
  • The channel-level queue and sticky queue are "entry points" (retries re-enter them), so they'd always stamp fresh. The endpoint queue is downstream, so it'd read the remaining budget.

This works but requires QueuedChannel to know its role in the pipeline, which is clunky and breaks down if the pipeline ever changes (e.g. a new queue type is added without updating the check).

Contributor

That's probably due to how complex our layering of the various channels is. Having multiple queues doesn't help either. I agree that this is probably the simplest option, though it may be worth having an end-to-end test for this behavior, as it seems easy to break in a refactor

private final AtomicInteger inFlight = new AtomicInteger();

// The timeout budget is shared across both queue layers via a Request attachment (see QueueTimeoutAttachments).
private final OptionalLong queueTimeoutNanos;
Contributor Author

This is an OptionalLong while the queue timeout in the config is Optional<Duration>. Duration is user-friendly, and storing an OptionalLong avoids calling toNanos() on the duration once per queued request.
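A small sketch of that conversion, done once at construction time rather than per request. The class and method names are hypothetical; only the Optional<Duration> to OptionalLong mapping is taken from the comment above.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.OptionalLong;

final class QueueTimeoutNanosSketch {
    /** Converts the user-facing Duration to nanos once, so the hot path only reads a primitive. */
    static OptionalLong toNanos(Optional<Duration> queueTimeout) {
        return queueTimeout.isPresent()
                ? OptionalLong.of(queueTimeout.get().toNanos())
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        System.out.println(toNanos(Optional.of(Duration.ofSeconds(30))));
        System.out.println(toNanos(Optional.empty()));
    }
}
```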


Optional<HostEventsSink> hostEventsSink();

Optional<Duration> queueTimeout();
Contributor Author

The flow here is a bit confusing and WIP. There are two related changes in other repos: 1, 2

ReloadingClientFactory.withQueueTimeout(30s)
params.withQueueTimeout(30s) (stored on ReloadingParams which extends AugmentClientConfig)

When a client is created:
ChannelCache line 212: AugmentClientConfig.getClientConf(serviceConf, request)
builder.queueTimeout(30s) (from augment, line 93)
→ returns ClientConfiguration with queueTimeout=30s
ChannelCache line 178: ClientConfiguration.builder().from(apacheClient.conf())
→ copies queueTimeout=30s into the new ClientConfiguration
DialogueChannel.builder().clientConfiguration(conf)
Config.rawConfig() = conf
Config.queueTimeout() = rawConfig().queueTimeout() = Optional.of(30s)
QueuedChannel.create(cf, delegate)
cf.queueTimeout() = Optional.of(30s)
→ timeout enabled

Comment on lines +36 to +42
/**
* Clears the expiration attachment by setting it to a sentinel value that {@link #setExpirationIfAbsent} treats as
* absent.
*/
static void clearExpiration(Request request) {
request.attachments().put(QUEUE_EXPIRATION_NANOS, CLEARED_SENTINEL);
}
Contributor

Why don't we allow removing attachments? It seems like that would make this code simpler

Contributor Author

Opened #2957. Initially I had a slight preference for not supporting remove because it would force every user of Attachments to reason about the case where an attachment is removed, but I don't think this is a huge deal.

Contributor

Are there places we'd need to update right now, so they don't break in the future for a removed attachment?

Contributor Author

What do you mean? None of the other attachments are ever removed. In the future, if they are, the author will need to reason about the impact of the removal and audit the pipeline. We don't need to think through that/update anything now.

Contributor

I was thinking about places that may consider absent as "never present" rather than "potentially removed", and whether that may be a concern, but I'm fine putting the onus on the person starting to remove attachments

Comment on lines +46 to +50
Long existing = request.attachments().getOrDefault(QUEUE_EXPIRATION_NANOS, null);
if (existing != null && existing != CLEARED_SENTINEL) {
return existing;
}
request.attachments().put(QUEUE_EXPIRATION_NANOS, expirationNanos);
Contributor

There is technically a race condition here where the attachment would have been updated in between. This is probably not a huge deal, but I'm wondering whether we should add putIfAbsent to RequestAttachments and Attachments (since it's ultimately backed by a ConcurrentHashMap)

Contributor Author

Opened #2957
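To illustrate the race discussed in this thread, here is a sketch of an atomic set-if-absent that also treats the cleared sentinel as absent. It assumes the attachments are backed by a ConcurrentHashMap, as the reviewer notes; the key name, the sentinel value, and the plain map are hypothetical, and this is not the API added in #2957.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ExpirationAttachmentSketch {
    static final String KEY = "queue-expiration-nanos"; // hypothetical key name
    static final long CLEARED_SENTINEL = Long.MIN_VALUE; // hypothetical sentinel value

    /** Returns the expiration in effect after the call: the existing one, or the new one if none was set. */
    static long setExpirationIfAbsent(ConcurrentMap<String, Long> attachments, long expirationNanos) {
        // ConcurrentHashMap.compute runs atomically per key, so there is no window between read and write.
        return attachments.compute(
                KEY,
                (key, existing) ->
                        existing == null || existing == CLEARED_SENTINEL ? expirationNanos : existing);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Long> attachments = new ConcurrentHashMap<>();
        System.out.println(setExpirationIfAbsent(attachments, 100L)); // first queue stamps
        System.out.println(setExpirationIfAbsent(attachments, 200L)); // second queue reads
        attachments.put(KEY, CLEARED_SENTINEL); // a retry clears the budget
        System.out.println(setExpirationIfAbsent(attachments, 300L)); // retry stamps afresh
    }
}
```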

Comment on lines 392 to 395
cancelTimeoutFuture(queueHead);
decrementQueueSize();
queueHead.span().complete(QueuedChannelTagTranslator.INSTANCE, this);
queueHead.timer().stop();
Contributor

There are 4 places that seem to all do this same block, or something very close to it. Would it make sense to extract it to a dedicated method?

Comment on lines +410 to +411
// Request is leaving the queue — cancel the timeout task.
cancelTimeoutFuture(queueHead);
Contributor

You mention it is possible to have a race condition where we dispatch the request, but the response is discarded. Can't we cancel the timeout future before dispatching?
Since the timeout is for bounding the queued duration, it would make sense to order the operations as follows:

  • Pop the queue head
  • Cancel the queue head timeout
  • Check if it was done already

That way, it can't be timed out after we have already checked (and since we popped it from the queue, it shouldn't accept a timeout anymore at that point)

Contributor Author

You mention it is possible to have a race condition where we dispatch the request, but the response is discarded.

Yeah. This happens because the current ordering is isDone() -> delegate.execute -> cancel timeout if delegate.execute successfully got a concurrency permit (otherwise re-queue the request with the timeout unchanged)

The race is when the timeout runs while delegate.execute is running but has not returned: the blocking logic that runs here is the acquisition of a concurrency permit (a CAS):

    @Override
    public Optional<ListenableFuture<Response>> maybeExecute(
            Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) {
        @Nullable Permit maybePermit = limiter.acquire(limitEnforcement);
        if (maybePermit != null) {
            CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit permit = maybePermit;
            logPermitAcquired();
            ListenableFuture<Response> result = delegate.execute(endpoint, request);
            DialogueFutures.addDirectCallback(result, permit);
            return Optional.of(result);
        } else {
            logPermitRefused();
            return Optional.empty();
        }
    }

FWIW I think this race will be rare.

Can't we cancel the timeout future before dispatching?

Yes good idea. I agree this better represents that the queue timeout is strictly counted against the time the request is in the queue. Updated the logic.

I think one trade-off here is that, when the delegate responds with Optional.empty (i.e. we are not able to get a concurrency permit), we need to re-queue the request (and re-schedule the same timeout). The way I've written it requires an allocation of a DeferredCall each time delegate.execute returns Optional.empty. This code is also a bit more complex.

I think it's fine to have the extra short-lived allocation + complexity here for the sake of correctness.

Contributor

I agree that correctness seems worth it, though I didn't realize the race condition was only about acquiring the permit, and not waiting for the response.

@mpritham mpritham force-pushed the pmarupaka/queue-timeouts-proto branch from 4f51791 to 76ae003 April 16, 2026 19:12
Contributor

@aldexis aldexis left a comment

Have various comments and questions, but approving for RC

Comment on lines +85 to +88
@VisibleForTesting
QueuedChannel getMultiHostQueuedChannelForTesting() {
return multiHostQueuedChannel;
}
Contributor

It sounds like we're only using this to get access to the queue size in tests. Why are we not relying on the metrics instead?

Comment on lines +85 to +94
// Wait for timeout
Thread.sleep(QUEUE_TIMEOUT.toMillis() + 100);

for (int i = 0; i < 5; i++) {
assertThat(queued[i]).isDone();
assertThat(queued[i])
.failsWithin(Duration.ZERO)
.withThrowableThat()
.withMessageContaining("queue timeout");
}
Contributor

Isn't there a race condition here which could make the test flaky? How about waiting up to twice the timeout using awaitility (so we don't wait longer than needed, but always wait enough)?
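A JDK-only sketch of the bounded wait suggested here: block for at most some multiple of the timeout and return as soon as the future fails, instead of sleeping for a fixed interval. The helper name `awaitFailure` is hypothetical, and `CompletableFuture` stands in for the test's `ListenableFuture`; with Awaitility the same idea would be expressed through its own polling API.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWaitSketch {
    /** Waits at most maxWait; returns the failure cause, or null if the future succeeded or is still pending. */
    static Throwable awaitFailure(CompletableFuture<?> future, Duration maxWait) {
        try {
            future.get(maxWait.toNanos(), TimeUnit.NANOSECONDS);
            return null; // completed normally
        } catch (ExecutionException e) {
            return e.getCause(); // returns as soon as the failure lands, without waiting out maxWait
        } catch (TimeoutException e) {
            return null; // still pending after the full wait
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        Duration queueTimeout = Duration.ofSeconds(1);
        CompletableFuture<String> queued = new CompletableFuture<>();
        queued.completeExceptionally(new RuntimeException("queue timeout"));
        // Allow up to twice the timeout, as suggested in the review comment.
        Throwable failure = awaitFailure(queued, queueTimeout.multipliedBy(2));
        System.out.println(failure == null ? "no failure" : failure.getMessage());
    }
}
```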

Comment on lines +98 to +100
// The queue should now be empty because scheduleNextTask will clean up the queue entries that have timed
// out
assertThat(outerQueue.getQueueSizeForTesting()).isEqualTo(0);
Contributor

This made me realize that the queue size metrics will look larger than necessary when there are expired requests in there.

Do you think it would make sense to proactively decrement the queue size counter when a request times out? (and check when we pop whether it was already removed)
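One way this could look, as a sketch only: each entry carries a flag recording whether it still counts towards the queue size, and whichever of the timeout task or the pop gets there first does the decrement. All names here are hypothetical and the real QueuedChannel bookkeeping may differ.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueSizeAccountingSketch {
    private final AtomicInteger queueSize = new AtomicInteger();

    /** Hypothetical queue entry that remembers whether it still counts towards the queue size. */
    final class Entry {
        private final AtomicBoolean counted = new AtomicBoolean(true);

        /** Decrements the size at most once, whether called from the timeout task or from the pop. */
        void releaseSlot() {
            if (counted.compareAndSet(true, false)) {
                queueSize.decrementAndGet();
            }
        }
    }

    Entry enqueue() {
        queueSize.incrementAndGet();
        return new Entry();
    }

    int size() {
        return queueSize.get();
    }

    public static void main(String[] args) {
        QueueSizeAccountingSketch queue = new QueueSizeAccountingSketch();
        Entry entry = queue.enqueue();
        entry.releaseSlot(); // timeout fires: the metric drops immediately
        entry.releaseSlot(); // the later pop of the expired entry is a no-op
        System.out.println("queue size: " + queue.size());
    }
}
```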

private static final String HOST_A = "http://hostA:8080";
private static final String HOST_B = "http://hostB:8080";
private static final String HOST_C = "http://hostC:8080";
private static final Duration QUEUE_TIMEOUT = Duration.ofSeconds(5);
Contributor

5 seconds feels a bit long if it's going to be applied to a number of the below tests 🤔 Maybe one or two seconds?

Comment on lines +133 to +139
// Wait past the original timeout
Thread.sleep(QUEUE_TIMEOUT.toMillis() + 100);

// The dispatched request should NOT have timed out — timeout was cancelled on dispatch
assertThat(queued)
.as("Timeout should have been cancelled on dispatch")
.isNotDone();
Contributor

For the same reason as above, this could pass even if the timeout wasn't cancelled, since it may end up taking a bit longer. Unfortunately, I'm not sure how to have better guarantees. I'd probably just add a bit more delay past the expected timeout (and maybe reduce the timeout as well?)

Comment on lines +586 to +588
// Scheduler is required when timeout is configured. The 10-hour timeout means scheduled tasks won't fire
// during the test, and we call failWithQueueTimeout directly.
scheduler = Executors.newSingleThreadScheduledExecutor();
Contributor

Isn't there a better executor service that you can control granularly? (e.g. something made specifically for tests)
This would also avoid having to fake the timeout behavior with the below simulateTimeout

I see DeterministicScheduler from jmock used in various places (at least internally)

Comment on lines +646 to +657
// Pop the first request from the queue which was used to set inFlight > 0.
delegate.lastDispatched();
SettableFuture<Response> wireFuture = delegate.lastDispatched();
assertThat(wireFuture).isNotNull();

// callerFuture is now failed with timeout, but ForwardAndSchedule will
// close the response when the wire completes
TestResponse wireResponse = new TestResponse().code(200);
wireFuture.set(wireResponse);
assertThat(wireResponse.isClosed())
.as("Response must be closed since caller future was already failed")
.isTrue();
Contributor

I'm confused about how this behaves. If we are dispatching the request first, then shouldn't "callerFuture is now failed with timeout" be false?

Comment on lines +697 to +698
assertThat(instrumentation.requestsQueued().getCount()).isEqualTo(1);
assertThat(callerFuture).isNotDone();
Contributor

Should we have some test that verifies the eventual timeout stays the same in this scenario? (i.e. we aren't resetting the queue timeout)

Comment on lines +884 to +886
assertThat(future2)
.as("Second request should have been dispatched, not timed out")
.isNotDone();
Contributor

Is there a way we could test that future2 is actually in-flight, rather than just not done? We're testing the same state for future2 as before calling queuedChannel.schedule();


3 participants