Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<springdata.parent.version>3.5.1</springdata.parent.version>

<!-- Valkey/Redis client versions -->
<valkey-glide.version>2.1.1</valkey-glide.version>
<valkey-glide.version>2.3.0-rc2</valkey-glide.version>
<lettuce.version>6.6.0.RELEASE</lettuce.version>
<jedis.version>6.0.0</jedis.version>
<pool.version>2.11.1</pool.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.valkey.springframework.data.valkey.connection.jedis.JedisConnectionFactory;
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory;
import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory;

/**
* Utilities for examining a {@link ValkeyConnection}
Expand All @@ -27,7 +28,8 @@
public abstract class ConnectionUtils {

public static boolean isAsync(ValkeyConnectionFactory connectionFactory) {
return (connectionFactory instanceof LettuceConnectionFactory);
return connectionFactory instanceof LettuceConnectionFactory
|| connectionFactory instanceof ValkeyGlideConnectionFactory;
}

public static boolean isLettuce(ValkeyConnectionFactory connectionFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,31 @@ public class ValkeyGlideClusterConnection extends ValkeyGlideConnection implemen
private final ValkeyGlideClusterSetCommands clusterSetCommands;

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter) {
this(clusterAdapter, null, Duration.ofMillis(100));
this(clusterAdapter, null, null, Duration.ofMillis(100));
}

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, @Nullable ValkeyGlideConnectionFactory factory) {
this(clusterAdapter, factory, Duration.ofMillis(100));
public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just appears this way on github, see + signs beside

@Nullable ValkeyGlideConnectionFactory factory) {
this(clusterAdapter, factory, null, Duration.ofMillis(100));
}

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, @Nullable ValkeyGlideConnectionFactory factory, Duration cacheTimeout) {
super(clusterAdapter, factory);
public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just looks like it bc of github

@Nullable ValkeyGlideConnectionFactory factory,
@Nullable DelegatingPubSubListener pubSubListener) {
this(clusterAdapter, factory, pubSubListener, Duration.ofMillis(100));
}

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
@Nullable ValkeyGlideConnectionFactory factory,
Duration cacheTimeout) {
this(clusterAdapter, factory, null, cacheTimeout);
}

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
@Nullable ValkeyGlideConnectionFactory factory,
@Nullable DelegatingPubSubListener pubSubListener,
Duration cacheTimeout) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not as the last param?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

super(clusterAdapter, factory, pubSubListener);
Assert.notNull(cacheTimeout, "CacheTimeout must not be null!");

this.clusterAdapter = clusterAdapter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ public class ValkeyGlideConnection extends AbstractValkeyConnection {

protected final UnifiedGlideClient unifiedClient;
protected final @Nullable ValkeyGlideConnectionFactory factory;
protected final @Nullable DelegatingPubSubListener pubSubListener;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment why do we "Delegate" (we are using glide's callback which is supplied on the creation , yada yda)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

private final AtomicBoolean closed = new AtomicBoolean(false);

private final List<ResultMapper<?, ?>> batchCommandsConverters = new ArrayList<>();
private final Set<byte[]> watchedKeys = new HashSet<>();
private @Nullable Subscription subscription;
private volatile @Nullable ValkeyGlideSubscription subscription;

// Command interfaces
private final ValkeyGlideKeyCommands keyCommands;
Expand All @@ -90,10 +91,24 @@ public class ValkeyGlideConnection extends AbstractValkeyConnection {
* @param factory the connection factory (optional, for pooling support)
*/
public ValkeyGlideConnection(UnifiedGlideClient unifiedClient, @Nullable ValkeyGlideConnectionFactory factory) {
this(unifiedClient, factory, null);
}

