diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java index 969f3c3ade5..38d4bee8d78 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java @@ -16,12 +16,15 @@ package org.springframework.integration.support.locks; +import org.springframework.scheduling.TaskScheduler; + /** * A {@link LockRegistry} implementing this interface supports the renewal * of the time to live of a lock. * * @author Alexandre Strubel * @author Artem Bilan + * @author Youbin Wu * * @since 5.4 */ @@ -34,4 +37,14 @@ public interface RenewableLockRegistry extends LockRegistry { */ void renewLock(Object lockKey); + /** + * Set the {@link TaskScheduler} to use for the renewal task. + * When renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that + * the lock does not expire while the thread is working. + * @param renewalTaskScheduler renew task scheduler + * @since 6.4.0 + */ + default void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) { + } + } diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java index 5b05127fa7d..191f5c94de3 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; @@ -54,6 +55,8 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; import org.springframework.integration.support.locks.ExpirableLockRegistry; +import org.springframework.integration.support.locks.RenewableLockRegistry; +import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; @@ -89,11 +92,12 @@ * @author Myeonghyeon Lee * @author Roman Zabaluev * @author Alex Peelman + * @author Youbin Wu * * @since 4.0 * */ -public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean { +public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean, RenewableLockRegistry { private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class); @@ -138,6 +142,8 @@ protected boolean removeEldestEntry(Entry eldest) { private Executor executor = Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-")); + private TaskScheduler renewalTaskScheduler; + /** * Flag to denote whether the {@link ExecutorService} was provided via the setter and * thus should not be shutdown when {@link #destroy()} is called @@ -207,6 +213,12 @@ public void setExecutor(Executor executor) { this.executorExplicitlySet = true; } + @Override + public void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) { + Assert.notNull(renewalTaskScheduler, "'renewalTaskScheduler' must not be null"); + this.renewalTaskScheduler = renewalTaskScheduler; + } + /** * Set the capacity of cached locks. * @param cacheCapacity The capacity of cached lock, (default 100_000). @@ -291,6 +303,26 @@ public void destroy() { } } + @Override + public void renewLock(Object lockKey) { + String path = (String) lockKey; + RedisLock redisLock; + this.lock.lock(); + try { + redisLock = this.locks.computeIfAbsent(path, getRedisLockConstructor(this.redisLockType)); + } + finally { + this.lock.unlock(); + } + if (redisLock == null) { + throw new IllegalStateException("Could not renew mutex at " + path); + } + + if (!redisLock.renew()) { + throw new IllegalStateException("Could not renew mutex at " + path); + } + } + /** * The mode in which this registry is going to work with locks. */ @@ -328,15 +360,28 @@ private abstract class RedisLock implements Lock { return false """; + private static final String RENEW_SCRIPT = """ + if (redis.call('GET', KEYS[1]) == ARGV[1]) then + redis.call('PEXPIRE', KEYS[1], ARGV[2]) + return true + end + return false + """; + protected static final RedisScript OBTAIN_LOCK_REDIS_SCRIPT = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class); + public static final RedisScript + RENEW_REDIS_SCRIPT = new DefaultRedisScript<>(RENEW_SCRIPT, Boolean.class); + protected final String lockKey; private final ReentrantLock localLock = new ReentrantLock(); private volatile long lockedAt; + private volatile ScheduledFuture renewFuture; + private RedisLock(String path) { this.lockKey = constructLockKey(path); } @@ -454,6 +499,10 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx LOGGER.debug("Acquired lock; " + this); } this.lockedAt = System.currentTimeMillis(); + if (RedisLockRegistry.this.renewalTaskScheduler != null) { + Duration delay = Duration.ofMillis(RedisLockRegistry.this.expireAfter / 3); + this.renewFuture = RedisLockRegistry.this.renewalTaskScheduler.scheduleWithFixedDelay(this::renew, delay); + } } return acquired; } @@ -515,6 +564,7 @@ private void removeLockKey() { if (Boolean.TRUE.equals(unlinkResult)) { // Lock key successfully unlinked + this.stopRenew(); return; } else if (Boolean.FALSE.equals(unlinkResult)) { @@ -526,6 +576,26 @@ else if (Boolean.FALSE.equals(unlinkResult)) { throw new ConcurrentModificationException("Lock was released in the store due to expiration. " + "The integrity of data protected by this lock may have been compromised."); } + else { + this.stopRenew(); + } + } + + protected final boolean renew() { + boolean res = Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute( + RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey), + RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter))); + if (!res) { + this.stopRenew(); + } + return res; + } + + protected final void stopRenew() { + if (this.renewFuture != null) { + this.renewFuture.cancel(true); + this.renewFuture = null; + } } @Override diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java index 15e9937dd57..bb0313a1f15 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java @@ -51,8 +51,10 @@ import org.springframework.integration.redis.RedisContainerTest; import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType; import org.springframework.integration.test.util.TestUtils; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -66,6 +68,7 @@ * @author Artem Vozhdayenko * @author Anton Gabov * @author Eddie Cho + * @author Youbin Wu * * @since 4.0 * @@ -427,6 +430,20 @@ void testExceptionOnExpire(RedisLockType testRedisLockType) throws Exception { registry.destroy(); } + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testRenewalOnExpire(RedisLockType redisLockType) throws Exception { + long expireAfter = 300L; + RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, expireAfter); + registry.setRenewalTaskScheduler(new SimpleAsyncTaskScheduler()); + registry.setRedisLockType(redisLockType); + Lock lock1 = registry.obtain("foo"); + assertThat(lock1.tryLock()).isTrue(); + Thread.sleep(expireAfter * 2); + lock1.unlock(); + registry.destroy(); + } + @ParameterizedTest @EnumSource(RedisLockType.class) void testEquals(RedisLockType testRedisLockType) { @@ -900,6 +917,33 @@ void testTwoThreadsRemoveAndObtainSameLockSimultaneously(RedisLockType testRedis registry.destroy(); } + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testLockRenew(RedisLockType redisLockType) { + final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey); + registry.setRedisLockType(redisLockType); + final Lock lock = registry.obtain("foo"); + + assertThat(lock.tryLock()).isTrue(); + try { + registry.renewLock("foo"); + } + finally { + lock.unlock(); + } + } + + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testLockRenewLockNotOwned(RedisLockType redisLockType) { + final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey); + registry.setRedisLockType(redisLockType); + registry.obtain("foo"); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> registry.renewLock("foo")); + } + @Test void testInitialiseWithCustomExecutor() { RedisLockRegistry redisLockRegistry = new RedisLockRegistry(redisConnectionFactory, "registryKey"); diff --git a/src/reference/antora/modules/ROOT/pages/redis.adoc b/src/reference/antora/modules/ROOT/pages/redis.adoc index 545777fb361..3087071d2b2 100644 --- a/src/reference/antora/modules/ROOT/pages/redis.adoc +++ b/src/reference/antora/modules/ROOT/pages/redis.adoc @@ -856,4 +856,7 @@ Default. The pub-sub is preferred mode - less network chatter between client Redis server, and more performant - the lock is acquired immediately when subscription is notified about unlocking in the other process. However, the Redis does not support pub-sub in the Master/Replica connections (for example in AWS ElastiCache environment), therefore a busy-spin mode is chosen as a default to make the registry working in any environment. -Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired. \ No newline at end of file +Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired. + +Starting with version 6.4, a method `RedisLockRegistry.setRenewalTaskScheduler` is added to configure the scheduler for automatic renewal of locks. +When it is set, the lock will be automatically renewed every 1/3 of the expiration time after the lock is successfully acquired, until unlocked or the redis key is removed. diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 3a0b552d4cb..7c14e5b5c9e 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -61,6 +61,9 @@ See xref:zeromq.adoc[ZeroMQ Support] for more information. Instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired. See xref:redis.adoc[Redis Support] for more information. +Add a `RedisLockRegistry.setRenewalTaskScheduler` to automatic renewal lock. +See xref:redis.adoc[Redis Support] for more information. + [[x6.4-groovy-changes]] === Groovy Changes