Skip to content

Commit 180a6ff

Browse files
authored
Merge pull request #386 from LiUSemWeb/385-optimized-persisted-cache-for-cardinality-queries
385 optimized persisted cache for cardinality queries
2 parents 188651a + 54cdf12 commit 180a6ff

19 files changed

+1957
-32
lines changed

hefquin-base/src/main/java/se/liu/ida/hefquin/base/datastructures/impl/cache/CacheEntry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,11 @@ public interface CacheEntry<ObjectType>
99
* Returns the object that is cached via this cache entry.
1010
*/
1111
ObjectType getObject();
12+
13+
/**
14+
* Returns the time at which this cache entry was created.
15+
*/
16+
long createdAt();
1217
}
18+
19+
Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,45 @@
11
package se.liu.ida.hefquin.base.datastructures.impl.cache;
22

3-
public class CacheEntryBase<ObjectType> implements CacheEntry<ObjectType>
3+
import java.io.Serializable;
4+
import java.time.Instant;
5+
6+
public class CacheEntryBase<ObjectType> implements CacheEntry<ObjectType>, Serializable
47
{
8+
private static final long serialVersionUID = 1L;
59
protected final ObjectType obj;
10+
protected final long creationTime;
611

712
public CacheEntryBase( final ObjectType obj ) {
13+
this( obj, Instant.now().toEpochMilli() );
14+
}
15+
16+
public CacheEntryBase( final ObjectType obj, final long creationTime ) {
817
assert obj != null;
918
this.obj = obj;
19+
this.creationTime = creationTime;
1020
}
1121

1222
@Override
1323
public ObjectType getObject() {
1424
return obj;
1525
}
1626

27+
@Override
28+
public long createdAt() {
29+
return creationTime;
30+
}
31+
32+
@Override
33+
public boolean equals( final Object obj ) {
34+
if ( this == obj )
35+
return true;
36+
if ( obj == null || getClass() != obj.getClass() )
37+
return false;
38+
return this.getObject().equals(((CacheEntryBase<?>) obj).getObject());
39+
}
40+
41+
@Override
42+
public int hashCode() {
43+
return getObject().hashCode();
44+
}
1745
}

hefquin-base/src/main/java/se/liu/ida/hefquin/base/datastructures/impl/cache/CacheEntryBaseFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ public class CacheEntryBaseFactory<ObjectType> implements CacheEntryFactory<Cach
77
{
88
@Override
99
public CacheEntryBase<ObjectType> createCacheEntry( final ObjectType obj ) {
10-
return new CacheEntryBase<>(obj);
10+
return new CacheEntryBase<>( obj );
1111
}
1212

1313
}

hefquin-base/src/main/java/se/liu/ida/hefquin/base/datastructures/impl/cache/CacheEntryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ public interface CacheEntryFactory<EntryType extends CacheEntry<ObjectType>, Obj
88
/**
99
* Creates and returns a new EntryType object that wraps the given object.
1010
*/
11-
EntryType createCacheEntry(ObjectType obj);
11+
EntryType createCacheEntry( ObjectType obj );
1212
}

hefquin-base/src/main/java/se/liu/ida/hefquin/base/datastructures/impl/cache/CacheInvalidationPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ public interface CacheInvalidationPolicy<EntryType extends CacheEntry<ObjectType
1111
* Returns <code>true</code> if the given cache entry is still
1212
* valid according to this cache invalidation policy.
1313
*/
14-
boolean isStillValid(EntryType e);
14+
boolean isStillValid( EntryType e );
1515
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package se.liu.ida.hefquin.base.datastructures.impl.cache;
2+
3+
import java.time.Instant;
4+
5+
public class CacheInvalidationPolicyTimeToLive<EntryType extends CacheEntry<ObjectType>, ObjectType>
6+
implements CacheInvalidationPolicy<EntryType, ObjectType>
7+
{
8+
protected final long timeToLive;
9+
10+
public CacheInvalidationPolicyTimeToLive( final long timeToLive ){
11+
this.timeToLive = timeToLive;
12+
}
13+
14+
/**
15+
* Returns <code>true</code> if the given cache entry has not reached
16+
* the time to live considered by this policy.
17+
*/
18+
public boolean isStillValid( final EntryType e ) {
19+
return e.createdAt() + timeToLive > Instant.now().toEpochMilli();
20+
}
21+
}

