Skip to content

Commit 595f2d0

Browse files
committed
Experimental lock implementation
1 parent 9458b6b commit 595f2d0

File tree

8 files changed

+227
-242
lines changed

8 files changed

+227
-242
lines changed

core/src/main/java/org/polypheny/db/transaction/locking/Lockable.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,15 @@
1919
import lombok.NonNull;
2020
import org.polypheny.db.transaction.Transaction;
2121
import org.polypheny.db.util.DeadlockException;
22+
import javax.annotation.Nullable;
2223

2324
public interface Lockable {
2425

25-
enum LockState {
26-
SHARED,
27-
EXCLUSIVE,
28-
}
29-
30-
3126
enum LockType {
3227
SHARED,
3328
EXCLUSIVE,
3429
}
3530

36-
void acquire( @NonNull Transaction transaction, @NonNull LockType lockType ) throws DeadlockException;
37-
38-
void release( @NonNull Transaction transaction );
39-
40-
LockType getLockType();
41-
42-
boolean isRoot();
43-
31+
@Nullable
32+
Lockable parent();
4433
}

dbms/src/main/java/org/polypheny/db/transaction/TransactionImpl.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717
package org.polypheny.db.transaction;
1818

1919

20-
import java.text.MessageFormat;
2120
import java.util.ArrayList;
2221
import java.util.Collection;
2322
import java.util.Comparator;
2423
import java.util.HashMap;
25-
import java.util.HashSet;
2624
import java.util.List;
2725
import java.util.Map;
2826
import java.util.Objects;
@@ -65,9 +63,9 @@
6563
import org.polypheny.db.processing.Processor;
6664
import org.polypheny.db.processing.QueryProcessor;
6765
import org.polypheny.db.transaction.QueryAnalyzer.TransactionAnalyzer;
66+
import org.polypheny.db.transaction.locking.LockManager;
6867
import org.polypheny.db.transaction.locking.Lockable;
6968
import org.polypheny.db.type.entity.category.PolyNumber;
70-
import org.polypheny.db.util.DeadlockException;
7169
import org.polypheny.db.util.Pair;
7270
import org.polypheny.db.view.MaterializedViewManager;
7371

