
Conversation

@liorsve (Contributor) commented Dec 31, 2025

This commit's main goals:

  1. Add the actual pubsub synchronizer implementation.
  2. Remove the existing pubsub logic (previously wired through setup_connection and passed via the cluster_params).
  3. Unify tests: refactored all existing Python pubsub tests to also check dynamic subscription, added more Python test cases, and added Rust tests to verify behavior during slot migration.

In addition, the following was updated or fixed:

  1. Renamed subscription command request types: SUBSCRIBE_LAZY becomes SUBSCRIBE (the non-blocking variant), and SUBSCRIBE becomes SUBSCRIBE_BLOCKING (the blocking variant).
  2. Moved setting the synchronizer applier into the constructor (no more two-step initialization).
  3. Synchronizer trait changes: removed set_initial_subscriptions, which was not needed.
  4. Added a Python --mock-pubsub flag to mark tests that require a real server connection (such as those issuing CLIENT KILL).
  5. Mock changes: added support for the other pubsub commands.
  6. Fixed Node test cleanup: some Node tests rely solely on unsubscription during cleanup for test isolation. The cleanup now ensures unsubscription between tests (previously they used the custom-command UNSUBSCRIBE, which is now lazy).

Issue link

This Pull Request is linked to issues #5098 and #4919.

Checklist

Before submitting the PR make sure the following are checked:

  • This Pull Request is related to one issue.
  • Commit message has a detailed description of what changed and why.
  • Tests are added or updated.
  • CHANGELOG.md and documentation files are updated.
  • Destination branch is correct - main or release
  • Create merge commit if merging release branch into main, squash otherwise.

@liorsve force-pushed the add-synchonizer-impl branch 2 times, most recently from 5a80756 to ef23149 on December 31, 2025 at 15:30
@liorsve force-pushed the add-synchonizer-impl branch 9 times, most recently from 1bab7bf to bfa1e4f on January 1, 2026 at 16:24
@liorsve force-pushed the add-synchonizer-impl branch from bfa1e4f to 5d6887e on January 1, 2026 at 19:03
@yipin-chen requested review from alexr-bq and jduo on January 2, 2026 at 23:50
@liorsve marked this pull request as ready for review on January 4, 2026 at 08:06
@liorsve requested a review from a team as a code owner on January 4, 2026 at 08:06
@liorsve force-pushed the add-synchonizer-impl branch from f48d8b6 to c5076d0 on January 4, 2026 at 12:11
Signed-off-by: Lior Sventitzky <[email protected]>
@liorsve force-pushed the add-synchonizer-impl branch from c5076d0 to 93b0d91 on January 4, 2026 at 12:33
Signed-off-by: Lior Sventitzky <[email protected]>
@liorsve force-pushed the add-synchonizer-impl branch from 33576e5 to f50f72a on January 6, 2026 at 09:39
let mut params: ClusterParams = params.clone();
let params: ClusterParams = params.clone();
let glide_connection_options = glide_connection_options.clone();
// set subscriptions to none, they will be applied upon the topology discovery
Collaborator

remove comment

let connections = stream::iter(initial_nodes.iter().cloned())
.map(|(node_addr, socket_addr)| {
let mut params: ClusterParams = params.clone();
let params: ClusterParams = params.clone();
Collaborator

Let's ensure we are reverting all the unnecessary mods that were introduced in the original pubsub impl:
amazon-contributing/redis-rs@b36c959...cb81fb7#diff-5ddfd8b2718c21504d43c50b0490bc22de4189208d97f5604a2297124e529577

.await;
}
in_progress.store(false, Ordering::Relaxed);

Collaborator

We need to be able to update the active subscriptions on both passive and active disconnect. The primary candidate for sourcing the subscription-removal event is the destructor of MultiplexedConnection (the only user type).

We also need to pay attention to the order of the events, i.e. when we remove the active subscription and when we add it.

Collaborator

Before that, we need to ensure that the current implementation with the passive notifier suffices for this situation:

We remove a node from the connection map BUT reconciliation is not triggered.

Contributor Author

@liorsve commented Jan 7, 2026

I moved it to PipelineSink::drop (not MultiplexedConnection::drop - it's clonable, so one clone could drop while the TCP connection stays alive on other clones). I fixed the management connection to never be set with the synchronizer, so drop there won't trigger subscription cleanup. And I removed the cleanup for both cluster and standalone, since this covers both. Since the PipelineSink struct is defined in a pin_project! macro, I had to put the drop logic inside it with impl<T> PinnedDrop for PipelineSink<T> (see the sketch after this comment).

I tried to trace whether there are places where connections are actively dropped without passing through trigger_refresh_connection_tasks (the previous location of remove_subscriptions_for_addresses, always triggered on passive disconnect because of the disconnect notifier):

  1. refresh_slots_inner - Replaces the entire ConnectionsContainer, which drops old connections (ones that aren't responsible for slots anymore). However, it already calls handle_topology_refresh for cleanup.
  2. Client shutdown - But we don't particularly care about cleanup in this case because the synchronizer is dropped anyway.

So in theory we'd be covered even if we kept it in trigger_refresh_connection_tasks.
Why move it to PipelineSink::drop anyway?

  1. No inconsistency window - With the previous approach, there's a gap between when the connection dies and when validate_all_user_connections detects is_closed(). During this window, our state thinks we're subscribed but we're not.
  2. Cleanup is tied to actual connection lifetime, not to explicit refresh calls.
  3. Future-proof - Any new code path that drops a connection will automatically get cleanup, without needing to remember to add the call.
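
For thread readers: a minimal sketch of the pin-project-lite pattern described above, with the cleanup handle and method names as illustrative stand-ins rather than the PR's actual code.

use std::pin::Pin;
use std::sync::Arc;
use pin_project_lite::pin_project;

// Hypothetical stand-in for the synchronizer handle held by the sink.
struct SubscriptionCleanup;

impl SubscriptionCleanup {
    fn remove_subscriptions_for(&self, addr: &str) {
        println!("cleaning up subscriptions for {addr}");
    }
}

pin_project! {
    struct PipelineSink<T> {
        #[pin]
        sink_stream: T,
        address: String,
        // None for the management connection, so its drop never triggers cleanup.
        cleanup: Option<Arc<SubscriptionCleanup>>,
    }

    impl<T> PinnedDrop for PipelineSink<T> {
        fn drop(this: Pin<&mut Self>) {
            // Runs when the connection actually dies, on both active and
            // passive disconnect, so there is no inconsistency window.
            if let Some(cleanup) = &this.cleanup {
                cleanup.remove_subscriptions_for(&this.address);
            }
        }
    }
}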

/// Wait for initial subscriptions (set via config) to be synchronized.
/// Returns Ok(()) when all desired subscriptions are established, or error on timeout.
/// Default implementation returns immediately (no-op for clients without initial subscriptions).
async fn wait_for_initial_sync(&self, _timeout_ms: u64) -> RedisResult<()> {
Collaborator

maybe not have it specifically for "initial" subs?

// to channels provided via config. Keeping the guard would cause a deadlock. We wait
// for the subscription here to ensure the lazy client is subscribed immediately upon creation.
drop(guard);
if let Err(e) = self.pubsub_synchronizer.wait_for_initial_sync(1000).await {
Collaborator

why 1000? I think it should use the budget of connection timeout - verify me
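
As a side note on the deadlock mentioned in the code comment above, a minimal sketch of the release-before-await pattern, with illustrative names only:

use std::sync::{Arc, Mutex};

async fn subscribe_and_wait(desired: Arc<Mutex<Vec<String>>>) {
    // Take the lock only long enough to snapshot the desired channels.
    let guard = desired.lock().unwrap();
    let snapshot = guard.clone();
    // Holding the guard across the await below would deadlock if the
    // reconciliation task also locks `desired`, so release it first.
    drop(guard);
    wait_until_established(snapshot).await;
}

async fn wait_until_established(_channels: Vec<String>) {
    // stand-in for waiting until the subscriptions are applied
}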


if !is_lazy {
pubsub_synchronizer.trigger_reconciliation();
if let Err(e) = pubsub_synchronizer.wait_for_initial_sync(2000).await {
Collaborator

2000?

Collaborator

why not blocking?


let tls_mode = connection_request.tls_mode;
let node_count = connection_request.addresses.len();
// randomize pubsub nodes, maybe a better option is to always use the primary
Collaborator

two issues, one potential and one probable:

probable - We use the default route for sub/unsub commands, which is Primary. In case of a role change, the primary will change, sending the unsubscribe command to a node on which we did not subscribe. A test is required to show this (a routing sketch follows below).

potential - double subscribe. In case the guard desired_subs == actual_subs fails, we might subscribe again on a new primary.
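
A sketch of one possible fix for the probable issue, assuming the synchronizer records the address each subscription was established on (SingleNodeRoutingInfo::ByAddress exists in redis-rs; the bookkeeping struct is hypothetical):

use redis::cluster_routing::SingleNodeRoutingInfo;

// Hypothetical bookkeeping: remember where each channel was subscribed.
struct ActiveSubscription {
    channel: String,
    host: String,
    port: u16,
}

impl ActiveSubscription {
    // Pin the UNSUBSCRIBE to the node we actually subscribed on, instead
    // of the default Primary route, so a role change between subscribe
    // and unsubscribe cannot send the command to the wrong node.
    fn unsubscribe_routing(&self) -> SingleNodeRoutingInfo {
        SingleNodeRoutingInfo::ByAddress {
            host: self.host.clone(),
            port: self.port,
        }
    }
}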

/// Real implementation of PubSub synchronizer (stub for now)
#[allow(dead_code)]
const LOCK_ERR: &str = "Lock poisoned";
const RECONCILIATION_INTERVAL: Duration = Duration::from_secs(5);
Collaborator

  1. We don't know how it will behave; let's expose it in the client params (see the sketch below).
  2. The cost should be low - why not have it every 3 seconds?
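
A sketch of what exposing it could look like; the struct and field names are assumptions for illustration:

use std::time::Duration;

// Hypothetical client-params addition; name and default are illustrative.
pub struct PubSubSyncConfig {
    pub reconciliation_interval: Duration,
}

impl Default for PubSubSyncConfig {
    fn default() -> Self {
        Self {
            // per-pass cost should be low, so a shorter 3s default is plausible
            reconciliation_interval: Duration::from_secs(3),
        }
    }
}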

}

impl GlidePubSubSynchronizer {
pub async fn create(
Collaborator

why create and not new


/// Single pass: aggregate current subscriptions, compare with desired, return both + sync status
fn compute_sync_state(&self) -> SyncCheckResult {
let desired = self.desired_subscriptions.read().expect(LOCK_ERR).clone();
Collaborator

let's try to compute the diffs here with minimal passes
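
For illustration, a sketch of a minimal-pass diff, assuming the subscription state can be viewed as sets of channel names:

use std::collections::HashSet;

// One pass over each set with O(1) lookups: channels to add are in
// `desired` but not `actual`; channels to drop are the reverse.
fn diff_subscriptions<'a>(
    desired: &'a HashSet<String>,
    actual: &'a HashSet<String>,
) -> (Vec<&'a str>, Vec<&'a str>) {
    let to_subscribe = desired.difference(actual).map(String::as_str).collect();
    let to_unsubscribe = actual.difference(desired).map(String::as_str).collect();
    (to_subscribe, to_unsubscribe)
}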

}

let _ = GlideOpenTelemetry::record_subscription_out_of_sync();
log_info(
Collaborator

not info but warn; also, desired (and by extension actual) can get very large

Collaborator

remove altogether

);
}

async fn send_command(
Collaborator

apply_pubsub

}

/// Parse an address string into SingleNodeRoutingInfo::ByAddress
fn parse_address_to_routing(address: &str) -> RedisResult<SingleNodeRoutingInfo> {
Collaborator

  1. check if we can key by ByAddress or long long
  2. don't we have a util func that does parse_address_to_routing?
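
For reference, a minimal sketch of such a parser, simplified to Option here (the PR's version returns RedisResult):

use redis::cluster_routing::SingleNodeRoutingInfo;

// rsplit_once splits on the last colon, so host parts containing colons
// still parse; bracket stripping for IPv6 literals is omitted for brevity.
fn parse_address_to_routing(address: &str) -> Option<SingleNodeRoutingInfo> {
    let (host, port) = address.rsplit_once(':')?;
    let port: u16 = port.parse().ok()?;
    Some(SingleNodeRoutingInfo::ByAddress {
        host: host.to_string(),
        port,
    })
}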

fn start_reconciliation_task(self: &Arc<Self>) {
// We use a weak ref to self here as an indication that the client has dropped and thus there
// is no longer a strong ref to the synchronizer, and we should break from the loop
let sync_weak = Arc::downgrade(self);
Collaborator

why
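
For thread readers, a sketch of the pattern the code comment describes (tokio assumed; the reconcile body is a stand-in). Holding only a Weak means the background task cannot keep the synchronizer alive, so the loop exits once the client drops its last Arc:

use std::sync::{Arc, Weak};
use std::time::Duration;

struct Synchronizer;

impl Synchronizer {
    async fn reconcile(&self) {
        // stand-in: compare desired vs. actual subscriptions and fix drift
    }

    fn start_reconciliation_task(self: &Arc<Self>) {
        let weak: Weak<Self> = Arc::downgrade(self);
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(3)).await;
                // If the client (the only strong holder) is gone, upgrade()
                // fails and the task ends instead of running forever.
                let Some(sync) = weak.upgrade() else { break };
                sync.reconcile().await;
            }
        });
    }
}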

};
match self.send_command(&mut cmd, routing).await {
Ok(_) => {
log_debug(
Collaborator

please go over all new logs

  1. consider removing
  2. consider making them cheap

});

for (slot, slot_channels) in channels_by_slot {
log_debug(
Collaborator

same

}
}

fn format_subscription_info(info: &PubSubSubscriptionInfo) -> String {
Collaborator

?


fn remove_current_subscriptions_for_addresses(&self, _addresses: &HashSet<String>) {
// TODO: Implement
async fn wait_for_initial_sync(&self, timeout_ms: u64) -> RedisResult<()> {
Collaborator

!

Collaborator

@ikolomi left a comment

approving pending comments

data: &[Value],
address: Option<String>,
) {
logger_core::log_debug(
Collaborator

@jduo commented Jan 6, 2026

This will execute all the formatting logic, string conversion, etc. even if logging is off. This happens throughout the core, not specifically in this PR. We should really add logging APIs that take in functions instead of raw strings.
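
A sketch of the closure-based shape such an API could take; names are hypothetical, not the current logger_core interface:

#[derive(PartialEq, PartialOrd)]
enum Level {
    Debug,
    Info,
}

// The message closure only runs when the level is enabled, so format!
// and Debug-printing of large values cost nothing when logging is off.
fn log_lazy(enabled: Level, msg_level: Level, msg: impl FnOnce() -> String) {
    if msg_level >= enabled {
        println!("{}", msg());
    }
}

fn main() {
    let data = vec![1, 2, 3];
    // The format! below is never evaluated, since Debug < Info.
    log_lazy(Level::Info, Level::Debug, || format!("values: {data:?}"));
}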