hefquin-engine/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,10 @@
2929
<artifactId>hefquin-graphqlconnector</artifactId>
3030
<version>0.0.4-SNAPSHOT</version>
3131
</dependency>
32+
<dependency>
33+
<groupId>net.openhft</groupId>
34+
<artifactId>chronicle-map</artifactId>
35+
<version>3.27ea0</version>
36+
</dependency>
3237
</dependencies>
3338
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
package se.liu.ida.hefquin.engine.federation.access.impl;
2+
3+
import java.io.IOException;
4+
import java.util.Date;
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.ExecutorService;
7+
8+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheEntry;
9+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheEntryFactory;
10+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheInvalidationPolicy;
11+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheInvalidationPolicyTimeToLive;
12+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CachePolicies;
13+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheReplacementPolicy;
14+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheReplacementPolicyFactory;
15+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheReplacementPolicyLRU;
16+
import se.liu.ida.hefquin.engine.federation.BRTPFServer;
17+
import se.liu.ida.hefquin.engine.federation.SPARQLEndpoint;
18+
import se.liu.ida.hefquin.engine.federation.TPFServer;
19+
import se.liu.ida.hefquin.engine.federation.access.BRTPFRequest;
20+
import se.liu.ida.hefquin.engine.federation.access.CardinalityResponse;
21+
import se.liu.ida.hefquin.engine.federation.access.DataRetrievalResponse;
22+
import se.liu.ida.hefquin.engine.federation.access.FederationAccessException;
23+
import se.liu.ida.hefquin.engine.federation.access.FederationAccessManager;
24+
import se.liu.ida.hefquin.engine.federation.access.SPARQLRequest;
25+
import se.liu.ida.hefquin.engine.federation.access.TPFRequest;
26+
import se.liu.ida.hefquin.engine.federation.access.impl.cache.CardinalityCacheEntry;
27+
import se.liu.ida.hefquin.engine.federation.access.impl.cache.CardinalityCacheEntryFactory;
28+
import se.liu.ida.hefquin.engine.federation.access.impl.cache.CardinalityCacheKey;
29+
import se.liu.ida.hefquin.engine.federation.access.impl.cache.ChronicleMapCardinalityCache;
30+
import se.liu.ida.hefquin.engine.federation.access.impl.response.CachedCardinalityResponseImpl;
31+
32+
/**
33+
* A FederationAccessManager implementation that incorporates persistent disk
34+
* caching of cardinality requests.
35+
*/
36+
public class FederationAccessManagerWithChronicleMapCache extends FederationAccessManagerWithCache
37+
{
38+
protected final ChronicleMapCardinalityCache cardinalityCache;
39+
protected final static int defaultCacheCapacity = 1000;
40+
41+
public FederationAccessManagerWithChronicleMapCache( final FederationAccessManager fedAccMan,
42+
final int cacheCapacity,
43+
final CachePolicies<Key, CompletableFuture<? extends DataRetrievalResponse>, ? extends CacheEntry<CompletableFuture<? extends DataRetrievalResponse>>> cachePolicies,
44+
final CachePolicies<CardinalityCacheKey, Integer, CardinalityCacheEntry> cardinalityCachePolicies )
45+
throws IOException
46+
{
47+
super( fedAccMan, cacheCapacity, cachePolicies );
48+
cardinalityCache = new ChronicleMapCardinalityCache( cardinalityCachePolicies, cacheCapacity );
49+
}
50+
51+
public FederationAccessManagerWithChronicleMapCache( final FederationAccessManager fedAccMan,
52+
final int cacheCapacity,
53+
final long timeToLive )
54+
throws IOException
55+
{
56+
this( fedAccMan,
57+
cacheCapacity,
58+
new MyDefaultCachePolicies(),
59+
new MyDefaultCardinalityCachePolicies( timeToLive ) );
60+
}
61+
62+
/**
63+
* Creates a {@link FederationAccessManagerWithChronicleMapCache} with the default configuration.
64+
*/
65+
public FederationAccessManagerWithChronicleMapCache( final ExecutorService execService ) throws IOException
66+
{
67+
this( new AsyncFederationAccessManagerImpl( execService ),
68+
defaultCacheCapacity,
69+
new MyDefaultCachePolicies(),
70+
new MyDefaultCardinalityCachePolicies() );
71+
}
72+
73+
@Override
74+
public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final SPARQLRequest req,
75+
final SPARQLEndpoint fm )
76+
throws FederationAccessException
77+
{
78+
final CardinalityCacheKey key = new CardinalityCacheKey( req, fm );
79+
final Date requestStartTime = new Date();
80+
final CardinalityCacheEntry cachedEntry = cardinalityCache.get( key );
81+
final Date requestEndTime = new Date();
82+
if ( cachedEntry != null ) {
83+
cacheHitsSPARQLCardinality++;
84+
final CardinalityResponse cr = new CachedCardinalityResponseImpl( fm,
85+
req,
86+
cachedEntry.getObject(),
87+
requestStartTime,
88+
requestEndTime );
89+
return CompletableFuture.completedFuture( cr );
90+
}
91+
92+
final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
93+
newResponse.thenAccept( value -> {
94+
cardinalityCache.put( key, value.getCardinality() );
95+
} );
96+
return newResponse;
97+
}
98+
99+
@Override
100+
public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final TPFRequest req,
101+
final TPFServer fm )
102+
throws FederationAccessException
103+
{
104+
final CardinalityCacheKey key = new CardinalityCacheKey( req, fm );
105+
final Date requestStartTime = new Date();
106+
final CardinalityCacheEntry cachedEntry = cardinalityCache.get( key );
107+
final Date requestEndTime = new Date();
108+
if ( cachedEntry != null ) {
109+
cacheHitsTPFCardinality++;
110+
final CardinalityResponse cr = new CachedCardinalityResponseImpl( fm,
111+
req,
112+
cachedEntry.getObject(),
113+
requestStartTime,
114+
requestEndTime );
115+
return CompletableFuture.completedFuture( cr );
116+
}
117+
118+
final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
119+
newResponse.thenAccept( value -> {
120+
cardinalityCache.put( key, value.getCardinality() );
121+
} );
122+
return newResponse;
123+
}
124+
125+
@Override
126+
public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final TPFRequest req,
127+
final BRTPFServer fm )
128+
throws FederationAccessException
129+
{
130+
final CardinalityCacheKey key = new CardinalityCacheKey( req, fm );
131+
final Date requestStartTime = new Date();
132+
final CardinalityCacheEntry cachedEntry = cardinalityCache.get( key );
133+
final Date requestEndTime = new Date();
134+
if ( cachedEntry != null ) {
135+
cacheHitsTPFCardinality++;
136+
final CardinalityResponse cr = new CachedCardinalityResponseImpl( fm,
137+
req,
138+
cachedEntry.getObject(),
139+
requestStartTime,
140+
requestEndTime );
141+
return CompletableFuture.completedFuture( cr );
142+
}
143+
144+
final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
145+
newResponse.thenAccept( value -> {
146+
cardinalityCache.put( key, value.getCardinality() );
147+
} );
148+
return newResponse;
149+
}
150+
151+
@Override
152+
public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final BRTPFRequest req,
153+
final BRTPFServer fm )
154+
throws FederationAccessException
155+
{
156+
final CardinalityCacheKey key = new CardinalityCacheKey( req, fm );
157+
final Date requestStartTime = new Date();
158+
final CardinalityCacheEntry cachedEntry = cardinalityCache.get( key );
159+
final Date requestEndTime = new Date();
160+
if ( cachedEntry != null ) {
161+
cacheHitsTPFCardinality++;
162+
final CardinalityResponse cr = new CachedCardinalityResponseImpl( fm,
163+
req,
164+
cachedEntry.getObject(),
165+
requestStartTime,
166+
requestEndTime );
167+
return CompletableFuture.completedFuture( cr );
168+
}
169+
170+
final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
171+
newResponse.thenAccept( value -> {
172+
cardinalityCache.put( key, value.getCardinality() );
173+
} );
174+
return newResponse;
175+
}
176+
177+
/**
178+
* Clears the persisted cardinality cache map.
179+
*/
180+
public void clearCardinalityCache() {
181+
cardinalityCache.clear();
182+
}
183+
184+
protected static class MyDefaultCardinalityCachePolicies implements CachePolicies<CardinalityCacheKey, Integer, CardinalityCacheEntry>
185+
{
186+
protected final long timeToLive;
187+
protected final static long defaultTimeToLive = 300_000; // 5 minutes
188+
189+
public MyDefaultCardinalityCachePolicies() {
190+
this( defaultTimeToLive );
191+
}
192+
193+
public MyDefaultCardinalityCachePolicies( final long timeToLive ) {
194+
this.timeToLive = timeToLive;
195+
}
196+
197+
@Override
198+
public CacheEntryFactory<CardinalityCacheEntry, Integer> getEntryFactory() {
199+
return new CardinalityCacheEntryFactory();
200+
}
201+
202+
@Override
203+
public CacheReplacementPolicyFactory<CardinalityCacheKey, Integer, CardinalityCacheEntry> getReplacementPolicyFactory() {
204+
return new CacheReplacementPolicyFactory<>() {
205+
@Override
206+
public CacheReplacementPolicy<CardinalityCacheKey, Integer, CardinalityCacheEntry> create() {
207+
return new CacheReplacementPolicyLRU<>();
208+
}
209+
};
210+
}
211+
212+
@Override
213+
public CacheInvalidationPolicy<CardinalityCacheEntry, Integer> getInvalidationPolicy() {
214+
return new CacheInvalidationPolicyTimeToLive<>( timeToLive );
215+
}
216+
} // end of MyDefaultCachePolicies
217+
}

