Skip to content

Commit 9fdba13

Browse files
committed
redis lock adds an automatic renewal function
1 parent 1de81b6 commit 9fdba13

File tree

5 files changed

+135
-2
lines changed

5 files changed

+135
-2
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java

+13
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import org.springframework.scheduling.TaskScheduler;
20+
1921
/**
2022
* A {@link LockRegistry} implementing this interface supports the renewal
2123
* of the time to live of a lock.
2224
*
2325
* @author Alexandre Strubel
2426
* @author Artem Bilan
27+
* @author Youbin Wu
2528
*
2629
* @since 5.4
2730
*/
@@ -34,4 +37,14 @@ public interface RenewableLockRegistry extends LockRegistry {
3437
*/
3538
void renewLock(Object lockKey);
3639

40+
/**
41+
* Set the {@link TaskScheduler} to use for the renewal task.
42+
* When renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that
43+
* the lock does not expire while the thread is working.
44+
* @param renewalTaskScheduler renew task scheduler
45+
* @since 6.4.0
46+
*/
47+
default void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) {
48+
}
49+
3750
}

Diff for: spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

+71-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.ExecutorService;
3333
import java.util.concurrent.Executors;
3434
import java.util.concurrent.Future;
35+
import java.util.concurrent.ScheduledFuture;
3536
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.TimeoutException;
3738
import java.util.concurrent.locks.Condition;
@@ -54,6 +55,8 @@
5455
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
5556
import org.springframework.data.redis.listener.Topic;
5657
import org.springframework.integration.support.locks.ExpirableLockRegistry;
58+
import org.springframework.integration.support.locks.RenewableLockRegistry;
59+
import org.springframework.scheduling.TaskScheduler;
5760
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
5861
import org.springframework.util.Assert;
5962
import org.springframework.util.ReflectionUtils;
@@ -89,11 +92,12 @@
8992
* @author Myeonghyeon Lee
9093
* @author Roman Zabaluev
9194
* @author Alex Peelman
95+
* @author Youbin Wu
9296
*
9397
* @since 4.0
9498
*
9599
*/
96-
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
100+
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean, RenewableLockRegistry {
97101

98102
private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class);
99103

@@ -138,6 +142,8 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
138142
private Executor executor =
139143
Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));
140144

