Skip to content

Commit c6000cb

Browse files
committed
add a new redis lock type that renewal expire
1 parent 5589daa commit c6000cb

File tree

2 files changed

+213
-4
lines changed

2 files changed

+213
-4
lines changed

Diff for: spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

+184-2
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727
import java.util.UUID;
2828
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ConcurrentMap;
3031
import java.util.concurrent.ExecutionException;
3132
import java.util.concurrent.Executor;
3233
import java.util.concurrent.ExecutorService;
3334
import java.util.concurrent.Executors;
3435
import java.util.concurrent.Future;
36+
import java.util.concurrent.ScheduledExecutorService;
37+
import java.util.concurrent.ScheduledFuture;
38+
import java.util.concurrent.ScheduledThreadPoolExecutor;
3539
import java.util.concurrent.TimeUnit;
3640
import java.util.concurrent.TimeoutException;
3741
import java.util.concurrent.locks.Condition;
@@ -301,6 +305,11 @@ public enum RedisLockType {
301305
*/
302306
SPIN_LOCK,
303307

308+
/**
309+
* The lock is acquired by periodically(idleBetweenTries property) checking whether the lock can be acquired and schedule renewal expire time
310+
*/
311+
SPIN_LOCK_WITH_RENEWAL,
312+
304313
/**
305314
* The lock is acquired by redis pub-sub subscription.
306315
*/
@@ -311,10 +320,11 @@ private Function<String, RedisLock> getRedisLockConstructor(RedisLockType redisL
311320
return switch (redisLockType) {
312321
case SPIN_LOCK -> RedisSpinLock::new;
313322
case PUB_SUB_LOCK -> RedisPubSubLock::new;
323+
case SPIN_LOCK_WITH_RENEWAL -> RedisSpinLockWithRenewal::new;
314324
};
315325
}
316326

317-
private abstract class RedisLock implements Lock {
327+
public abstract class RedisLock implements Lock {
318328

319329
private static final String OBTAIN_LOCK_SCRIPT = """
320330
local lockClientId = redis.call('GET', KEYS[1])
@@ -724,7 +734,7 @@ private void unlockNotify(String lockKey) {
724734

725735
}
726736

727-
private final class RedisSpinLock extends RedisLock {
737+
private class RedisSpinLock extends RedisLock {
728738

729739
private static final String UNLINK_UNLOCK_SCRIPT = """
730740
local lockClientId = redis.call('GET', KEYS[1])
@@ -791,4 +801,176 @@ private boolean removeLockKeyWithScript(RedisScript<Boolean> redisScript) {
791801

792802
}
793803

804+
public final class RedisSpinLockWithRenewal extends RedisSpinLock {
805+
806+
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
807+
808+
private static final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(10,
809+
new CustomizableThreadFactory("redis-lock-renewal-schedule"));
810+
811+
private static final String RENEW_SCRIPT = """
812+
if (redis.call('GET', KEYS[1]) == ARGV[2]) then
813+
redis.call('PEXPIRE', KEYS[1], ARGV[1])
814+
return true
815+
end
816+
return false
817+
""";
818+
819+
public static final RedisScript<Boolean>
820+
RENEW_REDIS_SCRIPT = new DefaultRedisScript<>(RENEW_SCRIPT, Boolean.class);
821+
822+
private RedisSpinLockWithRenewal(String path) {
823+
super(path);
824+
}
825+
826+
@Override
827+
protected boolean tryRedisLockInner(long time) throws InterruptedException {
828+
boolean res = super.tryRedisLockInner(time);
829+
if (res) {
830+
scheduleExpirationRenewal(Thread.currentThread().getId());
831+
}
832+
return res;
833+
}
834+
835+
@Override
836+
protected boolean removeLockKeyInnerUnlink() {
837+
boolean result = super.removeLockKeyInnerUnlink();
838+
if (result) {
839+
cancelExpirationRenewal(Thread.currentThread().getId());
840+
}
841+
return result;
842+
}
843+
844+
@Override
845+
protected boolean removeLockKeyInnerDelete() {
846+
boolean result = super.removeLockKeyInnerDelete();
847+
if (result) {
848+
cancelExpirationRenewal(Thread.currentThread().getId());
849+
}
850+
return result;
851+
}
852+
853+
protected void scheduleExpirationRenewal(long threadId) {
854+
ExpirationEntry entry = new ExpirationEntry();
855+
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(lockKey, entry);
856+
if (oldEntry != null) {
857+
oldEntry.addThreadId(threadId);
858+
}
859+
else {
860+
entry.addThreadId(threadId);
861+
renewExpiration();
862+
}
863+
}
864+
865+
private void renewExpiration() {
866+
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(lockKey);
867+
if (ee == null) {
868+
return;
869+
}
870+
Runnable task = () -> {
871+
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(lockKey);
872+
if (ent == null) {
873+
return;
874+
}
875+
Long threadId = ent.getFirstThreadId();
876+
if (threadId == null) {
877+
return;
878+
}
879+
880+
boolean res = renewKeyWithScript(RENEW_REDIS_SCRIPT);
881+
if (res) {
882+
renewExpiration();
883+
}
884+
else {
885+
EXPIRATION_RENEWAL_MAP.remove(lockKey);
886+
}
887+
888+
};
889+
ScheduledFuture<?> timeout = scheduledExecutorService.schedule(task, RedisLockRegistry.this.expireAfter / 3,
890+
TimeUnit.MILLISECONDS);
891+
ee.setTimeout(timeout);
892+
}
893+
894+
private void cancelExpirationRenewal(Long threadId) {
895+
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(lockKey);
896+
if (task == null) {
897+
return;
898+
}
899+
900+
if (threadId != null) {
901+
task.removeThreadId(threadId);
902+
}
903+
904+
if (threadId == null || task.hasNoThreads()) {
905+
ScheduledFuture<?> timeout = task.getTimeout();
906+
if (timeout != null) {
907+
timeout.cancel(true);
908+
}
909+
EXPIRATION_RENEWAL_MAP.remove(lockKey);
910+
}
911+
}
912+
913+
public boolean renewKeyWithScript(RedisScript<Boolean> redisScript) {
914+
return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
915+
redisScript, Collections.singletonList(this.lockKey), String.valueOf(RedisLockRegistry.this.expireAfter),
916+
RedisLockRegistry.this.clientId));
917+
}
918+
919+
static class ExpirationEntry {
920+
921+
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
922+
923+
private ScheduledFuture<?> timeout;
924+
925+
ExpirationEntry() {
926+
super();
927+
}
928+
929+
public synchronized void addThreadId(long threadId) {
930+
Integer counter = this.threadIds.get(threadId);
931+
if (counter == null) {
932+
counter = 1;
933+
}
934+
else {
935+
counter++;
936+
}
937+
this.threadIds.put(threadId, counter);
938+
}
939+
940+
public synchronized boolean hasNoThreads() {
941+
return this.threadIds.isEmpty();
942+
}
943+
944+
public synchronized Long getFirstThreadId() {
945+
if (this.threadIds.isEmpty()) {
946+
return null;
947+
}
948+
return this.threadIds.keySet().iterator().next();
949+
}
950+
951+
public synchronized void removeThreadId(long threadId) {
952+
Integer counter = this.threadIds.get(threadId);
953+
if (counter == null) {
954+
return;
955+
}
956+
counter--;
957+
if (counter == 0) {
958+
this.threadIds.remove(threadId);
959+
}
960+
else {
961+
this.threadIds.put(threadId, counter);
962+
}
963+
}
964+
965+
public ScheduledFuture<?> getTimeout() {
966+
return this.timeout;
967+
}
968+
969+
public void setTimeout(ScheduledFuture<?> timeout) {
970+
this.timeout = timeout;
971+
}
972+
}
973+
974+
}
975+
794976
}

