Skip to content

add a new redis lock type that renewal expire #9557

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package org.springframework.integration.support.locks;

import org.springframework.scheduling.TaskScheduler;

/**
* A {@link LockRegistry} implementing this interface supports the renewal
* of the time to live of a lock.
*
* @author Alexandre Strubel
* @author Artem Bilan
* @author Youbin Wu
*
* @since 5.4
*/
Expand All @@ -34,4 +37,14 @@ public interface RenewableLockRegistry extends LockRegistry {
*/
void renewLock(Object lockKey);

/**
* Set the {@link TaskScheduler} to use for the renewal task.
* When renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that
* the lock does not expire while the thread is working.
* @param renewalTaskScheduler renew task scheduler
* @since 6.4.0
*/
default void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
Expand All @@ -54,6 +55,8 @@
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.integration.support.locks.ExpirableLockRegistry;
import org.springframework.integration.support.locks.RenewableLockRegistry;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
Expand Down Expand Up @@ -89,11 +92,12 @@
* @author Myeonghyeon Lee
* @author Roman Zabaluev
* @author Alex Peelman
* @author Youbin Wu
*
* @since 4.0
*
*/
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean, RenewableLockRegistry {

private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class);

Expand Down Expand Up @@ -138,6 +142,8 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
private Executor executor =
Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));

private TaskScheduler renewalTaskScheduler;