145+
private TaskScheduler renewalTaskScheduler;
146+
141147
/**
142148
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
143149
* thus should not be shutdown when {@link #destroy()} is called
@@ -207,6 +213,12 @@ public void setExecutor(Executor executor) {
207213
this.executorExplicitlySet = true;
208214
}
209215

216+
@Override
217+
public void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) {
218+
Assert.notNull(renewalTaskScheduler, "'renewalTaskScheduler' must not be null");
219+
this.renewalTaskScheduler = renewalTaskScheduler;
220+
}
221+
210222
/**
211223
* Set the capacity of cached locks.
212224
* @param cacheCapacity The capacity of cached lock, (default 100_000).
@@ -291,6 +303,26 @@ public void destroy() {
291303
}
292304
}
293305

306+
@Override
307+
public void renewLock(Object lockKey) {
308+
String path = (String) lockKey;
309+
RedisLock redisLock;
310+
this.lock.lock();
311+
try {
312+
redisLock = this.locks.computeIfAbsent(path, getRedisLockConstructor(this.redisLockType));
313+
}
314+
finally {
315+
this.lock.unlock();
316+
}
317+
if (redisLock == null) {
318+
throw new IllegalStateException("Could not renew mutex at " + path);
319+
}
320+
321+
if (!redisLock.renew()) {
322+
throw new IllegalStateException("Could not renew mutex at " + path);
323+
}
324+
}
325+
294326
/**
295327
* The mode in which this registry is going to work with locks.
296328
*/
@@ -328,15 +360,28 @@ private abstract class RedisLock implements Lock {
328360
return false
329361
""";
330362

363+
private static final String RENEW_SCRIPT = """
364+
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
365+
redis.call('PEXPIRE', KEYS[1], ARGV[2])
366+
return true
367+
end
368+
return false
369+
""";
370+
331371
protected static final RedisScript<Boolean>
332372
OBTAIN_LOCK_REDIS_SCRIPT = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
333373

374+
public static final RedisScript<Boolean>
375+
RENEW_REDIS_SCRIPT = new DefaultRedisScript<>(RENEW_SCRIPT, Boolean.class);
376+
334377
protected final String lockKey;
335378

336379
private final ReentrantLock localLock = new ReentrantLock();
337380

338381
private volatile long lockedAt;
339382

383+
private volatile ScheduledFuture<?> renewFuture;
384+
340385
private RedisLock(String path) {
341386
this.lockKey = constructLockKey(path);
342387
}
@@ -454,6 +499,10 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx
454499
LOGGER.debug("Acquired lock; " + this);
455500
}
456501
this.lockedAt = System.currentTimeMillis();
502+
if (RedisLockRegistry.this.renewalTaskScheduler != null) {
503+
Duration delay = Duration.ofMillis(RedisLockRegistry.this.expireAfter / 3);
504+
this.renewFuture = RedisLockRegistry.this.renewalTaskScheduler.scheduleWithFixedDelay(this::renew, delay);
505+
}
457506
}
458507
return acquired;
459508
}
@@ -515,6 +564,7 @@ private void removeLockKey() {
515564

516565
if (Boolean.TRUE.equals(unlinkResult)) {
517566
// Lock key successfully unlinked
567+
this.stopRenew();
518568
return;
519569
}
520570
else if (Boolean.FALSE.equals(unlinkResult)) {
@@ -526,6 +576,26 @@ else if (Boolean.FALSE.equals(unlinkResult)) {
526576
throw new ConcurrentModificationException("Lock was released in the store due to expiration. " +
527577
"The integrity of data protected by this lock may have been compromised.");
528578
}
579+
else {
580+
this.stopRenew();
581+
}
582+
}
583+
584+
protected final boolean renew() {
585+
boolean res = Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
586+
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
587+
RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)));
588+
if (!res) {
589+
this.stopRenew();
590+
}
591+
return res;
592+
}
593+
594+
protected final void stopRenew() {
595+
if (this.renewFuture != null) {
596+
this.renewFuture.cancel(true);
597+
this.renewFuture = null;
598+
}
529599
}
530600

531601
@Override

Diff for: spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java

+44
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@
5151
import org.springframework.integration.redis.RedisContainerTest;
5252
import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType;
5353
import org.springframework.integration.test.util.TestUtils;
54+
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
5455

5556
import static org.assertj.core.api.Assertions.assertThat;
57+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
5658
import static org.assertj.core.api.Assertions.assertThatNoException;
5759
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5860
import static org.mockito.Mockito.mock;
@@ -66,6 +68,7 @@
6668
* @author Artem Vozhdayenko
6769
* @author Anton Gabov
6870
* @author Eddie Cho
71+
* @author Youbin Wu
6972
*
7073
* @since 4.0
7174
*
@@ -427,6 +430,20 @@ void testExceptionOnExpire(RedisLockType testRedisLockType) throws Exception {
427430
registry.destroy();
428431
}
429432

433+
@ParameterizedTest
434+
@EnumSource(RedisLockType.class)
435+
void testRenewalOnExpire(RedisLockType redisLockType) throws Exception {
436+
long expireAfter = 300L;
437+
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, expireAfter);
438+
registry.setRenewalTaskScheduler(new SimpleAsyncTaskScheduler());
439+
registry.setRedisLockType(redisLockType);
440+
Lock lock1 = registry.obtain("foo");
441+
assertThat(lock1.tryLock()).isTrue();
442+
Thread.sleep(expireAfter * 2);
443+
lock1.unlock();
444+
registry.destroy();
445+
}
446+
430447
@ParameterizedTest
431448
@EnumSource(RedisLockType.class)
432449
void testEquals(RedisLockType testRedisLockType) {
@@ -900,6 +917,33 @@ void testTwoThreadsRemoveAndObtainSameLockSimultaneously(RedisLockType testRedis
900917
registry.destroy();
901918
}
902919

920+
@ParameterizedTest
921+
@EnumSource(RedisLockType.class)
922+
void testLockRenew(RedisLockType redisLockType) {
923+
final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
924+
registry.setRedisLockType(redisLockType);
925+
final Lock lock = registry.obtain("foo");
926+
927+
assertThat(lock.tryLock()).isTrue();
928+
try {
929+
registry.renewLock("foo");
930+
}
931+
finally {
932+
lock.unlock();
933+
}
934+
}
935+
936+
@ParameterizedTest
937+
@EnumSource(RedisLockType.class)
938+
void testLockRenewLockNotOwned(RedisLockType redisLockType) {
939+
final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
940+
registry.setRedisLockType(redisLockType);
941+
registry.obtain("foo");
942+
943+
assertThatExceptionOfType(IllegalStateException.class)
944+
.isThrownBy(() -> registry.renewLock("foo"));
945+
}
946+
903947
@Test
904948
void testInitialiseWithCustomExecutor() {
905949
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(redisConnectionFactory, "registryKey");

Diff for: src/reference/antora/modules/ROOT/pages/redis.adoc

+4-1
Original file line numberDiff line numberDiff line change
@@ -856,4 +856,7 @@ Default.
856856
The pub-sub is preferred mode - less network chatter between client Redis server, and more performant - the lock is acquired immediately when subscription is notified about unlocking in the other process.
857857
However, the Redis does not support pub-sub in the Master/Replica connections (for example in AWS ElastiCache environment), therefore a busy-spin mode is chosen as a default to make the registry working in any environment.
858858

859-
Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
859+
Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
860+
861+
Starting with version 6.4, a method `RedisLockRegistry.setRenewalTaskScheduler` is added to configure the scheduler for automatic renewal of locks.
862+
When it is set, the lock will be automatically renewed every 1/3 of the expiration time after the lock is successfully acquired, until unlocked or the redis key is removed.

Diff for: src/reference/antora/modules/ROOT/pages/whats-new.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ See xref:zeromq.adoc[ZeroMQ Support] for more information.
6161
Instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
6262
See xref:redis.adoc[Redis Support] for more information.
6363

64+
Add a `RedisLockRegistry.setRenewalTaskScheduler` to automatic renewal lock.
65+
See xref:redis.adoc[Redis Support] for more information.
66+
6467
[[x6.4-groovy-changes]]
6568
=== Groovy Changes
6669

0 commit comments

Comments
 (0)