diff --git a/pom.xml b/pom.xml index 3dd3bf74..1218bef1 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ 3.5.1 - 2.1.1 + 2.3.0-rc2 6.6.0.RELEASE 6.0.0 2.11.1 diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/ConnectionUtils.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/ConnectionUtils.java index e8a794ba..829fca1b 100644 --- a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/ConnectionUtils.java +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/ConnectionUtils.java @@ -17,6 +17,7 @@ import io.valkey.springframework.data.valkey.connection.jedis.JedisConnectionFactory; import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory; /** * Utilities for examining a {@link ValkeyConnection} @@ -27,7 +28,8 @@ public abstract class ConnectionUtils { public static boolean isAsync(ValkeyConnectionFactory connectionFactory) { - return (connectionFactory instanceof LettuceConnectionFactory); + return connectionFactory instanceof LettuceConnectionFactory + || connectionFactory instanceof ValkeyGlideConnectionFactory; } public static boolean isLettuce(ValkeyConnectionFactory connectionFactory) { diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideClusterConnection.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideClusterConnection.java index 9949c711..5cd82b08 100644 --- a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideClusterConnection.java +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideClusterConnection.java @@ -83,15 +83,31 @@ public class ValkeyGlideClusterConnection extends ValkeyGlideConnection implemen private final ValkeyGlideClusterSetCommands clusterSetCommands; public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter) { - this(clusterAdapter, null, Duration.ofMillis(100)); + this(clusterAdapter, null, Duration.ofMillis(100), null); } - public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, @Nullable ValkeyGlideConnectionFactory factory) { - this(clusterAdapter, factory, Duration.ofMillis(100)); + public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, + @Nullable ValkeyGlideConnectionFactory factory) { + this(clusterAdapter, factory, Duration.ofMillis(100), null); } - public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, @Nullable ValkeyGlideConnectionFactory factory, Duration cacheTimeout) { - super(clusterAdapter, factory); + public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, + @Nullable ValkeyGlideConnectionFactory factory, + @Nullable DelegatingPubSubListener pubSubListener) { + this(clusterAdapter, factory, Duration.ofMillis(100), pubSubListener); + } + + public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, + @Nullable ValkeyGlideConnectionFactory factory, + Duration cacheTimeout) { + this(clusterAdapter, factory, cacheTimeout, null); + } + + public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, + @Nullable ValkeyGlideConnectionFactory factory, + Duration cacheTimeout, + @Nullable DelegatingPubSubListener pubSubListener) { + super(clusterAdapter, factory, pubSubListener); Assert.notNull(cacheTimeout, "CacheTimeout must not be null!"); this.clusterAdapter = clusterAdapter; @@ -123,10 +139,6 @@ protected void cleanupConnectionState() { @SuppressWarnings("unchecked") Callable[] actions = new Callable[] { () -> nativeClient.customCommand(new String[]{"UNWATCH"}, SimpleMultiNodeRoute.ALL_NODES).get(), - // TODO: Uncomment when dynamic pubsub is implemented - // () -> nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get(), - // () -> nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get(), - // () -> nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get() }; for (Callable action : actions) { diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnection.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnection.java index 3ee1d305..30f19827 100644 --- a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnection.java +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnection.java @@ -63,11 +63,17 @@ public class ValkeyGlideConnection extends AbstractValkeyConnection { protected final UnifiedGlideClient unifiedClient; protected final @Nullable ValkeyGlideConnectionFactory factory; + /** + * Bridges Glide's callback mechanism (configured at client creation) with Spring's + * {@link MessageListener} (provided at subscribe time). The user's listener is set + * on this delegate when {@code subscribe()} is called, and cleared on connection close. + */ + protected final @Nullable DelegatingPubSubListener delegatingListener; private final AtomicBoolean closed = new AtomicBoolean(false); private final List> batchCommandsConverters = new ArrayList<>(); private final Set watchedKeys = new HashSet<>(); - private @Nullable Subscription subscription; + private volatile @Nullable ValkeyGlideSubscription subscription; // Command interfaces private final ValkeyGlideKeyCommands keyCommands; @@ -90,10 +96,24 @@ public class ValkeyGlideConnection extends AbstractValkeyConnection { * @param factory the connection factory (optional, for pooling support) */ public ValkeyGlideConnection(UnifiedGlideClient unifiedClient, @Nullable ValkeyGlideConnectionFactory factory) { + this(unifiedClient, factory, null); + } + + /** + * Creates a new {@link ValkeyGlideConnection} with a unified client adapter and delegating pub/sub listener. + * + * @param unifiedClient unified client adapter (standalone or cluster) + * @param factory the connection factory (optional, for pooling support) + * @param delegatingListener the delegating pub/sub listener for callback-based message delivery + */ + public ValkeyGlideConnection(UnifiedGlideClient unifiedClient, + @Nullable ValkeyGlideConnectionFactory factory, + @Nullable DelegatingPubSubListener delegatingListener) { Assert.notNull(unifiedClient, "UnifiedClient must not be null"); this.unifiedClient = unifiedClient; this.factory = factory; + this.delegatingListener = delegatingListener; // Initialize command interfaces this.keyCommands = new ValkeyGlideKeyCommands(this); @@ -184,12 +204,22 @@ public ValkeyCommands commands() { public void close() throws DataAccessException { try { if (closed.compareAndSet(false, true)) { + // Close subscription first + ValkeyGlideSubscription sub = this.subscription; + if (sub != null) { + sub.close(); + this.subscription = null; + } + cleanupConnectionState(); - // Return client to pool instead of closing it - factory.releaseClient(unifiedClient.getNativeClient()); - if (subscription != null) { - subscription.close(); - subscription = null; + + if (delegatingListener != null) { + delegatingListener.clearListener(); + } + + // Return client to pool + if (factory != null) { + factory.releaseClient(unifiedClient.getNativeClient()); } } } catch (Exception ex) { @@ -212,10 +242,6 @@ protected void cleanupConnectionState() { @SuppressWarnings("unchecked") Callable[] actions = new Callable[] { () -> nativeClient.customCommand(new String[]{"UNWATCH"}).get(), - // TODO: Uncomment when dynamic pubsub is implemented - // () -> nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}).get(), - // () -> nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}).get(), - // () -> nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}).get() }; for (Callable action : actions) { @@ -431,8 +457,26 @@ public void subscribe(MessageListener listener, byte[]... channels) { throw new InvalidDataAccessApiUsageException("Cannot subscribe in pipeline / transaction mode"); } - // TODO: Implement dynamic subscription management when supported by valkey-glide - throw new UnsupportedOperationException("Dynamic subscriptions not yet implemented"); + if (delegatingListener == null) { + throw new InvalidDataAccessApiUsageException( + "Pub/Sub not configured. Ensure the connection factory was created with pub/sub callback support."); + } + + try { + delegatingListener.setListener(listener); + + ValkeyGlideSubscription glideSubscription = new ValkeyGlideSubscription( + listener, unifiedClient, delegatingListener); + this.subscription = glideSubscription; + glideSubscription.subscribe(channels); + + } catch (Exception ex) { + if (delegatingListener != null) { + delegatingListener.clearListener(); + } + this.subscription = null; + throw new ValkeyGlideExceptionConverter().convert(ex); + } } @Override @@ -449,9 +493,27 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) { if (isQueueing() || isPipelined()) { throw new InvalidDataAccessApiUsageException("Cannot subscribe in pipeline / transaction mode"); } + + if (delegatingListener == null) { + throw new InvalidDataAccessApiUsageException( + "Pub/Sub not configured. Ensure the connection factory was created with pub/sub callback support."); + } - // TODO: Implement dynamic subscription management when supported by valkey-glide - throw new UnsupportedOperationException("Dynamic subscriptions not yet implemented"); + try { + delegatingListener.setListener(listener); + + ValkeyGlideSubscription glideSubscription = new ValkeyGlideSubscription( + listener, unifiedClient, delegatingListener); + this.subscription = glideSubscription; + glideSubscription.pSubscribe(patterns); + + } catch (Exception ex) { + if (delegatingListener != null) { + delegatingListener.clearListener(); + } + this.subscription = null; + throw new ValkeyGlideExceptionConverter().convert(ex); + } } @Override @@ -461,7 +523,8 @@ public Subscription getSubscription() { @Override public boolean isSubscribed() { - return subscription != null && subscription.isAlive(); + Subscription sub = this.subscription; + return sub != null && sub.isAlive(); } /** diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnectionFactory.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnectionFactory.java index 9d81c83d..4198e1b1 100644 --- a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnectionFactory.java +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnectionFactory.java @@ -41,6 +41,7 @@ // Imports for valkey-glide library import glide.api.GlideClient; import glide.api.GlideClusterClient; +import glide.api.models.GlideString; import glide.api.models.configuration.AdvancedGlideClientConfiguration; import glide.api.models.configuration.AdvancedGlideClusterClientConfiguration; import glide.api.models.configuration.BackoffStrategy; @@ -48,10 +49,15 @@ import glide.api.models.configuration.GlideClusterClientConfiguration; import glide.api.models.configuration.NodeAddress; import glide.api.models.configuration.ReadFrom; +import glide.api.models.configuration.StandaloneSubscriptionConfiguration; +import glide.api.models.configuration.ClusterSubscriptionConfiguration; +import java.util.Map; +import java.util.Set; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; /** @@ -91,6 +97,23 @@ public class ValkeyGlideConnectionFactory private boolean earlyStartup = true; private int phase = 0; + /** + * Maps native Glide clients ({@link GlideClient} or {@link GlideClusterClient}) to their + * associated {@link DelegatingPubSubListener}. + * + *