/**
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
* thus should not be shutdown when {@link #destroy()} is called
Expand Down Expand Up @@ -207,6 +213,12 @@ public void setExecutor(Executor executor) {
this.executorExplicitlySet = true;
}

@Override
public void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) {
Assert.notNull(renewalTaskScheduler, "'renewalTaskScheduler' must not be null");
this.renewalTaskScheduler = renewalTaskScheduler;
}

/**
* Set the capacity of cached locks.
* @param cacheCapacity The capacity of cached lock, (default 100_000).
Expand Down Expand Up @@ -291,6 +303,26 @@ public void destroy() {
}
}

@Override
public void renewLock(Object lockKey) {
String path = (String) lockKey;
RedisLock redisLock;
this.lock.lock();
try {
redisLock = this.locks.computeIfAbsent(path, getRedisLockConstructor(this.redisLockType));
}
finally {
this.lock.unlock();
}
if (redisLock == null) {
throw new IllegalStateException("Could not renew mutex at " + path);
}

if (!redisLock.renew()) {
throw new IllegalStateException("Could not renew mutex at " + path);
}
}

/**
* The mode in which this registry is going to work with locks.
*/
Expand Down Expand Up @@ -328,15 +360,28 @@ private abstract class RedisLock implements Lock {
return false
""";

private static final String RENEW_SCRIPT = """
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
redis.call('PEXPIRE', KEYS[1], ARGV[2])
return true
end
return false
""";

protected static final RedisScript<Boolean>
OBTAIN_LOCK_REDIS_SCRIPT = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);

public static final RedisScript<Boolean>
RENEW_REDIS_SCRIPT = new DefaultRedisScript<>(RENEW_SCRIPT, Boolean.class);

protected final String lockKey;

private final ReentrantLock localLock = new ReentrantLock();

private volatile long lockedAt;

private volatile ScheduledFuture<?> renewFuture;

private RedisLock(String path) {
this.lockKey = constructLockKey(path);
}
Expand Down Expand Up @@ -454,6 +499,10 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx
LOGGER.debug("Acquired lock; " + this);
}
this.lockedAt = System.currentTimeMillis();
if (RedisLockRegistry.this.renewalTaskScheduler != null) {
Duration delay = Duration.ofMillis(RedisLockRegistry.this.expireAfter / 3);
this.renewFuture = RedisLockRegistry.this.renewalTaskScheduler.scheduleWithFixedDelay(this::renew, delay);
}
}
return acquired;
}
Expand Down Expand Up @@ -515,6 +564,7 @@ private void removeLockKey() {

if (Boolean.TRUE.equals(unlinkResult)) {
// Lock key successfully unlinked
this.stopRenew();
return;
}
else if (Boolean.FALSE.equals(unlinkResult)) {
Expand All @@ -526,6 +576,26 @@ else if (Boolean.FALSE.equals(unlinkResult)) {
throw new ConcurrentModificationException("Lock was released in the store due to expiration. " +
"The integrity of data protected by this lock may have been compromised.");
}
else {
this.stopRenew();
}
}

protected final boolean renew() {
boolean res = Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)));
if (!res) {
this.stopRenew();
}
return res;
}

protected final void stopRenew() {
if (this.renewFuture != null) {
this.renewFuture.cancel(true);
this.renewFuture = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.springframework.integration.redis.RedisContainerTest;
import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
Expand All @@ -66,6 +68,7 @@
* @author Artem Vozhdayenko
* @author Anton Gabov
* @author Eddie Cho
* @author Youbin Wu
*
* @since 4.0
*
Expand Down Expand Up @@ -427,6 +430,20 @@ void testExceptionOnExpire(RedisLockType testRedisLockType) throws Exception {
registry.destroy();
}

@ParameterizedTest
@EnumSource(RedisLockType.class)
void testRenewalOnExpire(RedisLockType redisLockType) throws Exception {
long expireAfter = 300L;
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, expireAfter);
registry.setRenewalTaskScheduler(new SimpleAsyncTaskScheduler());
registry.setRedisLockType(redisLockType);
Lock lock1 = registry.obtain("foo");
assertThat(lock1.tryLock()).isTrue();
Thread.sleep(expireAfter * 2);
lock1.unlock();
registry.destroy();
}

@ParameterizedTest
@EnumSource(RedisLockType.class)
void testEquals(RedisLockType testRedisLockType) {
Expand Down Expand Up @@ -900,6 +917,33 @@ void testTwoThreadsRemoveAndObtainSameLockSimultaneously(RedisLockType testRedis
registry.destroy();
}

@ParameterizedTest
@EnumSource(RedisLockType.class)
void testLockRenew(RedisLockType redisLockType) {
final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
registry.setRedisLockType(redisLockType);
final Lock lock = registry.obtain("foo");

assertThat(lock.tryLock()).isTrue();
try {
registry.renewLock("foo");
}
finally {
lock.unlock();
}
}

@ParameterizedTest
@EnumSource(RedisLockType.class)
void testLockRenewLockNotOwned(RedisLockType redisLockType) {
final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
registry.setRedisLockType(redisLockType);
registry.obtain("foo");

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> registry.renewLock("foo"));
}

@Test
void testInitialiseWithCustomExecutor() {
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(redisConnectionFactory, "registryKey");
Expand Down
5 changes: 4 additions & 1 deletion src/reference/antora/modules/ROOT/pages/redis.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -856,4 +856,7 @@ Default.
The pub-sub is preferred mode - less network chatter between client Redis server, and more performant - the lock is acquired immediately when subscription is notified about unlocking in the other process.
However, the Redis does not support pub-sub in the Master/Replica connections (for example in AWS ElastiCache environment), therefore a busy-spin mode is chosen as a default to make the registry working in any environment.

Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.

Starting with version 6.4, a method `RedisLockRegistry.setRenewalTaskScheduler` is added to configure the scheduler for automatic renewal of locks.
When it is set, the lock will be automatically renewed every 1/3 of the expiration time after the lock is successfully acquired, until unlocked or the redis key is removed.
3 changes: 3 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ See xref:zeromq.adoc[ZeroMQ Support] for more information.
Instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
See xref:redis.adoc[Redis Support] for more information.

Add a `RedisLockRegistry.setRenewalTaskScheduler` to automatic renewal lock.
See xref:redis.adoc[Redis Support] for more information.

[[x6.4-groovy-changes]]
=== Groovy Changes

Expand Down
Loading