hefquin-engine/src/main/java/se/liu/ida/hefquin/engine/federation/access/impl/FederationAccessManagerWithPersistedDiskCache.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import se.liu.ida.hefquin.engine.federation.access.FederationAccessManager;
1616
import se.liu.ida.hefquin.engine.federation.access.SPARQLRequest;
1717
import se.liu.ida.hefquin.engine.federation.access.TPFRequest;
18+
import se.liu.ida.hefquin.engine.federation.access.impl.cache.CardinalityCacheKey;
19+
import se.liu.ida.hefquin.engine.federation.access.impl.cache.PersistableCardinalityCacheImpl;
1820

1921
/**
2022
* A FederationAccessManager implementation that incorporates persistent disk
@@ -122,7 +124,7 @@ public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final BRT
122124
cacheHitsBRTPFCardinality++;
123125
return cachedResponse;
124126
}
125-
final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm) ;
127+
final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
126128
cardinalityCache.put( key, newResponse );
127129
newResponse.thenRun( () -> cardinalityCache.save() );
128130
return newResponse;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package se.liu.ida.hefquin.engine.federation.access.impl.cache;
2+
3+
import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheEntryBase;
4+
5+
/**
6+
* An entry used when caching cardinality requests.
7+
*/
8+
public class CardinalityCacheEntry extends CacheEntryBase<Integer>
9+
{
10+
private static final long serialVersionUID = 1L;
11+
12+
public CardinalityCacheEntry( final Integer cardinality, final long entryCreatedAt ) {
13+
super( cardinality, entryCreatedAt );
14+
}
15+
16+
@Override
17+
public String toString() {
18+
return "CardinalityCacheEntry{cardinality='" + getObject() + "'}";
19+
}
20+
}

0 commit comments

Comments
 (0)