/**
* Creates a new {@link ValkeyGlideConnection} with a unified client adapter and pub/sub listener.
*
* @param unifiedClient unified client adapter (standalone or cluster)
* @param factory the connection factory (optional, for pooling support)
* @param pubSubListener the delegating pub/sub listener for callback-based message delivery
*/
public ValkeyGlideConnection(UnifiedGlideClient unifiedClient,
@Nullable ValkeyGlideConnectionFactory factory,
@Nullable DelegatingPubSubListener pubSubListener) {
Assert.notNull(unifiedClient, "UnifiedClient must not be null");

this.unifiedClient = unifiedClient;
this.factory = factory;
this.pubSubListener = pubSubListener;

// Initialize command interfaces
this.keyCommands = new ValkeyGlideKeyCommands(this);
Expand Down Expand Up @@ -184,12 +199,22 @@ public ValkeyCommands commands() {
public void close() throws DataAccessException {
try {
if (closed.compareAndSet(false, true)) {
// Close subscription first
ValkeyGlideSubscription sub = this.subscription;
if (sub != null) {
sub.close();
this.subscription = null;
}

cleanupConnectionState();
// Return client to pool instead of closing it
factory.releaseClient(unifiedClient.getNativeClient());
if (subscription != null) {
subscription.close();
subscription = null;

if (pubSubListener != null) {
pubSubListener.clearListener();
}

// Return client to pool
if (factory != null) {
factory.releaseClient(unifiedClient.getNativeClient());
}
}
} catch (Exception ex) {
Expand All @@ -212,10 +237,6 @@ protected void cleanupConnectionState() {
@SuppressWarnings("unchecked")
Callable<Void>[] actions = new Callable[] {
() -> nativeClient.customCommand(new String[]{"UNWATCH"}).get(),
// TODO: Uncomment when dynamic pubsub is implemented
// () -> nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}).get(),
// () -> nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}).get(),
// () -> nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}).get()
};

for (Callable<Void> action : actions) {
Expand Down Expand Up @@ -431,8 +452,26 @@ public void subscribe(MessageListener listener, byte[]... channels) {
throw new InvalidDataAccessApiUsageException("Cannot subscribe in pipeline / transaction mode");
}

// TODO: Implement dynamic subscription management when supported by valkey-glide
throw new UnsupportedOperationException("Dynamic subscriptions not yet implemented");
if (pubSubListener == null) {
throw new InvalidDataAccessApiUsageException(
"Pub/Sub not configured. Ensure the connection factory was created with pub/sub callback support.");
}

try {
pubSubListener.setListener(listener);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delegating

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed


ValkeyGlideSubscription glideSubscription = new ValkeyGlideSubscription(
listener, unifiedClient, pubSubListener);
this.subscription = glideSubscription;
glideSubscription.subscribe(channels);

} catch (Exception ex) {
if (pubSubListener != null) {
pubSubListener.clearListener();
}
this.subscription = null;
throw new ValkeyGlideExceptionConverter().convert(ex);
}
}

@Override
Expand All @@ -449,9 +488,27 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) {
if (isQueueing() || isPipelined()) {
throw new InvalidDataAccessApiUsageException("Cannot subscribe in pipeline / transaction mode");
}

if (pubSubListener == null) {
throw new InvalidDataAccessApiUsageException(
"Pub/Sub not configured. Ensure the connection factory was created with pub/sub callback support.");
}

// TODO: Implement dynamic subscription management when supported by valkey-glide
throw new UnsupportedOperationException("Dynamic subscriptions not yet implemented");
try {
pubSubListener.setListener(listener);

ValkeyGlideSubscription glideSubscription = new ValkeyGlideSubscription(
listener, unifiedClient, pubSubListener);
this.subscription = glideSubscription;
glideSubscription.pSubscribe(patterns);

} catch (Exception ex) {
if (pubSubListener != null) {
pubSubListener.clearListener();
}
this.subscription = null;
throw new ValkeyGlideExceptionConverter().convert(ex);
}
}

@Override
Expand All @@ -461,7 +518,8 @@ public Subscription getSubscription() {

@Override
public boolean isSubscribed() {
return subscription != null && subscription.isAlive();
Subscription sub = this.subscription;
return sub != null && sub.isAlive();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,23 @@
// Imports for valkey-glide library
import glide.api.GlideClient;
import glide.api.GlideClusterClient;
import glide.api.models.GlideString;
import glide.api.models.configuration.AdvancedGlideClientConfiguration;
import glide.api.models.configuration.AdvancedGlideClusterClientConfiguration;
import glide.api.models.configuration.BackoffStrategy;
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.GlideClusterClientConfiguration;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration;
import glide.api.models.configuration.ClusterSubscriptionConfiguration;

import java.util.Map;
import java.util.Set;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
Expand Down Expand Up @@ -91,6 +97,8 @@ public class ValkeyGlideConnectionFactory
private boolean earlyStartup = true;
private int phase = 0;

private final Map<Object, DelegatingPubSubListener> clientListenerMap = new ConcurrentHashMap<>();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please comment what is the Object and what for

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


/**
* Constructs a new {@link ValkeyGlideConnectionFactory} instance with default settings.
*/
Expand Down Expand Up @@ -235,8 +243,11 @@ public ValkeyConnection getConnection() {
client = createGlideClient();
}

// Get the listener associated with this client
DelegatingPubSubListener listener = clientListenerMap.get(client);

// Return a new connection wrapper around the pooled client
return new ValkeyGlideConnection(new StandaloneGlideClientAdapter(client), this);
return new ValkeyGlideConnection(new StandaloneGlideClientAdapter(client), this, listener);
}

@Override
Expand All @@ -253,8 +264,11 @@ public ValkeyClusterConnection getClusterConnection() {
client = createGlideClusterClient();
}

// Get the listener associated with this client
DelegatingPubSubListener listener = clientListenerMap.get(client);

// Return a new connection wrapper around the pooled cluster client
return new ValkeyGlideClusterConnection(new ClusterGlideClientAdapter(client), this);
return new ValkeyGlideClusterConnection(new ClusterGlideClientAdapter(client), this, listener);
}

@Override
Expand Down Expand Up @@ -378,10 +392,26 @@ private GlideClient createGlideClient() {
if (reconnectStrategy != null) {
configBuilder.reconnectStrategy(reconnectStrategy);
}

// Pubsub listener
DelegatingPubSubListener clientListener = new DelegatingPubSubListener();


// Configure pub/sub with callback for event-driven message delivery
var subConfigBuilder = StandaloneSubscriptionConfiguration.builder();

// Set callback that delegates to our listener holder
subConfigBuilder.callback((msg, context) -> clientListener.onMessage(msg, context));
configBuilder.subscriptionConfiguration(subConfigBuilder.build());

// Build and create client
GlideClientConfiguration config = configBuilder.build();
return GlideClient.createClient(config).get();
GlideClient client = GlideClient.createClient(config).get();

// Save the mapping of this client to its DelegatingListener
clientListenerMap.put(client, clientListener);

return client;
} catch (Exception e) {
throw new IllegalStateException("Failed to create GlideClient: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -477,10 +507,24 @@ private GlideClusterClient createGlideClusterClient() {
configBuilder.reconnectStrategy(reconnectStrategy);
}


DelegatingPubSubListener clientListener = new DelegatingPubSubListener();

// Configure pub/sub with callback for event-driven message delivery
var subConfigBuilder = ClusterSubscriptionConfiguration.builder();

// Set callback that delegates to our listener holder
subConfigBuilder.callback((msg, context) -> clientListener.onMessage(msg, context));
configBuilder.subscriptionConfiguration(subConfigBuilder.build());


// Build and create cluster client
GlideClusterClientConfiguration config = configBuilder.build();
return GlideClusterClient.createClient(config).get();
GlideClusterClient client = GlideClusterClient.createClient(config).get();

clientListenerMap.put(client, clientListener);

return client;
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Failed to create GlideClusterClient: " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2011-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.valkey.springframework.data.valkey.connection.valkeyglide;

import glide.api.models.PubSubMessage;
import glide.api.models.GlideString;

import io.valkey.springframework.data.valkey.connection.DefaultMessage;
import io.valkey.springframework.data.valkey.connection.MessageListener;
import io.valkey.springframework.data.valkey.connection.SubscriptionListener;

/**
* A delegating pub/sub listener that is configured at client creation time,
* with the actual listener set later when subscribe() is called.
*/
class DelegatingPubSubListener {

private volatile MessageListener messageListener;
private volatile SubscriptionListener subscriptionListener =
SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this really required

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed


/**
* Called by Glide when a pub/sub message arrives.
*/
void onMessage(PubSubMessage msg, Object context) {
MessageListener listener = this.messageListener;
if (listener != null && msg != null) {
byte[] channel = msg.getChannel().getBytes();
byte[] body = msg.getMessage().getBytes();
byte[] pattern = msg.getPattern()
.map(GlideString::getBytes)
.orElse(null);

listener.onMessage(new DefaultMessage(channel, body), pattern);

}

}

/**
* Set the actual listener when subscribe() is called.
*/
void setListener(MessageListener listener) {
this.messageListener = listener;
}

/**
* Clear the listener when subscription closes.
*/
void clearListener() {
this.messageListener = null;
}

SubscriptionListener getSubscriptionListener() {
return subscriptionListener;
}

boolean hasListener() {
return messageListener != null;
}
}
Loading