Skip to content

Commit 6ae7d9a

Browse files
committed
RedisLockRegistry implements RenewableLockRegistry
1 parent 3803b57 commit 6ae7d9a

File tree

2 files changed

+83
-193
lines changed

2 files changed

+83
-193
lines changed

Diff for: spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

+63-183
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,12 @@
2727
import java.util.UUID;
2828
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.ConcurrentMap;
3130
import java.util.concurrent.ExecutionException;
3231
import java.util.concurrent.Executor;
3332
import java.util.concurrent.ExecutorService;
3433
import java.util.concurrent.Executors;
3534
import java.util.concurrent.Future;
3635
import java.util.concurrent.ScheduledExecutorService;
37-
import java.util.concurrent.ScheduledFuture;
38-
import java.util.concurrent.ScheduledThreadPoolExecutor;
3936
import java.util.concurrent.TimeUnit;
4037
import java.util.concurrent.TimeoutException;
4138
import java.util.concurrent.locks.Condition;
@@ -58,6 +55,7 @@
5855
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
5956
import org.springframework.data.redis.listener.Topic;
6057
import org.springframework.integration.support.locks.ExpirableLockRegistry;
58+
import org.springframework.integration.support.locks.RenewableLockRegistry;
6159
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
6260
import org.springframework.util.Assert;
6361
import org.springframework.util.ReflectionUtils;
@@ -97,7 +95,7 @@
9795
* @since 4.0
9896
*
9997
*/
100-
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
98+
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean, RenewableLockRegistry {
10199

102100
private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class);
103101

@@ -142,6 +140,11 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
142140
private Executor executor =
143141
Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));
144142

143+
/**
144+
* renewal scheduleExecutor
145+
*/
146+
private ScheduledExecutorService renewalScheduledExecutor;
147+
145148
/**
146149
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
147150
* thus should not be shutdown when {@link #destroy()} is called
@@ -211,6 +214,14 @@ public void setExecutor(Executor executor) {
211214
this.executorExplicitlySet = true;
212215
}
213216

217+
/**
218+
* Set the {@link ScheduledExecutorService}, where is not provided then renewal not work
219+
* @param renewalScheduledExecutor the executor service
220+
*/
221+
public void setRenewalScheduledExecutor(ScheduledExecutorService renewalScheduledExecutor) {
222+
this.renewalScheduledExecutor = renewalScheduledExecutor;
223+
}
224+
214225
/**
215226
* Set the capacity of cached locks.
216227
* @param cacheCapacity The capacity of cached lock, (default 100_000).
@@ -293,6 +304,29 @@ public void destroy() {
293304
throw new IllegalStateException(ex);
294305
}
295306
}
307+
if (renewalScheduledExecutor != null) {
308+
renewalScheduledExecutor.shutdown();;
309+
}
310+
}
311+
312+
@Override
313+
public void renewLock(Object lockKey) {
314+
String path = (String) lockKey;
315+
RedisLock redisLock;
316+
this.lock.lock();
317+
try {
318+
redisLock = this.locks.computeIfAbsent(path, getRedisLockConstructor(this.redisLockType));
319+
}
320+
finally {
321+
this.lock.unlock();
322+
}
323+
if (redisLock == null) {
324+
throw new IllegalStateException("Could not renew mutex at " + path);
325+
}
326+
327+
if (!redisLock.renew()) {
328+
throw new IllegalStateException("Could not renew mutex at " + path);
329+
}
296330
}
297331

298332
/**
@@ -305,11 +339,6 @@ public enum RedisLockType {
305339
*/
306340
SPIN_LOCK,
307341

308-
/**
309-
* The lock is acquired by periodically(idleBetweenTries property) checking whether the lock can be acquired and schedule renewal expire time
310-
*/
311-
SPIN_LOCK_WITH_RENEWAL,
312-
313342
/**
314343
* The lock is acquired by redis pub-sub subscription.
315344
*/
@@ -320,7 +349,6 @@ private Function<String, RedisLock> getRedisLockConstructor(RedisLockType redisL
320349
return switch (redisLockType) {
321350
case SPIN_LOCK -> RedisSpinLock::new;
322351
case PUB_SUB_LOCK -> RedisPubSubLock::new;
323-
case SPIN_LOCK_WITH_RENEWAL -> RedisSpinLockWithRenewal::new;
324352
};
325353
}
326354