Diff for: spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ void testLock(RedisLockType testRedisLockType) {
118118
}
119119

120120
@ParameterizedTest
121-
@EnumSource(RedisLockType.class)
121+
@EnumSource(value = RedisLockType.class, mode = EnumSource.Mode.EXCLUDE, names = {"SPIN_LOCK_WITH_RENEWAL"})
122122
void testUnlockAfterLockStatusHasBeenExpired(RedisLockType testRedisLockType) throws InterruptedException {
123123
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100);
124124
registry.setRedisLockType(testRedisLockType);
@@ -396,7 +396,7 @@ void testTwoThreadsWrongOneUnlocks(RedisLockType testRedisLockType) throws Excep
396396
}
397397

398398
@ParameterizedTest
399-
@EnumSource(RedisLockType.class)
399+
@EnumSource(value = RedisLockType.class, mode = EnumSource.Mode.EXCLUDE, names = {"SPIN_LOCK_WITH_RENEWAL"})
400400
void testExpireTwoRegistries(RedisLockType testRedisLockType) throws Exception {
401401
RedisLockRegistry registry1 = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100);
402402
registry1.setRedisLockType(testRedisLockType);
@@ -427,6 +427,25 @@ void testExceptionOnExpire(RedisLockType testRedisLockType) throws Exception {
427427
registry.destroy();
428428
}
429429

430+
@ParameterizedTest
431+
@EnumSource(RedisLockType.class)
432+
void testRenewalOnExpire(RedisLockType redisLockType) throws Exception {
433+
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 3_000L);
434+
registry.setRedisLockType(redisLockType);
435+
Lock lock1 = registry.obtain("foo");
436+
assertThat(lock1.tryLock()).isTrue();
437+
waitForExpire2("foo");
438+
if (redisLockType == RedisLockType.SPIN_LOCK_WITH_RENEWAL) {
439+
lock1.unlock();
440+
}
441+
else {
442+
assertThatThrownBy(lock1::unlock)
443+
.isInstanceOf(ConcurrentModificationException.class)
444+
.hasMessageContaining("Lock was released in the store due to expiration.");
445+
}
446+
registry.destroy();
447+
}
448+
430449
@ParameterizedTest
431450
@EnumSource(RedisLockType.class)
432451
void testEquals(RedisLockType testRedisLockType) {
@@ -922,6 +941,14 @@ private void waitForExpire(String key) throws Exception {
922941
assertThat(n < 100).as(key + " key did not expire").isTrue();
923942
}
924943

944+
private void waitForExpire2(String key) throws Exception {
945+
StringRedisTemplate template = createTemplate();
946+
int n = 0;
947+
while (n++ < 50 && template.keys(this.registryKey + ":" + key).size() > 0) {
948+
Thread.sleep(100);
949+
}
950+
}
951+
925952
@SuppressWarnings("unchecked")
926953
private static Map<String, Lock> getRedisLockRegistryLocks(RedisLockRegistry registry) {
927954
return TestUtils.getPropertyValue(registry, "locks", Map.class);

0 commit comments

Comments
 (0)