Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<springdata.parent.version>3.5.1</springdata.parent.version>

<!-- Valkey/Redis client versions -->
<valkey-glide.version>2.1.1</valkey-glide.version>
<valkey-glide.version>2.3.0-rc2</valkey-glide.version>
<lettuce.version>6.6.0.RELEASE</lettuce.version>
<jedis.version>6.0.0</jedis.version>
<pool.version>2.11.1</pool.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.valkey.springframework.data.valkey.connection.jedis.JedisConnectionFactory;
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory;
import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory;

/**
* Utilities for examining a {@link ValkeyConnection}
Expand All @@ -27,7 +28,8 @@
public abstract class ConnectionUtils {

public static boolean isAsync(ValkeyConnectionFactory connectionFactory) {
return (connectionFactory instanceof LettuceConnectionFactory);
return connectionFactory instanceof LettuceConnectionFactory
|| connectionFactory instanceof ValkeyGlideConnectionFactory;
}

public static boolean isLettuce(ValkeyConnectionFactory connectionFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,31 @@ public class ValkeyGlideClusterConnection extends ValkeyGlideConnection implemen
private final ValkeyGlideClusterSetCommands clusterSetCommands;

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter) {
this(clusterAdapter, null, Duration.ofMillis(100));
this(clusterAdapter, null, Duration.ofMillis(100), null);
}

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, @Nullable ValkeyGlideConnectionFactory factory) {
this(clusterAdapter, factory, Duration.ofMillis(100));
public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just appears this way on github, see + signs beside

@Nullable ValkeyGlideConnectionFactory factory) {
this(clusterAdapter, factory, Duration.ofMillis(100), null);
}

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, @Nullable ValkeyGlideConnectionFactory factory, Duration cacheTimeout) {
super(clusterAdapter, factory);
public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just looks like it bc of github

@Nullable ValkeyGlideConnectionFactory factory,
@Nullable DelegatingPubSubListener pubSubListener) {
this(clusterAdapter, factory, Duration.ofMillis(100), pubSubListener);
}

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
@Nullable ValkeyGlideConnectionFactory factory,
Duration cacheTimeout) {
this(clusterAdapter, factory, cacheTimeout, null);
}

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
@Nullable ValkeyGlideConnectionFactory factory,
Duration cacheTimeout,
@Nullable DelegatingPubSubListener pubSubListener) {
super(clusterAdapter, factory, pubSubListener);
Assert.notNull(cacheTimeout, "CacheTimeout must not be null!");

this.clusterAdapter = clusterAdapter;
Expand Down Expand Up @@ -123,10 +139,6 @@ protected void cleanupConnectionState() {
@SuppressWarnings("unchecked")
Callable<Void>[] actions = new Callable[] {
() -> nativeClient.customCommand(new String[]{"UNWATCH"}, SimpleMultiNodeRoute.ALL_NODES).get(),
// TODO: Uncomment when dynamic pubsub is implemented
// () -> nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get(),
// () -> nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get(),
// () -> nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get()
};

for (Callable<Void> action : actions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,17 @@ public class ValkeyGlideConnection extends AbstractValkeyConnection {

protected final UnifiedGlideClient unifiedClient;
protected final @Nullable ValkeyGlideConnectionFactory factory;
/**
* Bridges Glide's callback mechanism (configured at client creation) with Spring's
* {@link MessageListener} (provided at subscribe time). The user's listener is set
* on this delegate when {@code subscribe()} is called, and cleared on connection close.
*/
protected final @Nullable DelegatingPubSubListener delegatingListener;
private final AtomicBoolean closed = new AtomicBoolean(false);

private final List<ResultMapper<?, ?>> batchCommandsConverters = new ArrayList<>();
private final Set<byte[]> watchedKeys = new HashSet<>();
private @Nullable Subscription subscription;
private volatile @Nullable ValkeyGlideSubscription subscription;