@@ -338,9 +366,20 @@ private abstract class RedisLock implements Lock {
338366
return false
339367
""";
340368

369+
private static final String RENEW_SCRIPT = """
370+
if (redis.call('GET', KEYS[1]) == ARGV[2]) then
371+
redis.call('PEXPIRE', KEYS[1], ARGV[1])
372+
return true
373+
end
374+
return false
375+
""";
376+
341377
protected static final RedisScript<Boolean>
342378
OBTAIN_LOCK_REDIS_SCRIPT = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
343379

380+
public static final RedisScript<Boolean>
381+
RENEW_REDIS_SCRIPT = new DefaultRedisScript<>(RENEW_SCRIPT, Boolean.class);
382+
344383
protected final String lockKey;
345384

346385
private final ReentrantLock localLock = new ReentrantLock();
@@ -464,6 +503,13 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx
464503
LOGGER.debug("Acquired lock; " + this);
465504
}
466505
this.lockedAt = System.currentTimeMillis();
506+
if (RedisLockRegistry.this.renewalScheduledExecutor != null) {
507+
long delay = RedisLockRegistry.this.expireAfter / 3;
508+
Runnable task = () -> {
509+
renewLock(this.lockKey.substring((RedisLockRegistry.this.registryKey + ':').length()));
510+
};
511+
RedisLockRegistry.this.renewalScheduledExecutor.scheduleWithFixedDelay(task, delay, delay, TimeUnit.MILLISECONDS);
512+
}
467513
}
468514
return acquired;
469515
}
@@ -538,6 +584,12 @@ else if (Boolean.FALSE.equals(unlinkResult)) {
538584
}
539585
}
540586

587+
public final boolean renew() {
588+
return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
589+
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey), String.valueOf(RedisLockRegistry.this.expireAfter),
590+
RedisLockRegistry.this.clientId));
591+
}
592+
541593
@Override
542594
public final Condition newCondition() {
543595
throw new UnsupportedOperationException("Conditions are not supported");
@@ -734,7 +786,7 @@ private void unlockNotify(String lockKey) {
734786

735787
}
736788

737-
private class RedisSpinLock extends RedisLock {
789+
private final class RedisSpinLock extends RedisLock {
738790

739791
private static final String UNLINK_UNLOCK_SCRIPT = """
740792
local lockClientId = redis.call('GET', KEYS[1])
@@ -801,176 +853,4 @@ private boolean removeLockKeyWithScript(RedisScript<Boolean> redisScript) {
801853

802854
}
803855

804-
public final class RedisSpinLockWithRenewal extends RedisSpinLock {
805-
806-
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
807-
808-
private static final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(10,
809-
new CustomizableThreadFactory("redis-lock-renewal-schedule"));
810-
811-
private static final String RENEW_SCRIPT = """
812-
if (redis.call('GET', KEYS[1]) == ARGV[2]) then
813-
redis.call('PEXPIRE', KEYS[1], ARGV[1])
814-
return true
815-
end
816-
return false
817-
""";
818-
819-
public static final RedisScript<Boolean>
820-
RENEW_REDIS_SCRIPT = new DefaultRedisScript<>(RENEW_SCRIPT, Boolean.class);
821-
822-
private RedisSpinLockWithRenewal(String path) {
823-
super(path);
824-
}
825-
826-
@Override
827-
protected boolean tryRedisLockInner(long time) throws InterruptedException {
828-
boolean res = super.tryRedisLockInner(time);
829-
if (res) {
830-
scheduleExpirationRenewal(Thread.currentThread().getId());
831-
}
832-
return res;
833-
}
834-
835-
@Override
836-
protected boolean removeLockKeyInnerUnlink() {
837-
boolean result = super.removeLockKeyInnerUnlink();
838-
if (result) {
839-
cancelExpirationRenewal(Thread.currentThread().getId());
840-
}
841-
return result;
842-
}
843-
844-
@Override
845-
protected boolean removeLockKeyInnerDelete() {
846-
boolean result = super.removeLockKeyInnerDelete();
847-
if (result) {
848-
cancelExpirationRenewal(Thread.currentThread().getId());
849-
}
850-
return result;
851-
}
852-
853-
protected void scheduleExpirationRenewal(long threadId) {
854-
ExpirationEntry entry = new ExpirationEntry();
855-
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(lockKey, entry);
856-
if (oldEntry != null) {
857-
oldEntry.addThreadId(threadId);
858-
}
859-
else {
860-
entry.addThreadId(threadId);
861-
renewExpiration();
862-
}
863-
}
864-
865-
private void renewExpiration() {
866-
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(lockKey);
867-
if (ee == null) {
868-
return;
869-
}
870-
Runnable task = () -> {
871-
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(lockKey);
872-
if (ent == null) {
873-
return;
874-
}
875-
Long threadId = ent.getFirstThreadId();
876-
if (threadId == null) {
877-
return;
878-
}
879-
880-
boolean res = renewKeyWithScript(RENEW_REDIS_SCRIPT);
881-
if (res) {
882-
renewExpiration();
883-
}
884-
else {
885-
EXPIRATION_RENEWAL_MAP.remove(lockKey);
886-
}
887-
888-
};
889-
ScheduledFuture<?> timeout = scheduledExecutorService.schedule(task, RedisLockRegistry.this.expireAfter / 3,
890-
TimeUnit.MILLISECONDS);
891-
ee.setTimeout(timeout);
892-
}
893-
894-
private void cancelExpirationRenewal(Long threadId) {
895-
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(lockKey);
896-
if (task == null) {
897-
return;
898-
}
899-
900-
if (threadId != null) {
901-
task.removeThreadId(threadId);
902-
}
903-
904-
if (threadId == null || task.hasNoThreads()) {
905-
ScheduledFuture<?> timeout = task.getTimeout();
906-
if (timeout != null) {
907-
timeout.cancel(true);
908-
}
909-
EXPIRATION_RENEWAL_MAP.remove(lockKey);
910-
}
911-
}
912-
913-
public boolean renewKeyWithScript(RedisScript<Boolean> redisScript) {
914-
return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
915-
redisScript, Collections.singletonList(this.lockKey), String.valueOf(RedisLockRegistry.this.expireAfter),
916-
RedisLockRegistry.this.clientId));
917-
}
918-
919-
static class ExpirationEntry {
920-
921-
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
922-
923-
private ScheduledFuture<?> timeout;
924-
925-
ExpirationEntry() {
926-
super();
927-
}
928-
929-
public synchronized void addThreadId(long threadId) {
930-
Integer counter = this.threadIds.get(threadId);
931-
if (counter == null) {
932-
counter = 1;
933-
}
934-
else {
935-
counter++;
936-
}
937-
this.threadIds.put(threadId, counter);
938-
}
939-
940-
public synchronized boolean hasNoThreads() {
941-
return this.threadIds.isEmpty();
942-
}
943-
944-
public synchronized Long getFirstThreadId() {
945-
if (this.threadIds.isEmpty()) {
946-
return null;
947-
}
948-
return this.threadIds.keySet().iterator().next();
949-
}
950-
951-
public synchronized void removeThreadId(long threadId) {
952-
Integer counter = this.threadIds.get(threadId);
953-
if (counter == null) {
954-
return;
955-
}
956-
counter--;
957-
if (counter == 0) {
958-
this.threadIds.remove(threadId);
959-
}
960-
else {
961-
this.threadIds.put(threadId, counter);
962-
}
963-
}
964-
965-
public ScheduledFuture<?> getTimeout() {
966-
return this.timeout;
967-
}
968-
969-
public void setTimeout(ScheduledFuture<?> timeout) {
970-
this.timeout = timeout;
971-
}
972-
}
973-
974-
}
975-
976856
}

0 commit comments

Comments
 (0)