This mapping is necessary because Glide requires pub/sub callbacks to be configured + * at client creation time, before any subscriptions exist. When a client is created and + * added to the pool, we also create a {@link DelegatingPubSubListener} and register it + * as the client's pub/sub callback. The actual {@link MessageListener} is set on the + * {@link DelegatingPubSubListener} later when {@code subscribe()} is called. + * + *

When a connection is obtained from the pool, we look up the corresponding + * {@link DelegatingPubSubListener} for that client and pass it to the + * {@link ValkeyGlideConnection}, which can then configure it with the user's + * {@link MessageListener} during subscription. + */ + private final Map clientListenerMap = new ConcurrentHashMap<>(); + /** * Constructs a new {@link ValkeyGlideConnectionFactory} instance with default settings. */ @@ -235,8 +258,11 @@ public ValkeyConnection getConnection() { client = createGlideClient(); } + // Get the listener associated with this client + DelegatingPubSubListener listener = clientListenerMap.get(client); + // Return a new connection wrapper around the pooled client - return new ValkeyGlideConnection(new StandaloneGlideClientAdapter(client), this); + return new ValkeyGlideConnection(new StandaloneGlideClientAdapter(client), this, listener); } @Override @@ -253,8 +279,11 @@ public ValkeyClusterConnection getClusterConnection() { client = createGlideClusterClient(); } + // Get the listener associated with this client + DelegatingPubSubListener listener = clientListenerMap.get(client); + // Return a new connection wrapper around the pooled cluster client - return new ValkeyGlideClusterConnection(new ClusterGlideClientAdapter(client), this); + return new ValkeyGlideClusterConnection(new ClusterGlideClientAdapter(client), this, listener); } @Override @@ -378,10 +407,26 @@ private GlideClient createGlideClient() { if (reconnectStrategy != null) { configBuilder.reconnectStrategy(reconnectStrategy); } + + // Pubsub listener + DelegatingPubSubListener clientListener = new DelegatingPubSubListener(); + + + // Configure pub/sub with callback for event-driven message delivery + var subConfigBuilder = StandaloneSubscriptionConfiguration.builder(); + // Set callback that delegates to our listener holder + subConfigBuilder.callback((msg, context) -> clientListener.onMessage(msg, context)); + configBuilder.subscriptionConfiguration(subConfigBuilder.build()); + // Build and create client GlideClientConfiguration config = configBuilder.build(); - return GlideClient.createClient(config).get(); + GlideClient client = GlideClient.createClient(config).get(); + + // Save the mapping of this client to its DelegatingListener + clientListenerMap.put(client, clientListener); + + return client; } catch (Exception e) { throw new IllegalStateException("Failed to create GlideClient: " + e.getMessage(), e); } @@ -477,10 +522,24 @@ private GlideClusterClient createGlideClusterClient() { configBuilder.reconnectStrategy(reconnectStrategy); } + + DelegatingPubSubListener clientListener = new DelegatingPubSubListener(); + + // Configure pub/sub with callback for event-driven message delivery + var subConfigBuilder = ClusterSubscriptionConfiguration.builder(); + + // Set callback that delegates to our listener holder + subConfigBuilder.callback((msg, context) -> clientListener.onMessage(msg, context)); + configBuilder.subscriptionConfiguration(subConfigBuilder.build()); + + // Build and create cluster client GlideClusterClientConfiguration config = configBuilder.build(); - return GlideClusterClient.createClient(config).get(); + GlideClusterClient client = GlideClusterClient.createClient(config).get(); + + clientListenerMap.put(client, clientListener); + return client; } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException("Failed to create GlideClusterClient: " + e.getMessage(), e); } diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideDelegatingPubSubListener.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideDelegatingPubSubListener.java new file mode 100644 index 00000000..441c5683 --- /dev/null +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideDelegatingPubSubListener.java @@ -0,0 +1,68 @@ +/* + * Copyright 2011-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.valkey.springframework.data.valkey.connection.valkeyglide; + +import glide.api.models.PubSubMessage; +import glide.api.models.GlideString; + +import io.valkey.springframework.data.valkey.connection.DefaultMessage; +import io.valkey.springframework.data.valkey.connection.MessageListener; + +/** + * A delegating pub/sub listener that is configured at client creation time, + * with the actual listener set later when subscribe() is called. + */ +class DelegatingPubSubListener { + + private volatile MessageListener messageListener; + + /** + * Called by Glide when a pub/sub message arrives. + */ + void onMessage(PubSubMessage msg, Object context) { + MessageListener listener = this.messageListener; + if (listener != null && msg != null) { + byte[] channel = msg.getChannel().getBytes(); + byte[] body = msg.getMessage().getBytes(); + byte[] pattern = msg.getPattern() + .map(GlideString::getBytes) + .orElse(null); + + listener.onMessage(new DefaultMessage(channel, body), pattern); + + } + + } + + /** + * Set the actual listener when subscribe() is called. + */ + void setListener(MessageListener listener) { + this.messageListener = listener; + } + + /** + * Clear the listener when subscription closes. + */ + void clearListener() { + this.messageListener = null; + } + + + boolean hasListener() { + return messageListener != null; + } +} diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideSubscription.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideSubscription.java index ef9c6bed..061e088d 100644 --- a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideSubscription.java +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideSubscription.java @@ -15,145 +15,139 @@ */ package io.valkey.springframework.data.valkey.connection.valkeyglide; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; +import glide.api.models.GlideString; -import org.springframework.dao.DataAccessException; import io.valkey.springframework.data.valkey.connection.MessageListener; -import io.valkey.springframework.data.valkey.connection.Subscription; -import io.valkey.springframework.data.valkey.connection.Message; +import io.valkey.springframework.data.valkey.connection.SubscriptionListener; +import io.valkey.springframework.data.valkey.connection.util.AbstractSubscription; import org.springframework.util.Assert; /** * Implementation of {@link Subscription} using valkey-glide. * * @author Ilia Kolominsky - * @since 2.0 */ -public class ValkeyGlideSubscription implements Subscription { - - private final Object client; // Will be GlideClient in actual implementation - private final MessageListener listener; - private final AtomicBoolean active = new AtomicBoolean(true); - private final List channels = new ArrayList<>(); - private final List patterns = new ArrayList<>(); - - /** - * Create a new {@link ValkeyGlideSubscription} given a client and message listener. - * - * @param client the Valkey-Glide client - * @param listener the message listener - */ - public ValkeyGlideSubscription(Object client, MessageListener listener) { - Assert.notNull(client, "Client must not be null!"); - Assert.notNull(listener, "MessageListener must not be null!"); - +class ValkeyGlideSubscription extends AbstractSubscription { + + private final UnifiedGlideClient client; + private final DelegatingPubSubListener pubSubListener; + private final SubscriptionListener subscriptionListener; + + ValkeyGlideSubscription(MessageListener listener, UnifiedGlideClient client, + DelegatingPubSubListener pubSubListener) { + super(listener); + + Assert.notNull(client, "UnifiedGlideClient must not be null"); + Assert.notNull(pubSubListener, "DelegatingPubSubListener must not be null"); + this.client = client; - this.listener = listener; + this.pubSubListener = pubSubListener; + this.subscriptionListener = listener instanceof SubscriptionListener + ? (SubscriptionListener) listener + : SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER; } @Override - public void subscribe(byte[]... channels) { - Assert.notNull(channels, "Channels must not be null!"); + protected void doSubscribe(byte[]... channels) { + sendPubsubCommand("SUBSCRIBE_BLOCKING", channels); - // In a real implementation, this would use the valkey-glide client - // to subscribe to the specified channels for (byte[] channel : channels) { - if (channel == null) { - throw new IllegalArgumentException("Channel must not be null!"); - } - - // Add channels to tracking list - this.channels.add(channel); + subscriptionListener.onChannelSubscribed(channel, getChannels().size()); } } @Override - public void pSubscribe(byte[]... patterns) { - Assert.notNull(patterns, "Patterns must not be null!"); + protected void doPsubscribe(byte[]... patterns) { + sendPubsubCommand("PSUBSCRIBE_BLOCKING", patterns); - // In a real implementation, this would use the valkey-glide client - // to pattern-subscribe to the specified patterns for (byte[] pattern : patterns) { - if (pattern == null) { - throw new IllegalArgumentException("Pattern must not be null!"); - } - - // Add pattern to tracking list - this.patterns.add(pattern); + subscriptionListener.onPatternSubscribed(pattern, getPatterns().size()); } } @Override - public boolean isAlive() { - return active.get(); - } + protected void doUnsubscribe(boolean all, byte[]... channels) { + byte[][] toNotify; + + if (all) { + toNotify = getChannels().toArray(new byte[0][]); + sendPubsubCommand("UNSUBSCRIBE_BLOCKING"); + } else { + toNotify = channels; + sendPubsubCommand("UNSUBSCRIBE_BLOCKING", channels); + } - @Override - public void close() { - if (active.compareAndSet(true, false)) { - // In a real implementation, this would unsubscribe from all channels and patterns - // using the valkey-glide client - channels.clear(); - patterns.clear(); + for (byte[] channel : toNotify) { + subscriptionListener.onChannelUnsubscribed(channel, getChannels().size()); } } @Override - public void unsubscribe() { - if (active.get()) { - // In a real implementation, this would unsubscribe from all channels - // using the valkey-glide client - channels.clear(); + protected void doPUnsubscribe(boolean all, byte[]... patterns) { + byte[][] toNotify; + + if (all) { + toNotify = getPatterns().toArray(new byte[0][]); + sendPubsubCommand("PUNSUBSCRIBE_BLOCKING"); + } else { + toNotify = patterns; + sendPubsubCommand("PUNSUBSCRIBE_BLOCKING", patterns); } - } - @Override - public void pUnsubscribe() { - if (active.get()) { - // In a real implementation, this would unsubscribe from all patterns - // using the valkey-glide client - patterns.clear(); + for (byte[] pattern : toNotify) { + subscriptionListener.onPatternUnsubscribed(pattern, getPatterns().size()); } } - + @Override - public void unsubscribe(byte[]... channels) { - if (active.get() && channels != null) { - // In a real implementation, this would unsubscribe from specific channels - // using the valkey-glide client - for (byte[] channel : channels) { - this.channels.remove(channel); - } + protected void doClose() { + // Clear listener first to prevent stale messages + pubSubListener.clearListener(); + + // Capture channels/patterns BEFORE unsubscribing (they get cleared by parent) + byte[][] channelsToNotify = getChannels().toArray(new byte[0][]); + byte[][] patternsToNotify = getPatterns().toArray(new byte[0][]); + + // Unsubscribe from SPECIFIC channels we subscribed to, not ALL + if (channelsToNotify.length > 0) { + sendPubsubCommand("UNSUBSCRIBE_BLOCKING"); } - } - - @Override - public void pUnsubscribe(byte[]... patterns) { - if (active.get() && patterns != null) { - // In a real implementation, this would unsubscribe from specific patterns - // using the valkey-glide client - for (byte[] pattern : patterns) { - this.patterns.remove(pattern); - } + + if (patternsToNotify.length > 0) { + sendPubsubCommand("PUNSUBSCRIBE_BLOCKING"); + } + + // Notify subscription callbacks + for (byte[] channel : channelsToNotify) { + subscriptionListener.onChannelUnsubscribed(channel, 0); + } + for (byte[] pattern : patternsToNotify) { + subscriptionListener.onPatternUnsubscribed(pattern, 0); } } - @Override - public Collection getChannels() { - return new ArrayList<>(channels); - } + /** + * Send a pub/sub command directly to the client using GlideString. + */ + private void sendPubsubCommand(String command, byte[]... channels) { + GlideString[] cmd = new GlideString[channels.length + 2]; - @Override - public Collection getPatterns() { - return new ArrayList<>(patterns); - } - - @Override - public MessageListener getListener() { - return listener; + int i = 0; + cmd[i++] = GlideString.of(command); + for (byte[] channel : channels) { + cmd[i++] = GlideString.of(channel); + } + + // Always append timeout = 0 + cmd[i] = GlideString.of("0"); + + try { + client.customCommand(cmd); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new ValkeyGlideExceptionConverter().convert(e); + } } } diff --git a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlidePubSubPoolingIntegrationTests.java b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlidePubSubPoolingIntegrationTests.java new file mode 100644 index 00000000..f1b08d2c --- /dev/null +++ b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlidePubSubPoolingIntegrationTests.java @@ -0,0 +1,697 @@ +/* + * Copyright 2011-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.valkey.springframework.data.valkey.connection.valkeyglide; + +import static org.assertj.core.api.Assertions.*; +import static org.awaitility.Awaitility.*; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.valkey.springframework.data.valkey.SettingsUtils; +import io.valkey.springframework.data.valkey.connection.Message; +import io.valkey.springframework.data.valkey.connection.MessageListener; +import io.valkey.springframework.data.valkey.connection.ValkeyClusterConfiguration; +import io.valkey.springframework.data.valkey.connection.ValkeyConnection; +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration; +import io.valkey.springframework.data.valkey.connection.SubscriptionListener; +import io.valkey.springframework.data.valkey.test.condition.ValkeyDetector; +import io.valkey.springframework.data.valkey.test.extension.parametrized.MethodSource; +import io.valkey.springframework.data.valkey.test.extension.parametrized.ParameterizedValkeyTest; +import org.springframework.lang.Nullable; + +/** + * Integration tests for Valkey Glide pub/sub with connection pooling. + * These tests specifically target issues that can arise when clients are pooled + * and their state may persist between uses. + */ +@MethodSource("testParams") +class ValkeyGlidePubSubPoolingIntegrationTests { + + private static final Logger logger = LoggerFactory.getLogger(ValkeyGlidePubSubPoolingIntegrationTests.class); + + private static final Duration AWAIT_TIMEOUT = Duration.ofSeconds(2); + private static final long TERMINATION_TIMEOUT_SECONDS = 10; + + private final boolean useCluster; + + // Shared publisher factory - created in setUp, destroyed in tearDown + private ValkeyGlideConnectionFactory publisherFactory; + + // Test-specific factories - tests manage their own lifecycle + private final List testFactories = new ArrayList<>(); + + public ValkeyGlidePubSubPoolingIntegrationTests(boolean useCluster) { + this.useCluster = useCluster; + } + + public static Collection testParams() { + List params = new ArrayList<>(); + + // Always test standalone + params.add(new Object[] { false }); + + // Also test cluster if available + if (clusterAvailable()) { + params.add(new Object[] { true }); + } + + return params; + } + + private static boolean clusterAvailable() { + return ValkeyDetector.isClusterAvailable(); + } + + @BeforeEach + void setUp() { + publisherFactory = createConnectionFactory(2); + } + + @AfterEach + void tearDown() { + // Clean up all test-created factories + for (ValkeyGlideConnectionFactory factory : testFactories) { + destroyFactory(factory); + } + testFactories.clear(); + + destroyFactory(publisherFactory); + } + + /** + * Creates a connection factory with the specified pool size and registers it for cleanup. + */ + private ValkeyGlideConnectionFactory createTestFactory(int poolSize) { + ValkeyGlideConnectionFactory factory = createConnectionFactory(poolSize); + testFactories.add(factory); + return factory; + } + + private ValkeyGlideConnectionFactory createConnectionFactory(int poolSize) { + ValkeyGlideClientConfiguration clientConfig = ValkeyGlideClientConfiguration.builder() + .maxPoolSize(poolSize) + .build(); + + ValkeyGlideConnectionFactory factory; + if (useCluster) { + ValkeyClusterConfiguration clusterConfig = SettingsUtils.clusterConfiguration(); + factory = new ValkeyGlideConnectionFactory(clusterConfig, clientConfig); + } else { + ValkeyStandaloneConfiguration standaloneConfig = SettingsUtils.standaloneConfiguration(); + factory = new ValkeyGlideConnectionFactory(standaloneConfig, clientConfig); + } + + factory.afterPropertiesSet(); + factory.start(); + return factory; + } + + private void destroyFactory(@Nullable ValkeyGlideConnectionFactory factory) { + if (factory != null) { + try { + factory.destroy(); + } catch (Exception e) { + logger.warn("Failed to destroy factory: {}", e.getMessage()); + } + } + } + + private void publish(String channel, String message) { + try (ValkeyConnection publisher = publisherFactory.getConnection()) { + publisher.publish(channel.getBytes(), message.getBytes()); + } + } + + interface CompositeListener extends MessageListener, SubscriptionListener {} + + // ==================== Single Client Pool Tests ==================== + // These tests use pool size 1 to guarantee client reuse + + @ParameterizedValkeyTest + void sameClientShouldNotReceiveMessagesFromPreviousSubscription() throws Exception { + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(1); + + String channel1 = "test:pool:prev:ch1"; + String channel2 = "test:pool:prev:ch2"; + + List listener1Messages = Collections.synchronizedList(new ArrayList<>()); + List listener2Messages = Collections.synchronizedList(new ArrayList<>()); + + ValkeyConnection conn1 = connectionFactory.getConnection(); + Object nativeClient1 = conn1.getNativeConnection(); + + conn1.subscribe((message, pattern) -> listener1Messages.add(new String(message.getBody())), + channel1.getBytes()); + + publish(channel1, "message1"); + + await().atMost(AWAIT_TIMEOUT).until(() -> listener1Messages.size() >= 1); + assertThat(listener1Messages).contains("message1"); + + conn1.close(); + + ValkeyConnection conn2 = connectionFactory.getConnection(); + assertThat(conn2.getNativeConnection()).isSameAs(nativeClient1); + + conn2.subscribe((message, pattern) -> listener2Messages.add(new String(message.getBody())), + channel2.getBytes()); + + publish(channel1, "message_to_old_channel"); + publish(channel2, "message2"); + + await().atMost(AWAIT_TIMEOUT).until(() -> listener2Messages.size() >= 1); + + assertThat(listener2Messages).contains("message2"); + assertThat(listener2Messages).doesNotContain("message_to_old_channel"); + assertThat(listener1Messages).hasSize(1); + + conn2.close(); + } + + @ParameterizedValkeyTest + void rapidSubscribeUnsubscribeCyclesOnSameClient() throws Exception { + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(1); + + String channelBase = "test:pool:rapid"; + int cycles = 10; + Object firstClient = null; + + for (int i = 0; i < cycles; i++) { + String channel = channelBase + ":" + i; + AtomicInteger received = new AtomicInteger(0); + + ValkeyConnection conn = connectionFactory.getConnection(); + + if (firstClient == null) { + firstClient = conn.getNativeConnection(); + } else { + assertThat(conn.getNativeConnection()).isSameAs(firstClient); + } + + conn.subscribe((message, pattern) -> received.incrementAndGet(), channel.getBytes()); + + publish(channel, "msg" + i); + + await().atMost(AWAIT_TIMEOUT).until(() -> received.get() >= 1); + + conn.close(); + } + } + + @ParameterizedValkeyTest + void subscriptionCallbacksWithCompositeListener() throws Exception { + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(1); + + String channel = "test:pool:callbacks"; + Object firstClient = null; + + for (int i = 0; i < 3; i++) { + CompletableFuture subscribed = new CompletableFuture<>(); + CompletableFuture unsubscribed = new CompletableFuture<>(); + AtomicReference subscribedChannel = new AtomicReference<>(); + AtomicReference unsubscribedChannel = new AtomicReference<>(); + + ValkeyConnection conn = connectionFactory.getConnection(); + + if (firstClient == null) { + firstClient = conn.getNativeConnection(); + } else { + assertThat(conn.getNativeConnection()).isSameAs(firstClient); + } + + CompositeListener listener = new CompositeListener() { + @Override + public void onMessage(Message message, @Nullable byte[] pattern) {} + + @Override + public void onChannelSubscribed(byte[] ch, long count) { + subscribedChannel.set(ch); + subscribed.complete(null); + } + + @Override + public void onChannelUnsubscribed(byte[] ch, long count) { + unsubscribedChannel.set(ch); + unsubscribed.complete(null); + } + }; + + conn.subscribe(listener, channel.getBytes()); + + subscribed.get(1, TimeUnit.SECONDS); + assertThat(subscribedChannel.get()).isEqualTo(channel.getBytes()); + + conn.close(); + + unsubscribed.get(1, TimeUnit.SECONDS); + assertThat(unsubscribedChannel.get()).isEqualTo(channel.getBytes()); + } + } + + @ParameterizedValkeyTest + void partialUnsubscribeThenReuseClient() throws Exception { + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(1); + + String channel1 = "test:pool:partial:ch1"; + String channel2 = "test:pool:partial:ch2"; + String channel3 = "test:pool:partial:ch3"; + + List messages = Collections.synchronizedList(new ArrayList<>()); + + ValkeyConnection conn1 = connectionFactory.getConnection(); + Object nativeClient = conn1.getNativeConnection(); + + conn1.subscribe((message, pattern) -> { + messages.add(new String(message.getChannel()) + ":" + new String(message.getBody())); + }, channel1.getBytes(), channel2.getBytes()); + + publish(channel1, "msg1"); + publish(channel2, "msg2"); + + await().atMost(AWAIT_TIMEOUT).until(() -> messages.size() >= 2); + + conn1.getSubscription().unsubscribe(channel1.getBytes()); + conn1.close(); + + messages.clear(); + + ValkeyConnection conn2 = connectionFactory.getConnection(); + assertThat(conn2.getNativeConnection()).isSameAs(nativeClient); + + conn2.subscribe((message, pattern) -> { + messages.add(new String(message.getChannel()) + ":" + new String(message.getBody())); + }, channel3.getBytes()); + + publish(channel1, "should_not_receive1"); + publish(channel2, "should_not_receive2"); + publish(channel3, "msg3"); + + await().atMost(AWAIT_TIMEOUT).until(() -> messages.stream().anyMatch(m -> m.contains("msg3"))); + + assertThat(messages.stream().filter(m -> m.contains("msg3")).count()).isEqualTo(1); + assertThat(messages.stream().filter(m -> m.contains("should_not")).count()).isZero(); + + conn2.close(); + } + + @ParameterizedValkeyTest + void clientStateShouldBeCleanAfterNoOpConnection() throws Exception { + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(1); + + String channel = "test:pool:noop"; + + ValkeyConnection conn1 = connectionFactory.getConnection(); + Object nativeClient = conn1.getNativeConnection(); + conn1.close(); + + ValkeyConnection conn2 = connectionFactory.getConnection(); + assertThat(conn2.getNativeConnection()).isSameAs(nativeClient); + + AtomicReference received = new AtomicReference<>(); + conn2.subscribe((message, pattern) -> received.set(new String(message.getBody())), + channel.getBytes()); + + publish(channel, "test_msg"); + + await().atMost(AWAIT_TIMEOUT).until(() -> received.get() != null); + assertThat(received.get()).isEqualTo("test_msg"); + + conn2.close(); + } + + @ParameterizedValkeyTest + void mixedChannelAndPatternSubscriptionsOnSameConnection() throws Exception { + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(1); + + String channel = "test:pool:mixed:exact"; + String patternBase = "test:pool:mixed:pattern"; + String pattern = patternBase + ":*"; + String patternMatch = patternBase + ":foo"; + + List receivedFromChannel = Collections.synchronizedList(new ArrayList<>()); + List receivedFromPattern = Collections.synchronizedList(new ArrayList<>()); + + ValkeyConnection conn = connectionFactory.getConnection(); + Object nativeClient = conn.getNativeConnection(); + + conn.subscribe((message, pat) -> { + if (pat == null) { + receivedFromChannel.add(new String(message.getBody())); + } else { + receivedFromPattern.add(new String(message.getBody())); + } + }, channel.getBytes()); + + conn.getSubscription().pSubscribe(pattern.getBytes()); + + publish(channel, "channel_msg"); + publish(patternMatch, "pattern_msg"); + + await().atMost(AWAIT_TIMEOUT).until(() -> + receivedFromChannel.size() >= 1 && receivedFromPattern.size() >= 1); + + assertThat(receivedFromChannel).contains("channel_msg"); + assertThat(receivedFromPattern).contains("pattern_msg"); + + conn.close(); + + ValkeyConnection conn2 = connectionFactory.getConnection(); + assertThat(conn2.getNativeConnection()).isSameAs(nativeClient); + + List newMessages = Collections.synchronizedList(new ArrayList<>()); + String newChannel = "test:pool:mixed:new"; + + conn2.subscribe((message, pat) -> newMessages.add(new String(message.getBody())), + newChannel.getBytes()); + + publish(channel, "old_channel"); + publish(patternMatch, "old_pattern"); + publish(newChannel, "new_msg"); + + await().atMost(AWAIT_TIMEOUT).until(() -> newMessages.contains("new_msg")); + + assertThat(newMessages).containsExactly("new_msg"); + + conn2.close(); + } + + // ==================== Concurrent Usage Tests ==================== + // These tests use pool size 3 with semaphore to force reuse + + @ParameterizedValkeyTest + void concurrentThreadsReceiveOnlyTheirOwnMessages() throws Exception { + int poolSize = 3; + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(poolSize); + Semaphore connectionLimiter = new Semaphore(poolSize); + + int threadCount = 6; + int iterationsPerThread = 10; + String channelBase = "test:concurrent:isolation"; + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + AtomicInteger errorCount = new AtomicInteger(0); + + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + + executor.submit(() -> { + try { + for (int iter = 0; iter < iterationsPerThread; iter++) { + String myChannel = channelBase + ":t" + threadId + ":i" + iter; + String expectedMsg = "t" + threadId + "_i" + iter; + AtomicReference received = new AtomicReference<>(); + + connectionLimiter.acquire(); + try { + ValkeyConnection conn = connectionFactory.getConnection(); + + conn.subscribe((message, pattern) -> { + received.set(new String(message.getBody())); + }, myChannel.getBytes()); + + publish(myChannel, expectedMsg); + + await().atMost(AWAIT_TIMEOUT).until(() -> received.get() != null); + assertThat(received.get()).isEqualTo(expectedMsg); + + conn.close(); + } finally { + connectionLimiter.release(); + } + } + } catch (Exception e) { + logger.error("Thread {} failed: {}", threadId, e.getMessage(), e); + errorCount.incrementAndGet(); + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue(); + assertThat(errorCount.get()).isZero(); + } + + @ParameterizedValkeyTest + void concurrentPatternSubscriptions() throws Exception { + int poolSize = 3; + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(poolSize); + Semaphore connectionLimiter = new Semaphore(poolSize); + + int threadCount = 4; + int iterationsPerThread = 8; + String patternBase = "test:concurrent:pattern"; + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + AtomicInteger errorCount = new AtomicInteger(0); + + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + + executor.submit(() -> { + try { + for (int iter = 0; iter < iterationsPerThread; iter++) { + String myPattern = patternBase + ":t" + threadId + ":*"; + String myChannel = patternBase + ":t" + threadId + ":i" + iter; + String expectedMsg = "t" + threadId + "_iter" + iter; + AtomicReference received = new AtomicReference<>(); + + connectionLimiter.acquire(); + try { + ValkeyConnection conn = connectionFactory.getConnection(); + + conn.pSubscribe((message, pattern) -> { + received.set(new String(message.getBody())); + }, myPattern.getBytes()); + + publish(myChannel, expectedMsg); + + await().atMost(AWAIT_TIMEOUT).until(() -> received.get() != null); + assertThat(received.get()).isEqualTo(expectedMsg); + + conn.close(); + } finally { + connectionLimiter.release(); + } + } + } catch (Exception e) { + logger.error("Thread {} failed: {}", threadId, e.getMessage(), e); + errorCount.incrementAndGet(); + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue(); + assertThat(errorCount.get()).isZero(); + } + + @ParameterizedValkeyTest + void rapidConcurrentSubscribeUnsubscribeCycles() throws Exception { + int poolSize = 3; + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(poolSize); + Semaphore connectionLimiter = new Semaphore(poolSize); + + int threadCount = 5; + int cyclesPerThread = 20; + String channelBase = "test:concurrent:rapid"; + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger errorCount = new AtomicInteger(0); + + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + + executor.submit(() -> { + try { + for (int cycle = 0; cycle < cyclesPerThread; cycle++) { + String channel = channelBase + ":t" + threadId + ":c" + cycle; + String expectedMsg = "t" + threadId + "_c" + cycle; + AtomicReference received = new AtomicReference<>(); + + connectionLimiter.acquire(); + try { + ValkeyConnection conn = connectionFactory.getConnection(); + + conn.subscribe((message, pattern) -> { + received.set(new String(message.getBody())); + }, channel.getBytes()); + + publish(channel, expectedMsg); + + await().atMost(AWAIT_TIMEOUT).until(() -> received.get() != null); + assertThat(received.get()).isEqualTo(expectedMsg); + + conn.close(); + successCount.incrementAndGet(); + } finally { + connectionLimiter.release(); + } + } + } catch (Exception e) { + logger.error("Thread {} failed: {}", threadId, e.getMessage(), e); + errorCount.incrementAndGet(); + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(2 * TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue(); + assertThat(errorCount.get()).isZero(); + assertThat(successCount.get()).isEqualTo(threadCount * cyclesPerThread); + } + + @ParameterizedValkeyTest + void concurrentMixedChannelAndPatternSubscriptions() throws Exception { + int poolSize = 3; + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(poolSize); + Semaphore connectionLimiter = new Semaphore(poolSize); + + int threadCount = 4; + int iterationsPerThread = 5; + String base = "test:concurrent:mixed"; + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + AtomicInteger errorCount = new AtomicInteger(0); + + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + + executor.submit(() -> { + try { + for (int iter = 0; iter < iterationsPerThread; iter++) { + String myChannel = base + ":channel:t" + threadId + ":i" + iter; + String myPattern = base + ":pattern:t" + threadId + ":*"; + String myPatternChannel = base + ":pattern:t" + threadId + ":i" + iter; + + AtomicReference channelMsg = new AtomicReference<>(); + AtomicReference patternMsg = new AtomicReference<>(); + + connectionLimiter.acquire(); + try { + ValkeyConnection conn = connectionFactory.getConnection(); + + conn.subscribe((message, pattern) -> { + String body = new String(message.getBody()); + if (pattern != null) { + patternMsg.set(body); + } else { + channelMsg.set(body); + } + }, myChannel.getBytes()); + + conn.getSubscription().pSubscribe(myPattern.getBytes()); + + Thread.sleep(50); + + publish(myChannel, "channel_" + threadId); + publish(myPatternChannel, "pattern_" + threadId); + + await().atMost(AWAIT_TIMEOUT).until(() -> + channelMsg.get() != null && patternMsg.get() != null); + + assertThat(channelMsg.get()).isEqualTo("channel_" + threadId); + assertThat(patternMsg.get()).isEqualTo("pattern_" + threadId); + + conn.close(); + } finally { + connectionLimiter.release(); + } + } + } catch (Exception e) { + logger.error("Thread {} failed: {}", threadId, e.getMessage(), e); + errorCount.incrementAndGet(); + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue(); + assertThat(errorCount.get()).isZero(); + } + + @ParameterizedValkeyTest + void highThroughputMessaging() throws Exception { + int poolSize = 3; + ValkeyGlideConnectionFactory connectionFactory = createTestFactory(poolSize); + Semaphore connectionLimiter = new Semaphore(poolSize); + + int threadCount = 4; + int messagesPerThread = 50; + String channelBase = "test:concurrent:highthroughput"; + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + AtomicInteger errorCount = new AtomicInteger(0); + + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + + executor.submit(() -> { + try { + String myChannel = channelBase + ":t" + threadId; + AtomicInteger receivedCount = new AtomicInteger(0); + + connectionLimiter.acquire(); + try { + ValkeyConnection conn = connectionFactory.getConnection(); + + conn.subscribe((message, pattern) -> { + receivedCount.incrementAndGet(); + }, myChannel.getBytes()); + + try (ValkeyConnection pub = publisherFactory.getConnection()) { + for (int m = 0; m < messagesPerThread; m++) { + pub.publish(myChannel.getBytes(), ("msg" + m).getBytes()); + } + } + + await().atMost(Duration.ofSeconds(10)) + .until(() -> receivedCount.get() >= messagesPerThread); + + assertThat(receivedCount.get()).isEqualTo(messagesPerThread); + + conn.close(); + } finally { + connectionLimiter.release(); + } + } catch (Exception e) { + logger.error("Thread {} failed: {}", threadId, e.getMessage(), e); + errorCount.incrementAndGet(); + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue(); + assertThat(errorCount.get()).isZero(); + } +} diff --git a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideSubscriptionUnitTests.java b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideSubscriptionUnitTests.java new file mode 100644 index 00000000..f8746cd1 --- /dev/null +++ b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideSubscriptionUnitTests.java @@ -0,0 +1,457 @@ +package io.valkey.springframework.data.valkey.connection.valkeyglide; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import glide.api.models.GlideString; + +import java.util.Collection; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.valkey.springframework.data.valkey.connection.MessageListener; +import io.valkey.springframework.data.valkey.connection.ValkeyInvalidSubscriptionException; +import io.valkey.springframework.data.valkey.connection.SubscriptionListener; + +class ValkeyGlideSubscriptionUnitTests { + + private ValkeyGlideSubscription subscription; + private UnifiedGlideClient client; + private DelegatingPubSubListener pubSubListener; + private MessageListener messageListener; + + @BeforeEach + void setUp() throws Exception { + client = mock(UnifiedGlideClient.class); + pubSubListener = mock(DelegatingPubSubListener.class); + messageListener = mock(MessageListener.class); + + // Mock customCommand to return null (no exception) + when(client.customCommand(any(GlideString[].class))).thenReturn(null); + + subscription = new ValkeyGlideSubscription(messageListener, client, pubSubListener); + } + + @Test + void testUnsubscribeAllAndClose() { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.unsubscribe(); + + verify(pubSubListener).clearListener(); + + assertThat(subscription.isAlive()).isFalse(); + assertThat(subscription.getChannels()).isEmpty(); + assertThat(subscription.getPatterns()).isEmpty(); + } + + @Test + void testUnsubscribeAllChannelsWithPatterns() { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.pSubscribe(new byte[][] { "s*".getBytes() }); + subscription.unsubscribe(); + + assertThat(subscription.isAlive()).isTrue(); + assertThat(subscription.getChannels()).isEmpty(); + + Collection patterns = subscription.getPatterns(); + assertThat(patterns).hasSize(1); + assertThat(patterns.iterator().next()).isEqualTo("s*".getBytes()); + } + + @Test + void testUnsubscribeChannelAndClose() { + byte[][] channel = new byte[][] { "a".getBytes() }; + + subscription.subscribe(channel); + subscription.unsubscribe(channel); + + verify(pubSubListener).clearListener(); + + assertThat(subscription.isAlive()).isFalse(); + assertThat(subscription.getChannels()).isEmpty(); + assertThat(subscription.getPatterns()).isEmpty(); + } + + @Test + void testUnsubscribeChannelSomeLeft() { + byte[][] channels = new byte[][] { "a".getBytes(), "b".getBytes() }; + + subscription.subscribe(channels); + subscription.unsubscribe(new byte[][] { "a".getBytes() }); + + assertThat(subscription.isAlive()).isTrue(); + + Collection subChannels = subscription.getChannels(); + assertThat(subChannels).hasSize(1); + assertThat(subChannels.iterator().next()).isEqualTo("b".getBytes()); + assertThat(subscription.getPatterns()).isEmpty(); + } + + @Test + void testUnsubscribeChannelWithPatterns() { + byte[][] channel = new byte[][] { "a".getBytes() }; + + subscription.subscribe(channel); + subscription.pSubscribe(new byte[][] { "s*".getBytes() }); + subscription.unsubscribe(channel); + + assertThat(subscription.isAlive()).isTrue(); + assertThat(subscription.getChannels()).isEmpty(); + + Collection patterns = subscription.getPatterns(); + assertThat(patterns).hasSize(1); + assertThat(patterns.iterator().next()).isEqualTo("s*".getBytes()); + } + + @Test + void testUnsubscribeChannelWithPatternsSomeLeft() { + byte[][] channel = new byte[][] { "a".getBytes() }; + + subscription.subscribe("a".getBytes(), "b".getBytes()); + subscription.pSubscribe(new byte[][] { "s*".getBytes() }); + subscription.unsubscribe(channel); + + assertThat(subscription.isAlive()).isTrue(); + + Collection channels = subscription.getChannels(); + assertThat(channels).hasSize(1); + assertThat(channels.iterator().next()).isEqualTo("b".getBytes()); + + Collection patterns = subscription.getPatterns(); + assertThat(patterns).hasSize(1); + assertThat(patterns.iterator().next()).isEqualTo("s*".getBytes()); + } + + @Test + void testUnsubscribeAllNoChannels() { + subscription.pSubscribe(new byte[][] { "s*".getBytes() }); + subscription.unsubscribe(); + + assertThat(subscription.isAlive()).isTrue(); + assertThat(subscription.getChannels()).isEmpty(); + + Collection patterns = subscription.getPatterns(); + assertThat(patterns).hasSize(1); + assertThat(patterns.iterator().next()).isEqualTo("s*".getBytes()); + } + + @Test + void testUnsubscribeNotAlive() { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.unsubscribe(); + + verify(pubSubListener).clearListener(); + + assertThat(subscription.isAlive()).isFalse(); + + // Calling unsubscribe again should not throw + subscription.unsubscribe(); + } + + @Test + void testSubscribeNotAlive() { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.unsubscribe(); + + assertThat(subscription.isAlive()).isFalse(); + + assertThatExceptionOfType(ValkeyInvalidSubscriptionException.class) + .isThrownBy(() -> subscription.subscribe(new byte[][] { "s".getBytes() })); + } + + @Test + void testPUnsubscribeAllAndClose() { + subscription.pSubscribe(new byte[][] { "a*".getBytes() }); + subscription.pUnsubscribe(); + + verify(pubSubListener).clearListener(); + + assertThat(subscription.isAlive()).isFalse(); + assertThat(subscription.getChannels()).isEmpty(); + assertThat(subscription.getPatterns()).isEmpty(); + } + + @Test + void testPUnsubscribeAllPatternsWithChannels() { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.pSubscribe(new byte[][] { "s*".getBytes() }); + subscription.pUnsubscribe(); + + assertThat(subscription.isAlive()).isTrue(); + assertThat(subscription.getPatterns()).isEmpty(); + + Collection channels = subscription.getChannels(); + assertThat(channels).hasSize(1); + assertThat(channels.iterator().next()).isEqualTo("a".getBytes()); + } + + @Test + void testPUnsubscribeAndClose() { + byte[][] pattern = new byte[][] { "a*".getBytes() }; + + subscription.pSubscribe(pattern); + subscription.pUnsubscribe(pattern); + + verify(pubSubListener).clearListener(); + + assertThat(subscription.isAlive()).isFalse(); + assertThat(subscription.getChannels()).isEmpty(); + assertThat(subscription.getPatterns()).isEmpty(); + } + + @Test + void testPUnsubscribePatternSomeLeft() { + byte[][] patterns = new byte[][] { "a*".getBytes(), "b*".getBytes() }; + subscription.pSubscribe(patterns); + subscription.pUnsubscribe(new byte[][] { "a*".getBytes() }); + + assertThat(subscription.isAlive()).isTrue(); + + Collection subPatterns = subscription.getPatterns(); + assertThat(subPatterns).hasSize(1); + assertThat(subPatterns.iterator().next()).isEqualTo("b*".getBytes()); + assertThat(subscription.getChannels()).isEmpty(); + } + + @Test + void testPUnsubscribePatternWithChannels() { + byte[][] pattern = new byte[][] { "s*".getBytes() }; + + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.pSubscribe(pattern); + subscription.pUnsubscribe(pattern); + + assertThat(subscription.isAlive()).isTrue(); + assertThat(subscription.getPatterns()).isEmpty(); + + Collection channels = subscription.getChannels(); + assertThat(channels).hasSize(1); + assertThat(channels.iterator().next()).isEqualTo("a".getBytes()); + } + + @Test + void testUnsubscribePatternWithChannelsSomeLeft() { + byte[][] pattern = new byte[][] { "a*".getBytes() }; + + subscription.pSubscribe("a*".getBytes(), "b*".getBytes()); + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.pUnsubscribe(pattern); + + assertThat(subscription.isAlive()).isTrue(); + + Collection channels = subscription.getChannels(); + assertThat(channels).hasSize(1); + assertThat(channels.iterator().next()).isEqualTo("a".getBytes()); + + Collection patterns = subscription.getPatterns(); + assertThat(patterns).hasSize(1); + assertThat(patterns.iterator().next()).isEqualTo("b*".getBytes()); + } + + @Test + void testPUnsubscribeAllNoPatterns() { + subscription.subscribe(new byte[][] { "s".getBytes() }); + subscription.pUnsubscribe(); + + assertThat(subscription.isAlive()).isTrue(); + assertThat(subscription.getPatterns()).isEmpty(); + + Collection channels = subscription.getChannels(); + assertThat(channels).hasSize(1); + assertThat(channels.iterator().next()).isEqualTo("s".getBytes()); + } + + @Test + void testPUnsubscribeNotAlive() { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.unsubscribe(); + + assertThat(subscription.isAlive()).isFalse(); + + // Calling pUnsubscribe when not alive should not throw + subscription.pUnsubscribe(); + + verify(pubSubListener).clearListener(); + } + + @Test + void testPSubscribeNotAlive() { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.unsubscribe(); + + assertThat(subscription.isAlive()).isFalse(); + + assertThatExceptionOfType(ValkeyInvalidSubscriptionException.class) + .isThrownBy(() -> subscription.pSubscribe(new byte[][] { "s*".getBytes() })); + } + + @Test + void testDoCloseNotSubscribed() { + subscription.doClose(); + + verify(pubSubListener).clearListener(); + } + + @Test + void testDoCloseSubscribedChannels() throws Exception { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.doClose(); + + verify(pubSubListener).clearListener(); + verify(client).customCommand(argThat(args -> + args.length >= 2 && + "UNSUBSCRIBE_BLOCKING".equals(args[0].getString()) && + "0".equals(args[args.length - 1].getString()) + )); + } + + @Test + void testDoCloseSubscribedPatterns() throws Exception { + subscription.pSubscribe(new byte[][] { "a*".getBytes() }); + subscription.doClose(); + + verify(pubSubListener).clearListener(); + verify(client).customCommand(argThat(args -> + args.length >= 2 && + "PUNSUBSCRIBE_BLOCKING".equals(args[0].getString()) && + "0".equals(args[args.length - 1].getString()) + )); + } + + @Test + void testDoCloseSubscribedChannelsAndPatterns() throws Exception { + subscription.subscribe(new byte[][] { "a".getBytes() }); + subscription.pSubscribe(new byte[][] { "a*".getBytes() }); + subscription.doClose(); + + verify(pubSubListener).clearListener(); + } + + @Test + void testSubscribeCallsCustomCommand() throws Exception { + subscription.subscribe(new byte[][] { "channel1".getBytes() }); + + verify(client).customCommand(argThat(args -> + args.length == 3 && + "SUBSCRIBE_BLOCKING".equals(args[0].getString()) && + "channel1".equals(args[1].getString()) && + "0".equals(args[2].getString()) + )); + } + + @Test + void testPSubscribeCallsCustomCommand() throws Exception { + subscription.pSubscribe(new byte[][] { "pattern*".getBytes() }); + + verify(client).customCommand(argThat(args -> + args.length == 3 && + "PSUBSCRIBE_BLOCKING".equals(args[0].getString()) && + "pattern*".equals(args[1].getString()) && + "0".equals(args[2].getString()) + )); + } + + @Test + void testSubscribeCallsSubscriptionListener() { + MessageListener compositeListener = mock(MessageListener.class, + withSettings().extraInterfaces(SubscriptionListener.class)); + + ValkeyGlideSubscription sub = new ValkeyGlideSubscription(compositeListener, client, pubSubListener); + + sub.subscribe(new byte[][] { "channel1".getBytes() }); + + verify((SubscriptionListener) compositeListener).onChannelSubscribed(eq("channel1".getBytes()), anyLong()); + } + + @Test + void testSubscribeMultipleChannelsCallsSubscriptionListener() { + MessageListener compositeListener = mock(MessageListener.class, + withSettings().extraInterfaces(SubscriptionListener.class)); + + ValkeyGlideSubscription sub = new ValkeyGlideSubscription(compositeListener, client, pubSubListener); + + sub.subscribe(new byte[][] { "channel1".getBytes(), "channel2".getBytes() }); + + verify((SubscriptionListener) compositeListener).onChannelSubscribed(eq("channel1".getBytes()), anyLong()); + verify((SubscriptionListener) compositeListener).onChannelSubscribed(eq("channel2".getBytes()), anyLong()); + } + + @Test + void testPSubscribeCallsSubscriptionListener() { + MessageListener compositeListener = mock(MessageListener.class, + withSettings().extraInterfaces(SubscriptionListener.class)); + + ValkeyGlideSubscription sub = new ValkeyGlideSubscription(compositeListener, client, pubSubListener); + + sub.pSubscribe(new byte[][] { "pattern*".getBytes() }); + + verify((SubscriptionListener) compositeListener).onPatternSubscribed(eq("pattern*".getBytes()), anyLong()); + } + + @Test + void testUnsubscribeCallsSubscriptionListener() { + MessageListener compositeListener = mock(MessageListener.class, + withSettings().extraInterfaces(SubscriptionListener.class)); + + ValkeyGlideSubscription sub = new ValkeyGlideSubscription(compositeListener, client, pubSubListener); + + sub.subscribe(new byte[][] { "channel1".getBytes(), "channel2".getBytes() }); + sub.unsubscribe(new byte[][] { "channel1".getBytes() }); + + verify((SubscriptionListener) compositeListener).onChannelUnsubscribed(eq("channel1".getBytes()), anyLong()); + } + + @Test + void testDoCloseCallsSubscriptionListenerForChannels() { + MessageListener compositeListener = mock(MessageListener.class, + withSettings().extraInterfaces(SubscriptionListener.class)); + + ValkeyGlideSubscription sub = new ValkeyGlideSubscription(compositeListener, client, pubSubListener); + + sub.subscribe(new byte[][] { "channel1".getBytes() }); + sub.doClose(); + + verify((SubscriptionListener) compositeListener).onChannelUnsubscribed(eq("channel1".getBytes()), anyLong()); + } + + @Test + void testDoCloseCallsSubscriptionListenerForPatterns() { + MessageListener compositeListener = mock(MessageListener.class, + withSettings().extraInterfaces(SubscriptionListener.class)); + + ValkeyGlideSubscription sub = new ValkeyGlideSubscription(compositeListener, client, pubSubListener); + + sub.pSubscribe(new byte[][] { "pattern*".getBytes() }); + sub.doClose(); + + verify((SubscriptionListener) compositeListener).onPatternUnsubscribed(eq("pattern*".getBytes()), anyLong()); + } + + @Test + void testNonSubscriptionListenerDoesNotFail() { + MessageListener plainListener = mock(MessageListener.class); + + ValkeyGlideSubscription sub = new ValkeyGlideSubscription(plainListener, client, pubSubListener); + + sub.subscribe(new byte[][] { "channel1".getBytes() }); + sub.pSubscribe(new byte[][] { "pattern*".getBytes() }); + sub.unsubscribe(new byte[][] { "channel1".getBytes() }); + sub.pUnsubscribe(new byte[][] { "pattern*".getBytes() }); + sub.doClose(); + + assertThat(sub.isAlive()).isFalse(); + } + + @Test + void closeTwiceShouldNotFail() { + subscription.subscribe(new byte[][] { "a".getBytes() }); + + subscription.close(); + subscription.close(); + + verify(pubSubListener, times(1)).clearListener(); + assertThat(subscription.isAlive()).isFalse(); + } +} diff --git a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubResubscribeTests.java b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubResubscribeTests.java index 6d6c885d..c27e87f8 100644 --- a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubResubscribeTests.java +++ b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubResubscribeTests.java @@ -39,6 +39,8 @@ import io.valkey.springframework.data.valkey.connection.jedis.extension.JedisConnectionFactoryExtension; import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory; import io.valkey.springframework.data.valkey.connection.lettuce.extension.LettuceConnectionFactoryExtension; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory; +import io.valkey.springframework.data.valkey.connection.valkeyglide.extension.ValkeyGlideConnectionFactoryExtension; import io.valkey.springframework.data.valkey.core.ValkeyTemplate; import io.valkey.springframework.data.valkey.core.StringValkeyTemplate; import io.valkey.springframework.data.valkey.listener.adapter.MessageListenerAdapter; @@ -100,6 +102,20 @@ public static Collection testParams() { factories.add(lettuceClusterConnFactory); } + + // Valkey-GLIDE + ValkeyGlideConnectionFactory glideConnFactory = ValkeyGlideConnectionFactoryExtension + .getConnectionFactory(ValkeyStanalone.class); + + factories.add(glideConnFactory); + + if (clusterAvailable()) { + ValkeyGlideConnectionFactory glideClusterConnFactory = ValkeyGlideConnectionFactoryExtension + .getConnectionFactory(ValkeyCluster.class); + + factories.add(glideClusterConnFactory); + } + return factories.stream().map(factory -> new Object[] { factory }).collect(Collectors.toList()); } @@ -234,13 +250,15 @@ private static boolean clusterAvailable() { return ValkeyDetector.isClusterAvailable(); } - private static boolean isClusterAware(ValkeyConnectionFactory connectionFactory) { - - if (connectionFactory instanceof LettuceConnectionFactory lettuceConnectionFactory) { - return lettuceConnectionFactory.isClusterAware(); - } else if (connectionFactory instanceof JedisConnectionFactory jedisConnectionFactory) { - return jedisConnectionFactory.isValkeyClusterAware(); - } - return false; - } + private static boolean isClusterAware(ValkeyConnectionFactory connectionFactory) { + + if (connectionFactory instanceof LettuceConnectionFactory lettuceConnectionFactory) { + return lettuceConnectionFactory.isClusterAware(); + } else if (connectionFactory instanceof JedisConnectionFactory jedisConnectionFactory) { + return jedisConnectionFactory.isValkeyClusterAware(); + } else if (connectionFactory instanceof ValkeyGlideConnectionFactory glideConnectionFactory) { + return glideConnectionFactory.isClusterAware(); + } + return false; + } } diff --git a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubTestParams.java b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubTestParams.java index 378417ff..f6746113 100644 --- a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubTestParams.java +++ b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubTestParams.java @@ -26,6 +26,8 @@ import io.valkey.springframework.data.valkey.connection.jedis.extension.JedisConnectionFactoryExtension; import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory; import io.valkey.springframework.data.valkey.connection.lettuce.extension.LettuceConnectionFactoryExtension; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory; +import io.valkey.springframework.data.valkey.connection.valkeyglide.extension.ValkeyGlideConnectionFactoryExtension; import io.valkey.springframework.data.valkey.core.ValkeyTemplate; import io.valkey.springframework.data.valkey.core.StringValkeyTemplate; import io.valkey.springframework.data.valkey.test.condition.ValkeyDetector; @@ -71,11 +73,26 @@ public static Collection testParams() { rawTemplateLtc.setConnectionFactory(lettuceConnFactory); rawTemplateLtc.afterPropertiesSet(); + // add Valkey Glide + ValkeyGlideConnectionFactory glideConnFactory = ValkeyGlideConnectionFactoryExtension + .getConnectionFactory(ValkeyStanalone.class); + + ValkeyTemplate stringTemplateGlide = new StringValkeyTemplate(glideConnFactory); + ValkeyTemplate personTemplateGlide = new ValkeyTemplate<>(); + personTemplateGlide.setConnectionFactory(glideConnFactory); + personTemplateGlide.afterPropertiesSet(); + ValkeyTemplate rawTemplateGlide = new ValkeyTemplate<>(); + rawTemplateGlide.setEnableDefaultSerializer(false); + rawTemplateGlide.setConnectionFactory(glideConnFactory); + rawTemplateGlide.afterPropertiesSet(); + Collection parameters = new ArrayList<>(); parameters.add(new Object[] { stringFactory, stringTemplate }); parameters.add(new Object[] { personFactory, personTemplate }); parameters.add(new Object[] { stringFactory, stringTemplateLtc }); parameters.add(new Object[] { personFactory, personTemplateLtc }); + parameters.add(new Object[] { stringFactory, stringTemplateGlide }); + parameters.add(new Object[] { personFactory, personTemplateGlide }); if (clusterAvailable()) { @@ -83,16 +100,26 @@ public static Collection testParams() { JedisConnectionFactory jedisClusterFactory = JedisConnectionFactoryExtension .getNewConnectionFactory(ValkeyCluster.class); - ValkeyTemplate jedisClusterStringTemplate = new StringValkeyTemplate(jedisClusterFactory); + ValkeyTemplate jedisClusterStringTemplate = + new StringValkeyTemplate(jedisClusterFactory); // add Lettuce LettuceConnectionFactory lettuceClusterFactory = LettuceConnectionFactoryExtension .getConnectionFactory(ValkeyCluster.class); - ValkeyTemplate lettuceClusterStringTemplate = new StringValkeyTemplate(lettuceClusterFactory); + ValkeyTemplate lettuceClusterStringTemplate = + new StringValkeyTemplate(lettuceClusterFactory); + + // Add Valkey-GLIDE + ValkeyGlideConnectionFactory glideClusterFactory = + ValkeyGlideConnectionFactoryExtension.getConnectionFactory(ValkeyCluster.class); + + ValkeyTemplate glideClusterStringTemplate = + new StringValkeyTemplate(glideClusterFactory); parameters.add(new Object[] { stringFactory, jedisClusterStringTemplate }); parameters.add(new Object[] { stringFactory, lettuceClusterStringTemplate }); + parameters.add(new Object[] { stringFactory, glideClusterStringTemplate }); } return parameters; diff --git a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubTests.java b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubTests.java index 52ec03bc..accc4b39 100644 --- a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubTests.java +++ b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/PubSubTests.java @@ -34,6 +34,7 @@ import io.valkey.springframework.data.valkey.connection.ValkeyConnectionFactory; import io.valkey.springframework.data.valkey.connection.jedis.JedisConnectionFactory; import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory; import io.valkey.springframework.data.valkey.core.ValkeyTemplate; import io.valkey.springframework.data.valkey.listener.adapter.MessageListenerAdapter; import io.valkey.springframework.data.valkey.test.condition.EnabledIfLongRunningTest; @@ -169,11 +170,12 @@ void testStartListenersToNoSpecificChannelTest() { } private static boolean isClusterAware(ValkeyConnectionFactory connectionFactory) { - if (connectionFactory instanceof LettuceConnectionFactory lettuce) { return lettuce.isClusterAware(); } else if (connectionFactory instanceof JedisConnectionFactory jedis) { return jedis.isValkeyClusterAware(); + } else if (connectionFactory instanceof ValkeyGlideConnectionFactory glide) { + return glide.isClusterAware(); } return false; } diff --git a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/SubscriptionConnectionTests.java b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/SubscriptionConnectionTests.java index 483a461e..93721028 100644 --- a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/SubscriptionConnectionTests.java +++ b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/SubscriptionConnectionTests.java @@ -34,6 +34,8 @@ import io.valkey.springframework.data.valkey.connection.jedis.extension.JedisConnectionFactoryExtension; import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory; import io.valkey.springframework.data.valkey.connection.lettuce.extension.LettuceConnectionFactoryExtension; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory; +import io.valkey.springframework.data.valkey.connection.valkeyglide.extension.ValkeyGlideConnectionFactoryExtension; import io.valkey.springframework.data.valkey.listener.adapter.MessageListenerAdapter; import io.valkey.springframework.data.valkey.test.extension.ValkeyStanalone; import io.valkey.springframework.data.valkey.test.extension.parametrized.MethodSource; @@ -78,7 +80,11 @@ public static Collection testParams() { LettuceConnectionFactory lettuceConnFactory = LettuceConnectionFactoryExtension .getConnectionFactory(ValkeyStanalone.class); - return Arrays.asList(new Object[][] { { jedisConnFactory }, { lettuceConnFactory } }); + // Valkey-GLIDE + ValkeyGlideConnectionFactory glideConnFactory = ValkeyGlideConnectionFactoryExtension + .getConnectionFactory(ValkeyStanalone.class); + + return Arrays.asList(new Object[][] { { jedisConnFactory }, { lettuceConnFactory }, { glideConnFactory } }); } @AfterEach diff --git a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/ValkeyMessageListenerContainerIntegrationTests.java b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/ValkeyMessageListenerContainerIntegrationTests.java index 6780809c..677325d7 100644 --- a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/ValkeyMessageListenerContainerIntegrationTests.java +++ b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/listener/ValkeyMessageListenerContainerIntegrationTests.java @@ -39,6 +39,8 @@ import io.valkey.springframework.data.valkey.connection.jedis.extension.JedisConnectionFactoryExtension; import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory; import io.valkey.springframework.data.valkey.connection.lettuce.extension.LettuceConnectionFactoryExtension; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory; +import io.valkey.springframework.data.valkey.connection.valkeyglide.extension.ValkeyGlideConnectionFactoryExtension; import io.valkey.springframework.data.valkey.test.extension.ValkeyStanalone; import io.valkey.springframework.data.valkey.test.extension.parametrized.MethodSource; import io.valkey.springframework.data.valkey.test.extension.parametrized.ParameterizedValkeyTest; @@ -79,7 +81,12 @@ public static Collection testParams() { LettuceConnectionFactory lettuceConnFactory = LettuceConnectionFactoryExtension .getConnectionFactory(ValkeyStanalone.class); - return Arrays.asList(new Object[][] { { jedisConnFactory }, { lettuceConnFactory } }); + + // Valkey-GLIDE + ValkeyGlideConnectionFactory glideConnFactory = ValkeyGlideConnectionFactoryExtension + .getConnectionFactory(ValkeyStanalone.class); + + return Arrays.asList(new Object[][] { { jedisConnFactory }, { lettuceConnFactory } , { glideConnFactory} }); } @AfterEach