@@ -131,9 +129,6 @@ public class TransactionImpl implements Transaction, Comparable<Object> {
131129

132130
private final Collection<ConstraintCondition> commitConstraints = new ConcurrentLinkedDeque<>();
133131

134-
@Getter
135-
private final Set<Lockable> lockedEntities = new HashSet<>();
136-
137132
private boolean releasePhase = false;
138133

139134

@@ -153,6 +148,7 @@ public class TransactionImpl implements Transaction, Comparable<Object> {
153148
this.analyzer = queryAnalyzer == null ? null : new TransactionAnalyzer( queryAnalyzer, this );
154149
this.origin = origin;
155150
this.flavor = flavor;
151+
LockManager.getInstance().registerTransaction( this );
156152
}
157153

158154

@@ -427,14 +423,7 @@ public List<LogicalConstraint> getUsedConstraints( long id ) {
427423
@Override
428424
public void releaseAllLocks() {
429425
releasePhase = true;
430-
lockedEntities.forEach( lockedEntity -> {
431-
try {
432-
lockedEntity.release( this );
433-
} catch ( Exception e ) {
434-
// TODO TH: introduce proper exception type here. Or wrap in release method of lockable
435-
throw new DeadlockException( MessageFormat.format( "Failed to release lock for transaction {0}", this ) );
436-
}
437-
} );
426+
LockManager.getInstance().releaseAllLocks(this);
438427
}
439428

440429

@@ -446,8 +435,7 @@ public void acquireLockable( Lockable lockable, Lockable.LockType lockType ) {
446435
if ( releasePhase ) {
447436
throw new IllegalStateException( "Cannot acquire lock: transaction is in release phase!" );
448437
}
449-
lockable.acquire( this, lockType );
450-
lockedEntities.add( lockable );
438+
LockManager.getInstance().acquire(this, lockable, lockType);
451439
}
452440

453441

dbms/src/main/java/org/polypheny/db/transaction/locking/GlobalLockable.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,18 @@
1616

1717
package org.polypheny.db.transaction.locking;
1818

19-
public class GlobalLockable extends LockableImpl {
19+
import org.jetbrains.annotations.Nullable;
20+
21+
public class GlobalLockable implements Lockable {
2022

2123
public GlobalLockable() {
22-
super( null );
24+
25+
}
26+
27+
28+
@Override
29+
public @Nullable Lockable parent() {
30+
return null;
2331
}
2432

2533
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2019-2025 The Polypheny Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.polypheny.db.transaction.locking;
18+
19+
import lombok.extern.slf4j.Slf4j;
20+
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
21+
import org.polypheny.db.transaction.Transaction;
22+
import org.polypheny.db.transaction.locking.Lockable.LockType;
23+
import org.polypheny.db.util.DeadlockException;
24+
import java.util.HashSet;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
33+
import java.util.stream.Collectors;
34+
35+
@Slf4j
36+
public class LockManager {
37+
38+
private static final LockManager INSTANCE = new LockManager();
39+
40+
// This map contains active transactions and a lock ensuring only a single thread is using a transaction
41+
private final Map<Transaction, Lock> transactions = new ConcurrentHashMap<>();
42+
// The lock state of the database
43+
private final AtomicReference<Set<LockEntry>> entries = new AtomicReference<>( Set.of() );
44+
45+
private final Map<Lockable, CompletableFuture<Boolean>> futures = new ConcurrentHashMap<>();
46+
private final Map<Transaction, Lockable> waitFor = new ConcurrentHashMap<>();
47+
48+
49+
record LockEntry( Transaction transaction, Lockable lockable, LockType lockType ) {
50+
51+
}
52+
53+
54+
private LockManager() {
55+
}
56+
57+
58+
public static LockManager getInstance() {
59+
return INSTANCE;
60+
}
61+
62+
63+
public void registerTransaction( Transaction transaction ) {
64+
transactions.put( transaction, new ReentrantLock() );
65+
}
66+
67+
68+
private static Set<LockEntry> tryAcquireLock( Set<LockEntry> locks, LockEntry entry ) {
69+
if ( locks.contains( entry ) ) {
70+
return locks; // We already have the lock
71+
}
72+
73+
// Acquire shared lock while holding exclusive lock
74+
if ( entry.lockType == LockType.SHARED && locks.stream().anyMatch( e -> e.transaction == entry.transaction && e.lockable == entry.lockable && e.lockType == LockType.EXCLUSIVE ) ) {
75+
Set<LockEntry> newLocks = new HashSet<>( locks );
76+
newLocks.add( entry );
77+
return Set.copyOf( newLocks );
78+
}
79+
80+
if ( entry.lockType == LockType.EXCLUSIVE && locks.stream().noneMatch( e -> e.transaction != entry.transaction && e.lockable == entry.lockable ) ) {
81+
// No one else holds a lock
82+
Set<LockEntry> newLocks = new HashSet<>( locks );
83+
newLocks.add( entry );
84+
return Set.copyOf( newLocks );
85+
} else if ( entry.lockType == LockType.SHARED && locks.stream().noneMatch( e -> e.lockable == entry.lockable && e.lockType == LockType.EXCLUSIVE ) ) { // No match for transaction, handled above
86+
// No one else has an exclusive lock
87+
Set<LockEntry> newLocks = new HashSet<>( locks );
88+
newLocks.add( entry );
89+
return Set.copyOf( newLocks );
90+
}
91+
return null;
92+
}
93+
94+
95+
private static Set<Transaction> findLockHolders( Transaction transaction, Lockable lockable, Set<LockEntry> locks ) {
96+
return locks.stream()
97+
.filter( e -> e.lockable == lockable && e.transaction != transaction )
98+
.map( e -> e.transaction )
99+
.collect( Collectors.toSet() );
100+
}
101+
102+
103+
private static boolean hasDeadlock( Set<LockEntry> locks, Map<Transaction, Lockable> waiting, Transaction transaction, Lockable lockable ) {
104+
// Transaction is waiting for lockable
105+
Set<Transaction> openTransactions = new HashSet<>( findLockHolders( transaction, lockable, locks ) );
106+
Set<Transaction> closedTransactions = new HashSet<>();
107+
while ( !openTransactions.isEmpty() ) {
108+
Transaction t = openTransactions.iterator().next();
109+
openTransactions.remove( t );
110+
closedTransactions.add( t );
111+
Lockable waitingFor = waiting.get( t );
112+
if (waitingFor != null) {
113+
for (Transaction t2 : findLockHolders( t, waitingFor, locks )) {
114+
if (t2 == transaction) {
115+
return true; // Deadlock!
116+
} else if (!closedTransactions.contains( t2)) {
117+
openTransactions.add( t2 );
118+
}
119+
}
120+
}
121+
}
122+
return false;
123+
}
124+
125+
126+
private boolean acquireLock( Transaction transaction, Lockable lockable, LockType lockType ) {
127+
Set<LockEntry> locks = entries.get();
128+
129+
LockEntry entry = new LockEntry( transaction, lockable, lockType );
130+
131+
Set<LockEntry> newLocks = tryAcquireLock( locks, entry );
132+
133+
if ( newLocks != null && (locks == newLocks || entries.compareAndSet( locks, newLocks )) ) {// TODO: Use equal
134+
Lockable l = waitFor.remove( transaction ); // No longer waiting
135+
if ( l != null && l != lockable ) {
136+
throw new AssertionError( "Wrong lockable" );
137+
}
138+
return true;
139+
} else {
140+
// If there is a Deadlock, this means that other Transactions are already waiting for us, so the relevant parts wont change
141+
if ( hasDeadlock( locks, Map.copyOf( waitFor ), transaction, lockable ) ) {
142+
throw new DeadlockException( "Deadlock detected" );
143+
}
144+
145+
CompletableFuture<Boolean> future = futures.putIfAbsent( lockable, new CompletableFuture<>() );
146+
if ( future == null ) {
147+
// Retry, lock state could have changed
148+
return false;
149+
}
150+
try {
151+
waitFor.put( transaction, lockable );
152+
future.get();
153+
} catch ( InterruptedException | ExecutionException e ) {
154+
// ignore
155+
}
156+
return false;
157+
}
158+
}
159+
160+
161+
public void acquire( Transaction transaction, Lockable lockable, LockType lockType ) {
162+
Lock lock = transactions.get( transaction );
163+
if ( !lock.tryLock() ) {
164+
throw new GenericRuntimeException( "Multiple threads using same transaction" );
165+
}
166+
try {
167+
while ( true ) {
168+
if ( acquireLock( transaction, lockable, lockType ) ) {
169+
break;
170+
}
171+
}
172+
} finally {
173+
lock.unlock();
174+
}
175+
}
176+
177+
178+
public void releaseAllLocks( Transaction transaction ) {
179+
Lock lock = transactions.remove( transaction );
180+
if ( lock == null ) {
181+
return;
182+
}
183+
if ( !lock.tryLock() ) {
184+
throw new GenericRuntimeException( "Multiple threads using same transaction" );
185+
}
186+
try {
187+
while ( true ) {
188+
Set<LockEntry> locks = entries.get();
189+
Set<LockEntry> newEntries = locks.stream().filter( e -> e.transaction != transaction ).collect( Collectors.toUnmodifiableSet() );
190+
if ( entries.compareAndExchange( locks, newEntries ) == locks ) {
191+
// Success
192+
locks.stream().filter( e -> e.transaction == transaction ).forEach( e -> {
193+
CompletableFuture<Boolean> future = futures.remove( e.lockable );
194+
future.complete( true );
195+
} );
196+
}
197+
}
198+
} finally {
199+
lock.unlock();
200+
}
201+
}
202+
203+
}

0 commit comments

Comments
 (0)