Merge pull request #379 from LiUSemWeb/378-persisted-disk-cache-for-cardinality-queries

378 persisted disk cache for cardinality queries
hartig authored Feb 3, 2025
2 parents 2784cb4 + 20bd44e commit 0004f28
Showing 5 changed files with 1,312 additions and 0 deletions.
@@ -0,0 +1,31 @@
package se.liu.ida.hefquin.base.datastructures;

/**
 * A generic interface for data structures that can be used as a persisted cache
 * for objects of a specific type. Implementations of this interface may employ
 * their own policies for cache replacement and cache invalidation.
*
* @param <IdType> the type of the values by which the cached objects can be identified
* @param <ObjectType> the type of the objects to be maintained in this cache
*/
public interface PersistableCache<IdType, ObjectType> extends Cache<IdType, ObjectType> {

/**
* Saves the current state of the cache to persistent storage.
* Implementations may choose different mechanisms for persistence,
* such as writing to a file or a database.
*
* This method should ensure that all cached data is synchronized with
* persistent storage. Depending on the implementation, synchronization
* may be automatic or explicitly controlled.
*/
void save();

/**
* Loads the cache state from persistent storage.
 * If persisted data exists, it should be restored into the cache.
 * Implementations should gracefully handle the case in which no prior
 * state exists.
*/
void load();
}
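To illustrate the save()/load() contract described by the interface, the following is a minimal, self-contained sketch (not part of this commit) that persists a plain HashMap via standard Java serialization. The class name, file handling, and error handling are purely illustrative assumptions.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a tiny key-value store that writes its complete
// state to a file via standard Java serialization, mirroring the save()/load()
// contract of PersistableCache. Keys and values must be Serializable for the
// persistence step to succeed at runtime.
public class FileBackedMapSketch<K, V> {
	protected final Map<K, V> map = new HashMap<>();
	protected final File file;

	public FileBackedMapSketch( final File file ) {
		this.file = file;
	}

	public void put( final K key, final V value ) { map.put( key, value ); }

	public V get( final K key ) { return map.get( key ); }

	// Writes the full map to disk; callers decide when to synchronize.
	public void save() {
		try ( ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file)) ) {
			out.writeObject( new HashMap<>(map) );
		}
		catch ( final IOException e ) {
			throw new UncheckedIOException( e );
		}
	}

	// Restores the previously saved state, if any; a missing file is handled
	// gracefully by simply starting with an empty map.
	@SuppressWarnings("unchecked")
	public void load() {
		if ( ! file.exists() ) return;
		try ( ObjectInputStream in = new ObjectInputStream(new FileInputStream(file)) ) {
			map.putAll( (Map<K, V>) in.readObject() );
		}
		catch ( final IOException | ClassNotFoundException e ) {
			throw new IllegalStateException( "Could not restore the persisted state", e );
		}
	}
}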
@@ -0,0 +1,72 @@
package se.liu.ida.hefquin.engine.federation.access.impl;

import java.io.Serializable;
import java.util.Objects;
import se.liu.ida.hefquin.engine.federation.BRTPFServer;
import se.liu.ida.hefquin.engine.federation.FederationMember;
import se.liu.ida.hefquin.engine.federation.SPARQLEndpoint;
import se.liu.ida.hefquin.engine.federation.TPFServer;
import se.liu.ida.hefquin.engine.federation.access.BRTPFRequest;
import se.liu.ida.hefquin.engine.federation.access.DataRetrievalRequest;
import se.liu.ida.hefquin.engine.federation.access.SPARQLRequest;
import se.liu.ida.hefquin.engine.federation.access.TPFRequest;

/**
* A key for caching cardinality requests, uniquely identified by a {@link DataRetrievalRequest} and a {@link FederationMember}.
*/
public class CardinalityCacheKey implements Serializable {
private static final long serialVersionUID = 1L;

protected final String query;
protected final String url;
protected final String bindings;

public CardinalityCacheKey( final DataRetrievalRequest req, final FederationMember fm ) {
if ( req instanceof SPARQLRequest sparqlRequest && fm instanceof SPARQLEndpoint sparqlEndpoint ) {
query = sparqlRequest.toString();
url = sparqlEndpoint.getInterface().getURL();
bindings = "";
}
else if ( req instanceof TPFRequest tpfRequest ) {
query = tpfRequest.toString();
bindings = "";
if ( fm instanceof TPFServer tpfServer ) {
url = tpfServer.getInterface().createRequestURL( tpfRequest );
}
else if ( fm instanceof BRTPFServer brtpfServer ) {
url = brtpfServer.getInterface().createRequestURL( tpfRequest );
}
else {
throw new IllegalArgumentException( "Unexpected type of server: " + fm.getClass().getName() );
}
}
else if ( req instanceof BRTPFRequest brtpfRequest && fm instanceof BRTPFServer brtpfServer ) {
query = brtpfRequest.getTriplePattern().toString();
url = brtpfServer.getInterface().createRequestURL( brtpfRequest );
bindings = brtpfRequest.getSolutionMappings().toString();
}
else {
			throw new IllegalArgumentException( "Unexpected request type: " + req.getClass().getName() + " (server type: " + fm.getClass().getName() + ")" );
}
}

	@Override
	public boolean equals( final Object obj ) {
		if ( this == obj )
			return true;
		if ( obj == null || getClass() != obj.getClass() )
			return false;
		final CardinalityCacheKey other = (CardinalityCacheKey) obj;
		// Include 'bindings' so that brTPF requests that differ only in their
		// solution mappings are not conflated into a single cache entry.
		return query.equals( other.query )
				&& url.equals( other.url )
				&& bindings.equals( other.bindings );
	}

	@Override
	public int hashCode() {
		return Objects.hash( query, url, bindings );
	}

@Override
public String toString() {
return "CardinalityCacheKey{query='" + query + "', url='" + url + "', bindings='" + bindings + "'}";
}
}
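Since equals() and hashCode() are value-based, repeated cardinality requests against the same federation member map onto the same cache entry. The following stand-alone sketch (not part of this commit) demonstrates the effect with a simplified record standing in for CardinalityCacheKey; all names and values are hypothetical.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a simplified stand-in for CardinalityCacheKey
// showing why value-based equals()/hashCode() lets repeated requests hit the
// same cache entry.
public class KeySemanticsSketch {
	// Records derive equals()/hashCode() from all of their components,
	// analogous to the field-based semantics of CardinalityCacheKey.
	record SimpleKey( String query, String url, String bindings ) {}

	public static void main( final String[] args ) {
		final Map<SimpleKey, Integer> cache = new HashMap<>();
		cache.put( new SimpleKey("SELECT ...", "http://example.org/sparql", ""), 42 );

		// A second key built from the same request data finds the entry.
		final Integer cardinality = cache.get( new SimpleKey("SELECT ...", "http://example.org/sparql", "") );
		System.out.println( "cached cardinality: " + cardinality );  // prints 42
	}
}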
@@ -0,0 +1,138 @@
package se.liu.ida.hefquin.engine.federation.access.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import se.liu.ida.hefquin.base.datastructures.impl.cache.CacheEntry;
import se.liu.ida.hefquin.base.datastructures.impl.cache.CachePolicies;
import se.liu.ida.hefquin.engine.federation.BRTPFServer;
import se.liu.ida.hefquin.engine.federation.SPARQLEndpoint;
import se.liu.ida.hefquin.engine.federation.TPFServer;
import se.liu.ida.hefquin.engine.federation.access.BRTPFRequest;
import se.liu.ida.hefquin.engine.federation.access.CardinalityResponse;
import se.liu.ida.hefquin.engine.federation.access.DataRetrievalResponse;
import se.liu.ida.hefquin.engine.federation.access.FederationAccessException;
import se.liu.ida.hefquin.engine.federation.access.FederationAccessManager;
import se.liu.ida.hefquin.engine.federation.access.SPARQLRequest;
import se.liu.ida.hefquin.engine.federation.access.TPFRequest;

/**
 * A FederationAccessManager implementation that incorporates persistent disk
 * caching of SPARQL cardinality requests.
 *
 * TODO: The implementation uses a simple serialization/deserialization
 * strategy in which the cache is persisted by writing the full map to disk
 * on every update. This approach is not optimized for the task but is simply
 * intended as a proof of concept. A future implementation should support
 * standard cache configuration policies, such as time-based eviction (time
 * to live), and should leverage an optimized persistence strategy, preferably
 * based on a library.
 *
 * Note: Most of the classes/interfaces involved (e.g., DataRetrievalResponse
 * and CardinalityResponse) do not support serialization.
 */
public class FederationAccessManagerWithPersistedDiskCache extends FederationAccessManagerWithCache
{
protected final PersistableCardinalityCacheImpl<CardinalityCacheKey> cardinalityCache;

public FederationAccessManagerWithPersistedDiskCache( final FederationAccessManager fedAccMan,
final int cacheCapacity,
final CachePolicies<Key, CompletableFuture<? extends DataRetrievalResponse>, ? extends CacheEntry<CompletableFuture<? extends DataRetrievalResponse>>> cachePolicies )
{
super( fedAccMan, cacheCapacity, cachePolicies );
cardinalityCache = new PersistableCardinalityCacheImpl<>();
}

public FederationAccessManagerWithPersistedDiskCache( final FederationAccessManager fedAccMan,
final int cacheCapacity )
{
this( fedAccMan, cacheCapacity, new MyDefaultCachePolicies() );
}

/**
* Creates a {@link FederationAccessManagerWithPersistedDiskCache} with a default configuration.
*/
public FederationAccessManagerWithPersistedDiskCache( final ExecutorService execService )
{
this( new AsyncFederationAccessManagerImpl( execService ), 100, new MyDefaultCachePolicies() );
}

@Override
public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final SPARQLRequest req,
final SPARQLEndpoint fm )
throws FederationAccessException
{
final CardinalityCacheKey key = new CardinalityCacheKey( req, fm );
final CompletableFuture<CardinalityResponse> cachedResponse = cardinalityCache.get( key );
if ( cachedResponse != null ) {
cacheHitsSPARQLCardinality++;
return cachedResponse;
}

final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
cardinalityCache.put( key, newResponse );
newResponse.thenRun( () -> cardinalityCache.save() );
return newResponse;
}

@Override
public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final TPFRequest req,
final TPFServer fm )
throws FederationAccessException
{
final CardinalityCacheKey key = new CardinalityCacheKey( req, fm );
final CompletableFuture<CardinalityResponse> cachedResponse = cardinalityCache.get( key );
if ( cachedResponse != null ) {
cacheHitsTPFCardinality++;
return cachedResponse;
}

final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
cardinalityCache.put( key, newResponse );
newResponse.thenRun( () -> cardinalityCache.save() );
return newResponse;
}

@Override
public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final TPFRequest req,
final BRTPFServer fm )
throws FederationAccessException
{
final CardinalityCacheKey key = new CardinalityCacheKey( req, fm );
final CompletableFuture<CardinalityResponse> cachedResponse = cardinalityCache.get( key );
if ( cachedResponse != null ) {
cacheHitsTPFCardinality++;
return cachedResponse;
}

final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
cardinalityCache.put( key, newResponse );
newResponse.thenRun( () -> cardinalityCache.save() );
return newResponse;
}

@Override
public CompletableFuture<CardinalityResponse> issueCardinalityRequest( final BRTPFRequest req,
final BRTPFServer fm )
throws FederationAccessException
{
final CardinalityCacheKey key = new CardinalityCacheKey( req, fm );
final CompletableFuture<CardinalityResponse> cachedResponse = cardinalityCache.get( key );
if ( cachedResponse != null ) {
cacheHitsBRTPFCardinality++;
return cachedResponse;
		}

		final CompletableFuture<CardinalityResponse> newResponse = fedAccMan.issueCardinalityRequest( req, fm );
cardinalityCache.put( key, newResponse );
newResponse.thenRun( () -> cardinalityCache.save() );
return newResponse;
}

/**
* Clears the persisted cardinality cache map.
*/
	public void clearCardinalityCache() {
cardinalityCache.clear();
cardinalityCache.save();
}
}
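For context, a minimal usage sketch (not part of this commit) of how the manager might be instantiated with its default configuration; the thread-pool size is an arbitrary example value.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import se.liu.ida.hefquin.engine.federation.access.impl.FederationAccessManagerWithPersistedDiskCache;

// Illustrative sketch only: wiring up the manager via its single-argument
// constructor, which (per the code above) wraps an AsyncFederationAccessManagerImpl
// with a cache capacity of 100 and the default cache policies.
public class ManagerSetupSketch {
	public static void main( final String[] args ) {
		final ExecutorService execService = Executors.newFixedThreadPool( 4 );  // pool size is arbitrary
		final FederationAccessManagerWithPersistedDiskCache fedAccMan =
				new FederationAccessManagerWithPersistedDiskCache( execService );

		// Cardinality requests issued through fedAccMan are answered from the
		// cache when possible; new results are cached and written back to disk
		// once their futures complete.

		// clearCardinalityCache() resets the in-memory map and persists the
		// now-empty state.
		fedAccMan.clearCardinalityCache();

		execService.shutdown();
	}
}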