[Tiered Cache] Using a single cache manager for all ehcache disk caches #17513

Open · wants to merge 5 commits into base: main
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))
- Add filter function for AbstractQueryBuilder, BoolQueryBuilder, ConstantScoreQueryBuilder([#17409](https://github.com/opensearch-project/OpenSearch/pull/17409))
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))
- [Tiered caching] Create a single cache manager for all the disk caches. ([#17513](https://github.com/opensearch-project/OpenSearch/pull/17513))
- Added error handling support for the pull-based ingestion ([#17427](https://github.com/opensearch-project/OpenSearch/pull/17427))

### Dependencies
@@ -213,7 +213,7 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
.setSegmentCount(1) // We don't need to make underlying caches multi-segmented
.setStatsTrackingEnabled(false)
.setMaxSizeInBytes(diskCacheSizeInBytes)
.setStoragePath(builder.cacheConfig.getStoragePath() + "/" + segmentNumber)
.setStoragePath(builder.cacheConfig.getStoragePath())
.setCacheAlias("tiered_disk_cache#" + segmentNumber)
.build(),
builder.cacheType,
Expand Down
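With a single cache manager, the tiered-cache segments now share one storage path and are told apart by cache alias rather than by a per-segment subdirectory. A minimal sketch of that naming scheme (the alias pattern comes from the diff above; the helper class itself is illustrative, not OpenSearch code):

```java
// Segments share builder.cacheConfig.getStoragePath(); uniqueness moves from
// the path suffix ("/" + segmentNumber) to the cache alias. The alias pattern
// is taken from the diff; this helper class is illustrative only.
public class TieredDiskCacheNaming {
    static String aliasFor(int segmentNumber) {
        return "tiered_disk_cache#" + segmentNumber;
    }

    public static void main(String[] args) {
        // Two segments, one directory, two distinct aliases.
        System.out.println(aliasFor(0)); // tiered_disk_cache#0
        System.out.println(aliasFor(1)); // tiered_disk_cache#1
    }
}
```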
@@ -16,6 +16,7 @@
import java.util.HashMap;
import java.util.Map;

import static java.lang.Math.max;
import static org.opensearch.common.settings.Setting.Property.NodeScope;

/**
@@ -36,17 +37,25 @@ public class EhcacheDiskCacheSettings {

public static final Setting.AffixSetting<Integer> DISK_WRITE_MINIMUM_THREADS_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".min_threads",
(key) -> Setting.intSetting(key, 2, 1, 5, NodeScope)
(key) -> Setting.intSetting(key, 2, 1, max(2, Runtime.getRuntime().availableProcessors() / 8), NodeScope)
);

/**
* Ehcache disk write maximum threads for its pool
* Ehcache disk write maximum threads for its pool. The default is max(2, N / 8), where N is the number of CPU
* cores, with the setting bounded above by N. This is done as disk operations are typically I/O bound rather
* than CPU bound, so we can't scale this blindly based on the CPU cores alone.
*
* Setting pattern: {cache_type}.ehcache_disk.max_threads
*/
public static final Setting.AffixSetting<Integer> DISK_WRITE_MAXIMUM_THREADS_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_threads",
(key) -> Setting.intSetting(key, 2, 1, 20, NodeScope)
(key) -> Setting.intSetting(
key,
max(2, Runtime.getRuntime().availableProcessors() / 8),
1,
Runtime.getRuntime().availableProcessors(),
NodeScope
)
Review comment on lines +52 to +58 (Collaborator):
Reducing the number of threads can cause queuing of disk write operations, unless there is some batching logic within Ehcache. IMO, it is desirable for TieredCaching to write the computed results to disk as soon as possible. Number of threads is not worrisome as long as those threads are not compute intensive, which in this case they are not

);
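The arithmetic behind the new bounds can be checked in isolation. A sketch mirroring the max(2, N / 8) formula from these settings (the class and method names below are ours, not OpenSearch's):

```java
// Mirrors the bounds in DISK_WRITE_MINIMUM_THREADS_SETTING and
// DISK_WRITE_MAXIMUM_THREADS_SETTING: min_threads keeps a default of 2 with an
// upper bound of max(2, N / 8); max_threads defaults to max(2, N / 8) with an
// upper bound of N, where N is the CPU core count. Names are illustrative.
public class DiskWriteThreadBounds {
    static int minThreadsUpperBound(int cores) {
        return Math.max(2, cores / 8);
    }

    static int maxThreadsDefault(int cores) {
        return Math.max(2, cores / 8);
    }

    static int maxThreadsUpperBound(int cores) {
        return cores;
    }

    public static void main(String[] args) {
        // The divisor only matters past 16 cores; small boxes stay at the floor of 2.
        System.out.println(maxThreadsDefault(8));   // 2
        System.out.println(maxThreadsDefault(64));  // 8
    }
}
```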

/**
@@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.common.SuppressForbidden;
@@ -153,16 +152,19 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
if (this.storagePath == null || this.storagePath.isBlank()) {
throw new IllegalArgumentException("Storage path shouldn't be null or empty");
}
// Delete all the previous disk cache related files/data. We don't persist data between process restart for
// now which is why need to do this. Clean up in case there was a non graceful restart and we had older disk
// cache data still lying around.
Path ehcacheDirectory = Paths.get(this.storagePath);
if (Files.exists(ehcacheDirectory)) {
try {
logger.info("Found older disk cache data lying around during initialization under path: {}", this.storagePath);
IOUtils.rm(ehcacheDirectory);
} catch (IOException e) {
throw new OpenSearchException(String.format(CACHE_DATA_CLEANUP_DURING_INITIALIZATION_EXCEPTION, this.storagePath), e);
// Delete all the previous disk cache related files/data only if the cache manager doesn't exist, as we can
// create multiple caches via a single cache manager for a cache type. We don't persist data between process
// restarts for now, which is why we need to do this. Clean up in case there was a non-graceful restart and
// we had older disk cache data still lying around.
if (!EhcacheDiskCacheManager.doesCacheManagerExist(cacheType)) {
Path ehcacheDirectory = Paths.get(this.storagePath);
if (Files.exists(ehcacheDirectory)) {
try {
logger.info("Found older disk cache data lying around during initialization under path: {}", this.storagePath);
IOUtils.rm(ehcacheDirectory);
} catch (IOException e) {
throw new OpenSearchException(String.format(CACHE_DATA_CLEANUP_DURING_INITIALIZATION_EXCEPTION, this.storagePath), e);
}
}
}
if (builder.threadPoolAlias == null || builder.threadPoolAlias.isBlank()) {
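The stale-data cleanup above is now gated on `doesCacheManagerExist(cacheType)`, so only the first cache created for a cache type wipes the directory; later caches attach to the already-running manager. A sketch of that guard with a per-type registry (EhcacheDiskCacheManager's real internals are not shown in this diff; the map-based registry is an assumption):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for EhcacheDiskCacheManager's per-cache-type registry:
// doesCacheManagerExist() answers whether a manager is already running for the
// type, so the caller knows whether stale on-disk data should be deleted.
public class DiskCacheManagerRegistry {
    private static final Map<String, Object> MANAGERS = new ConcurrentHashMap<>();

    static boolean doesCacheManagerExist(String cacheType) {
        return MANAGERS.containsKey(cacheType);
    }

    static Object getCacheManager(String cacheType) {
        // First caller for a type creates the manager; everyone else reuses it.
        return MANAGERS.computeIfAbsent(cacheType, t -> new Object());
    }
}
```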
@@ -173,7 +175,7 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
this.settings = Objects.requireNonNull(builder.getSettings(), "Settings objects shouldn't be null");
this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null");
this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null");
this.cacheManager = buildCacheManager();
this.cacheManager = EhcacheDiskCacheManager.getCacheManager(cacheType, this.storagePath, settings, this.threadPoolAlias);
Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null");
this.removalListener = builder.getRemovalListener();
Objects.requireNonNull(builder.getWeigher(), "Weigher can't be null");
@@ -189,73 +191,54 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
}
}

// Package private for testing
PersistentCacheManager getCacheManager() {
return this.cacheManager;
}

@SuppressWarnings({ "rawtypes", "removal" })
private Cache<ICacheKey, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
// Creating the cache requires permissions specified in plugin-security.policy
return AccessController.doPrivileged((PrivilegedAction<Cache<ICacheKey, ByteArrayWrapper>>) () -> {
try {
int segmentCount = (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_SEGMENT_KEY)
.get(settings);
if (builder.getNumberOfSegments() > 0) {
segmentCount = builder.getNumberOfSegments();
int segmentCount = (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings);
if (builder.getNumberOfSegments() > 0) {
segmentCount = builder.getNumberOfSegments();
}
CacheConfigurationBuilder<ICacheKey, ByteArrayWrapper> cacheConfigurationBuilder = CacheConfigurationBuilder
.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_CONCURRENCY_KEY)
.get(settings),
segmentCount
)
)
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
// before V hits ehcache.
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
throw ex;
} catch (IllegalStateException ex) {
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
throw ex;
}
});

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_WRITE_CONCURRENCY_KEY).get(settings),
segmentCount
)
)
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer());
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
// before V hits ehcache.

return EhcacheDiskCacheManager.createCache(cacheType, this.diskCacheAlias, cacheConfigurationBuilder);
}

private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<K, V> builder) {
@@ -470,20 +453,21 @@ public void refresh() {
@Override
@SuppressForbidden(reason = "Ehcache uses File.io")
public void close() {
try {
cacheManager.close();
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception occurred while trying to close ehcache manager"), e);
}
// Delete all the disk cache related files/data in case it is present
Path ehcacheDirectory = Paths.get(this.storagePath);
if (Files.exists(ehcacheDirectory)) {
try {
IOUtils.rm(ehcacheDirectory);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete ehcache disk cache data under path: {}", this.storagePath));
}
}
EhcacheDiskCacheManager.closeCache(cacheType, diskCacheAlias, storagePath);

}
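Since several caches now share one PersistentCacheManager, a single cache's close() can no longer close the manager or delete the shared directory outright; `EhcacheDiskCacheManager.closeCache(...)` has to decide when the last cache for the type is gone. A sketch of that bookkeeping (the reference counting here is our assumption about the manager's behavior, not code from this PR):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative bookkeeping for closeCache(): each cache type tracks how many
// caches are open; only when the count drops to zero is it safe to close the
// shared manager and remove the storage path.
public class SharedManagerLifecycle {
    private final Map<String, Integer> openCaches = new HashMap<>();

    void onCacheCreated(String cacheType) {
        openCaches.merge(cacheType, 1, Integer::sum);
    }

    // Returns true when the last cache for the type closed, i.e. the caller
    // would now close the manager and delete the storage path.
    boolean onCacheClosed(String cacheType) {
        int remaining = openCaches.merge(cacheType, -1, Integer::sum);
        if (remaining <= 0) {
            openCaches.remove(cacheType);
            return true;
        }
        return false;
    }
}
```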

@@ -597,16 +581,24 @@ public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends ByteArrayWrappe
* Wrapper over ICacheKeySerializer which is compatible with ehcache's serializer requirements.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer<ICacheKey> {
public class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer<ICacheKey> {
private ICacheKeySerializer<K> serializer;

/**
* Constructor for key serializer
* @param internalKeySerializer serializer for internal key
*/
public KeySerializerWrapper(Serializer<K, byte[]> internalKeySerializer) {
this.serializer = new ICacheKeySerializer<>(internalKeySerializer);
}

// This constructor must be present, but does not have to work as we are not actually persisting the disk
// cache after a restart.
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
/**
 * This constructor must be present, but does not have to work as we are not actually persisting the disk
 * cache after a restart. See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
 * @param classLoader the class loader
 * @param persistenceContext the file-based persistence context
 */
public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

@Override
@@ -632,12 +624,19 @@ public boolean equals(ICacheKey object, ByteBuffer binary) throws ClassNotFoundE
/**
* Wrapper allowing Ehcache to serialize ByteArrayWrapper.
*/
private static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer<ByteArrayWrapper> {
public static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer<ByteArrayWrapper> {
/**
* Default constructor
*/
public ByteArrayWrapperSerializer() {}

// This constructor must be present, but does not have to work as we are not actually persisting the disk
// cache after a restart.
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
/**
 * This constructor must be present, but does not have to work as we are not actually persisting the disk
 * cache after a restart. See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
 * @param classLoader the class loader
 * @param persistenceContext the file-based persistence context
 */
public ByteArrayWrapperSerializer(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

@Override
@@ -906,9 +905,13 @@ public EhcacheDiskCache<K, V> build() {
* A wrapper over byte[], with equals() that works using Arrays.equals().
* Necessary due to a limitation in how Ehcache compares byte[].
*/
static class ByteArrayWrapper {
public static class ByteArrayWrapper {
private final byte[] value;

/**
* Constructor for byte array wrapper.
* @param value value to wrap.
*/
public ByteArrayWrapper(byte[] value) {
this.value = value;
}
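The reason ByteArrayWrapper exists at all is that Java arrays inherit Object.equals, which compares references, so two byte[] keys with identical content would never match in a cache. A quick standalone demonstration:

```java
import java.util.Arrays;

// Demonstrates the byte[] comparison pitfall that ByteArrayWrapper works
// around: equals() on arrays is reference equality, not content equality.
public class ByteArrayEqualityDemo {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};
        System.out.println(a.equals(b));          // false: different references
        System.out.println(Arrays.equals(a, b));  // true: same contents
    }
}
```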