// Command interfaces
private final ValkeyGlideKeyCommands keyCommands;
Expand All @@ -90,10 +96,24 @@ public class ValkeyGlideConnection extends AbstractValkeyConnection {
* @param factory the connection factory (optional, for pooling support)
*/
public ValkeyGlideConnection(UnifiedGlideClient unifiedClient, @Nullable ValkeyGlideConnectionFactory factory) {
this(unifiedClient, factory, null);
}

/**
* Creates a new {@link ValkeyGlideConnection} with a unified client adapter and delegating pub/sub listener.
*
* @param unifiedClient unified client adapter (standalone or cluster)
* @param factory the connection factory (optional, for pooling support)
* @param delegatingListener the delegating pub/sub listener for callback-based message delivery
*/
public ValkeyGlideConnection(UnifiedGlideClient unifiedClient,
@Nullable ValkeyGlideConnectionFactory factory,
@Nullable DelegatingPubSubListener delegatingListener) {
Assert.notNull(unifiedClient, "UnifiedClient must not be null");

this.unifiedClient = unifiedClient;
this.factory = factory;
this.delegatingListener = delegatingListener;

// Initialize command interfaces
this.keyCommands = new ValkeyGlideKeyCommands(this);
Expand Down Expand Up @@ -184,12 +204,22 @@ public ValkeyCommands commands() {
public void close() throws DataAccessException {
try {
if (closed.compareAndSet(false, true)) {
// Close subscription first
ValkeyGlideSubscription sub = this.subscription;
if (sub != null) {
sub.close();
this.subscription = null;
}

cleanupConnectionState();
// Return client to pool instead of closing it
factory.releaseClient(unifiedClient.getNativeClient());
if (subscription != null) {
subscription.close();
subscription = null;

if (delegatingListener != null) {
delegatingListener.clearListener();
}

// Return client to pool
if (factory != null) {
factory.releaseClient(unifiedClient.getNativeClient());
}
}
} catch (Exception ex) {
Expand All @@ -212,10 +242,6 @@ protected void cleanupConnectionState() {
@SuppressWarnings("unchecked")
Callable<Void>[] actions = new Callable[] {
() -> nativeClient.customCommand(new String[]{"UNWATCH"}).get(),
// TODO: Uncomment when dynamic pubsub is implemented
// () -> nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}).get(),
// () -> nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}).get(),
// () -> nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}).get()
};

for (Callable<Void> action : actions) {
Expand Down Expand Up @@ -431,8 +457,26 @@ public void subscribe(MessageListener listener, byte[]... channels) {
throw new InvalidDataAccessApiUsageException("Cannot subscribe in pipeline / transaction mode");
}

// TODO: Implement dynamic subscription management when supported by valkey-glide
throw new UnsupportedOperationException("Dynamic subscriptions not yet implemented");
if (delegatingListener == null) {
throw new InvalidDataAccessApiUsageException(
"Pub/Sub not configured. Ensure the connection factory was created with pub/sub callback support.");
}

try {
delegatingListener.setListener(listener);

ValkeyGlideSubscription glideSubscription = new ValkeyGlideSubscription(
listener, unifiedClient, delegatingListener);
this.subscription = glideSubscription;
glideSubscription.subscribe(channels);

} catch (Exception ex) {
if (delegatingListener != null) {
delegatingListener.clearListener();
}
this.subscription = null;
throw new ValkeyGlideExceptionConverter().convert(ex);
}
}

@Override
Expand All @@ -449,9 +493,27 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) {
if (isQueueing() || isPipelined()) {
throw new InvalidDataAccessApiUsageException("Cannot subscribe in pipeline / transaction mode");
}

if (delegatingListener == null) {
throw new InvalidDataAccessApiUsageException(
"Pub/Sub not configured. Ensure the connection factory was created with pub/sub callback support.");
}

// TODO: Implement dynamic subscription management when supported by valkey-glide
throw new UnsupportedOperationException("Dynamic subscriptions not yet implemented");
try {
delegatingListener.setListener(listener);

ValkeyGlideSubscription glideSubscription = new ValkeyGlideSubscription(
listener, unifiedClient, delegatingListener);
this.subscription = glideSubscription;
glideSubscription.pSubscribe(patterns);

} catch (Exception ex) {
if (delegatingListener != null) {
delegatingListener.clearListener();
}
this.subscription = null;
throw new ValkeyGlideExceptionConverter().convert(ex);
}
}

@Override
Expand All @@ -461,7 +523,8 @@ public Subscription getSubscription() {

@Override
public boolean isSubscribed() {
return subscription != null && subscription.isAlive();
Subscription sub = this.subscription;
return sub != null && sub.isAlive();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,23 @@
// Imports for valkey-glide library
import glide.api.GlideClient;
import glide.api.GlideClusterClient;
import glide.api.models.GlideString;
import glide.api.models.configuration.AdvancedGlideClientConfiguration;
import glide.api.models.configuration.AdvancedGlideClusterClientConfiguration;
import glide.api.models.configuration.BackoffStrategy;
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.GlideClusterClientConfiguration;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration;
import glide.api.models.configuration.ClusterSubscriptionConfiguration;

import java.util.Map;
import java.util.Set;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
Expand Down Expand Up @@ -91,6 +97,23 @@ public class ValkeyGlideConnectionFactory
private boolean earlyStartup = true;
private int phase = 0;

/**
* Maps native Glide clients ({@link GlideClient} or {@link GlideClusterClient}) to their
* associated {@link DelegatingPubSubListener}.
*
* <p>This mapping is necessary because Glide requires pub/sub callbacks to be configured
* at client creation time, before any subscriptions exist. When a client is created and
* added to the pool, we also create a {@link DelegatingPubSubListener} and register it
* as the client's pub/sub callback. The actual {@link MessageListener} is set on the
* {@link DelegatingPubSubListener} later when {@code subscribe()} is called.
*
* <p>When a connection is obtained from the pool, we look up the corresponding
* {@link DelegatingPubSubListener} for that client and pass it to the
* {@link ValkeyGlideConnection}, which can then configure it with the user's
* {@link MessageListener} during subscription.
*/
private final Map<Object, DelegatingPubSubListener> clientListenerMap = new ConcurrentHashMap<>();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please comment what is the Object and what for

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


/**
* Constructs a new {@link ValkeyGlideConnectionFactory} instance with default settings.
*/
Expand Down Expand Up @@ -235,8 +258,11 @@ public ValkeyConnection getConnection() {
client = createGlideClient();
}

// Get the listener associated with this client
DelegatingPubSubListener listener = clientListenerMap.get(client);

// Return a new connection wrapper around the pooled client
return new ValkeyGlideConnection(new StandaloneGlideClientAdapter(client), this);
return new ValkeyGlideConnection(new StandaloneGlideClientAdapter(client), this, listener);
}

@Override
Expand All @@ -253,8 +279,11 @@ public ValkeyClusterConnection getClusterConnection() {
client = createGlideClusterClient();
}

// Get the listener associated with this client
DelegatingPubSubListener listener = clientListenerMap.get(client);

// Return a new connection wrapper around the pooled cluster client
return new ValkeyGlideClusterConnection(new ClusterGlideClientAdapter(client), this);
return new ValkeyGlideClusterConnection(new ClusterGlideClientAdapter(client), this, listener);
}

@Override
Expand Down Expand Up @@ -378,10 +407,26 @@ private GlideClient createGlideClient() {
if (reconnectStrategy != null) {
configBuilder.reconnectStrategy(reconnectStrategy);
}

// Pubsub listener
DelegatingPubSubListener clientListener = new DelegatingPubSubListener();


// Configure pub/sub with callback for event-driven message delivery
var subConfigBuilder = StandaloneSubscriptionConfiguration.builder();

// Set callback that delegates to our listener holder
subConfigBuilder.callback((msg, context) -> clientListener.onMessage(msg, context));
configBuilder.subscriptionConfiguration(subConfigBuilder.build());

// Build and create client
GlideClientConfiguration config = configBuilder.build();
return GlideClient.createClient(config).get();
GlideClient client = GlideClient.createClient(config).get();

// Save the mapping of this client to its DelegatingListener
clientListenerMap.put(client, clientListener);

return client;
} catch (Exception e) {
throw new IllegalStateException("Failed to create GlideClient: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -477,10 +522,24 @@ private GlideClusterClient createGlideClusterClient() {
configBuilder.reconnectStrategy(reconnectStrategy);
}


DelegatingPubSubListener clientListener = new DelegatingPubSubListener();

// Configure pub/sub with callback for event-driven message delivery
var subConfigBuilder = ClusterSubscriptionConfiguration.builder();

// Set callback that delegates to our listener holder
subConfigBuilder.callback((msg, context) -> clientListener.onMessage(msg, context));
configBuilder.subscriptionConfiguration(subConfigBuilder.build());


// Build and create cluster client
GlideClusterClientConfiguration config = configBuilder.build();
return GlideClusterClient.createClient(config).get();
GlideClusterClient client = GlideClusterClient.createClient(config).get();

clientListenerMap.put(client, clientListener);

return client;
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Failed to create GlideClusterClient: " + e.getMessage(), e);
}
Expand Down
Loading