Skip to content

Commit 5f99ff7

Browse files
committed
redis lock adds an automatic renewal function
1 parent 1de81b6 commit 5f99ff7

File tree

5 files changed

+139
-3
lines changed

5 files changed

+139
-3
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,15 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import org.springframework.scheduling.TaskScheduler;
20+
1921
/**
2022
* A {@link LockRegistry} implementing this interface supports the renewal
2123
* of the time to live of a lock.
2224
*
2325
* @author Alexandre Strubel
2426
* @author Artem Bilan
27+
* @author NaccOll
2528
*
2629
* @since 5.4
2730
*/
@@ -34,4 +37,11 @@ public interface RenewableLockRegistry extends LockRegistry {
3437
*/
3538
void renewLock(Object lockKey);
3639

40+
/**
41+
* Set the {@link TaskScheduler} to use for the renewal task.
42+
* when renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that the lock does not expire while the thread is working.
43+
* @param renewalTaskScheduler renew task scheduler
44+
* @since 6.4.0
45+
*/
46+
default void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler){}
3747
}

Diff for: spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

+79-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.ExecutorService;
3333
import java.util.concurrent.Executors;
3434
import java.util.concurrent.Future;
35+
import java.util.concurrent.ScheduledFuture;
3536
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.TimeoutException;
3738
import java.util.concurrent.locks.Condition;
@@ -54,6 +55,8 @@
5455
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
5556
import org.springframework.data.redis.listener.Topic;
5657
import org.springframework.integration.support.locks.ExpirableLockRegistry;
58+
import org.springframework.integration.support.locks.RenewableLockRegistry;
59+
import org.springframework.scheduling.TaskScheduler;
5760
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
5861
import org.springframework.util.Assert;
5962
import org.springframework.util.ReflectionUtils;
@@ -89,11 +92,12 @@
8992
* @author Myeonghyeon Lee
9093
* @author Roman Zabaluev
9194
* @author Alex Peelman
95+
* @author NaccOll
9296
*
9397
* @since 4.0
9498
*
9599
*/
96-
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
100+
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean, RenewableLockRegistry {
97101

98102
private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class);
99103

@@ -138,6 +142,11 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
138142
private Executor executor =
139143
Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));
140144

