Skip to content

Conversation

@khroolick
Copy link

@khroolick khroolick commented Jan 8, 2026

Fixes #44228, #47508; Related to #28235

Description

Fix memory leak in ReactorConnection.getShutdownSignals()

What's Happening

Our production servers experienced OOM restarts during the Christmas break.
Why, you might ask?
25 instances of reactor.core.publisher.SinkOneMulticast, loaded by Module Class Loader for module xxx occupy 1,033,659,832 bytes

image small accumulating objects

image GC root path

Reasoning

The getShutdownSignals() method returns a Flux that emits when the AMQP connection shuts down. Multiple components subscribe to this signal: ReactorSender, ReactorReceiver, RequestResponseChannel, and ReactorSession.

The issue is that each call to getShutdownSignals() creates a new cache() wrapper. The cache() operator maintains internal state to replay values to late subscribers, which means it holds references to all its subscribers. Since multiple components call this method throughout the connection's lifetime, memory accumulates with each subscription and cannot be garbage collected until the connection closes.

In heap dumps, this manifests as excessive SinkOneMulticast or MonoCacheTime instances consuming significant memory (#44228).

Why cache() Was Originally Added

Looking at commit a028adc (March 2021), the cache() was introduced during the migration from DirectProcessor to Sinks.One. The intent was likely to ensure late subscribers could still receive the shutdown signal after it was emitted - a reasonable concern when changing reactive patterns.

Why cache() Is Actually Unnecessary
Here's the good news: Sinks.One.asMono() already provides replay semantics for its single value. Late subscribers automatically receive the emitted value without needing cache(). The operator was redundant from day one.

This was actually identified back in 2022 in #28235 by @anuchandy:

"The getShutdownSignals() API uses cache() operator. We don't have to use the cache operator here, Sink.One is capable of remembering the last signal and replaying it. While cache() doesn't directly contribute to any leak, we could remove it and save allocations."

Impact

Functional behavior: No change. Late subscribers will still receive the shutdown signal because Sinks.One handles this internally.

Memory: Significant improvement for long-lived connections. All subscribers now share the same underlying sink view instead of creating separate caching wrappers that retain subscriber references.

Performance: Slight improvement - no overhead from cache wrapper creation and management.

Reproduction

A reproduction tool demonstrating the memory leak is available at: https://github.com/khroolick/servicebus-memleak-repro

Proof of Fix Effectiveness

image HeapSize of processor jvm from reproduction tool before and after applying this change

References

#44228 (Open) - "ASB sdk consumers suspect of memory leak" - Reports SinkOneMulticast consuming 66% of heap
#47508 (Open) - "Azure Service Bus SDK - Possible Memory Leak" - Same symptoms with sessionProcessor
#28235 (Closed) - The original investigation that identified cache() as unnecessary but the fix was never applied

Copilot AI review requested due to automatic review settings January 8, 2026 18:44
@khroolick khroolick requested review from a team, anuchandy and conniey as code owners January 8, 2026 18:44
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a memory leak in ReactorConnection.getShutdownSignals() by removing an unnecessary cache() operator that was creating multiple wrapper instances and retaining subscriber references indefinitely.

Key Changes:

  • Removed the redundant cache() operator from the shutdown signal flux chain, as Sinks.One.asMono() already provides replay semantics for late subscribers
  • This eliminates memory accumulation from multiple subscription wrappers that persisted throughout the connection lifetime

@Override
public Flux<AmqpShutdownSignal> getShutdownSignals() {
return shutdownSignalSink.asMono().cache().flux();
return shutdownSignalSink.asMono().flux();
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change fixes a memory leak by removing cache(), but there should be test coverage verifying that late subscribers still receive the shutdown signal. Add a test that subscribes to getShutdownSignals() after the shutdown signal has been emitted to confirm Sinks.One's replay behavior works as expected without cache().

Copilot uses AI. Check for mistakes.
@github-actions github-actions bot added Azure.Core.AMQP azure-core-amqp Community Contribution Community members are working on the issue customer-reported Issues that are reported by GitHub users external to the Azure organization. labels Jan 8, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Jan 8, 2026

Thank you for your contribution @khroolick! We will review the pull request and get back to you soon.

Copy link
Member

@conniey conniey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much for this fix. Could you add a CHANGELOG entry please? :)

https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/CHANGELOG.md

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Azure.Core.AMQP azure-core-amqp Community Contribution Community members are working on the issue customer-reported Issues that are reported by GitHub users external to the Azure organization.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ASB sdk consumers suspect of memory leak

2 participants