145+
/**
146+
* An {@link TaskScheduler} to call {@link RenewableLockRegistry#renewLock(Object)} after acquire lock
147+
*/
148+
private TaskScheduler renewalTaskScheduler;
149+
141150
/**
142151
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
143152
* thus should not be shutdown when {@link #destroy()} is called
@@ -207,6 +216,17 @@ public void setExecutor(Executor executor) {
207216
this.executorExplicitlySet = true;
208217
}
209218

219+
/**
220+
* Set the {@link TaskScheduler} to use for the renewal task.
221+
* when renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that the lock does not expire while the thread is working.
222+
* @param renewalTaskScheduler renew task scheduler
223+
* @since 6.4.0
224+
*/
225+
@Override
226+
public void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) {
227+
this.renewalTaskScheduler = renewalTaskScheduler;
228+
}
229+
210230
/**
211231
* Set the capacity of cached locks.
212232
* @param cacheCapacity The capacity of cached lock, (default 100_000).
@@ -291,6 +311,27 @@ public void destroy() {
291311
}
292312
}
293313

314+
@Override
315+
public void renewLock(Object lockKey) {
316+
String path = (String) lockKey;
317+
RedisLock redisLock;
318+
this.lock.lock();
319+
try {
320+
redisLock = this.locks.computeIfAbsent(path, getRedisLockConstructor(this.redisLockType));
321+
}
322+
finally {
323+
this.lock.unlock();
324+
}
325+
if (redisLock == null) {
326+
throw new IllegalStateException("Could not renew mutex at " + path);
327+
}
328+
329+
if (!redisLock.renew()) {
330+
redisLock.stopRenew();
331+
throw new IllegalStateException("Could not renew mutex at " + path);
332+
}
333+
}
334+
294335
/**
295336
* The mode in which this registry is going to work with locks.
296337
*/
@@ -328,16 +369,32 @@ private abstract class RedisLock implements Lock {
328369
return false
329370
""";
330371

372+
private static final String RENEW_SCRIPT = """
373+
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
374+
redis.call('PEXPIRE', KEYS[1], ARGV[2])
375+
return true
376+
end
377+
return false
378+
""";
379+
331380
protected static final RedisScript<Boolean>
332381
OBTAIN_LOCK_REDIS_SCRIPT = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
333382

383+
public static final RedisScript<Boolean>
384+
RENEW_REDIS_SCRIPT = new DefaultRedisScript<>(RENEW_SCRIPT, Boolean.class);
385+
386+
protected final String rawKey;
387+
334388
protected final String lockKey;
335389

336390
private final ReentrantLock localLock = new ReentrantLock();
337391

338392
private volatile long lockedAt;
339393

394+
private volatile ScheduledFuture<?> renewFuture;
395+
340396
private RedisLock(String path) {
397+
this.rawKey = path;
341398
this.lockKey = constructLockKey(path);
342399
}
343400

@@ -454,6 +511,10 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx
454511
LOGGER.debug("Acquired lock; " + this);
455512
}
456513
this.lockedAt = System.currentTimeMillis();
514+
if (RedisLockRegistry.this.renewalTaskScheduler != null) {
515+
Duration delay = Duration.ofMillis(RedisLockRegistry.this.expireAfter / 3);
516+
this.renewFuture = RedisLockRegistry.this.renewalTaskScheduler.scheduleWithFixedDelay(() -> renewLock(this.rawKey), delay);
517+
}
457518
}
458519
return acquired;
459520
}
@@ -515,6 +576,7 @@ private void removeLockKey() {
515576

516577
if (Boolean.TRUE.equals(unlinkResult)) {
517578
// Lock key successfully unlinked
579+
this.stopRenew();
518580
return;
519581
}
520582
else if (Boolean.FALSE.equals(unlinkResult)) {
@@ -526,6 +588,22 @@ else if (Boolean.FALSE.equals(unlinkResult)) {
526588
throw new ConcurrentModificationException("Lock was released in the store due to expiration. " +
527589
"The integrity of data protected by this lock may have been compromised.");
528590
}
591+
else {
592+
this.stopRenew();
593+
}
594+
}
595+
596+
public final boolean renew() {
597+
return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
598+
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
599+
RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)));
600+
}
601+
602+
public final void stopRenew() {
603+
if (this.renewFuture != null) {
604+
this.renewFuture.cancel(false);
605+
this.renewFuture = null;
606+
}
529607
}
530608

531609
@Override

Diff for: spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java

+44
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@
5151
import org.springframework.integration.redis.RedisContainerTest;
5252
import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType;
5353
import org.springframework.integration.test.util.TestUtils;
54+
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
5455

5556
import static org.assertj.core.api.Assertions.assertThat;
57+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
5658
import static org.assertj.core.api.Assertions.assertThatNoException;
5759
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5860
import static org.mockito.Mockito.mock;
@@ -66,6 +68,7 @@
6668
* @author Artem Vozhdayenko
6769
* @author Anton Gabov
6870
* @author Eddie Cho
71+
* @author NaccOll
6972
*
7073
* @since 4.0
7174
*
@@ -427,6 +430,20 @@ void testExceptionOnExpire(RedisLockType testRedisLockType) throws Exception {
427430
registry.destroy();
428431
}
429432

433+
@ParameterizedTest
434+
@EnumSource(RedisLockType.class)
435+
void testRenewalOnExpire(RedisLockType redisLockType) throws Exception {
436+
long expireAfter = 3_000L;
437+
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, expireAfter);
438+
registry.setRenewalTaskScheduler(new SimpleAsyncTaskScheduler());
439+
registry.setRedisLockType(redisLockType);
440+
Lock lock1 = registry.obtain("foo");
441+
assertThat(lock1.tryLock()).isTrue();
442+
Thread.sleep(expireAfter * 2);
443+
lock1.unlock();
444+
registry.destroy();
445+
}
446+
430447
@ParameterizedTest
431448
@EnumSource(RedisLockType.class)
432449
void testEquals(RedisLockType testRedisLockType) {
@@ -900,6 +917,33 @@ void testTwoThreadsRemoveAndObtainSameLockSimultaneously(RedisLockType testRedis
900917
registry.destroy();
901918
}
902919

920+
@ParameterizedTest
921+
@EnumSource(RedisLockType.class)
922+
void testLockRenew(RedisLockType redisLockType) {
923+
final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
924+
registry.setRedisLockType(redisLockType);
925+
final Lock lock = registry.obtain("foo");
926+
927+
assertThat(lock.tryLock()).isTrue();
928+
try {
929+
registry.renewLock("foo");
930+
}
931+
finally {
932+
lock.unlock();
933+
}
934+
}
935+
936+
@ParameterizedTest
937+
@EnumSource(RedisLockType.class)
938+
void testLockRenewLockNotOwned(RedisLockType redisLockType) {
939+
final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
940+
registry.setRedisLockType(redisLockType);
941+
registry.obtain("foo");
942+
943+
assertThatExceptionOfType(IllegalStateException.class)
944+
.isThrownBy(() -> registry.renewLock("foo"));
945+
}
946+
903947
@Test
904948
void testInitialiseWithCustomExecutor() {
905949
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(redisConnectionFactory, "registryKey");

Diff for: src/reference/antora/modules/ROOT/pages/redis.adoc

+3-1
Original file line numberDiff line numberDiff line change
@@ -856,4 +856,6 @@ Default.
856856
The pub-sub is preferred mode - less network chatter between client Redis server, and more performant - the lock is acquired immediately when subscription is notified about unlocking in the other process.
857857
However, the Redis does not support pub-sub in the Master/Replica connections (for example in AWS ElastiCache environment), therefore a busy-spin mode is chosen as a default to make the registry working in any environment.
858858

859-
Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
859+
Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
860+
861+
Starting with version 6.4, a method `RedisLockRegistry.setRenewalTaskScheduler` is added to configure the scheduler for automatic renewal of locks. When it is set, the lock will be automatically renewed every 1/3 of the expiration time after the lock is successfully acquired, until unlocked or the redis key is removed

Diff for: src/reference/antora/modules/ROOT/pages/whats-new.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ See xref:zeromq.adoc[ZeroMQ Support] for more information.
6161
Instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
6262
See xref:redis.adoc[Redis Support] for more information.
6363

64+
A method `RedisLockRegistry.setRenewalTaskScheduler` is added to configure the scheduler for automatic renewal of locks. When it is set, the lock will be automatically renewed every 1/3 of the expiration time after the lock is successfully acquired, until unlocked or the redis key is removed
65+
6466
[[x6.4-groovy-changes]]
6567
=== Groovy Changes
6668

0 commit comments

Comments
 (0)